diff --git a/daemon/src/db.rs b/daemon/src/db.rs index adc5678..b45d75f 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -172,10 +172,9 @@ pub async fn insert_new_cfd_state_by_order_id( .await .context("loading latest state failed")?; - // make sure that the new state is different than the current one to avoid that we save the same - // state twice if mem::discriminant(&latest_cfd_state_in_db) == mem::discriminant(&new_state) { - tracing::warn!( + // Since we have states where we add information this happens quite frequently + tracing::trace!( "Same state transition for cfd with order_id {}: {}", order_id, latest_cfd_state_in_db diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 1bf8b24..235f443 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -12,7 +12,7 @@ use crate::model::{TakerId, Usd}; use crate::monitor::MonitorParams; use crate::wallet::Wallet; use crate::{maker_inc_connections, monitor, oracle, setup_contract, wire}; -use anyhow::{Context as _, Result}; +use anyhow::{bail, Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; use futures::channel::mpsc; @@ -617,8 +617,9 @@ impl Actor { tracing::info!("Commit transaction published on chain: {}", txid); - let new_state = cfd.handle(CfdStateChangeEvent::CommitTxSent)?; - insert_new_cfd_state_by_order_id(cfd.order.id, new_state, &mut conn).await?; + if let Some(new_state) = cfd.handle(CfdStateChangeEvent::CommitTxSent)? { + insert_new_cfd_state_by_order_id(cfd.order.id, new_state, &mut conn).await?; + } self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; @@ -785,15 +786,21 @@ impl Actor { let mut conn = self.db.acquire().await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let new_state = cfd.handle(CfdStateChangeEvent::Monitor(event))?; - insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?; + if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() { + // early exit if there was not state change + // this is for cases where we are already in a final state + return Ok(()); + } + + insert_new_cfd_state_by_order_id(order_id, cfd.state.clone(), &mut conn).await?; + self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; // TODO: code duplication maker/taker - if let CfdState::OpenCommitted { .. } = new_state { + if let CfdState::OpenCommitted { .. } = cfd.state { self.try_cet_publication(cfd).await?; - } else if let CfdState::MustRefund { .. } = new_state { + } else if let CfdState::MustRefund { .. } = cfd.state { let signed_refund_tx = cfd.refund_tx()?; let txid = self .wallet @@ -816,10 +823,26 @@ impl Actor { let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?; for mut cfd in cfds { - cfd.handle(CfdStateChangeEvent::OracleAttestation(attestation.clone()))?; + if cfd + .handle(CfdStateChangeEvent::OracleAttestation( + attestation.clone().into(), + ))? + .is_none() + { + // if we don't transition to a new state after oracle attestation we ignore the cfd + // this is for cases where we cannot handle the attestation which should be in a + // final state + continue; + } + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?; + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; - self.try_cet_publication(cfd).await?; + if let Err(e) = self.try_cet_publication(cfd).await { + tracing::error!("Error when trying to publish CET: {:#}", e); + continue; + } } Ok(()) @@ -834,14 +857,18 @@ impl Actor { let txid = self.wallet.try_broadcast_transaction(cet).await?; tracing::info!("CET published with txid {}", txid); - cfd.handle(CfdStateChangeEvent::CetSent)?; - insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state, &mut conn).await?; + if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() { + bail!("If we can get the CET we should be able to transition") + } + + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn) + .await?; + + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; } Err(not_ready_yet) => { - tracing::debug!( - "Attestation received but we are not ready to publish it yet: {:#}", - not_ready_yet - ); + tracing::debug!("{:#}", not_ready_yet); return Ok(()); } }; diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index fc2d0e1..617dc97 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -1,6 +1,5 @@ use crate::model::{Leverage, OracleEventId, Percent, Position, TakerId, TradingPair, Usd}; -use crate::monitor; -use crate::oracle::Attestation; +use crate::{monitor, oracle}; use anyhow::{bail, Context, Result}; use bdk::bitcoin::secp256k1::{SecretKey, Signature}; use bdk::bitcoin::{Address, Amount, PublicKey, Script, SignedAmount, Transaction, Txid}; @@ -281,6 +280,33 @@ pub enum CfdState { }, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct Attestation { + pub id: OracleEventId, + pub price: u64, + pub scalars: Vec, +} + +impl From for Attestation { + fn from(attestation: oracle::Attestation) -> Self { + Attestation { + id: attestation.id, + price: attestation.price, + scalars: attestation.scalars, + } + } +} + +impl From for oracle::Attestation { + fn from(attestation: Attestation) -> oracle::Attestation { + oracle::Attestation { + id: attestation.id, + price: attestation.price, + scalars: attestation.scalars, + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(tag = "type", content = "payload")] pub enum CetStatus { @@ -290,6 +316,17 @@ pub enum CetStatus { Ready(Attestation), } +impl fmt::Display for CetStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + CetStatus::Unprepared => write!(f, "Unprepared"), + CetStatus::TimelockExpired => write!(f, "TimelockExpired"), + CetStatus::OracleSigned(_) => write!(f, "OracleSigned"), + CetStatus::Ready(_) => write!(f, "Ready"), + } + } +} + impl CfdState { fn get_common(&self) -> CfdStateCommon { let common = match self { @@ -505,7 +542,7 @@ impl Cfd { #[allow(dead_code)] pub const CET_TIMELOCK: u32 = 12; - pub fn handle(&mut self, event: CfdStateChangeEvent) -> Result { + pub fn handle(&mut self, event: CfdStateChangeEvent) -> Result> { use CfdState::*; // TODO: Display impl @@ -513,6 +550,16 @@ impl Cfd { let order_id = self.order.id; + // early exit if already final + if let SetupFailed { .. } | Closed { .. } | Refunded { .. } = self.state.clone() { + tracing::trace!( + "Ignoring event {:?} because cfd already in state {}", + event, + self.state + ); + return Ok(None); + } + let new_state = match event { CfdStateChangeEvent::Monitor(event) => match event { monitor::Event::LockFinality(_) => { @@ -543,11 +590,20 @@ impl Cfd { } } monitor::Event::CommitFinality(_) => { - let dlc = if let PendingCommit { dlc, .. } = self.state.clone() { - dlc - } else if let PendingOpen { dlc, .. } | Open { dlc, .. } = self.state.clone() { + let (dlc, attestation) = if let PendingCommit { + dlc, attestation, .. + } = self.state.clone() + { + (dlc, attestation) + } else if let PendingOpen { + dlc, attestation, .. + } + | Open { + dlc, attestation, .. + } = self.state.clone() + { tracing::debug!(%order_id, "Was in unexpected state {}, jumping ahead to OpenCommitted", self.state); - dlc + (dlc, attestation) } else { bail!( "Cannot transition to OpenCommitted because of unexpected state {}", @@ -560,7 +616,11 @@ impl Cfd { transition_timestamp: SystemTime::now(), }, dlc, - cet_status: CetStatus::Unprepared, + cet_status: if let Some(attestation) = attestation { + CetStatus::OracleSigned(attestation) + } else { + CetStatus::Unprepared + }, } } monitor::Event::CetTimelockExpired(_) => match self.state.clone() { @@ -620,7 +680,7 @@ impl Cfd { dlc } else { bail!( - "Cannot transition to OpenCommitted because of unexpected state {}", + "Cannot transition to MustRefund because of unexpected state {}", self.state ) }; @@ -681,7 +741,7 @@ impl Cfd { } } CfdStateChangeEvent::OracleAttestation(attestation) => match self.state.clone() { - CfdState::Open { dlc, .. } => CfdState::Open { + CfdState::PendingOpen { dlc, .. } | CfdState::Open { dlc, .. } => CfdState::Open { common: CfdStateCommon { transition_timestamp: SystemTime::now(), }, @@ -741,7 +801,7 @@ impl Cfd { self.state = new_state.clone(); - Ok(new_state) + Ok(Some(new_state)) } pub fn refund_tx(&self) -> Result { @@ -822,10 +882,13 @@ impl Cfd { cet_status: CetStatus::Ready(attestation), .. } => (dlc, attestation), - CfdState::OpenCommitted { .. } - | CfdState::Open { .. } - | CfdState::PendingCommit { .. } => { - return Ok(Err(NotReadyYet)); + CfdState::OpenCommitted { cet_status, .. } => { + return Ok(Err(NotReadyYet { cet_status })); + } + CfdState::Open { .. } | CfdState::PendingCommit { .. } => { + return Ok(Err(NotReadyYet { + cet_status: CetStatus::Unprepared, + })); } _ => bail!("Cannot publish CET in state {}", self.state.clone()), }; @@ -919,9 +982,11 @@ impl Cfd { } } -#[derive(thiserror::Error, Debug, Clone, Copy)] -#[error("The cfd is not ready for CET publication yet")] -pub struct NotReadyYet; +#[derive(thiserror::Error, Debug, Clone)] +#[error("The cfd is not ready for CET publication yet: {cet_status}")] +pub struct NotReadyYet { + cet_status: CetStatus, +} #[derive(Debug, Clone)] pub enum CfdStateChangeEvent { @@ -1301,9 +1366,9 @@ pub struct Dlc { pub cets: HashMap>, pub refund: (Transaction, Signature), - #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] + #[serde(with = "::bdk::bitcoin::util::amount::serde::as_sat")] pub maker_lock_amount: Amount, - #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] + #[serde(with = "::bdk::bitcoin::util::amount::serde::as_sat")] pub taker_lock_amount: Amount, pub revoked_commit: Vec, diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index c847b99..447f463 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -111,7 +111,7 @@ where actor.monitor_refund_finality(¶ms,cfd.order.id); } CetStatus::OracleSigned(attestation) => { - actor.monitor_cet_finality(map_cets(dlc.cets, dlc.maker_address.script_pubkey()), attestation, cfd.order.id)?; + actor.monitor_cet_finality(map_cets(dlc.cets, dlc.maker_address.script_pubkey()), attestation.into(), cfd.order.id)?; actor.monitor_commit_cet_timelock(¶ms, cfd.order.id); actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); actor.monitor_refund_finality(¶ms,cfd.order.id); @@ -121,7 +121,7 @@ where actor.monitor_refund_finality(¶ms,cfd.order.id); } CetStatus::Ready(attestation) => { - actor.monitor_cet_finality(map_cets(dlc.cets, dlc.maker_address.script_pubkey()), attestation, cfd.order.id)?; + actor.monitor_cet_finality(map_cets(dlc.cets, dlc.maker_address.script_pubkey()), attestation.into(), cfd.order.id)?; actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); actor.monitor_refund_finality(¶ms,cfd.order.id); } diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index f316cde..60ee365 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -10,7 +10,7 @@ use reqwest::StatusCode; use rocket::time::format_description::FormatItem; use rocket::time::macros::format_description; use rocket::time::{Duration, OffsetDateTime, Time}; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use std::collections::{HashMap, HashSet}; use std::ops::Add; use time::ext::NumericalDuration; @@ -133,7 +133,7 @@ where } }; - let attestation = dbg!(res).json::().await?; + let attestation = res.json::().await?; self.cfd_actor_address .clone() @@ -284,7 +284,7 @@ impl From for cfd_protocol::Announcement { // TODO: Implement real deserialization once price attestation is // implemented in `olivia` -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Clone, Deserialize, PartialEq)] #[serde(try_from = "olivia_api::Response")] pub struct Attestation { pub id: OracleEventId, diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 9ffe227..5a7dd7a 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -12,7 +12,7 @@ use crate::monitor::{self, MonitorParams}; use crate::wallet::Wallet; use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; use crate::{oracle, send_to_socket, setup_contract, wire}; -use anyhow::{Context as _, Result}; +use anyhow::{bail, Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; use futures::channel::mpsc; @@ -541,17 +541,21 @@ impl Actor { let mut conn = self.db.acquire().await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let new_state = cfd.handle(CfdStateChangeEvent::Monitor(event))?; + if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() { + // early exit if there was not state change + // this is for cases where we are already in a final state + return Ok(()); + } - insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?; + insert_new_cfd_state_by_order_id(order_id, cfd.state.clone(), &mut conn).await?; self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; // TODO: code duplicateion maker/taker - if let CfdState::OpenCommitted { .. } = new_state { + if let CfdState::OpenCommitted { .. } = cfd.state { self.try_cet_publication(cfd).await?; - } else if let CfdState::MustRefund { .. } = new_state { + } else if let CfdState::MustRefund { .. } = cfd.state { let signed_refund_tx = cfd.refund_tx()?; let txid = self .wallet @@ -576,14 +580,16 @@ impl Actor { .try_broadcast_transaction(signed_commit_tx) .await?; - tracing::info!("Commit transaction published on chain: {}", txid); - - let new_state = cfd.handle(CfdStateChangeEvent::CommitTxSent)?; - insert_new_cfd_state_by_order_id(cfd.order.id, new_state, &mut conn).await?; + if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() { + bail!("If we can get the commit tx we should be able to transition") + } + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?; self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; + tracing::info!("Commit transaction published on chain: {}", txid); + Ok(()) } @@ -597,10 +603,26 @@ impl Actor { let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?; for mut cfd in cfds { - cfd.handle(CfdStateChangeEvent::OracleAttestation(attestation.clone()))?; + if cfd + .handle(CfdStateChangeEvent::OracleAttestation( + attestation.clone().into(), + ))? + .is_none() + { + // if we don't transition to a new state after oracle attestation we ignore the cfd + // this is for cases where we cannot handle the attestation which should be in a + // final state + continue; + } + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?; + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; - self.try_cet_publication(cfd).await?; + if let Err(e) = self.try_cet_publication(cfd).await { + tracing::error!("Error when trying to publish CET: {:#}", e); + continue; + } } Ok(()) @@ -615,14 +637,18 @@ impl Actor { let txid = self.wallet.try_broadcast_transaction(cet).await?; tracing::info!("CET published with txid {}", txid); - cfd.handle(CfdStateChangeEvent::CetSent)?; - insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state, &mut conn).await?; + if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() { + bail!("If we can get the CET we should be able to transition") + } + + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn) + .await?; + + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; } Err(not_ready_yet) => { - tracing::debug!( - "Attestation received but we are not ready to publish it yet: {:#}", - not_ready_yet - ); + tracing::debug!("{:#}", not_ready_yet); return Ok(()); } };