diff --git a/daemon/src/cfd_actors.rs b/daemon/src/cfd_actors.rs index d44b607..6355c14 100644 --- a/daemon/src/cfd_actors.rs +++ b/daemon/src/cfd_actors.rs @@ -1,7 +1,6 @@ use crate::db::load_cfd_by_order_id; use crate::model::cfd::{Attestation, Cfd, CfdState, CfdStateChangeEvent, OrderId}; -use crate::wallet::Wallet; -use crate::{db, monitor, oracle, try_continue}; +use crate::{db, monitor, oracle, try_continue, wallet}; use anyhow::{bail, Context, Result}; use sqlx::pool::PoolConnection; use sqlx::Sqlite; @@ -34,15 +33,21 @@ pub async fn append_cfd_state( Ok(()) } -pub async fn try_cet_publication( +pub async fn try_cet_publication( cfd: &mut Cfd, conn: &mut PoolConnection, - wallet: &Wallet, + wallet: &xtra::Address, update_sender: &watch::Sender>, -) -> Result<()> { +) -> Result<()> +where + W: xtra::Handler, +{ match cfd.cet()? { Ok(cet) => { - let txid = wallet.try_broadcast_transaction(cet).await?; + let txid = wallet + .send(wallet::TryBroadcastTransaction { tx: cet }) + .await? + .context("Failed to send transaction")?; tracing::info!("CET published with txid {}", txid); if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() { @@ -60,12 +65,15 @@ pub async fn try_cet_publication( Ok(()) } -pub async fn handle_monitoring_event( +pub async fn handle_monitoring_event( event: monitor::Event, conn: &mut PoolConnection, - wallet: &Wallet, + wallet: &xtra::Address, update_sender: &watch::Sender>, -) -> Result<()> { +) -> Result<()> +where + W: xtra::Handler, +{ let order_id = event.order_id(); let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?; @@ -82,24 +90,37 @@ pub async fn handle_monitoring_event( try_cet_publication(&mut cfd, conn, wallet, update_sender).await?; } else if let CfdState::MustRefund { .. } = cfd.state { let signed_refund_tx = cfd.refund_tx()?; - let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; + let txid = wallet + .send(wallet::TryBroadcastTransaction { + tx: signed_refund_tx, + }) + .await? + .context("Failed to publish CET")?; tracing::info!("Refund transaction published on chain: {}", txid); } Ok(()) } -pub async fn handle_commit( +pub async fn handle_commit( order_id: OrderId, conn: &mut PoolConnection, - wallet: &Wallet, + wallet: &xtra::Address, update_sender: &watch::Sender>, -) -> Result<()> { +) -> Result<()> +where + W: xtra::Handler, +{ let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?; let signed_commit_tx = cfd.commit_tx()?; - let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?; + let txid = wallet + .send(wallet::TryBroadcastTransaction { + tx: signed_commit_tx, + }) + .await? + .context("Failed to publish commit tx")?; if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() { bail!("If we can get the commit tx we should be able to transition") @@ -111,12 +132,15 @@ pub async fn handle_commit( Ok(()) } -pub async fn handle_oracle_attestation( +pub async fn handle_oracle_attestation( attestation: oracle::Attestation, conn: &mut PoolConnection, - wallet: &Wallet, + wallet: &xtra::Address, update_sender: &watch::Sender>, -) -> Result<()> { +) -> Result<()> +where + W: xtra::Handler, +{ tracing::debug!( "Learnt latest oracle attestation for event: {}", attestation.id diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index 5a6d9ce..05dbe02 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -1,4 +1,4 @@ -use daemon::{send_to_socket, taker_cfd, wire}; +use crate::{send_to_socket, taker_cfd, wire}; use futures::{Stream, StreamExt}; use std::net::SocketAddr; use std::time::Duration; diff --git a/daemon/src/housekeeping.rs b/daemon/src/housekeeping.rs index dd5d225..ef20c69 100644 --- a/daemon/src/housekeeping.rs +++ b/daemon/src/housekeeping.rs @@ -1,10 +1,11 @@ use crate::db::{append_cfd_state, load_all_cfds}; use crate::model::cfd::{Cfd, CfdState}; -use crate::try_continue; -use crate::wallet::Wallet; +use crate::{try_continue, wallet}; use anyhow::Result; use sqlx::pool::PoolConnection; use sqlx::Sqlite; +use xtra::Address; + pub async fn transition_non_continue_cfds_to_setup_failed( conn: &mut PoolConnection, ) -> Result<()> { @@ -24,25 +25,40 @@ pub async fn transition_non_continue_cfds_to_setup_failed( pub async fn rebroadcast_transactions( conn: &mut PoolConnection, - wallet: &Wallet, + wallet: &Address, ) -> Result<()> { let cfds = load_all_cfds(conn).await?; for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { - let txid = try_continue!(wallet.try_broadcast_transaction(dlc.lock.0.clone()).await); + let txid = try_continue!(wallet + .send(wallet::TryBroadcastTransaction { + tx: dlc.lock.0.clone() + }) + .await + .expect("if sending to actor fails here we are screwed anyway")); tracing::info!("Lock transaction published with txid {}", txid); } for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) { let signed_refund_tx = cfd.refund_tx()?; - let txid = try_continue!(wallet.try_broadcast_transaction(signed_refund_tx).await); + let txid = try_continue!(wallet + .send(wallet::TryBroadcastTransaction { + tx: signed_refund_tx + }) + .await + .expect("if sending to actor fails here we are screwed anyway")); tracing::info!("Refund transaction published on chain: {}", txid); } for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) { let signed_commit_tx = cfd.commit_tx()?; - let txid = try_continue!(wallet.try_broadcast_transaction(signed_commit_tx).await); + let txid = try_continue!(wallet + .send(wallet::TryBroadcastTransaction { + tx: signed_commit_tx + }) + .await + .expect("if sending to actor fails here we are screwed anyway")); tracing::info!("Commit transaction published on chain: {}", txid); } @@ -50,7 +66,10 @@ pub async fn rebroadcast_transactions( for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_cet(cfd)) { // Double question mark OK because if we are in PendingCet we must have been Ready before let signed_cet = cfd.cet()??; - let txid = try_continue!(wallet.try_broadcast_transaction(signed_cet).await); + let txid = try_continue!(wallet + .send(wallet::TryBroadcastTransaction { tx: signed_cet }) + .await + .expect("if sending to actor fails here we are screwed anyway")); tracing::info!("CET published on chain: {}", txid); } diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 95c6362..95daf54 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -3,7 +3,6 @@ use crate::db::load_all_cfds; use crate::maker_cfd::{FromTaker, NewTakerOnline}; use crate::model::cfd::{Cfd, Order, UpdateCfdProposals}; use crate::oracle::Attestation; -use crate::wallet::Wallet; use anyhow::Result; use cfd_protocol::secp256k1_zkp::schnorrsig; use futures::Stream; @@ -20,6 +19,7 @@ pub mod actors; pub mod auth; pub mod bitmex_price_feed; pub mod cfd_actors; +pub mod connection; pub mod db; pub mod fan_out; pub mod forward_only_ok; @@ -45,15 +45,15 @@ pub mod wallet; pub mod wallet_sync; pub mod wire; -pub struct Maker { - pub cfd_actor_addr: Address>, +pub struct MakerActorSystem { + pub cfd_actor_addr: Address>, pub cfd_feed_receiver: watch::Receiver>, pub order_feed_receiver: watch::Receiver>, pub update_cfd_feed_receiver: watch::Receiver, pub inc_conn_addr: Address, } -impl Maker +impl MakerActorSystem where O: xtra::Handler + xtra::Handler @@ -65,14 +65,18 @@ where + xtra::Handler, T: xtra::Handler + xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, { pub async fn new( db: SqlitePool, - wallet: Wallet, + wallet_addr: Address, oracle_pk: schnorrsig::PublicKey, - oracle_constructor: impl Fn(Vec, Box>) -> O, - monitor_constructor: impl Fn(Box>, Vec) -> F, - inc_conn_constructor: impl Fn( + oracle_constructor: impl FnOnce(Vec, Box>) -> O, + monitor_constructor: impl FnOnce(Box>, Vec) -> F, + inc_conn_constructor: impl FnOnce( Box>, Box>, ) -> T, @@ -96,7 +100,7 @@ where let cfd_actor_addr = maker_cfd::Actor::new( db, - wallet, + wallet_addr, term, oracle_pk, cfd_feed_sender, @@ -147,14 +151,14 @@ where } } -pub struct Taker { - pub cfd_actor_addr: Address>, +pub struct TakerActorSystem { + pub cfd_actor_addr: Address>, pub cfd_feed_receiver: watch::Receiver>, pub order_feed_receiver: watch::Receiver>, pub update_cfd_feed_receiver: watch::Receiver, } -impl Taker +impl TakerActorSystem where O: xtra::Handler + xtra::Handler @@ -164,15 +168,19 @@ where + xtra::Handler + xtra::Handler + xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, { pub async fn new( db: SqlitePool, - wallet: Wallet, + wallet_addr: Address, oracle_pk: schnorrsig::PublicKey, send_to_maker: Box>, read_from_maker: Box + Unpin + Send>, - oracle_constructor: impl Fn(Vec, Box>) -> O, - monitor_constructor: impl Fn(Box>, Vec) -> F, + oracle_constructor: impl FnOnce(Vec, Box>) -> O, + monitor_constructor: impl FnOnce(Box>, Vec) -> F, ) -> Result where F: Future>, @@ -191,7 +199,7 @@ where let cfd_actor_addr = taker_cfd::Actor::new( db, - wallet, + wallet_addr, oracle_pk, cfd_feed_sender, order_feed_sender, @@ -220,6 +228,7 @@ where .notify_interval(Duration::from_secs(5), || oracle::Sync) .unwrap(), ); + let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr]) .create(None) .spawn_global(); diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index cffc91f..a5f1d51 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -8,15 +8,13 @@ use daemon::db::{self}; use daemon::model::WalletInfo; use daemon::seed::Seed; -use daemon::wallet::Wallet; use daemon::{ bitmex_price_feed, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle, - wallet_sync, Maker, + wallet, wallet_sync, MakerActorSystem, }; use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; -use xtra::prelude::MessageChannel; use std::net::SocketAddr; use std::path::PathBuf; @@ -25,6 +23,9 @@ use std::str::FromStr; use std::task::Poll; use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; +use xtra::prelude::*; +use xtra::spawn::TokioGlobalSpawnExt; +use xtra::Actor; mod routes_maker; @@ -129,13 +130,16 @@ async fn main() -> Result<()> { let bitcoin_network = opts.network.bitcoin_network(); let ext_priv_key = seed.derive_extended_priv_key(bitcoin_network)?; - let wallet = Wallet::new( + let wallet = wallet::Actor::new( opts.network.electrum(), &data_dir.join("maker_wallet.sqlite"), ext_priv_key, ) - .await?; - let wallet_info = wallet.sync().await?; + .await? + .create(None) + .spawn_global(); + + let wallet_info = wallet.send(wallet::Sync).await??; let auth_password = seed.derive_auth_password::(); @@ -186,13 +190,13 @@ async fn main() -> Result<()> { housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?; housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?; - let Maker { + let MakerActorSystem { cfd_actor_addr, cfd_feed_receiver, order_feed_receiver, update_cfd_feed_receiver, inc_conn_addr: incoming_connection_addr, - } = Maker::new( + } = MakerActorSystem::new( db.clone(), wallet.clone(), oracle, diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 85d5941..26f0ad7 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -8,8 +8,7 @@ use crate::model::cfd::{ }; use crate::model::{TakerId, Usd}; use crate::monitor::MonitorParams; -use crate::wallet::Wallet; -use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wire}; +use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wallet, wire}; use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -59,9 +58,9 @@ pub struct FromTaker { pub msg: wire::TakerToMaker, } -pub struct Actor { +pub struct Actor { db: sqlx::SqlitePool, - wallet: Wallet, + wallet: Address, term: Duration, oracle_pk: schnorrsig::PublicKey, cfd_feed_actor_inbox: watch::Sender>, @@ -94,11 +93,11 @@ enum RollOverState { None, } -impl Actor { +impl Actor { #[allow(clippy::too_many_arguments)] pub fn new( db: sqlx::SqlitePool, - wallet: Wallet, + wallet: Address, term: Duration, oracle_pk: schnorrsig::PublicKey, cfd_feed_actor_inbox: watch::Sender>, @@ -127,19 +126,6 @@ impl Actor { } } - async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_commit( - order_id, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - - Ok(()) - } - async fn handle_propose_roll_over( &mut self, proposal: RollOverProposal, @@ -204,30 +190,6 @@ impl Actor { Ok(()) } - async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_monitoring_event( - event, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } - - async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_oracle_attestation( - attestation, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } - async fn handle_inc_protocol_msg( &mut self, taker_id: TakerId, @@ -311,7 +273,49 @@ impl Actor { } } -impl Actor +impl Actor +where + W: xtra::Handler, +{ + async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_commit( + order_id, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + + Ok(()) + } + + async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_monitoring_event( + event, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + Ok(()) + } + + async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_oracle_attestation( + attestation, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + Ok(()) + } +} + +impl Actor where T: xtra::Handler, { @@ -401,7 +405,7 @@ where } } -impl Actor +impl Actor where T: xtra::Handler + xtra::Handler, @@ -530,11 +534,12 @@ where } } -impl Actor +impl Actor where Self: xtra::Handler, O: xtra::Handler, T: xtra::Handler, + W: xtra::Handler + xtra::Handler, { async fn handle_accept_order( &mut self, @@ -622,7 +627,7 @@ where } } -impl Actor +impl Actor where O: xtra::Handler, T: xtra::Handler, @@ -667,10 +672,11 @@ where } } -impl Actor +impl Actor where O: xtra::Handler, M: xtra::Handler, + W: xtra::Handler, { async fn handle_cfd_setup_completed( &mut self, @@ -694,8 +700,10 @@ where let txid = self .wallet - .try_broadcast_transaction(dlc.lock.0.clone()) - .await?; + .send(wallet::TryBroadcastTransaction { + tx: dlc.lock.0.clone(), + }) + .await??; tracing::info!("Lock transaction published with txid {}", txid); @@ -716,7 +724,7 @@ where } } -impl Actor +impl Actor where Self: xtra::Handler, O: xtra::Handler + xtra::Handler, @@ -809,7 +817,7 @@ where } } -impl Actor +impl Actor where M: xtra::Handler, { @@ -842,9 +850,10 @@ where } } -impl Actor +impl Actor where M: xtra::Handler, + W: xtra::Handler, { async fn handle_initiate_settlement( &mut self, @@ -888,8 +897,10 @@ where let txid = self .wallet - .try_broadcast_transaction(spend_tx.clone()) - .await + .send(wallet::TryBroadcastTransaction { + tx: spend_tx.clone(), + }) + .await? .context("Broadcasting spend transaction")?; tracing::info!("Close transaction published with txid {}", txid); @@ -909,12 +920,15 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor where Self: xtra::Handler + xtra::Handler, O: xtra::Handler + xtra::Handler, T: xtra::Handler + xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler, { async fn handle(&mut self, msg: CfdAction, ctx: &mut Context) { use CfdAction::*; @@ -933,7 +947,7 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor where O: xtra::Handler, T: xtra::Handler, @@ -944,7 +958,7 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor where T: xtra::Handler, { @@ -954,10 +968,12 @@ where } #[async_trait] -impl Handler for Actor +impl Handler + for Actor where O: xtra::Handler, M: xtra::Handler, + W: xtra::Handler, { async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context) { log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc)); @@ -965,7 +981,8 @@ where } #[async_trait] -impl Handler for Actor +impl Handler + for Actor where M: xtra::Handler, { @@ -975,18 +992,22 @@ where } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + W: xtra::Handler, +{ async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) { log_error!(self.handle_monitoring_event(msg)) } } #[async_trait] -impl Handler for Actor +impl Handler for Actor where T: xtra::Handler + xtra::Handler, M: xtra::Handler, + W: xtra::Handler, { async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, _ctx: &mut Context) { match msg { @@ -1041,7 +1062,11 @@ where } #[async_trait] -impl Handler for Actor { +impl Handler + for Actor +where + W: xtra::Handler, +{ async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context) { log_error!(self.handle_oracle_attestation(msg)) } @@ -1071,4 +1096,4 @@ impl Message for FromTaker { type Result = (); } -impl xtra::Actor for Actor {} +impl xtra::Actor for Actor {} diff --git a/daemon/src/setup_contract.rs b/daemon/src/setup_contract.rs index 1243f39..dbeab5d 100644 --- a/daemon/src/setup_contract.rs +++ b/daemon/src/setup_contract.rs @@ -1,10 +1,9 @@ use crate::model::cfd::{Cet, Cfd, Dlc, RevokedCommit, Role}; use crate::tokio_ext::FutureExt; -use crate::wallet::Wallet; use crate::wire::{ Msg0, Msg1, Msg2, RollOverMsg, RollOverMsg0, RollOverMsg1, RollOverMsg2, SetupMsg, }; -use crate::{model, oracle, payout_curve}; +use crate::{model, oracle, payout_curve, wallet}; use anyhow::{Context, Result}; use bdk::bitcoin::secp256k1::{schnorrsig, Signature, SECP256K1}; use bdk::bitcoin::util::psbt::PartiallySignedTransaction; @@ -23,25 +22,33 @@ use std::collections::HashMap; use std::iter::FromIterator; use std::ops::RangeInclusive; use std::time::Duration; +use xtra::Address; /// Given an initial set of parameters, sets up the CFD contract with /// the other party. -pub async fn new( +pub async fn new( mut sink: impl Sink + Unpin, mut stream: impl FusedStream + Unpin, (oracle_pk, announcement): (schnorrsig::PublicKey, oracle::Announcement), cfd: Cfd, - wallet: Wallet, + wallet: Address, role: Role, -) -> Result { +) -> Result +where + W: xtra::Handler + xtra::Handler, +{ let (sk, pk) = crate::keypair::new(&mut rand::thread_rng()); let (rev_sk, rev_pk) = crate::keypair::new(&mut rand::thread_rng()); let (publish_sk, publish_pk) = crate::keypair::new(&mut rand::thread_rng()); let margin = cfd.margin().context("Failed to calculate margin")?; let own_params = wallet - .build_party_params(margin, pk) + .send(wallet::BuildPartyParams { + amount: margin, + identity_pk: pk, + }) .await + .context("Failed to send message to wallet actor")? .context("Failed to build party params")?; let own_punish = PunishParams { @@ -171,7 +178,11 @@ pub async fn new( tracing::info!("Verified all signatures"); - let mut signed_lock_tx = wallet.sign(lock_tx).await?; + let mut signed_lock_tx = wallet + .send(wallet::Sign { psbt: lock_tx }) + .await + .context("Failed to send message to wallet actor")? + .context("Failed to sign transaction")?; sink.send(SetupMsg::Msg2(Msg2 { signed_lock: signed_lock_tx.clone(), })) diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 41f3e8c..d3474e7 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -7,9 +7,9 @@ use daemon::db::{self}; use daemon::model::WalletInfo; use daemon::seed::Seed; -use daemon::wallet::Wallet; use daemon::{ - bitmex_price_feed, housekeeping, logger, monitor, oracle, taker_cfd, wallet_sync, Taker, + bitmex_price_feed, connection, housekeeping, logger, monitor, oracle, taker_cfd, wallet, + wallet_sync, TakerActorSystem, }; use sqlx::sqlite::SqliteConnectOptions; @@ -22,8 +22,9 @@ use std::str::FromStr; use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; use xtra::prelude::MessageChannel; +use xtra::spawn::TokioGlobalSpawnExt; +use xtra::Actor; -mod connection; mod routes_taker; #[derive(Clap)] @@ -121,13 +122,15 @@ async fn main() -> Result<()> { let bitcoin_network = opts.network.bitcoin_network(); let ext_priv_key = seed.derive_extended_priv_key(bitcoin_network)?; - let wallet = Wallet::new( + let wallet = wallet::Actor::new( opts.network.electrum(), &data_dir.join("taker_wallet.sqlite"), ext_priv_key, ) - .await?; - let wallet_info = wallet.sync().await.unwrap(); + .await? + .create(None) + .spawn_global(); + let wallet_info = wallet.send(wallet::Sync).await??; // TODO: Actually fetch it from Olivia let oracle = schnorrsig::PublicKey::from_str( @@ -165,12 +168,12 @@ async fn main() -> Result<()> { read_from_maker, } = connection::Actor::new(opts.maker).await; - let Taker { + let TakerActorSystem { cfd_actor_addr, cfd_feed_receiver, order_feed_receiver, update_cfd_feed_receiver, - } = Taker::new( + } = TakerActorSystem::new( db.clone(), wallet.clone(), oracle, diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index f2a98ae..74decb1 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -7,9 +7,8 @@ use crate::model::cfd::{ }; use crate::model::{BitMexPriceEventId, Usd}; use crate::monitor::{self, MonitorParams}; -use crate::wallet::Wallet; use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; -use crate::{log_error, oracle, setup_contract, wire}; +use crate::{log_error, oracle, setup_contract, wallet, wire}; use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -68,9 +67,9 @@ enum RollOverState { None, } -pub struct Actor { +pub struct Actor { db: sqlx::SqlitePool, - wallet: Wallet, + wallet: Address, oracle_pk: schnorrsig::PublicKey, cfd_feed_actor_inbox: watch::Sender>, order_feed_actor_inbox: watch::Sender>, @@ -83,11 +82,16 @@ pub struct Actor { current_pending_proposals: UpdateCfdProposals, } -impl Actor { +impl Actor +where + W: xtra::Handler + + xtra::Handler + + xtra::Handler, +{ #[allow(clippy::too_many_arguments)] pub fn new( db: sqlx::SqlitePool, - wallet: Wallet, + wallet: Address, oracle_pk: schnorrsig::PublicKey, cfd_feed_actor_inbox: watch::Sender>, order_feed_actor_inbox: watch::Sender>, @@ -111,7 +115,9 @@ impl Actor { current_pending_proposals: HashMap::new(), } } +} +impl Actor { fn send_pending_update_proposals(&self) -> Result<()> { Ok(self .update_cfd_feed_sender @@ -165,6 +171,25 @@ impl Actor { Ok(()) } +} + +impl Actor +where + W: xtra::Handler + + xtra::Handler + + xtra::Handler, +{ + async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_commit( + order_id, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + Ok(()) + } async fn handle_propose_settlement( &mut self, @@ -255,42 +280,6 @@ impl Actor { Ok(()) } - async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_monitoring_event( - event, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } - - async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_commit( - order_id, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } - - async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_oracle_attestation( - attestation, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } - async fn handle_invalid_order_id(&mut self, order_id: OrderId) -> Result<()> { tracing::debug!(%order_id, "Invalid order ID"); @@ -311,7 +300,7 @@ impl Actor { } } -impl Actor +impl Actor where O: xtra::Handler, { @@ -334,7 +323,42 @@ where } Ok(()) } +} + +impl Actor +where + W: xtra::Handler, +{ + async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_oracle_attestation( + attestation, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + Ok(()) + } + async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_monitoring_event( + event, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + Ok(()) + } +} + +impl Actor +where + O: xtra::Handler, + W: xtra::Handler, +{ async fn handle_propose_roll_over(&mut self, order_id: OrderId) -> Result<()> { if self.current_pending_proposals.contains_key(&order_id) { anyhow::bail!("An update for order id {} is already in progress", order_id) @@ -371,11 +395,20 @@ where Ok(()) } } +impl Actor +where + O: xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler, +{ +} -impl Actor +impl Actor where O: xtra::Handler, M: xtra::Handler, + W: xtra::Handler, { async fn handle_cfd_setup_completed( &mut self, @@ -399,8 +432,10 @@ where let txid = self .wallet - .try_broadcast_transaction(dlc.lock.0.clone()) - .await?; + .send(wallet::TryBroadcastTransaction { + tx: dlc.lock.0.clone(), + }) + .await??; tracing::info!("Lock transaction published with txid {}", txid); @@ -421,10 +456,11 @@ where } } -impl Actor +impl Actor where Self: xtra::Handler, O: xtra::Handler + xtra::Handler, + W: xtra::Handler + xtra::Handler, { async fn handle_order_accepted( &mut self, @@ -486,10 +522,13 @@ where } } -impl Actor +impl Actor where Self: xtra::Handler, O: xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler, { async fn handle_roll_over_accepted( &mut self, @@ -546,7 +585,7 @@ where } } -impl Actor +impl Actor where M: xtra::Handler, { @@ -580,9 +619,12 @@ where } } -impl Actor +impl Actor where M: xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler, { async fn handle_settlement_accepted( &mut self, @@ -630,16 +672,19 @@ where } #[async_trait] -impl Handler for Actor { +impl Handler for Actor { async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context) { log_error!(self.handle_take_offer(msg.order_id, msg.quantity)); } } #[async_trait] -impl Handler for Actor +impl Handler for Actor where O: xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler, { async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context) { use CfdAction::*; @@ -661,13 +706,16 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor where Self: xtra::Handler + xtra::Handler, O: xtra::Handler + xtra::Handler + xtra::Handler, M: xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler, { async fn handle( &mut self, @@ -723,10 +771,11 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor where O: xtra::Handler, M: xtra::Handler, + W: xtra::Handler, { async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context) { log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc)); @@ -734,7 +783,7 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor where M: xtra::Handler, { @@ -744,14 +793,20 @@ where } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + W: xtra::Handler, +{ async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) { log_error!(self.handle_monitoring_event(msg)) } } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + W: xtra::Handler, +{ async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context) { log_error!(self.handle_oracle_attestation(msg)) } @@ -778,4 +833,4 @@ impl Message for CfdRollOverCompleted { type Result = (); } -impl xtra::Actor for Actor {} +impl xtra::Actor for Actor {} diff --git a/daemon/src/wallet.rs b/daemon/src/wallet.rs index 639db0c..47119e9 100644 --- a/daemon/src/wallet.rs +++ b/daemon/src/wallet.rs @@ -13,9 +13,10 @@ use std::path::Path; use std::sync::Arc; use std::time::SystemTime; use tokio::sync::Mutex; +use xtra_productivity::xtra_productivity; #[derive(Clone)] -pub struct Wallet { +pub struct Actor { wallet: Arc>>, } @@ -23,7 +24,7 @@ pub struct Wallet { #[error("The transaction is already in the blockchain")] pub struct TransactionAlreadyInBlockchain; -impl Wallet { +impl Actor { pub async fn new( electrum_rpc_url: &str, wallet_dir: &Path, @@ -46,17 +47,11 @@ impl Wallet { Ok(Self { wallet }) } +} - pub async fn build_party_params( - &self, - amount: Amount, - identity_pk: PublicKey, - ) -> Result { - let wallet = self.wallet.lock().await; - wallet.build_party_params(amount, identity_pk) - } - - pub async fn sync(&self) -> Result { +#[xtra_productivity] +impl Actor { + pub async fn handle_sync(&self, _msg: Sync) -> Result { let wallet = self.wallet.lock().await; wallet .sync(NoopProgress, None) @@ -75,10 +70,8 @@ impl Wallet { Ok(wallet_info) } - pub async fn sign( - &self, - mut psbt: PartiallySignedTransaction, - ) -> Result { + pub async fn handle_sign(&self, msg: Sign) -> Result { + let mut psbt = msg.psbt; let wallet = self.wallet.lock().await; wallet @@ -94,7 +87,22 @@ impl Wallet { Ok(psbt) } - pub async fn try_broadcast_transaction(&self, tx: Transaction) -> Result { + pub async fn build_party_params( + &self, + BuildPartyParams { + amount, + identity_pk, + }: BuildPartyParams, + ) -> Result { + let wallet = self.wallet.lock().await; + wallet.build_party_params(amount, identity_pk) + } + + pub async fn handle_try_broadcast_transaction( + &self, + msg: TryBroadcastTransaction, + ) -> Result { + let tx = msg.tx; let wallet = self.wallet.lock().await; let txid = tx.txid(); @@ -128,6 +136,23 @@ impl Wallet { } } +impl xtra::Actor for Actor {} + +pub struct BuildPartyParams { + pub amount: Amount, + pub identity_pk: PublicKey, +} + +pub struct Sync; + +pub struct Sign { + pub psbt: PartiallySignedTransaction, +} + +pub struct TryBroadcastTransaction { + pub tx: Transaction, +} + fn parse_rpc_protocol_error_code(error_value: &Value) -> Result { let json = error_value .as_str() diff --git a/daemon/src/wallet_sync.rs b/daemon/src/wallet_sync.rs index 5c33fed..0ff13ae 100644 --- a/daemon/src/wallet_sync.rs +++ b/daemon/src/wallet_sync.rs @@ -1,14 +1,19 @@ use crate::model::WalletInfo; -use crate::wallet::Wallet; +use crate::wallet; use std::time::Duration; use tokio::sync::watch; use tokio::time::sleep; +use xtra::Address; -pub async fn new(wallet: Wallet, sender: watch::Sender) { +pub async fn new(wallet: Address, sender: watch::Sender) { loop { sleep(Duration::from_secs(10)).await; - let info = match wallet.sync().await { + let info = match wallet + .send(wallet::Sync) + .await + .expect("Wallet actor to be available") + { Ok(info) => info, Err(e) => { tracing::warn!("Failed to sync wallet: {:#}", e);