diff --git a/daemon/src/cleanup.rs b/daemon/src/cleanup.rs index 6ae528c..cd2d7a4 100644 --- a/daemon/src/cleanup.rs +++ b/daemon/src/cleanup.rs @@ -1,13 +1,14 @@ use crate::db::{insert_new_cfd_state_by_order_id, load_all_cfds}; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon}; use anyhow::Result; -use sqlx::SqlitePool; +use sqlx::pool::PoolConnection; +use sqlx::Sqlite; use std::time::SystemTime; -pub async fn transition_non_continue_cfds_to_setup_failed(db: SqlitePool) -> Result<()> { - let mut conn = db.acquire().await?; - - let cfds = load_all_cfds(&mut conn).await?; +pub async fn transition_non_continue_cfds_to_setup_failed( + conn: &mut PoolConnection, +) -> Result<()> { + let cfds = load_all_cfds(conn).await?; for cfd in cfds.iter().filter(|cfd| Cfd::is_cleanup(cfd)) { insert_new_cfd_state_by_order_id( @@ -18,7 +19,7 @@ pub async fn transition_non_continue_cfds_to_setup_failed(db: SqlitePool) -> Res }, info: format!("Was in state {} which cannot be continued.", cfd.state), }, - &mut conn, + conn, ) .await?; } diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 9fedf66..003cdaf 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -6,7 +6,7 @@ use anyhow::{Context, Result}; use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1}; use bdk::bitcoin::Network; use clap::Clap; -use model::cfd::{Cfd, Order}; +use model::cfd::Order; use model::WalletInfo; use rocket::fairing::AdHoc; use rocket_db_pools::Database; @@ -108,7 +108,6 @@ async fn main() -> Result<()> { let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle. - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::>(vec![]); let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::(wallet_info); @@ -125,7 +124,6 @@ async fn main() -> Result<()> { tokio::spawn(task); rocket::custom(figment) - .manage(cfd_feed_receiver) .manage(order_feed_receiver) .manage(wallet_feed_receiver) .manage(auth_password) @@ -150,11 +148,14 @@ async fn main() -> Result<()> { Some(db) => (**db).clone(), None => return Err(rocket), }; + let mut conn = db.acquire().await.unwrap(); - cleanup::transition_non_continue_cfds_to_setup_failed(db.clone()) + cleanup::transition_non_continue_cfds_to_setup_failed(&mut conn) .await .unwrap(); + let cfds = load_all_cfds(&mut conn).await.unwrap(); + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); let (maker_inc_connections_address, maker_inc_connections_context) = xtra::Context::new(None); @@ -216,7 +217,9 @@ async fn main() -> Result<()> { tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); - Ok(rocket.manage(cfd_maker_actor_inbox)) + Ok(rocket + .manage(cfd_maker_actor_inbox) + .manage(cfd_feed_receiver)) }, )) .mount( diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 3e71f39..fa4862b 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -80,9 +80,6 @@ impl Actor { cfds: Vec, oracle_actor: Address>>, ) -> Result { - // populate the CFD feed with existing CFDs - cfd_feed_actor_inbox.send(cfds.clone())?; - for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 2f0d39a..54a94f3 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -6,7 +6,7 @@ use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1}; use bdk::bitcoin::Network; use clap::Clap; use futures::StreamExt; -use model::cfd::{Cfd, Order}; +use model::cfd::Order; use rocket::fairing::AdHoc; use rocket_db_pools::Database; use seed::Seed; @@ -103,7 +103,6 @@ async fn main() -> Result<()> { let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle. - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::>(vec![]); let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::(wallet_info); @@ -128,7 +127,6 @@ async fn main() -> Result<()> { .merge(("port", opts.http_port)); rocket::custom(figment) - .manage(cfd_feed_receiver) .manage(order_feed_receiver) .manage(wallet_feed_receiver) .manage(quote_updates) @@ -152,11 +150,14 @@ async fn main() -> Result<()> { Some(db) => (**db).clone(), None => return Err(rocket), }; + let mut conn = db.acquire().await.unwrap(); - cleanup::transition_non_continue_cfds_to_setup_failed(db.clone()) + cleanup::transition_non_continue_cfds_to_setup_failed(&mut conn) .await .unwrap(); + let cfds = load_all_cfds(&mut conn).await.unwrap(); + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); let send_to_maker = send_to_socket::Actor::new(write) .create(None) .spawn_global(); @@ -193,7 +194,6 @@ async fn main() -> Result<()> { ), ); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); - tokio::spawn( oracle_actor_context .notify_interval(Duration::from_secs(60), || oracle::Sync) @@ -204,7 +204,7 @@ async fn main() -> Result<()> { monitor_actor_address, ))); - Ok(rocket.manage(cfd_actor_inbox)) + Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver)) }, )) .mount( diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index ea2d3ed..107e78a 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -73,9 +73,6 @@ impl Actor { cfds: Vec, oracle_actor: Address>>, ) -> Result { - // populate the CFD feed with existing CFDs - cfd_feed_actor_inbox.send(cfds.clone())?; - for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?;