Browse Source

Merge #703

703: Explicitly drop database connection acquired during application setup r=klochowicz a=klochowicz

The dirty shutdown we observed were caused by the sqlite pool patiently waiting
until we close the connection we've opened in the application setup.
This is turn was eventually triggering Rocket timeout (which is by default
configured to wait 5s for cleanup).

When loading cfds gets moved into the projection actor, the connection can be
moved to a tighter scope (only for the housekeeping functions).

Resolves #477 

Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
debug-collab-settlement
bors[bot] 3 years ago
committed by GitHub
parent
commit
e74ccf965d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      daemon/src/housekeeping.rs
  2. 12
      daemon/src/maker.rs
  3. 12
      daemon/src/taker.rs

15
daemon/src/housekeeping.rs

@ -3,10 +3,19 @@ use crate::model::cfd::{Cfd, CfdState};
use crate::{try_continue, wallet}; use crate::{try_continue, wallet};
use anyhow::Result; use anyhow::Result;
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
use sqlx::Sqlite; use sqlx::{Sqlite, SqlitePool};
use xtra::Address; use xtra::Address;
pub async fn transition_non_continue_cfds_to_setup_failed( /// Perform necessary housekeeping before actor system startup
pub async fn new(db: &SqlitePool, wallet: &Address<wallet::Actor>) -> Result<()> {
let mut conn = db.acquire().await?;
transition_non_continue_cfds_to_setup_failed(&mut conn).await?;
rebroadcast_transactions(&mut conn, wallet).await?;
Ok(())
}
async fn transition_non_continue_cfds_to_setup_failed(
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
) -> Result<()> { ) -> Result<()> {
let mut cfds = load_all_cfds(conn).await?; let mut cfds = load_all_cfds(conn).await?;
@ -23,7 +32,7 @@ pub async fn transition_non_continue_cfds_to_setup_failed(
Ok(()) Ok(())
} }
pub async fn rebroadcast_transactions( async fn rebroadcast_transactions(
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
wallet: &Address<wallet::Actor>, wallet: &Address<wallet::Actor>,
) -> Result<()> { ) -> Result<()> {

12
daemon/src/maker.rs

@ -235,10 +235,7 @@ async fn main() -> Result<()> {
.context("Db migrations failed")?; .context("Db migrations failed")?;
// Create actors // Create actors
let mut conn = db.acquire().await?; housekeeping::new(&db, &wallet).await?;
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?;
housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?;
let (projection_actor, projection_context) = xtra::Context::new(None); let (projection_actor, projection_context) = xtra::Context::new(None);
@ -275,7 +272,12 @@ async fn main() -> Result<()> {
let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?; let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?;
tasks.add(task); tasks.add(task);
let cfds = load_all_cfds(&mut conn).await?; // TODO: Move to projection actor
let cfds = {
let mut conn = db.acquire().await?;
load_all_cfds(&mut conn).await?
};
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None); let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) = let (update_cfd_feed_sender, update_cfd_feed_receiver) =

12
daemon/src/taker.rs

@ -225,10 +225,7 @@ async fn main() -> Result<()> {
.context("Db migrations failed")?; .context("Db migrations failed")?;
// Create actors // Create actors
let mut conn = db.acquire().await?; housekeeping::new(&db, &wallet).await?;
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?;
housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?;
let (projection_actor, projection_context) = xtra::Context::new(None); let (projection_actor, projection_context) = xtra::Context::new(None);
@ -257,7 +254,12 @@ async fn main() -> Result<()> {
let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?; let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?;
tasks.add(task); tasks.add(task);
let cfds = load_all_cfds(&mut conn).await?; // TODO: Move to projection actor
let cfds = {
let mut conn = db.acquire().await?;
load_all_cfds(&mut conn).await?
};
let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone()); let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None); let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) = let (update_cfd_feed_sender, update_cfd_feed_receiver) =

Loading…
Cancel
Save