From 57f08febb52eb43a7d7ef9f8fe75f90ad29380b8 Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Thu, 25 Nov 2021 09:55:03 +1030 Subject: [PATCH] Limit scope of the database connections acquired during application setup The dirty shutdown we observed were caused by the sqlite pool patiently waiting until we close the connection we've opened in the application setup. This is turn was eventually triggering Rocket timeout (which is by default configured to wait 5s for cleanup). Tighten the scope of connections needed during application startup only to the necessary minimum (housekeeping & loading cfds at startup) --- daemon/src/housekeeping.rs | 15 ++++++++++++--- daemon/src/maker.rs | 12 +++++++----- daemon/src/taker.rs | 12 +++++++----- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/daemon/src/housekeeping.rs b/daemon/src/housekeeping.rs index ef20c69..ace8df9 100644 --- a/daemon/src/housekeeping.rs +++ b/daemon/src/housekeeping.rs @@ -3,10 +3,19 @@ use crate::model::cfd::{Cfd, CfdState}; use crate::{try_continue, wallet}; use anyhow::Result; use sqlx::pool::PoolConnection; -use sqlx::Sqlite; +use sqlx::{Sqlite, SqlitePool}; use xtra::Address; -pub async fn transition_non_continue_cfds_to_setup_failed( +/// Perform necessary housekeeping before actor system startup +pub async fn new(db: &SqlitePool, wallet: &Address) -> Result<()> { + let mut conn = db.acquire().await?; + + transition_non_continue_cfds_to_setup_failed(&mut conn).await?; + rebroadcast_transactions(&mut conn, wallet).await?; + Ok(()) +} + +async fn transition_non_continue_cfds_to_setup_failed( conn: &mut PoolConnection, ) -> Result<()> { let mut cfds = load_all_cfds(conn).await?; @@ -23,7 +32,7 @@ pub async fn transition_non_continue_cfds_to_setup_failed( Ok(()) } -pub async fn rebroadcast_transactions( +async fn rebroadcast_transactions( conn: &mut PoolConnection, wallet: &Address, ) -> Result<()> { diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index bdacf4a..0b74080 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -235,10 +235,7 @@ async fn main() -> Result<()> { .context("Db migrations failed")?; // Create actors - let mut conn = db.acquire().await?; - - housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?; - housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?; + housekeeping::new(&db, &wallet).await?; let (projection_actor, projection_context) = xtra::Context::new(None); @@ -275,7 +272,12 @@ async fn main() -> Result<()> { let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?; tasks.add(task); - let cfds = load_all_cfds(&mut conn).await?; + // TODO: Move to projection actor + let cfds = { + let mut conn = db.acquire().await?; + + load_all_cfds(&mut conn).await? + }; let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); let (update_cfd_feed_sender, update_cfd_feed_receiver) = diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index b070b84..ddc5872 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -225,10 +225,7 @@ async fn main() -> Result<()> { .context("Db migrations failed")?; // Create actors - let mut conn = db.acquire().await?; - - housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?; - housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?; + housekeeping::new(&db, &wallet).await?; let (projection_actor, projection_context) = xtra::Context::new(None); @@ -257,7 +254,12 @@ async fn main() -> Result<()> { let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?; tasks.add(task); - let cfds = load_all_cfds(&mut conn).await?; + // TODO: Move to projection actor + let cfds = { + let mut conn = db.acquire().await?; + + load_all_cfds(&mut conn).await? + }; let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone()); let (order_feed_sender, order_feed_receiver) = channel::>(None); let (update_cfd_feed_sender, update_cfd_feed_receiver) =