diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 3dae2c6..e0c5d97 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -55,7 +55,6 @@ pub mod tokio_ext; pub mod try_continue; pub mod tx; pub mod wallet; -pub mod wallet_sync; pub mod wire; // Certain operations (e.g. contract setup) take long time in debug mode, @@ -115,7 +114,6 @@ where + xtra::Handler + xtra::Handler>, W: xtra::Handler - + xtra::Handler + xtra::Handler + xtra::Handler, { @@ -222,7 +220,6 @@ where + xtra::Handler + xtra::Handler, W: xtra::Handler - + xtra::Handler + xtra::Handler + xtra::Handler, { diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 4601416..40bf22f 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -8,7 +8,7 @@ use daemon::model::cfd::Role; use daemon::seed::Seed; use daemon::{ bitmex_price_feed, db, housekeeping, logger, maker_inc_connections, monitor, oracle, - projection, wallet, wallet_sync, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, + projection, wallet, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, SETTLEMENT_INTERVAL, }; use sqlx::sqlite::SqliteConnectOptions; @@ -17,7 +17,6 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; use std::task::Poll; -use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; use xtra::Actor; @@ -160,13 +159,13 @@ async fn main() -> Result<()> { let mut tasks = Tasks::default(); - let (wallet, wallet_fut) = wallet::Actor::new( + let (wallet, wallet_feed_receiver) = wallet::Actor::new( opts.network.electrum(), &data_dir.join("maker_wallet.sqlite"), ext_priv_key, - )? - .create(None) - .run(); + )?; + + let (wallet, wallet_fut) = wallet.create(None).run(); tasks.add(wallet_fut); if let Some(Withdraw::Withdraw { @@ -202,8 +201,6 @@ async fn main() -> Result<()> { "ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7", )?; - let (wallet_feed_sender, wallet_feed_receiver) = watch::channel(None); - let figment = rocket::Config::figment() .merge(("address", opts.http_address.ip())) .merge(("port", opts.http_address.port())); @@ -289,7 +286,6 @@ async fn main() -> Result<()> { }); tasks.add(incoming_connection_addr.attach_stream(listener_stream)); - tasks.add(wallet_sync::new(wallet.clone(), wallet_feed_sender)); rocket::custom(figment) .manage(projection_feeds) diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 4fea894..635de2d 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -8,7 +8,7 @@ use daemon::model::cfd::Role; use daemon::model::Identity; use daemon::seed::Seed; use daemon::{ - bitmex_price_feed, db, housekeeping, logger, monitor, oracle, projection, wallet, wallet_sync, + bitmex_price_feed, db, housekeeping, logger, monitor, oracle, projection, wallet, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, SETTLEMENT_INTERVAL, }; use sqlx::sqlite::SqliteConnectOptions; @@ -17,7 +17,6 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; -use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; use xtra::Actor; @@ -171,13 +170,13 @@ async fn main() -> Result<()> { let mut tasks = Tasks::default(); - let (wallet, wallet_fut) = wallet::Actor::new( + let (wallet, wallet_feed_receiver) = wallet::Actor::new( opts.network.electrum(), &data_dir.join("taker_wallet.sqlite"), ext_priv_key, - )? - .create(None) - .run(); + )?; + + let (wallet, wallet_fut) = wallet.create(None).run(); tasks.add(wallet_fut); if let Some(Withdraw::Withdraw { @@ -202,8 +201,6 @@ async fn main() -> Result<()> { "ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7", )?; - let (wallet_feed_sender, wallet_feed_receiver) = watch::channel(None); - let figment = rocket::Config::figment() .merge(("address", opts.http_address.ip())) .merge(("port", opts.http_address.port())); @@ -270,8 +267,6 @@ async fn main() -> Result<()> { possible_addresses, )); - tasks.add(wallet_sync::new(wallet.clone(), wallet_feed_sender)); - let rocket = rocket::custom(figment) .manage(projection_feeds) .manage(cfd_actor_addr) diff --git a/daemon/src/wallet.rs b/daemon/src/wallet.rs index 2fcf36d..dbd1ea0 100644 --- a/daemon/src/wallet.rs +++ b/daemon/src/wallet.rs @@ -1,5 +1,7 @@ use crate::model::{Timestamp, WalletInfo}; +use crate::Tasks; use anyhow::{bail, Context, Result}; +use async_trait::async_trait; use bdk::bitcoin::consensus::encode::serialize_hex; use bdk::bitcoin::util::bip32::ExtendedPrivKey; use bdk::bitcoin::util::psbt::PartiallySignedTransaction; @@ -11,9 +13,10 @@ use bdk::wallet::AddressIndex; use bdk::{electrum_client, FeeRate, KeychainKind, SignOptions}; use maia::{PartyParams, TxBuilderExt}; use rocket::serde::json::Value; - use std::collections::HashSet; use std::path::Path; +use std::time::Duration; +use tokio::sync::watch; use xtra_productivity::xtra_productivity; const DUST_AMOUNT: u64 = 546; @@ -21,6 +24,8 @@ const DUST_AMOUNT: u64 = 546; pub struct Actor { wallet: bdk::Wallet, used_utxos: HashSet, + tasks: Tasks, + sender: watch::Sender>, } #[derive(thiserror::Error, Debug, Clone, Copy)] @@ -32,7 +37,7 @@ impl Actor { electrum_rpc_url: &str, wallet_dir: &Path, ext_priv_key: ExtendedPrivKey, - ) -> Result { + ) -> Result<(Self, watch::Receiver>)> { let client = bdk::electrum_client::Client::new(electrum_rpc_url) .context("Failed to initialize Electrum RPC client")?; @@ -46,10 +51,15 @@ impl Actor { ElectrumBlockchain::from(client), )?; - Ok(Self { + let (sender, receiver) = watch::channel(None); + let actor = Self { wallet, + tasks: Tasks::default(), + sender, used_utxos: HashSet::default(), - }) + }; + + Ok((actor, receiver)) } /// Calculates the maximum "giveable" amount of this wallet. @@ -86,11 +96,8 @@ impl Actor { Err(e) => bail!("Failed to build transaction. {:#}", e), } } -} -#[xtra_productivity] -impl Actor { - pub fn handle_sync(&mut self, _msg: Sync) -> Result { + fn sync_internal(&mut self) -> Result { self.wallet .sync(NoopProgress, None) .context("Failed to sync wallet")?; @@ -107,6 +114,24 @@ impl Actor { Ok(wallet_info) } +} + +#[xtra_productivity] +impl Actor { + pub fn handle_sync(&mut self, _msg: Sync) -> Result<()> { + let wallet_info_update = match self.sync_internal() { + Ok(wallet_info) => Some(wallet_info), + Err(e) => { + tracing::debug!("{:#}", e); + + None + } + }; + + let _ = self.sender.send(wallet_info_update); + + Ok(()) + } pub fn handle_sign(&mut self, msg: Sign) -> Result { let mut psbt = msg.psbt; @@ -222,14 +247,30 @@ impl Actor { } } -impl xtra::Actor for Actor {} +#[async_trait] +impl xtra::Actor for Actor { + async fn started(&mut self, ctx: &mut xtra::Context) { + let this = ctx.address().expect("self to be alive"); + + self.tasks.add(async move { + loop { + tokio::time::sleep(Duration::from_secs(10)).await; + + if this.send(Sync).await.is_err() { + return; // we are disconnected, meaning actor stopped, just exit the loop. + } + } + }); + } +} pub struct BuildPartyParams { pub amount: Amount, pub identity_pk: PublicKey, } -pub struct Sync; +/// Private message to trigger a sync. +struct Sync; pub struct Sign { pub psbt: PartiallySignedTransaction, diff --git a/daemon/src/wallet_sync.rs b/daemon/src/wallet_sync.rs deleted file mode 100644 index 0d668ed..0000000 --- a/daemon/src/wallet_sync.rs +++ /dev/null @@ -1,29 +0,0 @@ -use crate::model::WalletInfo; -use crate::wallet; -use std::time::Duration; -use tokio::sync::watch; -use tokio::time::sleep; -use xtra::Address; - -pub async fn new(wallet: Address, sender: watch::Sender>) { - loop { - sleep(Duration::from_secs(10)).await; - - let info = match wallet - .send(wallet::Sync) - .await - .expect("Wallet actor to be available") - { - Ok(info) => Some(info), - Err(e) => { - tracing::warn!("Failed to sync wallet: {:#}", e); - None - } - }; - - if sender.send(info).is_err() { - tracing::warn!("Wallet feed receiver not available, stopping wallet sync"); - break; - } - } -} diff --git a/daemon/tests/harness/mocks/wallet.rs b/daemon/tests/harness/mocks/wallet.rs index e31e592..0566313 100644 --- a/daemon/tests/harness/mocks/wallet.rs +++ b/daemon/tests/harness/mocks/wallet.rs @@ -28,10 +28,6 @@ impl WalletActor { async fn handle(&mut self, msg: wallet::BuildPartyParams) -> Result { self.mock.lock().await.build_party_params(msg) } - async fn handle(&mut self, msg: wallet::Sync) -> Result { - tracing::info!("We are handling the wallet sync msg"); - self.mock.lock().await.sync(msg) - } async fn handle(&mut self, msg: wallet::Sign) -> Result { self.mock.lock().await.sign(msg) } @@ -53,10 +49,6 @@ pub trait Wallet { fn broadcast(&mut self, _msg: wallet::TryBroadcastTransaction) -> Result { unreachable!("mockall will reimplement this method") } - - fn sync(&mut self, _msg: wallet::Sync) -> Result { - unreachable!("mockall will reimplement this method") - } } #[allow(dead_code)]