From 9666b63cb7ef01d0331f69d63b536653e8a0c324 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Mon, 11 Oct 2021 10:07:56 +1100 Subject: [PATCH] Optional state transition and early exit Reasoning: The state transition can fail, hence the result. It can, however, happen that there is no failure, but we still did not transition to a new state. In such cases we don't want to save the state in the database because it would be an unwanted duplicated entry. --- daemon/src/maker_cfd.rs | 44 +++++++++++++++++++++++++++----------- daemon/src/model/cfd.rs | 14 ++++++++++-- daemon/src/taker_cfd.rs | 47 +++++++++++++++++++++++++++++------------ 3 files changed, 77 insertions(+), 28 deletions(-) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 4738f4f..2558c33 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -12,7 +12,7 @@ use crate::model::{TakerId, Usd}; use crate::monitor::MonitorParams; use crate::wallet::Wallet; use crate::{maker_inc_connections, monitor, oracle, setup_contract, wire}; -use anyhow::{Context as _, Result}; +use anyhow::{bail, Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; use futures::channel::mpsc; @@ -617,8 +617,9 @@ impl Actor { tracing::info!("Commit transaction published on chain: {}", txid); - let new_state = cfd.handle(CfdStateChangeEvent::CommitTxSent)?; - insert_new_cfd_state_by_order_id(cfd.order.id, new_state, &mut conn).await?; + if let Some(new_state) = cfd.handle(CfdStateChangeEvent::CommitTxSent)? { + insert_new_cfd_state_by_order_id(cfd.order.id, new_state, &mut conn).await?; + } self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; @@ -785,15 +786,21 @@ impl Actor { let mut conn = self.db.acquire().await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let new_state = cfd.handle(CfdStateChangeEvent::Monitor(event))?; - insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?; + if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() { + // early exit if there was not state change + // this is for cases where we are already in a final state + return Ok(()); + } + + insert_new_cfd_state_by_order_id(order_id, cfd.state.clone(), &mut conn).await?; + self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; // TODO: code duplication maker/taker - if let CfdState::OpenCommitted { .. } = new_state { + if let CfdState::OpenCommitted { .. } = cfd.state { self.try_cet_publication(cfd).await?; - } else if let CfdState::MustRefund { .. } = new_state { + } else if let CfdState::MustRefund { .. } = cfd.state { let signed_refund_tx = cfd.refund_tx()?; let txid = self .wallet @@ -816,9 +823,18 @@ impl Actor { let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?; for mut cfd in cfds { - cfd.handle(CfdStateChangeEvent::OracleAttestation( - attestation.clone().into(), - ))?; + if cfd + .handle(CfdStateChangeEvent::OracleAttestation( + attestation.clone().into(), + ))? + .is_none() + { + // if we don't transition to a new state after oracle attestation we ignore the cfd + // this is for cases where we cannot handle the attestation which should be in a + // final state + continue; + } + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?; self.try_cet_publication(cfd).await?; @@ -836,8 +852,12 @@ impl Actor { let txid = self.wallet.try_broadcast_transaction(cet).await?; tracing::info!("CET published with txid {}", txid); - cfd.handle(CfdStateChangeEvent::CetSent)?; - insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state, &mut conn).await?; + if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() { + bail!("If we can get the CET we should be able to transition") + } + + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn) + .await?; } Err(not_ready_yet) => { tracing::debug!("{:#}", not_ready_yet); diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index 1f46b33..b20eded 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -543,7 +543,7 @@ impl Cfd { #[allow(dead_code)] pub const CET_TIMELOCK: u32 = 12; - pub fn handle(&mut self, event: CfdStateChangeEvent) -> Result { + pub fn handle(&mut self, event: CfdStateChangeEvent) -> Result> { use CfdState::*; // TODO: Display impl @@ -551,6 +551,16 @@ impl Cfd { let order_id = self.order.id; + // early exit if already final + if let SetupFailed { .. } | Closed { .. } | Refunded { .. } = self.state.clone() { + tracing::trace!( + "Ignoring event {:?} because cfd already in state {}", + event, + self.state + ); + return Ok(None); + } + let new_state = match event { CfdStateChangeEvent::Monitor(event) => match event { monitor::Event::LockFinality(_) => { @@ -792,7 +802,7 @@ impl Cfd { self.state = new_state.clone(); - Ok(new_state) + Ok(Some(new_state)) } pub fn refund_tx(&self) -> Result { diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index ecc0f9c..cb252ee 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -12,7 +12,7 @@ use crate::monitor::{self, MonitorParams}; use crate::wallet::Wallet; use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; use crate::{oracle, send_to_socket, setup_contract, wire}; -use anyhow::{Context as _, Result}; +use anyhow::{bail, Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; use futures::channel::mpsc; @@ -541,17 +541,21 @@ impl Actor { let mut conn = self.db.acquire().await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let new_state = cfd.handle(CfdStateChangeEvent::Monitor(event))?; + if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() { + // early exit if there was not state change + // this is for cases where we are already in a final state + return Ok(()); + } - insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?; + insert_new_cfd_state_by_order_id(order_id, cfd.state.clone(), &mut conn).await?; self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; // TODO: code duplicateion maker/taker - if let CfdState::OpenCommitted { .. } = new_state { + if let CfdState::OpenCommitted { .. } = cfd.state { self.try_cet_publication(cfd).await?; - } else if let CfdState::MustRefund { .. } = new_state { + } else if let CfdState::MustRefund { .. } = cfd.state { let signed_refund_tx = cfd.refund_tx()?; let txid = self .wallet @@ -576,14 +580,16 @@ impl Actor { .try_broadcast_transaction(signed_commit_tx) .await?; - tracing::info!("Commit transaction published on chain: {}", txid); - - let new_state = cfd.handle(CfdStateChangeEvent::CommitTxSent)?; - insert_new_cfd_state_by_order_id(cfd.order.id, new_state, &mut conn).await?; + if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() { + bail!("If we can get the commit tx we should be able to transition") + } + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?; self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; + tracing::info!("Commit transaction published on chain: {}", txid); + Ok(()) } @@ -597,9 +603,18 @@ impl Actor { let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?; for mut cfd in cfds { - cfd.handle(CfdStateChangeEvent::OracleAttestation( - attestation.clone().into(), - ))?; + if cfd + .handle(CfdStateChangeEvent::OracleAttestation( + attestation.clone().into(), + ))? + .is_none() + { + // if we don't transition to a new state after oracle attestation we ignore the cfd + // this is for cases where we cannot handle the attestation which should be in a + // final state + continue; + } + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?; self.try_cet_publication(cfd).await?; @@ -617,8 +632,12 @@ impl Actor { let txid = self.wallet.try_broadcast_transaction(cet).await?; tracing::info!("CET published with txid {}", txid); - cfd.handle(CfdStateChangeEvent::CetSent)?; - insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state, &mut conn).await?; + if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() { + bail!("If we can get the CET we should be able to transition") + } + + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn) + .await?; } Err(not_ready_yet) => { tracing::debug!("{:#}", not_ready_yet);