diff --git a/daemon/src/db.rs b/daemon/src/db.rs index 74a0126..f5823d3 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -170,7 +170,11 @@ pub async fn insert_new_cfd_state_by_order_id( // make sure that the new state is different than the current one to avoid that we save the same // state twice if mem::discriminant(&latest_cfd_state_in_db) == mem::discriminant(&new_state) { - anyhow::bail!("Cannot insert new state {} for cfd with order_id {} because it currently already is in state {}", new_state, order_id, latest_cfd_state_in_db); + tracing::warn!( + "Same state transition for cfd with order_id {}: {}", + order_id, + latest_cfd_state_in_db + ); } let cfd_state = serde_json::to_string(&new_state)?; diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index f4e0344..d0eca64 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -4,21 +4,15 @@ use crate::db::{ load_cfd_by_order_id, load_order_by_id, }; use crate::maker_inc_connections::TakerCommand; -use crate::model::cfd::{CetStatus, Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; +use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::{TakerId, Usd}; -use crate::monitor::{ - CetTimelockExpired, CommitFinality, LockFinality, MonitorParams, RefundFinality, - RefundTimelockExpired, -}; +use crate::monitor::MonitorParams; use crate::wallet::Wallet; use crate::wire::SetupMsg; use crate::{maker_inc_connections, monitor, setup_contract_actor}; -use anyhow::{bail, Result}; +use anyhow::Result; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; -use bdk::bitcoin::{Amount, PublicKey}; -use cfd_protocol::secp256k1_zkp::SECP256K1; -use cfd_protocol::{finalize_spend_transaction, spending_tx_sighash}; use std::time::SystemTime; use tokio::sync::{mpsc, watch}; use xtra::prelude::*; @@ -414,276 +408,28 @@ impl Actor { Ok(()) } - async fn handle_lock_finality(&mut self, msg: LockFinality) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "Lock transaction has reached finality"); - - let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - use CfdState::*; - let dlc = match cfd.state { - PendingOpen { dlc, .. } => dlc, - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect lock finality yet: ignoring"), - Open { .. } | OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { - bail!("State already assumes lock finality: ignoring") - } - }; - - insert_new_cfd_state_by_order_id( - msg.0, - CfdState::Open { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - }, - &mut conn, - ) - .await?; - - Ok(()) - } - - async fn handle_commit_finality(&mut self, msg: CommitFinality) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "Commit transaction has reached finality"); + async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { + let order_id = event.order_id(); let mut conn = self.db.acquire().await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - use CfdState::*; - let dlc = match cfd.state { - Open { dlc, .. } => dlc, - PendingOpen { dlc, .. } => { - tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); - dlc - } - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect commit finality yet: ignoring"), - OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { - bail!("State already assumes commit finality: ignoring") - } - }; - - insert_new_cfd_state_by_order_id( - msg.0, - CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::Unprepared, - }, - &mut conn, - ) - .await?; - - Ok(()) - } - - async fn handle_cet_timelock_expired(&mut self, msg: CetTimelockExpired) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "CET timelock has expired"); - - let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - use CfdState::*; - let new_state = match cfd.state { - CfdState::OpenCommitted { - dlc, - cet_status: CetStatus::Unprepared, - .. - } => CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::TimelockExpired, - }, - CfdState::OpenCommitted { - dlc, - cet_status: CetStatus::OracleSigned(price), - .. - } => CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::Ready(price), - }, - PendingOpen { dlc, .. } => { - tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); - CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::TimelockExpired, - } - } - Open { dlc, .. } => { - tracing::debug!(%order_id, "Was not aware of commit TX broadcast, jumping ahead"); - CfdState::OpenCommitted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc, - cet_status: CetStatus::TimelockExpired, - } - } - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect CET timelock expiry yet: ignoring"), - OpenCommitted { - cet_status: CetStatus::TimelockExpired, - .. - } - | OpenCommitted { - cet_status: CetStatus::Ready(_), - .. - } => bail!("State already assumes CET timelock expiry: ignoring"), - MustRefund { .. } | Refunded { .. } => { - bail!("Refund path does not care about CET timelock expiry: ignoring") - } - }; - - insert_new_cfd_state_by_order_id(msg.0, new_state, &mut conn).await?; - - Ok(()) - } - - async fn handle_refund_timelock_expired(&mut self, msg: RefundTimelockExpired) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "Refund timelock has expired"); + let new_state = cfd.handle(CfdStateChangeEvent::Monitor(event))?; - let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?; - use CfdState::*; - let dlc = match cfd.state { - OpenCommitted { dlc, .. } => { - insert_new_cfd_state_by_order_id( - msg.0, - MustRefund { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc: dlc.clone(), - }, - &mut conn, - ) + // TODO: Not sure that should be done here... + // Consider bubbling the refund availability up to the user, and let user trigger + // transaction publication + if let CfdState::MustRefund { .. } = new_state { + let signed_refund_tx = cfd.refund_tx()?; + let txid = self + .wallet + .try_broadcast_transaction(signed_refund_tx) .await?; - dlc - } - MustRefund { .. } | Refunded { .. } => { - bail!("State already assumes refund timelock expiry: ignoring") - } - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect refund timelock expiry yet: ignoring"), - PendingOpen { dlc, .. } => { - tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); - dlc - } - Open { dlc, .. } => { - tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); - dlc - } - }; - - insert_new_cfd_state_by_order_id( - msg.0, - MustRefund { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc: dlc.clone(), - }, - &mut conn, - ) - .await?; - - let sig_hash = spending_tx_sighash( - &dlc.refund.0, - &dlc.commit.2, - Amount::from_sat(dlc.commit.0.output[0].value), - ); - let our_sig = SECP256K1.sign(&sig_hash, &dlc.identity); - let our_pubkey = PublicKey::new(bdk::bitcoin::secp256k1::PublicKey::from_secret_key( - SECP256K1, - &dlc.identity, - )); - let counterparty_sig = dlc.refund.1; - let counterparty_pubkey = dlc.identity_counterparty; - let signed_refund_tx = finalize_spend_transaction( - dlc.refund.0, - &dlc.commit.2, - (our_pubkey, our_sig), - (counterparty_pubkey, counterparty_sig), - )?; - - let txid = self - .wallet - .try_broadcast_transaction(signed_refund_tx) - .await?; - - tracing::info!("Refund transaction published on chain: {}", txid); - - Ok(()) - } - - async fn handle_refund_finality(&mut self, msg: RefundFinality) -> Result<()> { - let order_id = msg.0; - tracing::debug!(%order_id, "Refund transaction has reached finality"); - - let mut conn = self.db.acquire().await?; - - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - use CfdState::*; - match cfd.state { - MustRefund { .. } => (), - OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect refund finality yet: ignoring"), - PendingOpen { .. } => { - tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); - } - Open { .. } => { - tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); - } - OpenCommitted { .. } => { - tracing::debug!(%order_id, "Was waiting on refund timelock expiry, jumping ahead"); - } - Refunded { .. } => bail!("State already assumes refund finality: ignoring"), - }; - - insert_new_cfd_state_by_order_id( - msg.0, - CfdState::Refunded { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - &mut conn, - ) - .await?; + tracing::info!("Refund transaction published on chain: {}", txid); + } Ok(()) } @@ -739,37 +485,9 @@ impl Handler for Actor { } #[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: LockFinality, _ctx: &mut Context) { - log_error!(self.handle_lock_finality(msg)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: CommitFinality, _ctx: &mut Context) { - log_error!(self.handle_commit_finality(msg)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: CetTimelockExpired, _ctx: &mut Context) { - log_error!(self.handle_cet_timelock_expired(msg)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: RefundTimelockExpired, _ctx: &mut Context) { - log_error!(self.handle_refund_timelock_expired(msg)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: RefundFinality, _ctx: &mut Context) { - log_error!(self.handle_refund_finality(msg)) +impl Handler for Actor { + async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) { + log_error!(self.handle_monitoring_event(msg)) } } diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index 93672ee..8a6a8e3 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -1,9 +1,11 @@ use crate::model::{Leverage, Position, TakerId, TradingPair, Usd}; -use anyhow::Result; +use crate::monitor; +use anyhow::{bail, Result}; use bdk::bitcoin::secp256k1::{SecretKey, Signature}; use bdk::bitcoin::{Address, Amount, PublicKey, Transaction}; use bdk::descriptor::Descriptor; -use cfd_protocol::secp256k1_zkp::EcdsaAdaptorSignature; +use cfd_protocol::secp256k1_zkp::{EcdsaAdaptorSignature, SECP256K1}; +use cfd_protocol::{finalize_spend_transaction, spending_tx_sighash}; use rust_decimal::Decimal; use rust_decimal_macros::dec; use serde::{Deserialize, Serialize}; @@ -363,6 +365,221 @@ impl Cfd { #[allow(dead_code)] pub const CET_TIMELOCK: u32 = 12; + + pub fn handle(&self, event: CfdStateChangeEvent) -> Result { + use CfdState::*; + + // TODO: Display impl + tracing::info!("Cfd state change event {:?}", event); + + let order_id = self.order.id; + + let new_state = match event { + CfdStateChangeEvent::Monitor(event) => match event { + monitor::Event::LockFinality(_) => match self.state.clone() { + PendingOpen { dlc, .. } => CfdState::Open { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + }, + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => bail!("Did not expect lock finality yet: ignoring"), + Open { .. } | OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { + bail!("State already assumes lock finality: ignoring") + } + }, + monitor::Event::CommitFinality(_) => { + let dlc = match self.state.clone() { + Open { dlc, .. } => dlc, + PendingOpen { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + dlc + } + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => { + bail!("Did not expect commit finality yet: ignoring") + } + OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { + bail!("State already assumes commit finality: ignoring") + } + }; + + OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::Unprepared, + } + } + monitor::Event::CetTimelockExpired(_) => match self.state.clone() { + CfdState::OpenCommitted { + dlc, + cet_status: CetStatus::Unprepared, + .. + } => CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::TimelockExpired, + }, + CfdState::OpenCommitted { + dlc, + cet_status: CetStatus::OracleSigned(price), + .. + } => CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::Ready(price), + }, + PendingOpen { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::TimelockExpired, + } + } + Open { dlc, .. } => { + tracing::debug!(%order_id, "Was not aware of commit TX broadcast, jumping ahead"); + CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::TimelockExpired, + } + } + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => { + bail!("Did not expect CET timelock expiry yet: ignoring") + } + OpenCommitted { + cet_status: CetStatus::TimelockExpired, + .. + } + | OpenCommitted { + cet_status: CetStatus::Ready(_), + .. + } => bail!("State already assumes CET timelock expiry: ignoring"), + MustRefund { .. } | Refunded { .. } => { + bail!("Refund path does not care about CET timelock expiry: ignoring") + } + }, + monitor::Event::RefundTimelockExpired(_) => { + let dlc = match self.state.clone() { + OpenCommitted { dlc, .. } => dlc, + MustRefund { .. } | Refunded { .. } => { + bail!("State already assumes refund timelock expiry: ignoring") + } + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => { + bail!("Did not expect refund timelock expiry yet: ignoring") + } + PendingOpen { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + dlc + } + Open { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); + dlc + } + }; + + MustRefund { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + } + } + monitor::Event::RefundFinality(_) => { + match self.state { + MustRefund { .. } => (), + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => { + bail!("Did not expect refund finality yet: ignoring") + } + PendingOpen { .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + } + Open { .. } => { + tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); + } + OpenCommitted { .. } => { + tracing::debug!(%order_id, "Was waiting on refund timelock expiry, jumping ahead"); + } + Refunded { .. } => bail!("State already assumes refund finality: ignoring"), + } + + Refunded { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + } + } + }, + }; + + Ok(new_state) + } + + pub fn refund_tx(&self) -> Result { + let dlc = if let CfdState::MustRefund { dlc, .. } = self.state.clone() { + dlc + } else { + bail!("Refund transaction can only be constructed when in state MustRefund, but we are currently in {}", self.state.clone()) + }; + + let sig_hash = spending_tx_sighash( + &dlc.refund.0, + &dlc.commit.2, + Amount::from_sat(dlc.commit.0.output[0].value), + ); + let our_sig = SECP256K1.sign(&sig_hash, &dlc.identity); + let our_pubkey = PublicKey::new(bdk::bitcoin::secp256k1::PublicKey::from_secret_key( + SECP256K1, + &dlc.identity, + )); + let counterparty_sig = dlc.refund.1; + let counterparty_pubkey = dlc.identity_counterparty; + let signed_refund_tx = finalize_spend_transaction( + dlc.refund.0, + &dlc.commit.2, + (our_pubkey, our_sig), + (counterparty_pubkey, counterparty_sig), + )?; + + Ok(signed_refund_tx) + } +} + +#[derive(Debug, Clone)] +pub enum CfdStateChangeEvent { + // TODO: groupd other events by actors into enums and add them here so we can bundle all + // transitions into cfd.transition_to(...) + Monitor(monitor::Event), } fn calculate_profit( diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index bdc9734..7567734 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -23,12 +23,7 @@ pub struct MonitorParams { impl Actor where - T: xtra::Actor - + xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, + T: xtra::Actor + xtra::Handler, { pub fn new( electrum_rpc_url: &str, @@ -59,7 +54,7 @@ where lock_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(LockFinality(id)) + .do_send_async(Event::LockFinality(id)) .await .unwrap(); } @@ -77,7 +72,7 @@ where commit_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(CommitFinality(id)) + .do_send_async(Event::CommitFinality(id)) .await .unwrap(); } @@ -93,7 +88,7 @@ where .unwrap(); cfd_actor_addr - .do_send_async(CetTimelockExpired(id)) + .do_send_async(Event::CetTimelockExpired(id)) .await .unwrap(); } @@ -110,7 +105,7 @@ where .unwrap(); cfd_actor_addr - .do_send_async(RefundTimelockExpired(id)) + .do_send_async(Event::RefundTimelockExpired(id)) .await .unwrap(); } @@ -126,7 +121,7 @@ where refund_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(RefundFinality(id)) + .do_send_async(Event::RefundFinality(id)) .await .unwrap(); } @@ -147,33 +142,30 @@ impl xtra::Message for StartMonitoring { type Result = (); } -pub struct LockFinality(pub OrderId); - -impl xtra::Message for LockFinality { - type Result = (); -} - -pub struct CommitFinality(pub OrderId); - -impl xtra::Message for CommitFinality { - type Result = (); -} - -pub struct CetTimelockExpired(pub OrderId); - -impl xtra::Message for CetTimelockExpired { - type Result = (); +#[derive(Debug, Clone)] +pub enum Event { + LockFinality(OrderId), + CommitFinality(OrderId), + CetTimelockExpired(OrderId), + RefundTimelockExpired(OrderId), + RefundFinality(OrderId), } -pub struct RefundTimelockExpired(pub OrderId); - -impl xtra::Message for RefundTimelockExpired { - type Result = (); +impl Event { + pub fn order_id(&self) -> OrderId { + let order_id = match self { + Event::LockFinality(order_id) => order_id, + Event::CommitFinality(order_id) => order_id, + Event::CetTimelockExpired(order_id) => order_id, + Event::RefundTimelockExpired(order_id) => order_id, + Event::RefundFinality(order_id) => order_id, + }; + + *order_id + } } -pub struct RefundFinality(pub OrderId); - -impl xtra::Message for RefundFinality { +impl xtra::Message for Event { type Result = (); } @@ -192,12 +184,7 @@ impl xtra::Actor for Actor where T: xtra::Actor {} #[async_trait] impl xtra::Handler for Actor where - T: xtra::Actor - + xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, + T: xtra::Actor + xtra::Handler, { async fn handle(&mut self, msg: StartMonitoring, _ctx: &mut xtra::Context) { log_error!(self.handle_start_monitoring(msg)); diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 79adfd5..406cd38 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -8,6 +8,7 @@ use model::cfd::{Cfd, Order}; use rocket::fairing::AdHoc; use rocket_db_pools::Database; use seed::Seed; +use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::thread::sleep; @@ -24,6 +25,7 @@ mod db; mod keypair; mod logger; mod model; +mod monitor; mod routes; mod routes_taker; mod seed; @@ -77,6 +79,7 @@ async fn main() -> Result<()> { let data_dir = opts .data_dir + .clone() .unwrap_or_else(|| std::env::current_dir().expect("unable to get cwd")); if !data_dir.exists() { @@ -162,6 +165,8 @@ async fn main() -> Result<()> { .create(None) .spawn_global(); + let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None); + let cfd_actor_inbox = taker_cfd::Actor::new( db, wallet.clone(), @@ -169,6 +174,7 @@ async fn main() -> Result<()> { cfd_feed_sender, order_feed_sender, send_to_maker, + monitor_actor_address, ) .await .unwrap() @@ -178,6 +184,12 @@ async fn main() -> Result<()> { let inc_maker_messages_actor = taker_inc_message_actor::new(read, cfd_actor_inbox.clone()); + tokio::spawn(monitor_actor_context.run(monitor::Actor::new( + &opts.electrum, + HashMap::new(), + cfd_actor_inbox.clone(), + ))); + tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(inc_maker_messages_actor); diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index e995b9f..a619023 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -4,11 +4,12 @@ use crate::db::{ }; use crate::actors::log_error; -use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; +use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::Usd; +use crate::monitor::MonitorParams; use crate::wallet::Wallet; use crate::wire::SetupMsg; -use crate::{send_to_socket, setup_contract_actor, wire}; +use crate::{monitor, send_to_socket, setup_contract_actor, wire}; use anyhow::Result; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -43,6 +44,7 @@ pub struct Actor { // TODO: Move the contract setup into a dedicated actor and send messages to that actor that // manages the state instead of this ugly buffer contract_setup_message_buffer: Vec, + monitor_actor: Address>, } impl Actor { @@ -53,6 +55,7 @@ impl Actor { cfd_feed_actor_inbox: watch::Sender>, order_feed_actor_inbox: watch::Sender>, send_to_maker: Address, + monitor_actor: Address>, ) -> Result { let mut conn = db.acquire().await?; cfd_feed_actor_inbox.send(load_all_cfds(&mut conn).await?)?; @@ -66,6 +69,7 @@ impl Actor { send_to_maker, current_contract_setup: None, contract_setup_message_buffer: vec![], + monitor_actor, }) } @@ -231,12 +235,63 @@ impl Actor { self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; - let txid = self.wallet.try_broadcast_transaction(dlc.lock.0).await?; + let txid = self + .wallet + .try_broadcast_transaction(dlc.lock.0.clone()) + .await?; tracing::info!("Lock transaction published with txid {}", txid); - // TODO: tx monitoring, once confirmed with x blocks transition the Cfd to - // Open + // TODO: It's a bit suspicious to load this just to get the + // refund timelock + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + let script_pubkey = dlc.address.script_pubkey(); + self.monitor_actor + .do_send_async(monitor::StartMonitoring { + id: order_id, + params: MonitorParams { + lock: (dlc.lock.0.txid(), dlc.lock.1), + commit: (dlc.commit.0.txid(), dlc.commit.2), + cets: dlc + .cets + .into_iter() + .map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range)) + .collect(), + refund: ( + dlc.refund.0.txid(), + script_pubkey, + cfd.refund_timelock_in_blocks(), + ), + }, + }) + .await?; + + Ok(()) + } + + async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { + let order_id = event.order_id(); + + let mut conn = self.db.acquire().await?; + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + let new_state = cfd.handle(CfdStateChangeEvent::Monitor(event))?; + + insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?; + + // TODO: Not sure that should be done here... + // Consider bubbling the refund availability up to the user, and let user trigger + // transaction publication + if let CfdState::MustRefund { .. } = new_state { + let signed_refund_tx = cfd.refund_tx()?; + let txid = self + .wallet + .try_broadcast_transaction(signed_refund_tx) + .await?; + + tracing::info!("Refund transaction published on chain: {}", txid); + } Ok(()) } @@ -284,6 +339,13 @@ impl Handler for Actor { } } +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) { + log_error!(self.handle_monitoring_event(msg)) + } +} + impl Message for TakeOffer { type Result = (); }