From ca9371474645049f5efde414459db28ee4a04f9b Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 15 Oct 2021 12:58:33 +1100 Subject: [PATCH] Refactor `insert_new_cfd_state_by_order_id` to `append_cfd_state` We almost always have the entire `Cfd` available when we call this function. It is therefore much easier to simply pass the entire `Cfd` in instead of selective data. --- daemon/src/cfd_actors.rs | 19 +++--- daemon/src/db.rs | 25 +++----- daemon/src/housekeeping.rs | 23 +++---- daemon/src/maker_cfd.rs | 126 +++++++++++++++---------------------- daemon/src/taker_cfd.rs | 93 ++++++++++----------------- 5 files changed, 113 insertions(+), 173 deletions(-) diff --git a/daemon/src/cfd_actors.rs b/daemon/src/cfd_actors.rs index f3190bb..8484812 100644 --- a/daemon/src/cfd_actors.rs +++ b/daemon/src/cfd_actors.rs @@ -7,22 +7,21 @@ use sqlx::Sqlite; use tokio::sync::watch; pub async fn insert_cfd( - cfd: Cfd, + cfd: &Cfd, conn: &mut PoolConnection, update_sender: &watch::Sender>, ) -> Result<()> { - db::insert_cfd(&cfd, conn).await?; + db::insert_cfd(cfd, conn).await?; update_sender.send(db::load_all_cfds(conn).await?)?; Ok(()) } -pub async fn insert_new_cfd_state_by_order_id( - order_id: OrderId, - new_state: &CfdState, +pub async fn append_cfd_state( + cfd: &Cfd, conn: &mut PoolConnection, update_sender: &watch::Sender>, ) -> Result<()> { - db::insert_new_cfd_state_by_order_id(order_id, new_state, conn).await?; + db::append_cfd_state(cfd, conn).await?; update_sender.send(db::load_all_cfds(conn).await?)?; Ok(()) } @@ -42,7 +41,7 @@ pub async fn try_cet_publication( bail!("If we can get the CET we should be able to transition") } - insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?; + append_cfd_state(cfd, conn, update_sender).await?; } Err(not_ready_yet) => { tracing::debug!("{:#}", not_ready_yet); @@ -69,7 +68,7 @@ pub async fn handle_monitoring_event( return Ok(()); } - insert_new_cfd_state_by_order_id(order_id, &cfd.state, conn, update_sender).await?; + append_cfd_state(&cfd, conn, update_sender).await?; if let CfdState::OpenCommitted { .. } = cfd.state { try_cet_publication(&mut cfd, conn, wallet, update_sender).await?; @@ -98,7 +97,7 @@ pub async fn handle_commit( bail!("If we can get the commit tx we should be able to transition") } - insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?; + append_cfd_state(&cfd, conn, update_sender).await?; tracing::info!("Commit transaction published on chain: {}", txid); Ok(()) @@ -137,7 +136,7 @@ pub async fn handle_oracle_attestation( continue; } - insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?; + append_cfd_state(cfd, conn, update_sender).await?; if let Err(e) = try_cet_publication(cfd, conn, wallet, update_sender).await { tracing::error!("Error when trying to publish CET: {:#}", e); diff --git a/daemon/src/db.rs b/daemon/src/db.rs index 456031b..6510938 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -160,22 +160,19 @@ pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection) -> anyhow: Ok(()) } -pub async fn insert_new_cfd_state_by_order_id( - order_id: OrderId, - new_state: &CfdState, - conn: &mut PoolConnection, -) -> anyhow::Result<()> { - let cfd_id = load_cfd_id_by_order_uuid(order_id, conn).await?; - let latest_cfd_state_in_db = load_latest_cfd_state(cfd_id, conn) +pub async fn append_cfd_state(cfd: &Cfd, conn: &mut PoolConnection) -> anyhow::Result<()> { + let cfd_id = load_cfd_id_by_order_uuid(cfd.order.id, conn).await?; + let current_state = load_latest_cfd_state(cfd_id, conn) .await .context("loading latest state failed")?; + let new_state = &cfd.state; - if mem::discriminant(&latest_cfd_state_in_db) == mem::discriminant(new_state) { + if mem::discriminant(¤t_state) == mem::discriminant(new_state) { // Since we have states where we add information this happens quite frequently tracing::trace!( "Same state transition for cfd with order_id {}: {}", - order_id, - latest_cfd_state_in_db + cfd.order.id, + current_state ); } @@ -599,9 +596,7 @@ mod tests { transition_timestamp: SystemTime::now(), }, }; - insert_new_cfd_state_by_order_id(cfd_1.order.id, &cfd_1.state, &mut conn) - .await - .unwrap(); + append_cfd_state(&cfd_1, &mut conn).await.unwrap(); let cfds_from_db = load_all_cfds(&mut conn).await.unwrap(); assert_eq!(vec![cfd_1.clone()], cfds_from_db); @@ -616,9 +611,7 @@ mod tests { transition_timestamp: SystemTime::now(), }, }; - insert_new_cfd_state_by_order_id(cfd_2.order.id, &cfd_2.state, &mut conn) - .await - .unwrap(); + append_cfd_state(&cfd_2, &mut conn).await.unwrap(); let cfds_from_db = load_all_cfds(&mut conn).await.unwrap(); assert_eq!(vec![cfd_1, cfd_2], cfds_from_db); diff --git a/daemon/src/housekeeping.rs b/daemon/src/housekeeping.rs index db43c80..8291e3d 100644 --- a/daemon/src/housekeeping.rs +++ b/daemon/src/housekeeping.rs @@ -1,4 +1,4 @@ -use crate::db::{insert_new_cfd_state_by_order_id, load_all_cfds}; +use crate::db::{append_cfd_state, load_all_cfds}; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon}; use crate::wallet::Wallet; use anyhow::Result; @@ -9,20 +9,17 @@ use std::time::SystemTime; pub async fn transition_non_continue_cfds_to_setup_failed( conn: &mut PoolConnection, ) -> Result<()> { - let cfds = load_all_cfds(conn).await?; + let mut cfds = load_all_cfds(conn).await?; - for cfd in cfds.iter().filter(|cfd| Cfd::is_cleanup(cfd)) { - insert_new_cfd_state_by_order_id( - cfd.order.id, - &CfdState::SetupFailed { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - info: format!("Was in state {} which cannot be continued.", cfd.state), + for cfd in cfds.iter_mut().filter(|cfd| Cfd::is_cleanup(cfd)) { + cfd.state = CfdState::SetupFailed { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), }, - conn, - ) - .await?; + info: format!("Was in state {} which cannot be continued.", cfd.state), + }; + + append_cfd_state(cfd, conn).await?; } Ok(()) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index a6260ef..3e1aa91 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -1,4 +1,4 @@ -use crate::cfd_actors::{self, insert_cfd, insert_new_cfd_state_by_order_id}; +use crate::cfd_actors::{self, append_cfd_state, insert_cfd}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::maker_inc_connections::TakerCommand; use crate::model::cfd::{ @@ -292,13 +292,7 @@ impl Actor { proposal.price, ), ))?; - insert_new_cfd_state_by_order_id( - cfd.order.id, - &cfd.state, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?; @@ -405,19 +399,16 @@ impl Actor { let mut conn = self.db.acquire().await?; - insert_new_cfd_state_by_order_id( - order_id, - &CfdState::PendingOpen { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc: dlc.clone(), - attestation: None, + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::PendingOpen { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), }, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; + dlc: dlc.clone(), + attestation: None, + }; + + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; let txid = self .wallet @@ -426,8 +417,6 @@ impl Actor { tracing::info!("Lock transaction published with txid {}", txid); - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - self.monitor_actor .do_send_async(monitor::StartMonitoring { id: order_id, @@ -453,22 +442,17 @@ impl Actor { self.roll_over_state = RollOverState::None; let mut conn = self.db.acquire().await?; - insert_new_cfd_state_by_order_id( - order_id, - &CfdState::Open { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc: dlc.clone(), - attestation: None, - collaborative_close: None, + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::Open { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), }, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; + dlc: dlc.clone(), + attestation: None, + collaborative_close: None, + }; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; self.monitor_actor .do_send_async(monitor::StartMonitoring { @@ -513,20 +497,7 @@ impl Actor { } }; - // 2. check if order has acceptable amounts - if quantity < current_order.min_quantity || quantity > current_order.max_quantity { - tracing::warn!( - "Order rejected because quantity {} was out of bounds. It was either <{} or >{}", - quantity, - current_order.min_quantity, - current_order.max_quantity - ); - - self.reject_order(taker_id, order_id, conn).await?; - return Ok(()); - } - - // 3. Insert CFD in DB + // 2. Create a new CFD let cfd = Cfd::new( current_order.clone(), quantity, @@ -537,7 +508,20 @@ impl Actor { taker_id, }, ); - insert_cfd(cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + insert_cfd(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + + // 3. check if order has acceptable amounts + if quantity < current_order.min_quantity || quantity > current_order.max_quantity { + tracing::warn!( + "Order rejected because quantity {} was out of bounds. It was either <{} or >{}", + quantity, + current_order.min_quantity, + current_order.max_quantity + ); + + self.reject_order(taker_id, cfd, conn).await?; + return Ok(()); + } // 4. Remove current order self.current_order_id = None; @@ -563,7 +547,7 @@ impl Actor { let mut conn = self.db.acquire().await?; // 1. Validate if order is still valid - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let taker_id = match cfd { Cfd { state: CfdState::IncomingOrderRequest { taker_id, .. }, @@ -583,17 +567,13 @@ impl Actor { .with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?; // 3. Insert that we are in contract setup and refresh our own feed - insert_new_cfd_state_by_order_id( - order_id, - &CfdState::ContractSetup { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, + cfd.state = CfdState::ContractSetup { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), }, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; + }; + + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; // 4. Notify the taker that we are ready for contract setup // Use `.send` here to ensure we only continue once the message has been sent @@ -658,7 +638,7 @@ impl Actor { } }; - self.reject_order(taker_id, order_id, conn).await?; + self.reject_order(taker_id, cfd, conn).await?; Ok(()) } @@ -671,26 +651,22 @@ impl Actor { async fn reject_order( &mut self, taker_id: TakerId, - order_id: OrderId, + mut cfd: Cfd, mut conn: PoolConnection, ) -> Result<()> { // Update order in db - insert_new_cfd_state_by_order_id( - order_id, - &CfdState::Rejected { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, + cfd.state = CfdState::Rejected { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), }, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; + }; + + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; self.takers .do_send_async(maker_inc_connections::TakerMessage { taker_id, - command: TakerCommand::NotifyOrderRejected { id: order_id }, + command: TakerCommand::NotifyOrderRejected { id: cfd.order.id }, }) .await?; diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 4c3c69c..49f7d1a 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -1,4 +1,4 @@ -use crate::cfd_actors::{self, insert_cfd, insert_new_cfd_state_by_order_id}; +use crate::cfd_actors::{self, append_cfd_state, insert_cfd}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::model::cfd::{ Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, Order, @@ -156,7 +156,7 @@ impl Actor { }, ); - insert_cfd(cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + insert_cfd(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; self.send_to_maker .do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity }) @@ -268,19 +268,14 @@ impl Actor { } let mut conn = self.db.acquire().await?; - insert_new_cfd_state_by_order_id( - order_id, - &CfdState::ContractSetup { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::ContractSetup { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), }, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; + }; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; let offer_announcement = self .oracle_actor @@ -326,17 +321,14 @@ impl Actor { tracing::debug!(%order_id, "Order rejected"); let mut conn = self.db.acquire().await?; - insert_new_cfd_state_by_order_id( - order_id, - &CfdState::Rejected { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::Rejected { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), }, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; + }; + + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; Ok(()) } @@ -366,13 +358,7 @@ impl Actor { cfd.handle(CfdStateChangeEvent::ProposalSigned( CollaborativeSettlement::new(tx, dlc.script_pubkey_for(cfd.role()), proposal.price), ))?; - insert_new_cfd_state_by_order_id( - cfd.order.id, - &cfd.state, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; self.remove_pending_proposal(&order_id)?; @@ -489,20 +475,16 @@ impl Actor { tracing::info!("Setup complete, publishing on chain now"); let mut conn = self.db.acquire().await?; - - insert_new_cfd_state_by_order_id( - order_id, - &CfdState::PendingOpen { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc: dlc.clone(), - attestation: None, + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::PendingOpen { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), }, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; + dlc: dlc.clone(), + attestation: None, + }; + + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; let txid = self .wallet @@ -511,8 +493,6 @@ impl Actor { tracing::info!("Lock transaction published with txid {}", txid); - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - self.monitor_actor .do_send_async(monitor::StartMonitoring { id: order_id, @@ -538,22 +518,17 @@ impl Actor { self.roll_over_state = RollOverState::None; let mut conn = self.db.acquire().await?; - insert_new_cfd_state_by_order_id( - order_id, - &CfdState::Open { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc: dlc.clone(), - attestation: None, - collaborative_close: None, + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::Open { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), }, - &mut conn, - &self.cfd_feed_actor_inbox, - ) - .await?; + dlc: dlc.clone(), + attestation: None, + collaborative_close: None, + }; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; self.monitor_actor .do_send_async(monitor::StartMonitoring {