From 4fb84cc58f14d50788ce9933451d985a14216d76 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Mon, 27 Sep 2021 15:31:24 +1000 Subject: [PATCH] Bundle monitoring events that change cfd state This goes towards handling all cfd state transitions in one place, so it becomes easier to reason about it. Currently, state transitions are scattered over the code of the cfd actor. This change bundles the actual transition in the cfd itself. Inserting the new state into the db is still in the responsibility of the caller. note: the `monitor` import in `taker.rs` should technically not be part of this commit, but was needed because I decided that the `CfdMonitoringEvent` should live in the `monitor` actor. But since that is not part of the `Cfd` in `model` the taker needs to know about the `monitor`. --- daemon/src/maker_cfd.rs | 339 +++++----------------------------------- daemon/src/model/cfd.rs | 189 +++++++++++++++++++++- daemon/src/monitor.rs | 67 ++++---- daemon/src/taker.rs | 1 + 4 files changed, 254 insertions(+), 342 deletions(-) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index f4e0344..9a88d54 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -4,16 +4,13 @@ use crate::db::{ load_cfd_by_order_id, load_order_by_id, }; use crate::maker_inc_connections::TakerCommand; -use crate::model::cfd::{CetStatus, Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; +use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::{TakerId, Usd}; -use crate::monitor::{ - CetTimelockExpired, CommitFinality, LockFinality, MonitorParams, RefundFinality, - RefundTimelockExpired, -}; +use crate::monitor::{CfdMonitoringEvent, MonitorParams}; use crate::wallet::Wallet; use crate::wire::SetupMsg; use crate::{maker_inc_connections, monitor, setup_contract_actor}; -use anyhow::{bail, Result}; +use anyhow::Result; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::{Amount, PublicKey}; @@ -414,276 +411,44 @@ impl Actor { Ok(()) } - async fn handle_lock_finality(&mut self, msg: LockFinality) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "Lock transaction has reached finality"); - - let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - use CfdState::*; - let dlc = match cfd.state { - PendingOpen { dlc, .. } => dlc, - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect lock finality yet: ignoring"), - Open { .. } | OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { - bail!("State already assumes lock finality: ignoring") - } - }; - - insert_new_cfd_state_by_order_id( - msg.0, - CfdState::Open { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - }, - &mut conn, - ) - .await?; - - Ok(()) - } - - async fn handle_commit_finality(&mut self, msg: CommitFinality) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "Commit transaction has reached finality"); - - let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - use CfdState::*; - let dlc = match cfd.state { - Open { dlc, .. } => dlc, - PendingOpen { dlc, .. } => { - tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); - dlc - } - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect commit finality yet: ignoring"), - OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { - bail!("State already assumes commit finality: ignoring") - } - }; - - insert_new_cfd_state_by_order_id( - msg.0, - CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::Unprepared, - }, - &mut conn, - ) - .await?; - - Ok(()) - } - - async fn handle_cet_timelock_expired(&mut self, msg: CetTimelockExpired) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "CET timelock has expired"); - - let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - use CfdState::*; - let new_state = match cfd.state { - CfdState::OpenCommitted { - dlc, - cet_status: CetStatus::Unprepared, - .. - } => CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::TimelockExpired, - }, - CfdState::OpenCommitted { - dlc, - cet_status: CetStatus::OracleSigned(price), - .. - } => CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::Ready(price), - }, - PendingOpen { dlc, .. } => { - tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); - CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::TimelockExpired, - } - } - Open { dlc, .. } => { - tracing::debug!(%order_id, "Was not aware of commit TX broadcast, jumping ahead"); - CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::TimelockExpired, - } - } - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect CET timelock expiry yet: ignoring"), - OpenCommitted { - cet_status: CetStatus::TimelockExpired, - .. - } - | OpenCommitted { - cet_status: CetStatus::Ready(_), - .. - } => bail!("State already assumes CET timelock expiry: ignoring"), - MustRefund { .. } | Refunded { .. } => { - bail!("Refund path does not care about CET timelock expiry: ignoring") - } - }; - - insert_new_cfd_state_by_order_id(msg.0, new_state, &mut conn).await?; - - Ok(()) - } - - async fn handle_refund_timelock_expired(&mut self, msg: RefundTimelockExpired) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "Refund timelock has expired"); + async fn handle_monitoring_event(&mut self, event: CfdMonitoringEvent) -> Result<()> { + let order_id = event.order_id(); let mut conn = self.db.acquire().await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - use CfdState::*; - let dlc = match cfd.state { - OpenCommitted { dlc, .. } => { - insert_new_cfd_state_by_order_id( - msg.0, - MustRefund { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc: dlc.clone(), - }, - &mut conn, - ) + let new_state = cfd.transition_to(CfdStateChangeEvent::Monitor(event))?; + + insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?; + + // TODO: Consider sending a message to ourselves to trigger broadcasting refund? + if let CfdState::MustRefund { dlc, .. } = new_state { + let sig_hash = spending_tx_sighash( + &dlc.refund.0, + &dlc.commit.2, + Amount::from_sat(dlc.commit.0.output[0].value), + ); + let our_sig = SECP256K1.sign(&sig_hash, &dlc.identity); + let our_pubkey = PublicKey::new(bdk::bitcoin::secp256k1::PublicKey::from_secret_key( + SECP256K1, + &dlc.identity, + )); + let counterparty_sig = dlc.refund.1; + let counterparty_pubkey = dlc.identity_counterparty; + let signed_refund_tx = finalize_spend_transaction( + dlc.refund.0, + &dlc.commit.2, + (our_pubkey, our_sig), + (counterparty_pubkey, counterparty_sig), + )?; + + let txid = self + .wallet + .try_broadcast_transaction(signed_refund_tx) .await?; - dlc - } - MustRefund { .. } | Refunded { .. } => { - bail!("State already assumes refund timelock expiry: ignoring") - } - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect refund timelock expiry yet: ignoring"), - PendingOpen { dlc, .. } => { - tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); - dlc - } - Open { dlc, .. } => { - tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); - dlc - } - }; - - insert_new_cfd_state_by_order_id( - msg.0, - MustRefund { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc: dlc.clone(), - }, - &mut conn, - ) - .await?; - - let sig_hash = spending_tx_sighash( - &dlc.refund.0, - &dlc.commit.2, - Amount::from_sat(dlc.commit.0.output[0].value), - ); - let our_sig = SECP256K1.sign(&sig_hash, &dlc.identity); - let our_pubkey = PublicKey::new(bdk::bitcoin::secp256k1::PublicKey::from_secret_key( - SECP256K1, - &dlc.identity, - )); - let counterparty_sig = dlc.refund.1; - let counterparty_pubkey = dlc.identity_counterparty; - let signed_refund_tx = finalize_spend_transaction( - dlc.refund.0, - &dlc.commit.2, - (our_pubkey, our_sig), - (counterparty_pubkey, counterparty_sig), - )?; - - let txid = self - .wallet - .try_broadcast_transaction(signed_refund_tx) - .await?; - - tracing::info!("Refund transaction published on chain: {}", txid); - - Ok(()) - } - - async fn handle_refund_finality(&mut self, msg: RefundFinality) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "Refund transaction has reached finality"); - - let mut conn = self.db.acquire().await?; - - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - use CfdState::*; - match cfd.state { - MustRefund { .. } => (), - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect refund finality yet: ignoring"), - PendingOpen { .. } => { - tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); - } - Open { .. } => { - tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); - } - OpenCommitted { .. } => { - tracing::debug!(%order_id, "Was waiting on refund timelock expiry, jumping ahead"); - } - Refunded { .. } => bail!("State already assumes refund finality: ignoring"), - }; - - insert_new_cfd_state_by_order_id( - msg.0, - CfdState::Refunded { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - &mut conn, - ) - .await?; + tracing::info!("Refund transaction published on chain: {}", txid); + } Ok(()) } @@ -739,37 +504,9 @@ impl Handler for Actor { } #[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: LockFinality, _ctx: &mut Context) { - log_error!(self.handle_lock_finality(msg)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: CommitFinality, _ctx: &mut Context) { - log_error!(self.handle_commit_finality(msg)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: CetTimelockExpired, _ctx: &mut Context) { - log_error!(self.handle_cet_timelock_expired(msg)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: RefundTimelockExpired, _ctx: &mut Context) { - log_error!(self.handle_refund_timelock_expired(msg)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: RefundFinality, _ctx: &mut Context) { - log_error!(self.handle_refund_finality(msg)) +impl Handler for Actor { + async fn handle(&mut self, msg: CfdMonitoringEvent, _ctx: &mut Context) { + log_error!(self.handle_monitoring_event(msg)) } } diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index 93672ee..c399dde 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -1,5 +1,6 @@ use crate::model::{Leverage, Position, TakerId, TradingPair, Usd}; -use anyhow::Result; +use crate::monitor::CfdMonitoringEvent; +use anyhow::{bail, Result}; use bdk::bitcoin::secp256k1::{SecretKey, Signature}; use bdk::bitcoin::{Address, Amount, PublicKey, Transaction}; use bdk::descriptor::Descriptor; @@ -363,6 +364,192 @@ impl Cfd { #[allow(dead_code)] pub const CET_TIMELOCK: u32 = 12; + + pub fn transition_to(&self, event: CfdStateChangeEvent) -> Result { + use CfdState::*; + + // TODO: Display impl + tracing::info!("Cfd state change event {:?}", event); + + let order_id = self.order.id; + + let new_state = match event { + CfdStateChangeEvent::Monitor(event) => match event { + CfdMonitoringEvent::LockFinality(_) => match self.state.clone() { + PendingOpen { dlc, .. } => CfdState::Open { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + }, + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => bail!("Did not expect lock finality yet: ignoring"), + Open { .. } | OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { + bail!("State already assumes lock finality: ignoring") + } + }, + CfdMonitoringEvent::CommitFinality(_) => { + let dlc = match self.state.clone() { + Open { dlc, .. } => dlc, + PendingOpen { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + dlc + } + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => { + bail!("Did not expect commit finality yet: ignoring") + } + OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { + bail!("State already assumes commit finality: ignoring") + } + }; + + OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::Unprepared, + } + } + CfdMonitoringEvent::CetTimelockExpired(_) => match self.state.clone() { + CfdState::OpenCommitted { + dlc, + cet_status: CetStatus::Unprepared, + .. + } => CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::TimelockExpired, + }, + CfdState::OpenCommitted { + dlc, + cet_status: CetStatus::OracleSigned(price), + .. + } => CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::Ready(price), + }, + PendingOpen { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::TimelockExpired, + } + } + Open { dlc, .. } => { + tracing::debug!(%order_id, "Was not aware of commit TX broadcast, jumping ahead"); + CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::TimelockExpired, + } + } + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => { + bail!("Did not expect CET timelock expiry yet: ignoring") + } + OpenCommitted { + cet_status: CetStatus::TimelockExpired, + .. + } + | OpenCommitted { + cet_status: CetStatus::Ready(_), + .. + } => bail!("State already assumes CET timelock expiry: ignoring"), + MustRefund { .. } | Refunded { .. } => { + bail!("Refund path does not care about CET timelock expiry: ignoring") + } + }, + CfdMonitoringEvent::RefundTimelockExpired(_) => { + let dlc = match self.state.clone() { + OpenCommitted { dlc, .. } => dlc, + MustRefund { .. } | Refunded { .. } => { + bail!("State already assumes refund timelock expiry: ignoring") + } + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => { + bail!("Did not expect refund timelock expiry yet: ignoring") + } + PendingOpen { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + dlc + } + Open { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); + dlc + } + }; + + MustRefund { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + } + } + CfdMonitoringEvent::RefundFinality(_) => { + match self.state { + MustRefund { .. } => (), + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => { + bail!("Did not expect refund finality yet: ignoring") + } + PendingOpen { .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + } + Open { .. } => { + tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); + } + OpenCommitted { .. } => { + tracing::debug!(%order_id, "Was waiting on refund timelock expiry, jumping ahead"); + } + Refunded { .. } => bail!("State already assumes refund finality: ignoring"), + } + + Refunded { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + } + } + }, + }; + + Ok(new_state) + } +} + +#[derive(Debug, Clone)] +pub enum CfdStateChangeEvent { + // TODO: groupd other events by actors into enums and add them here so we can bundle all + // transitions into cfd.transition_to(...) + Monitor(CfdMonitoringEvent), } fn calculate_profit( diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index bdc9734..edbdb11 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -23,12 +23,7 @@ pub struct MonitorParams { impl Actor where - T: xtra::Actor - + xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, + T: xtra::Actor + xtra::Handler, { pub fn new( electrum_rpc_url: &str, @@ -59,7 +54,7 @@ where lock_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(LockFinality(id)) + .do_send_async(CfdMonitoringEvent::LockFinality(id)) .await .unwrap(); } @@ -77,7 +72,7 @@ where commit_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(CommitFinality(id)) + .do_send_async(CfdMonitoringEvent::CommitFinality(id)) .await .unwrap(); } @@ -93,7 +88,7 @@ where .unwrap(); cfd_actor_addr - .do_send_async(CetTimelockExpired(id)) + .do_send_async(CfdMonitoringEvent::CetTimelockExpired(id)) .await .unwrap(); } @@ -110,7 +105,7 @@ where .unwrap(); cfd_actor_addr - .do_send_async(RefundTimelockExpired(id)) + .do_send_async(CfdMonitoringEvent::RefundTimelockExpired(id)) .await .unwrap(); } @@ -126,7 +121,7 @@ where refund_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(RefundFinality(id)) + .do_send_async(CfdMonitoringEvent::RefundFinality(id)) .await .unwrap(); } @@ -147,33 +142,30 @@ impl xtra::Message for StartMonitoring { type Result = (); } -pub struct LockFinality(pub OrderId); - -impl xtra::Message for LockFinality { - type Result = (); -} - -pub struct CommitFinality(pub OrderId); - -impl xtra::Message for CommitFinality { - type Result = (); -} - -pub struct CetTimelockExpired(pub OrderId); - -impl xtra::Message for CetTimelockExpired { - type Result = (); +#[derive(Debug, Clone)] +pub enum CfdMonitoringEvent { + LockFinality(OrderId), + CommitFinality(OrderId), + CetTimelockExpired(OrderId), + RefundTimelockExpired(OrderId), + RefundFinality(OrderId), } -pub struct RefundTimelockExpired(pub OrderId); - -impl xtra::Message for RefundTimelockExpired { - type Result = (); +impl CfdMonitoringEvent { + pub fn order_id(&self) -> OrderId { + let order_id = match self { + CfdMonitoringEvent::LockFinality(order_id) => order_id, + CfdMonitoringEvent::CommitFinality(order_id) => order_id, + CfdMonitoringEvent::CetTimelockExpired(order_id) => order_id, + CfdMonitoringEvent::RefundTimelockExpired(order_id) => order_id, + CfdMonitoringEvent::RefundFinality(order_id) => order_id, + }; + + *order_id + } } -pub struct RefundFinality(pub OrderId); - -impl xtra::Message for RefundFinality { +impl xtra::Message for CfdMonitoringEvent { type Result = (); } @@ -192,12 +184,7 @@ impl xtra::Actor for Actor where T: xtra::Actor {} #[async_trait] impl xtra::Handler for Actor where - T: xtra::Actor - + xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, + T: xtra::Actor + xtra::Handler, { async fn handle(&mut self, msg: StartMonitoring, _ctx: &mut xtra::Context) { log_error!(self.handle_start_monitoring(msg)); diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 79adfd5..32b6148 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -24,6 +24,7 @@ mod db; mod keypair; mod logger; mod model; +mod monitor; mod routes; mod routes_taker; mod seed;