From dc0abb92d3522f697802970d88909338d96a169a Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Thu, 14 Oct 2021 18:08:31 +1030 Subject: [PATCH 1/3] Remove stale TODO The Payout enum got implemented --- daemon/src/model/cfd.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index 996314e..ffb34d9 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -262,7 +262,6 @@ pub enum CfdState { /// commit + cet). Closed { common: CfdStateCommon, - // TODO: Use an enum of either Attestation or CollaborativeSettlement payout: Payout, }, From 6779486f762d272bdf77a8f7ffc1edaa73846f0f Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Thu, 14 Oct 2021 18:59:06 +1030 Subject: [PATCH 2/3] Create wrapper functions to avoid the need for manually refreshing feed Inserting cfd or updating its state requires a follow-up update on the Cfd feed sent to the UI. Encapsulate the behaviour in functions that are shared across Cfd actors. --- daemon/src/actors.rs | 28 +++++++++++++++ daemon/src/db.rs | 1 - daemon/src/maker_cfd.rs | 80 ++++++++++++++++++++++------------------- daemon/src/taker_cfd.rs | 78 ++++++++++++++++++++++------------------ 4 files changed, 115 insertions(+), 72 deletions(-) diff --git a/daemon/src/actors.rs b/daemon/src/actors.rs index c97159d..4f329a3 100644 --- a/daemon/src/actors.rs +++ b/daemon/src/actors.rs @@ -1,3 +1,10 @@ +use crate::{ + db, + model::cfd::{Cfd, CfdState, OrderId}, +}; +use sqlx::{pool::PoolConnection, Sqlite}; +use tokio::sync::watch; + /// Wrapper for handlers to log errors #[macro_export] macro_rules! log_error { @@ -7,3 +14,24 @@ macro_rules! log_error { } }; } + +pub async fn insert_cfd( + cfd: Cfd, + conn: &mut PoolConnection, + update_sender: &watch::Sender>, +) -> anyhow::Result<()> { + db::insert_cfd(cfd, conn).await?; + update_sender.send(db::load_all_cfds(conn).await?)?; + Ok(()) +} + +pub async fn insert_new_cfd_state_by_order_id( + order_id: OrderId, + new_state: &CfdState, + conn: &mut PoolConnection, + update_sender: &watch::Sender>, +) -> anyhow::Result<()> { + db::insert_new_cfd_state_by_order_id(order_id, new_state, conn).await?; + update_sender.send(db::load_all_cfds(conn).await?)?; + Ok(()) +} diff --git a/daemon/src/db.rs b/daemon/src/db.rs index f851039..a426775 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -148,7 +148,6 @@ pub async fn insert_cfd(cfd: Cfd, conn: &mut PoolConnection) -> anyhow:: Ok(()) } -#[allow(dead_code)] pub async fn insert_new_cfd_state_by_order_id( order_id: OrderId, new_state: &CfdState, diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 54ef783..4c21e9b 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -1,6 +1,7 @@ +use crate::actors::insert_cfd; +use crate::actors::insert_new_cfd_state_by_order_id; use crate::db::{ - insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, - load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id, + insert_order, load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id, }; use crate::maker_inc_connections::TakerCommand; use crate::model::cfd::{ @@ -310,9 +311,13 @@ impl Actor { proposal.price, ), ))?; - insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + insert_new_cfd_state_by_order_id( + cfd.order.id, + &cfd.state, + &mut conn, + &self.cfd_feed_actor_inbox, + ) + .await?; let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?; @@ -429,12 +434,10 @@ impl Actor { attestation: None, }, &mut conn, + &self.cfd_feed_actor_inbox, ) .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; - let txid = self .wallet .try_broadcast_transaction(dlc.lock.0.clone()) @@ -480,12 +483,10 @@ impl Actor { collaborative_close: None, }, &mut conn, + &self.cfd_feed_actor_inbox, ) .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; self.monitor_actor @@ -547,10 +548,7 @@ impl Actor { taker_id, }, ); - insert_cfd(cfd, &mut conn).await?; - - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + insert_cfd(cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; // 4. Remove current order self.current_order_id = None; @@ -604,12 +602,10 @@ impl Actor { }, }, &mut conn, + &self.cfd_feed_actor_inbox, ) .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; - // 4. Notify the taker that we are ready for contract setup // Use `.send` here to ensure we only continue once the message has been sent // Nothing done after this call should be able to fail, otherwise we notified the taker, but @@ -682,9 +678,9 @@ impl Actor { }, }, &mut conn, + &self.cfd_feed_actor_inbox, ) - .await - .unwrap(); + .await?; self.takers .do_send_async(maker_inc_connections::TakerMessage { @@ -692,8 +688,6 @@ impl Actor { command: TakerCommand::NotifyOrderRejected { id: order_id }, }) .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; // Remove order for all self.current_order_id = None; @@ -719,11 +713,15 @@ impl Actor { tracing::info!("Commit transaction published on chain: {}", txid); if let Some(new_state) = cfd.handle(CfdStateChangeEvent::CommitTxSent)? { - insert_new_cfd_state_by_order_id(cfd.order.id, &new_state, &mut conn).await?; + insert_new_cfd_state_by_order_id( + cfd.order.id, + &new_state, + &mut conn, + &self.cfd_feed_actor_inbox, + ) + .await?; } - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; Ok(()) } @@ -892,10 +890,13 @@ impl Actor { return Ok(()); } - insert_new_cfd_state_by_order_id(order_id, &cfd.state, &mut conn).await?; - - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + insert_new_cfd_state_by_order_id( + order_id, + &cfd.state, + &mut conn, + &self.cfd_feed_actor_inbox, + ) + .await?; // TODO: code duplication maker/taker if let CfdState::OpenCommitted { .. } = cfd.state { @@ -942,9 +943,13 @@ impl Actor { continue; } - insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + insert_new_cfd_state_by_order_id( + cfd.order.id, + &cfd.state, + &mut conn, + &self.cfd_feed_actor_inbox, + ) + .await?; if let Err(e) = self.try_cet_publication(cfd).await { tracing::error!("Error when trying to publish CET: {:#}", e); @@ -968,10 +973,13 @@ impl Actor { bail!("If we can get the CET we should be able to transition") } - insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?; - - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + insert_new_cfd_state_by_order_id( + cfd.order.id, + &cfd.state, + &mut conn, + &self.cfd_feed_actor_inbox, + ) + .await?; } Err(not_ready_yet) => { tracing::debug!("{:#}", not_ready_yet); diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index c0df59c..a640f1f 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -1,6 +1,7 @@ +use crate::actors::insert_cfd; +use crate::actors::insert_new_cfd_state_by_order_id; use crate::db::{ - insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, - load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id, + insert_order, load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id, }; use crate::model::cfd::{ Attestation, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, @@ -158,10 +159,8 @@ impl Actor { }, ); - insert_cfd(cfd, &mut conn).await?; + insert_cfd(cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; self.send_to_maker .do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity }) .await?; @@ -280,11 +279,10 @@ impl Actor { }, }, &mut conn, + &self.cfd_feed_actor_inbox, ) .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let offer_announcement = self @@ -339,12 +337,10 @@ impl Actor { }, }, &mut conn, + &self.cfd_feed_actor_inbox, ) .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; - Ok(()) } @@ -373,10 +369,13 @@ impl Actor { cfd.handle(CfdStateChangeEvent::ProposalSigned( CollaborativeSettlement::new(tx, dlc.script_pubkey_for(cfd.role()), proposal.price), ))?; - insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?; - - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + insert_new_cfd_state_by_order_id( + cfd.order.id, + &cfd.state, + &mut conn, + &self.cfd_feed_actor_inbox, + ) + .await?; self.remove_pending_proposal(&order_id)?; @@ -504,12 +503,10 @@ impl Actor { attestation: None, }, &mut conn, + &self.cfd_feed_actor_inbox, ) .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; - let txid = self .wallet .try_broadcast_transaction(dlc.lock.0.clone()) @@ -555,12 +552,10 @@ impl Actor { collaborative_close: None, }, &mut conn, + &self.cfd_feed_actor_inbox, ) .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; self.monitor_actor @@ -585,10 +580,13 @@ impl Actor { return Ok(()); } - insert_new_cfd_state_by_order_id(order_id, &cfd.state, &mut conn).await?; - - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + insert_new_cfd_state_by_order_id( + order_id, + &cfd.state, + &mut conn, + &self.cfd_feed_actor_inbox, + ) + .await?; // TODO: code duplicateion maker/taker if let CfdState::OpenCommitted { .. } = cfd.state { @@ -622,10 +620,13 @@ impl Actor { bail!("If we can get the commit tx we should be able to transition") } - insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; - + insert_new_cfd_state_by_order_id( + cfd.order.id, + &cfd.state, + &mut conn, + &self.cfd_feed_actor_inbox, + ) + .await?; tracing::info!("Commit transaction published on chain: {}", txid); Ok(()) @@ -660,9 +661,13 @@ impl Actor { continue; } - insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + insert_new_cfd_state_by_order_id( + cfd.order.id, + &cfd.state, + &mut conn, + &self.cfd_feed_actor_inbox, + ) + .await?; if let Err(e) = self.try_cet_publication(cfd).await { tracing::error!("Error when trying to publish CET: {:#}", e); @@ -686,10 +691,13 @@ impl Actor { bail!("If we can get the CET we should be able to transition") } - insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, &mut conn).await?; - - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + insert_new_cfd_state_by_order_id( + cfd.order.id, + &cfd.state, + &mut conn, + &self.cfd_feed_actor_inbox, + ) + .await?; } Err(not_ready_yet) => { tracing::debug!("{:#}", not_ready_yet); From 3730cfae36abe007454b8603f19f50cac6fbbc2b Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Thu, 14 Oct 2021 19:41:15 +1030 Subject: [PATCH 3/3] Reduce code duplication in cfd actors Extract shared handlers code into a common module --- daemon/src/actors.rs | 28 -------- daemon/src/cfd_actors.rs | 149 ++++++++++++++++++++++++++++++++++++++ daemon/src/lib.rs | 1 + daemon/src/maker_cfd.rs | 151 ++++++--------------------------------- daemon/src/taker_cfd.rs | 145 +++++-------------------------------- 5 files changed, 192 insertions(+), 282 deletions(-) create mode 100644 daemon/src/cfd_actors.rs diff --git a/daemon/src/actors.rs b/daemon/src/actors.rs index 4f329a3..c97159d 100644 --- a/daemon/src/actors.rs +++ b/daemon/src/actors.rs @@ -1,10 +1,3 @@ -use crate::{ - db, - model::cfd::{Cfd, CfdState, OrderId}, -}; -use sqlx::{pool::PoolConnection, Sqlite}; -use tokio::sync::watch; - /// Wrapper for handlers to log errors #[macro_export] macro_rules! log_error { @@ -14,24 +7,3 @@ macro_rules! log_error { } }; } - -pub async fn insert_cfd( - cfd: Cfd, - conn: &mut PoolConnection, - update_sender: &watch::Sender>, -) -> anyhow::Result<()> { - db::insert_cfd(cfd, conn).await?; - update_sender.send(db::load_all_cfds(conn).await?)?; - Ok(()) -} - -pub async fn insert_new_cfd_state_by_order_id( - order_id: OrderId, - new_state: &CfdState, - conn: &mut PoolConnection, - update_sender: &watch::Sender>, -) -> anyhow::Result<()> { - db::insert_new_cfd_state_by_order_id(order_id, new_state, conn).await?; - update_sender.send(db::load_all_cfds(conn).await?)?; - Ok(()) -} diff --git a/daemon/src/cfd_actors.rs b/daemon/src/cfd_actors.rs new file mode 100644 index 0000000..2005b7c --- /dev/null +++ b/daemon/src/cfd_actors.rs @@ -0,0 +1,149 @@ +use crate::model::cfd::{Attestation, Cfd, CfdState, CfdStateChangeEvent, OrderId}; +use crate::wallet::Wallet; +use crate::{db, monitor, oracle}; +use anyhow::{bail, Result}; +use sqlx::pool::PoolConnection; +use sqlx::Sqlite; +use tokio::sync::watch; + +pub async fn insert_cfd( + cfd: Cfd, + conn: &mut PoolConnection, + update_sender: &watch::Sender>, +) -> Result<()> { + db::insert_cfd(cfd, conn).await?; + update_sender.send(db::load_all_cfds(conn).await?)?; + Ok(()) +} + +pub async fn insert_new_cfd_state_by_order_id( + order_id: OrderId, + new_state: &CfdState, + conn: &mut PoolConnection, + update_sender: &watch::Sender>, +) -> Result<()> { + db::insert_new_cfd_state_by_order_id(order_id, new_state, conn).await?; + update_sender.send(db::load_all_cfds(conn).await?)?; + Ok(()) +} + +pub async fn try_cet_publication( + cfd: &mut Cfd, + conn: &mut PoolConnection, + wallet: &Wallet, + update_sender: &watch::Sender>, +) -> Result<()> { + match cfd.cet()? { + Ok(cet) => { + let txid = wallet.try_broadcast_transaction(cet).await?; + tracing::info!("CET published with txid {}", txid); + + if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() { + bail!("If we can get the CET we should be able to transition") + } + + insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?; + } + Err(not_ready_yet) => { + tracing::debug!("{:#}", not_ready_yet); + return Ok(()); + } + }; + + Ok(()) +} + +pub async fn handle_monitoring_event( + event: monitor::Event, + conn: &mut PoolConnection, + wallet: &Wallet, + update_sender: &watch::Sender>, +) -> Result<()> { + let order_id = event.order_id(); + + let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?; + + if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() { + // early exit if there was not state change + // this is for cases where we are already in a final state + return Ok(()); + } + + insert_new_cfd_state_by_order_id(order_id, &cfd.state, conn, update_sender).await?; + + if let CfdState::OpenCommitted { .. } = cfd.state { + try_cet_publication(&mut cfd, conn, wallet, update_sender).await?; + } else if let CfdState::MustRefund { .. } = cfd.state { + let signed_refund_tx = cfd.refund_tx()?; + let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; + + tracing::info!("Refund transaction published on chain: {}", txid); + } + Ok(()) +} + +pub async fn handle_commit( + order_id: OrderId, + conn: &mut PoolConnection, + wallet: &Wallet, + update_sender: &watch::Sender>, +) -> Result<()> { + let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?; + + let signed_commit_tx = cfd.commit_tx()?; + + let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?; + + if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() { + bail!("If we can get the commit tx we should be able to transition") + } + + insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?; + tracing::info!("Commit transaction published on chain: {}", txid); + + Ok(()) +} + +pub async fn handle_oracle_attestation( + attestation: oracle::Attestation, + conn: &mut PoolConnection, + wallet: &Wallet, + update_sender: &watch::Sender>, +) -> Result<()> { + tracing::debug!( + "Learnt latest oracle attestation for event: {}", + attestation.id + ); + + let mut cfds = db::load_cfds_by_oracle_event_id(attestation.id, conn).await?; + + for (cfd, dlc) in cfds + .iter_mut() + .filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc))) + { + if cfd + .handle(CfdStateChangeEvent::OracleAttestation(Attestation::new( + attestation.id, + attestation.price, + attestation.scalars.clone(), + dlc, + cfd.role(), + )?))? + .is_none() + { + // if we don't transition to a new state after oracle attestation we ignore the cfd + // this is for cases where we cannot handle the attestation which should be in a + // final state + continue; + } + + insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?; + + if let Err(e) = try_cet_publication(cfd, conn, wallet, update_sender).await { + tracing::error!("Error when trying to publish CET: {:#}", e); + continue; + } + } + + Ok(()) +} diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index b401d9d..7b23376 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -1,6 +1,7 @@ pub mod actors; pub mod auth; pub mod bitmex_price_feed; +pub mod cfd_actors; pub mod db; pub mod fan_out; pub mod housekeeping; diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 4c21e9b..f2dd833 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -1,19 +1,16 @@ -use crate::actors::insert_cfd; -use crate::actors::insert_new_cfd_state_by_order_id; -use crate::db::{ - insert_order, load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id, -}; +use crate::cfd_actors::{self, insert_cfd, insert_new_cfd_state_by_order_id}; +use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::maker_inc_connections::TakerCommand; use crate::model::cfd::{ - Attestation, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, - Order, OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, - UpdateCfdProposal, UpdateCfdProposals, + Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, Order, + OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, + UpdateCfdProposals, }; use crate::model::{TakerId, Usd}; use crate::monitor::MonitorParams; use crate::wallet::Wallet; use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wire}; -use anyhow::{bail, Context as _, Result}; +use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; use cfd_protocol::secp256k1_zkp::Signature; @@ -701,27 +698,13 @@ impl Actor { async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - let signed_commit_tx = cfd.commit_tx()?; - - let txid = self - .wallet - .try_broadcast_transaction(signed_commit_tx) - .await?; - - tracing::info!("Commit transaction published on chain: {}", txid); - - if let Some(new_state) = cfd.handle(CfdStateChangeEvent::CommitTxSent)? { - insert_new_cfd_state_by_order_id( - cfd.order.id, - &new_state, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; - } - + cfd_actors::handle_commit( + order_id, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; Ok(()) } @@ -879,114 +862,26 @@ impl Actor { } async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { - let order_id = event.order_id(); - let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() { - // early exit if there was not state change - // this is for cases where we are already in a final state - return Ok(()); - } - - insert_new_cfd_state_by_order_id( - order_id, - &cfd.state, + cfd_actors::handle_monitoring_event( + event, &mut conn, + &self.wallet, &self.cfd_feed_actor_inbox, ) .await?; - - // TODO: code duplication maker/taker - if let CfdState::OpenCommitted { .. } = cfd.state { - self.try_cet_publication(&mut cfd).await?; - } else if let CfdState::MustRefund { .. } = cfd.state { - let signed_refund_tx = cfd.refund_tx()?; - let txid = self - .wallet - .try_broadcast_transaction(signed_refund_tx) - .await?; - - tracing::info!("Refund transaction published on chain: {}", txid); - } - Ok(()) } async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { - tracing::debug!( - "Learnt latest oracle attestation for event: {}", - attestation.id - ); - let mut conn = self.db.acquire().await?; - let mut cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?; - - for (cfd, dlc) in cfds - .iter_mut() - .filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc))) - { - if cfd - .handle(CfdStateChangeEvent::OracleAttestation(Attestation::new( - attestation.id, - attestation.price, - attestation.scalars.clone(), - dlc, - cfd.role(), - )?))? - .is_none() - { - // if we don't transition to a new state after oracle attestation we ignore the cfd - // this is for cases where we cannot handle the attestation which should be in a - // final state - continue; - } - - insert_new_cfd_state_by_order_id( - cfd.order.id, - &cfd.state, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; - - if let Err(e) = self.try_cet_publication(cfd).await { - tracing::error!("Error when trying to publish CET: {:#}", e); - continue; - } - } - - Ok(()) - } - - // TODO: code duplication maker/taker - async fn try_cet_publication(&mut self, cfd: &mut Cfd) -> Result<()> { - let mut conn = self.db.acquire().await?; - - match cfd.cet()? { - Ok(cet) => { - let txid = self.wallet.try_broadcast_transaction(cet).await?; - tracing::info!("CET published with txid {}", txid); - - if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() { - bail!("If we can get the CET we should be able to transition") - } - - insert_new_cfd_state_by_order_id( - cfd.order.id, - &cfd.state, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; - } - Err(not_ready_yet) => { - tracing::debug!("{:#}", not_ready_yet); - return Ok(()); - } - }; - + cfd_actors::handle_oracle_attestation( + attestation, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; Ok(()) } } diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index a640f1f..4c3c69c 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -1,19 +1,16 @@ -use crate::actors::insert_cfd; -use crate::actors::insert_new_cfd_state_by_order_id; -use crate::db::{ - insert_order, load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id, -}; +use crate::cfd_actors::{self, insert_cfd, insert_new_cfd_state_by_order_id}; +use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::model::cfd::{ - Attestation, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, - Order, OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, - UpdateCfdProposal, UpdateCfdProposals, + Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, Order, + OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, + UpdateCfdProposals, }; use crate::model::{BitMexPriceEventId, Usd}; use crate::monitor::{self, MonitorParams}; use crate::wallet::Wallet; use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; use crate::{log_error, oracle, send_to_socket, setup_contract, wire}; -use anyhow::{bail, Context as _, Result}; +use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; use futures::channel::mpsc; @@ -569,142 +566,38 @@ impl Actor { } async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { - let order_id = event.order_id(); - let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() { - // early exit if there was not state change - // this is for cases where we are already in a final state - return Ok(()); - } - - insert_new_cfd_state_by_order_id( - order_id, - &cfd.state, + cfd_actors::handle_monitoring_event( + event, &mut conn, + &self.wallet, &self.cfd_feed_actor_inbox, ) .await?; - - // TODO: code duplicateion maker/taker - if let CfdState::OpenCommitted { .. } = cfd.state { - self.try_cet_publication(&mut cfd).await?; - } else if let CfdState::MustRefund { .. } = cfd.state { - let signed_refund_tx = cfd.refund_tx()?; - let txid = self - .wallet - .try_broadcast_transaction(signed_refund_tx) - .await?; - - tracing::info!("Refund transaction published on chain: {}", txid); - } - Ok(()) } - // TODO: code duplicateion maker/taker async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - let signed_commit_tx = cfd.commit_tx()?; - - let txid = self - .wallet - .try_broadcast_transaction(signed_commit_tx) - .await?; - - if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() { - bail!("If we can get the commit tx we should be able to transition") - } - - insert_new_cfd_state_by_order_id( - cfd.order.id, - &cfd.state, + cfd_actors::handle_commit( + order_id, &mut conn, + &self.wallet, &self.cfd_feed_actor_inbox, ) .await?; - tracing::info!("Commit transaction published on chain: {}", txid); - Ok(()) } async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { - tracing::debug!( - "Learnt latest oracle attestation for event: {}", - attestation.id - ); - let mut conn = self.db.acquire().await?; - let mut cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?; - - for (cfd, dlc) in cfds - .iter_mut() - .filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc))) - { - if cfd - .handle(CfdStateChangeEvent::OracleAttestation(Attestation::new( - attestation.id, - attestation.price, - attestation.scalars.clone(), - dlc, - cfd.role(), - )?))? - .is_none() - { - // if we don't transition to a new state after oracle attestation we ignore the cfd - // this is for cases where we cannot handle the attestation which should be in a - // final state - continue; - } - - insert_new_cfd_state_by_order_id( - cfd.order.id, - &cfd.state, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; - - if let Err(e) = self.try_cet_publication(cfd).await { - tracing::error!("Error when trying to publish CET: {:#}", e); - continue; - } - } - - Ok(()) - } - - // TODO: code duplication maker/taker - async fn try_cet_publication(&mut self, cfd: &mut Cfd) -> Result<()> { - let mut conn = self.db.acquire().await?; - - match cfd.cet()? { - Ok(cet) => { - let txid = self.wallet.try_broadcast_transaction(cet).await?; - tracing::info!("CET published with txid {}", txid); - - if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() { - bail!("If we can get the CET we should be able to transition") - } - - insert_new_cfd_state_by_order_id( - cfd.order.id, - &cfd.state, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; - } - Err(not_ready_yet) => { - tracing::debug!("{:#}", not_ready_yet); - return Ok(()); - } - }; - + cfd_actors::handle_oracle_attestation( + attestation, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; Ok(()) } }