From 366c415314a2be49660db2176e91593b085e47ff Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Fri, 24 Sep 2021 14:02:22 +1000 Subject: [PATCH] Introduce monitor actor --- Cargo.lock | 94 +++---- cfd_protocol/src/protocol.rs | 19 +- cfd_protocol/tests/cfds.rs | 19 +- daemon/src/maker.rs | 12 + daemon/src/maker_cfd.rs | 358 +++++++++++++++++++++++- daemon/src/model/cfd.rs | 78 +++--- daemon/src/monitor.rs | 205 ++++++++++++++ daemon/src/monitor/subscription.rs | 429 +++++++++++++++++++++++++++++ daemon/src/setup_contract_actor.rs | 12 +- daemon/src/taker_cfd.rs | 2 +- 10 files changed, 1106 insertions(+), 122 deletions(-) create mode 100644 daemon/src/monitor.rs create mode 100644 daemon/src/monitor/subscription.rs diff --git a/Cargo.lock b/Cargo.lock index bcf4778..2227507 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13,15 +13,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "aho-corasick" -version = "0.7.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" -dependencies = [ - "memchr", -] - [[package]] name = "ansi_term" version = "0.12.1" @@ -154,8 +145,8 @@ dependencies = [ [[package]] name = "bdk" -version = "0.10.1-dev" -source = "git+https://github.com/bitcoindevkit/bdk/#acf157a99a305226203d2b55a567291a93c64720" +version = "0.11.1-dev" +source = "git+https://github.com/bitcoindevkit/bdk/#5a6a2cefdd5bf25fd76ab3f49ab9b2bbf11c2ab7" dependencies = [ "async-trait", "bdk-macros", @@ -168,13 +159,13 @@ dependencies = [ "serde", "serde_json", "sled", + "tokio", ] [[package]] name = "bdk-macros" version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3f510015e946c5995cc169f7ed4c92ba032bbce795c0956ee0d98d82f7aff78" +source = "git+https://github.com/bitcoindevkit/bdk/#5a6a2cefdd5bf25fd76ab3f49ab9b2bbf11c2ab7" dependencies = [ "proc-macro2", "quote", @@ -255,9 +246,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.7.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631" +checksum = "d9df67f7bf9ef8498769f994239c45613ef0c5899415fb58e9add412d2c1a538" [[package]] name = "byteorder" @@ -979,9 +970,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11" +checksum = "1323096b05d41827dadeaee54c9981958c0f94e670bc94ed80037d1a7b8b186b" dependencies = [ "bytes", "fnv", @@ -1013,9 +1004,9 @@ checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440" [[package]] name = "hyper" -version = "0.14.12" +version = "0.14.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13f67199e765030fa08fe0bd581af683f0d5bc04ea09c2b1102012c5fb90e7fd" +checksum = "15d1cfb9e4f68655fa04c01f59edb405b6074a0f7118ea881e5026e4a1cd8593" dependencies = [ "bytes", "futures-channel", @@ -1095,9 +1086,9 @@ checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" [[package]] name = "js-sys" -version = "0.3.54" +version = "0.3.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1866b355d9c878e5e607473cbe3f63282c0b7aad2db1dbebf55076c686918254" +checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84" dependencies = [ "wasm-bindgen", ] @@ -1110,9 +1101,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.101" +version = "0.2.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cb00336871be5ed2c8ed44b60ae9959dc5b9f08539422ed43f09e34ecaeba21" +checksum = "a2a5ac8f984bfcf3a823267e5fde638acc3325f6496633a5da6bb6eb2171e103" [[package]] name = "libsqlite3-sys" @@ -1168,12 +1159,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "maplit" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" - [[package]] name = "matchers" version = "0.0.1" @@ -1223,6 +1208,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d69450033bf162edf854d4aacaff82ca5ef34fa81f6cf69e1c81a103f0834997" dependencies = [ "bitcoin", + "serde", ] [[package]] @@ -1774,8 +1760,6 @@ version = "1.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" dependencies = [ - "aho-corasick", - "memchr", "regex-syntax", ] @@ -2244,9 +2228,9 @@ checksum = "c307a32c1c5c437f38c7fd45d753050587732ba8628319fbdf12a7e289ccc590" [[package]] name = "sled" -version = "0.34.6" +version = "0.34.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d0132f3e393bcb7390c60bb45769498cf4550bcb7a21d7f95c02b69f6362cdc" +checksum = "7f96b4737c2ce5987354855aed3797279def4ebf734436c6aa4552cf8e169935" dependencies = [ "crc32fast", "crossbeam-epoch", @@ -2266,9 +2250,9 @@ checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" [[package]] name = "socket2" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "765f090f0e423d2b55843402a07915add955e7d60657db13707a159727326cad" +checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516" dependencies = [ "libc", "winapi 0.3.9", @@ -2320,14 +2304,12 @@ dependencies = [ [[package]] name = "sqlformat" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "684001e7985ec1a9a66963b77ed151ef22a7876b3fdd7e37a57ec774f54b7d96" +checksum = "b4b7922be017ee70900be125523f38bdd644f4f06a1b16e8fa5a8ee8c34bffd4" dependencies = [ - "lazy_static", - "maplit", + "itertools", "nom", - "regex", "unicode_categories", ] @@ -2551,9 +2533,9 @@ checksum = "72c7ca5e81982d152f0788b131ebf96cc4bbd1b4575a7ed51f0f6fc866a05fdb" [[package]] name = "tinyvec" -version = "1.3.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "848a1e1181b9f6753b5e96a092749e29b11d19ede67dfbbd6c7dc7e0f49b5338" +checksum = "5241dd6f21443a3606b432718b166d3cedc962fd4b8bea54a8bc7f514ebda986" dependencies = [ "tinyvec_macros", ] @@ -2828,9 +2810,9 @@ checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b" [[package]] name = "unicode-width" -version = "0.1.8" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" +checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" [[package]] name = "unicode-xid" @@ -2931,9 +2913,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" [[package]] name = "wasm-bindgen" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e68338db6becec24d3c7977b5bf8a48be992c934b5d07177e3931f5dc9b076c" +checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce" dependencies = [ "cfg-if 1.0.0", "wasm-bindgen-macro", @@ -2941,9 +2923,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f34c405b4f0658583dba0c1c7c9b694f3cac32655db463b56c254a1c75269523" +checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b" dependencies = [ "bumpalo", "lazy_static", @@ -2956,9 +2938,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9d5a6580be83b19dc570a8f9c324251687ab2184e57086f71625feb57ec77c8" +checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2966,9 +2948,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3775a030dc6f5a0afd8a84981a21cc92a781eb429acef9ecce476d0c9113e92" +checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab" dependencies = [ "proc-macro2", "quote", @@ -2979,15 +2961,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.77" +version = "0.2.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c279e376c7a8e8752a8f1eaa35b7b0bee6bb9fb0cdacfa97cc3f1f289c87e2b4" +checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc" [[package]] name = "web-sys" -version = "0.3.54" +version = "0.3.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a84d70d1ec7d2da2d26a5bd78f4bca1b8c3254805363ce743b7a05bc30d195a" +checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/cfd_protocol/src/protocol.rs b/cfd_protocol/src/protocol.rs index 76c9581..77ab5a5 100644 --- a/cfd_protocol/src/protocol.rs +++ b/cfd_protocol/src/protocol.rs @@ -82,7 +82,7 @@ pub fn create_cfd_transactions( (maker, maker_punish_params): (PartyParams, PunishParams), (taker, taker_punish_params): (PartyParams, PunishParams), (oracle_pk, nonce_pks): (schnorrsig::PublicKey, &[schnorrsig::PublicKey]), - refund_timelock: u32, + (cet_timelock, refund_timelock): (u32, u32), payouts: Vec, identity_sk: SecretKey, ) -> Result { @@ -109,7 +109,7 @@ pub fn create_cfd_transactions( taker_punish_params, ), (oracle_pk, nonce_pks), - refund_timelock, + (cet_timelock, refund_timelock), payouts, identity_sk, ) @@ -130,7 +130,7 @@ pub fn renew_cfd_transactions( PunishParams, ), (oracle_pk, nonce_pks): (schnorrsig::PublicKey, &[schnorrsig::PublicKey]), - refund_timelock: u32, + (cet_timelock, refund_timelock): (u32, u32), payouts: Vec, identity_sk: SecretKey, ) -> Result { @@ -149,7 +149,7 @@ pub fn renew_cfd_transactions( taker_punish_params, ), (oracle_pk, nonce_pks), - refund_timelock, + (cet_timelock, refund_timelock), payouts, identity_sk, ) @@ -170,17 +170,10 @@ fn build_cfds( PunishParams, ), (oracle_pk, nonce_pks): (schnorrsig::PublicKey, &[schnorrsig::PublicKey]), - refund_timelock: u32, + (cet_timelock, refund_timelock): (u32, u32), payouts: Vec, identity_sk: SecretKey, ) -> Result { - /// Relative timelock used for every CET. - /// - /// This is used to allow parties to punish the publication of revoked commitment transactions. - /// - /// TODO: Should this be an argument to this function? - const CET_TIMELOCK: u32 = 12; - let commit_tx = CommitTransaction::new( &lock_tx.global.unsigned_tx, ( @@ -230,7 +223,7 @@ fn build_cfds( &maker_address, &taker_address, nonce_pks, - CET_TIMELOCK, + cet_timelock, )?; let encsig = cet.encsign(identity_sk, &oracle_pk)?; diff --git a/cfd_protocol/tests/cfds.rs b/cfd_protocol/tests/cfds.rs index 6cfdf9a..e9a3127 100644 --- a/cfd_protocol/tests/cfds.rs +++ b/cfd_protocol/tests/cfds.rs @@ -46,6 +46,7 @@ fn create_cfd() { ] .concat(); + let cet_timelock = 0; let refund_timelock = 0; let (maker_cfd_txs, taker_cfd_txs, maker, taker, maker_addr, taker_addr) = create_cfd_txs( @@ -54,7 +55,7 @@ fn create_cfd() { (&taker_wallet, taker_lock_amount), (oracle.public_key(), &announcement.nonce_pks()), payouts, - refund_timelock, + (cet_timelock, refund_timelock), ); let lock_desc = lock_descriptor(maker.pk, taker.pk); @@ -126,6 +127,7 @@ fn renew_cfd() { ] .concat(); + let cet_timelock = 0; let refund_timelock = 0; let (maker_cfd_txs, taker_cfd_txs, maker, taker, maker_addr, taker_addr) = create_cfd_txs( @@ -134,7 +136,7 @@ fn renew_cfd() { (&taker_wallet, taker_lock_amount), (oracle.public_key(), &announcement.nonce_pks()), payouts, - refund_timelock, + (cet_timelock, refund_timelock), ); // renew cfd transactions @@ -184,7 +186,7 @@ fn renew_cfd() { }, ), (oracle.public_key(), &announcement.nonce_pks()), - refund_timelock, + (cet_timelock, refund_timelock), payouts.clone(), maker.sk, ) @@ -211,7 +213,7 @@ fn renew_cfd() { }, ), (oracle.public_key(), &announcement.nonce_pks()), - refund_timelock, + (cet_timelock, refund_timelock), payouts, taker.sk, ) @@ -283,6 +285,7 @@ fn collaboratively_close_cfd() { .unwrap()] .concat(); + let cet_timelock = 0; let refund_timelock = 0; let (maker_cfd_txs, _, maker, taker, maker_addr, taker_addr) = create_cfd_txs( @@ -291,7 +294,7 @@ fn collaboratively_close_cfd() { (&taker_wallet, taker_lock_amount), (oracle.public_key(), &announcement.nonce_pks()), payouts, - refund_timelock, + (cet_timelock, refund_timelock), ); let lock_tx = maker_cfd_txs.lock.extract_tx(); @@ -336,7 +339,7 @@ fn create_cfd_txs( (taker_wallet, taker_lock_amount): (&bdk::Wallet<(), bdk::database::MemoryDatabase>, Amount), (oracle_pk, nonce_pks): (schnorrsig::PublicKey, &[schnorrsig::PublicKey]), payouts: Vec, - refund_timelock: u32, + (cet_timelock, refund_timelock): (u32, u32), ) -> ( CfdTransactions, CfdTransactions, @@ -377,7 +380,7 @@ fn create_cfd_txs( }, ), (oracle_pk, nonce_pks), - refund_timelock, + (cet_timelock, refund_timelock), payouts.clone(), maker_sk, ) @@ -398,7 +401,7 @@ fn create_cfd_txs( }, ), (oracle_pk, nonce_pks), - refund_timelock, + (cet_timelock, refund_timelock), payouts, taker_sk, ) diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 0ba0ab5..27b36d0 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -11,6 +11,7 @@ use model::cfd::{Cfd, Order}; use model::WalletInfo; use rocket::fairing::AdHoc; use rocket_db_pools::Database; +use std::collections::HashMap; use std::path::PathBuf; use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; @@ -26,6 +27,7 @@ mod logger; mod maker_cfd; mod maker_inc_connections; mod model; +mod monitor; mod routes; mod routes_maker; mod seed; @@ -75,6 +77,7 @@ async fn main() -> Result<()> { let data_dir = opts .data_dir + .clone() .unwrap_or_else(|| std::env::current_dir().expect("unable to get cwd")); if !data_dir.exists() { @@ -160,6 +163,8 @@ async fn main() -> Result<()> { let (maker_inc_connections_address, maker_inc_connections_context) = xtra::Context::new(None); + let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None); + let cfd_maker_actor_inbox = maker_cfd::Actor::new( db, wallet.clone(), @@ -167,6 +172,7 @@ async fn main() -> Result<()> { cfd_feed_sender, order_feed_sender, maker_inc_connections_address.clone(), + monitor_actor_address, ) .await .unwrap() @@ -179,6 +185,12 @@ async fn main() -> Result<()> { )), ); + tokio::spawn(monitor_actor_context.run(monitor::Actor::new( + &opts.electrum, + HashMap::new(), + cfd_maker_actor_inbox.clone(), + ))); + tokio::spawn({ let cfd_maker_actor_inbox = cfd_maker_actor_inbox.clone(); let maker_inc_connections_address = maker_inc_connections_address.clone(); diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 29d8d6c..f4e0344 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -4,14 +4,21 @@ use crate::db::{ load_cfd_by_order_id, load_order_by_id, }; use crate::maker_inc_connections::TakerCommand; -use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; +use crate::model::cfd::{CetStatus, Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::{TakerId, Usd}; +use crate::monitor::{ + CetTimelockExpired, CommitFinality, LockFinality, MonitorParams, RefundFinality, + RefundTimelockExpired, +}; use crate::wallet::Wallet; use crate::wire::SetupMsg; -use crate::{maker_inc_connections, setup_contract_actor}; -use anyhow::Result; +use crate::{maker_inc_connections, monitor, setup_contract_actor}; +use anyhow::{bail, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; +use bdk::bitcoin::{Amount, PublicKey}; +use cfd_protocol::secp256k1_zkp::SECP256K1; +use cfd_protocol::{finalize_spend_transaction, spending_tx_sighash}; use std::time::SystemTime; use tokio::sync::{mpsc, watch}; use xtra::prelude::*; @@ -55,6 +62,7 @@ pub struct Actor { // TODO: Move the contract setup into a dedicated actor and send messages to that actor that // manages the state instead of this ugly buffer contract_setup_message_buffer: Vec, + monitor_actor: Address>, } impl Actor { @@ -65,6 +73,7 @@ impl Actor { cfd_feed_actor_inbox: watch::Sender>, order_feed_sender: watch::Sender>, takers: Address, + monitor_actor: Address>, ) -> Result { let mut conn = db.acquire().await?; @@ -81,6 +90,7 @@ impl Actor { current_order_id: None, current_contract_setup: None, contract_setup_message_buffer: vec![], + monitor_actor, }) } @@ -158,12 +168,39 @@ impl Actor { self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; - let txid = self.wallet.try_broadcast_transaction(msg.dlc.lock).await?; + let txid = self + .wallet + .try_broadcast_transaction(msg.dlc.lock.0.clone()) + .await?; tracing::info!("Lock transaction published with txid {}", txid); - // TODO: tx monitoring, once confirmed with x blocks transition the Cfd to - // Open + // TODO: It's a bit suspicious to load this just to get the + // refund timelock + let cfd = load_cfd_by_order_id(msg.order_id, &mut conn).await?; + + let script_pubkey = msg.dlc.address.script_pubkey(); + self.monitor_actor + .do_send_async(monitor::StartMonitoring { + id: msg.order_id, + params: MonitorParams { + lock: (msg.dlc.lock.0.txid(), msg.dlc.lock.1), + commit: (msg.dlc.commit.0.txid(), msg.dlc.commit.2), + cets: msg + .dlc + .cets + .into_iter() + .map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range)) + .collect(), + refund: ( + msg.dlc.refund.0.txid(), + script_pubkey, + cfd.refund_timelock_in_blocks(), + ), + }, + }) + .await?; + Ok(()) } @@ -376,6 +413,280 @@ impl Actor { Ok(()) } + + async fn handle_lock_finality(&mut self, msg: LockFinality) -> Result<()> { + let order_id = msg.0; + tracing::debug!(%order_id, "Lock transaction has reached finality"); + + let mut conn = self.db.acquire().await?; + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + use CfdState::*; + let dlc = match cfd.state { + PendingOpen { dlc, .. } => dlc, + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => bail!("Did not expect lock finality yet: ignoring"), + Open { .. } | OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { + bail!("State already assumes lock finality: ignoring") + } + }; + + insert_new_cfd_state_by_order_id( + msg.0, + CfdState::Open { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + }, + &mut conn, + ) + .await?; + + Ok(()) + } + + async fn handle_commit_finality(&mut self, msg: CommitFinality) -> Result<()> { + let order_id = msg.0; + tracing::debug!(%order_id, "Commit transaction has reached finality"); + + let mut conn = self.db.acquire().await?; + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + use CfdState::*; + let dlc = match cfd.state { + Open { dlc, .. } => dlc, + PendingOpen { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + dlc + } + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => bail!("Did not expect commit finality yet: ignoring"), + OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => { + bail!("State already assumes commit finality: ignoring") + } + }; + + insert_new_cfd_state_by_order_id( + msg.0, + CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::Unprepared, + }, + &mut conn, + ) + .await?; + + Ok(()) + } + + async fn handle_cet_timelock_expired(&mut self, msg: CetTimelockExpired) -> Result<()> { + let order_id = msg.0; + tracing::debug!(%order_id, "CET timelock has expired"); + + let mut conn = self.db.acquire().await?; + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + use CfdState::*; + let new_state = match cfd.state { + CfdState::OpenCommitted { + dlc, + cet_status: CetStatus::Unprepared, + .. + } => CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::TimelockExpired, + }, + CfdState::OpenCommitted { + dlc, + cet_status: CetStatus::OracleSigned(price), + .. + } => CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::Ready(price), + }, + PendingOpen { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::TimelockExpired, + } + } + Open { dlc, .. } => { + tracing::debug!(%order_id, "Was not aware of commit TX broadcast, jumping ahead"); + CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::TimelockExpired, + } + } + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => bail!("Did not expect CET timelock expiry yet: ignoring"), + OpenCommitted { + cet_status: CetStatus::TimelockExpired, + .. + } + | OpenCommitted { + cet_status: CetStatus::Ready(_), + .. + } => bail!("State already assumes CET timelock expiry: ignoring"), + MustRefund { .. } | Refunded { .. } => { + bail!("Refund path does not care about CET timelock expiry: ignoring") + } + }; + + insert_new_cfd_state_by_order_id(msg.0, new_state, &mut conn).await?; + + Ok(()) + } + + async fn handle_refund_timelock_expired(&mut self, msg: RefundTimelockExpired) -> Result<()> { + let order_id = msg.0; + tracing::debug!(%order_id, "Refund timelock has expired"); + + let mut conn = self.db.acquire().await?; + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + use CfdState::*; + let dlc = match cfd.state { + OpenCommitted { dlc, .. } => { + insert_new_cfd_state_by_order_id( + msg.0, + MustRefund { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc: dlc.clone(), + }, + &mut conn, + ) + .await?; + + dlc + } + MustRefund { .. } | Refunded { .. } => { + bail!("State already assumes refund timelock expiry: ignoring") + } + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => bail!("Did not expect refund timelock expiry yet: ignoring"), + PendingOpen { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + dlc + } + Open { dlc, .. } => { + tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); + dlc + } + }; + + insert_new_cfd_state_by_order_id( + msg.0, + MustRefund { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc: dlc.clone(), + }, + &mut conn, + ) + .await?; + + let sig_hash = spending_tx_sighash( + &dlc.refund.0, + &dlc.commit.2, + Amount::from_sat(dlc.commit.0.output[0].value), + ); + let our_sig = SECP256K1.sign(&sig_hash, &dlc.identity); + let our_pubkey = PublicKey::new(bdk::bitcoin::secp256k1::PublicKey::from_secret_key( + SECP256K1, + &dlc.identity, + )); + let counterparty_sig = dlc.refund.1; + let counterparty_pubkey = dlc.identity_counterparty; + let signed_refund_tx = finalize_spend_transaction( + dlc.refund.0, + &dlc.commit.2, + (our_pubkey, our_sig), + (counterparty_pubkey, counterparty_sig), + )?; + + let txid = self + .wallet + .try_broadcast_transaction(signed_refund_tx) + .await?; + + tracing::info!("Refund transaction published on chain: {}", txid); + + Ok(()) + } + + async fn handle_refund_finality(&mut self, msg: RefundFinality) -> Result<()> { + let order_id = msg.0; + tracing::debug!(%order_id, "Refund transaction has reached finality"); + + let mut conn = self.db.acquire().await?; + + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + use CfdState::*; + match cfd.state { + MustRefund { .. } => (), + OutgoingOrderRequest { .. } => unreachable!("taker-only state"), + IncomingOrderRequest { .. } + | Accepted { .. } + | Rejected { .. } + | ContractSetup { .. } => bail!("Did not expect refund finality yet: ignoring"), + PendingOpen { .. } => { + tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead"); + } + Open { .. } => { + tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead"); + } + OpenCommitted { .. } => { + tracing::debug!(%order_id, "Was waiting on refund timelock expiry, jumping ahead"); + } + Refunded { .. } => bail!("State already assumes refund finality: ignoring"), + }; + + insert_new_cfd_state_by_order_id( + msg.0, + CfdState::Refunded { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + }, + &mut conn, + ) + .await?; + + Ok(()) + } } #[async_trait] @@ -427,6 +738,41 @@ impl Handler for Actor { } } +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: LockFinality, _ctx: &mut Context) { + log_error!(self.handle_lock_finality(msg)) + } +} + +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: CommitFinality, _ctx: &mut Context) { + log_error!(self.handle_commit_finality(msg)) + } +} + +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: CetTimelockExpired, _ctx: &mut Context) { + log_error!(self.handle_cet_timelock_expired(msg)) + } +} + +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: RefundTimelockExpired, _ctx: &mut Context) { + log_error!(self.handle_refund_timelock_expired(msg)) + } +} + +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: RefundFinality, _ctx: &mut Context) { + log_error!(self.handle_refund_finality(msg)) + } +} + impl Message for TakeOrder { type Result = (); } diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index bde02de..93672ee 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -1,7 +1,8 @@ use crate::model::{Leverage, Position, TakerId, TradingPair, Usd}; use anyhow::Result; use bdk::bitcoin::secp256k1::{SecretKey, Signature}; -use bdk::bitcoin::{Amount, Transaction}; +use bdk::bitcoin::{Address, Amount, PublicKey, Transaction}; +use bdk::descriptor::Descriptor; use cfd_protocol::secp256k1_zkp::EcdsaAdaptorSignature; use rust_decimal::Decimal; use rust_decimal_macros::dec; @@ -11,7 +12,7 @@ use std::ops::RangeInclusive; use std::time::{Duration, SystemTime}; use uuid::Uuid; -#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct OrderId(Uuid); impl Default for OrderId { @@ -184,33 +185,39 @@ pub enum CfdState { dlc: Dlc, }, - /// Requested close the position, but we have not passed that on to the blockchain yet. + // TODO: At the moment we are appending to this state. The way this is handled internally is + // by inserting the same state with more information in the database. We could consider + // changing this to insert different states or update the stae instead of inserting again. + /// The CFD contract's commit transaction reached finality on chain /// - /// This state applies to taker and maker. - CloseRequested { - common: CfdStateCommon, - }, - /// The close transaction (CET) was published on the Bitcoin blockchain but we don't have a - /// confirmation yet. - /// - /// This state applies to taker and maker. - PendingClose { + /// This means that the commit transaction was detected on chain and reached finality + /// confirmations and the contract will be forced to close. + OpenCommitted { common: CfdStateCommon, + dlc: Dlc, + cet_status: CetStatus, }, - /// The close transaction is confirmed with at least one block. - /// - /// This state applies to taker and maker. - Closed { + + /// The CFD contract's refund transaction was published but it not final yet + MustRefund { common: CfdStateCommon, + dlc: Dlc, }, - /// Error state - /// - /// This state applies to taker and maker. - Error { + + Refunded { common: CfdStateCommon, }, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "type", content = "payload")] +pub enum CetStatus { + Unprepared, + TimelockExpired, + OracleSigned(u64), + Ready(u64), +} + impl CfdState { fn get_common(&self) -> CfdStateCommon { let common = match self { @@ -221,10 +228,9 @@ impl CfdState { CfdState::ContractSetup { common } => common, CfdState::PendingOpen { common, .. } => common, CfdState::Open { common, .. } => common, - CfdState::CloseRequested { common } => common, - CfdState::PendingClose { common } => common, - CfdState::Closed { common } => common, - CfdState::Error { common } => common, + CfdState::OpenCommitted { common, .. } => common, + CfdState::MustRefund { common, .. } => common, + CfdState::Refunded { common, .. } => common, }; *common @@ -259,17 +265,14 @@ impl Display for CfdState { CfdState::Open { .. } => { write!(f, "Open") } - CfdState::CloseRequested { .. } => { - write!(f, "Close Requested") - } - CfdState::PendingClose { .. } => { - write!(f, "Pending Close") + CfdState::OpenCommitted { .. } => { + write!(f, "Open Committed") } - CfdState::Closed { .. } => { - write!(f, "Closed") + CfdState::MustRefund { .. } => { + write!(f, "Must Refund") } - CfdState::Error { .. } => { - write!(f, "Error") + CfdState::Refunded { .. } => { + write!(f, "Refunded") } } } @@ -357,6 +360,9 @@ impl Cfd { /// term to get his funds back. #[allow(dead_code)] const REFUND_THRESHOLD: f32 = 1.5; + + #[allow(dead_code)] + pub const CET_TIMELOCK: u32 = 12; } fn calculate_profit( @@ -509,12 +515,14 @@ mod tests { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct Dlc { pub identity: SecretKey, + pub identity_counterparty: PublicKey, pub revocation: SecretKey, pub publish: SecretKey, + pub address: Address, /// The fully signed lock transaction ready to be published on chain - pub lock: Transaction, - pub commit: (Transaction, EcdsaAdaptorSignature), + pub lock: (Transaction, Descriptor), + pub commit: (Transaction, EcdsaAdaptorSignature, Descriptor), pub cets: Vec<(Transaction, EcdsaAdaptorSignature, RangeInclusive)>, pub refund: (Transaction, Signature), } diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs new file mode 100644 index 0000000..bdc9734 --- /dev/null +++ b/daemon/src/monitor.rs @@ -0,0 +1,205 @@ +use crate::actors::log_error; +use crate::model::cfd::{Cfd, OrderId}; +use anyhow::Result; +use async_trait::async_trait; +use bdk::bitcoin::{PublicKey, Script, Txid}; +use bdk::descriptor::Descriptor; +use bdk::miniscript::DescriptorTrait; +use std::collections::HashMap; +use std::ops::RangeInclusive; +use subscription::Monitor; + +mod subscription; + +const FINALITY_CONFIRMATIONS: u32 = 1; + +#[derive(Clone)] +pub struct MonitorParams { + pub lock: (Txid, Descriptor), + pub commit: (Txid, Descriptor), + pub cets: Vec<(Txid, Script, RangeInclusive)>, + pub refund: (Txid, Script, u32), +} + +impl Actor +where + T: xtra::Actor + + xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, +{ + pub fn new( + electrum_rpc_url: &str, + cfds: HashMap, + cfd_actor_addr: xtra::Address, + ) -> Self { + let monitor = Monitor::new(electrum_rpc_url, FINALITY_CONFIRMATIONS).unwrap(); + + Self { + monitor, + cfds, + cfd_actor_addr, + } + } + + async fn handle_start_monitoring(&mut self, msg: StartMonitoring) -> Result<()> { + let StartMonitoring { id, params } = msg; + + self.cfds.insert(id, params.clone()); + + tokio::spawn({ + let cfd_actor_addr = self.cfd_actor_addr.clone(); + let lock_subscription = self + .monitor + .subscribe_to((params.lock.0, params.lock.1.script_pubkey())) + .await; + async move { + lock_subscription.wait_until_final().await.unwrap(); + + cfd_actor_addr + .do_send_async(LockFinality(id)) + .await + .unwrap(); + } + }); + + let commit_subscription = self + .monitor + .subscribe_to((params.commit.0, params.commit.1.script_pubkey())) + .await; + + tokio::spawn({ + let cfd_actor_addr = self.cfd_actor_addr.clone(); + let commit_subscription = commit_subscription.clone(); + async move { + commit_subscription.wait_until_final().await.unwrap(); + + cfd_actor_addr + .do_send_async(CommitFinality(id)) + .await + .unwrap(); + } + }); + + tokio::spawn({ + let cfd_actor_addr = self.cfd_actor_addr.clone(); + let commit_subscription = commit_subscription.clone(); + async move { + commit_subscription + .wait_until_confirmed_with(Cfd::CET_TIMELOCK) + .await + .unwrap(); + + cfd_actor_addr + .do_send_async(CetTimelockExpired(id)) + .await + .unwrap(); + } + }); + + tokio::spawn({ + let cfd_actor_addr = self.cfd_actor_addr.clone(); + let commit_subscription = commit_subscription.clone(); + let refund_timelock = params.refund.2; + async move { + commit_subscription + .wait_until_confirmed_with(refund_timelock) + .await + .unwrap(); + + cfd_actor_addr + .do_send_async(RefundTimelockExpired(id)) + .await + .unwrap(); + } + }); + + tokio::spawn({ + let cfd_actor_addr = self.cfd_actor_addr.clone(); + let refund_subscription = self + .monitor + .subscribe_to((params.refund.0, params.refund.1)) + .await; + async move { + refund_subscription.wait_until_final().await.unwrap(); + + cfd_actor_addr + .do_send_async(RefundFinality(id)) + .await + .unwrap(); + } + }); + + // TODO: CET subscription => Requires information from Oracle + + Ok(()) + } +} + +pub struct StartMonitoring { + pub id: OrderId, + pub params: MonitorParams, +} + +impl xtra::Message for StartMonitoring { + type Result = (); +} + +pub struct LockFinality(pub OrderId); + +impl xtra::Message for LockFinality { + type Result = (); +} + +pub struct CommitFinality(pub OrderId); + +impl xtra::Message for CommitFinality { + type Result = (); +} + +pub struct CetTimelockExpired(pub OrderId); + +impl xtra::Message for CetTimelockExpired { + type Result = (); +} + +pub struct RefundTimelockExpired(pub OrderId); + +impl xtra::Message for RefundTimelockExpired { + type Result = (); +} + +pub struct RefundFinality(pub OrderId); + +impl xtra::Message for RefundFinality { + type Result = (); +} + +pub struct Actor +where + T: xtra::Actor, +{ + monitor: Monitor, + cfds: HashMap, + cfd_actor_addr: xtra::Address, +} + +impl xtra::Actor for Actor where T: xtra::Actor {} + +// TODO: The traitbound for LockFinality should not be needed here, but we could not work around it +#[async_trait] +impl xtra::Handler for Actor +where + T: xtra::Actor + + xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, +{ + async fn handle(&mut self, msg: StartMonitoring, _ctx: &mut xtra::Context) { + log_error!(self.handle_start_monitoring(msg)); + } +} diff --git a/daemon/src/monitor/subscription.rs b/daemon/src/monitor/subscription.rs new file mode 100644 index 0000000..43808a3 --- /dev/null +++ b/daemon/src/monitor/subscription.rs @@ -0,0 +1,429 @@ +#![allow(dead_code)] + +use anyhow::{bail, Context, Result}; +use bdk::bitcoin::{Script, Txid}; +use bdk::electrum_client::{ElectrumApi, GetHistoryRes, HeaderNotification}; +use serde::{Deserialize, Serialize}; +use std::collections::{BTreeMap, HashMap}; +use std::convert::{TryFrom, TryInto}; +use std::fmt; +use std::ops::Add; +use std::sync::Arc; +use tokio::sync::{watch, Mutex}; +use tokio::time::{Duration, Instant}; + +pub struct Monitor { + client: Arc>, + finality_confirmations: u32, +} + +impl Monitor { + pub fn new(electrum_rpc_url: &str, finality_confirmations: u32) -> Result { + let client = bdk::electrum_client::Client::new(electrum_rpc_url) + .context("Failed to initialize Electrum RPC client")?; + + let client = Client::new(client, Duration::from_secs(10))?; + + let monitor = Monitor { + client: Arc::new(Mutex::new(client)), + finality_confirmations, + }; + + Ok(monitor) + } + + pub async fn subscribe_to(&self, tx: impl Watchable + Send + 'static) -> Subscription { + let txid = tx.id(); + let script = tx.script(); + + let sub = self + .client + .lock() + .await + .subscriptions + .entry((txid, script.clone())) + .or_insert_with(|| { + let (sender, receiver) = watch::channel(ScriptStatus::Unseen); + let client = self.client.clone(); + + tokio::spawn(async move { + let mut last_status = None; + + // TODO: We need feedback in the monitoring actor about failures in here + loop { + tokio::time::sleep(Duration::from_secs(5)).await; + + let new_status = match client.lock().await.status_of_script(&tx) { + Ok(new_status) => new_status, + Err(error) => { + tracing::warn!(%txid, "Failed to get status of script: {:#}", error); + return; + } + }; + + last_status = Some(print_status_change(txid, last_status, new_status)); + + let all_receivers_gone = sender.send(new_status).is_err(); + + if all_receivers_gone { + tracing::debug!(%txid, "All receivers gone, removing subscription"); + client.lock().await.subscriptions.remove(&(txid, script)); + return; + } + } + }); + + Subscription { + receiver, + finality_confirmations: self.finality_confirmations, + txid, + } + }) + .clone(); + + sub + } +} + +/// Represents a subscription to the status of a given transaction. +#[derive(Debug, Clone)] +pub struct Subscription { + receiver: watch::Receiver, + finality_confirmations: u32, + txid: Txid, +} + +impl Subscription { + pub async fn wait_until_final(&self) -> Result<()> { + let conf_target = self.finality_confirmations; + let txid = self.txid; + + tracing::info!(%txid, required_confirmation=%conf_target, "Waiting for Bitcoin transaction finality"); + + let mut seen_confirmations = 0; + + self.wait_until(|status| match status { + ScriptStatus::Confirmed(inner) => { + let confirmations = inner.confirmations(); + + if confirmations > seen_confirmations { + tracing::info!(%txid, + seen_confirmations = %confirmations, + needed_confirmations = %conf_target, + "Waiting for Bitcoin transaction finality"); + seen_confirmations = confirmations; + } + + inner.meets_target(conf_target) + } + _ => false, + }) + .await + } + + pub async fn wait_until_seen(&self) -> Result<()> { + self.wait_until(ScriptStatus::has_been_seen).await + } + + pub async fn wait_until_confirmed_with(&self, target: T) -> Result<()> + where + u32: PartialOrd, + T: Copy, + { + self.wait_until(|status| status.is_confirmed_with(target)) + .await + } + + async fn wait_until(&self, mut predicate: impl FnMut(&ScriptStatus) -> bool) -> Result<()> { + let mut receiver = self.receiver.clone(); + + while !predicate(&receiver.borrow()) { + receiver + .changed() + .await + .context("Failed while waiting for next status update")?; + } + + Ok(()) + } +} + +/// Defines a watchable transaction. +/// +/// For a transaction to be watchable, we need to know two things: Its +/// transaction ID and the specific output script that is going to change. +/// A transaction can obviously have multiple outputs but our protocol purposes, +/// we are usually interested in a specific one. +pub trait Watchable { + fn id(&self) -> Txid; + fn script(&self) -> Script; +} + +impl Watchable for (Txid, Script) { + fn id(&self) -> Txid { + self.0 + } + + fn script(&self) -> Script { + self.1.clone() + } +} + +fn print_status_change(txid: Txid, old: Option, new: ScriptStatus) -> ScriptStatus { + match (old, new) { + (None, new_status) => { + tracing::debug!(%txid, status = %new_status, "Found relevant Bitcoin transaction"); + } + (Some(old_status), new_status) if old_status != new_status => { + tracing::debug!(%txid, %new_status, %old_status, "Bitcoin transaction status changed"); + } + _ => {} + } + + new +} + +pub struct Client { + electrum: bdk::electrum_client::Client, + latest_block_height: BlockHeight, + last_sync: Instant, + sync_interval: Duration, + script_history: BTreeMap>, + subscriptions: HashMap<(Txid, Script), Subscription>, +} + +impl Client { + fn new(electrum: bdk::electrum_client::Client, interval: Duration) -> Result { + // Initially fetch the latest block for storing the height. + // We do not act on this subscription after this call. + let latest_block = electrum + .block_headers_subscribe() + .context("Failed to subscribe to header notifications")?; + + Ok(Self { + electrum, + latest_block_height: BlockHeight::try_from(latest_block)?, + last_sync: Instant::now(), + sync_interval: interval, + script_history: Default::default(), + subscriptions: Default::default(), + }) + } + + fn update_state(&mut self) -> Result<()> { + let now = Instant::now(); + if now < self.last_sync + self.sync_interval { + return Ok(()); + } + + self.last_sync = now; + self.update_latest_block()?; + self.update_script_histories()?; + + Ok(()) + } + + fn status_of_script(&mut self, tx: &T) -> Result + where + T: Watchable, + { + let txid = tx.id(); + let script = tx.script(); + + if !self.script_history.contains_key(&script) { + self.script_history.insert(script.clone(), vec![]); + } + + self.update_state()?; + + let history = self.script_history.entry(script).or_default(); + + let history_of_tx = history + .iter() + .filter(|entry| entry.tx_hash == txid) + .collect::>(); + + match history_of_tx.as_slice() { + [] => Ok(ScriptStatus::Unseen), + [remaining @ .., last] => { + if !remaining.is_empty() { + tracing::warn!("Found more than a single history entry for script. This is highly unexpected and those history entries will be ignored") + } + + if last.height <= 0 { + Ok(ScriptStatus::InMempool) + } else { + Ok(ScriptStatus::Confirmed( + Confirmed::from_inclusion_and_latest_block( + u32::try_from(last.height)?, + u32::from(self.latest_block_height), + ), + )) + } + } + } + } + + fn update_latest_block(&mut self) -> Result<()> { + // Fetch the latest block for storing the height. + // We do not act on this subscription after this call, as we cannot rely on + // subscription push notifications because eventually the Electrum server will + // close the connection and subscriptions are not automatically renewed + // upon renewing the connection. + let latest_block = self + .electrum + .block_headers_subscribe() + .context("Failed to subscribe to header notifications")?; + let latest_block_height = BlockHeight::try_from(latest_block)?; + + if latest_block_height > self.latest_block_height { + tracing::debug!( + block_height = u32::from(latest_block_height), + "Got notification for new block" + ); + self.latest_block_height = latest_block_height; + } + + Ok(()) + } + + fn update_script_histories(&mut self) -> Result<()> { + let histories = self + .electrum + .batch_script_get_history(self.script_history.keys()) + .context("Failed to get script histories")?; + + if histories.len() != self.script_history.len() { + bail!( + "Expected {} history entries, received {}", + self.script_history.len(), + histories.len() + ); + } + + let scripts = self.script_history.keys().cloned(); + let histories = histories.into_iter(); + + self.script_history = scripts.zip(histories).collect::>(); + + Ok(()) + } +} + +/// Represent a block height, or block number, expressed in absolute block +/// count. E.g. The transaction was included in block #655123, 655123 block +/// after the genesis block. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] +#[serde(transparent)] +pub struct BlockHeight(u32); + +impl From for u32 { + fn from(height: BlockHeight) -> Self { + height.0 + } +} + +impl TryFrom for BlockHeight { + type Error = anyhow::Error; + + fn try_from(value: HeaderNotification) -> Result { + Ok(Self( + value + .height + .try_into() + .context("Failed to fit usize into u32")?, + )) + } +} + +impl Add for BlockHeight { + type Output = BlockHeight; + fn add(self, rhs: u32) -> Self::Output { + BlockHeight(self.0 + rhs) + } +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum ExpiredTimelocks { + None, + Cancel, + Punish, +} + +#[derive(Debug, Copy, Clone, PartialEq)] +pub struct Confirmed { + /// The depth of this transaction within the blockchain. + /// + /// Will be zero if the transaction is included in the latest block. + depth: u32, +} + +impl Confirmed { + pub fn new(depth: u32) -> Self { + Self { depth } + } + + /// Compute the depth of a transaction based on its inclusion height and the + /// latest known block. + /// + /// Our information about the latest block might be outdated. To avoid an + /// overflow, we make sure the depth is 0 in case the inclusion height + /// exceeds our latest known block, + pub fn from_inclusion_and_latest_block(inclusion_height: u32, latest_block: u32) -> Self { + let depth = latest_block.saturating_sub(inclusion_height); + + Self { depth } + } + + pub fn confirmations(&self) -> u32 { + self.depth + 1 + } + + pub fn meets_target(&self, target: T) -> bool + where + u32: PartialOrd, + { + self.confirmations() >= target + } +} + +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum ScriptStatus { + Unseen, + InMempool, + Confirmed(Confirmed), +} + +impl ScriptStatus { + /// Check if the script has any confirmations. + pub fn is_confirmed(&self) -> bool { + matches!(self, ScriptStatus::Confirmed(_)) + } + + /// Check if the script has met the given confirmation target. + pub fn is_confirmed_with(&self, target: T) -> bool + where + u32: PartialOrd, + { + match self { + ScriptStatus::Confirmed(inner) => inner.meets_target(target), + _ => false, + } + } + + pub fn has_been_seen(&self) -> bool { + matches!(self, ScriptStatus::InMempool | ScriptStatus::Confirmed(_)) + } +} + +impl fmt::Display for ScriptStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ScriptStatus::Unseen => write!(f, "unseen"), + ScriptStatus::InMempool => write!(f, "in mempool"), + ScriptStatus::Confirmed(inner) => { + write!(f, "confirmed with {} blocks", inner.confirmations()) + } + } + } +} diff --git a/daemon/src/setup_contract_actor.rs b/daemon/src/setup_contract_actor.rs index 5c5d5bc..0e45bde 100644 --- a/daemon/src/setup_contract_actor.rs +++ b/daemon/src/setup_contract_actor.rs @@ -1,3 +1,4 @@ +use crate::model; use crate::model::cfd::{Cfd, Dlc}; use crate::wallet::Wallet; use crate::wire::{Msg0, Msg1, Msg2, SetupMsg}; @@ -61,7 +62,10 @@ pub fn new( (params.maker().clone(), *params.maker_punish()), (params.taker().clone(), *params.taker_punish()), (oracle_pk, &[]), - cfd.refund_timelock_in_blocks(), + ( + model::cfd::Cfd::CET_TIMELOCK, + cfd.refund_timelock_in_blocks(), + ), vec![], sk, ) @@ -142,10 +146,12 @@ pub fn new( Dlc { identity: sk, + identity_counterparty: params.other.identity_pk, revocation: rev_sk, publish: publish_sk, - lock: signed_lock_tx.extract_tx(), - commit: (commit_tx, msg1.commit), + address: params.own.address, + lock: (signed_lock_tx.extract_tx(), lock_desc), + commit: (commit_tx, msg1.commit, commit_desc), cets: msg1 .cets .into_iter() diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index e8c1ef2..8f2e855 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -228,7 +228,7 @@ impl Actor { self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; - let txid = self.wallet.try_broadcast_transaction(dlc.lock).await?; + let txid = self.wallet.try_broadcast_transaction(dlc.lock.0).await?; tracing::info!("Lock transaction published with txid {}", txid);