From eee0fdac252999ccf98538306a9be6f30f548d9d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 29 Sep 2021 12:02:53 +1000 Subject: [PATCH] Rework monitoring actor to use local state instead of tasks This should greatly improve resilience as we no longer have any tasks within `tokio` that can fail without recovering. Instead, we periodically ping the actor with a `Sync` message which updates the local state of all scripts and sends out messages accordingly. --- daemon/src/maker.rs | 17 +- daemon/src/monitor.rs | 532 ++++++++++++++++++----------- daemon/src/monitor/subscription.rs | 429 ----------------------- daemon/src/taker.rs | 11 +- 4 files changed, 359 insertions(+), 630 deletions(-) delete mode 100644 daemon/src/monitor/subscription.rs diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index b31d377..6f718f7 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -160,7 +160,7 @@ async fn main() -> Result<()> { let (maker_inc_connections_address, maker_inc_connections_context) = xtra::Context::new(None); - let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None); + let (monitor_actor_address, mut monitor_actor_context) = xtra::Context::new(None); let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None); @@ -187,9 +187,18 @@ async fn main() -> Result<()> { cfd_maker_actor_inbox.clone(), )), ); - tokio::spawn(monitor_actor_context.run( - monitor::Actor::new(&opts.electrum, cfd_maker_actor_inbox.clone(), cfds).await, - )); + tokio::spawn( + monitor_actor_context + .notify_interval(Duration::from_secs(20), || monitor::Sync) + .unwrap(), + ); + tokio::spawn( + monitor_actor_context.run( + monitor::Actor::new(&opts.electrum, cfd_maker_actor_inbox.clone(), cfds) + .await + .unwrap(), + ), + ); tokio::spawn( oracle_actor_context diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index 6f3685c..452eb23 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -1,20 +1,25 @@ use crate::actors::log_error; use crate::model::cfd::{CetStatus, Cfd, CfdState, Dlc, OrderId}; -use crate::monitor::subscription::Subscription; use crate::oracle; -use anyhow::Result; +use anyhow::{Context, Result}; use async_trait::async_trait; use bdk::bitcoin::{PublicKey, Script, Txid}; use bdk::descriptor::Descriptor; +use bdk::electrum_client::{ElectrumApi, GetHistoryRes, HeaderNotification}; use bdk::miniscript::DescriptorTrait; -use std::collections::HashMap; -use std::ops::RangeInclusive; -use subscription::Monitor; - -mod subscription; +use std::collections::hash_map::Entry; +use std::collections::{BTreeMap, HashMap}; +use std::convert::{TryFrom, TryInto}; +use std::fmt; +use std::ops::{Add, RangeInclusive}; const FINALITY_CONFIRMATIONS: u32 = 1; +pub struct StartMonitoring { + pub id: OrderId, + pub params: MonitorParams, +} + #[derive(Clone)] pub struct MonitorParams { lock: (Txid, Descriptor), @@ -23,24 +28,19 @@ pub struct MonitorParams { refund: (Txid, Script, u32), } -impl MonitorParams { - pub fn from_dlc_and_timelocks(dlc: Dlc, refund_timelock_in_blocks: u32) -> Self { - let script_pubkey = dlc.address.script_pubkey(); - MonitorParams { - lock: (dlc.lock.0.txid(), dlc.lock.1), - commit: (dlc.commit.0.txid(), dlc.commit.2), - cets: dlc - .cets - .into_iter() - .map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range)) - .collect(), - refund: ( - dlc.refund.0.txid(), - script_pubkey, - refund_timelock_in_blocks, - ), - } - } +pub struct Sync; + +pub struct Actor +where + T: xtra::Actor, +{ + cfds: HashMap, + cfd_actor_addr: xtra::Address, + + client: bdk::electrum_client::Client, + latest_block_height: BlockHeight, + current_status: BTreeMap<(Txid, Script), ScriptStatus>, + awaiting_status: HashMap<(Txid, Script), Vec<(ScriptStatus, Event)>>, } impl Actor @@ -51,13 +51,23 @@ where electrum_rpc_url: &str, cfd_actor_addr: xtra::Address, cfds: Vec, - ) -> Self { - let monitor = Monitor::new(electrum_rpc_url, FINALITY_CONFIRMATIONS).unwrap(); + ) -> Result { + let client = bdk::electrum_client::Client::new(electrum_rpc_url) + .context("Failed to initialize Electrum RPC client")?; + + // Initially fetch the latest block for storing the height. + // We do not act on this subscription after this call. + let latest_block = client + .block_headers_subscribe() + .context("Failed to subscribe to header notifications")?; let mut actor = Self { - monitor, cfds: HashMap::new(), cfd_actor_addr, + client, + latest_block_height: BlockHeight::try_from(latest_block)?, + current_status: BTreeMap::default(), + awaiting_status: HashMap::default(), }; for cfd in cfds { @@ -66,40 +76,37 @@ where CfdState::PendingOpen { dlc, .. } => { let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks()); actor.cfds.insert(cfd.order.id, params.clone()); - actor.monitor_all(¶ms, cfd.order.id).await; + actor.monitor_all(¶ms, cfd.order.id); } CfdState::Open { dlc, .. } | CfdState::PendingCommit { dlc, .. } => { let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks()); actor.cfds.insert(cfd.order.id, params.clone()); - actor.monitor_commit_finality_and_timelocks(¶ms, cfd.order.id).await; - actor.monitor_refund_finality( params.clone(),cfd.order.id).await; + actor.monitor_commit_finality(¶ms, cfd.order.id); + actor.monitor_commit_cet_timelock(¶ms, cfd.order.id); + actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); + actor.monitor_refund_finality(¶ms,cfd.order.id); } CfdState::OpenCommitted { dlc, cet_status, .. } => { let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks()); actor.cfds.insert(cfd.order.id, params.clone()); - let commit_subscription = actor - .monitor - .subscribe_to((params.commit.0, params.commit.1.script_pubkey())) - .await; - match cet_status { CetStatus::Unprepared | CetStatus::OracleSigned(_) => { - actor.monitor_commit_cet_timelock(cfd.order.id, &commit_subscription).await; - actor.monitor_commit_refund_timelock(¶ms, cfd.order.id, &commit_subscription).await; - actor.monitor_refund_finality( params.clone(),cfd.order.id).await; + actor.monitor_commit_cet_timelock(¶ms, cfd.order.id); + actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); + actor.monitor_refund_finality(¶ms,cfd.order.id); } CetStatus::TimelockExpired => { - actor.monitor_commit_refund_timelock(¶ms, cfd.order.id, &commit_subscription).await; - actor.monitor_refund_finality( params.clone(),cfd.order.id).await; + actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); + actor.monitor_refund_finality(¶ms,cfd.order.id); } CetStatus::Ready(_price) => { // TODO: monitor CET finality - actor.monitor_commit_refund_timelock(¶ms, cfd.order.id, &commit_subscription).await; - actor.monitor_refund_finality( params.clone(),cfd.order.id).await; + actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); + actor.monitor_refund_finality(¶ms,cfd.order.id); } } } @@ -109,13 +116,8 @@ where let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks()); actor.cfds.insert(cfd.order.id, params.clone()); - let commit_subscription = actor - .monitor - .subscribe_to((params.commit.0, params.commit.1.script_pubkey())) - .await; - - actor.monitor_commit_refund_timelock(¶ms, cfd.order.id, &commit_subscription).await; - actor.monitor_refund_finality( params.clone(),cfd.order.id).await; + actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); + actor.monitor_refund_finality(¶ms,cfd.order.id); } // too early to monitor @@ -131,143 +133,86 @@ where } } - actor + Ok(actor) } - async fn handle_start_monitoring(&mut self, msg: StartMonitoring) -> Result<()> { - let StartMonitoring { id, params } = msg; - - self.cfds.insert(id, params.clone()); - self.monitor_all(¶ms, id).await; - - Ok(()) + fn monitor_all(&mut self, params: &MonitorParams, order_id: OrderId) { + self.monitor_lock_finality(params, order_id); + self.monitor_commit_finality(params, order_id); + self.monitor_commit_cet_timelock(params, order_id); + self.monitor_commit_refund_timelock(params, order_id); + self.monitor_refund_finality(params, order_id); } - async fn monitor_all(&self, params: &MonitorParams, order_id: OrderId) { - self.monitor_lock_finality(params, order_id).await; - - self.monitor_commit_finality_and_timelocks(params, order_id) - .await; - - self.monitor_refund_finality(params.clone(), order_id).await; + fn monitor_lock_finality(&mut self, params: &MonitorParams, order_id: OrderId) { + self.awaiting_status + .entry((params.lock.0, params.lock.1.script_pubkey())) + .or_default() + .push((ScriptStatus::finality(), Event::LockFinality(order_id))); } - async fn monitor_lock_finality(&self, params: &MonitorParams, order_id: OrderId) { - tokio::spawn({ - let cfd_actor_addr = self.cfd_actor_addr.clone(); - let lock_subscription = self - .monitor - .subscribe_to((params.lock.0, params.lock.1.script_pubkey())) - .await; - async move { - lock_subscription.wait_until_final().await.unwrap(); - - cfd_actor_addr - .do_send_async(Event::LockFinality(order_id)) - .await - .unwrap(); - } - }); + fn monitor_commit_finality(&mut self, params: &MonitorParams, order_id: OrderId) { + self.awaiting_status + .entry((params.commit.0, params.commit.1.script_pubkey())) + .or_default() + .push((ScriptStatus::finality(), Event::CommitFinality(order_id))); } - async fn monitor_commit_finality_and_timelocks( - &self, - params: &MonitorParams, - order_id: OrderId, - ) { - let commit_subscription = self - .monitor - .subscribe_to((params.commit.0, params.commit.1.script_pubkey())) - .await; - - self.monitor_commit_finality(order_id, &commit_subscription) - .await; - self.monitor_commit_cet_timelock(order_id, &commit_subscription) - .await; - self.monitor_commit_refund_timelock(params, order_id, &commit_subscription) - .await; + fn monitor_commit_cet_timelock(&mut self, params: &MonitorParams, order_id: OrderId) { + self.awaiting_status + .entry((params.commit.0, params.commit.1.script_pubkey())) + .or_default() + .push(( + ScriptStatus::Confirmed(Confirmed::with_confirmations(Cfd::CET_TIMELOCK)), + Event::CetTimelockExpired(order_id), + )); } - async fn monitor_commit_finality(&self, order_id: OrderId, commit_subscription: &Subscription) { - tokio::spawn({ - let cfd_actor_addr = self.cfd_actor_addr.clone(); - let commit_subscription = commit_subscription.clone(); - async move { - commit_subscription.wait_until_final().await.unwrap(); - - cfd_actor_addr - .do_send_async(Event::CommitFinality(order_id)) - .await - .unwrap(); - } - }); + fn monitor_commit_refund_timelock(&mut self, params: &MonitorParams, order_id: OrderId) { + self.awaiting_status + .entry((params.commit.0, params.commit.1.script_pubkey())) + .or_default() + .push(( + ScriptStatus::Confirmed(Confirmed::with_confirmations(params.refund.2)), + Event::RefundTimelockExpired(order_id), + )); } - async fn monitor_commit_cet_timelock( - &self, - order_id: OrderId, - commit_subscription: &Subscription, - ) { - tokio::spawn({ - let cfd_actor_addr = self.cfd_actor_addr.clone(); - let commit_subscription = commit_subscription.clone(); - async move { - commit_subscription - .wait_until_confirmed_with(Cfd::CET_TIMELOCK) - .await - .unwrap(); - - cfd_actor_addr - .do_send_async(Event::CetTimelockExpired(order_id)) - .await - .unwrap(); - } - }); + fn monitor_refund_finality(&mut self, params: &MonitorParams, order_id: OrderId) { + self.awaiting_status + .entry((params.refund.0, params.refund.1.clone())) + .or_default() + .push((ScriptStatus::finality(), Event::RefundFinality(order_id))); } - async fn monitor_commit_refund_timelock( - &self, - params: &MonitorParams, - order_id: OrderId, - commit_subscription: &Subscription, - ) { - tokio::spawn({ - let cfd_actor_addr = self.cfd_actor_addr.clone(); - let commit_subscription = commit_subscription.clone(); - let refund_timelock = params.refund.2; - async move { - commit_subscription - .wait_until_confirmed_with(refund_timelock) - .await - .unwrap(); - - cfd_actor_addr - .do_send_async(Event::RefundTimelockExpired(order_id)) - .await - .unwrap(); - } - }); - } + async fn sync(&mut self) -> Result<()> { + // Fetch the latest block for storing the height. + // We do not act on this subscription after this call, as we cannot rely on + // subscription push notifications because eventually the Electrum server will + // close the connection and subscriptions are not automatically renewed + // upon renewing the connection. + let latest_block_height = self + .client + .block_headers_subscribe() + .context("Failed to subscribe to header notifications")? + .try_into()?; + + tracing::trace!( + "Updating status of {} transactions", + self.awaiting_status.len() + ); + + let histories = self + .client + .batch_script_get_history(self.awaiting_status.keys().map(|(_, script)| script)) + .context("Failed to get script histories")?; + + self.update_state(latest_block_height, histories).await?; - async fn monitor_refund_finality(&self, params: MonitorParams, order_id: OrderId) { - tokio::spawn({ - let cfd_actor_addr = self.cfd_actor_addr.clone(); - let refund_subscription = self - .monitor - .subscribe_to((params.refund.0, params.refund.1)) - .await; - async move { - refund_subscription.wait_until_final().await.unwrap(); - - cfd_actor_addr - .do_send_async(Event::RefundFinality(order_id)) - .await - .unwrap(); - } - }); + Ok(()) } - pub async fn handle_oracle_attestation(&self, attestation: oracle::Attestation) -> Result<()> { + async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { for (order_id, MonitorParams { cets, .. }) in self.cfds.clone().into_iter() { let (txid, script_pubkey) = match cets.iter().find_map(|(txid, script_pubkey, range)| { @@ -279,30 +224,200 @@ where None => continue, }; - tokio::spawn({ - let cfd_actor_addr = self.cfd_actor_addr.clone(); - let cet_subscription = self - .monitor - .subscribe_to((*txid, script_pubkey.clone())) - .await; - async move { - cet_subscription.wait_until_final().await.unwrap(); - - cfd_actor_addr - .do_send_async(Event::CetFinality(order_id)) - .await - .unwrap(); + self.awaiting_status + .entry((*txid, script_pubkey.clone())) + .or_default() + .push((ScriptStatus::finality(), Event::CetFinality(order_id))); + } + + Ok(()) + } + + async fn update_state( + &mut self, + latest_block_height: BlockHeight, + histories: Vec>, + ) -> Result<()> { + if latest_block_height > self.latest_block_height { + tracing::debug!( + block_height = u32::from(latest_block_height), + "Got notification for new block" + ); + self.latest_block_height = latest_block_height; + } + + // 1. shape response into local data format + let new_status = histories.into_iter().zip(self.awaiting_status.keys().cloned()).map(|(script_history, (txid, script))| { + let new_script_status = match script_history.as_slice() { + [] => ScriptStatus::Unseen, + [remaining @ .., last] => { + if !remaining.is_empty() { + tracing::warn!("Found more than a single history entry for script. This is highly unexpected and those history entries will be ignored") + } + + if last.height <= 0 { + ScriptStatus::InMempool + } else { + ScriptStatus::Confirmed( + Confirmed::from_inclusion_and_latest_block( + u32::try_from(last.height).expect("we checked that height is > 0"), + u32::from(self.latest_block_height), + ), + ) + } } - }); + }; + + ((txid, script), new_script_status) + }).collect::>(); + + // 2. log any changes since our last sync + for ((txid, script), status) in new_status.iter() { + let old = self.current_status.get(&(*txid, script.clone())); + + print_status_change(*txid, old, status); + } + + // 3. update local state + self.current_status = new_status; + + // 4. check for finished monitoring tasks + for ((txid, script), status) in self.current_status.iter() { + match self.awaiting_status.entry((*txid, script.clone())) { + Entry::Vacant(_) => { + unreachable!("we are only fetching the status of scripts we are waiting for") + } + Entry::Occupied(mut occupied) => { + let targets = occupied.insert(Vec::new()); + + // Split vec into two lists, all the ones for which we reached the target and + // the ones which we need to still monitor + let (reached_monitoring_target, remaining) = targets + .into_iter() + .partition::, _>(|(target_status, event)| { + tracing::trace!( + "{:?} requires {} and we have {}", + event, + target_status, + status + ); + + status >= target_status + }); + + tracing::trace!("{} subscriptions reached their monitoring target, {} remaining for this script", reached_monitoring_target.len(), remaining.len()); + + occupied.insert(remaining); + + for (target_status, event) in reached_monitoring_target { + tracing::info!(%txid, target = %target_status, current = %status, "Bitcoin transaction reached monitoring target"); + self.cfd_actor_addr.send(event).await?; + } + } + } } Ok(()) } } -pub struct StartMonitoring { - pub id: OrderId, - pub params: MonitorParams, +fn print_status_change(txid: Txid, old: Option<&ScriptStatus>, new: &ScriptStatus) { + match (old, new) { + (None, new_status) if new_status > &ScriptStatus::Unseen => { + tracing::debug!(%txid, status = %new_status, "Found relevant Bitcoin transaction"); + } + (Some(old_status), new_status) if old_status != new_status => { + tracing::debug!(%txid, %new_status, %old_status, "Bitcoin transaction status changed"); + } + _ => {} + } +} + +#[derive(Debug, Copy, Clone, PartialEq, PartialOrd)] +struct Confirmed { + /// The depth of this transaction within the blockchain. + /// + /// Will be zero if the transaction is included in the latest block. + depth: u32, +} + +impl Confirmed { + fn with_confirmations(blocks: u32) -> Self { + Self { depth: blocks - 1 } + } + + /// Compute the depth of a transaction based on its inclusion height and the + /// latest known block. + /// + /// Our information about the latest block might be outdated. To avoid an + /// overflow, we make sure the depth is 0 in case the inclusion height + /// exceeds our latest known block, + fn from_inclusion_and_latest_block(inclusion_height: u32, latest_block: u32) -> Self { + let depth = latest_block.saturating_sub(inclusion_height); + + Self { depth } + } + + fn confirmations(&self) -> u32 { + self.depth + 1 + } +} + +#[derive(Debug, Copy, Clone, PartialEq, PartialOrd)] +enum ScriptStatus { + Unseen, + InMempool, + Confirmed(Confirmed), +} + +impl ScriptStatus { + fn finality() -> Self { + Self::Confirmed(Confirmed::with_confirmations(FINALITY_CONFIRMATIONS)) + } +} + +impl fmt::Display for ScriptStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ScriptStatus::Unseen => write!(f, "unseen"), + ScriptStatus::InMempool => write!(f, "in mempool"), + ScriptStatus::Confirmed(inner) => { + write!(f, "confirmed with {} blocks", inner.confirmations()) + } + } + } +} + +/// Represent a block height, or block number, expressed in absolute block +/// count. E.g. The transaction was included in block #655123, 655123 block +/// after the genesis block. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)] +struct BlockHeight(u32); + +impl From for u32 { + fn from(height: BlockHeight) -> Self { + height.0 + } +} + +impl TryFrom for BlockHeight { + type Error = anyhow::Error; + + fn try_from(value: HeaderNotification) -> Result { + Ok(Self( + value + .height + .try_into() + .context("Failed to fit usize into u32")?, + )) + } +} + +impl Add for BlockHeight { + type Output = BlockHeight; + fn add(self, rhs: u32) -> Self::Output { + BlockHeight(self.0 + rhs) + } } impl xtra::Message for StartMonitoring { @@ -334,17 +449,32 @@ impl Event { } } +impl MonitorParams { + pub fn from_dlc_and_timelocks(dlc: Dlc, refund_timelock_in_blocks: u32) -> Self { + let script_pubkey = dlc.address.script_pubkey(); + MonitorParams { + lock: (dlc.lock.0.txid(), dlc.lock.1), + commit: (dlc.commit.0.txid(), dlc.commit.2), + cets: dlc + .cets + .into_iter() + .map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range)) + .collect(), + refund: ( + dlc.refund.0.txid(), + script_pubkey, + refund_timelock_in_blocks, + ), + } + } +} + impl xtra::Message for Event { type Result = (); } -pub struct Actor -where - T: xtra::Actor, -{ - monitor: Monitor, - cfds: HashMap, - cfd_actor_addr: xtra::Address, +impl xtra::Message for Sync { + type Result = (); } impl xtra::Actor for Actor where T: xtra::Actor {} @@ -355,7 +485,19 @@ where T: xtra::Actor + xtra::Handler, { async fn handle(&mut self, msg: StartMonitoring, _ctx: &mut xtra::Context) { - log_error!(self.handle_start_monitoring(msg)); + let StartMonitoring { id, params } = msg; + + self.monitor_all(¶ms, id); + self.cfds.insert(id, params); + } +} +#[async_trait] +impl xtra::Handler for Actor +where + T: xtra::Actor + xtra::Handler, +{ + async fn handle(&mut self, _: Sync, _ctx: &mut xtra::Context) { + log_error!(self.sync()); } } diff --git a/daemon/src/monitor/subscription.rs b/daemon/src/monitor/subscription.rs deleted file mode 100644 index 43808a3..0000000 --- a/daemon/src/monitor/subscription.rs +++ /dev/null @@ -1,429 +0,0 @@ -#![allow(dead_code)] - -use anyhow::{bail, Context, Result}; -use bdk::bitcoin::{Script, Txid}; -use bdk::electrum_client::{ElectrumApi, GetHistoryRes, HeaderNotification}; -use serde::{Deserialize, Serialize}; -use std::collections::{BTreeMap, HashMap}; -use std::convert::{TryFrom, TryInto}; -use std::fmt; -use std::ops::Add; -use std::sync::Arc; -use tokio::sync::{watch, Mutex}; -use tokio::time::{Duration, Instant}; - -pub struct Monitor { - client: Arc>, - finality_confirmations: u32, -} - -impl Monitor { - pub fn new(electrum_rpc_url: &str, finality_confirmations: u32) -> Result { - let client = bdk::electrum_client::Client::new(electrum_rpc_url) - .context("Failed to initialize Electrum RPC client")?; - - let client = Client::new(client, Duration::from_secs(10))?; - - let monitor = Monitor { - client: Arc::new(Mutex::new(client)), - finality_confirmations, - }; - - Ok(monitor) - } - - pub async fn subscribe_to(&self, tx: impl Watchable + Send + 'static) -> Subscription { - let txid = tx.id(); - let script = tx.script(); - - let sub = self - .client - .lock() - .await - .subscriptions - .entry((txid, script.clone())) - .or_insert_with(|| { - let (sender, receiver) = watch::channel(ScriptStatus::Unseen); - let client = self.client.clone(); - - tokio::spawn(async move { - let mut last_status = None; - - // TODO: We need feedback in the monitoring actor about failures in here - loop { - tokio::time::sleep(Duration::from_secs(5)).await; - - let new_status = match client.lock().await.status_of_script(&tx) { - Ok(new_status) => new_status, - Err(error) => { - tracing::warn!(%txid, "Failed to get status of script: {:#}", error); - return; - } - }; - - last_status = Some(print_status_change(txid, last_status, new_status)); - - let all_receivers_gone = sender.send(new_status).is_err(); - - if all_receivers_gone { - tracing::debug!(%txid, "All receivers gone, removing subscription"); - client.lock().await.subscriptions.remove(&(txid, script)); - return; - } - } - }); - - Subscription { - receiver, - finality_confirmations: self.finality_confirmations, - txid, - } - }) - .clone(); - - sub - } -} - -/// Represents a subscription to the status of a given transaction. -#[derive(Debug, Clone)] -pub struct Subscription { - receiver: watch::Receiver, - finality_confirmations: u32, - txid: Txid, -} - -impl Subscription { - pub async fn wait_until_final(&self) -> Result<()> { - let conf_target = self.finality_confirmations; - let txid = self.txid; - - tracing::info!(%txid, required_confirmation=%conf_target, "Waiting for Bitcoin transaction finality"); - - let mut seen_confirmations = 0; - - self.wait_until(|status| match status { - ScriptStatus::Confirmed(inner) => { - let confirmations = inner.confirmations(); - - if confirmations > seen_confirmations { - tracing::info!(%txid, - seen_confirmations = %confirmations, - needed_confirmations = %conf_target, - "Waiting for Bitcoin transaction finality"); - seen_confirmations = confirmations; - } - - inner.meets_target(conf_target) - } - _ => false, - }) - .await - } - - pub async fn wait_until_seen(&self) -> Result<()> { - self.wait_until(ScriptStatus::has_been_seen).await - } - - pub async fn wait_until_confirmed_with(&self, target: T) -> Result<()> - where - u32: PartialOrd, - T: Copy, - { - self.wait_until(|status| status.is_confirmed_with(target)) - .await - } - - async fn wait_until(&self, mut predicate: impl FnMut(&ScriptStatus) -> bool) -> Result<()> { - let mut receiver = self.receiver.clone(); - - while !predicate(&receiver.borrow()) { - receiver - .changed() - .await - .context("Failed while waiting for next status update")?; - } - - Ok(()) - } -} - -/// Defines a watchable transaction. -/// -/// For a transaction to be watchable, we need to know two things: Its -/// transaction ID and the specific output script that is going to change. -/// A transaction can obviously have multiple outputs but our protocol purposes, -/// we are usually interested in a specific one. -pub trait Watchable { - fn id(&self) -> Txid; - fn script(&self) -> Script; -} - -impl Watchable for (Txid, Script) { - fn id(&self) -> Txid { - self.0 - } - - fn script(&self) -> Script { - self.1.clone() - } -} - -fn print_status_change(txid: Txid, old: Option, new: ScriptStatus) -> ScriptStatus { - match (old, new) { - (None, new_status) => { - tracing::debug!(%txid, status = %new_status, "Found relevant Bitcoin transaction"); - } - (Some(old_status), new_status) if old_status != new_status => { - tracing::debug!(%txid, %new_status, %old_status, "Bitcoin transaction status changed"); - } - _ => {} - } - - new -} - -pub struct Client { - electrum: bdk::electrum_client::Client, - latest_block_height: BlockHeight, - last_sync: Instant, - sync_interval: Duration, - script_history: BTreeMap>, - subscriptions: HashMap<(Txid, Script), Subscription>, -} - -impl Client { - fn new(electrum: bdk::electrum_client::Client, interval: Duration) -> Result { - // Initially fetch the latest block for storing the height. - // We do not act on this subscription after this call. - let latest_block = electrum - .block_headers_subscribe() - .context("Failed to subscribe to header notifications")?; - - Ok(Self { - electrum, - latest_block_height: BlockHeight::try_from(latest_block)?, - last_sync: Instant::now(), - sync_interval: interval, - script_history: Default::default(), - subscriptions: Default::default(), - }) - } - - fn update_state(&mut self) -> Result<()> { - let now = Instant::now(); - if now < self.last_sync + self.sync_interval { - return Ok(()); - } - - self.last_sync = now; - self.update_latest_block()?; - self.update_script_histories()?; - - Ok(()) - } - - fn status_of_script(&mut self, tx: &T) -> Result - where - T: Watchable, - { - let txid = tx.id(); - let script = tx.script(); - - if !self.script_history.contains_key(&script) { - self.script_history.insert(script.clone(), vec![]); - } - - self.update_state()?; - - let history = self.script_history.entry(script).or_default(); - - let history_of_tx = history - .iter() - .filter(|entry| entry.tx_hash == txid) - .collect::>(); - - match history_of_tx.as_slice() { - [] => Ok(ScriptStatus::Unseen), - [remaining @ .., last] => { - if !remaining.is_empty() { - tracing::warn!("Found more than a single history entry for script. This is highly unexpected and those history entries will be ignored") - } - - if last.height <= 0 { - Ok(ScriptStatus::InMempool) - } else { - Ok(ScriptStatus::Confirmed( - Confirmed::from_inclusion_and_latest_block( - u32::try_from(last.height)?, - u32::from(self.latest_block_height), - ), - )) - } - } - } - } - - fn update_latest_block(&mut self) -> Result<()> { - // Fetch the latest block for storing the height. - // We do not act on this subscription after this call, as we cannot rely on - // subscription push notifications because eventually the Electrum server will - // close the connection and subscriptions are not automatically renewed - // upon renewing the connection. - let latest_block = self - .electrum - .block_headers_subscribe() - .context("Failed to subscribe to header notifications")?; - let latest_block_height = BlockHeight::try_from(latest_block)?; - - if latest_block_height > self.latest_block_height { - tracing::debug!( - block_height = u32::from(latest_block_height), - "Got notification for new block" - ); - self.latest_block_height = latest_block_height; - } - - Ok(()) - } - - fn update_script_histories(&mut self) -> Result<()> { - let histories = self - .electrum - .batch_script_get_history(self.script_history.keys()) - .context("Failed to get script histories")?; - - if histories.len() != self.script_history.len() { - bail!( - "Expected {} history entries, received {}", - self.script_history.len(), - histories.len() - ); - } - - let scripts = self.script_history.keys().cloned(); - let histories = histories.into_iter(); - - self.script_history = scripts.zip(histories).collect::>(); - - Ok(()) - } -} - -/// Represent a block height, or block number, expressed in absolute block -/// count. E.g. The transaction was included in block #655123, 655123 block -/// after the genesis block. -#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] -#[serde(transparent)] -pub struct BlockHeight(u32); - -impl From for u32 { - fn from(height: BlockHeight) -> Self { - height.0 - } -} - -impl TryFrom for BlockHeight { - type Error = anyhow::Error; - - fn try_from(value: HeaderNotification) -> Result { - Ok(Self( - value - .height - .try_into() - .context("Failed to fit usize into u32")?, - )) - } -} - -impl Add for BlockHeight { - type Output = BlockHeight; - fn add(self, rhs: u32) -> Self::Output { - BlockHeight(self.0 + rhs) - } -} - -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum ExpiredTimelocks { - None, - Cancel, - Punish, -} - -#[derive(Debug, Copy, Clone, PartialEq)] -pub struct Confirmed { - /// The depth of this transaction within the blockchain. - /// - /// Will be zero if the transaction is included in the latest block. - depth: u32, -} - -impl Confirmed { - pub fn new(depth: u32) -> Self { - Self { depth } - } - - /// Compute the depth of a transaction based on its inclusion height and the - /// latest known block. - /// - /// Our information about the latest block might be outdated. To avoid an - /// overflow, we make sure the depth is 0 in case the inclusion height - /// exceeds our latest known block, - pub fn from_inclusion_and_latest_block(inclusion_height: u32, latest_block: u32) -> Self { - let depth = latest_block.saturating_sub(inclusion_height); - - Self { depth } - } - - pub fn confirmations(&self) -> u32 { - self.depth + 1 - } - - pub fn meets_target(&self, target: T) -> bool - where - u32: PartialOrd, - { - self.confirmations() >= target - } -} - -#[derive(Debug, Copy, Clone, PartialEq)] -pub enum ScriptStatus { - Unseen, - InMempool, - Confirmed(Confirmed), -} - -impl ScriptStatus { - /// Check if the script has any confirmations. - pub fn is_confirmed(&self) -> bool { - matches!(self, ScriptStatus::Confirmed(_)) - } - - /// Check if the script has met the given confirmation target. - pub fn is_confirmed_with(&self, target: T) -> bool - where - u32: PartialOrd, - { - match self { - ScriptStatus::Confirmed(inner) => inner.meets_target(target), - _ => false, - } - } - - pub fn has_been_seen(&self) -> bool { - matches!(self, ScriptStatus::InMempool | ScriptStatus::Confirmed(_)) - } -} - -impl fmt::Display for ScriptStatus { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ScriptStatus::Unseen => write!(f, "unseen"), - ScriptStatus::InMempool => write!(f, "in mempool"), - ScriptStatus::Confirmed(inner) => { - write!(f, "confirmed with {} blocks", inner.confirmations()) - } - } - } -} diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index a13d4b9..7d737c9 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -163,7 +163,7 @@ async fn main() -> Result<()> { .create(None) .spawn_global(); - let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None); + let (monitor_actor_address, mut monitor_actor_context) = xtra::Context::new(None); let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None); @@ -189,9 +189,16 @@ async fn main() -> Result<()> { .map(move |item| taker_cfd::MakerStreamMessage { item }); tokio::spawn(cfd_actor_inbox.clone().attach_stream(read)); + tokio::spawn( + monitor_actor_context + .notify_interval(Duration::from_secs(20), || monitor::Sync) + .unwrap(), + ); tokio::spawn( monitor_actor_context.run( - monitor::Actor::new(&opts.electrum, cfd_actor_inbox.clone(), cfds).await, + monitor::Actor::new(&opts.electrum, cfd_actor_inbox.clone(), cfds) + .await + .unwrap(), ), ); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));