From 75033a9c32893fcbcfacaa49222c5e13fab5de6f Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Mon, 27 Sep 2021 15:25:11 +1000 Subject: [PATCH 1/5] Allow transitions to same state in db We currently have cases where the CFD might transition to a same state twice when restarting. Furthermore, we might transition from `Open` to `Open` upon renewal. Since it is currently unclear if this restriction is really needed we just print a warning in the logs instead of failing. --- daemon/src/db.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/daemon/src/db.rs b/daemon/src/db.rs index 74a0126..f5823d3 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -170,7 +170,11 @@ pub async fn insert_new_cfd_state_by_order_id( // make sure that the new state is different than the current one to avoid that we save the same // state twice if mem::discriminant(&latest_cfd_state_in_db) == mem::discriminant(&new_state) { - anyhow::bail!("Cannot insert new state {} for cfd with order_id {} because it currently already is in state {}", new_state, order_id, latest_cfd_state_in_db); + tracing::warn!( + "Same state transition for cfd with order_id {}: {}", + order_id, + latest_cfd_state_in_db + ); } let cfd_state = serde_json::to_string(&new_state)?; From 4fb84cc58f14d50788ce9933451d985a14216d76 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Mon, 27 Sep 2021 15:31:24 +1000 Subject: [PATCH 2/5] Bundle monitoring events that change cfd state This goes towards handling all cfd state transitions in one place, so it becomes easier to reason about it. Currently, state transitions are scattered over the code of the cfd actor. This change bundles the actual transition in the cfd itself. Inserting the new state into the db is still in the responsibility of the caller. note: the `monitor` import in `taker.rs` should technically not be part of this commit, but was needed because I decided that the `CfdMonitoringEvent` should live in the `monitor` actor. But since that is not part of the `Cfd` in `model` the taker needs to know about the `monitor`. --- daemon/src/maker_cfd.rs | 339 +++++----------------------------------- daemon/src/model/cfd.rs | 189 +++++++++++++++++++++- daemon/src/monitor.rs | 67 ++++---- daemon/src/taker.rs | 1 + 4 files changed, 254 insertions(+), 342 deletions(-) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index f4e0344..9a88d54 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -4,16 +4,13 @@ use crate::db::{ load_cfd_by_order_id, load_order_by_id, }; use crate::maker_inc_connections::TakerCommand; -use crate::model::cfd::{CetStatus, Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; +use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::{TakerId, Usd}; -use crate::monitor::{ - CetTimelockExpired, CommitFinality, LockFinality, MonitorParams, RefundFinality, - RefundTimelockExpired, -}; +use crate::monitor::{CfdMonitoringEvent, MonitorParams}; use crate::wallet::Wallet; use crate::wire::SetupMsg; use crate::{maker_inc_connections, monitor, setup_contract_actor}; -use anyhow::{bail, Result}; +use anyhow::Result; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::{Amount, PublicKey}; @@ -414,276 +411,44 @@ impl Actor { Ok(()) } - async fn handle_lock_finality(&mut self, msg: LockFinality) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "Lock transaction has reached finality"); - - let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - use CfdState::*; - let dlc = match cfd.state { - PendingOpen { dlc, .. } => dlc, - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect lock finality yet: ignoring"), - Open { .. } | OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { - bail!("State already assumes lock finality: ignoring") - } - }; - - insert_new_cfd_state_by_order_id( - msg.0, - CfdState::Open { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - }, - &mut conn, - ) - .await?; - - Ok(()) - } - - async fn handle_commit_finality(&mut self, msg: CommitFinality) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "Commit transaction has reached finality"); - - let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - use CfdState::*; - let dlc = match cfd.state { - Open { dlc, .. } => dlc, - PendingOpen { dlc, .. } => { - tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); - dlc - } - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect commit finality yet: ignoring"), - OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { - bail!("State already assumes commit finality: ignoring") - } - }; - - insert_new_cfd_state_by_order_id( - msg.0, - CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::Unprepared, - }, - &mut conn, - ) - .await?; - - Ok(()) - } - - async fn handle_cet_timelock_expired(&mut self, msg: CetTimelockExpired) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "CET timelock has expired"); - - let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - use CfdState::*; - let new_state = match cfd.state { - CfdState::OpenCommitted { - dlc, - cet_status: CetStatus::Unprepared, - .. - } => CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::TimelockExpired, - }, - CfdState::OpenCommitted { - dlc, - cet_status: CetStatus::OracleSigned(price), - .. - } => CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::Ready(price), - }, - PendingOpen { dlc, .. } => { - tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); - CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::TimelockExpired, - } - } - Open { dlc, .. } => { - tracing::debug!(%order_id, "Was not aware of commit TX broadcast, jumping ahead"); - CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::TimelockExpired, - } - } - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect CET timelock expiry yet: ignoring"), - OpenCommitted { - cet_status: CetStatus::TimelockExpired, - .. - } - | OpenCommitted { - cet_status: CetStatus::Ready(_), - .. - } => bail!("State already assumes CET timelock expiry: ignoring"), - MustRefund { .. } | Refunded { .. } => { - bail!("Refund path does not care about CET timelock expiry: ignoring") - } - }; - - insert_new_cfd_state_by_order_id(msg.0, new_state, &mut conn).await?; - - Ok(()) - } - - async fn handle_refund_timelock_expired(&mut self, msg: RefundTimelockExpired) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "Refund timelock has expired"); + async fn handle_monitoring_event(&mut self, event: CfdMonitoringEvent) -> Result<()> { + let order_id = event.order_id(); let mut conn = self.db.acquire().await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - use CfdState::*; - let dlc = match cfd.state { - OpenCommitted { dlc, .. } => { - insert_new_cfd_state_by_order_id( - msg.0, - MustRefund { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc: dlc.clone(), - }, - &mut conn, - ) + let new_state = cfd.transition_to(CfdStateChangeEvent::Monitor(event))?; + + insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?; + + // TODO: Consider sending a message to ourselves to trigger broadcasting refund? + if let CfdState::MustRefund { dlc, .. } = new_state { + let sig_hash = spending_tx_sighash( + &dlc.refund.0, + &dlc.commit.2, + Amount::from_sat(dlc.commit.0.output[0].value), + ); + let our_sig = SECP256K1.sign(&sig_hash, &dlc.identity); + let our_pubkey = PublicKey::new(bdk::bitcoin::secp256k1::PublicKey::from_secret_key( + SECP256K1, + &dlc.identity, + )); + let counterparty_sig = dlc.refund.1; + let counterparty_pubkey = dlc.identity_counterparty; + let signed_refund_tx = finalize_spend_transaction( + dlc.refund.0, + &dlc.commit.2, + (our_pubkey, our_sig), + (counterparty_pubkey, counterparty_sig), + )?; + + let txid = self + .wallet + .try_broadcast_transaction(signed_refund_tx) .await?; - dlc - } - MustRefund { .. } | Refunded { .. } => { - bail!("State already assumes refund timelock expiry: ignoring") - } - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect refund timelock expiry yet: ignoring"), - PendingOpen { dlc, .. } => { - tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); - dlc - } - Open { dlc, .. } => { - tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); - dlc - } - }; - - insert_new_cfd_state_by_order_id( - msg.0, - MustRefund { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc: dlc.clone(), - }, - &mut conn, - ) - .await?; - - let sig_hash = spending_tx_sighash( - &dlc.refund.0, - &dlc.commit.2, - Amount::from_sat(dlc.commit.0.output[0].value), - ); - let our_sig = SECP256K1.sign(&sig_hash, &dlc.identity); - let our_pubkey = PublicKey::new(bdk::bitcoin::secp256k1::PublicKey::from_secret_key( - SECP256K1, - &dlc.identity, - )); - let counterparty_sig = dlc.refund.1; - let counterparty_pubkey = dlc.identity_counterparty; - let signed_refund_tx = finalize_spend_transaction( - dlc.refund.0, - &dlc.commit.2, - (our_pubkey, our_sig), - (counterparty_pubkey, counterparty_sig), - )?; - - let txid = self - .wallet - .try_broadcast_transaction(signed_refund_tx) - .await?; - - tracing::info!("Refund transaction published on chain: {}", txid); - - Ok(()) - } - - async fn handle_refund_finality(&mut self, msg: RefundFinality) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "Refund transaction has reached finality"); - - let mut conn = self.db.acquire().await?; - - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - use CfdState::*; - match cfd.state { - MustRefund { .. } => (), - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect refund finality yet: ignoring"), - PendingOpen { .. } => { - tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); - } - Open { .. } => { - tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); - } - OpenCommitted { .. } => { - tracing::debug!(%order_id, "Was waiting on refund timelock expiry, jumping ahead"); - } - Refunded { .. } => bail!("State already assumes refund finality: ignoring"), - }; - - insert_new_cfd_state_by_order_id( - msg.0, - CfdState::Refunded { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - &mut conn, - ) - .await?; + tracing::info!("Refund transaction published on chain: {}", txid); + } Ok(()) } @@ -739,37 +504,9 @@ impl Handler for Actor { } #[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: LockFinality, _ctx: &mut Context) { - log_error!(self.handle_lock_finality(msg)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: CommitFinality, _ctx: &mut Context) { - log_error!(self.handle_commit_finality(msg)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: CetTimelockExpired, _ctx: &mut Context) { - log_error!(self.handle_cet_timelock_expired(msg)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: RefundTimelockExpired, _ctx: &mut Context) { - log_error!(self.handle_refund_timelock_expired(msg)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: RefundFinality, _ctx: &mut Context) { - log_error!(self.handle_refund_finality(msg)) +impl Handler for Actor { + async fn handle(&mut self, msg: CfdMonitoringEvent, _ctx: &mut Context) { + log_error!(self.handle_monitoring_event(msg)) } } diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index 93672ee..c399dde 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -1,5 +1,6 @@ use crate::model::{Leverage, Position, TakerId, TradingPair, Usd}; -use anyhow::Result; +use crate::monitor::CfdMonitoringEvent; +use anyhow::{bail, Result}; use bdk::bitcoin::secp256k1::{SecretKey, Signature}; use bdk::bitcoin::{Address, Amount, PublicKey, Transaction}; use bdk::descriptor::Descriptor; @@ -363,6 +364,192 @@ impl Cfd { #[allow(dead_code)] pub const CET_TIMELOCK: u32 = 12; + + pub fn transition_to(&self, event: CfdStateChangeEvent) -> Result { + use CfdState::*; + + // TODO: Display impl + tracing::info!("Cfd state change event {:?}", event); + + let order_id = self.order.id; + + let new_state = match event { + CfdStateChangeEvent::Monitor(event) => match event { + CfdMonitoringEvent::LockFinality(_) => match self.state.clone() { + PendingOpen { dlc, .. } => CfdState::Open { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + }, + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => bail!("Did not expect lock finality yet: ignoring"), + Open { .. } | OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { + bail!("State already assumes lock finality: ignoring") + } + }, + CfdMonitoringEvent::CommitFinality(_) => { + let dlc = match self.state.clone() { + Open { dlc, .. } => dlc, + PendingOpen { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + dlc + } + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => { + bail!("Did not expect commit finality yet: ignoring") + } + OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { + bail!("State already assumes commit finality: ignoring") + } + }; + + OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::Unprepared, + } + } + CfdMonitoringEvent::CetTimelockExpired(_) => match self.state.clone() { + CfdState::OpenCommitted { + dlc, + cet_status: CetStatus::Unprepared, + .. + } => CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::TimelockExpired, + }, + CfdState::OpenCommitted { + dlc, + cet_status: CetStatus::OracleSigned(price), + .. + } => CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::Ready(price), + }, + PendingOpen { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::TimelockExpired, + } + } + Open { dlc, .. } => { + tracing::debug!(%order_id, "Was not aware of commit TX broadcast, jumping ahead"); + CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::TimelockExpired, + } + } + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => { + bail!("Did not expect CET timelock expiry yet: ignoring") + } + OpenCommitted { + cet_status: CetStatus::TimelockExpired, + .. + } + | OpenCommitted { + cet_status: CetStatus::Ready(_), + .. + } => bail!("State already assumes CET timelock expiry: ignoring"), + MustRefund { .. } | Refunded { .. } => { + bail!("Refund path does not care about CET timelock expiry: ignoring") + } + }, + CfdMonitoringEvent::RefundTimelockExpired(_) => { + let dlc = match self.state.clone() { + OpenCommitted { dlc, .. } => dlc, + MustRefund { .. } | Refunded { .. } => { + bail!("State already assumes refund timelock expiry: ignoring") + } + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => { + bail!("Did not expect refund timelock expiry yet: ignoring") + } + PendingOpen { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + dlc + } + Open { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); + dlc + } + }; + + MustRefund { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + } + } + CfdMonitoringEvent::RefundFinality(_) => { + match self.state { + MustRefund { .. } => (), + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => { + bail!("Did not expect refund finality yet: ignoring") + } + PendingOpen { .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + } + Open { .. } => { + tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); + } + OpenCommitted { .. } => { + tracing::debug!(%order_id, "Was waiting on refund timelock expiry, jumping ahead"); + } + Refunded { .. } => bail!("State already assumes refund finality: ignoring"), + } + + Refunded { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + } + } + }, + }; + + Ok(new_state) + } +} + +#[derive(Debug, Clone)] +pub enum CfdStateChangeEvent { + // TODO: groupd other events by actors into enums and add them here so we can bundle all + // transitions into cfd.transition_to(...) + Monitor(CfdMonitoringEvent), } fn calculate_profit( diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index bdc9734..edbdb11 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -23,12 +23,7 @@ pub struct MonitorParams { impl Actor where - T: xtra::Actor - + xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, + T: xtra::Actor + xtra::Handler, { pub fn new( electrum_rpc_url: &str, @@ -59,7 +54,7 @@ where lock_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(LockFinality(id)) + .do_send_async(CfdMonitoringEvent::LockFinality(id)) .await .unwrap(); } @@ -77,7 +72,7 @@ where commit_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(CommitFinality(id)) + .do_send_async(CfdMonitoringEvent::CommitFinality(id)) .await .unwrap(); } @@ -93,7 +88,7 @@ where .unwrap(); cfd_actor_addr - .do_send_async(CetTimelockExpired(id)) + .do_send_async(CfdMonitoringEvent::CetTimelockExpired(id)) .await .unwrap(); } @@ -110,7 +105,7 @@ where .unwrap(); cfd_actor_addr - .do_send_async(RefundTimelockExpired(id)) + .do_send_async(CfdMonitoringEvent::RefundTimelockExpired(id)) .await .unwrap(); } @@ -126,7 +121,7 @@ where refund_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(RefundFinality(id)) + .do_send_async(CfdMonitoringEvent::RefundFinality(id)) .await .unwrap(); } @@ -147,33 +142,30 @@ impl xtra::Message for StartMonitoring { type Result = (); } -pub struct LockFinality(pub OrderId); - -impl xtra::Message for LockFinality { - type Result = (); -} - -pub struct CommitFinality(pub OrderId); - -impl xtra::Message for CommitFinality { - type Result = (); -} - -pub struct CetTimelockExpired(pub OrderId); - -impl xtra::Message for CetTimelockExpired { - type Result = (); +#[derive(Debug, Clone)] +pub enum CfdMonitoringEvent { + LockFinality(OrderId), + CommitFinality(OrderId), + CetTimelockExpired(OrderId), + RefundTimelockExpired(OrderId), + RefundFinality(OrderId), } -pub struct RefundTimelockExpired(pub OrderId); - -impl xtra::Message for RefundTimelockExpired { - type Result = (); +impl CfdMonitoringEvent { + pub fn order_id(&self) -> OrderId { + let order_id = match self { + CfdMonitoringEvent::LockFinality(order_id) => order_id, + CfdMonitoringEvent::CommitFinality(order_id) => order_id, + CfdMonitoringEvent::CetTimelockExpired(order_id) => order_id, + CfdMonitoringEvent::RefundTimelockExpired(order_id) => order_id, + CfdMonitoringEvent::RefundFinality(order_id) => order_id, + }; + + *order_id + } } -pub struct RefundFinality(pub OrderId); - -impl xtra::Message for RefundFinality { +impl xtra::Message for CfdMonitoringEvent { type Result = (); } @@ -192,12 +184,7 @@ impl xtra::Actor for Actor where T: xtra::Actor {} #[async_trait] impl xtra::Handler for Actor where - T: xtra::Actor - + xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, + T: xtra::Actor + xtra::Handler, { async fn handle(&mut self, msg: StartMonitoring, _ctx: &mut xtra::Context) { log_error!(self.handle_start_monitoring(msg)); diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 79adfd5..32b6148 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -24,6 +24,7 @@ mod db; mod keypair; mod logger; mod model; +mod monitor; mod routes; mod routes_taker; mod seed; From 2fd2c30325e6b717c68ba4d11fb92f6760aea470 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Mon, 27 Sep 2021 15:38:24 +1000 Subject: [PATCH 3/5] move refund transaction signing onto cfd --- daemon/src/maker_cfd.rs | 28 ++++------------------------ daemon/src/model/cfd.rs | 32 +++++++++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 25 deletions(-) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 9a88d54..08d2e50 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -13,9 +13,6 @@ use crate::{maker_inc_connections, monitor, setup_contract_actor}; use anyhow::Result; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; -use bdk::bitcoin::{Amount, PublicKey}; -use cfd_protocol::secp256k1_zkp::SECP256K1; -use cfd_protocol::{finalize_spend_transaction, spending_tx_sighash}; use std::time::SystemTime; use tokio::sync::{mpsc, watch}; use xtra::prelude::*; @@ -421,27 +418,10 @@ impl Actor { insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?; - // TODO: Consider sending a message to ourselves to trigger broadcasting refund? - if let CfdState::MustRefund { dlc, .. } = new_state { - let sig_hash = spending_tx_sighash( - &dlc.refund.0, - &dlc.commit.2, - Amount::from_sat(dlc.commit.0.output[0].value), - ); - let our_sig = SECP256K1.sign(&sig_hash, &dlc.identity); - let our_pubkey = PublicKey::new(bdk::bitcoin::secp256k1::PublicKey::from_secret_key( - SECP256K1, - &dlc.identity, - )); - let counterparty_sig = dlc.refund.1; - let counterparty_pubkey = dlc.identity_counterparty; - let signed_refund_tx = finalize_spend_transaction( - dlc.refund.0, - &dlc.commit.2, - (our_pubkey, our_sig), - (counterparty_pubkey, counterparty_sig), - )?; - + // TODO: Not sure that should be done here... + // Consider sending a message to ourselves to trigger broadcasting refund? + if let CfdState::MustRefund { .. } = new_state { + let signed_refund_tx = cfd.refund_tx()?; let txid = self .wallet .try_broadcast_transaction(signed_refund_tx) diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index c399dde..f8cf1cd 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -4,7 +4,8 @@ use anyhow::{bail, Result}; use bdk::bitcoin::secp256k1::{SecretKey, Signature}; use bdk::bitcoin::{Address, Amount, PublicKey, Transaction}; use bdk::descriptor::Descriptor; -use cfd_protocol::secp256k1_zkp::EcdsaAdaptorSignature; +use cfd_protocol::secp256k1_zkp::{EcdsaAdaptorSignature, SECP256K1}; +use cfd_protocol::{finalize_spend_transaction, spending_tx_sighash}; use rust_decimal::Decimal; use rust_decimal_macros::dec; use serde::{Deserialize, Serialize}; @@ -543,6 +544,35 @@ impl Cfd { Ok(new_state) } + + pub fn refund_tx(&self) -> Result { + let dlc = if let CfdState::MustRefund { dlc, .. } = self.state.clone() { + dlc + } else { + bail!("Refund transaction can only be constructed when in state MustRefund, but we are currently in {}", self.state.clone()) + }; + + let sig_hash = spending_tx_sighash( + &dlc.refund.0, + &dlc.commit.2, + Amount::from_sat(dlc.commit.0.output[0].value), + ); + let our_sig = SECP256K1.sign(&sig_hash, &dlc.identity); + let our_pubkey = PublicKey::new(bdk::bitcoin::secp256k1::PublicKey::from_secret_key( + SECP256K1, + &dlc.identity, + )); + let counterparty_sig = dlc.refund.1; + let counterparty_pubkey = dlc.identity_counterparty; + let signed_refund_tx = finalize_spend_transaction( + dlc.refund.0, + &dlc.commit.2, + (our_pubkey, our_sig), + (counterparty_pubkey, counterparty_sig), + )?; + + Ok(signed_refund_tx) + } } #[derive(Debug, Clone)] From 608a962b8e367d0e6a1d325576e770bc8b4a8459 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Mon, 27 Sep 2021 16:05:24 +1000 Subject: [PATCH 4/5] Taker monitors Wire the monitor into the taker. --- daemon/src/taker.rs | 11 +++++++ daemon/src/taker_cfd.rs | 71 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 77 insertions(+), 5 deletions(-) diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 32b6148..406cd38 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -8,6 +8,7 @@ use model::cfd::{Cfd, Order}; use rocket::fairing::AdHoc; use rocket_db_pools::Database; use seed::Seed; +use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::thread::sleep; @@ -78,6 +79,7 @@ async fn main() -> Result<()> { let data_dir = opts .data_dir + .clone() .unwrap_or_else(|| std::env::current_dir().expect("unable to get cwd")); if !data_dir.exists() { @@ -163,6 +165,8 @@ async fn main() -> Result<()> { .create(None) .spawn_global(); + let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None); + let cfd_actor_inbox = taker_cfd::Actor::new( db, wallet.clone(), @@ -170,6 +174,7 @@ async fn main() -> Result<()> { cfd_feed_sender, order_feed_sender, send_to_maker, + monitor_actor_address, ) .await .unwrap() @@ -179,6 +184,12 @@ async fn main() -> Result<()> { let inc_maker_messages_actor = taker_inc_message_actor::new(read, cfd_actor_inbox.clone()); + tokio::spawn(monitor_actor_context.run(monitor::Actor::new( + &opts.electrum, + HashMap::new(), + cfd_actor_inbox.clone(), + ))); + tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(inc_maker_messages_actor); diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index e995b9f..445bc35 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -4,11 +4,12 @@ use crate::db::{ }; use crate::actors::log_error; -use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; +use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::Usd; +use crate::monitor::{CfdMonitoringEvent, MonitorParams}; use crate::wallet::Wallet; use crate::wire::SetupMsg; -use crate::{send_to_socket, setup_contract_actor, wire}; +use crate::{monitor, send_to_socket, setup_contract_actor, wire}; use anyhow::Result; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -43,6 +44,7 @@ pub struct Actor { // TODO: Move the contract setup into a dedicated actor and send messages to that actor that // manages the state instead of this ugly buffer contract_setup_message_buffer: Vec, + monitor_actor: Address>, } impl Actor { @@ -53,6 +55,7 @@ impl Actor { cfd_feed_actor_inbox: watch::Sender>, order_feed_actor_inbox: watch::Sender>, send_to_maker: Address, + monitor_actor: Address>, ) -> Result { let mut conn = db.acquire().await?; cfd_feed_actor_inbox.send(load_all_cfds(&mut conn).await?)?; @@ -66,6 +69,7 @@ impl Actor { send_to_maker, current_contract_setup: None, contract_setup_message_buffer: vec![], + monitor_actor, }) } @@ -231,12 +235,62 @@ impl Actor { self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; - let txid = self.wallet.try_broadcast_transaction(dlc.lock.0).await?; + let txid = self + .wallet + .try_broadcast_transaction(dlc.lock.0.clone()) + .await?; tracing::info!("Lock transaction published with txid {}", txid); - // TODO: tx monitoring, once confirmed with x blocks transition the Cfd to - // Open + // TODO: It's a bit suspicious to load this just to get the + // refund timelock + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + let script_pubkey = dlc.address.script_pubkey(); + self.monitor_actor + .do_send_async(monitor::StartMonitoring { + id: order_id, + params: MonitorParams { + lock: (dlc.lock.0.txid(), dlc.lock.1), + commit: (dlc.commit.0.txid(), dlc.commit.2), + cets: dlc + .cets + .into_iter() + .map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range)) + .collect(), + refund: ( + dlc.refund.0.txid(), + script_pubkey, + cfd.refund_timelock_in_blocks(), + ), + }, + }) + .await?; + + Ok(()) + } + + async fn handle_monitoring_event(&mut self, event: CfdMonitoringEvent) -> Result<()> { + let order_id = event.order_id(); + + let mut conn = self.db.acquire().await?; + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + let new_state = cfd.transition_to(CfdStateChangeEvent::Monitor(event))?; + + insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?; + + // TODO: Not sure that should be done here... + // Consider sending a message to ourselves to trigger broadcasting refund? + if let CfdState::MustRefund { .. } = new_state { + let signed_refund_tx = cfd.refund_tx()?; + let txid = self + .wallet + .try_broadcast_transaction(signed_refund_tx) + .await?; + + tracing::info!("Refund transaction published on chain: {}", txid); + } Ok(()) } @@ -284,6 +338,13 @@ impl Handler for Actor { } } +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: CfdMonitoringEvent, _ctx: &mut Context) { + log_error!(self.handle_monitoring_event(msg)) + } +} + impl Message for TakeOffer { type Result = (); } From 2fe376cb557bb7a166f8a0fcd359dc9ef8230d29 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Mon, 27 Sep 2021 17:22:12 +1000 Subject: [PATCH 5/5] Work in review comments --- daemon/src/maker_cfd.rs | 13 +++++++------ daemon/src/model/cfd.rs | 16 ++++++++-------- daemon/src/monitor.rs | 30 +++++++++++++++--------------- daemon/src/taker_cfd.rs | 13 +++++++------ 4 files changed, 37 insertions(+), 35 deletions(-) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 08d2e50..d0eca64 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -6,7 +6,7 @@ use crate::db::{ use crate::maker_inc_connections::TakerCommand; use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::{TakerId, Usd}; -use crate::monitor::{CfdMonitoringEvent, MonitorParams}; +use crate::monitor::MonitorParams; use crate::wallet::Wallet; use crate::wire::SetupMsg; use crate::{maker_inc_connections, monitor, setup_contract_actor}; @@ -408,18 +408,19 @@ impl Actor { Ok(()) } - async fn handle_monitoring_event(&mut self, event: CfdMonitoringEvent) -> Result<()> { + async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { let order_id = event.order_id(); let mut conn = self.db.acquire().await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let new_state = cfd.transition_to(CfdStateChangeEvent::Monitor(event))?; + let new_state = cfd.handle(CfdStateChangeEvent::Monitor(event))?; insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?; // TODO: Not sure that should be done here... - // Consider sending a message to ourselves to trigger broadcasting refund? + // Consider bubbling the refund availability up to the user, and let user trigger + // transaction publication if let CfdState::MustRefund { .. } = new_state { let signed_refund_tx = cfd.refund_tx()?; let txid = self @@ -484,8 +485,8 @@ impl Handler for Actor { } #[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: CfdMonitoringEvent, _ctx: &mut Context) { +impl Handler for Actor { + async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) { log_error!(self.handle_monitoring_event(msg)) } } diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index f8cf1cd..8a6a8e3 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -1,5 +1,5 @@ use crate::model::{Leverage, Position, TakerId, TradingPair, Usd}; -use crate::monitor::CfdMonitoringEvent; +use crate::monitor; use anyhow::{bail, Result}; use bdk::bitcoin::secp256k1::{SecretKey, Signature}; use bdk::bitcoin::{Address, Amount, PublicKey, Transaction}; @@ -366,7 +366,7 @@ impl Cfd { #[allow(dead_code)] pub const CET_TIMELOCK: u32 = 12; - pub fn transition_to(&self, event: CfdStateChangeEvent) -> Result { + pub fn handle(&self, event: CfdStateChangeEvent) -> Result { use CfdState::*; // TODO: Display impl @@ -376,7 +376,7 @@ impl Cfd { let new_state = match event { CfdStateChangeEvent::Monitor(event) => match event { - CfdMonitoringEvent::LockFinality(_) => match self.state.clone() { + monitor::Event::LockFinality(_) => match self.state.clone() { PendingOpen { dlc, .. } => CfdState::Open { common: CfdStateCommon { transition_timestamp: SystemTime::now(), @@ -392,7 +392,7 @@ impl Cfd { bail!("State already assumes lock finality: ignoring") } }, - CfdMonitoringEvent::CommitFinality(_) => { + monitor::Event::CommitFinality(_) => { let dlc = match self.state.clone() { Open { dlc, .. } => dlc, PendingOpen { dlc, .. } => { @@ -419,7 +419,7 @@ impl Cfd { cet_status: CetStatus::Unprepared, } } - CfdMonitoringEvent::CetTimelockExpired(_) => match self.state.clone() { + monitor::Event::CetTimelockExpired(_) => match self.state.clone() { CfdState::OpenCommitted { dlc, cet_status: CetStatus::Unprepared, @@ -481,7 +481,7 @@ impl Cfd { bail!("Refund path does not care about CET timelock expiry: ignoring") } }, - CfdMonitoringEvent::RefundTimelockExpired(_) => { + monitor::Event::RefundTimelockExpired(_) => { let dlc = match self.state.clone() { OpenCommitted { dlc, .. } => dlc, MustRefund { .. } | Refunded { .. } => { @@ -511,7 +511,7 @@ impl Cfd { dlc, } } - CfdMonitoringEvent::RefundFinality(_) => { + monitor::Event::RefundFinality(_) => { match self.state { MustRefund { .. } => (), OutgoingOrderRequest { .. } => unreachable!("taker-only state"), @@ -579,7 +579,7 @@ impl Cfd { pub enum CfdStateChangeEvent { // TODO: groupd other events by actors into enums and add them here so we can bundle all // transitions into cfd.transition_to(...) - Monitor(CfdMonitoringEvent), + Monitor(monitor::Event), } fn calculate_profit( diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index edbdb11..7567734 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -23,7 +23,7 @@ pub struct MonitorParams { impl Actor where - T: xtra::Actor + xtra::Handler, + T: xtra::Actor + xtra::Handler, { pub fn new( electrum_rpc_url: &str, @@ -54,7 +54,7 @@ where lock_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(CfdMonitoringEvent::LockFinality(id)) + .do_send_async(Event::LockFinality(id)) .await .unwrap(); } @@ -72,7 +72,7 @@ where commit_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(CfdMonitoringEvent::CommitFinality(id)) + .do_send_async(Event::CommitFinality(id)) .await .unwrap(); } @@ -88,7 +88,7 @@ where .unwrap(); cfd_actor_addr - .do_send_async(CfdMonitoringEvent::CetTimelockExpired(id)) + .do_send_async(Event::CetTimelockExpired(id)) .await .unwrap(); } @@ -105,7 +105,7 @@ where .unwrap(); cfd_actor_addr - .do_send_async(CfdMonitoringEvent::RefundTimelockExpired(id)) + .do_send_async(Event::RefundTimelockExpired(id)) .await .unwrap(); } @@ -121,7 +121,7 @@ where refund_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(CfdMonitoringEvent::RefundFinality(id)) + .do_send_async(Event::RefundFinality(id)) .await .unwrap(); } @@ -143,7 +143,7 @@ impl xtra::Message for StartMonitoring { } #[derive(Debug, Clone)] -pub enum CfdMonitoringEvent { +pub enum Event { LockFinality(OrderId), CommitFinality(OrderId), CetTimelockExpired(OrderId), @@ -151,21 +151,21 @@ pub enum CfdMonitoringEvent { RefundFinality(OrderId), } -impl CfdMonitoringEvent { +impl Event { pub fn order_id(&self) -> OrderId { let order_id = match self { - CfdMonitoringEvent::LockFinality(order_id) => order_id, - CfdMonitoringEvent::CommitFinality(order_id) => order_id, - CfdMonitoringEvent::CetTimelockExpired(order_id) => order_id, - CfdMonitoringEvent::RefundTimelockExpired(order_id) => order_id, - CfdMonitoringEvent::RefundFinality(order_id) => order_id, + Event::LockFinality(order_id) => order_id, + Event::CommitFinality(order_id) => order_id, + Event::CetTimelockExpired(order_id) => order_id, + Event::RefundTimelockExpired(order_id) => order_id, + Event::RefundFinality(order_id) => order_id, }; *order_id } } -impl xtra::Message for CfdMonitoringEvent { +impl xtra::Message for Event { type Result = (); } @@ -184,7 +184,7 @@ impl xtra::Actor for Actor where T: xtra::Actor {} #[async_trait] impl xtra::Handler for Actor where - T: xtra::Actor + xtra::Handler, + T: xtra::Actor + xtra::Handler, { async fn handle(&mut self, msg: StartMonitoring, _ctx: &mut xtra::Context) { log_error!(self.handle_start_monitoring(msg)); diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 445bc35..a619023 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -6,7 +6,7 @@ use crate::db::{ use crate::actors::log_error; use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::Usd; -use crate::monitor::{CfdMonitoringEvent, MonitorParams}; +use crate::monitor::MonitorParams; use crate::wallet::Wallet; use crate::wire::SetupMsg; use crate::{monitor, send_to_socket, setup_contract_actor, wire}; @@ -270,18 +270,19 @@ impl Actor { Ok(()) } - async fn handle_monitoring_event(&mut self, event: CfdMonitoringEvent) -> Result<()> { + async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { let order_id = event.order_id(); let mut conn = self.db.acquire().await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let new_state = cfd.transition_to(CfdStateChangeEvent::Monitor(event))?; + let new_state = cfd.handle(CfdStateChangeEvent::Monitor(event))?; insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?; // TODO: Not sure that should be done here... - // Consider sending a message to ourselves to trigger broadcasting refund? + // Consider bubbling the refund availability up to the user, and let user trigger + // transaction publication if let CfdState::MustRefund { .. } = new_state { let signed_refund_tx = cfd.refund_tx()?; let txid = self @@ -339,8 +340,8 @@ impl Handler for Actor { } #[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: CfdMonitoringEvent, _ctx: &mut Context) { +impl Handler for Actor { + async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) { log_error!(self.handle_monitoring_event(msg)) } }