diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index d26ad08..0ba0ab5 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -12,7 +12,6 @@ use model::WalletInfo; use rocket::fairing::AdHoc; use rocket_db_pools::Database; use std::path::PathBuf; -use std::time::Duration; use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; use xtra::prelude::*; @@ -34,6 +33,7 @@ mod send_wire_message_actor; mod setup_contract_actor; mod to_sse_event; mod wallet; +mod wallet_sync; mod wire; #[derive(Database)] @@ -162,11 +162,10 @@ async fn main() -> Result<()> { let cfd_maker_actor_inbox = maker_cfd::Actor::new( db, - wallet, + wallet.clone(), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), cfd_feed_sender, order_feed_sender, - wallet_feed_sender, maker_inc_connections_address.clone(), ) .await @@ -213,20 +212,7 @@ async fn main() -> Result<()> { } }); - // consecutive wallet syncs handled by task that triggers sync - let wallet_sync_interval = Duration::from_secs(10); - tokio::spawn({ - let cfd_actor_inbox = cfd_maker_actor_inbox.clone(); - async move { - loop { - cfd_actor_inbox - .do_send_async(maker_cfd::SyncWallet) - .await - .unwrap(); - tokio::time::sleep(wallet_sync_interval).await; - } - } - }); + tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); Ok(rocket.manage(cfd_maker_actor_inbox)) }, diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 881411f..cb60a81 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -5,7 +5,7 @@ use crate::db::{ }; use crate::maker_inc_connections::TakerCommand; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; -use crate::model::{TakerId, Usd, WalletInfo}; +use crate::model::{TakerId, Usd}; use crate::wallet::Wallet; use crate::wire::SetupMsg; use crate::{maker_inc_connections, setup_contract_actor}; @@ -43,15 +43,12 @@ pub struct CfdSetupCompleted { pub dlc: Dlc, } -pub struct SyncWallet; - pub struct Actor { db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, cfd_feed_actor_inbox: watch::Sender>, order_feed_sender: watch::Sender>, - wallet_feed_sender: watch::Sender, takers: Address, current_order_id: Option, current_contract_setup: Option>, @@ -67,7 +64,6 @@ impl Actor { oracle_pk: schnorrsig::PublicKey, cfd_feed_actor_inbox: watch::Sender>, order_feed_sender: watch::Sender>, - wallet_feed_sender: watch::Sender, takers: Address, ) -> Result { let mut conn = db.acquire().await?; @@ -81,7 +77,6 @@ impl Actor { oracle_pk, cfd_feed_actor_inbox, order_feed_sender, - wallet_feed_sender, takers, current_order_id: None, current_contract_setup: None, @@ -142,12 +137,6 @@ impl Actor { Ok(()) } - async fn handle_sync_wallet(&mut self) -> Result<()> { - let wallet_info = self.wallet.sync().await?; - self.wallet_feed_sender.send(wallet_info)?; - Ok(()) - } - async fn handle_cfd_setup_completed(&mut self, msg: CfdSetupCompleted) -> Result<()> { let mut conn = self.db.acquire().await?; @@ -447,13 +436,6 @@ impl Handler for Actor { } } -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, _msg: SyncWallet, _ctx: &mut Context) { - log_error!(self.handle_sync_wallet()); - } -} - impl Message for TakeOrder { type Result = (); } @@ -474,10 +456,6 @@ impl Message for CfdSetupCompleted { type Result = (); } -impl Message for SyncWallet { - type Result = (); -} - impl Message for AcceptOrder { type Result = (); } diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 94a1c01..6e5a9fd 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -32,6 +32,7 @@ mod taker_cfd_actor; mod taker_inc_message_actor; mod to_sse_event; mod wallet; +mod wallet_sync; mod wire; const CONNECTION_RETRY_INTERVAL: Duration = Duration::from_secs(5); @@ -161,11 +162,10 @@ async fn main() -> Result<()> { let cfd_actor_inbox = taker_cfd_actor::TakerCfdActor::new( db, - wallet, + wallet.clone(), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), cfd_feed_sender, order_feed_sender, - wallet_feed_sender, out_maker_actor_inbox, ) .await @@ -176,22 +176,7 @@ async fn main() -> Result<()> { let inc_maker_messages_actor = taker_inc_message_actor::new(read, cfd_actor_inbox.clone()); - // consecutive wallet syncs handled by task that triggers sync - let wallet_sync_interval = Duration::from_secs(10); - tokio::spawn({ - let cfd_actor_inbox = cfd_actor_inbox.clone(); - async move { - loop { - cfd_actor_inbox - .do_send_async(taker_cfd_actor::SyncWallet) - .await - .unwrap(); - - tokio::time::sleep(wallet_sync_interval).await; - } - } - }); - + tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(inc_maker_messages_actor); tokio::spawn(out_maker_messages_actor); diff --git a/daemon/src/taker_cfd_actor.rs b/daemon/src/taker_cfd_actor.rs index 57d1ebe..a847674 100644 --- a/daemon/src/taker_cfd_actor.rs +++ b/daemon/src/taker_cfd_actor.rs @@ -5,7 +5,7 @@ use crate::db::{ use crate::actors::log_error; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; -use crate::model::{Usd, WalletInfo}; +use crate::model::Usd; use crate::wallet::Wallet; use crate::wire::SetupMsg; use crate::{setup_contract_actor, wire}; @@ -16,8 +16,6 @@ use std::time::SystemTime; use tokio::sync::{mpsc, watch}; use xtra::prelude::*; -pub struct SyncWallet; - pub struct TakeOffer { pub order_id: OrderId, pub quantity: Usd, @@ -40,7 +38,6 @@ pub struct TakerCfdActor { oracle_pk: schnorrsig::PublicKey, cfd_feed_actor_inbox: watch::Sender>, order_feed_actor_inbox: watch::Sender>, - wallet_feed_sender: watch::Sender, out_msg_maker_inbox: mpsc::UnboundedSender, current_contract_setup: Option>, // TODO: Move the contract setup into a dedicated actor and send messages to that actor that @@ -55,7 +52,6 @@ impl TakerCfdActor { oracle_pk: schnorrsig::PublicKey, cfd_feed_actor_inbox: watch::Sender>, order_feed_actor_inbox: watch::Sender>, - wallet_feed_sender: watch::Sender, out_msg_maker_inbox: mpsc::UnboundedSender, ) -> Result { let mut conn = db.acquire().await?; @@ -67,19 +63,12 @@ impl TakerCfdActor { oracle_pk, cfd_feed_actor_inbox, order_feed_actor_inbox, - wallet_feed_sender, out_msg_maker_inbox, current_contract_setup: None, contract_setup_message_buffer: vec![], }) } - async fn handle_sync_wallet(&mut self) -> Result<()> { - let wallet_info = self.wallet.sync().await?; - self.wallet_feed_sender.send(wallet_info)?; - Ok(()) - } - async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> { let mut conn = self.db.acquire().await?; @@ -250,13 +239,6 @@ impl TakerCfdActor { } } -#[async_trait] -impl Handler for TakerCfdActor { - async fn handle(&mut self, _msg: SyncWallet, _ctx: &mut Context) { - log_error!(self.handle_sync_wallet()); - } -} - #[async_trait] impl Handler for TakerCfdActor { async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context) { @@ -299,10 +281,6 @@ impl Handler for TakerCfdActor { } } -impl Message for SyncWallet { - type Result = (); -} - impl Message for TakeOffer { type Result = (); } diff --git a/daemon/src/wallet_sync.rs b/daemon/src/wallet_sync.rs new file mode 100644 index 0000000..8159d16 --- /dev/null +++ b/daemon/src/wallet_sync.rs @@ -0,0 +1,24 @@ +use crate::wallet::Wallet; +use crate::WalletInfo; +use std::time::Duration; +use tokio::sync::watch; +use tokio::time::sleep; + +pub async fn new(wallet: Wallet, sender: watch::Sender) { + loop { + sleep(Duration::from_secs(10)).await; + + let info = match wallet.sync().await { + Ok(info) => info, + Err(e) => { + tracing::warn!("Failed to sync wallet: {:#}", e); + continue; + } + }; + + if sender.send(info).is_err() { + tracing::warn!("Wallet feed receiver not available, stopping wallet sync"); + break; + } + } +}