Browse Source
Limit scope of the database connections acquired during application setup
The dirty shutdown we observed were caused by the sqlite pool patiently waiting
until we close the connection we've opened in the application setup.
This is turn was eventually triggering Rocket timeout (which is by default
configured to wait 5s for cleanup).
Tighten the scope of connections needed during application startup only to the
necessary minimum (housekeeping & loading cfds at startup)
debug-collab-settlement
Mariusz Klochowicz
3 years ago
No known key found for this signature in database
GPG Key ID: 470C865699C8D4D
3 changed files with
26 additions and
13 deletions
daemon/src/housekeeping.rs
daemon/src/maker.rs
daemon/src/taker.rs
@ -3,10 +3,19 @@ use crate::model::cfd::{Cfd, CfdState};
use crate ::{ try_continue , wallet } ;
use anyhow ::Result ;
use sqlx ::pool ::PoolConnection ;
use sqlx ::Sqlite ;
use sqlx ::{ Sqlite , SqlitePool } ;
use xtra ::Address ;
pub async fn transition_non_continue_cfds_to_setup_failed (
/// Perform necessary housekeeping before actor system startup
pub async fn new ( db : & SqlitePool , wallet : & Address < wallet ::Actor > ) -> Result < ( ) > {
let mut conn = db . acquire ( ) . await ? ;
transition_non_continue_cfds_to_setup_failed ( & mut conn ) . await ? ;
rebroadcast_transactions ( & mut conn , wallet ) . await ? ;
Ok ( ( ) )
}
async fn transition_non_continue_cfds_to_setup_failed (
conn : & mut PoolConnection < Sqlite > ,
) -> Result < ( ) > {
let mut cfds = load_all_cfds ( conn ) . await ? ;
@ -23,7 +32,7 @@ pub async fn transition_non_continue_cfds_to_setup_failed(
Ok ( ( ) )
}
pub async fn rebroadcast_transactions (
async fn rebroadcast_transactions (
conn : & mut PoolConnection < Sqlite > ,
wallet : & Address < wallet ::Actor > ,
) -> Result < ( ) > {
@ -235,10 +235,7 @@ async fn main() -> Result<()> {
. context ( "Db migrations failed" ) ? ;
// Create actors
let mut conn = db . acquire ( ) . await ? ;
housekeeping ::transition_non_continue_cfds_to_setup_failed ( & mut conn ) . await ? ;
housekeeping ::rebroadcast_transactions ( & mut conn , & wallet ) . await ? ;
housekeeping ::new ( & db , & wallet ) . await ? ;
let ( projection_actor , projection_context ) = xtra ::Context ::new ( None ) ;
@ -275,7 +272,12 @@ async fn main() -> Result<()> {
let ( task , init_quote ) = bitmex_price_feed ::new ( projection_actor ) . await ? ;
tasks . add ( task ) ;
let cfds = load_all_cfds ( & mut conn ) . await ? ;
// TODO: Move to projection actor
let cfds = {
let mut conn = db . acquire ( ) . await ? ;
load_all_cfds ( & mut conn ) . await ?
} ;
let ( cfd_feed_sender , cfd_feed_receiver ) = watch ::channel ( cfds . clone ( ) ) ;
let ( order_feed_sender , order_feed_receiver ) = watch ::channel ::< Option < Order > > ( None ) ;
let ( update_cfd_feed_sender , update_cfd_feed_receiver ) =
@ -225,10 +225,7 @@ async fn main() -> Result<()> {
. context ( "Db migrations failed" ) ? ;
// Create actors
let mut conn = db . acquire ( ) . await ? ;
housekeeping ::transition_non_continue_cfds_to_setup_failed ( & mut conn ) . await ? ;
housekeeping ::rebroadcast_transactions ( & mut conn , & wallet ) . await ? ;
housekeeping ::new ( & db , & wallet ) . await ? ;
let ( projection_actor , projection_context ) = xtra ::Context ::new ( None ) ;
@ -257,7 +254,12 @@ async fn main() -> Result<()> {
let ( task , init_quote ) = bitmex_price_feed ::new ( projection_actor ) . await ? ;
tasks . add ( task ) ;
let cfds = load_all_cfds ( & mut conn ) . await ? ;
// TODO: Move to projection actor
let cfds = {
let mut conn = db . acquire ( ) . await ? ;
load_all_cfds ( & mut conn ) . await ?
} ;
let ( cfd_feed_sender , cfd_feed_receiver ) = channel ( cfds . clone ( ) ) ;
let ( order_feed_sender , order_feed_receiver ) = channel ::< Option < Order > > ( None ) ;
let ( update_cfd_feed_sender , update_cfd_feed_receiver ) =