From 818d3849fc07feda8164b2053b68abdb48ac73a1 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Fri, 24 Dec 2021 16:00:57 +1100 Subject: [PATCH] Post-processing for events concerning wallet, monitor, oracle Removes a shitload of complexity. Will cleanup impl blocks (i.e. combine impl blocks) in cfd actors in separate commit. --- daemon/src/cfd_actors.rs | 66 ++----------- daemon/src/lib.rs | 42 ++++---- daemon/src/maker_cfd.rs | 174 +++++----------------------------- daemon/src/monitor.rs | 2 +- daemon/src/process_manager.rs | 128 ++++++++++++++++++++++++- daemon/src/routes_maker.rs | 4 +- daemon/src/routes_taker.rs | 3 +- daemon/src/taker_cfd.rs | 152 ++++------------------------- daemon/tests/harness/mod.rs | 6 +- 9 files changed, 202 insertions(+), 375 deletions(-) diff --git a/daemon/src/cfd_actors.rs b/daemon/src/cfd_actors.rs index 6b44fd6..57f08fc 100644 --- a/daemon/src/cfd_actors.rs +++ b/daemon/src/cfd_actors.rs @@ -1,14 +1,11 @@ use crate::db; use crate::model::cfd::Cfd; -use crate::model::cfd::CfdEvent; -use crate::model::cfd::Event; use crate::model::cfd::OrderId; use crate::monitor; use crate::oracle; use crate::process_manager; use crate::projection; use crate::try_continue; -use crate::wallet; use anyhow::Context; use anyhow::Result; use sqlx::pool::PoolConnection; @@ -25,15 +22,11 @@ pub async fn insert_cfd_and_update_feed( Ok(()) } -pub async fn handle_monitoring_event( +pub async fn handle_monitoring_event( event: monitor::Event, db: &SqlitePool, - wallet: &xtra::Address, process_manager_address: &xtra::Address, -) -> Result<()> -where - W: xtra::Handler, -{ +) -> Result<()> { let mut conn = db.acquire().await?; let order_id = event.order_id(); @@ -62,9 +55,6 @@ where .await? { tracing::error!("Sending event to process manager failed: {:#}", e); - } else { - // TODO: Move into process manager - post_process_event(event, wallet).await?; } Ok(()) @@ -99,15 +89,11 @@ pub async fn load_cfd(order_id: OrderId, conn: &mut PoolConnection) -> R Ok(cfd) } -pub async fn handle_commit( +pub async fn handle_commit( order_id: OrderId, conn: &mut PoolConnection, - wallet: &xtra::Address, process_manager_address: &xtra::Address, -) -> Result<()> -where - W: xtra::Handler, -{ +) -> Result<()> { let cfd = load_cfd(order_id, conn).await?; let event = cfd.manual_commit_to_blockchain()?; @@ -116,23 +102,16 @@ where .await? { tracing::error!("Sending event to process manager failed: {:#}", e); - } else { - // TODO: Move into process manager - post_process_event(event, wallet).await?; } Ok(()) } -pub async fn handle_oracle_attestation( +pub async fn handle_oracle_attestation( attestation: oracle::Attestation, db: &SqlitePool, - wallet: &xtra::Address, process_manager_address: &xtra::Address, -) -> Result<()> -where - W: xtra::Handler, -{ +) -> Result<()> { let mut conn = db.acquire().await?; tracing::debug!( @@ -153,42 +132,9 @@ where .await? { tracing::error!("Sending event to process manager failed: {:#}", e); - } else { - // TODO: Move into process manager - try_continue!(post_process_event(event, wallet).await); } } } Ok(()) } - -async fn post_process_event(event: Event, wallet: &xtra::Address) -> Result<()> -where - W: xtra::Handler, -{ - match event.event { - CfdEvent::OracleAttestedPostCetTimelock { cet, .. } - | CfdEvent::CetTimelockConfirmedPostOracleAttestation { cet } => { - let txid = wallet - .send(wallet::TryBroadcastTransaction { tx: cet }) - .await? - .context("Failed to broadcast CET")?; - - tracing::info!(%txid, "CET published"); - } - CfdEvent::OracleAttestedPriorCetTimelock { commit_tx: tx, .. } - | CfdEvent::ManualCommit { tx } => { - let txid = wallet - .send(wallet::TryBroadcastTransaction { tx }) - .await? - .context("Failed to broadcast commit transaction")?; - - tracing::info!(%txid, "Commit transaction published"); - } - - _ => {} - } - - Ok(()) -} diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 5aca129..843095e 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -111,22 +111,18 @@ impl Tasks { } } -pub struct MakerActorSystem { - pub cfd_actor_addr: Address>, +pub struct MakerActorSystem { + pub cfd_actor_addr: Address>, wallet_actor_addr: Address, inc_conn_addr: Address, _tasks: Tasks, } -impl MakerActorSystem +impl MakerActorSystem where O: xtra::Handler + xtra::Handler + xtra::Handler, - M: xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, T: xtra::Handler + xtra::Handler + xtra::Handler @@ -142,7 +138,7 @@ where + xtra::Handler, { #[allow(clippy::too_many_arguments)] - pub async fn new( + pub async fn new( db: SqlitePool, wallet_addr: Address, oracle_pk: schnorrsig::PublicKey, @@ -158,6 +154,10 @@ where projection_actor: Address, ) -> Result where + M: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, FO: Future>, FM: Future>, { @@ -172,6 +172,10 @@ where db.clone(), Role::Maker, &projection_actor, + &wallet_addr, + &monitor_addr, + &monitor_addr, + &oracle_addr, ))); let (cfd_actor_addr, cfd_actor_fut) = maker_cfd::Actor::new( @@ -182,7 +186,6 @@ where projection_actor, process_manager_addr.clone(), inc_conn_addr.clone(), - monitor_addr.clone(), oracle_addr.clone(), n_payouts, ) @@ -318,23 +321,19 @@ where } } -pub struct TakerActorSystem { - pub cfd_actor_addr: Address>, +pub struct TakerActorSystem { + pub cfd_actor_addr: Address>, pub connection_actor_addr: Address, pub maker_online_status_feed_receiver: watch::Receiver, wallet_actor_addr: Address, _tasks: Tasks, } -impl TakerActorSystem +impl TakerActorSystem where O: xtra::Handler + xtra::Handler + xtra::Handler, - M: xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, W: xtra::Handler + xtra::Handler + xtra::Handler @@ -342,7 +341,7 @@ where + xtra::Handler, { #[allow(clippy::too_many_arguments)] - pub async fn new( + pub async fn new( db: SqlitePool, wallet_actor_addr: Address, oracle_pk: schnorrsig::PublicKey, @@ -356,6 +355,10 @@ where maker_identity: Identity, ) -> Result where + M: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, FO: Future>, FM: Future>, { @@ -372,6 +375,10 @@ where db.clone(), Role::Taker, &projection_actor, + &wallet_actor_addr, + &monitor_addr, + &monitor_addr, + &oracle_addr, ))); let (connection_actor_addr, connection_actor_ctx) = xtra::Context::new(None); @@ -382,7 +389,6 @@ where projection_actor.clone(), process_manager_addr, connection_actor_addr.clone(), - monitor_addr.clone(), oracle_addr.clone(), n_payouts, maker_identity, diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 49cb397..9006fd5 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -7,7 +7,6 @@ use crate::collab_settlement_maker; use crate::maker_inc_connections; use crate::model; use crate::model::cfd::Cfd; -use crate::model::cfd::CfdEvent; use crate::model::cfd::CollaborativeSettlement; use crate::model::cfd::Order; use crate::model::cfd::OrderId; @@ -21,7 +20,6 @@ use crate::model::Position; use crate::model::Price; use crate::model::Usd; use crate::monitor; -use crate::monitor::MonitorParams; use crate::oracle; use crate::process_manager; use crate::projection; @@ -34,7 +32,6 @@ use crate::wallet; use crate::wire; use crate::wire::TakerToMaker; use crate::Tasks; -use anyhow::bail; use anyhow::Context as _; use anyhow::Result; use async_trait::async_trait; @@ -91,7 +88,7 @@ pub struct FromTaker { pub msg: wire::TakerToMaker, } -pub struct Actor { +pub struct Actor { db: sqlx::SqlitePool, wallet: Address, settlement_interval: Duration, @@ -101,7 +98,6 @@ pub struct Actor { rollover_actors: AddressMap, takers: Address, current_order: Option, - monitor_actor: Address, setup_actors: AddressMap, settlement_actors: AddressMap, oracle_actor: Address, @@ -110,7 +106,7 @@ pub struct Actor { tasks: Tasks, } -impl Actor { +impl Actor { #[allow(clippy::too_many_arguments)] pub fn new( db: sqlx::SqlitePool, @@ -120,7 +116,6 @@ impl Actor { projection_actor: Address, process_manager_actor: Address, takers: Address, - monitor_actor: Address, oracle_actor: Address, n_payouts: usize, ) -> Self { @@ -134,7 +129,6 @@ impl Actor { rollover_actors: AddressMap::default(), takers, current_order: None, - monitor_actor, setup_actors: AddressMap::default(), oracle_actor, n_payouts, @@ -157,7 +151,7 @@ impl Actor { } } -impl Actor +impl Actor where T: xtra::Handler, { @@ -186,7 +180,7 @@ where } #[xtra_productivity] -impl Actor { +impl Actor { async fn handle_accept_rollover(&mut self, msg: AcceptRollOver) -> Result<()> { if self .rollover_actors @@ -214,10 +208,9 @@ impl Actor { } } -impl Actor +impl Actor where O: xtra::Handler + xtra::Handler, - M: xtra::Handler, T: xtra::Handler + xtra::Handler> + xtra::Handler, @@ -276,16 +269,15 @@ where } #[xtra_productivity(message_impl = false)] -impl Actor { +impl Actor { async fn handle_rollover_actor_stopping(&mut self, msg: Stopping) { self.rollover_actors.gc(msg); } } -impl Actor +impl Actor where O: xtra::Handler + xtra::Handler, - M: xtra::Handler, T: xtra::Handler + xtra::Handler + xtra::Handler @@ -390,7 +382,7 @@ where } #[xtra_productivity] -impl Actor { +impl Actor { async fn handle_accept_order(&mut self, msg: AcceptOrder) -> Result<()> { let AcceptOrder { order_id } = msg; @@ -441,7 +433,7 @@ impl Actor { } #[xtra_productivity(message_impl = false)] -impl Actor { +impl Actor { async fn handle_setup_actor_stopping(&mut self, message: Stopping) { self.setup_actors.gc(message); } @@ -455,11 +447,7 @@ impl Actor { } #[xtra_productivity(message_impl = false)] -impl Actor -where - M: xtra::Handler, - W: xtra::Handler, -{ +impl Actor { async fn handle_settlement_completed( &mut self, msg: model::cfd::Completed, @@ -477,80 +465,24 @@ where tracing::error!("Sending event to process manager failed: {:#}", e); } - match event.event { - CfdEvent::CollaborativeSettlementCompleted { - spend_tx, script, .. - } => { - let txid = self - .wallet - .send(wallet::TryBroadcastTransaction { tx: spend_tx }) - .await? - .context("Broadcasting close transaction")?; - - tracing::info!(%order_id, "Close transaction published with txid {}", txid); - - self.monitor_actor - .send(monitor::CollaborativeSettlement { - order_id, - tx: (txid, script), - }) - .await?; - } - CfdEvent::CollaborativeSettlementRejected { commit_tx } => { - let txid = self - .wallet - .send(wallet::TryBroadcastTransaction { tx: commit_tx }) - .await? - .context("Broadcasting commit transaction")?; - - tracing::info!( - "Closing non-collaboratively. Commit tx published with txid {}", - txid - ) - } - CfdEvent::CollaborativeSettlementFailed { commit_tx } => { - let txid = self - .wallet - .send(wallet::TryBroadcastTransaction { tx: commit_tx }) - .await? - .context("Broadcasting commit transaction")?; - - tracing::warn!( - "Closing non-collaboratively. Commit tx published with txid {}", - txid - ) - } - _ => bail!("Unexpected event {:?}", event.event), - } - Ok(()) } } #[xtra_productivity] -impl Actor -where - W: xtra::Handler, -{ +impl Actor { async fn handle_commit(&mut self, msg: Commit) -> Result<()> { let Commit { order_id } = msg; let mut conn = self.db.acquire().await?; - cfd_actors::handle_commit( - order_id, - &mut conn, - &self.wallet, - &self.process_manager_actor, - ) - .await?; + cfd_actors::handle_commit(order_id, &mut conn, &self.process_manager_actor).await?; Ok(()) } } -impl Actor +impl Actor where - M: xtra::Handler, O: xtra::Handler, { async fn handle_roll_over_completed(&mut self, _: Completed) -> Result<()> { @@ -560,10 +492,9 @@ where } } -impl Actor +impl Actor where O: xtra::Handler, - M: xtra::Handler + xtra::Handler, T: xtra::Handler + xtra::Handler>, W: xtra::Handler, @@ -608,7 +539,7 @@ where } #[xtra_productivity] -impl Actor +impl Actor where T: xtra::Handler, { @@ -652,12 +583,7 @@ where } #[xtra_productivity(message_impl = false)] -impl Actor -where - O: xtra::Handler, - M: xtra::Handler, - W: xtra::Handler, -{ +impl Actor { async fn handle_setup_completed(&mut self, msg: SetupCompleted) -> Result<()> { let order_id = msg.order_id(); let mut conn = self.db.acquire().await?; @@ -672,44 +598,12 @@ where tracing::error!("Sending event to process manager failed: {:#}", e); } - let dlc = match event.event { - CfdEvent::ContractSetupCompleted { dlc } => dlc, - CfdEvent::OfferRejected | CfdEvent::ContractSetupFailed => { - return Ok(()); - } - _ => bail!("Unexpected event {:?}", event.event), - }; - - tracing::info!("Setup complete, publishing on chain now"); - - let txid = self - .wallet - .send(wallet::TryBroadcastTransaction { - tx: dlc.lock.0.clone(), - }) - .await??; - - tracing::info!("Lock transaction published with txid {}", txid); - - self.monitor_actor - .send(monitor::StartMonitoring { - id: order_id, - params: MonitorParams::new(dlc.clone()), - }) - .await?; - - self.oracle_actor - .send(oracle::MonitorAttestation { - event_id: dlc.settlement_event_id, - }) - .await?; - Ok(()) } } #[async_trait] -impl Handler for Actor +impl Handler for Actor where T: xtra::Handler, { @@ -719,8 +613,7 @@ where } #[async_trait] -impl Handler - for Actor +impl Handler for Actor where T: xtra::Handler, { @@ -730,9 +623,8 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor where - M: xtra::Handler, O: xtra::Handler, { async fn handle(&mut self, msg: Completed, _ctx: &mut Context) -> Result<()> { @@ -741,31 +633,18 @@ where } #[xtra_productivity(message_impl = false)] -impl Actor -where - W: xtra::Handler, -{ +impl Actor { async fn handle_monitor(&mut self, msg: monitor::Event) { - if let Err(e) = cfd_actors::handle_monitoring_event( - msg, - &self.db, - &self.wallet, - &self.process_manager_actor, - ) - .await + if let Err(e) = + cfd_actors::handle_monitoring_event(msg, &self.db, &self.process_manager_actor).await { tracing::error!("Unable to handle monotoring event: {:#}", e) } } async fn handle_attestation(&mut self, msg: oracle::Attestation) { - if let Err(e) = cfd_actors::handle_oracle_attestation( - msg, - &self.db, - &self.wallet, - &self.process_manager_actor, - ) - .await + if let Err(e) = + cfd_actors::handle_oracle_attestation(msg, &self.db, &self.process_manager_actor).await { tracing::warn!("Failed to handle oracle attestation: {:#}", e) } @@ -773,10 +652,9 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor where O: xtra::Handler + xtra::Handler, - M: xtra::Handler + xtra::Handler, T: xtra::Handler + xtra::Handler + xtra::Handler @@ -879,4 +757,4 @@ impl Message for FromTaker { type Result = (); } -impl xtra::Actor for Actor {} +impl xtra::Actor for Actor {} diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index 450da58..32396e0 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -100,7 +100,7 @@ impl Cfd { // might become relevant. See also https://github.com/itchysats/itchysats/issues/605 and https://github.com/itchysats/itchysats/issues/236. fn apply(self, event: cfd::Event) -> Self { match event.event { - CfdEvent::ContractSetupCompleted { dlc } => Self { + CfdEvent::ContractSetupCompleted { dlc, .. } => Self { params: Some(MonitorParams::new(dlc)), monitor_lock_finality: true, monitor_commit_finality: true, diff --git a/daemon/src/process_manager.rs b/daemon/src/process_manager.rs index 7b2ab27..bae22f3 100644 --- a/daemon/src/process_manager.rs +++ b/daemon/src/process_manager.rs @@ -1,15 +1,25 @@ use crate::db::append_event; use crate::model::cfd; +use crate::model::cfd::CfdEvent; use crate::model::cfd::Role; +use crate::monitor; +use crate::monitor::MonitorParams; +use crate::oracle; use crate::projection; +use crate::wallet; +use anyhow::Context; use anyhow::Result; use xtra::prelude::MessageChannel; use xtra_productivity::xtra_productivity; pub struct Actor { db: sqlx::SqlitePool, - _role: Role, + role: Role, cfds_changed: Box>, + try_broadcast_transaction: Box>, + start_monitoring: Box>, + monitor_collaborative_settlement: Box>, + monitor_attestation: Box>, } pub struct Event(cfd::Event); @@ -25,11 +35,20 @@ impl Actor { db: sqlx::SqlitePool, role: Role, cfds_changed: &(impl MessageChannel + 'static), + try_broadcast_transaction: &(impl MessageChannel + 'static), + start_monitoring: &(impl MessageChannel + 'static), + monitor_collaborative_settlement: &(impl MessageChannel + + 'static), + monitor_attestation: &(impl MessageChannel + 'static), ) -> Self { Self { db, - _role: role, + role, cfds_changed: cfds_changed.clone_channel(), + try_broadcast_transaction: try_broadcast_transaction.clone_channel(), + start_monitoring: start_monitoring.clone_channel(), + monitor_collaborative_settlement: monitor_collaborative_settlement.clone_channel(), + monitor_attestation: monitor_attestation.clone_channel(), } } } @@ -43,7 +62,110 @@ impl Actor { let mut conn = self.db.acquire().await?; append_event(event.clone(), &mut conn).await?; - // TODO: 2. Post-process event by sending out messages + // 2. Post process event + match event.event { + CfdEvent::ContractSetupCompleted { dlc } => { + tracing::info!("Setup complete, publishing on chain now"); + + let lock_tx = dlc.lock.0.clone(); + let txid = self + .try_broadcast_transaction + .send(wallet::TryBroadcastTransaction { tx: lock_tx }) + .await??; + + tracing::info!("Lock transaction published with txid {}", txid); + + self.start_monitoring + .send(monitor::StartMonitoring { + id: event.id, + params: MonitorParams::new(dlc.clone()), + }) + .await?; + + self.monitor_attestation + .send(oracle::MonitorAttestation { + event_id: dlc.settlement_event_id, + }) + .await?; + } + CfdEvent::CollaborativeSettlementCompleted { + spend_tx, script, .. + } => { + let txid = match self.role { + Role::Maker => { + let txid = self + .try_broadcast_transaction + .send(wallet::TryBroadcastTransaction { tx: spend_tx }) + .await? + .context("Broadcasting close transaction")?; + + tracing::info!(order_id=%event.id, "Close transaction published with txid {}", txid); + + txid + } + Role::Taker => { + // TODO: Publish the tx once the collaborative settlement is symmetric, + // allowing the taker to publish as well. + let txid = spend_tx.txid(); + tracing::info!(order_id=%event.id, "Collaborative settlement completed successfully {}", txid); + txid + } + }; + + self.monitor_collaborative_settlement + .send(monitor::CollaborativeSettlement { + order_id: event.id, + tx: (txid, script), + }) + .await?; + } + CfdEvent::CollaborativeSettlementRejected { commit_tx } => { + let txid = self + .try_broadcast_transaction + .send(wallet::TryBroadcastTransaction { tx: commit_tx }) + .await? + .context("Broadcasting commit transaction")?; + + tracing::info!( + "Closing non-collaboratively. Commit tx published with txid {}", + txid + ) + } + CfdEvent::CollaborativeSettlementFailed { commit_tx } => { + let txid = self + .try_broadcast_transaction + .send(wallet::TryBroadcastTransaction { tx: commit_tx }) + .await? + .context("Broadcasting commit transaction")?; + + tracing::warn!( + "Closing non-collaboratively. Commit tx published with txid {}", + txid + ) + } + CfdEvent::OracleAttestedPostCetTimelock { cet, .. } + | CfdEvent::CetTimelockConfirmedPostOracleAttestation { cet } => { + let txid = self + .try_broadcast_transaction + .send(wallet::TryBroadcastTransaction { tx: cet }) + .await? + .context("Failed to broadcast CET")?; + + tracing::info!(%txid, "CET published"); + } + CfdEvent::OracleAttestedPriorCetTimelock { commit_tx: tx, .. } + | CfdEvent::ManualCommit { tx } => { + let txid = self + .try_broadcast_transaction + .send(wallet::TryBroadcastTransaction { tx }) + .await? + .context("Failed to broadcast commit transaction")?; + + tracing::info!(%txid, "Commit transaction published"); + } + + _ => {} // TODO: Monitor post processing for rollover + } // 3. Update UI self.cfds_changed.send(projection::CfdsChanged).await?; diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 4180329..8862397 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -7,7 +7,6 @@ use daemon::model::Identity; use daemon::model::Price; use daemon::model::Usd; use daemon::model::WalletInfo; -use daemon::monitor; use daemon::oracle; use daemon::projection::Cfd; use daemon::projection::CfdAction; @@ -33,8 +32,7 @@ use std::path::PathBuf; use tokio::select; use tokio::sync::watch; -pub type Maker = - MakerActorSystem; +pub type Maker = MakerActorSystem; #[allow(clippy::too_many_arguments)] #[rocket::get("/feed")] diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index 6a876cd..18a80d6 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -8,7 +8,6 @@ use daemon::model::Leverage; use daemon::model::Price; use daemon::model::Usd; use daemon::model::WalletInfo; -use daemon::monitor; use daemon::oracle; use daemon::projection; use daemon::projection::CfdAction; @@ -34,7 +33,7 @@ use std::path::PathBuf; use tokio::select; use tokio::sync::watch; -type Taker = TakerActorSystem; +type Taker = TakerActorSystem; #[rocket::get("/feed")] pub async fn feed( diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index e535389..20b8c9e 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -5,7 +5,6 @@ use crate::cfd_actors::load_cfd; use crate::collab_settlement_taker; use crate::connection; use crate::model::cfd::Cfd; -use crate::model::cfd::CfdEvent; use crate::model::cfd::CollaborativeSettlement; use crate::model::cfd::Completed; use crate::model::cfd::Order; @@ -18,17 +17,14 @@ use crate::model::Position; use crate::model::Price; use crate::model::Usd; use crate::monitor; -use crate::monitor::MonitorParams; use crate::oracle; use crate::process_manager; use crate::projection; use crate::setup_taker; use crate::wallet; use crate::Tasks; -use anyhow::bail; use anyhow::Context as _; use anyhow::Result; -use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; use xtra::prelude::*; use xtra::Actor as _; @@ -50,14 +46,13 @@ pub struct Commit { pub order_id: OrderId, } -pub struct Actor { +pub struct Actor { db: sqlx::SqlitePool, wallet: Address, oracle_pk: schnorrsig::PublicKey, projection_actor: Address, process_manager_actor: Address, conn_actor: Address, - monitor_actor: Address, setup_actors: AddressMap, collab_settlement_actors: AddressMap, oracle_actor: Address, @@ -67,7 +62,7 @@ pub struct Actor { maker_identity: Identity, } -impl Actor +impl Actor where W: xtra::Handler + xtra::Handler @@ -81,7 +76,6 @@ where projection_actor: Address, process_manager_actor: Address, conn_actor: Address, - monitor_actor: Address, oracle_actor: Address, n_payouts: usize, maker_identity: Identity, @@ -93,7 +87,6 @@ where projection_actor, process_manager_actor, conn_actor, - monitor_actor, oracle_actor, n_payouts, setup_actors: AddressMap::default(), @@ -106,22 +99,15 @@ where } #[xtra_productivity] -impl Actor +impl Actor where W: xtra::Handler, - M: xtra::Handler, { async fn handle_commit(&mut self, msg: Commit) -> Result<()> { let Commit { order_id } = msg; let mut conn = self.db.acquire().await?; - cfd_actors::handle_commit( - order_id, - &mut conn, - &self.wallet, - &self.process_manager_actor, - ) - .await?; + cfd_actors::handle_commit(order_id, &mut conn, &self.process_manager_actor).await?; Ok(()) } @@ -165,11 +151,7 @@ where } #[xtra_productivity(message_impl = false)] -impl Actor -where - W: xtra::Handler, - M: xtra::Handler, -{ +impl Actor { async fn handle_settlement_completed( &mut self, msg: Completed, @@ -187,55 +169,11 @@ where tracing::error!("Sending event to process manager failed: {:#}", e); } - match event.event { - CfdEvent::CollaborativeSettlementCompleted { - spend_tx, script, .. - } => { - // TODO: Publish the tx once the collaborative settlement is symmetric, allowing the - // taker to publish as well. - - let txid = spend_tx.txid(); - tracing::info!(%order_id, "Collaborative settlement completed successfully {}", txid); - - self.monitor_actor - .send(monitor::CollaborativeSettlement { - order_id, - tx: (txid, script), - }) - .await?; - } - CfdEvent::CollaborativeSettlementRejected { commit_tx } => { - let txid = self - .wallet - .send(wallet::TryBroadcastTransaction { tx: commit_tx }) - .await? - .context("Broadcasting commit transaction")?; - - tracing::info!( - "Closing non-collaboratively. Commit tx published with txid {}", - txid - ) - } - CfdEvent::CollaborativeSettlementFailed { commit_tx } => { - let txid = self - .wallet - .send(wallet::TryBroadcastTransaction { tx: commit_tx }) - .await? - .context("Broadcasting commit transaction")?; - - tracing::warn!( - "Closing non-collaboratively. Commit tx published with txid {}", - txid - ) - } - _ => bail!("Unexpected event {:?}", event.event), - } - Ok(()) } } -impl Actor { +impl Actor { async fn handle_new_order(&mut self, order: Option) -> Result<()> { tracing::trace!("new order {:?}", order); match order { @@ -257,7 +195,7 @@ impl Actor { } #[xtra_productivity] -impl Actor +impl Actor where Self: xtra::Handler, O: xtra::Handler + xtra::Handler, @@ -331,12 +269,8 @@ where } } -impl Actor -where - O: xtra::Handler, - M: xtra::Handler, - W: xtra::Handler, -{ +#[xtra_productivity(message_impl = false)] +impl Actor { async fn handle_setup_completed(&mut self, msg: SetupCompleted) -> Result<()> { let mut conn = self.db.acquire().await?; let order_id = msg.order_id(); @@ -351,91 +285,37 @@ where tracing::error!("Sending event to process manager failed: {:#}", e); } - let dlc = match event.event { - CfdEvent::ContractSetupCompleted { dlc } => dlc, - CfdEvent::OfferRejected | CfdEvent::ContractSetupFailed => { - return Ok(()); - } - _ => bail!("Unexpected event {:?}", event.event), - }; - - tracing::info!("Setup complete, publishing on chain now"); - - let txid = self - .wallet - .send(wallet::TryBroadcastTransaction { - tx: dlc.lock.0.clone(), - }) - .await??; - - tracing::info!("Lock transaction published with txid {}", txid); - - self.monitor_actor - .send(monitor::StartMonitoring { - id: order_id, - params: MonitorParams::new(dlc.clone()), - }) - .await?; - - self.oracle_actor - .send(oracle::MonitorAttestation { - event_id: dlc.settlement_event_id, - }) - .await?; - Ok(()) } } #[xtra_productivity] -impl Actor { +impl Actor { async fn handle_current_order(&mut self, msg: CurrentOrder) -> Result<()> { self.handle_new_order(msg.0).await } } -#[async_trait] -impl Handler for Actor -where - O: xtra::Handler, - M: xtra::Handler, - W: xtra::Handler, -{ - async fn handle(&mut self, msg: SetupCompleted, _ctx: &mut Context) -> Result<()> { - self.handle_setup_completed(msg).await - } -} - #[xtra_productivity(message_impl = false)] -impl Actor +impl Actor where W: xtra::Handler, { async fn handle_monitor(&mut self, msg: monitor::Event) { - if let Err(e) = cfd_actors::handle_monitoring_event( - msg, - &self.db, - &self.wallet, - &self.process_manager_actor, - ) - .await + if let Err(e) = + cfd_actors::handle_monitoring_event(msg, &self.db, &self.process_manager_actor).await { tracing::error!("Unable to handle monotoring event: {:#}", e) } } async fn handle_attestation(&mut self, msg: oracle::Attestation) { - if let Err(e) = cfd_actors::handle_oracle_attestation( - msg, - &self.db, - &self.wallet, - &self.process_manager_actor, - ) - .await + if let Err(e) = + cfd_actors::handle_oracle_attestation(msg, &self.db, &self.process_manager_actor).await { tracing::warn!("Failed to handle oracle attestation: {:#}", e) } } } -impl xtra::Actor for Actor {} +impl xtra::Actor for Actor {} diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 00c9089..13e29fa 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -1,4 +1,3 @@ -use crate::harness::mocks::monitor::MonitorActor; use crate::harness::mocks::oracle::OracleActor; use crate::harness::mocks::wallet::WalletActor; use crate::schnorrsig; @@ -116,8 +115,7 @@ impl Default for TakerConfig { /// Maker Test Setup pub struct Maker { - pub system: - MakerActorSystem, + pub system: MakerActorSystem, pub mocks: mocks::Mocks, pub feeds: Feeds, pub listen_addr: SocketAddr, @@ -238,7 +236,7 @@ impl Maker { /// Taker Test Setup pub struct Taker { pub id: Identity, - pub system: daemon::TakerActorSystem, + pub system: daemon::TakerActorSystem, pub mocks: mocks::Mocks, pub feeds: Feeds, _tasks: Tasks,