diff --git a/daemon/src/cleanup.rs b/daemon/src/cleanup.rs new file mode 100644 index 0000000..6ae528c --- /dev/null +++ b/daemon/src/cleanup.rs @@ -0,0 +1,27 @@ +use crate::db::{insert_new_cfd_state_by_order_id, load_all_cfds}; +use crate::model::cfd::{Cfd, CfdState, CfdStateCommon}; +use anyhow::Result; +use sqlx::SqlitePool; +use std::time::SystemTime; + +pub async fn transition_non_continue_cfds_to_setup_failed(db: SqlitePool) -> Result<()> { + let mut conn = db.acquire().await?; + + let cfds = load_all_cfds(&mut conn).await?; + + for cfd in cfds.iter().filter(|cfd| Cfd::is_cleanup(cfd)) { + insert_new_cfd_state_by_order_id( + cfd.order.id, + CfdState::SetupFailed { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + info: format!("Was in state {} which cannot be continued.", cfd.state), + }, + &mut conn, + ) + .await?; + } + + Ok(()) +} diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 3444be9..d98b6a9 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -1,5 +1,6 @@ use crate::auth::MAKER_USERNAME; use crate::cfd_feed::CfdFeed; +use crate::db::load_all_cfds; use crate::seed::Seed; use crate::wallet::Wallet; use anyhow::{Context, Result}; @@ -10,7 +11,6 @@ use model::cfd::Order; use model::WalletInfo; use rocket::fairing::AdHoc; use rocket_db_pools::Database; -use std::collections::HashMap; use std::path::PathBuf; use std::task::Poll; use tokio::sync::watch; @@ -22,6 +22,7 @@ mod actors; mod auth; mod bitmex_price_feed; mod cfd_feed; +mod cleanup; mod db; mod keypair; mod logger; @@ -150,11 +151,17 @@ async fn main() -> Result<()> { None => return Err(rocket), }; + cleanup::transition_non_continue_cfds_to_setup_failed(db.clone()) + .await + .unwrap(); + let (maker_inc_connections_address, maker_inc_connections_context) = xtra::Context::new(None); let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None); + let mut conn = db.acquire().await.unwrap(); + let cfds = load_all_cfds(&mut conn).await.unwrap(); let cfd_maker_actor_inbox = maker_cfd::Actor::new( db, wallet.clone(), @@ -163,6 +170,7 @@ async fn main() -> Result<()> { order_feed_sender, maker_inc_connections_address.clone(), monitor_actor_address, + cfds.clone(), ) .await .unwrap() @@ -174,11 +182,9 @@ async fn main() -> Result<()> { cfd_maker_actor_inbox.clone(), )), ); - tokio::spawn(monitor_actor_context.run(monitor::Actor::new( - &opts.electrum, - HashMap::new(), - cfd_maker_actor_inbox.clone(), - ))); + tokio::spawn(monitor_actor_context.run( + monitor::Actor::new(&opts.electrum, cfd_maker_actor_inbox.clone(), cfds).await, + )); let listener_stream = futures::stream::poll_fn(move |ctx| { let message = match futures::ready!(listener.poll_accept(ctx)) { diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index bacf569..562345b 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -67,6 +67,7 @@ enum SetupState { } impl Actor { + #[allow(clippy::too_many_arguments)] pub async fn new( db: sqlx::SqlitePool, wallet: Wallet, @@ -75,12 +76,26 @@ impl Actor { order_feed_sender: watch::Sender>, takers: Address, monitor_actor: Address>, + cfds: Vec, ) -> Result { let mut conn = db.acquire().await?; // populate the CFD feed with existing CFDs cfd_feed.update(&mut conn).await?; + for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { + let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; + + tracing::info!("Lock transaction published with txid {}", txid); + } + + for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) { + let signed_refund_tx = cfd.refund_tx()?; + let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; + + tracing::info!("Refund transaction published on chain: {}", txid); + } + Ok(Self { db, wallet, @@ -186,24 +201,10 @@ impl Actor { // refund timelock let cfd = load_cfd_by_order_id(msg.order_id, &mut conn).await?; - let script_pubkey = dlc.address.script_pubkey(); self.monitor_actor .do_send_async(monitor::StartMonitoring { id: msg.order_id, - params: MonitorParams { - lock: (dlc.lock.0.txid(), dlc.lock.1), - commit: (dlc.commit.0.txid(), dlc.commit.2), - cets: dlc - .cets - .into_iter() - .map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range)) - .collect(), - refund: ( - dlc.refund.0.txid(), - script_pubkey, - cfd.refund_timelock_in_blocks(), - ), - }, + params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()), }) .await?; diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index 8a6a8e3..147d50e 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -162,6 +162,7 @@ pub enum CfdState { /// The maker rejected the CFD order. /// /// This state applies to taker and maker. + /// This is a final state. Rejected { common: CfdStateCommon, }, @@ -206,9 +207,23 @@ pub enum CfdState { dlc: Dlc, }, + /// The Cfd was refunded and the refund transaction reached finality + /// + /// This state applies to taker and maker. + /// This is a final state. Refunded { common: CfdStateCommon, }, + + /// The Cfd was in a state that could not be continued after the application got interrupted + /// + /// This state applies to taker and maker. + /// This is a final state. + /// It is safe to remove Cfds in this state from the database. + SetupFailed { + common: CfdStateCommon, + info: String, + }, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -233,6 +248,7 @@ impl CfdState { CfdState::OpenCommitted { common, .. } => common, CfdState::MustRefund { common, .. } => common, CfdState::Refunded { common, .. } => common, + CfdState::SetupFailed { common, .. } => common, }; *common @@ -276,6 +292,9 @@ impl Display for CfdState { CfdState::Refunded { .. } => { write!(f, "Refunded") } + CfdState::SetupFailed { .. } => { + write!(f, "Safely Aborted") + } } } } @@ -384,13 +403,15 @@ impl Cfd { dlc, }, OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => bail!("Did not expect lock finality yet: ignoring"), - Open { .. } | OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { + IncomingOrderRequest { .. } | Accepted { .. } | ContractSetup { .. } => { + bail!("Did not expect lock finality yet: ignoring") + } + Open { .. } | OpenCommitted { .. } | MustRefund { .. } => { bail!("State already assumes lock finality: ignoring") } + Rejected { .. } | Refunded { .. } | SetupFailed { .. } => { + bail!("The cfd is already in final state {}", self.state) + } }, monitor::Event::CommitFinality(_) => { let dlc = match self.state.clone() { @@ -400,15 +421,15 @@ impl Cfd { dlc } OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => { + IncomingOrderRequest { .. } | Accepted { .. } | ContractSetup { .. } => { bail!("Did not expect commit finality yet: ignoring") } - OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { + OpenCommitted { .. } | MustRefund { .. } => { bail!("State already assumes commit finality: ignoring") } + Rejected { .. } | Refunded { .. } | SetupFailed { .. } => { + bail!("The cfd is already in final state {}", self.state) + } }; OpenCommitted { @@ -463,10 +484,7 @@ impl Cfd { } } OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => { + IncomingOrderRequest { .. } | Accepted { .. } | ContractSetup { .. } => { bail!("Did not expect CET timelock expiry yet: ignoring") } OpenCommitted { @@ -477,21 +495,21 @@ impl Cfd { cet_status: CetStatus::Ready(_), .. } => bail!("State already assumes CET timelock expiry: ignoring"), - MustRefund { .. } | Refunded { .. } => { + MustRefund { .. } => { bail!("Refund path does not care about CET timelock expiry: ignoring") } + Rejected { .. } | Refunded { .. } | SetupFailed { .. } => { + bail!("The cfd is already in final state {}", self.state) + } }, monitor::Event::RefundTimelockExpired(_) => { let dlc = match self.state.clone() { OpenCommitted { dlc, .. } => dlc, - MustRefund { .. } | Refunded { .. } => { + MustRefund { .. } => { bail!("State already assumes refund timelock expiry: ignoring") } OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => { + IncomingOrderRequest { .. } | Accepted { .. } | ContractSetup { .. } => { bail!("Did not expect refund timelock expiry yet: ignoring") } PendingOpen { dlc, .. } => { @@ -502,6 +520,9 @@ impl Cfd { tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); dlc } + Rejected { .. } | Refunded { .. } | SetupFailed { .. } => { + bail!("The cfd is already in final state {}", self.state) + } }; MustRefund { @@ -515,10 +536,7 @@ impl Cfd { match self.state { MustRefund { .. } => (), OutgoingOrderRequest { .. } => unreachable!("taker-only state"), - IncomingOrderRequest { .. } - | Accepted { .. } - | Rejected { .. } - | ContractSetup { .. } => { + IncomingOrderRequest { .. } | Accepted { .. } | ContractSetup { .. } => { bail!("Did not expect refund finality yet: ignoring") } PendingOpen { .. } => { @@ -530,7 +548,9 @@ impl Cfd { OpenCommitted { .. } => { tracing::debug!(%order_id, "Was waiting on refund timelock expiry, jumping ahead"); } - Refunded { .. } => bail!("State already assumes refund finality: ignoring"), + Rejected { .. } | Refunded { .. } | SetupFailed { .. } => { + bail!("The cfd is already in final state {}", self.state) + } } Refunded { @@ -573,6 +593,28 @@ impl Cfd { Ok(signed_refund_tx) } + + pub fn pending_open_dlc(&self) -> Option { + if let CfdState::PendingOpen { dlc, .. } = self.state.clone() { + Some(dlc) + } else { + None + } + } + + pub fn is_must_refund(&self) -> bool { + matches!(self.state.clone(), CfdState::MustRefund { .. }) + } + + pub fn is_cleanup(&self) -> bool { + matches!( + self.state.clone(), + CfdState::OutgoingOrderRequest { .. } + | CfdState::IncomingOrderRequest { .. } + | CfdState::Accepted { .. } + | CfdState::ContractSetup { .. } + ) + } } #[derive(Debug, Clone)] diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index 7567734..1057d49 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -1,5 +1,6 @@ use crate::actors::log_error; -use crate::model::cfd::{Cfd, OrderId}; +use crate::model::cfd::{CetStatus, Cfd, CfdState, Dlc, OrderId}; +use crate::monitor::subscription::Subscription; use anyhow::Result; use async_trait::async_trait; use bdk::bitcoin::{PublicKey, Script, Txid}; @@ -15,35 +16,142 @@ const FINALITY_CONFIRMATIONS: u32 = 1; #[derive(Clone)] pub struct MonitorParams { - pub lock: (Txid, Descriptor), - pub commit: (Txid, Descriptor), - pub cets: Vec<(Txid, Script, RangeInclusive)>, - pub refund: (Txid, Script, u32), + lock: (Txid, Descriptor), + commit: (Txid, Descriptor), + cets: Vec<(Txid, Script, RangeInclusive)>, + refund: (Txid, Script, u32), +} + +impl MonitorParams { + pub fn from_dlc_and_timelocks(dlc: Dlc, refund_timelock_in_blocks: u32) -> Self { + let script_pubkey = dlc.address.script_pubkey(); + MonitorParams { + lock: (dlc.lock.0.txid(), dlc.lock.1), + commit: (dlc.commit.0.txid(), dlc.commit.2), + cets: dlc + .cets + .into_iter() + .map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range)) + .collect(), + refund: ( + dlc.refund.0.txid(), + script_pubkey, + refund_timelock_in_blocks, + ), + } + } } impl Actor where T: xtra::Actor + xtra::Handler, { - pub fn new( + pub async fn new( electrum_rpc_url: &str, - cfds: HashMap, cfd_actor_addr: xtra::Address, + cfds: Vec, ) -> Self { let monitor = Monitor::new(electrum_rpc_url, FINALITY_CONFIRMATIONS).unwrap(); - Self { + let mut actor = Self { monitor, - cfds, + cfds: HashMap::new(), cfd_actor_addr, + }; + + for cfd in cfds { + match cfd.state.clone() { + // In PendingOpen we know the complete dlc setup and assume that the lock transaction will be published + CfdState::PendingOpen { dlc, .. } => { + let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks()); + actor.cfds.insert(cfd.order.id, params.clone()); + actor.monitor_all(¶ms, cfd.order.id).await; + } + CfdState::Open { dlc, .. } => { + let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks()); + actor.cfds.insert(cfd.order.id, params.clone()); + + actor.monitor_commit_finality_and_timelocks(¶ms, cfd.order.id).await; + actor.monitor_refund_finality( params.clone(),cfd.order.id).await; + } + CfdState::OpenCommitted { dlc, cet_status, .. } => { + let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks()); + actor.cfds.insert(cfd.order.id, params.clone()); + + let commit_subscription = actor + .monitor + .subscribe_to((params.commit.0, params.commit.1.script_pubkey())) + .await; + + match cet_status { + CetStatus::Unprepared + | CetStatus::OracleSigned(_) => { + actor.monitor_commit_cet_timelock(cfd.order.id, &commit_subscription).await; + actor.monitor_commit_refund_timelock(¶ms, cfd.order.id, &commit_subscription).await; + actor.monitor_refund_finality( params.clone(),cfd.order.id).await; + } + CetStatus::TimelockExpired => { + actor.monitor_commit_refund_timelock(¶ms, cfd.order.id, &commit_subscription).await; + actor.monitor_refund_finality( params.clone(),cfd.order.id).await; + } + CetStatus::Ready(_price) => { + // TODO: monitor CET finality + + actor.monitor_commit_refund_timelock(¶ms, cfd.order.id, &commit_subscription).await; + actor.monitor_refund_finality( params.clone(),cfd.order.id).await; + } + } + } + CfdState::MustRefund { dlc, .. } => { + // TODO: CET monitoring (?) - note: would require to add CetStatus information to MustRefund + + let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks()); + actor.cfds.insert(cfd.order.id, params.clone()); + + let commit_subscription = actor + .monitor + .subscribe_to((params.commit.0, params.commit.1.script_pubkey())) + .await; + + actor.monitor_commit_refund_timelock(¶ms, cfd.order.id, &commit_subscription).await; + actor.monitor_refund_finality( params.clone(),cfd.order.id).await; + } + + // too early to monitor + CfdState::OutgoingOrderRequest { .. } + | CfdState::IncomingOrderRequest { .. } + | CfdState::Accepted { .. } + | CfdState::ContractSetup { .. } + + // final states + | CfdState::Rejected { .. } + | CfdState::Refunded { .. } + | CfdState::SetupFailed { .. } => () + } } + + actor } async fn handle_start_monitoring(&mut self, msg: StartMonitoring) -> Result<()> { let StartMonitoring { id, params } = msg; self.cfds.insert(id, params.clone()); + self.monitor_all(¶ms, id).await; + Ok(()) + } + + async fn monitor_all(&self, params: &MonitorParams, order_id: OrderId) { + self.monitor_lock_finality(params, order_id).await; + + self.monitor_commit_finality_and_timelocks(params, order_id) + .await; + + self.monitor_refund_finality(params.clone(), order_id).await; + } + + async fn monitor_lock_finality(&self, params: &MonitorParams, order_id: OrderId) { tokio::spawn({ let cfd_actor_addr = self.cfd_actor_addr.clone(); let lock_subscription = self @@ -54,17 +162,32 @@ where lock_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(Event::LockFinality(id)) + .do_send_async(Event::LockFinality(order_id)) .await .unwrap(); } }); + } + async fn monitor_commit_finality_and_timelocks( + &self, + params: &MonitorParams, + order_id: OrderId, + ) { let commit_subscription = self .monitor .subscribe_to((params.commit.0, params.commit.1.script_pubkey())) .await; + self.monitor_commit_finality(order_id, &commit_subscription) + .await; + self.monitor_commit_cet_timelock(order_id, &commit_subscription) + .await; + self.monitor_commit_refund_timelock(params, order_id, &commit_subscription) + .await; + } + + async fn monitor_commit_finality(&self, order_id: OrderId, commit_subscription: &Subscription) { tokio::spawn({ let cfd_actor_addr = self.cfd_actor_addr.clone(); let commit_subscription = commit_subscription.clone(); @@ -72,12 +195,18 @@ where commit_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(Event::CommitFinality(id)) + .do_send_async(Event::CommitFinality(order_id)) .await .unwrap(); } }); + } + async fn monitor_commit_cet_timelock( + &self, + order_id: OrderId, + commit_subscription: &Subscription, + ) { tokio::spawn({ let cfd_actor_addr = self.cfd_actor_addr.clone(); let commit_subscription = commit_subscription.clone(); @@ -88,12 +217,19 @@ where .unwrap(); cfd_actor_addr - .do_send_async(Event::CetTimelockExpired(id)) + .do_send_async(Event::CetTimelockExpired(order_id)) .await .unwrap(); } }); + } + async fn monitor_commit_refund_timelock( + &self, + params: &MonitorParams, + order_id: OrderId, + commit_subscription: &Subscription, + ) { tokio::spawn({ let cfd_actor_addr = self.cfd_actor_addr.clone(); let commit_subscription = commit_subscription.clone(); @@ -105,12 +241,14 @@ where .unwrap(); cfd_actor_addr - .do_send_async(Event::RefundTimelockExpired(id)) + .do_send_async(Event::RefundTimelockExpired(order_id)) .await .unwrap(); } }); + } + async fn monitor_refund_finality(&self, params: MonitorParams, order_id: OrderId) { tokio::spawn({ let cfd_actor_addr = self.cfd_actor_addr.clone(); let refund_subscription = self @@ -121,15 +259,11 @@ where refund_subscription.wait_until_final().await.unwrap(); cfd_actor_addr - .do_send_async(Event::RefundFinality(id)) + .do_send_async(Event::RefundFinality(order_id)) .await .unwrap(); } }); - - // TODO: CET subscription => Requires information from Oracle - - Ok(()) } } diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index f011920..4a775e1 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -1,4 +1,5 @@ use crate::cfd_feed::CfdFeed; +use crate::db::load_all_cfds; use crate::model::WalletInfo; use crate::wallet::Wallet; use anyhow::{Context, Result}; @@ -10,7 +11,6 @@ use model::cfd::Order; use rocket::fairing::AdHoc; use rocket_db_pools::Database; use seed::Seed; -use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::thread::sleep; @@ -25,6 +25,7 @@ use xtra::Actor; mod actors; mod bitmex_price_feed; mod cfd_feed; +mod cleanup; mod db; mod keypair; mod logger; @@ -153,20 +154,27 @@ async fn main() -> Result<()> { None => return Err(rocket), }; + cleanup::transition_non_continue_cfds_to_setup_failed(db.clone()) + .await + .unwrap(); + let send_to_maker = send_to_socket::Actor::new(write) .create(None) .spawn_global(); let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None); + let mut conn = db.acquire().await.unwrap(); + let cfds = load_all_cfds(&mut conn).await.unwrap(); let cfd_actor_inbox = taker_cfd::Actor::new( - db, + db.clone(), wallet.clone(), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), cfd_feed_updater, order_feed_sender, send_to_maker, monitor_actor_address, + cfds.clone(), ) .await .unwrap() @@ -177,11 +185,11 @@ async fn main() -> Result<()> { .map(move |item| taker_cfd::MakerStreamMessage { item }); tokio::spawn(cfd_actor_inbox.clone().attach_stream(read)); - tokio::spawn(monitor_actor_context.run(monitor::Actor::new( - &opts.electrum, - HashMap::new(), - cfd_actor_inbox.clone(), - ))); + tokio::spawn( + monitor_actor_context.run( + monitor::Actor::new(&opts.electrum, cfd_actor_inbox.clone(), cfds).await, + ), + ); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn({ let cfd_actor_inbox = cfd_actor_inbox.clone(); diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index dcac221..bf36b54 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -57,6 +57,7 @@ pub struct Actor { } impl Actor { + #[allow(clippy::too_many_arguments)] pub async fn new( db: sqlx::SqlitePool, wallet: Wallet, @@ -65,12 +66,26 @@ impl Actor { order_feed_actor_inbox: watch::Sender>, send_to_maker: Address>, monitor_actor: Address>, + cfds: Vec, ) -> Result { let mut conn = db.acquire().await?; // populate the CFD feed with existing CFDs cfd_feed.update(&mut conn).await?; + for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { + let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; + + tracing::info!("Lock transaction published with txid {}", txid); + } + + for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) { + let signed_refund_tx = cfd.refund_tx()?; + let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; + + tracing::info!("Refund transaction published on chain: {}", txid); + } + Ok(Self { db, wallet, @@ -250,24 +265,10 @@ impl Actor { // refund timelock let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let script_pubkey = dlc.address.script_pubkey(); self.monitor_actor .do_send_async(monitor::StartMonitoring { id: order_id, - params: MonitorParams { - lock: (dlc.lock.0.txid(), dlc.lock.1), - commit: (dlc.commit.0.txid(), dlc.commit.2), - cets: dlc - .cets - .into_iter() - .map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range)) - .collect(), - refund: ( - dlc.refund.0.txid(), - script_pubkey, - cfd.refund_timelock_in_blocks(), - ), - }, + params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()), }) .await?;