diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index 789f86e..e6436ba 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -1,39 +1,38 @@ -use anyhow::{Context, Result}; -use bdk::bitcoin::util::psbt::PartiallySignedTransaction; -use bdk::bitcoin::{ecdsa, Txid}; -use cfd_protocol::secp256k1_zkp::{schnorrsig, Secp256k1}; -use cfd_protocol::PartyParams; -use daemon::maker_cfd::CfdAction; +use crate::harness::mocks::monitor::Monitor; +use crate::harness::mocks::oracle::Oracle; +use crate::harness::mocks::wallet::Wallet; +use crate::harness::start_both; +use anyhow::Context; +use cfd_protocol::secp256k1_zkp::schnorrsig; +use daemon::maker_cfd; use daemon::model::cfd::{Cfd, CfdState, Order, Origin}; -use daemon::model::{Price, Timestamp, Usd, WalletInfo}; +use daemon::model::{Price, Usd}; use daemon::tokio_ext::FutureExt; -use daemon::{ - connection, db, maker_cfd, maker_inc_connections, monitor, oracle, taker_cfd, wallet, -}; -use rand::thread_rng; use rust_decimal_macros::dec; -use sqlx::SqlitePool; -use std::net::SocketAddr; -use std::str::FromStr; -use std::task::Poll; use std::time::Duration; use tokio::sync::watch; use tracing::subscriber::DefaultGuard; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; -use xtra::spawn::TokioGlobalSpawnExt; -use xtra::Actor; -use xtra_productivity::xtra_productivity; +mod harness; #[tokio::test] async fn taker_receives_order_from_maker_on_publication() { let _guard = init_tracing(); - let (mut maker, mut taker) = start_both().await; + let (mut maker, mut taker) = start_both( + Oracle::default(), + Monitor, + Wallet::default(), + Oracle::default(), + Monitor, + Wallet::default(), + ) + .await; assert!(is_next_none(&mut taker.order_feed).await); - maker.publish_order(new_dummy_order()); + maker.publish_order(dummy_new_order()); let (published, received) = tokio::join!( next_some(&mut maker.order_feed), @@ -46,12 +45,20 @@ async fn taker_receives_order_from_maker_on_publication() { #[tokio::test] async fn taker_takes_order_and_maker_rejects() { let _guard = init_tracing(); - let (mut maker, mut taker) = start_both().await; + let (mut maker, mut taker) = start_both( + Oracle::default(), + Monitor, + Wallet::default().with_wallet_info(), + Oracle::default(), + Monitor, + Wallet::default().with_wallet_info(), + ) + .await; // TODO: Why is this needed? For the cfd stream it is not needed is_next_none(&mut taker.order_feed).await; - maker.publish_order(new_dummy_order()); + maker.publish_order(dummy_new_order()); let (_, received) = next_order(&mut maker.order_feed, &mut taker.order_feed).await; @@ -79,6 +86,55 @@ async fn taker_takes_order_and_maker_rejects() { assert!(matches!(maker_cfd.state, CfdState::Rejected { .. })); } +#[tokio::test] +async fn taker_takes_order_and_maker_accepts_and_contract_setup() { + let _guard = init_tracing(); + let (mut maker, mut taker) = start_both( + Oracle::default() + .with_dummy_announcement(harness::cfd_protocol::OliviaData::example_0().announcement()), + Monitor, + Wallet::default() + .with_wallet_info() + .with_party_params() + .with_psbt(harness::bdk::dummy_partially_signed_transaction()) + .with_txid(harness::bdk::dummy_tx_id()), + Oracle::default() + .with_dummy_announcement(harness::cfd_protocol::OliviaData::example_0().announcement()), + Monitor, + Wallet::default() + .with_wallet_info() + .with_party_params() + .with_psbt(harness::bdk::dummy_partially_signed_transaction()) + .with_txid(harness::bdk::dummy_tx_id()), + ) + .await; + + is_next_none(&mut taker.order_feed).await; + + maker.publish_order(dummy_new_order()); + + let (_, received) = next_order(&mut maker.order_feed, &mut taker.order_feed).await; + + taker.take_order(received.clone(), Usd::new(dec!(5))); + let (_, _) = next_cfd(&mut taker.cfd_feed, &mut maker.cfd_feed).await; + + maker.accept_take_request(received.clone()); + + let (taker_cfd, maker_cfd) = next_cfd(&mut taker.cfd_feed, &mut maker.cfd_feed).await; + // TODO: More elaborate Cfd assertions + assert_eq!(taker_cfd.order.id, received.id); + assert_eq!(maker_cfd.order.id, received.id); + assert!(matches!(taker_cfd.state, CfdState::ContractSetup { .. })); + assert!(matches!(maker_cfd.state, CfdState::ContractSetup { .. })); + + let (taker_cfd, maker_cfd) = next_cfd(&mut taker.cfd_feed, &mut maker.cfd_feed).await; + // TODO: More elaborate Cfd assertions + assert_eq!(taker_cfd.order.id, received.id); + assert_eq!(maker_cfd.order.id, received.id); + assert!(matches!(taker_cfd.state, CfdState::PendingOpen { .. })); + assert!(matches!(maker_cfd.state, CfdState::PendingOpen { .. })); +} + /// The order cannot be directly compared in tests as the origin is different, /// therefore wrap the assertion macro in a code that unifies the 'Origin' fn assert_is_same_order(a: &Order, b: &Order) { @@ -91,10 +147,10 @@ fn assert_is_same_order(a: &Order, b: &Order) { assert_eq!(a, b); } -fn new_dummy_order() -> maker_cfd::NewOrder { +fn dummy_new_order() -> maker_cfd::NewOrder { maker_cfd::NewOrder { price: Price::new(dec!(50_000)).expect("unexpected failure"), - min_quantity: Usd::new(dec!(10)), + min_quantity: Usd::new(dec!(5)), max_quantity: Usd::new(dec!(100)), } } @@ -152,8 +208,9 @@ async fn next(rx: &mut watch::Receiver) -> T where T: Clone, { + // TODO: Make timeout configurable, only contract setup can take up to 2 min on CI rx.changed() - .timeout(Duration::from_secs(5)) + .timeout(Duration::from_secs(120)) .await .context("Waiting for next element in channel is taking too long, aborting") .unwrap() @@ -185,212 +242,3 @@ fn init_tracing() -> DefaultGuard { guard } - -/// Test Stub simulating the Oracle actor -struct Oracle; -impl xtra::Actor for Oracle {} - -#[xtra_productivity(message_impl = false)] -impl Oracle { - async fn handle_get_announcement( - &mut self, - _msg: oracle::GetAnnouncement, - ) -> Option { - todo!("stub this if needed") - } - - async fn handle(&mut self, _msg: oracle::MonitorAttestation) { - todo!("stub this if needed") - } - - async fn handle(&mut self, _msg: oracle::Sync) {} -} - -/// Test Stub simulating the Monitor actor -struct Monitor; -impl xtra::Actor for Monitor {} - -#[xtra_productivity(message_impl = false)] -impl Monitor { - async fn handle(&mut self, _msg: monitor::Sync) {} - - async fn handle(&mut self, _msg: monitor::StartMonitoring) { - todo!("stub this if needed") - } - - async fn handle(&mut self, _msg: monitor::CollaborativeSettlement) { - todo!("stub this if needed") - } - - async fn handle(&mut self, _msg: oracle::Attestation) { - todo!("stub this if needed") - } -} - -/// Test Stub simulating the Wallet actor -struct Wallet; -impl xtra::Actor for Wallet {} - -#[xtra_productivity(message_impl = false)] -impl Wallet { - async fn handle(&mut self, _msg: wallet::BuildPartyParams) -> Result { - todo!("stub this if needed") - } - async fn handle(&mut self, _msg: wallet::Sync) -> Result { - let s = Secp256k1::new(); - let public_key = ecdsa::PublicKey::new(s.generate_keypair(&mut thread_rng()).1); - let address = bdk::bitcoin::Address::p2pkh(&public_key, bdk::bitcoin::Network::Testnet); - - Ok(WalletInfo { - balance: bdk::bitcoin::Amount::ONE_BTC, - address, - last_updated_at: Timestamp::now()?, - }) - } - async fn handle(&mut self, _msg: wallet::Sign) -> Result { - todo!("stub this if needed") - } - async fn handle(&mut self, _msg: wallet::TryBroadcastTransaction) -> Result { - todo!("stub this if needed") - } -} - -/// Maker Test Setup -#[derive(Clone)] -struct Maker { - cfd_actor_addr: - xtra::Address>, - order_feed: watch::Receiver>, - cfd_feed: watch::Receiver>, - #[allow(dead_code)] // we need to keep the xtra::Address for refcounting - inc_conn_actor_addr: xtra::Address, - listen_addr: SocketAddr, -} - -impl Maker { - async fn start(oracle_pk: schnorrsig::PublicKey) -> Self { - let db = in_memory_db().await; - - let wallet_addr = Wallet {}.create(None).spawn_global(); - - let settlement_time_interval_hours = time::Duration::hours(24); - - let maker = daemon::MakerActorSystem::new( - db, - wallet_addr, - oracle_pk, - |_, _| Oracle, - |_, _| async { Ok(Monitor) }, - |channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1), - settlement_time_interval_hours, - ) - .await - .unwrap(); - - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - - let address = listener.local_addr().unwrap(); - - let listener_stream = futures::stream::poll_fn(move |ctx| { - let message = match futures::ready!(listener.poll_accept(ctx)) { - Ok((stream, address)) => { - dbg!("new connection"); - maker_inc_connections::ListenerMessage::NewConnection { stream, address } - } - Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, - }; - - Poll::Ready(Some(message)) - }); - - tokio::spawn(maker.inc_conn_addr.clone().attach_stream(listener_stream)); - - Self { - cfd_actor_addr: maker.cfd_actor_addr, - order_feed: maker.order_feed_receiver, - cfd_feed: maker.cfd_feed_receiver, - inc_conn_actor_addr: maker.inc_conn_addr, - listen_addr: address, - } - } - - fn publish_order(&mut self, new_order_params: maker_cfd::NewOrder) { - self.cfd_actor_addr.do_send(new_order_params).unwrap(); - } - - fn reject_take_request(&self, order: Order) { - self.cfd_actor_addr - .do_send(CfdAction::RejectOrder { order_id: order.id }) - .unwrap(); - } -} - -/// Taker Test Setup -#[derive(Clone)] -struct Taker { - order_feed: watch::Receiver>, - cfd_feed: watch::Receiver>, - cfd_actor_addr: xtra::Address>, -} - -impl Taker { - async fn start(oracle_pk: schnorrsig::PublicKey, maker_address: SocketAddr) -> Self { - let connection::Actor { - send_to_maker, - read_from_maker, - } = connection::Actor::new(maker_address).await; - - let db = in_memory_db().await; - - let wallet_addr = Wallet {}.create(None).spawn_global(); - - let taker = daemon::TakerActorSystem::new( - db, - wallet_addr, - oracle_pk, - send_to_maker, - read_from_maker, - |_, _| Oracle, - |_, _| async { Ok(Monitor) }, - ) - .await - .unwrap(); - - Self { - order_feed: taker.order_feed_receiver, - cfd_feed: taker.cfd_feed_receiver, - cfd_actor_addr: taker.cfd_actor_addr, - } - } - - fn take_order(&self, order: Order, quantity: Usd) { - self.cfd_actor_addr - .do_send(taker_cfd::TakeOffer { - order_id: order.id, - quantity, - }) - .unwrap(); - } -} - -async fn start_both() -> (Maker, Taker) { - let oracle_pk: schnorrsig::PublicKey = schnorrsig::PublicKey::from_str( - "ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7", - ) - .unwrap(); - - let maker = Maker::start(oracle_pk).await; - let taker = Taker::start(oracle_pk, maker.listen_addr).await; - (maker, taker) -} - -async fn in_memory_db() -> SqlitePool { - // Note: Every :memory: database is distinct from every other. So, opening two database - // connections each with the filename ":memory:" will create two independent in-memory - // databases. see: https://www.sqlite.org/inmemorydb.html - let pool = SqlitePool::connect(":memory:").await.unwrap(); - - db::run_migrations(&pool).await.unwrap(); - - pool -} diff --git a/daemon/tests/harness/bdk.rs b/daemon/tests/harness/bdk.rs new file mode 100644 index 0000000..51eb4cb --- /dev/null +++ b/daemon/tests/harness/bdk.rs @@ -0,0 +1,29 @@ +use bdk::bitcoin::util::psbt::{Global, PartiallySignedTransaction}; +use bdk::bitcoin::{Transaction, Txid}; +use std::collections::BTreeMap; + +pub fn dummy_partially_signed_transaction() -> PartiallySignedTransaction { + // very simple dummy psbt that does not contain anything + // pulled in from github.com-1ecc6299db9ec823/bitcoin-0.27.1/src/util/psbt/mod.rs:238 + + PartiallySignedTransaction { + global: Global { + unsigned_tx: Transaction { + version: 2, + lock_time: 0, + input: vec![], + output: vec![], + }, + xpub: Default::default(), + version: 0, + proprietary: BTreeMap::new(), + unknown: BTreeMap::new(), + }, + inputs: vec![], + outputs: vec![], + } +} + +pub fn dummy_tx_id() -> Txid { + dummy_partially_signed_transaction().extract_tx().txid() +} diff --git a/daemon/tests/harness/cfd_protocol.rs b/daemon/tests/harness/cfd_protocol.rs new file mode 100644 index 0000000..9e7822a --- /dev/null +++ b/daemon/tests/harness/cfd_protocol.rs @@ -0,0 +1,143 @@ +use anyhow::Result; +use bdk::bitcoin; +use bdk::bitcoin::util::bip32::ExtendedPrivKey; +use bdk::bitcoin::{Amount, Network}; +use cfd_protocol::secp256k1_zkp::rand::{CryptoRng, RngCore}; +use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey}; +use cfd_protocol::Announcement; +use std::str::FromStr; + +pub fn dummy_wallet( + rng: &mut (impl RngCore + CryptoRng), + utxo_amount: Amount, + num_utxos: u8, +) -> Result> { + use bdk::{populate_test_db, testutils}; + + let mut seed = [0u8; 32]; + rng.fill_bytes(&mut seed); + + let key = ExtendedPrivKey::new_master(Network::Regtest, &seed)?; + let descriptors = testutils!(@descriptors (&format!("wpkh({}/*)", key))); + + let mut database = bdk::database::MemoryDatabase::new(); + + for index in 0..num_utxos { + populate_test_db!( + &mut database, + testutils! { + @tx ( (@external descriptors, index as u32) => utxo_amount.as_sat() ) (@confirmations 1) + }, + Some(100) + ); + } + + let wallet = bdk::Wallet::new_offline(&descriptors.0, None, Network::Regtest, database)?; + + Ok(wallet) +} + +#[allow(dead_code)] +pub struct OliviaData { + id: String, + pk: schnorrsig::PublicKey, + nonce_pks: Vec, + price: u64, + attestations: Vec, +} + +impl OliviaData { + // Note: protocol tests have example 1 as well, but so far we are not asserting on that level of + // granularity; example 1 can be pulled in once needed + pub fn example_0() -> Self { + Self::example( + Self::EVENT_ID_0, + Self::PRICE_0, + &Self::NONCE_PKS_0, + &Self::ATTESTATIONS_0, + ) + } + + /// Generate an example of all the data from `olivia` needed to test the + /// CFD protocol end-to-end. + fn example(id: &str, price: u64, nonce_pks: &[&str], attestations: &[&str]) -> Self { + let oracle_pk = schnorrsig::PublicKey::from_str(Self::OLIVIA_PK).unwrap(); + + let id = id.to_string(); + + let nonce_pks = nonce_pks + .iter() + .map(|pk| schnorrsig::PublicKey::from_str(pk).unwrap()) + .collect(); + + let attestations = attestations + .iter() + .map(|pk| SecretKey::from_str(pk).unwrap()) + .collect(); + + Self { + id, + pk: oracle_pk, + nonce_pks, + attestations, + price, + } + } + + pub fn announcement(&self) -> Announcement { + Announcement { + id: self.id.clone(), + nonce_pks: self.nonce_pks.clone(), + } + } + + const OLIVIA_PK: &'static str = + "ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7"; + + const EVENT_ID_0: &'static str = "/x/BitMEX/BXBT/2021-10-05T02:00:00.price?n=20"; + const NONCE_PKS_0: [&'static str; 20] = [ + "d02d163cf9623f567c4e3faf851a9266ac1ede13da4ca4141f3a7717fba9a739", + "bc310f26aa5addbc382f653d8530aaead7c25e3546abc24639f490e36d4bdb88", + "2661375f570dcc32300d442e85b6d72dfa3232dccda45e8fb4a2d1e758d1d374", + "fcc68fbf071d391b14c0867cb4defb5a8abc12418dff3dfc2f84fd4025cb2716", + "cf5c2b7fe3851c64a7ff9635a9bfc50cdd301401d002f2da049f4c6a20e8457b", + "14f1005d8c2832a2c4666dd732dd9bb3af9c8f70ebcdaec96869b1ca0c8e0de6", + "299ee1c9c20fab8b067adf452a7d7661b5e7f5dd6bc707562805002e7cb8443e", + "bcb4e5a594346de298993a7a31762f598b5224b977e23182369e9ed3e5127f78", + "25e09a16ee5d469069abfb62cd5e1f20af50cf15241f571e64fa28b127304574", + "3ed5a1422f43299caf281123aba88bd4bc61ec863f4afb79c7ce7663ad44db5d", + "a7e0f61212735c192c4bf16e0a3e925e65f9f3feb6f1e5e8d6f5c18cf2dbb5a8", + "a36a631015d9036d0c321fea7cf12f589aa196e7279b4a290de5112c2940e540", + "b5bdd931f81970139e7301ac654b378077c3ed993ca7893ed93fee5fc6f7a782", + "00090816e256b41e042dce38bde99ab3cf9482f9b066836988d3ed54833638e8", + "3530408e93c251f5f488d3b1c608157177c459d6fab1966abebf765bcc9338d2", + "603269ce88d112ff7fcfcaab82f228be97deca37f8190084d509c71b51a30432", + "f0587414fcc6c56aef11d4a1d287ad6b55b237c5b8a5d5d93eb9ca06f6466ccf", + "763009afb0ffd99c7b835488cb3b0302f3b78f59bbfd5292bedab8ef9da8c1b7", + "3867af9048309a05004a164bdea09899f23ff1d83b6491b2b53a1b7b92e0eb2e", + "688118e6b59e27944c277513db2711a520f4283c7c53a11f58d9f6a46d82c964", + ]; + const PRICE_0: u64 = 49262; + const ATTESTATIONS_0: [&'static str; 20] = [ + "5bc7663195971daaa1e3e6a81b4bca65882791644bc446fc060cbc118a3ace0f", + "721d0cb56a0778a1ca7907f81a0787f34385b13f854c845c4c5539f7f6267958", + "044aeef0d525c8ff48758c80939e95807bc640990cc03f53ab6fc0b262045221", + "79f5175423ec6ee69c8d0e55251db85f3015c2edfa5a03095443fbbf35eb2282", + "233b9ec549e9cc7c702109d29636db85a3ec63a66f3b53444bcc7586d36ca439", + "2961a00320b7c9a70220060019a6ca88e18c205fadd2f873c174e5ccbbed527e", + "bdb76e8f81c39ade4205ead9b68118757fc49ec22769605f26ef904b235283d6", + "6e75dafedf4ed685513ec1f5c93508de4fad2be05b46001ac00c03474f4690e1", + "cfcfc27eb9273b343b3042f0386e77efe329066be079788bb00ab47d72f26780", + "2d931ffd2963e74566365674583abc427bdb6ae571c4887d81f1920f0850665d", + "33b6f1112fa046cbc04be44c615e70519702662c1f72d8d49b3c4613614a8a46", + "19e569b15410fa9a758c1a6c211eae8c1547efbe0ac6a7709902be93415f2f09", + "d859dd5c9a58e1836d1eea3ebe7f48198a681d29e5a5cd6922532d2e94a53a1d", + "3387eb2ad5e64cd102167766bb72b447f4a2e5129d161e422f9d41cd7d1cc281", + "db35a9778a1e3abc8d8ab2f4a79346ae2154c9e0b4932d859d1f3e244f67ae76", + "c3be969e8b889cfb2ece71123e6be5538a2d3a1229637b18bccc179073c38059", + "6f73263f430e10b82d0fd06c4ddd3b8a6b58c3e756745bd0d9e71a399e517921", + "0818c9c245d7d2162cd393c562a121f80405a27d22ae465e95030c31ebb4bd24", + "b7c03f0bd6d63bd78ad4ea0f3452ff9717ba65ca42038e6e90a1aa558b7942dc", + "90c4d8ec9f408ccb62a62daa993c20f2f86799e1fdea520c6d060418e55fd216", + ]; +} diff --git a/daemon/tests/harness/mocks/mod.rs b/daemon/tests/harness/mocks/mod.rs new file mode 100644 index 0000000..1a6fb41 --- /dev/null +++ b/daemon/tests/harness/mocks/mod.rs @@ -0,0 +1,3 @@ +pub mod monitor; +pub mod oracle; +pub mod wallet; diff --git a/daemon/tests/harness/mocks/monitor.rs b/daemon/tests/harness/mocks/monitor.rs new file mode 100644 index 0000000..b03a394 --- /dev/null +++ b/daemon/tests/harness/mocks/monitor.rs @@ -0,0 +1,29 @@ +use daemon::{monitor, oracle}; +use xtra_productivity::xtra_productivity; + +/// Test Stub simulating the Monitor actor +pub struct Monitor; +impl xtra::Actor for Monitor {} + +#[xtra_productivity(message_impl = false)] +impl Monitor { + async fn handle(&mut self, _msg: monitor::Sync) {} + + async fn handle(&mut self, _msg: monitor::StartMonitoring) { + todo!("stub this if needed") + } + + async fn handle(&mut self, _msg: monitor::CollaborativeSettlement) { + todo!("stub this if needed") + } + + async fn handle(&mut self, _msg: oracle::Attestation) { + todo!("stub this if needed") + } +} + +impl Default for Monitor { + fn default() -> Self { + Monitor + } +} diff --git a/daemon/tests/harness/mocks/oracle.rs b/daemon/tests/harness/mocks/oracle.rs new file mode 100644 index 0000000..36aeaf5 --- /dev/null +++ b/daemon/tests/harness/mocks/oracle.rs @@ -0,0 +1,45 @@ +use daemon::model::BitMexPriceEventId; +use daemon::oracle; +use time::OffsetDateTime; +use xtra_productivity::xtra_productivity; + +pub struct Oracle { + announcement: Option, +} + +impl Oracle { + pub fn with_dummy_announcement( + mut self, + dummy_announcement: cfd_protocol::Announcement, + ) -> Self { + self.announcement = Some(oracle::Announcement { + id: BitMexPriceEventId::new(OffsetDateTime::UNIX_EPOCH, 0), + expected_outcome_time: OffsetDateTime::now_utc(), + nonce_pks: dummy_announcement.nonce_pks, + }); + + self + } +} + +impl xtra::Actor for Oracle {} + +#[xtra_productivity(message_impl = false)] +impl Oracle { + async fn handle_get_announcement( + &mut self, + _msg: oracle::GetAnnouncement, + ) -> Option { + self.announcement.clone() + } + + async fn handle(&mut self, _msg: oracle::MonitorAttestation) {} + + async fn handle(&mut self, _msg: oracle::Sync) {} +} + +impl Default for Oracle { + fn default() -> Self { + Oracle { announcement: None } + } +} diff --git a/daemon/tests/harness/mocks/wallet.rs b/daemon/tests/harness/mocks/wallet.rs new file mode 100644 index 0000000..9883051 --- /dev/null +++ b/daemon/tests/harness/mocks/wallet.rs @@ -0,0 +1,93 @@ +use crate::harness::cfd_protocol::dummy_wallet; +use anyhow::{anyhow, Result}; +use bdk::bitcoin::util::psbt::PartiallySignedTransaction; +use bdk::bitcoin::{ecdsa, Amount, Txid}; +use cfd_protocol::secp256k1_zkp::Secp256k1; +use cfd_protocol::{PartyParams, WalletExt}; +use daemon::model::{Timestamp, WalletInfo}; +use daemon::wallet; +use rand::thread_rng; +use xtra_productivity::xtra_productivity; + +pub struct Wallet { + party_params: bool, + wallet_info: Option, + psbt: Option, + txid: Option, +} + +impl Wallet { + pub fn with_party_params(mut self) -> Self { + self.party_params = true; + self + } + + pub fn with_wallet_info(mut self) -> Self { + let s = Secp256k1::new(); + let public_key = ecdsa::PublicKey::new(s.generate_keypair(&mut thread_rng()).1); + let address = bdk::bitcoin::Address::p2pkh(&public_key, bdk::bitcoin::Network::Testnet); + + self.wallet_info = Some(WalletInfo { + balance: bdk::bitcoin::Amount::ONE_BTC, + address, + last_updated_at: Timestamp::now().unwrap(), + }); + + self + } + + pub fn with_psbt(mut self, psbt: PartiallySignedTransaction) -> Self { + self.psbt = Some(psbt); + self + } + + pub fn with_txid(mut self, txid: Txid) -> Self { + self.txid = Some(txid); + self + } +} + +impl xtra::Actor for Wallet {} + +#[xtra_productivity(message_impl = false)] +impl Wallet { + async fn handle(&mut self, msg: wallet::BuildPartyParams) -> Result { + if self.party_params { + let mut rng = thread_rng(); + let wallet = dummy_wallet(&mut rng, Amount::from_btc(0.4).unwrap(), 5).unwrap(); + + let party_params = wallet + .build_party_params(msg.amount, msg.identity_pk) + .unwrap(); + + return Ok(party_params); + } + + panic!("Stub not configured to return party params") + } + async fn handle(&mut self, _msg: wallet::Sync) -> Result { + self.wallet_info + .clone() + .ok_or_else(|| anyhow!("Stub not configured to return WalletInfo")) + } + async fn handle(&mut self, _msg: wallet::Sign) -> Result { + self.psbt + .clone() + .ok_or_else(|| anyhow!("Stub not configured to return PartiallySignedTransaction")) + } + async fn handle(&mut self, _msg: wallet::TryBroadcastTransaction) -> Result { + self.txid + .ok_or_else(|| anyhow!("Stub not configured to return Txid")) + } +} + +impl Default for Wallet { + fn default() -> Self { + Wallet { + party_params: false, + wallet_info: None, + psbt: None, + txid: None, + } + } +} diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs new file mode 100644 index 0000000..9c89517 --- /dev/null +++ b/daemon/tests/harness/mod.rs @@ -0,0 +1,192 @@ +use crate::harness::mocks::monitor::Monitor; +use crate::harness::mocks::oracle::Oracle; +use crate::harness::mocks::wallet::Wallet; +use crate::schnorrsig; +use daemon::maker_cfd::CfdAction; +use daemon::model::cfd::{Cfd, Order}; +use daemon::model::Usd; +use daemon::{connection, db, maker_cfd, maker_inc_connections, taker_cfd}; +use sqlx::SqlitePool; +use std::net::SocketAddr; +use std::str::FromStr; +use std::task::Poll; +use tokio::sync::watch; +use xtra::spawn::TokioGlobalSpawnExt; +use xtra::Actor; + +pub mod bdk; +pub mod cfd_protocol; +pub mod mocks; + +// TODO: Remove all these parameters once we have the mock framework (because we have +// Arcs then and can just default inside) +pub async fn start_both( + taker_oracle: Oracle, + taker_monitor: Monitor, + taker_wallet: Wallet, + maker_oracle: Oracle, + maker_monitor: Monitor, + maker_wallet: Wallet, +) -> (Maker, Taker) { + let oracle_pk: schnorrsig::PublicKey = schnorrsig::PublicKey::from_str( + "ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7", + ) + .unwrap(); + + let maker = Maker::start(oracle_pk, maker_oracle, maker_monitor, maker_wallet).await; + let taker = Taker::start( + oracle_pk, + maker.listen_addr, + taker_oracle, + taker_monitor, + taker_wallet, + ) + .await; + (maker, taker) +} + +/// Maker Test Setup +#[derive(Clone)] +pub struct Maker { + pub cfd_actor_addr: + xtra::Address>, + pub order_feed: watch::Receiver>, + pub cfd_feed: watch::Receiver>, + #[allow(dead_code)] // we need to keep the xtra::Address for refcounting + pub inc_conn_actor_addr: xtra::Address, + pub listen_addr: SocketAddr, +} + +impl Maker { + pub async fn start( + oracle_pk: schnorrsig::PublicKey, + oracle: Oracle, + monitor: Monitor, + wallet: Wallet, + ) -> Self { + let db = in_memory_db().await; + + let wallet_addr = wallet.create(None).spawn_global(); + + let settlement_time_interval_hours = time::Duration::hours(24); + + let maker = daemon::MakerActorSystem::new( + db, + wallet_addr, + oracle_pk, + |_, _| oracle, + |_, _| async { Ok(monitor) }, + |channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1), + settlement_time_interval_hours, + ) + .await + .unwrap(); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + + let address = listener.local_addr().unwrap(); + + let listener_stream = futures::stream::poll_fn(move |ctx| { + let message = match futures::ready!(listener.poll_accept(ctx)) { + Ok((stream, address)) => { + dbg!("new connection"); + maker_inc_connections::ListenerMessage::NewConnection { stream, address } + } + Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, + }; + + Poll::Ready(Some(message)) + }); + + tokio::spawn(maker.inc_conn_addr.clone().attach_stream(listener_stream)); + + Self { + cfd_actor_addr: maker.cfd_actor_addr, + order_feed: maker.order_feed_receiver, + cfd_feed: maker.cfd_feed_receiver, + inc_conn_actor_addr: maker.inc_conn_addr, + listen_addr: address, + } + } + + pub fn publish_order(&mut self, new_order_params: maker_cfd::NewOrder) { + self.cfd_actor_addr.do_send(new_order_params).unwrap(); + } + + pub fn reject_take_request(&self, order: Order) { + self.cfd_actor_addr + .do_send(CfdAction::RejectOrder { order_id: order.id }) + .unwrap(); + } + + pub fn accept_take_request(&self, order: Order) { + self.cfd_actor_addr + .do_send(CfdAction::AcceptOrder { order_id: order.id }) + .unwrap(); + } +} + +/// Taker Test Setup +#[derive(Clone)] +pub struct Taker { + pub order_feed: watch::Receiver>, + pub cfd_feed: watch::Receiver>, + pub cfd_actor_addr: xtra::Address>, +} + +impl Taker { + pub async fn start( + oracle_pk: schnorrsig::PublicKey, + maker_address: SocketAddr, + oracle: Oracle, + monitor: Monitor, + wallet: Wallet, + ) -> Self { + let connection::Actor { + send_to_maker, + read_from_maker, + } = connection::Actor::new(maker_address).await; + + let db = in_memory_db().await; + + let wallet_addr = wallet.create(None).spawn_global(); + + let taker = daemon::TakerActorSystem::new( + db, + wallet_addr, + oracle_pk, + send_to_maker, + read_from_maker, + |_, _| oracle, + |_, _| async { Ok(monitor) }, + ) + .await + .unwrap(); + + Self { + order_feed: taker.order_feed_receiver, + cfd_feed: taker.cfd_feed_receiver, + cfd_actor_addr: taker.cfd_actor_addr, + } + } + + pub fn take_order(&self, order: Order, quantity: Usd) { + self.cfd_actor_addr + .do_send(taker_cfd::TakeOffer { + order_id: order.id, + quantity, + }) + .unwrap(); + } +} + +async fn in_memory_db() -> SqlitePool { + // Note: Every :memory: database is distinct from every other. So, opening two database + // connections each with the filename ":memory:" will create two independent in-memory + // databases. see: https://www.sqlite.org/inmemorydb.html + let pool = SqlitePool::connect(":memory:").await.unwrap(); + + db::run_migrations(&pool).await.unwrap(); + + pool +}