diff --git a/daemon/src/actors.rs b/daemon/src/actors.rs index 4f329a3..c97159d 100644 --- a/daemon/src/actors.rs +++ b/daemon/src/actors.rs @@ -1,10 +1,3 @@ -use crate::{ - db, - model::cfd::{Cfd, CfdState, OrderId}, -}; -use sqlx::{pool::PoolConnection, Sqlite}; -use tokio::sync::watch; - /// Wrapper for handlers to log errors #[macro_export] macro_rules! log_error { @@ -14,24 +7,3 @@ macro_rules! log_error { } }; } - -pub async fn insert_cfd( - cfd: Cfd, - conn: &mut PoolConnection, - update_sender: &watch::Sender>, -) -> anyhow::Result<()> { - db::insert_cfd(cfd, conn).await?; - update_sender.send(db::load_all_cfds(conn).await?)?; - Ok(()) -} - -pub async fn insert_new_cfd_state_by_order_id( - order_id: OrderId, - new_state: &CfdState, - conn: &mut PoolConnection, - update_sender: &watch::Sender>, -) -> anyhow::Result<()> { - db::insert_new_cfd_state_by_order_id(order_id, new_state, conn).await?; - update_sender.send(db::load_all_cfds(conn).await?)?; - Ok(()) -} diff --git a/daemon/src/cfd_actors.rs b/daemon/src/cfd_actors.rs new file mode 100644 index 0000000..2005b7c --- /dev/null +++ b/daemon/src/cfd_actors.rs @@ -0,0 +1,149 @@ +use crate::model::cfd::{Attestation, Cfd, CfdState, CfdStateChangeEvent, OrderId}; +use crate::wallet::Wallet; +use crate::{db, monitor, oracle}; +use anyhow::{bail, Result}; +use sqlx::pool::PoolConnection; +use sqlx::Sqlite; +use tokio::sync::watch; + +pub async fn insert_cfd( + cfd: Cfd, + conn: &mut PoolConnection, + update_sender: &watch::Sender>, +) -> Result<()> { + db::insert_cfd(cfd, conn).await?; + update_sender.send(db::load_all_cfds(conn).await?)?; + Ok(()) +} + +pub async fn insert_new_cfd_state_by_order_id( + order_id: OrderId, + new_state: &CfdState, + conn: &mut PoolConnection, + update_sender: &watch::Sender>, +) -> Result<()> { + db::insert_new_cfd_state_by_order_id(order_id, new_state, conn).await?; + update_sender.send(db::load_all_cfds(conn).await?)?; + Ok(()) +} + +pub async fn try_cet_publication( + cfd: &mut Cfd, + conn: &mut PoolConnection, + wallet: &Wallet, + update_sender: &watch::Sender>, +) -> Result<()> { + match cfd.cet()? { + Ok(cet) => { + let txid = wallet.try_broadcast_transaction(cet).await?; + tracing::info!("CET published with txid {}", txid); + + if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() { + bail!("If we can get the CET we should be able to transition") + } + + insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?; + } + Err(not_ready_yet) => { + tracing::debug!("{:#}", not_ready_yet); + return Ok(()); + } + }; + + Ok(()) +} + +pub async fn handle_monitoring_event( + event: monitor::Event, + conn: &mut PoolConnection, + wallet: &Wallet, + update_sender: &watch::Sender>, +) -> Result<()> { + let order_id = event.order_id(); + + let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?; + + if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() { + // early exit if there was not state change + // this is for cases where we are already in a final state + return Ok(()); + } + + insert_new_cfd_state_by_order_id(order_id, &cfd.state, conn, update_sender).await?; + + if let CfdState::OpenCommitted { .. } = cfd.state { + try_cet_publication(&mut cfd, conn, wallet, update_sender).await?; + } else if let CfdState::MustRefund { .. } = cfd.state { + let signed_refund_tx = cfd.refund_tx()?; + let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; + + tracing::info!("Refund transaction published on chain: {}", txid); + } + Ok(()) +} + +pub async fn handle_commit( + order_id: OrderId, + conn: &mut PoolConnection, + wallet: &Wallet, + update_sender: &watch::Sender>, +) -> Result<()> { + let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?; + + let signed_commit_tx = cfd.commit_tx()?; + + let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?; + + if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() { + bail!("If we can get the commit tx we should be able to transition") + } + + insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?; + tracing::info!("Commit transaction published on chain: {}", txid); + + Ok(()) +} + +pub async fn handle_oracle_attestation( + attestation: oracle::Attestation, + conn: &mut PoolConnection, + wallet: &Wallet, + update_sender: &watch::Sender>, +) -> Result<()> { + tracing::debug!( + "Learnt latest oracle attestation for event: {}", + attestation.id + ); + + let mut cfds = db::load_cfds_by_oracle_event_id(attestation.id, conn).await?; + + for (cfd, dlc) in cfds + .iter_mut() + .filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc))) + { + if cfd + .handle(CfdStateChangeEvent::OracleAttestation(Attestation::new( + attestation.id, + attestation.price, + attestation.scalars.clone(), + dlc, + cfd.role(), + )?))? + .is_none() + { + // if we don't transition to a new state after oracle attestation we ignore the cfd + // this is for cases where we cannot handle the attestation which should be in a + // final state + continue; + } + + insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?; + + if let Err(e) = try_cet_publication(cfd, conn, wallet, update_sender).await { + tracing::error!("Error when trying to publish CET: {:#}", e); + continue; + } + } + + Ok(()) +} diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index b401d9d..7b23376 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -1,6 +1,7 @@ pub mod actors; pub mod auth; pub mod bitmex_price_feed; +pub mod cfd_actors; pub mod db; pub mod fan_out; pub mod housekeeping; diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 4c21e9b..f2dd833 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -1,19 +1,16 @@ -use crate::actors::insert_cfd; -use crate::actors::insert_new_cfd_state_by_order_id; -use crate::db::{ - insert_order, load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id, -}; +use crate::cfd_actors::{self, insert_cfd, insert_new_cfd_state_by_order_id}; +use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::maker_inc_connections::TakerCommand; use crate::model::cfd::{ - Attestation, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, - Order, OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, - UpdateCfdProposal, UpdateCfdProposals, + Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, Order, + OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, + UpdateCfdProposals, }; use crate::model::{TakerId, Usd}; use crate::monitor::MonitorParams; use crate::wallet::Wallet; use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wire}; -use anyhow::{bail, Context as _, Result}; +use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; use cfd_protocol::secp256k1_zkp::Signature; @@ -701,27 +698,13 @@ impl Actor { async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - let signed_commit_tx = cfd.commit_tx()?; - - let txid = self - .wallet - .try_broadcast_transaction(signed_commit_tx) - .await?; - - tracing::info!("Commit transaction published on chain: {}", txid); - - if let Some(new_state) = cfd.handle(CfdStateChangeEvent::CommitTxSent)? { - insert_new_cfd_state_by_order_id( - cfd.order.id, - &new_state, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; - } - + cfd_actors::handle_commit( + order_id, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; Ok(()) } @@ -879,114 +862,26 @@ impl Actor { } async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { - let order_id = event.order_id(); - let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() { - // early exit if there was not state change - // this is for cases where we are already in a final state - return Ok(()); - } - - insert_new_cfd_state_by_order_id( - order_id, - &cfd.state, + cfd_actors::handle_monitoring_event( + event, &mut conn, + &self.wallet, &self.cfd_feed_actor_inbox, ) .await?; - - // TODO: code duplication maker/taker - if let CfdState::OpenCommitted { .. } = cfd.state { - self.try_cet_publication(&mut cfd).await?; - } else if let CfdState::MustRefund { .. } = cfd.state { - let signed_refund_tx = cfd.refund_tx()?; - let txid = self - .wallet - .try_broadcast_transaction(signed_refund_tx) - .await?; - - tracing::info!("Refund transaction published on chain: {}", txid); - } - Ok(()) } async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { - tracing::debug!( - "Learnt latest oracle attestation for event: {}", - attestation.id - ); - let mut conn = self.db.acquire().await?; - let mut cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?; - - for (cfd, dlc) in cfds - .iter_mut() - .filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc))) - { - if cfd - .handle(CfdStateChangeEvent::OracleAttestation(Attestation::new( - attestation.id, - attestation.price, - attestation.scalars.clone(), - dlc, - cfd.role(), - )?))? - .is_none() - { - // if we don't transition to a new state after oracle attestation we ignore the cfd - // this is for cases where we cannot handle the attestation which should be in a - // final state - continue; - } - - insert_new_cfd_state_by_order_id( - cfd.order.id, - &cfd.state, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; - - if let Err(e) = self.try_cet_publication(cfd).await { - tracing::error!("Error when trying to publish CET: {:#}", e); - continue; - } - } - - Ok(()) - } - - // TODO: code duplication maker/taker - async fn try_cet_publication(&mut self, cfd: &mut Cfd) -> Result<()> { - let mut conn = self.db.acquire().await?; - - match cfd.cet()? { - Ok(cet) => { - let txid = self.wallet.try_broadcast_transaction(cet).await?; - tracing::info!("CET published with txid {}", txid); - - if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() { - bail!("If we can get the CET we should be able to transition") - } - - insert_new_cfd_state_by_order_id( - cfd.order.id, - &cfd.state, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; - } - Err(not_ready_yet) => { - tracing::debug!("{:#}", not_ready_yet); - return Ok(()); - } - }; - + cfd_actors::handle_oracle_attestation( + attestation, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; Ok(()) } } diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index a640f1f..4c3c69c 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -1,19 +1,16 @@ -use crate::actors::insert_cfd; -use crate::actors::insert_new_cfd_state_by_order_id; -use crate::db::{ - insert_order, load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id, -}; +use crate::cfd_actors::{self, insert_cfd, insert_new_cfd_state_by_order_id}; +use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::model::cfd::{ - Attestation, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, - Order, OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, - UpdateCfdProposal, UpdateCfdProposals, + Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, Order, + OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, + UpdateCfdProposals, }; use crate::model::{BitMexPriceEventId, Usd}; use crate::monitor::{self, MonitorParams}; use crate::wallet::Wallet; use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; use crate::{log_error, oracle, send_to_socket, setup_contract, wire}; -use anyhow::{bail, Context as _, Result}; +use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; use futures::channel::mpsc; @@ -569,142 +566,38 @@ impl Actor { } async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { - let order_id = event.order_id(); - let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() { - // early exit if there was not state change - // this is for cases where we are already in a final state - return Ok(()); - } - - insert_new_cfd_state_by_order_id( - order_id, - &cfd.state, + cfd_actors::handle_monitoring_event( + event, &mut conn, + &self.wallet, &self.cfd_feed_actor_inbox, ) .await?; - - // TODO: code duplicateion maker/taker - if let CfdState::OpenCommitted { .. } = cfd.state { - self.try_cet_publication(&mut cfd).await?; - } else if let CfdState::MustRefund { .. } = cfd.state { - let signed_refund_tx = cfd.refund_tx()?; - let txid = self - .wallet - .try_broadcast_transaction(signed_refund_tx) - .await?; - - tracing::info!("Refund transaction published on chain: {}", txid); - } - Ok(()) } - // TODO: code duplicateion maker/taker async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - let signed_commit_tx = cfd.commit_tx()?; - - let txid = self - .wallet - .try_broadcast_transaction(signed_commit_tx) - .await?; - - if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() { - bail!("If we can get the commit tx we should be able to transition") - } - - insert_new_cfd_state_by_order_id( - cfd.order.id, - &cfd.state, + cfd_actors::handle_commit( + order_id, &mut conn, + &self.wallet, &self.cfd_feed_actor_inbox, ) .await?; - tracing::info!("Commit transaction published on chain: {}", txid); - Ok(()) } async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { - tracing::debug!( - "Learnt latest oracle attestation for event: {}", - attestation.id - ); - let mut conn = self.db.acquire().await?; - let mut cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?; - - for (cfd, dlc) in cfds - .iter_mut() - .filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc))) - { - if cfd - .handle(CfdStateChangeEvent::OracleAttestation(Attestation::new( - attestation.id, - attestation.price, - attestation.scalars.clone(), - dlc, - cfd.role(), - )?))? - .is_none() - { - // if we don't transition to a new state after oracle attestation we ignore the cfd - // this is for cases where we cannot handle the attestation which should be in a - // final state - continue; - } - - insert_new_cfd_state_by_order_id( - cfd.order.id, - &cfd.state, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; - - if let Err(e) = self.try_cet_publication(cfd).await { - tracing::error!("Error when trying to publish CET: {:#}", e); - continue; - } - } - - Ok(()) - } - - // TODO: code duplication maker/taker - async fn try_cet_publication(&mut self, cfd: &mut Cfd) -> Result<()> { - let mut conn = self.db.acquire().await?; - - match cfd.cet()? { - Ok(cet) => { - let txid = self.wallet.try_broadcast_transaction(cet).await?; - tracing::info!("CET published with txid {}", txid); - - if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() { - bail!("If we can get the CET we should be able to transition") - } - - insert_new_cfd_state_by_order_id( - cfd.order.id, - &cfd.state, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; - } - Err(not_ready_yet) => { - tracing::debug!("{:#}", not_ready_yet); - return Ok(()); - } - }; - + cfd_actors::handle_oracle_attestation( + attestation, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; Ok(()) } }