
Merge pull request #178 from comit-network/simplify-monitoring

Rework monitoring actor to use local state instead of tasks
Thomas Eizinger authored 3 years ago, committed via GitHub
commit 268f94a6b8
Changed files:
  1. daemon/src/maker.rs (17 changed lines)
  2. daemon/src/model/cfd.rs (10 changed lines)
  3. daemon/src/monitor.rs (832 changed lines)
  4. daemon/src/monitor/subscription.rs (429 changed lines)
  5. daemon/src/taker.rs (11 changed lines)

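In summary: the old monitor spawned one long-lived task per chain subscription (see the deleted daemon/src/monitor/subscription.rs below), while the reworked actor records what it is waiting for in an `awaiting_status` map and checks everything in a single periodic sync pass. A minimal sketch of that pattern, using simplified stand-in types rather than the actual `Txid`/`Script`/`Event` definitions from this diff:

```rust
// Sketch of the reworked monitoring pattern: one actor owns plain data
// describing what it waits for, and a periodic `sync` pass emits the
// events whose confirmation targets have been reached.
use std::collections::HashMap;

type Key = (u32, String); // stand-in for (Txid, Script)

#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
struct Confirmations(u32);

#[derive(Debug)]
enum Event {
    LockFinality,
    RefundTimelockExpired,
}

struct Monitor {
    awaiting_status: HashMap<Key, Vec<(Confirmations, Event)>>,
}

impl Monitor {
    /// One polling pass: compare the current confirmation count of every
    /// watched script against its targets and emit the events that are due.
    fn sync(&mut self, chain: &HashMap<Key, Confirmations>) -> Vec<Event> {
        let mut due = Vec::new();
        for (key, targets) in self.awaiting_status.iter_mut() {
            let status = chain.get(key).copied().unwrap_or(Confirmations(0));
            // Emit the targets we have reached, keep the rest for later.
            let (reached, remaining): (Vec<_>, Vec<_>) = targets
                .drain(..)
                .partition(|(target, _)| status >= *target);
            *targets = remaining;
            due.extend(reached.into_iter().map(|(_, event)| event));
        }
        due
    }
}

fn main() {
    let key = (1, "script".to_owned());
    let mut monitor = Monitor {
        awaiting_status: HashMap::from([(
            key.clone(),
            vec![
                (Confirmations(1), Event::LockFinality),
                (Confirmations(12), Event::RefundTimelockExpired),
            ],
        )]),
    };
    let chain = HashMap::from([(key, Confirmations(3))]);
    // Only the finality target (1 confirmation) is reached here.
    println!("{:?}", monitor.sync(&chain));
}
```

Because monitoring state is plain data rather than a set of spawned tasks, it can be inspected and unit-tested directly, which is exactly what the new tests in daemon/src/monitor.rs do.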
daemon/src/maker.rs (17 changed lines)

@@ -160,7 +160,7 @@ async fn main() -> Result<()> {
let (maker_inc_connections_address, maker_inc_connections_context) =
xtra::Context::new(None);
let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None);
let (monitor_actor_address, mut monitor_actor_context) = xtra::Context::new(None);
let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None);
@@ -187,9 +187,18 @@ async fn main() -> Result<()> {
cfd_maker_actor_inbox.clone(),
)),
);
tokio::spawn(monitor_actor_context.run(
monitor::Actor::new(&opts.electrum, cfd_maker_actor_inbox.clone(), cfds).await,
));
tokio::spawn(
monitor_actor_context
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.unwrap(),
);
tokio::spawn(
monitor_actor_context.run(
monitor::Actor::new(&opts.electrum, cfd_maker_actor_inbox.clone(), cfds)
.await
.unwrap(),
),
);
tokio::spawn(
oracle_actor_context

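The wiring above relies on xtra's `notify_interval` to deliver a `monitor::Sync` message to the actor every 20 seconds. The same "tick the actor" idea can be sketched with plain tokio primitives, a channel standing in for the actor's mailbox (a sketch only; assumes the tokio crate with the `full` feature):

```rust
// An mpsc channel stands in for the actor's mailbox; an interval task
// stands in for `context.notify_interval(Duration::from_secs(20), || monitor::Sync)`.
use std::time::Duration;
use tokio::sync::mpsc;

struct Sync;

#[tokio::main]
async fn main() {
    let (mailbox, mut inbox) = mpsc::channel::<Sync>(8);

    tokio::spawn(async move {
        let mut interval = tokio::time::interval(Duration::from_secs(20));
        loop {
            interval.tick().await;
            if mailbox.send(Sync).await.is_err() {
                break; // receiver gone, stop ticking
            }
        }
    });

    // Stand-in for the actor's `Handler<Sync>`: each tick triggers one poll.
    while let Some(Sync) = inbox.recv().await {
        println!("sync: poll Electrum once and update local state");
    }
}
```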
daemon/src/model/cfd.rs (10 changed lines)

@@ -10,7 +10,7 @@ use rocket::request::FromParam;
use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::fmt;
use std::ops::RangeInclusive;
use std::time::{Duration, SystemTime};
use uuid::Uuid;
@@ -24,8 +24,8 @@ impl Default for OrderId {
}
}
impl Display for OrderId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for OrderId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
@@ -297,8 +297,8 @@ impl CfdState {
}
}
impl Display for CfdState {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for CfdState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
CfdState::OutgoingOrderRequest { .. } => {
write!(f, "Request sent")

daemon/src/monitor.rs (832 changed lines)

@@ -1,20 +1,26 @@
use crate::actors::log_error;
use crate::model::cfd::{CetStatus, Cfd, CfdState, Dlc, OrderId};
use crate::monitor::subscription::Subscription;
use crate::oracle;
use anyhow::Result;
use anyhow::{Context, Result};
use async_trait::async_trait;
use bdk::bitcoin::{PublicKey, Script, Txid};
use bdk::descriptor::Descriptor;
use bdk::electrum_client::{ElectrumApi, GetHistoryRes, HeaderNotification};
use bdk::miniscript::DescriptorTrait;
use std::collections::HashMap;
use std::ops::RangeInclusive;
use subscription::Monitor;
mod subscription;
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::convert::{TryFrom, TryInto};
use std::fmt;
use std::marker::Send;
use std::ops::{Add, RangeInclusive};
const FINALITY_CONFIRMATIONS: u32 = 1;
pub struct StartMonitoring {
pub id: OrderId,
pub params: MonitorParams,
}
#[derive(Clone)]
pub struct MonitorParams {
lock: (Txid, Descriptor<PublicKey>),
@@ -23,27 +29,22 @@ pub struct MonitorParams {
refund: (Txid, Script, u32),
}
impl MonitorParams {
pub fn from_dlc_and_timelocks(dlc: Dlc, refund_timelock_in_blocks: u32) -> Self {
let script_pubkey = dlc.address.script_pubkey();
MonitorParams {
lock: (dlc.lock.0.txid(), dlc.lock.1),
commit: (dlc.commit.0.txid(), dlc.commit.2),
cets: dlc
.cets
.into_iter()
.map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range))
.collect(),
refund: (
dlc.refund.0.txid(),
script_pubkey,
refund_timelock_in_blocks,
),
}
}
pub struct Sync;
pub struct Actor<T, C = bdk::electrum_client::Client>
where
T: xtra::Actor,
{
cfds: HashMap<OrderId, MonitorParams>,
cfd_actor_addr: xtra::Address<T>,
client: C,
latest_block_height: BlockHeight,
current_status: BTreeMap<(Txid, Script), ScriptStatus>,
awaiting_status: HashMap<(Txid, Script), Vec<(ScriptStatus, Event)>>,
}
impl<T> Actor<T>
impl<T> Actor<T, bdk::electrum_client::Client>
where
T: xtra::Actor + xtra::Handler<Event>,
{
@@ -51,13 +52,23 @@ where
electrum_rpc_url: &str,
cfd_actor_addr: xtra::Address<T>,
cfds: Vec<Cfd>,
) -> Self {
let monitor = Monitor::new(electrum_rpc_url, FINALITY_CONFIRMATIONS).unwrap();
) -> Result<Self> {
let client = bdk::electrum_client::Client::new(electrum_rpc_url)
.context("Failed to initialize Electrum RPC client")?;
// Initially fetch the latest block for storing the height.
// We do not act on this subscription after this call.
let latest_block = client
.block_headers_subscribe()
.context("Failed to subscribe to header notifications")?;
let mut actor = Self {
monitor,
cfds: HashMap::new(),
cfd_actor_addr,
client,
latest_block_height: BlockHeight::try_from(latest_block)?,
current_status: BTreeMap::default(),
awaiting_status: HashMap::default(),
};
for cfd in cfds {
@@ -66,40 +77,37 @@ where
CfdState::PendingOpen { dlc, .. } => {
let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
actor.monitor_all(&params, cfd.order.id).await;
actor.monitor_all(&params, cfd.order.id);
}
CfdState::Open { dlc, .. } | CfdState::PendingCommit { dlc, .. } => {
let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
actor.monitor_commit_finality_and_timelocks(&params, cfd.order.id).await;
actor.monitor_refund_finality( params.clone(),cfd.order.id).await;
actor.monitor_commit_finality(&params, cfd.order.id);
actor.monitor_commit_cet_timelock(&params, cfd.order.id);
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
}
CfdState::OpenCommitted { dlc, cet_status, .. } => {
let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
let commit_subscription = actor
.monitor
.subscribe_to((params.commit.0, params.commit.1.script_pubkey()))
.await;
match cet_status {
CetStatus::Unprepared
| CetStatus::OracleSigned(_) => {
actor.monitor_commit_cet_timelock(cfd.order.id, &commit_subscription).await;
actor.monitor_commit_refund_timelock(&params, cfd.order.id, &commit_subscription).await;
actor.monitor_refund_finality( params.clone(),cfd.order.id).await;
actor.monitor_commit_cet_timelock(&params, cfd.order.id);
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
}
CetStatus::TimelockExpired => {
actor.monitor_commit_refund_timelock(&params, cfd.order.id, &commit_subscription).await;
actor.monitor_refund_finality( params.clone(),cfd.order.id).await;
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
}
CetStatus::Ready(_price) => {
// TODO: monitor CET finality
actor.monitor_commit_refund_timelock(&params, cfd.order.id, &commit_subscription).await;
actor.monitor_refund_finality( params.clone(),cfd.order.id).await;
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
}
}
}
@@ -109,13 +117,8 @@ where
let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
let commit_subscription = actor
.monitor
.subscribe_to((params.commit.0, params.commit.1.script_pubkey()))
.await;
actor.monitor_commit_refund_timelock(&params, cfd.order.id, &commit_subscription).await;
actor.monitor_refund_finality( params.clone(),cfd.order.id).await;
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
}
// too early to monitor
@@ -131,143 +134,92 @@
}
}
actor
Ok(actor)
}
}
async fn handle_start_monitoring(&mut self, msg: StartMonitoring) -> Result<()> {
let StartMonitoring { id, params } = msg;
impl<T, C> Actor<T, C>
where
T: xtra::Actor + xtra::Handler<Event>,
C: bdk::electrum_client::ElectrumApi,
{
fn monitor_all(&mut self, params: &MonitorParams, order_id: OrderId) {
self.monitor_lock_finality(params, order_id);
self.monitor_commit_finality(params, order_id);
self.monitor_commit_cet_timelock(params, order_id);
self.monitor_commit_refund_timelock(params, order_id);
self.monitor_refund_finality(params, order_id);
}
self.cfds.insert(id, params.clone());
self.monitor_all(&params, id).await;
fn monitor_lock_finality(&mut self, params: &MonitorParams, order_id: OrderId) {
self.awaiting_status
.entry((params.lock.0, params.lock.1.script_pubkey()))
.or_default()
.push((ScriptStatus::finality(), Event::LockFinality(order_id)));
}
Ok(())
fn monitor_commit_finality(&mut self, params: &MonitorParams, order_id: OrderId) {
self.awaiting_status
.entry((params.commit.0, params.commit.1.script_pubkey()))
.or_default()
.push((ScriptStatus::finality(), Event::CommitFinality(order_id)));
}
async fn monitor_all(&self, params: &MonitorParams, order_id: OrderId) {
self.monitor_lock_finality(params, order_id).await;
fn monitor_commit_cet_timelock(&mut self, params: &MonitorParams, order_id: OrderId) {
self.awaiting_status
.entry((params.commit.0, params.commit.1.script_pubkey()))
.or_default()
.push((
ScriptStatus::with_confirmations(Cfd::CET_TIMELOCK),
Event::CetTimelockExpired(order_id),
));
}
self.monitor_commit_finality_and_timelocks(params, order_id)
.await;
fn monitor_commit_refund_timelock(&mut self, params: &MonitorParams, order_id: OrderId) {
self.awaiting_status
.entry((params.commit.0, params.commit.1.script_pubkey()))
.or_default()
.push((
ScriptStatus::with_confirmations(params.refund.2),
Event::RefundTimelockExpired(order_id),
));
}
self.monitor_refund_finality(params.clone(), order_id).await;
fn monitor_refund_finality(&mut self, params: &MonitorParams, order_id: OrderId) {
self.awaiting_status
.entry((params.refund.0, params.refund.1.clone()))
.or_default()
.push((ScriptStatus::finality(), Event::RefundFinality(order_id)));
}
async fn monitor_lock_finality(&self, params: &MonitorParams, order_id: OrderId) {
tokio::spawn({
let cfd_actor_addr = self.cfd_actor_addr.clone();
let lock_subscription = self
.monitor
.subscribe_to((params.lock.0, params.lock.1.script_pubkey()))
.await;
async move {
lock_subscription.wait_until_final().await.unwrap();
async fn sync(&mut self) -> Result<()> {
// Fetch the latest block for storing the height.
// We do not act on this subscription after this call, as we cannot rely on
// subscription push notifications because eventually the Electrum server will
// close the connection and subscriptions are not automatically renewed
// upon renewing the connection.
let latest_block_height = self
.client
.block_headers_subscribe()
.context("Failed to subscribe to header notifications")?
.try_into()?;
tracing::trace!(
"Updating status of {} transactions",
self.awaiting_status.len()
);
let histories = self
.client
.batch_script_get_history(self.awaiting_status.keys().map(|(_, script)| script))
.context("Failed to get script histories")?;
self.update_state(latest_block_height, histories).await?;
cfd_actor_addr
.do_send_async(Event::LockFinality(order_id))
.await
.unwrap();
}
});
}
async fn monitor_commit_finality_and_timelocks(
&self,
params: &MonitorParams,
order_id: OrderId,
) {
let commit_subscription = self
.monitor
.subscribe_to((params.commit.0, params.commit.1.script_pubkey()))
.await;
self.monitor_commit_finality(order_id, &commit_subscription)
.await;
self.monitor_commit_cet_timelock(order_id, &commit_subscription)
.await;
self.monitor_commit_refund_timelock(params, order_id, &commit_subscription)
.await;
}
async fn monitor_commit_finality(&self, order_id: OrderId, commit_subscription: &Subscription) {
tokio::spawn({
let cfd_actor_addr = self.cfd_actor_addr.clone();
let commit_subscription = commit_subscription.clone();
async move {
commit_subscription.wait_until_final().await.unwrap();
cfd_actor_addr
.do_send_async(Event::CommitFinality(order_id))
.await
.unwrap();
}
});
}
async fn monitor_commit_cet_timelock(
&self,
order_id: OrderId,
commit_subscription: &Subscription,
) {
tokio::spawn({
let cfd_actor_addr = self.cfd_actor_addr.clone();
let commit_subscription = commit_subscription.clone();
async move {
commit_subscription
.wait_until_confirmed_with(Cfd::CET_TIMELOCK)
.await
.unwrap();
cfd_actor_addr
.do_send_async(Event::CetTimelockExpired(order_id))
.await
.unwrap();
}
});
}
async fn monitor_commit_refund_timelock(
&self,
params: &MonitorParams,
order_id: OrderId,
commit_subscription: &Subscription,
) {
tokio::spawn({
let cfd_actor_addr = self.cfd_actor_addr.clone();
let commit_subscription = commit_subscription.clone();
let refund_timelock = params.refund.2;
async move {
commit_subscription
.wait_until_confirmed_with(refund_timelock)
.await
.unwrap();
cfd_actor_addr
.do_send_async(Event::RefundTimelockExpired(order_id))
.await
.unwrap();
}
});
}
async fn monitor_refund_finality(&self, params: MonitorParams, order_id: OrderId) {
tokio::spawn({
let cfd_actor_addr = self.cfd_actor_addr.clone();
let refund_subscription = self
.monitor
.subscribe_to((params.refund.0, params.refund.1))
.await;
async move {
refund_subscription.wait_until_final().await.unwrap();
cfd_actor_addr
.do_send_async(Event::RefundFinality(order_id))
.await
.unwrap();
}
});
Ok(())
}
pub async fn handle_oracle_attestation(&self, attestation: oracle::Attestation) -> Result<()> {
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
for (order_id, MonitorParams { cets, .. }) in self.cfds.clone().into_iter() {
let (txid, script_pubkey) =
match cets.iter().find_map(|(txid, script_pubkey, range)| {
@@ -279,37 +231,211 @@ where
None => continue,
};
tokio::spawn({
let cfd_actor_addr = self.cfd_actor_addr.clone();
let cet_subscription = self
.monitor
.subscribe_to((*txid, script_pubkey.clone()))
.await;
async move {
cet_subscription.wait_until_final().await.unwrap();
cfd_actor_addr
.do_send_async(Event::CetFinality(order_id))
.await
.unwrap();
self.awaiting_status
.entry((*txid, script_pubkey.clone()))
.or_default()
.push((ScriptStatus::finality(), Event::CetFinality(order_id)));
}
Ok(())
}
async fn update_state(
&mut self,
latest_block_height: BlockHeight,
histories: Vec<Vec<GetHistoryRes>>,
) -> Result<()> {
if latest_block_height > self.latest_block_height {
tracing::debug!(
block_height = u32::from(latest_block_height),
"Got notification for new block"
);
self.latest_block_height = latest_block_height;
}
// 1. shape response into local data format
let new_status = histories.into_iter().zip(self.awaiting_status.keys().cloned()).map(|(script_history, (txid, script))| {
let new_script_status = match script_history.as_slice() {
[] => ScriptStatus::Unseen,
[remaining @ .., last] => {
if !remaining.is_empty() {
tracing::warn!("Found more than a single history entry for script. This is highly unexpected and those history entries will be ignored")
}
if last.height <= 0 {
ScriptStatus::InMempool
} else {
ScriptStatus::Confirmed(
Confirmed::from_inclusion_and_latest_block(
u32::try_from(last.height).expect("we checked that height is > 0"),
u32::from(self.latest_block_height),
),
)
}
}
};
((txid, script), new_script_status)
}).collect::<BTreeMap<_, _>>();
// 2. log any changes since our last sync
for ((txid, script), status) in new_status.iter() {
let old = self.current_status.get(&(*txid, script.clone()));
print_status_change(*txid, old, status);
}
// 3. update local state
self.current_status = new_status;
// 4. check for finished monitoring tasks
for ((txid, script), status) in self.current_status.iter() {
match self.awaiting_status.entry((*txid, script.clone())) {
Entry::Vacant(_) => {
unreachable!("we are only fetching the status of scripts we are waiting for")
}
Entry::Occupied(mut occupied) => {
let targets = occupied.insert(Vec::new());
// Split vec into two lists, all the ones for which we reached the target and
// the ones which we need to still monitor
let (reached_monitoring_target, remaining) = targets
.into_iter()
.partition::<Vec<_>, _>(|(target_status, event)| {
tracing::trace!(
"{:?} requires {} and we have {}",
event,
target_status,
status
);
status >= target_status
});
tracing::trace!("{} subscriptions reached their monitoring target, {} remaining for this script", reached_monitoring_target.len(), remaining.len());
occupied.insert(remaining);
for (target_status, event) in reached_monitoring_target {
tracing::info!(%txid, target = %target_status, current = %status, "Bitcoin transaction reached monitoring target");
self.cfd_actor_addr.send(event).await?;
}
}
});
}
}
Ok(())
}
}
pub struct StartMonitoring {
pub id: OrderId,
pub params: MonitorParams,
fn print_status_change(txid: Txid, old: Option<&ScriptStatus>, new: &ScriptStatus) {
match (old, new) {
(None, new_status) if new_status > &ScriptStatus::Unseen => {
tracing::debug!(%txid, status = %new_status, "Found relevant Bitcoin transaction");
}
(Some(old_status), new_status) if old_status != new_status => {
tracing::debug!(%txid, %new_status, %old_status, "Bitcoin transaction status changed");
}
_ => {}
}
}
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd)]
struct Confirmed {
/// The depth of this transaction within the blockchain.
///
/// Will be zero if the transaction is included in the latest block.
depth: u32,
}
impl Confirmed {
fn with_confirmations(blocks: u32) -> Self {
Self { depth: blocks - 1 }
}
/// Compute the depth of a transaction based on its inclusion height and the
/// latest known block.
///
/// Our information about the latest block might be outdated. To avoid an
/// overflow, we make sure the depth is 0 in case the inclusion height
/// exceeds our latest known block.
fn from_inclusion_and_latest_block(inclusion_height: u32, latest_block: u32) -> Self {
let depth = latest_block.saturating_sub(inclusion_height);
Self { depth }
}
fn confirmations(&self) -> u32 {
self.depth + 1
}
}
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd)]
enum ScriptStatus {
Unseen,
InMempool,
Confirmed(Confirmed),
}
impl ScriptStatus {
fn with_confirmations(confirmations: u32) -> Self {
Self::Confirmed(Confirmed::with_confirmations(confirmations))
}
fn finality() -> Self {
Self::with_confirmations(FINALITY_CONFIRMATIONS)
}
}
impl fmt::Display for ScriptStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ScriptStatus::Unseen => write!(f, "unseen"),
ScriptStatus::InMempool => write!(f, "in mempool"),
ScriptStatus::Confirmed(inner) => {
write!(f, "confirmed with {} blocks", inner.confirmations())
}
}
}
}
/// Represent a block height, or block number, expressed in absolute block
/// count. E.g. the transaction was included in block #655123, 655123 blocks
/// after the genesis block.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)]
struct BlockHeight(u32);
impl From<BlockHeight> for u32 {
fn from(height: BlockHeight) -> Self {
height.0
}
}
impl TryFrom<HeaderNotification> for BlockHeight {
type Error = anyhow::Error;
fn try_from(value: HeaderNotification) -> Result<Self, Self::Error> {
Ok(Self(
value
.height
.try_into()
.context("Failed to fit usize into u32")?,
))
}
}
impl Add<u32> for BlockHeight {
type Output = BlockHeight;
fn add(self, rhs: u32) -> Self::Output {
BlockHeight(self.0 + rhs)
}
}
impl xtra::Message for StartMonitoring {
type Result = ();
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum Event {
LockFinality(OrderId),
CommitFinality(OrderId),
@@ -334,28 +460,63 @@ impl Event {
}
}
impl MonitorParams {
pub fn from_dlc_and_timelocks(dlc: Dlc, refund_timelock_in_blocks: u32) -> Self {
let script_pubkey = dlc.address.script_pubkey();
MonitorParams {
lock: (dlc.lock.0.txid(), dlc.lock.1),
commit: (dlc.commit.0.txid(), dlc.commit.2),
cets: dlc
.cets
.into_iter()
.map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range))
.collect(),
refund: (
dlc.refund.0.txid(),
script_pubkey,
refund_timelock_in_blocks,
),
}
}
}
impl xtra::Message for Event {
type Result = ();
}
pub struct Actor<T>
impl xtra::Message for Sync {
type Result = ();
}
impl<T, C> xtra::Actor for Actor<T, C>
where
T: xtra::Actor,
C: Send,
C: 'static,
{
monitor: Monitor,
cfds: HashMap<OrderId, MonitorParams>,
cfd_actor_addr: xtra::Address<T>,
}
impl<T> xtra::Actor for Actor<T> where T: xtra::Actor {}
#[async_trait]
impl<T> xtra::Handler<StartMonitoring> for Actor<T>
impl<T, C> xtra::Handler<StartMonitoring> for Actor<T, C>
where
T: xtra::Actor + xtra::Handler<Event>,
C: bdk::electrum_client::ElectrumApi + Send + 'static,
{
async fn handle(&mut self, msg: StartMonitoring, _ctx: &mut xtra::Context<Self>) {
log_error!(self.handle_start_monitoring(msg));
let StartMonitoring { id, params } = msg;
self.monitor_all(&params, id);
self.cfds.insert(id, params);
}
}
#[async_trait]
impl<T, C> xtra::Handler<Sync> for Actor<T, C>
where
T: xtra::Actor + xtra::Handler<Event>,
C: bdk::electrum_client::ElectrumApi + Send + 'static,
{
async fn handle(&mut self, _: Sync, _ctx: &mut xtra::Context<Self>) {
log_error!(self.sync());
}
}
@@ -368,3 +529,272 @@ where
log_error!(self.handle_oracle_attestation(msg));
}
}
#[cfg(test)]
mod tests {
use super::*;
use bdk::bitcoin::blockdata::block;
use bdk::electrum_client::{
Batch, Error, GetBalanceRes, GetHeadersRes, GetMerkleRes, ListUnspentRes,
RawHeaderNotification, ServerFeaturesRes,
};
use std::iter::FromIterator;
use tracing_subscriber::prelude::*;
#[tokio::test]
async fn can_handle_multiple_subscriptions_on_the_same_transaction() {
let _guard = tracing_subscriber::fmt()
.with_env_filter("trace")
.with_test_writer()
.set_default();
let (recorder_address, mut recorder_context) =
xtra::Context::<MessageRecordingActor>::new(None);
let mut recorder = MessageRecordingActor::default();
let commit_finality = Event::CommitFinality(OrderId::default());
let refund_expired = Event::RefundTimelockExpired(OrderId::default());
let mut monitor = Actor::for_test(
recorder_address,
[(
(txid1(), script1()),
vec![
(ScriptStatus::finality(), commit_finality.clone()),
(
ScriptStatus::with_confirmations(Cfd::CET_TIMELOCK),
refund_expired.clone(),
),
],
)],
);
monitor.client.include_tx(txid1(), 5);
monitor.client.advance_to_height(10);
recorder_context
.handle_while(&mut recorder, monitor.sync())
.await
.unwrap();
assert_eq!(recorder.events[0], commit_finality);
monitor.client.advance_to_height(20);
recorder_context
.handle_while(&mut recorder, monitor.sync())
.await
.unwrap();
assert_eq!(recorder.events[1], refund_expired);
}
impl<A> Actor<A, stub::Client>
where
A: xtra::Actor + xtra::Handler<Event>,
{
#[allow(clippy::type_complexity)]
fn for_test<const N: usize>(
address: xtra::Address<A>,
subscriptions: [((Txid, Script), Vec<(ScriptStatus, Event)>); N],
) -> Self {
Actor {
cfds: HashMap::default(),
cfd_actor_addr: address,
client: stub::Client::default(),
latest_block_height: BlockHeight(0),
current_status: BTreeMap::default(),
awaiting_status: HashMap::from_iter(subscriptions),
}
}
}
fn txid1() -> Txid {
"1278ef8104c2f63c03d4d52bace29bed28bd5e664e67543735ddc95a39bfdc0f"
.parse()
.unwrap()
}
fn script1() -> Script {
"6a4c50001d97ca0002d3829148f63cc8ee21241e3f1c5eaee58781dd45a7d814710fac571b92aadff583e85d5a295f61856f469b401efe615657bf040c32f1000065bce011a420ca9ea3657fff154d95d1a95c".parse().unwrap()
}
#[derive(Default)]
struct MessageRecordingActor {
events: Vec<Event>,
}
impl xtra::Actor for MessageRecordingActor {}
#[async_trait]
impl xtra::Handler<Event> for MessageRecordingActor {
async fn handle(&mut self, message: Event, _ctx: &mut xtra::Context<Self>) {
self.events.push(message);
}
}
mod stub {
use super::*;
use bdk::electrum_client::ScriptStatus;
#[derive(Default)]
pub struct Client {
transactions: HashMap<Txid, i32>,
block_height: usize,
}
impl Client {
pub fn include_tx(&mut self, tx: Txid, height: i32) {
self.transactions.insert(tx, height);
}
pub fn advance_to_height(&mut self, height: usize) {
self.block_height = height;
}
}
impl ElectrumApi for Client {
fn block_headers_subscribe(&self) -> Result<HeaderNotification, Error> {
Ok(HeaderNotification {
height: self.block_height,
header: block::BlockHeader {
version: 0,
prev_blockhash: Default::default(),
merkle_root: Default::default(),
time: 0,
bits: 0,
nonce: 0,
},
})
}
fn batch_script_get_history<'s, I>(
&self,
_: I,
) -> Result<Vec<Vec<GetHistoryRes>>, Error>
where
I: IntoIterator<Item = &'s Script> + Clone,
{
Ok(self
.transactions
.iter()
.map(|(tx, included_at)| {
vec![GetHistoryRes {
height: *included_at,
tx_hash: *tx,
fee: None,
}]
})
.collect())
}
fn batch_call(&self, _batch: &Batch) -> Result<Vec<serde_json::Value>, Error> {
unreachable!("This is a test.")
}
fn block_headers_subscribe_raw(&self) -> Result<RawHeaderNotification, Error> {
unreachable!("This is a test.")
}
fn block_headers_pop_raw(&self) -> Result<Option<RawHeaderNotification>, Error> {
unreachable!("This is a test.")
}
fn block_header_raw(&self, _height: usize) -> Result<Vec<u8>, Error> {
unreachable!("This is a test.")
}
fn block_headers(&self, _: usize, _: usize) -> Result<GetHeadersRes, Error> {
unreachable!("This is a test.")
}
fn estimate_fee(&self, _number: usize) -> Result<f64, Error> {
unreachable!("This is a test.")
}
fn relay_fee(&self) -> Result<f64, Error> {
unreachable!("This is a test.")
}
fn script_subscribe(&self, _script: &Script) -> Result<Option<ScriptStatus>, Error> {
unreachable!("This is a test.")
}
fn script_unsubscribe(&self, _script: &Script) -> Result<bool, Error> {
unreachable!("This is a test.")
}
fn script_pop(&self, _: &Script) -> Result<Option<ScriptStatus>, Error> {
unreachable!("This is a test.")
}
fn script_get_balance(&self, _script: &Script) -> Result<GetBalanceRes, Error> {
unreachable!("This is a test.")
}
fn batch_script_get_balance<'s, I>(&self, _: I) -> Result<Vec<GetBalanceRes>, Error>
where
I: IntoIterator<Item = &'s Script> + Clone,
{
unreachable!("This is a test.")
}
fn script_get_history(&self, _script: &Script) -> Result<Vec<GetHistoryRes>, Error> {
unreachable!("This is a test.")
}
fn script_list_unspent(&self, _script: &Script) -> Result<Vec<ListUnspentRes>, Error> {
unreachable!("This is a test.")
}
fn batch_script_list_unspent<'s, I>(
&self,
_: I,
) -> Result<Vec<Vec<ListUnspentRes>>, Error>
where
I: IntoIterator<Item = &'s Script> + Clone,
{
unreachable!("This is a test.")
}
fn transaction_get_raw(&self, _txid: &Txid) -> Result<Vec<u8>, Error> {
unreachable!("This is a test.")
}
fn batch_transaction_get_raw<'t, I>(&self, _txids: I) -> Result<Vec<Vec<u8>>, Error>
where
I: IntoIterator<Item = &'t Txid> + Clone,
{
unreachable!("This is a test.")
}
fn batch_block_header_raw<I>(&self, _heights: I) -> Result<Vec<Vec<u8>>, Error>
where
I: IntoIterator<Item = u32> + Clone,
{
unreachable!("This is a test.")
}
fn batch_estimate_fee<I>(&self, _numbers: I) -> Result<Vec<f64>, Error>
where
I: IntoIterator<Item = usize> + Clone,
{
unreachable!("This is a test.")
}
fn transaction_broadcast_raw(&self, _raw_tx: &[u8]) -> Result<Txid, Error> {
unreachable!("This is a test.")
}
fn transaction_get_merkle(&self, _: &Txid, _: usize) -> Result<GetMerkleRes, Error> {
unreachable!("This is a test.")
}
fn server_features(&self) -> Result<ServerFeaturesRes, Error> {
unreachable!("This is a test.")
}
fn ping(&self) -> Result<(), Error> {
unreachable!("This is a test.")
}
}
}
}

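Note how `update_state` above decides that a monitoring target is reached with a plain `status >= target_status` comparison: the derived `PartialOrd` on `ScriptStatus` orders variants by declaration order (`Unseen < InMempool < Confirmed(..)`), and confirmed statuses compare by depth. A self-contained check of that ordering (types copied in miniature, not imported from the crate):

```rust
// Minimal reproduction of the ordering `update_state` relies on.
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd)]
struct Confirmed {
    depth: u32,
}

#[derive(Debug, Copy, Clone, PartialEq, PartialOrd)]
enum ScriptStatus {
    Unseen,
    InMempool,
    Confirmed(Confirmed),
}

fn main() {
    let finality = ScriptStatus::Confirmed(Confirmed { depth: 0 }); // 1 confirmation
    let deep = ScriptStatus::Confirmed(Confirmed { depth: 5 }); // 6 confirmations

    assert!(ScriptStatus::Unseen < ScriptStatus::InMempool);
    assert!(ScriptStatus::InMempool < finality);
    assert!(deep >= finality); // a deeper confirmation also meets the finality target
    println!("ordering checks passed");
}
```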
daemon/src/monitor/subscription.rs (429 changed lines)

@@ -1,429 +0,0 @@
#![allow(dead_code)]
use anyhow::{bail, Context, Result};
use bdk::bitcoin::{Script, Txid};
use bdk::electrum_client::{ElectrumApi, GetHistoryRes, HeaderNotification};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::convert::{TryFrom, TryInto};
use std::fmt;
use std::ops::Add;
use std::sync::Arc;
use tokio::sync::{watch, Mutex};
use tokio::time::{Duration, Instant};
pub struct Monitor {
client: Arc<Mutex<Client>>,
finality_confirmations: u32,
}
impl Monitor {
pub fn new(electrum_rpc_url: &str, finality_confirmations: u32) -> Result<Self> {
let client = bdk::electrum_client::Client::new(electrum_rpc_url)
.context("Failed to initialize Electrum RPC client")?;
let client = Client::new(client, Duration::from_secs(10))?;
let monitor = Monitor {
client: Arc::new(Mutex::new(client)),
finality_confirmations,
};
Ok(monitor)
}
pub async fn subscribe_to(&self, tx: impl Watchable + Send + 'static) -> Subscription {
let txid = tx.id();
let script = tx.script();
let sub = self
.client
.lock()
.await
.subscriptions
.entry((txid, script.clone()))
.or_insert_with(|| {
let (sender, receiver) = watch::channel(ScriptStatus::Unseen);
let client = self.client.clone();
tokio::spawn(async move {
let mut last_status = None;
// TODO: We need feedback in the monitoring actor about failures in here
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
let new_status = match client.lock().await.status_of_script(&tx) {
Ok(new_status) => new_status,
Err(error) => {
tracing::warn!(%txid, "Failed to get status of script: {:#}", error);
return;
}
};
last_status = Some(print_status_change(txid, last_status, new_status));
let all_receivers_gone = sender.send(new_status).is_err();
if all_receivers_gone {
tracing::debug!(%txid, "All receivers gone, removing subscription");
client.lock().await.subscriptions.remove(&(txid, script));
return;
}
}
});
Subscription {
receiver,
finality_confirmations: self.finality_confirmations,
txid,
}
})
.clone();
sub
}
}
/// Represents a subscription to the status of a given transaction.
#[derive(Debug, Clone)]
pub struct Subscription {
receiver: watch::Receiver<ScriptStatus>,
finality_confirmations: u32,
txid: Txid,
}
impl Subscription {
pub async fn wait_until_final(&self) -> Result<()> {
let conf_target = self.finality_confirmations;
let txid = self.txid;
tracing::info!(%txid, required_confirmation=%conf_target, "Waiting for Bitcoin transaction finality");
let mut seen_confirmations = 0;
self.wait_until(|status| match status {
ScriptStatus::Confirmed(inner) => {
let confirmations = inner.confirmations();
if confirmations > seen_confirmations {
tracing::info!(%txid,
seen_confirmations = %confirmations,
needed_confirmations = %conf_target,
"Waiting for Bitcoin transaction finality");
seen_confirmations = confirmations;
}
inner.meets_target(conf_target)
}
_ => false,
})
.await
}
pub async fn wait_until_seen(&self) -> Result<()> {
self.wait_until(ScriptStatus::has_been_seen).await
}
pub async fn wait_until_confirmed_with<T>(&self, target: T) -> Result<()>
where
u32: PartialOrd<T>,
T: Copy,
{
self.wait_until(|status| status.is_confirmed_with(target))
.await
}
async fn wait_until(&self, mut predicate: impl FnMut(&ScriptStatus) -> bool) -> Result<()> {
let mut receiver = self.receiver.clone();
while !predicate(&receiver.borrow()) {
receiver
.changed()
.await
.context("Failed while waiting for next status update")?;
}
Ok(())
}
}
/// Defines a watchable transaction.
///
/// For a transaction to be watchable, we need to know two things: Its
/// transaction ID and the specific output script that is going to change.
/// A transaction can obviously have multiple outputs, but for our protocol
/// purposes we are usually interested in a specific one.
pub trait Watchable {
fn id(&self) -> Txid;
fn script(&self) -> Script;
}
impl Watchable for (Txid, Script) {
fn id(&self) -> Txid {
self.0
}
fn script(&self) -> Script {
self.1.clone()
}
}
fn print_status_change(txid: Txid, old: Option<ScriptStatus>, new: ScriptStatus) -> ScriptStatus {
match (old, new) {
(None, new_status) => {
tracing::debug!(%txid, status = %new_status, "Found relevant Bitcoin transaction");
}
(Some(old_status), new_status) if old_status != new_status => {
tracing::debug!(%txid, %new_status, %old_status, "Bitcoin transaction status changed");
}
_ => {}
}
new
}
pub struct Client {
electrum: bdk::electrum_client::Client,
latest_block_height: BlockHeight,
last_sync: Instant,
sync_interval: Duration,
script_history: BTreeMap<Script, Vec<GetHistoryRes>>,
subscriptions: HashMap<(Txid, Script), Subscription>,
}
impl Client {
fn new(electrum: bdk::electrum_client::Client, interval: Duration) -> Result<Self> {
// Initially fetch the latest block for storing the height.
// We do not act on this subscription after this call.
let latest_block = electrum
.block_headers_subscribe()
.context("Failed to subscribe to header notifications")?;
Ok(Self {
electrum,
latest_block_height: BlockHeight::try_from(latest_block)?,
last_sync: Instant::now(),
sync_interval: interval,
script_history: Default::default(),
subscriptions: Default::default(),
})
}
fn update_state(&mut self) -> Result<()> {
let now = Instant::now();
if now < self.last_sync + self.sync_interval {
return Ok(());
}
self.last_sync = now;
self.update_latest_block()?;
self.update_script_histories()?;
Ok(())
}
fn status_of_script<T>(&mut self, tx: &T) -> Result<ScriptStatus>
where
T: Watchable,
{
let txid = tx.id();
let script = tx.script();
if !self.script_history.contains_key(&script) {
self.script_history.insert(script.clone(), vec![]);
}
self.update_state()?;
let history = self.script_history.entry(script).or_default();
let history_of_tx = history
.iter()
.filter(|entry| entry.tx_hash == txid)
.collect::<Vec<_>>();
match history_of_tx.as_slice() {
[] => Ok(ScriptStatus::Unseen),
[remaining @ .., last] => {
if !remaining.is_empty() {
tracing::warn!("Found more than a single history entry for script. This is highly unexpected and those history entries will be ignored")
}
if last.height <= 0 {
Ok(ScriptStatus::InMempool)
} else {
Ok(ScriptStatus::Confirmed(
Confirmed::from_inclusion_and_latest_block(
u32::try_from(last.height)?,
u32::from(self.latest_block_height),
),
))
}
}
}
}
fn update_latest_block(&mut self) -> Result<()> {
// Fetch the latest block for storing the height.
// We do not act on this subscription after this call, as we cannot rely on
// subscription push notifications because eventually the Electrum server will
// close the connection and subscriptions are not automatically renewed
// upon renewing the connection.
let latest_block = self
.electrum
.block_headers_subscribe()
.context("Failed to subscribe to header notifications")?;
let latest_block_height = BlockHeight::try_from(latest_block)?;
if latest_block_height > self.latest_block_height {
tracing::debug!(
block_height = u32::from(latest_block_height),
"Got notification for new block"
);
self.latest_block_height = latest_block_height;
}
Ok(())
}
fn update_script_histories(&mut self) -> Result<()> {
let histories = self
.electrum
.batch_script_get_history(self.script_history.keys())
.context("Failed to get script histories")?;
if histories.len() != self.script_history.len() {
bail!(
"Expected {} history entries, received {}",
self.script_history.len(),
histories.len()
);
}
let scripts = self.script_history.keys().cloned();
let histories = histories.into_iter();
self.script_history = scripts.zip(histories).collect::<BTreeMap<_, _>>();
Ok(())
}
}
/// Represent a block height, or block number, expressed in absolute block
/// count. E.g. the transaction was included in block #655123, 655123 blocks
/// after the genesis block.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
#[serde(transparent)]
pub struct BlockHeight(u32);
impl From<BlockHeight> for u32 {
fn from(height: BlockHeight) -> Self {
height.0
}
}
impl TryFrom<HeaderNotification> for BlockHeight {
type Error = anyhow::Error;
fn try_from(value: HeaderNotification) -> Result<Self, Self::Error> {
Ok(Self(
value
.height
.try_into()
.context("Failed to fit usize into u32")?,
))
}
}
impl Add<u32> for BlockHeight {
type Output = BlockHeight;
fn add(self, rhs: u32) -> Self::Output {
BlockHeight(self.0 + rhs)
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ExpiredTimelocks {
None,
Cancel,
Punish,
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct Confirmed {
/// The depth of this transaction within the blockchain.
///
/// Will be zero if the transaction is included in the latest block.
depth: u32,
}
impl Confirmed {
pub fn new(depth: u32) -> Self {
Self { depth }
}
/// Compute the depth of a transaction based on its inclusion height and the
/// latest known block.
///
/// Our information about the latest block might be outdated. To avoid an
/// overflow, we make sure the depth is 0 in case the inclusion height
/// exceeds our latest known block.
pub fn from_inclusion_and_latest_block(inclusion_height: u32, latest_block: u32) -> Self {
let depth = latest_block.saturating_sub(inclusion_height);
Self { depth }
}
pub fn confirmations(&self) -> u32 {
self.depth + 1
}
pub fn meets_target<T>(&self, target: T) -> bool
where
u32: PartialOrd<T>,
{
self.confirmations() >= target
}
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ScriptStatus {
Unseen,
InMempool,
Confirmed(Confirmed),
}
impl ScriptStatus {
/// Check if the script has any confirmations.
pub fn is_confirmed(&self) -> bool {
matches!(self, ScriptStatus::Confirmed(_))
}
/// Check if the script has met the given confirmation target.
pub fn is_confirmed_with<T>(&self, target: T) -> bool
where
u32: PartialOrd<T>,
{
match self {
ScriptStatus::Confirmed(inner) => inner.meets_target(target),
_ => false,
}
}
pub fn has_been_seen(&self) -> bool {
matches!(self, ScriptStatus::InMempool | ScriptStatus::Confirmed(_))
}
}
impl fmt::Display for ScriptStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ScriptStatus::Unseen => write!(f, "unseen"),
ScriptStatus::InMempool => write!(f, "in mempool"),
ScriptStatus::Confirmed(inner) => {
write!(f, "confirmed with {} blocks", inner.confirmations())
}
}
}
}

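For contrast, the module deleted above implemented the old per-subscription pattern: a background task polls Electrum and publishes `ScriptStatus` updates through a tokio `watch` channel, and each caller awaits a predicate over those updates. A condensed sketch of that mechanism (simplified status type, same control flow):

```rust
// Condensed sketch of the deleted subscription mechanism: a sender task
// publishes status updates, subscribers block until a predicate holds.
use tokio::sync::watch;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Unseen,
    Seen,
    Final,
}

async fn wait_until(
    mut receiver: watch::Receiver<Status>,
    predicate: impl Fn(&Status) -> bool,
) -> Result<(), watch::error::RecvError> {
    while !predicate(&receiver.borrow()) {
        receiver.changed().await?; // errors once the sender task is gone
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), watch::error::RecvError> {
    let (sender, receiver) = watch::channel(Status::Unseen);

    // Stand-in for the per-subscription polling task.
    tokio::spawn(async move {
        let _ = sender.send(Status::Seen);
        let _ = sender.send(Status::Final);
    });

    wait_until(receiver, |status| *status == Status::Final).await?;
    println!("transaction final");
    Ok(())
}
```

Every such task held its own copy of the client behind a mutex and could only surface failures by logging, which is the feedback problem the TODO in the deleted code pointed at and the rework resolves.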
daemon/src/taker.rs (11 changed lines)

@@ -163,7 +163,7 @@ async fn main() -> Result<()> {
.create(None)
.spawn_global();
let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None);
let (monitor_actor_address, mut monitor_actor_context) = xtra::Context::new(None);
let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None);
@@ -189,9 +189,16 @@
.map(move |item| taker_cfd::MakerStreamMessage { item });
tokio::spawn(cfd_actor_inbox.clone().attach_stream(read));
tokio::spawn(
monitor_actor_context
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.unwrap(),
);
tokio::spawn(
monitor_actor_context.run(
monitor::Actor::new(&opts.electrum, cfd_actor_inbox.clone(), cfds).await,
monitor::Actor::new(&opts.electrum, cfd_actor_inbox.clone(), cfds)
.await
.unwrap(),
),
);
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));

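One design choice worth noting in the new tests: `Actor` is generic over `C: ElectrumApi`, so a deterministic stub client can drive `sync` without any network. The same dependency-injection idea in miniature, with a hypothetical `ChainApi` trait standing in for the bdk one:

```rust
// Miniature version of the test strategy above: make the component generic
// over its chain client so tests can substitute a deterministic stub.
trait ChainApi {
    fn tip_height(&self) -> u32;
}

struct Monitor<C> {
    client: C,
}

impl<C: ChainApi> Monitor<C> {
    /// Depth of a transaction included at `inclusion_height`, saturating so
    /// an outdated tip never underflows (mirroring
    /// `Confirmed::from_inclusion_and_latest_block` in the diff above).
    fn blocks_since(&self, inclusion_height: u32) -> u32 {
        self.client.tip_height().saturating_sub(inclusion_height)
    }
}

struct StubClient {
    height: u32,
}

impl ChainApi for StubClient {
    fn tip_height(&self) -> u32 {
        self.height
    }
}

fn main() {
    let monitor = Monitor { client: StubClient { height: 10 } };
    assert_eq!(monitor.blocks_since(5), 5);
    assert_eq!(monitor.blocks_since(20), 0); // outdated tip: clamped to zero
    println!("stub-driven checks passed");
}
```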