Browse Source

Merge #453

453: Actor test accept r=da-kami a=da-kami

passing accept test - might want to merge this work with `@klochowicz` work :)
draft for having a look. commits not cleaned up

Co-authored-by: Daniel Karzel <daniel@comit.network>
burn-down-handle
bors[bot] 3 years ago
committed by GitHub
parent
commit
53e77ee286
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 316
      daemon/tests/happy_path.rs
  2. 29
      daemon/tests/harness/bdk.rs
  3. 143
      daemon/tests/harness/cfd_protocol.rs
  4. 3
      daemon/tests/harness/mocks/mod.rs
  5. 29
      daemon/tests/harness/mocks/monitor.rs
  6. 45
      daemon/tests/harness/mocks/oracle.rs
  7. 93
      daemon/tests/harness/mocks/wallet.rs
  8. 192
      daemon/tests/harness/mod.rs

316
daemon/tests/happy_path.rs

@ -1,39 +1,38 @@
use anyhow::{Context, Result}; use crate::harness::mocks::monitor::Monitor;
use bdk::bitcoin::util::psbt::PartiallySignedTransaction; use crate::harness::mocks::oracle::Oracle;
use bdk::bitcoin::{ecdsa, Txid}; use crate::harness::mocks::wallet::Wallet;
use cfd_protocol::secp256k1_zkp::{schnorrsig, Secp256k1}; use crate::harness::start_both;
use cfd_protocol::PartyParams; use anyhow::Context;
use daemon::maker_cfd::CfdAction; use cfd_protocol::secp256k1_zkp::schnorrsig;
use daemon::maker_cfd;
use daemon::model::cfd::{Cfd, CfdState, Order, Origin}; use daemon::model::cfd::{Cfd, CfdState, Order, Origin};
use daemon::model::{Price, Timestamp, Usd, WalletInfo}; use daemon::model::{Price, Usd};
use daemon::tokio_ext::FutureExt; use daemon::tokio_ext::FutureExt;
use daemon::{
connection, db, maker_cfd, maker_inc_connections, monitor, oracle, taker_cfd, wallet,
};
use rand::thread_rng;
use rust_decimal_macros::dec; use rust_decimal_macros::dec;
use sqlx::SqlitePool;
use std::net::SocketAddr;
use std::str::FromStr;
use std::task::Poll;
use std::time::Duration; use std::time::Duration;
use tokio::sync::watch; use tokio::sync::watch;
use tracing::subscriber::DefaultGuard; use tracing::subscriber::DefaultGuard;
use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
use xtra::spawn::TokioGlobalSpawnExt; mod harness;
use xtra::Actor;
use xtra_productivity::xtra_productivity;
#[tokio::test] #[tokio::test]
async fn taker_receives_order_from_maker_on_publication() { async fn taker_receives_order_from_maker_on_publication() {
let _guard = init_tracing(); let _guard = init_tracing();
let (mut maker, mut taker) = start_both().await; let (mut maker, mut taker) = start_both(
Oracle::default(),
Monitor,
Wallet::default(),
Oracle::default(),
Monitor,
Wallet::default(),
)
.await;
assert!(is_next_none(&mut taker.order_feed).await); assert!(is_next_none(&mut taker.order_feed).await);
maker.publish_order(new_dummy_order()); maker.publish_order(dummy_new_order());
let (published, received) = tokio::join!( let (published, received) = tokio::join!(
next_some(&mut maker.order_feed), next_some(&mut maker.order_feed),
@ -46,12 +45,20 @@ async fn taker_receives_order_from_maker_on_publication() {
#[tokio::test] #[tokio::test]
async fn taker_takes_order_and_maker_rejects() { async fn taker_takes_order_and_maker_rejects() {
let _guard = init_tracing(); let _guard = init_tracing();
let (mut maker, mut taker) = start_both().await; let (mut maker, mut taker) = start_both(
Oracle::default(),
Monitor,
Wallet::default().with_wallet_info(),
Oracle::default(),
Monitor,
Wallet::default().with_wallet_info(),
)
.await;
// TODO: Why is this needed? For the cfd stream it is not needed // TODO: Why is this needed? For the cfd stream it is not needed
is_next_none(&mut taker.order_feed).await; is_next_none(&mut taker.order_feed).await;
maker.publish_order(new_dummy_order()); maker.publish_order(dummy_new_order());
let (_, received) = next_order(&mut maker.order_feed, &mut taker.order_feed).await; let (_, received) = next_order(&mut maker.order_feed, &mut taker.order_feed).await;
@ -79,6 +86,55 @@ async fn taker_takes_order_and_maker_rejects() {
assert!(matches!(maker_cfd.state, CfdState::Rejected { .. })); assert!(matches!(maker_cfd.state, CfdState::Rejected { .. }));
} }
#[tokio::test]
async fn taker_takes_order_and_maker_accepts_and_contract_setup() {
let _guard = init_tracing();
let (mut maker, mut taker) = start_both(
Oracle::default()
.with_dummy_announcement(harness::cfd_protocol::OliviaData::example_0().announcement()),
Monitor,
Wallet::default()
.with_wallet_info()
.with_party_params()
.with_psbt(harness::bdk::dummy_partially_signed_transaction())
.with_txid(harness::bdk::dummy_tx_id()),
Oracle::default()
.with_dummy_announcement(harness::cfd_protocol::OliviaData::example_0().announcement()),
Monitor,
Wallet::default()
.with_wallet_info()
.with_party_params()
.with_psbt(harness::bdk::dummy_partially_signed_transaction())
.with_txid(harness::bdk::dummy_tx_id()),
)
.await;
is_next_none(&mut taker.order_feed).await;
maker.publish_order(dummy_new_order());
let (_, received) = next_order(&mut maker.order_feed, &mut taker.order_feed).await;
taker.take_order(received.clone(), Usd::new(dec!(5)));
let (_, _) = next_cfd(&mut taker.cfd_feed, &mut maker.cfd_feed).await;
maker.accept_take_request(received.clone());
let (taker_cfd, maker_cfd) = next_cfd(&mut taker.cfd_feed, &mut maker.cfd_feed).await;
// TODO: More elaborate Cfd assertions
assert_eq!(taker_cfd.order.id, received.id);
assert_eq!(maker_cfd.order.id, received.id);
assert!(matches!(taker_cfd.state, CfdState::ContractSetup { .. }));
assert!(matches!(maker_cfd.state, CfdState::ContractSetup { .. }));
let (taker_cfd, maker_cfd) = next_cfd(&mut taker.cfd_feed, &mut maker.cfd_feed).await;
// TODO: More elaborate Cfd assertions
assert_eq!(taker_cfd.order.id, received.id);
assert_eq!(maker_cfd.order.id, received.id);
assert!(matches!(taker_cfd.state, CfdState::PendingOpen { .. }));
assert!(matches!(maker_cfd.state, CfdState::PendingOpen { .. }));
}
/// The order cannot be directly compared in tests as the origin is different, /// The order cannot be directly compared in tests as the origin is different,
/// therefore wrap the assertion macro in a code that unifies the 'Origin' /// therefore wrap the assertion macro in a code that unifies the 'Origin'
fn assert_is_same_order(a: &Order, b: &Order) { fn assert_is_same_order(a: &Order, b: &Order) {
@ -91,10 +147,10 @@ fn assert_is_same_order(a: &Order, b: &Order) {
assert_eq!(a, b); assert_eq!(a, b);
} }
fn new_dummy_order() -> maker_cfd::NewOrder { fn dummy_new_order() -> maker_cfd::NewOrder {
maker_cfd::NewOrder { maker_cfd::NewOrder {
price: Price::new(dec!(50_000)).expect("unexpected failure"), price: Price::new(dec!(50_000)).expect("unexpected failure"),
min_quantity: Usd::new(dec!(10)), min_quantity: Usd::new(dec!(5)),
max_quantity: Usd::new(dec!(100)), max_quantity: Usd::new(dec!(100)),
} }
} }
@ -152,8 +208,9 @@ async fn next<T>(rx: &mut watch::Receiver<T>) -> T
where where
T: Clone, T: Clone,
{ {
// TODO: Make timeout configurable, only contract setup can take up to 2 min on CI
rx.changed() rx.changed()
.timeout(Duration::from_secs(5)) .timeout(Duration::from_secs(120))
.await .await
.context("Waiting for next element in channel is taking too long, aborting") .context("Waiting for next element in channel is taking too long, aborting")
.unwrap() .unwrap()
@ -185,212 +242,3 @@ fn init_tracing() -> DefaultGuard {
guard guard
} }
/// Test Stub simulating the Oracle actor
struct Oracle;
impl xtra::Actor for Oracle {}
#[xtra_productivity(message_impl = false)]
impl Oracle {
async fn handle_get_announcement(
&mut self,
_msg: oracle::GetAnnouncement,
) -> Option<oracle::Announcement> {
todo!("stub this if needed")
}
async fn handle(&mut self, _msg: oracle::MonitorAttestation) {
todo!("stub this if needed")
}
async fn handle(&mut self, _msg: oracle::Sync) {}
}
/// Test Stub simulating the Monitor actor
struct Monitor;
impl xtra::Actor for Monitor {}
#[xtra_productivity(message_impl = false)]
impl Monitor {
async fn handle(&mut self, _msg: monitor::Sync) {}
async fn handle(&mut self, _msg: monitor::StartMonitoring) {
todo!("stub this if needed")
}
async fn handle(&mut self, _msg: monitor::CollaborativeSettlement) {
todo!("stub this if needed")
}
async fn handle(&mut self, _msg: oracle::Attestation) {
todo!("stub this if needed")
}
}
/// Test Stub simulating the Wallet actor
struct Wallet;
impl xtra::Actor for Wallet {}
#[xtra_productivity(message_impl = false)]
impl Wallet {
async fn handle(&mut self, _msg: wallet::BuildPartyParams) -> Result<PartyParams> {
todo!("stub this if needed")
}
async fn handle(&mut self, _msg: wallet::Sync) -> Result<WalletInfo> {
let s = Secp256k1::new();
let public_key = ecdsa::PublicKey::new(s.generate_keypair(&mut thread_rng()).1);
let address = bdk::bitcoin::Address::p2pkh(&public_key, bdk::bitcoin::Network::Testnet);
Ok(WalletInfo {
balance: bdk::bitcoin::Amount::ONE_BTC,
address,
last_updated_at: Timestamp::now()?,
})
}
async fn handle(&mut self, _msg: wallet::Sign) -> Result<PartiallySignedTransaction> {
todo!("stub this if needed")
}
async fn handle(&mut self, _msg: wallet::TryBroadcastTransaction) -> Result<Txid> {
todo!("stub this if needed")
}
}
/// Maker Test Setup
#[derive(Clone)]
struct Maker {
cfd_actor_addr:
xtra::Address<maker_cfd::Actor<Oracle, Monitor, maker_inc_connections::Actor, Wallet>>,
order_feed: watch::Receiver<Option<Order>>,
cfd_feed: watch::Receiver<Vec<Cfd>>,
#[allow(dead_code)] // we need to keep the xtra::Address for refcounting
inc_conn_actor_addr: xtra::Address<maker_inc_connections::Actor>,
listen_addr: SocketAddr,
}
impl Maker {
async fn start(oracle_pk: schnorrsig::PublicKey) -> Self {
let db = in_memory_db().await;
let wallet_addr = Wallet {}.create(None).spawn_global();
let settlement_time_interval_hours = time::Duration::hours(24);
let maker = daemon::MakerActorSystem::new(
db,
wallet_addr,
oracle_pk,
|_, _| Oracle,
|_, _| async { Ok(Monitor) },
|channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1),
settlement_time_interval_hours,
)
.await
.unwrap();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let address = listener.local_addr().unwrap();
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => {
dbg!("new connection");
maker_inc_connections::ListenerMessage::NewConnection { stream, address }
}
Err(e) => maker_inc_connections::ListenerMessage::Error { source: e },
};
Poll::Ready(Some(message))
});
tokio::spawn(maker.inc_conn_addr.clone().attach_stream(listener_stream));
Self {
cfd_actor_addr: maker.cfd_actor_addr,
order_feed: maker.order_feed_receiver,
cfd_feed: maker.cfd_feed_receiver,
inc_conn_actor_addr: maker.inc_conn_addr,
listen_addr: address,
}
}
fn publish_order(&mut self, new_order_params: maker_cfd::NewOrder) {
self.cfd_actor_addr.do_send(new_order_params).unwrap();
}
fn reject_take_request(&self, order: Order) {
self.cfd_actor_addr
.do_send(CfdAction::RejectOrder { order_id: order.id })
.unwrap();
}
}
/// Taker Test Setup
#[derive(Clone)]
struct Taker {
order_feed: watch::Receiver<Option<Order>>,
cfd_feed: watch::Receiver<Vec<Cfd>>,
cfd_actor_addr: xtra::Address<taker_cfd::Actor<Oracle, Monitor, Wallet>>,
}
impl Taker {
async fn start(oracle_pk: schnorrsig::PublicKey, maker_address: SocketAddr) -> Self {
let connection::Actor {
send_to_maker,
read_from_maker,
} = connection::Actor::new(maker_address).await;
let db = in_memory_db().await;
let wallet_addr = Wallet {}.create(None).spawn_global();
let taker = daemon::TakerActorSystem::new(
db,
wallet_addr,
oracle_pk,
send_to_maker,
read_from_maker,
|_, _| Oracle,
|_, _| async { Ok(Monitor) },
)
.await
.unwrap();
Self {
order_feed: taker.order_feed_receiver,
cfd_feed: taker.cfd_feed_receiver,
cfd_actor_addr: taker.cfd_actor_addr,
}
}
fn take_order(&self, order: Order, quantity: Usd) {
self.cfd_actor_addr
.do_send(taker_cfd::TakeOffer {
order_id: order.id,
quantity,
})
.unwrap();
}
}
async fn start_both() -> (Maker, Taker) {
let oracle_pk: schnorrsig::PublicKey = schnorrsig::PublicKey::from_str(
"ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7",
)
.unwrap();
let maker = Maker::start(oracle_pk).await;
let taker = Taker::start(oracle_pk, maker.listen_addr).await;
(maker, taker)
}
async fn in_memory_db() -> SqlitePool {
// Note: Every :memory: database is distinct from every other. So, opening two database
// connections each with the filename ":memory:" will create two independent in-memory
// databases. see: https://www.sqlite.org/inmemorydb.html
let pool = SqlitePool::connect(":memory:").await.unwrap();
db::run_migrations(&pool).await.unwrap();
pool
}

29
daemon/tests/harness/bdk.rs

@ -0,0 +1,29 @@
use bdk::bitcoin::util::psbt::{Global, PartiallySignedTransaction};
use bdk::bitcoin::{Transaction, Txid};
use std::collections::BTreeMap;
pub fn dummy_partially_signed_transaction() -> PartiallySignedTransaction {
// very simple dummy psbt that does not contain anything
// pulled in from github.com-1ecc6299db9ec823/bitcoin-0.27.1/src/util/psbt/mod.rs:238
PartiallySignedTransaction {
global: Global {
unsigned_tx: Transaction {
version: 2,
lock_time: 0,
input: vec![],
output: vec![],
},
xpub: Default::default(),
version: 0,
proprietary: BTreeMap::new(),
unknown: BTreeMap::new(),
},
inputs: vec![],
outputs: vec![],
}
}
pub fn dummy_tx_id() -> Txid {
dummy_partially_signed_transaction().extract_tx().txid()
}

143
daemon/tests/harness/cfd_protocol.rs

@ -0,0 +1,143 @@
use anyhow::Result;
use bdk::bitcoin;
use bdk::bitcoin::util::bip32::ExtendedPrivKey;
use bdk::bitcoin::{Amount, Network};
use cfd_protocol::secp256k1_zkp::rand::{CryptoRng, RngCore};
use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey};
use cfd_protocol::Announcement;
use std::str::FromStr;
pub fn dummy_wallet(
rng: &mut (impl RngCore + CryptoRng),
utxo_amount: Amount,
num_utxos: u8,
) -> Result<bdk::Wallet<(), bdk::database::MemoryDatabase>> {
use bdk::{populate_test_db, testutils};
let mut seed = [0u8; 32];
rng.fill_bytes(&mut seed);
let key = ExtendedPrivKey::new_master(Network::Regtest, &seed)?;
let descriptors = testutils!(@descriptors (&format!("wpkh({}/*)", key)));
let mut database = bdk::database::MemoryDatabase::new();
for index in 0..num_utxos {
populate_test_db!(
&mut database,
testutils! {
@tx ( (@external descriptors, index as u32) => utxo_amount.as_sat() ) (@confirmations 1)
},
Some(100)
);
}
let wallet = bdk::Wallet::new_offline(&descriptors.0, None, Network::Regtest, database)?;
Ok(wallet)
}
#[allow(dead_code)]
pub struct OliviaData {
id: String,
pk: schnorrsig::PublicKey,
nonce_pks: Vec<schnorrsig::PublicKey>,
price: u64,
attestations: Vec<SecretKey>,
}
impl OliviaData {
// Note: protocol tests have example 1 as well, but so far we are not asserting on that level of
// granularity; example 1 can be pulled in once needed
pub fn example_0() -> Self {
Self::example(
Self::EVENT_ID_0,
Self::PRICE_0,
&Self::NONCE_PKS_0,
&Self::ATTESTATIONS_0,
)
}
/// Generate an example of all the data from `olivia` needed to test the
/// CFD protocol end-to-end.
fn example(id: &str, price: u64, nonce_pks: &[&str], attestations: &[&str]) -> Self {
let oracle_pk = schnorrsig::PublicKey::from_str(Self::OLIVIA_PK).unwrap();
let id = id.to_string();
let nonce_pks = nonce_pks
.iter()
.map(|pk| schnorrsig::PublicKey::from_str(pk).unwrap())
.collect();
let attestations = attestations
.iter()
.map(|pk| SecretKey::from_str(pk).unwrap())
.collect();
Self {
id,
pk: oracle_pk,
nonce_pks,
attestations,
price,
}
}
pub fn announcement(&self) -> Announcement {
Announcement {
id: self.id.clone(),
nonce_pks: self.nonce_pks.clone(),
}
}
const OLIVIA_PK: &'static str =
"ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7";
const EVENT_ID_0: &'static str = "/x/BitMEX/BXBT/2021-10-05T02:00:00.price?n=20";
const NONCE_PKS_0: [&'static str; 20] = [
"d02d163cf9623f567c4e3faf851a9266ac1ede13da4ca4141f3a7717fba9a739",
"bc310f26aa5addbc382f653d8530aaead7c25e3546abc24639f490e36d4bdb88",
"2661375f570dcc32300d442e85b6d72dfa3232dccda45e8fb4a2d1e758d1d374",
"fcc68fbf071d391b14c0867cb4defb5a8abc12418dff3dfc2f84fd4025cb2716",
"cf5c2b7fe3851c64a7ff9635a9bfc50cdd301401d002f2da049f4c6a20e8457b",
"14f1005d8c2832a2c4666dd732dd9bb3af9c8f70ebcdaec96869b1ca0c8e0de6",
"299ee1c9c20fab8b067adf452a7d7661b5e7f5dd6bc707562805002e7cb8443e",
"bcb4e5a594346de298993a7a31762f598b5224b977e23182369e9ed3e5127f78",
"25e09a16ee5d469069abfb62cd5e1f20af50cf15241f571e64fa28b127304574",
"3ed5a1422f43299caf281123aba88bd4bc61ec863f4afb79c7ce7663ad44db5d",
"a7e0f61212735c192c4bf16e0a3e925e65f9f3feb6f1e5e8d6f5c18cf2dbb5a8",
"a36a631015d9036d0c321fea7cf12f589aa196e7279b4a290de5112c2940e540",
"b5bdd931f81970139e7301ac654b378077c3ed993ca7893ed93fee5fc6f7a782",
"00090816e256b41e042dce38bde99ab3cf9482f9b066836988d3ed54833638e8",
"3530408e93c251f5f488d3b1c608157177c459d6fab1966abebf765bcc9338d2",
"603269ce88d112ff7fcfcaab82f228be97deca37f8190084d509c71b51a30432",
"f0587414fcc6c56aef11d4a1d287ad6b55b237c5b8a5d5d93eb9ca06f6466ccf",
"763009afb0ffd99c7b835488cb3b0302f3b78f59bbfd5292bedab8ef9da8c1b7",
"3867af9048309a05004a164bdea09899f23ff1d83b6491b2b53a1b7b92e0eb2e",
"688118e6b59e27944c277513db2711a520f4283c7c53a11f58d9f6a46d82c964",
];
const PRICE_0: u64 = 49262;
const ATTESTATIONS_0: [&'static str; 20] = [
"5bc7663195971daaa1e3e6a81b4bca65882791644bc446fc060cbc118a3ace0f",
"721d0cb56a0778a1ca7907f81a0787f34385b13f854c845c4c5539f7f6267958",
"044aeef0d525c8ff48758c80939e95807bc640990cc03f53ab6fc0b262045221",
"79f5175423ec6ee69c8d0e55251db85f3015c2edfa5a03095443fbbf35eb2282",
"233b9ec549e9cc7c702109d29636db85a3ec63a66f3b53444bcc7586d36ca439",
"2961a00320b7c9a70220060019a6ca88e18c205fadd2f873c174e5ccbbed527e",
"bdb76e8f81c39ade4205ead9b68118757fc49ec22769605f26ef904b235283d6",
"6e75dafedf4ed685513ec1f5c93508de4fad2be05b46001ac00c03474f4690e1",
"cfcfc27eb9273b343b3042f0386e77efe329066be079788bb00ab47d72f26780",
"2d931ffd2963e74566365674583abc427bdb6ae571c4887d81f1920f0850665d",
"33b6f1112fa046cbc04be44c615e70519702662c1f72d8d49b3c4613614a8a46",
"19e569b15410fa9a758c1a6c211eae8c1547efbe0ac6a7709902be93415f2f09",
"d859dd5c9a58e1836d1eea3ebe7f48198a681d29e5a5cd6922532d2e94a53a1d",
"3387eb2ad5e64cd102167766bb72b447f4a2e5129d161e422f9d41cd7d1cc281",
"db35a9778a1e3abc8d8ab2f4a79346ae2154c9e0b4932d859d1f3e244f67ae76",
"c3be969e8b889cfb2ece71123e6be5538a2d3a1229637b18bccc179073c38059",
"6f73263f430e10b82d0fd06c4ddd3b8a6b58c3e756745bd0d9e71a399e517921",
"0818c9c245d7d2162cd393c562a121f80405a27d22ae465e95030c31ebb4bd24",
"b7c03f0bd6d63bd78ad4ea0f3452ff9717ba65ca42038e6e90a1aa558b7942dc",
"90c4d8ec9f408ccb62a62daa993c20f2f86799e1fdea520c6d060418e55fd216",
];
}

3
daemon/tests/harness/mocks/mod.rs

@ -0,0 +1,3 @@
pub mod monitor;
pub mod oracle;
pub mod wallet;

29
daemon/tests/harness/mocks/monitor.rs

@ -0,0 +1,29 @@
use daemon::{monitor, oracle};
use xtra_productivity::xtra_productivity;
/// Test Stub simulating the Monitor actor
pub struct Monitor;
impl xtra::Actor for Monitor {}
#[xtra_productivity(message_impl = false)]
impl Monitor {
async fn handle(&mut self, _msg: monitor::Sync) {}
async fn handle(&mut self, _msg: monitor::StartMonitoring) {
todo!("stub this if needed")
}
async fn handle(&mut self, _msg: monitor::CollaborativeSettlement) {
todo!("stub this if needed")
}
async fn handle(&mut self, _msg: oracle::Attestation) {
todo!("stub this if needed")
}
}
impl Default for Monitor {
fn default() -> Self {
Monitor
}
}

45
daemon/tests/harness/mocks/oracle.rs

@ -0,0 +1,45 @@
use daemon::model::BitMexPriceEventId;
use daemon::oracle;
use time::OffsetDateTime;
use xtra_productivity::xtra_productivity;
pub struct Oracle {
announcement: Option<oracle::Announcement>,
}
impl Oracle {
pub fn with_dummy_announcement(
mut self,
dummy_announcement: cfd_protocol::Announcement,
) -> Self {
self.announcement = Some(oracle::Announcement {
id: BitMexPriceEventId::new(OffsetDateTime::UNIX_EPOCH, 0),
expected_outcome_time: OffsetDateTime::now_utc(),
nonce_pks: dummy_announcement.nonce_pks,
});
self
}
}
impl xtra::Actor for Oracle {}
#[xtra_productivity(message_impl = false)]
impl Oracle {
async fn handle_get_announcement(
&mut self,
_msg: oracle::GetAnnouncement,
) -> Option<oracle::Announcement> {
self.announcement.clone()
}
async fn handle(&mut self, _msg: oracle::MonitorAttestation) {}
async fn handle(&mut self, _msg: oracle::Sync) {}
}
impl Default for Oracle {
fn default() -> Self {
Oracle { announcement: None }
}
}

93
daemon/tests/harness/mocks/wallet.rs

@ -0,0 +1,93 @@
use crate::harness::cfd_protocol::dummy_wallet;
use anyhow::{anyhow, Result};
use bdk::bitcoin::util::psbt::PartiallySignedTransaction;
use bdk::bitcoin::{ecdsa, Amount, Txid};
use cfd_protocol::secp256k1_zkp::Secp256k1;
use cfd_protocol::{PartyParams, WalletExt};
use daemon::model::{Timestamp, WalletInfo};
use daemon::wallet;
use rand::thread_rng;
use xtra_productivity::xtra_productivity;
pub struct Wallet {
party_params: bool,
wallet_info: Option<WalletInfo>,
psbt: Option<PartiallySignedTransaction>,
txid: Option<Txid>,
}
impl Wallet {
pub fn with_party_params(mut self) -> Self {
self.party_params = true;
self
}
pub fn with_wallet_info(mut self) -> Self {
let s = Secp256k1::new();
let public_key = ecdsa::PublicKey::new(s.generate_keypair(&mut thread_rng()).1);
let address = bdk::bitcoin::Address::p2pkh(&public_key, bdk::bitcoin::Network::Testnet);
self.wallet_info = Some(WalletInfo {
balance: bdk::bitcoin::Amount::ONE_BTC,
address,
last_updated_at: Timestamp::now().unwrap(),
});
self
}
pub fn with_psbt(mut self, psbt: PartiallySignedTransaction) -> Self {
self.psbt = Some(psbt);
self
}
pub fn with_txid(mut self, txid: Txid) -> Self {
self.txid = Some(txid);
self
}
}
impl xtra::Actor for Wallet {}
#[xtra_productivity(message_impl = false)]
impl Wallet {
async fn handle(&mut self, msg: wallet::BuildPartyParams) -> Result<PartyParams> {
if self.party_params {
let mut rng = thread_rng();
let wallet = dummy_wallet(&mut rng, Amount::from_btc(0.4).unwrap(), 5).unwrap();
let party_params = wallet
.build_party_params(msg.amount, msg.identity_pk)
.unwrap();
return Ok(party_params);
}
panic!("Stub not configured to return party params")
}
async fn handle(&mut self, _msg: wallet::Sync) -> Result<WalletInfo> {
self.wallet_info
.clone()
.ok_or_else(|| anyhow!("Stub not configured to return WalletInfo"))
}
async fn handle(&mut self, _msg: wallet::Sign) -> Result<PartiallySignedTransaction> {
self.psbt
.clone()
.ok_or_else(|| anyhow!("Stub not configured to return PartiallySignedTransaction"))
}
async fn handle(&mut self, _msg: wallet::TryBroadcastTransaction) -> Result<Txid> {
self.txid
.ok_or_else(|| anyhow!("Stub not configured to return Txid"))
}
}
impl Default for Wallet {
fn default() -> Self {
Wallet {
party_params: false,
wallet_info: None,
psbt: None,
txid: None,
}
}
}

192
daemon/tests/harness/mod.rs

@ -0,0 +1,192 @@
use crate::harness::mocks::monitor::Monitor;
use crate::harness::mocks::oracle::Oracle;
use crate::harness::mocks::wallet::Wallet;
use crate::schnorrsig;
use daemon::maker_cfd::CfdAction;
use daemon::model::cfd::{Cfd, Order};
use daemon::model::Usd;
use daemon::{connection, db, maker_cfd, maker_inc_connections, taker_cfd};
use sqlx::SqlitePool;
use std::net::SocketAddr;
use std::str::FromStr;
use std::task::Poll;
use tokio::sync::watch;
use xtra::spawn::TokioGlobalSpawnExt;
use xtra::Actor;
pub mod bdk;
pub mod cfd_protocol;
pub mod mocks;
// TODO: Remove all these parameters once we have the mock framework (because we have
// Arcs then and can just default inside)
pub async fn start_both(
taker_oracle: Oracle,
taker_monitor: Monitor,
taker_wallet: Wallet,
maker_oracle: Oracle,
maker_monitor: Monitor,
maker_wallet: Wallet,
) -> (Maker, Taker) {
let oracle_pk: schnorrsig::PublicKey = schnorrsig::PublicKey::from_str(
"ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7",
)
.unwrap();
let maker = Maker::start(oracle_pk, maker_oracle, maker_monitor, maker_wallet).await;
let taker = Taker::start(
oracle_pk,
maker.listen_addr,
taker_oracle,
taker_monitor,
taker_wallet,
)
.await;
(maker, taker)
}
/// Maker Test Setup
#[derive(Clone)]
pub struct Maker {
pub cfd_actor_addr:
xtra::Address<maker_cfd::Actor<Oracle, Monitor, maker_inc_connections::Actor, Wallet>>,
pub order_feed: watch::Receiver<Option<Order>>,
pub cfd_feed: watch::Receiver<Vec<Cfd>>,
#[allow(dead_code)] // we need to keep the xtra::Address for refcounting
pub inc_conn_actor_addr: xtra::Address<maker_inc_connections::Actor>,
pub listen_addr: SocketAddr,
}
impl Maker {
pub async fn start(
oracle_pk: schnorrsig::PublicKey,
oracle: Oracle,
monitor: Monitor,
wallet: Wallet,
) -> Self {
let db = in_memory_db().await;
let wallet_addr = wallet.create(None).spawn_global();
let settlement_time_interval_hours = time::Duration::hours(24);
let maker = daemon::MakerActorSystem::new(
db,
wallet_addr,
oracle_pk,
|_, _| oracle,
|_, _| async { Ok(monitor) },
|channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1),
settlement_time_interval_hours,
)
.await
.unwrap();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let address = listener.local_addr().unwrap();
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => {
dbg!("new connection");
maker_inc_connections::ListenerMessage::NewConnection { stream, address }
}
Err(e) => maker_inc_connections::ListenerMessage::Error { source: e },
};
Poll::Ready(Some(message))
});
tokio::spawn(maker.inc_conn_addr.clone().attach_stream(listener_stream));
Self {
cfd_actor_addr: maker.cfd_actor_addr,
order_feed: maker.order_feed_receiver,
cfd_feed: maker.cfd_feed_receiver,
inc_conn_actor_addr: maker.inc_conn_addr,
listen_addr: address,
}
}
pub fn publish_order(&mut self, new_order_params: maker_cfd::NewOrder) {
self.cfd_actor_addr.do_send(new_order_params).unwrap();
}
pub fn reject_take_request(&self, order: Order) {
self.cfd_actor_addr
.do_send(CfdAction::RejectOrder { order_id: order.id })
.unwrap();
}
pub fn accept_take_request(&self, order: Order) {
self.cfd_actor_addr
.do_send(CfdAction::AcceptOrder { order_id: order.id })
.unwrap();
}
}
/// Taker Test Setup
#[derive(Clone)]
pub struct Taker {
pub order_feed: watch::Receiver<Option<Order>>,
pub cfd_feed: watch::Receiver<Vec<Cfd>>,
pub cfd_actor_addr: xtra::Address<taker_cfd::Actor<Oracle, Monitor, Wallet>>,
}
impl Taker {
pub async fn start(
oracle_pk: schnorrsig::PublicKey,
maker_address: SocketAddr,
oracle: Oracle,
monitor: Monitor,
wallet: Wallet,
) -> Self {
let connection::Actor {
send_to_maker,
read_from_maker,
} = connection::Actor::new(maker_address).await;
let db = in_memory_db().await;
let wallet_addr = wallet.create(None).spawn_global();
let taker = daemon::TakerActorSystem::new(
db,
wallet_addr,
oracle_pk,
send_to_maker,
read_from_maker,
|_, _| oracle,
|_, _| async { Ok(monitor) },
)
.await
.unwrap();
Self {
order_feed: taker.order_feed_receiver,
cfd_feed: taker.cfd_feed_receiver,
cfd_actor_addr: taker.cfd_actor_addr,
}
}
pub fn take_order(&self, order: Order, quantity: Usd) {
self.cfd_actor_addr
.do_send(taker_cfd::TakeOffer {
order_id: order.id,
quantity,
})
.unwrap();
}
}
async fn in_memory_db() -> SqlitePool {
// Note: Every :memory: database is distinct from every other. So, opening two database
// connections each with the filename ":memory:" will create two independent in-memory
// databases. see: https://www.sqlite.org/inmemorydb.html
let pool = SqlitePool::connect(":memory:").await.unwrap();
db::run_migrations(&pool).await.unwrap();
pool
}
Loading…
Cancel
Save