From cd8d55986735b0db8150133d50b1cf51f1cf1a1b Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Fri, 22 Oct 2021 14:34:05 +1100 Subject: [PATCH 1/7] Macro attribute so we can control impl of `xtra::Message` Co-authored-by: Mariusz Klochowicz --- xtra_productivity/src/lib.rs | 34 ++++++++++++++++--- .../tests/pass/can_handle_message.rs | 20 +++++++++++ 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/xtra_productivity/src/lib.rs b/xtra_productivity/src/lib.rs index ca1890e..99e1d26 100644 --- a/xtra_productivity/src/lib.rs +++ b/xtra_productivity/src/lib.rs @@ -1,10 +1,26 @@ use proc_macro::TokenStream; use quote::quote; -use syn::{FnArg, GenericParam, ImplItem, ItemImpl, ReturnType}; +use syn::{FnArg, GenericParam, ImplItem, ItemImpl, MetaNameValue, ReturnType}; #[proc_macro_attribute] -pub fn xtra_productivity(_attribute: TokenStream, item: TokenStream) -> TokenStream { +pub fn xtra_productivity(attribute: TokenStream, item: TokenStream) -> TokenStream { let block = syn::parse::(item).unwrap(); + let want_message_impl = if attribute.is_empty() { + true + } else { + let attribute = syn::parse::(attribute).unwrap(); + if !attribute.path.is_ident("message_impl") { + panic!( + "Unexpected attribute {:?}", + attribute.path.get_ident().unwrap() + ) + } + + matches!( + attribute.lit, + syn::Lit::Bool(syn::LitBool { value: true, .. }) + ) + }; let actor = block.self_ty; @@ -60,10 +76,18 @@ pub fn xtra_productivity(_attribute: TokenStream, item: TokenStream) -> TokenStr ReturnType::Type(_, ref t) => quote! { #t } }; - quote! { - impl xtra::Message for #message_type { - type Result = #result_type; + let message_impl = if want_message_impl { + quote! { + impl xtra::Message for #message_type { + type Result = #result_type; + } } + } else { + quote! {} + }; + + quote! { + #message_impl #[async_trait::async_trait] impl<#generic_params> xtra::Handler<#message_type> for #actor diff --git a/xtra_productivity/tests/pass/can_handle_message.rs b/xtra_productivity/tests/pass/can_handle_message.rs index 14f54ee..91e751d 100644 --- a/xtra_productivity/tests/pass/can_handle_message.rs +++ b/xtra_productivity/tests/pass/can_handle_message.rs @@ -30,6 +30,21 @@ impl DummyActor { fn is_i32(_: i32) {} +struct DummyMessageWithoutMessageImpl; + +#[xtra_productivity(message_impl = false)] +impl DummyActor { + pub fn handle_dummy_message_without_message_impl( + &mut self, + _message: DummyMessageWithoutMessageImpl, + ) { + } +} + +impl xtra::Message for DummyMessageWithoutMessageImpl { + type Result = (); +} + #[tokio::main] async fn main() { // Create dummy actor @@ -39,4 +54,9 @@ async fn main() { let i32 = dummy_actor.send(DummyMessage).await.unwrap(); is_i32(i32); dummy_actor.send(DummyMessageWithContext).await.unwrap(); + + dummy_actor + .send(DummyMessageWithoutMessageImpl) + .await + .unwrap(); } From 532c56acb29075ebafcfd3f7369241497ebe50d7 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Fri, 22 Oct 2021 14:58:37 +1100 Subject: [PATCH 2/7] Move maker's `ActorSystem` to `lib` as `Maker` --- daemon/src/lib.rs | 132 ++++++++++++++++++++++++++++++++++++++++ daemon/src/maker.rs | 142 ++++---------------------------------------- 2 files changed, 143 insertions(+), 131 deletions(-) diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index f2bf2af..f278eeb 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -1,4 +1,21 @@ #![cfg_attr(not(test), warn(clippy::unwrap_used))] +use crate::db::load_all_cfds; +use crate::maker_cfd::{FromTaker, NewTakerOnline}; +use crate::model::cfd::{Cfd, Order, UpdateCfdProposals}; +use crate::oracle::Attestation; +use crate::wallet::Wallet; +use anyhow::Result; +use cfd_protocol::secp256k1_zkp::schnorrsig; +use sqlx::SqlitePool; +use std::collections::HashMap; +use std::future::Future; +use std::task::Poll; +use std::time::Duration; +use tokio::net::TcpListener; +use tokio::sync::watch; +use xtra::message_channel::{MessageChannel, StrongMessageChannel}; +use xtra::spawn::TokioGlobalSpawnExt; +use xtra::{Actor, Address}; pub mod actors; pub mod auth; @@ -28,3 +45,118 @@ pub mod try_continue; pub mod wallet; pub mod wallet_sync; pub mod wire; + +pub struct Maker { + pub cfd_actor_addr: Address>, + pub cfd_feed_receiver: watch::Receiver>, + pub order_feed_receiver: watch::Receiver>, + pub update_cfd_feed_receiver: watch::Receiver, +} + +impl Maker +where + O: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, + M: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, + T: xtra::Handler + + xtra::Handler + + xtra::Handler, +{ + pub async fn new( + db: SqlitePool, + wallet: Wallet, + oracle_pk: schnorrsig::PublicKey, + oracle_constructor: impl Fn(Vec, Box>) -> O, + monitor_constructor: impl Fn(Box>, Vec) -> F, + inc_conn_constructor: impl Fn( + Box>, + Box>, + ) -> T, + listener: TcpListener, + term: time::Duration, + ) -> Result + where + F: Future>, + { + let mut conn = db.acquire().await?; + + let cfds = load_all_cfds(&mut conn).await?; + + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); + let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); + let (update_cfd_feed_sender, update_cfd_feed_receiver) = + watch::channel::(HashMap::new()); + + let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None); + let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None); + let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None); + + let cfd_actor_addr = maker_cfd::Actor::new( + db, + wallet, + term, + oracle_pk, + cfd_feed_sender, + order_feed_sender, + update_cfd_feed_sender, + inc_conn_addr.clone(), + monitor_addr.clone(), + oracle_addr.clone(), + ) + .create(None) + .spawn_global(); + + tokio::spawn(inc_conn_ctx.run(inc_conn_constructor( + Box::new(cfd_actor_addr.clone()), + Box::new(cfd_actor_addr.clone()), + ))); + + tokio::spawn( + monitor_ctx + .notify_interval(Duration::from_secs(20), || monitor::Sync) + .map_err(|e| anyhow::anyhow!(e))?, + ); + tokio::spawn( + monitor_ctx + .run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?), + ); + + tokio::spawn( + oracle_ctx + .notify_interval(Duration::from_secs(5), || oracle::Sync) + .map_err(|e| anyhow::anyhow!(e))?, + ); + let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr]) + .create(None) + .spawn_global(); + + tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); + + oracle_addr.do_send_async(oracle::Sync).await?; + + let listener_stream = futures::stream::poll_fn(move |ctx| { + let message = match futures::ready!(listener.poll_accept(ctx)) { + Ok((stream, address)) => { + maker_inc_connections::ListenerMessage::NewConnection { stream, address } + } + Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, + }; + + Poll::Ready(Some(message)) + }); + + tokio::spawn(inc_conn_addr.attach_stream(listener_stream)); + + Ok(Self { + cfd_actor_addr, + cfd_feed_receiver, + order_feed_receiver, + update_cfd_feed_receiver, + }) + } +} diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index eac861e..5575419 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -3,31 +3,27 @@ use bdk::bitcoin; use bdk::bitcoin::secp256k1::schnorrsig; use clap::Clap; use daemon::auth::{self, MAKER_USERNAME}; -use daemon::db::{self, load_all_cfds}; -use daemon::maker_cfd::{FromTaker, NewTakerOnline}; -use daemon::model::cfd::{Cfd, Order, UpdateCfdProposals}; +use daemon::db::{self}; + use daemon::model::WalletInfo; -use daemon::oracle::Attestation; + use daemon::seed::Seed; use daemon::wallet::Wallet; use daemon::{ - bitmex_price_feed, fan_out, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, - oracle, wallet_sync, + bitmex_price_feed, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle, + wallet_sync, Maker, }; -use futures::Future; + use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; -use std::collections::HashMap; +use xtra::prelude::MessageChannel; + use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; -use std::task::Poll; -use std::time::Duration; -use tokio::net::TcpListener; + use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; -use xtra::prelude::*; -use xtra::spawn::TokioGlobalSpawnExt; mod routes_maker; @@ -189,12 +185,12 @@ async fn main() -> Result<()> { housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?; housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?; - let ActorSystem { + let Maker { cfd_actor_addr, cfd_feed_receiver, order_feed_receiver, update_cfd_feed_receiver, - } = ActorSystem::new( + } = Maker::new( db.clone(), wallet.clone(), oracle, @@ -248,119 +244,3 @@ async fn main() -> Result<()> { Ok(()) } - -pub struct ActorSystem { - cfd_actor_addr: Address>, - cfd_feed_receiver: watch::Receiver>, - order_feed_receiver: watch::Receiver>, - update_cfd_feed_receiver: watch::Receiver, -} - -impl ActorSystem -where - O: xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, - M: xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, - T: xtra::Handler - + xtra::Handler - + xtra::Handler, -{ - #[allow(clippy::too_many_arguments)] - pub async fn new( - db: SqlitePool, - wallet: Wallet, - oracle_pk: schnorrsig::PublicKey, - oracle_constructor: impl Fn(Vec, Box>) -> O, - monitor_constructor: impl Fn(Box>, Vec) -> F, - inc_conn_constructor: impl Fn( - Box>, - Box>, - ) -> T, - listener: TcpListener, - term: time::Duration, - ) -> Result - where - F: Future>, - { - let mut conn = db.acquire().await?; - - let cfds = load_all_cfds(&mut conn).await?; - - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); - let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); - let (update_cfd_feed_sender, update_cfd_feed_receiver) = - watch::channel::(HashMap::new()); - - let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None); - let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None); - let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None); - - let cfd_actor_addr = maker_cfd::Actor::new( - db, - wallet, - term, - oracle_pk, - cfd_feed_sender, - order_feed_sender, - update_cfd_feed_sender, - inc_conn_addr.clone(), - monitor_addr.clone(), - oracle_addr.clone(), - ) - .create(None) - .spawn_global(); - - tokio::spawn(inc_conn_ctx.run(inc_conn_constructor( - Box::new(cfd_actor_addr.clone()), - Box::new(cfd_actor_addr.clone()), - ))); - - tokio::spawn( - monitor_ctx - .notify_interval(Duration::from_secs(20), || monitor::Sync) - .map_err(|e| anyhow::anyhow!(e))?, - ); - tokio::spawn( - monitor_ctx - .run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?), - ); - - tokio::spawn( - oracle_ctx - .notify_interval(Duration::from_secs(5), || oracle::Sync) - .map_err(|e| anyhow::anyhow!(e))?, - ); - let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr]) - .create(None) - .spawn_global(); - - tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); - - oracle_addr.do_send_async(oracle::Sync).await?; - - let listener_stream = futures::stream::poll_fn(move |ctx| { - let message = match futures::ready!(listener.poll_accept(ctx)) { - Ok((stream, address)) => { - maker_inc_connections::ListenerMessage::NewConnection { stream, address } - } - Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, - }; - - Poll::Ready(Some(message)) - }); - - tokio::spawn(inc_conn_addr.attach_stream(listener_stream)); - - Ok(Self { - cfd_actor_addr, - cfd_feed_receiver, - order_feed_receiver, - update_cfd_feed_receiver, - }) - } -} From 1d6140c5169dfe9a99291ae385fa6a0db35688f8 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Fri, 22 Oct 2021 15:03:45 +1100 Subject: [PATCH 3/7] Move taker's `ActorSystem` to `lib` as `Taker` --- daemon/src/lib.rs | 89 +++++++++++++++++++++++++++++++++++ daemon/src/taker.rs | 110 ++++---------------------------------------- 2 files changed, 99 insertions(+), 100 deletions(-) diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index f278eeb..c4fac5b 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -6,6 +6,7 @@ use crate::oracle::Attestation; use crate::wallet::Wallet; use anyhow::Result; use cfd_protocol::secp256k1_zkp::schnorrsig; +use futures::Stream; use sqlx::SqlitePool; use std::collections::HashMap; use std::future::Future; @@ -160,3 +161,91 @@ where }) } } + +pub struct Taker { + pub cfd_actor_addr: Address>, + pub cfd_feed_receiver: watch::Receiver>, + pub order_feed_receiver: watch::Receiver>, + pub update_cfd_feed_receiver: watch::Receiver, +} + +impl Taker +where + O: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, + M: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, +{ + pub async fn new( + db: SqlitePool, + wallet: Wallet, + oracle_pk: schnorrsig::PublicKey, + send_to_maker: Box>, + read_from_maker: Box + Unpin + Send>, + oracle_constructor: impl Fn(Vec, Box>) -> O, + monitor_constructor: impl Fn(Box>, Vec) -> F, + ) -> Result + where + F: Future>, + { + let mut conn = db.acquire().await?; + + let cfds = load_all_cfds(&mut conn).await?; + + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); + let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); + let (update_cfd_feed_sender, update_cfd_feed_receiver) = + watch::channel::(HashMap::new()); + + let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None); + let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None); + + let cfd_actor_addr = taker_cfd::Actor::new( + db, + wallet, + oracle_pk, + cfd_feed_sender, + order_feed_sender, + update_cfd_feed_sender, + send_to_maker, + monitor_addr.clone(), + oracle_addr, + ) + .create(None) + .spawn_global(); + + tokio::spawn(cfd_actor_addr.clone().attach_stream(read_from_maker)); + + tokio::spawn( + monitor_ctx + .notify_interval(Duration::from_secs(20), || monitor::Sync) + .map_err(|e| anyhow::anyhow!(e))?, + ); + tokio::spawn( + monitor_ctx + .run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?), + ); + + tokio::spawn( + oracle_ctx + .notify_interval(Duration::from_secs(5), || oracle::Sync) + .unwrap(), + ); + let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr]) + .create(None) + .spawn_global(); + + tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); + + Ok(Self { + cfd_actor_addr, + cfd_feed_receiver, + order_feed_receiver, + update_cfd_feed_receiver, + }) + } +} diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index cb211ac..41f3e8c 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -2,28 +2,26 @@ use anyhow::{Context, Result}; use bdk::bitcoin; use bdk::bitcoin::secp256k1::schnorrsig; use clap::Clap; -use daemon::db::{self, load_all_cfds}; -use daemon::model::cfd::{Cfd, Order, UpdateCfdProposals}; +use daemon::db::{self}; + use daemon::model::WalletInfo; -use daemon::oracle::Attestation; + use daemon::seed::Seed; use daemon::wallet::Wallet; use daemon::{ - bitmex_price_feed, fan_out, housekeeping, logger, monitor, oracle, taker_cfd, wallet_sync, wire, + bitmex_price_feed, housekeeping, logger, monitor, oracle, taker_cfd, wallet_sync, Taker, }; -use futures::{Future, Stream}; + use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; -use std::collections::HashMap; + use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; -use std::time::Duration; + use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; -use xtra::prelude::{MessageChannel, StrongMessageChannel}; -use xtra::spawn::TokioGlobalSpawnExt; -use xtra::{Actor, Address}; +use xtra::prelude::MessageChannel; mod connection; mod routes_taker; @@ -167,12 +165,12 @@ async fn main() -> Result<()> { read_from_maker, } = connection::Actor::new(opts.maker).await; - let ActorSystem { + let Taker { cfd_actor_addr, cfd_feed_receiver, order_feed_receiver, update_cfd_feed_receiver, - } = ActorSystem::new( + } = Taker::new( db.clone(), wallet.clone(), oracle, @@ -222,91 +220,3 @@ async fn main() -> Result<()> { Ok(()) } - -pub struct ActorSystem { - cfd_actor_addr: Address>, - cfd_feed_receiver: watch::Receiver>, - order_feed_receiver: watch::Receiver>, - update_cfd_feed_receiver: watch::Receiver, -} - -impl ActorSystem -where - O: xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, - M: xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, -{ - pub async fn new( - db: SqlitePool, - wallet: Wallet, - oracle_pk: schnorrsig::PublicKey, - send_to_maker: Box>, - read_from_maker: Box + Unpin + Send>, - oracle_constructor: impl Fn(Vec, Box>) -> O, - monitor_constructor: impl Fn(Box>, Vec) -> F, - ) -> Result - where - F: Future>, - { - let mut conn = db.acquire().await?; - - let cfds = load_all_cfds(&mut conn).await?; - - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); - let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); - let (update_cfd_feed_sender, update_cfd_feed_receiver) = - watch::channel::(HashMap::new()); - - let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None); - let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None); - - let cfd_actor_addr = taker_cfd::Actor::new( - db, - wallet, - oracle_pk, - cfd_feed_sender, - order_feed_sender, - update_cfd_feed_sender, - send_to_maker, - monitor_addr.clone(), - oracle_addr, - ) - .create(None) - .spawn_global(); - - tokio::spawn(cfd_actor_addr.clone().attach_stream(read_from_maker)); - - tokio::spawn( - monitor_ctx - .notify_interval(Duration::from_secs(20), || monitor::Sync) - .map_err(|e| anyhow::anyhow!(e))?, - ); - tokio::spawn( - monitor_ctx - .run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?), - ); - - tokio::spawn( - oracle_ctx - .notify_interval(Duration::from_secs(5), || oracle::Sync) - .unwrap(), - ); - let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr]) - .create(None) - .spawn_global(); - - tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); - - Ok(Self { - cfd_actor_addr, - cfd_feed_receiver, - order_feed_receiver, - update_cfd_feed_receiver, - }) - } -} From fe4172f1c0f80c1db6b5448c69caff40d0ff89db Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Fri, 22 Oct 2021 15:41:03 +1100 Subject: [PATCH 4/7] Move tcp connection setup outside of `Maker` actor system It is an implementation detail that we use a tcp connection and bites us in the tests. Co-authored-by: Mariusz Klochowicz --- daemon/src/lib.rs | 21 +++------------------ daemon/src/maker.rs | 16 +++++++++++++++- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index c4fac5b..95c6362 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -10,9 +10,7 @@ use futures::Stream; use sqlx::SqlitePool; use std::collections::HashMap; use std::future::Future; -use std::task::Poll; use std::time::Duration; -use tokio::net::TcpListener; use tokio::sync::watch; use xtra::message_channel::{MessageChannel, StrongMessageChannel}; use xtra::spawn::TokioGlobalSpawnExt; @@ -52,6 +50,7 @@ pub struct Maker { pub cfd_feed_receiver: watch::Receiver>, pub order_feed_receiver: watch::Receiver>, pub update_cfd_feed_receiver: watch::Receiver, + pub inc_conn_addr: Address, } impl Maker @@ -65,8 +64,7 @@ where + xtra::Handler + xtra::Handler, T: xtra::Handler - + xtra::Handler - + xtra::Handler, + + xtra::Handler, { pub async fn new( db: SqlitePool, @@ -78,7 +76,6 @@ where Box>, Box>, ) -> T, - listener: TcpListener, term: time::Duration, ) -> Result where @@ -140,24 +137,12 @@ where oracle_addr.do_send_async(oracle::Sync).await?; - let listener_stream = futures::stream::poll_fn(move |ctx| { - let message = match futures::ready!(listener.poll_accept(ctx)) { - Ok((stream, address)) => { - maker_inc_connections::ListenerMessage::NewConnection { stream, address } - } - Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, - }; - - Poll::Ready(Some(message)) - }); - - tokio::spawn(inc_conn_addr.attach_stream(listener_stream)); - Ok(Self { cfd_actor_addr, cfd_feed_receiver, order_feed_receiver, update_cfd_feed_receiver, + inc_conn_addr, }) } } diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 5575419..cffc91f 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -22,6 +22,7 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; +use std::task::Poll; use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; @@ -190,6 +191,7 @@ async fn main() -> Result<()> { cfd_feed_receiver, order_feed_receiver, update_cfd_feed_receiver, + inc_conn_addr: incoming_connection_addr, } = Maker::new( db.clone(), wallet.clone(), @@ -202,11 +204,23 @@ async fn main() -> Result<()> { } }, |channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1), - listener, time::Duration::hours(opts.term as i64), ) .await?; + let listener_stream = futures::stream::poll_fn(move |ctx| { + let message = match futures::ready!(listener.poll_accept(ctx)) { + Ok((stream, address)) => { + maker_inc_connections::ListenerMessage::NewConnection { stream, address } + } + Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, + }; + + Poll::Ready(Some(message)) + }); + + tokio::spawn(incoming_connection_addr.attach_stream(listener_stream)); + tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); let cfd_action_channel = MessageChannel::::clone_channel(&cfd_actor_addr); From 40f5b81e4d36a5d523b958ca1d994fb115cbe3b0 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Mon, 25 Oct 2021 12:48:47 +1100 Subject: [PATCH 5/7] Convert Wallet to actor Include a generic Wallet actor constructor in the actor systems and allow passing in a generic Wallet actor implementing xtra::Handlers into the cfd actors. Rename 'Maker' and 'Taker' to 'MakerActorSystem' and 'TakerActorSystem' for readability. Co-authored-by: Mariusz Klochowicz --- daemon/src/cfd_actors.rs | 58 ++++++++---- daemon/src/connection.rs | 2 +- daemon/src/housekeeping.rs | 33 +++++-- daemon/src/lib.rs | 41 +++++---- daemon/src/maker.rs | 20 ++-- daemon/src/maker_cfd.rs | 153 ++++++++++++++++++------------- daemon/src/setup_contract.rs | 25 +++-- daemon/src/taker.rs | 19 ++-- daemon/src/taker_cfd.rs | 171 +++++++++++++++++++++++------------ daemon/src/wallet.rs | 59 ++++++++---- daemon/src/wallet_sync.rs | 11 ++- 11 files changed, 386 insertions(+), 206 deletions(-) diff --git a/daemon/src/cfd_actors.rs b/daemon/src/cfd_actors.rs index d44b607..6355c14 100644 --- a/daemon/src/cfd_actors.rs +++ b/daemon/src/cfd_actors.rs @@ -1,7 +1,6 @@ use crate::db::load_cfd_by_order_id; use crate::model::cfd::{Attestation, Cfd, CfdState, CfdStateChangeEvent, OrderId}; -use crate::wallet::Wallet; -use crate::{db, monitor, oracle, try_continue}; +use crate::{db, monitor, oracle, try_continue, wallet}; use anyhow::{bail, Context, Result}; use sqlx::pool::PoolConnection; use sqlx::Sqlite; @@ -34,15 +33,21 @@ pub async fn append_cfd_state( Ok(()) } -pub async fn try_cet_publication( +pub async fn try_cet_publication( cfd: &mut Cfd, conn: &mut PoolConnection, - wallet: &Wallet, + wallet: &xtra::Address, update_sender: &watch::Sender>, -) -> Result<()> { +) -> Result<()> +where + W: xtra::Handler, +{ match cfd.cet()? { Ok(cet) => { - let txid = wallet.try_broadcast_transaction(cet).await?; + let txid = wallet + .send(wallet::TryBroadcastTransaction { tx: cet }) + .await? + .context("Failed to send transaction")?; tracing::info!("CET published with txid {}", txid); if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() { @@ -60,12 +65,15 @@ pub async fn try_cet_publication( Ok(()) } -pub async fn handle_monitoring_event( +pub async fn handle_monitoring_event( event: monitor::Event, conn: &mut PoolConnection, - wallet: &Wallet, + wallet: &xtra::Address, update_sender: &watch::Sender>, -) -> Result<()> { +) -> Result<()> +where + W: xtra::Handler, +{ let order_id = event.order_id(); let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?; @@ -82,24 +90,37 @@ pub async fn handle_monitoring_event( try_cet_publication(&mut cfd, conn, wallet, update_sender).await?; } else if let CfdState::MustRefund { .. } = cfd.state { let signed_refund_tx = cfd.refund_tx()?; - let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; + let txid = wallet + .send(wallet::TryBroadcastTransaction { + tx: signed_refund_tx, + }) + .await? + .context("Failed to publish CET")?; tracing::info!("Refund transaction published on chain: {}", txid); } Ok(()) } -pub async fn handle_commit( +pub async fn handle_commit( order_id: OrderId, conn: &mut PoolConnection, - wallet: &Wallet, + wallet: &xtra::Address, update_sender: &watch::Sender>, -) -> Result<()> { +) -> Result<()> +where + W: xtra::Handler, +{ let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?; let signed_commit_tx = cfd.commit_tx()?; - let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?; + let txid = wallet + .send(wallet::TryBroadcastTransaction { + tx: signed_commit_tx, + }) + .await? + .context("Failed to publish commit tx")?; if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() { bail!("If we can get the commit tx we should be able to transition") @@ -111,12 +132,15 @@ pub async fn handle_commit( Ok(()) } -pub async fn handle_oracle_attestation( +pub async fn handle_oracle_attestation( attestation: oracle::Attestation, conn: &mut PoolConnection, - wallet: &Wallet, + wallet: &xtra::Address, update_sender: &watch::Sender>, -) -> Result<()> { +) -> Result<()> +where + W: xtra::Handler, +{ tracing::debug!( "Learnt latest oracle attestation for event: {}", attestation.id diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index 5a6d9ce..05dbe02 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -1,4 +1,4 @@ -use daemon::{send_to_socket, taker_cfd, wire}; +use crate::{send_to_socket, taker_cfd, wire}; use futures::{Stream, StreamExt}; use std::net::SocketAddr; use std::time::Duration; diff --git a/daemon/src/housekeeping.rs b/daemon/src/housekeeping.rs index dd5d225..ef20c69 100644 --- a/daemon/src/housekeeping.rs +++ b/daemon/src/housekeeping.rs @@ -1,10 +1,11 @@ use crate::db::{append_cfd_state, load_all_cfds}; use crate::model::cfd::{Cfd, CfdState}; -use crate::try_continue; -use crate::wallet::Wallet; +use crate::{try_continue, wallet}; use anyhow::Result; use sqlx::pool::PoolConnection; use sqlx::Sqlite; +use xtra::Address; + pub async fn transition_non_continue_cfds_to_setup_failed( conn: &mut PoolConnection, ) -> Result<()> { @@ -24,25 +25,40 @@ pub async fn transition_non_continue_cfds_to_setup_failed( pub async fn rebroadcast_transactions( conn: &mut PoolConnection, - wallet: &Wallet, + wallet: &Address, ) -> Result<()> { let cfds = load_all_cfds(conn).await?; for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { - let txid = try_continue!(wallet.try_broadcast_transaction(dlc.lock.0.clone()).await); + let txid = try_continue!(wallet + .send(wallet::TryBroadcastTransaction { + tx: dlc.lock.0.clone() + }) + .await + .expect("if sending to actor fails here we are screwed anyway")); tracing::info!("Lock transaction published with txid {}", txid); } for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) { let signed_refund_tx = cfd.refund_tx()?; - let txid = try_continue!(wallet.try_broadcast_transaction(signed_refund_tx).await); + let txid = try_continue!(wallet + .send(wallet::TryBroadcastTransaction { + tx: signed_refund_tx + }) + .await + .expect("if sending to actor fails here we are screwed anyway")); tracing::info!("Refund transaction published on chain: {}", txid); } for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) { let signed_commit_tx = cfd.commit_tx()?; - let txid = try_continue!(wallet.try_broadcast_transaction(signed_commit_tx).await); + let txid = try_continue!(wallet + .send(wallet::TryBroadcastTransaction { + tx: signed_commit_tx + }) + .await + .expect("if sending to actor fails here we are screwed anyway")); tracing::info!("Commit transaction published on chain: {}", txid); } @@ -50,7 +66,10 @@ pub async fn rebroadcast_transactions( for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_cet(cfd)) { // Double question mark OK because if we are in PendingCet we must have been Ready before let signed_cet = cfd.cet()??; - let txid = try_continue!(wallet.try_broadcast_transaction(signed_cet).await); + let txid = try_continue!(wallet + .send(wallet::TryBroadcastTransaction { tx: signed_cet }) + .await + .expect("if sending to actor fails here we are screwed anyway")); tracing::info!("CET published on chain: {}", txid); } diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 95c6362..95daf54 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -3,7 +3,6 @@ use crate::db::load_all_cfds; use crate::maker_cfd::{FromTaker, NewTakerOnline}; use crate::model::cfd::{Cfd, Order, UpdateCfdProposals}; use crate::oracle::Attestation; -use crate::wallet::Wallet; use anyhow::Result; use cfd_protocol::secp256k1_zkp::schnorrsig; use futures::Stream; @@ -20,6 +19,7 @@ pub mod actors; pub mod auth; pub mod bitmex_price_feed; pub mod cfd_actors; +pub mod connection; pub mod db; pub mod fan_out; pub mod forward_only_ok; @@ -45,15 +45,15 @@ pub mod wallet; pub mod wallet_sync; pub mod wire; -pub struct Maker { - pub cfd_actor_addr: Address>, +pub struct MakerActorSystem { + pub cfd_actor_addr: Address>, pub cfd_feed_receiver: watch::Receiver>, pub order_feed_receiver: watch::Receiver>, pub update_cfd_feed_receiver: watch::Receiver, pub inc_conn_addr: Address, } -impl Maker +impl MakerActorSystem where O: xtra::Handler + xtra::Handler @@ -65,14 +65,18 @@ where + xtra::Handler, T: xtra::Handler + xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, { pub async fn new( db: SqlitePool, - wallet: Wallet, + wallet_addr: Address, oracle_pk: schnorrsig::PublicKey, - oracle_constructor: impl Fn(Vec, Box>) -> O, - monitor_constructor: impl Fn(Box>, Vec) -> F, - inc_conn_constructor: impl Fn( + oracle_constructor: impl FnOnce(Vec, Box>) -> O, + monitor_constructor: impl FnOnce(Box>, Vec) -> F, + inc_conn_constructor: impl FnOnce( Box>, Box>, ) -> T, @@ -96,7 +100,7 @@ where let cfd_actor_addr = maker_cfd::Actor::new( db, - wallet, + wallet_addr, term, oracle_pk, cfd_feed_sender, @@ -147,14 +151,14 @@ where } } -pub struct Taker { - pub cfd_actor_addr: Address>, +pub struct TakerActorSystem { + pub cfd_actor_addr: Address>, pub cfd_feed_receiver: watch::Receiver>, pub order_feed_receiver: watch::Receiver>, pub update_cfd_feed_receiver: watch::Receiver, } -impl Taker +impl TakerActorSystem where O: xtra::Handler + xtra::Handler @@ -164,15 +168,19 @@ where + xtra::Handler + xtra::Handler + xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, { pub async fn new( db: SqlitePool, - wallet: Wallet, + wallet_addr: Address, oracle_pk: schnorrsig::PublicKey, send_to_maker: Box>, read_from_maker: Box + Unpin + Send>, - oracle_constructor: impl Fn(Vec, Box>) -> O, - monitor_constructor: impl Fn(Box>, Vec) -> F, + oracle_constructor: impl FnOnce(Vec, Box>) -> O, + monitor_constructor: impl FnOnce(Box>, Vec) -> F, ) -> Result where F: Future>, @@ -191,7 +199,7 @@ where let cfd_actor_addr = taker_cfd::Actor::new( db, - wallet, + wallet_addr, oracle_pk, cfd_feed_sender, order_feed_sender, @@ -220,6 +228,7 @@ where .notify_interval(Duration::from_secs(5), || oracle::Sync) .unwrap(), ); + let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr]) .create(None) .spawn_global(); diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index cffc91f..a5f1d51 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -8,15 +8,13 @@ use daemon::db::{self}; use daemon::model::WalletInfo; use daemon::seed::Seed; -use daemon::wallet::Wallet; use daemon::{ bitmex_price_feed, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle, - wallet_sync, Maker, + wallet, wallet_sync, MakerActorSystem, }; use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; -use xtra::prelude::MessageChannel; use std::net::SocketAddr; use std::path::PathBuf; @@ -25,6 +23,9 @@ use std::str::FromStr; use std::task::Poll; use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; +use xtra::prelude::*; +use xtra::spawn::TokioGlobalSpawnExt; +use xtra::Actor; mod routes_maker; @@ -129,13 +130,16 @@ async fn main() -> Result<()> { let bitcoin_network = opts.network.bitcoin_network(); let ext_priv_key = seed.derive_extended_priv_key(bitcoin_network)?; - let wallet = Wallet::new( + let wallet = wallet::Actor::new( opts.network.electrum(), &data_dir.join("maker_wallet.sqlite"), ext_priv_key, ) - .await?; - let wallet_info = wallet.sync().await?; + .await? + .create(None) + .spawn_global(); + + let wallet_info = wallet.send(wallet::Sync).await??; let auth_password = seed.derive_auth_password::(); @@ -186,13 +190,13 @@ async fn main() -> Result<()> { housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?; housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?; - let Maker { + let MakerActorSystem { cfd_actor_addr, cfd_feed_receiver, order_feed_receiver, update_cfd_feed_receiver, inc_conn_addr: incoming_connection_addr, - } = Maker::new( + } = MakerActorSystem::new( db.clone(), wallet.clone(), oracle, diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 85d5941..26f0ad7 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -8,8 +8,7 @@ use crate::model::cfd::{ }; use crate::model::{TakerId, Usd}; use crate::monitor::MonitorParams; -use crate::wallet::Wallet; -use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wire}; +use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wallet, wire}; use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -59,9 +58,9 @@ pub struct FromTaker { pub msg: wire::TakerToMaker, } -pub struct Actor { +pub struct Actor { db: sqlx::SqlitePool, - wallet: Wallet, + wallet: Address, term: Duration, oracle_pk: schnorrsig::PublicKey, cfd_feed_actor_inbox: watch::Sender>, @@ -94,11 +93,11 @@ enum RollOverState { None, } -impl Actor { +impl Actor { #[allow(clippy::too_many_arguments)] pub fn new( db: sqlx::SqlitePool, - wallet: Wallet, + wallet: Address, term: Duration, oracle_pk: schnorrsig::PublicKey, cfd_feed_actor_inbox: watch::Sender>, @@ -127,19 +126,6 @@ impl Actor { } } - async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_commit( - order_id, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - - Ok(()) - } - async fn handle_propose_roll_over( &mut self, proposal: RollOverProposal, @@ -204,30 +190,6 @@ impl Actor { Ok(()) } - async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_monitoring_event( - event, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } - - async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_oracle_attestation( - attestation, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } - async fn handle_inc_protocol_msg( &mut self, taker_id: TakerId, @@ -311,7 +273,49 @@ impl Actor { } } -impl Actor +impl Actor +where + W: xtra::Handler, +{ + async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_commit( + order_id, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + + Ok(()) + } + + async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_monitoring_event( + event, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + Ok(()) + } + + async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_oracle_attestation( + attestation, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + Ok(()) + } +} + +impl Actor where T: xtra::Handler, { @@ -401,7 +405,7 @@ where } } -impl Actor +impl Actor where T: xtra::Handler + xtra::Handler, @@ -530,11 +534,12 @@ where } } -impl Actor +impl Actor where Self: xtra::Handler, O: xtra::Handler, T: xtra::Handler, + W: xtra::Handler + xtra::Handler, { async fn handle_accept_order( &mut self, @@ -622,7 +627,7 @@ where } } -impl Actor +impl Actor where O: xtra::Handler, T: xtra::Handler, @@ -667,10 +672,11 @@ where } } -impl Actor +impl Actor where O: xtra::Handler, M: xtra::Handler, + W: xtra::Handler, { async fn handle_cfd_setup_completed( &mut self, @@ -694,8 +700,10 @@ where let txid = self .wallet - .try_broadcast_transaction(dlc.lock.0.clone()) - .await?; + .send(wallet::TryBroadcastTransaction { + tx: dlc.lock.0.clone(), + }) + .await??; tracing::info!("Lock transaction published with txid {}", txid); @@ -716,7 +724,7 @@ where } } -impl Actor +impl Actor where Self: xtra::Handler, O: xtra::Handler + xtra::Handler, @@ -809,7 +817,7 @@ where } } -impl Actor +impl Actor where M: xtra::Handler, { @@ -842,9 +850,10 @@ where } } -impl Actor +impl Actor where M: xtra::Handler, + W: xtra::Handler, { async fn handle_initiate_settlement( &mut self, @@ -888,8 +897,10 @@ where let txid = self .wallet - .try_broadcast_transaction(spend_tx.clone()) - .await + .send(wallet::TryBroadcastTransaction { + tx: spend_tx.clone(), + }) + .await? .context("Broadcasting spend transaction")?; tracing::info!("Close transaction published with txid {}", txid); @@ -909,12 +920,15 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor where Self: xtra::Handler + xtra::Handler, O: xtra::Handler + xtra::Handler, T: xtra::Handler + xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler, { async fn handle(&mut self, msg: CfdAction, ctx: &mut Context) { use CfdAction::*; @@ -933,7 +947,7 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor where O: xtra::Handler, T: xtra::Handler, @@ -944,7 +958,7 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor where T: xtra::Handler, { @@ -954,10 +968,12 @@ where } #[async_trait] -impl Handler for Actor +impl Handler + for Actor where O: xtra::Handler, M: xtra::Handler, + W: xtra::Handler, { async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context) { log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc)); @@ -965,7 +981,8 @@ where } #[async_trait] -impl Handler for Actor +impl Handler + for Actor where M: xtra::Handler, { @@ -975,18 +992,22 @@ where } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + W: xtra::Handler, +{ async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) { log_error!(self.handle_monitoring_event(msg)) } } #[async_trait] -impl Handler for Actor +impl Handler for Actor where T: xtra::Handler + xtra::Handler, M: xtra::Handler, + W: xtra::Handler, { async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, _ctx: &mut Context) { match msg { @@ -1041,7 +1062,11 @@ where } #[async_trait] -impl Handler for Actor { +impl Handler + for Actor +where + W: xtra::Handler, +{ async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context) { log_error!(self.handle_oracle_attestation(msg)) } @@ -1071,4 +1096,4 @@ impl Message for FromTaker { type Result = (); } -impl xtra::Actor for Actor {} +impl xtra::Actor for Actor {} diff --git a/daemon/src/setup_contract.rs b/daemon/src/setup_contract.rs index 1243f39..dbeab5d 100644 --- a/daemon/src/setup_contract.rs +++ b/daemon/src/setup_contract.rs @@ -1,10 +1,9 @@ use crate::model::cfd::{Cet, Cfd, Dlc, RevokedCommit, Role}; use crate::tokio_ext::FutureExt; -use crate::wallet::Wallet; use crate::wire::{ Msg0, Msg1, Msg2, RollOverMsg, RollOverMsg0, RollOverMsg1, RollOverMsg2, SetupMsg, }; -use crate::{model, oracle, payout_curve}; +use crate::{model, oracle, payout_curve, wallet}; use anyhow::{Context, Result}; use bdk::bitcoin::secp256k1::{schnorrsig, Signature, SECP256K1}; use bdk::bitcoin::util::psbt::PartiallySignedTransaction; @@ -23,25 +22,33 @@ use std::collections::HashMap; use std::iter::FromIterator; use std::ops::RangeInclusive; use std::time::Duration; +use xtra::Address; /// Given an initial set of parameters, sets up the CFD contract with /// the other party. -pub async fn new( +pub async fn new( mut sink: impl Sink + Unpin, mut stream: impl FusedStream + Unpin, (oracle_pk, announcement): (schnorrsig::PublicKey, oracle::Announcement), cfd: Cfd, - wallet: Wallet, + wallet: Address, role: Role, -) -> Result { +) -> Result +where + W: xtra::Handler + xtra::Handler, +{ let (sk, pk) = crate::keypair::new(&mut rand::thread_rng()); let (rev_sk, rev_pk) = crate::keypair::new(&mut rand::thread_rng()); let (publish_sk, publish_pk) = crate::keypair::new(&mut rand::thread_rng()); let margin = cfd.margin().context("Failed to calculate margin")?; let own_params = wallet - .build_party_params(margin, pk) + .send(wallet::BuildPartyParams { + amount: margin, + identity_pk: pk, + }) .await + .context("Failed to send message to wallet actor")? .context("Failed to build party params")?; let own_punish = PunishParams { @@ -171,7 +178,11 @@ pub async fn new( tracing::info!("Verified all signatures"); - let mut signed_lock_tx = wallet.sign(lock_tx).await?; + let mut signed_lock_tx = wallet + .send(wallet::Sign { psbt: lock_tx }) + .await + .context("Failed to send message to wallet actor")? + .context("Failed to sign transaction")?; sink.send(SetupMsg::Msg2(Msg2 { signed_lock: signed_lock_tx.clone(), })) diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 41f3e8c..d3474e7 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -7,9 +7,9 @@ use daemon::db::{self}; use daemon::model::WalletInfo; use daemon::seed::Seed; -use daemon::wallet::Wallet; use daemon::{ - bitmex_price_feed, housekeeping, logger, monitor, oracle, taker_cfd, wallet_sync, Taker, + bitmex_price_feed, connection, housekeeping, logger, monitor, oracle, taker_cfd, wallet, + wallet_sync, TakerActorSystem, }; use sqlx::sqlite::SqliteConnectOptions; @@ -22,8 +22,9 @@ use std::str::FromStr; use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; use xtra::prelude::MessageChannel; +use xtra::spawn::TokioGlobalSpawnExt; +use xtra::Actor; -mod connection; mod routes_taker; #[derive(Clap)] @@ -121,13 +122,15 @@ async fn main() -> Result<()> { let bitcoin_network = opts.network.bitcoin_network(); let ext_priv_key = seed.derive_extended_priv_key(bitcoin_network)?; - let wallet = Wallet::new( + let wallet = wallet::Actor::new( opts.network.electrum(), &data_dir.join("taker_wallet.sqlite"), ext_priv_key, ) - .await?; - let wallet_info = wallet.sync().await.unwrap(); + .await? + .create(None) + .spawn_global(); + let wallet_info = wallet.send(wallet::Sync).await??; // TODO: Actually fetch it from Olivia let oracle = schnorrsig::PublicKey::from_str( @@ -165,12 +168,12 @@ async fn main() -> Result<()> { read_from_maker, } = connection::Actor::new(opts.maker).await; - let Taker { + let TakerActorSystem { cfd_actor_addr, cfd_feed_receiver, order_feed_receiver, update_cfd_feed_receiver, - } = Taker::new( + } = TakerActorSystem::new( db.clone(), wallet.clone(), oracle, diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index f2a98ae..74decb1 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -7,9 +7,8 @@ use crate::model::cfd::{ }; use crate::model::{BitMexPriceEventId, Usd}; use crate::monitor::{self, MonitorParams}; -use crate::wallet::Wallet; use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; -use crate::{log_error, oracle, setup_contract, wire}; +use crate::{log_error, oracle, setup_contract, wallet, wire}; use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -68,9 +67,9 @@ enum RollOverState { None, } -pub struct Actor { +pub struct Actor { db: sqlx::SqlitePool, - wallet: Wallet, + wallet: Address, oracle_pk: schnorrsig::PublicKey, cfd_feed_actor_inbox: watch::Sender>, order_feed_actor_inbox: watch::Sender>, @@ -83,11 +82,16 @@ pub struct Actor { current_pending_proposals: UpdateCfdProposals, } -impl Actor { +impl Actor +where + W: xtra::Handler + + xtra::Handler + + xtra::Handler, +{ #[allow(clippy::too_many_arguments)] pub fn new( db: sqlx::SqlitePool, - wallet: Wallet, + wallet: Address, oracle_pk: schnorrsig::PublicKey, cfd_feed_actor_inbox: watch::Sender>, order_feed_actor_inbox: watch::Sender>, @@ -111,7 +115,9 @@ impl Actor { current_pending_proposals: HashMap::new(), } } +} +impl Actor { fn send_pending_update_proposals(&self) -> Result<()> { Ok(self .update_cfd_feed_sender @@ -165,6 +171,25 @@ impl Actor { Ok(()) } +} + +impl Actor +where + W: xtra::Handler + + xtra::Handler + + xtra::Handler, +{ + async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_commit( + order_id, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + Ok(()) + } async fn handle_propose_settlement( &mut self, @@ -255,42 +280,6 @@ impl Actor { Ok(()) } - async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_monitoring_event( - event, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } - - async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_commit( - order_id, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } - - async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_oracle_attestation( - attestation, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } - async fn handle_invalid_order_id(&mut self, order_id: OrderId) -> Result<()> { tracing::debug!(%order_id, "Invalid order ID"); @@ -311,7 +300,7 @@ impl Actor { } } -impl Actor +impl Actor where O: xtra::Handler, { @@ -334,7 +323,42 @@ where } Ok(()) } +} + +impl Actor +where + W: xtra::Handler, +{ + async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_oracle_attestation( + attestation, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + Ok(()) + } + async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_monitoring_event( + event, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + Ok(()) + } +} + +impl Actor +where + O: xtra::Handler, + W: xtra::Handler, +{ async fn handle_propose_roll_over(&mut self, order_id: OrderId) -> Result<()> { if self.current_pending_proposals.contains_key(&order_id) { anyhow::bail!("An update for order id {} is already in progress", order_id) @@ -371,11 +395,20 @@ where Ok(()) } } +impl Actor +where + O: xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler, +{ +} -impl Actor +impl Actor where O: xtra::Handler, M: xtra::Handler, + W: xtra::Handler, { async fn handle_cfd_setup_completed( &mut self, @@ -399,8 +432,10 @@ where let txid = self .wallet - .try_broadcast_transaction(dlc.lock.0.clone()) - .await?; + .send(wallet::TryBroadcastTransaction { + tx: dlc.lock.0.clone(), + }) + .await??; tracing::info!("Lock transaction published with txid {}", txid); @@ -421,10 +456,11 @@ where } } -impl Actor +impl Actor where Self: xtra::Handler, O: xtra::Handler + xtra::Handler, + W: xtra::Handler + xtra::Handler, { async fn handle_order_accepted( &mut self, @@ -486,10 +522,13 @@ where } } -impl Actor +impl Actor where Self: xtra::Handler, O: xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler, { async fn handle_roll_over_accepted( &mut self, @@ -546,7 +585,7 @@ where } } -impl Actor +impl Actor where M: xtra::Handler, { @@ -580,9 +619,12 @@ where } } -impl Actor +impl Actor where M: xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler, { async fn handle_settlement_accepted( &mut self, @@ -630,16 +672,19 @@ where } #[async_trait] -impl Handler for Actor { +impl Handler for Actor { async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context) { log_error!(self.handle_take_offer(msg.order_id, msg.quantity)); } } #[async_trait] -impl Handler for Actor +impl Handler for Actor where O: xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler, { async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context) { use CfdAction::*; @@ -661,13 +706,16 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor where Self: xtra::Handler + xtra::Handler, O: xtra::Handler + xtra::Handler + xtra::Handler, M: xtra::Handler, + W: xtra::Handler + + xtra::Handler + + xtra::Handler, { async fn handle( &mut self, @@ -723,10 +771,11 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor where O: xtra::Handler, M: xtra::Handler, + W: xtra::Handler, { async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context) { log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc)); @@ -734,7 +783,7 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor where M: xtra::Handler, { @@ -744,14 +793,20 @@ where } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + W: xtra::Handler, +{ async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) { log_error!(self.handle_monitoring_event(msg)) } } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + W: xtra::Handler, +{ async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context) { log_error!(self.handle_oracle_attestation(msg)) } @@ -778,4 +833,4 @@ impl Message for CfdRollOverCompleted { type Result = (); } -impl xtra::Actor for Actor {} +impl xtra::Actor for Actor {} diff --git a/daemon/src/wallet.rs b/daemon/src/wallet.rs index 639db0c..47119e9 100644 --- a/daemon/src/wallet.rs +++ b/daemon/src/wallet.rs @@ -13,9 +13,10 @@ use std::path::Path; use std::sync::Arc; use std::time::SystemTime; use tokio::sync::Mutex; +use xtra_productivity::xtra_productivity; #[derive(Clone)] -pub struct Wallet { +pub struct Actor { wallet: Arc>>, } @@ -23,7 +24,7 @@ pub struct Wallet { #[error("The transaction is already in the blockchain")] pub struct TransactionAlreadyInBlockchain; -impl Wallet { +impl Actor { pub async fn new( electrum_rpc_url: &str, wallet_dir: &Path, @@ -46,17 +47,11 @@ impl Wallet { Ok(Self { wallet }) } +} - pub async fn build_party_params( - &self, - amount: Amount, - identity_pk: PublicKey, - ) -> Result { - let wallet = self.wallet.lock().await; - wallet.build_party_params(amount, identity_pk) - } - - pub async fn sync(&self) -> Result { +#[xtra_productivity] +impl Actor { + pub async fn handle_sync(&self, _msg: Sync) -> Result { let wallet = self.wallet.lock().await; wallet .sync(NoopProgress, None) @@ -75,10 +70,8 @@ impl Wallet { Ok(wallet_info) } - pub async fn sign( - &self, - mut psbt: PartiallySignedTransaction, - ) -> Result { + pub async fn handle_sign(&self, msg: Sign) -> Result { + let mut psbt = msg.psbt; let wallet = self.wallet.lock().await; wallet @@ -94,7 +87,22 @@ impl Wallet { Ok(psbt) } - pub async fn try_broadcast_transaction(&self, tx: Transaction) -> Result { + pub async fn build_party_params( + &self, + BuildPartyParams { + amount, + identity_pk, + }: BuildPartyParams, + ) -> Result { + let wallet = self.wallet.lock().await; + wallet.build_party_params(amount, identity_pk) + } + + pub async fn handle_try_broadcast_transaction( + &self, + msg: TryBroadcastTransaction, + ) -> Result { + let tx = msg.tx; let wallet = self.wallet.lock().await; let txid = tx.txid(); @@ -128,6 +136,23 @@ impl Wallet { } } +impl xtra::Actor for Actor {} + +pub struct BuildPartyParams { + pub amount: Amount, + pub identity_pk: PublicKey, +} + +pub struct Sync; + +pub struct Sign { + pub psbt: PartiallySignedTransaction, +} + +pub struct TryBroadcastTransaction { + pub tx: Transaction, +} + fn parse_rpc_protocol_error_code(error_value: &Value) -> Result { let json = error_value .as_str() diff --git a/daemon/src/wallet_sync.rs b/daemon/src/wallet_sync.rs index 5c33fed..0ff13ae 100644 --- a/daemon/src/wallet_sync.rs +++ b/daemon/src/wallet_sync.rs @@ -1,14 +1,19 @@ use crate::model::WalletInfo; -use crate::wallet::Wallet; +use crate::wallet; use std::time::Duration; use tokio::sync::watch; use tokio::time::sleep; +use xtra::Address; -pub async fn new(wallet: Wallet, sender: watch::Sender) { +pub async fn new(wallet: Address, sender: watch::Sender) { loop { sleep(Duration::from_secs(10)).await; - let info = match wallet.sync().await { + let info = match wallet + .send(wallet::Sync) + .await + .expect("Wallet actor to be available") + { Ok(info) => info, Err(e) => { tracing::warn!("Failed to sync wallet: {:#}", e); From 1b7239c9440ae518107cac4c3fc07ecdd571e39d Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Tue, 26 Oct 2021 10:05:10 +1100 Subject: [PATCH 6/7] Simple actor test showcasing how to write actor tests Co-authored-by: Mariusz Klochowicz --- daemon/tests/happy_path.rs | 273 +++++++++++++++++++++++++++++++++++++ 1 file changed, 273 insertions(+) create mode 100644 daemon/tests/happy_path.rs diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs new file mode 100644 index 0000000..f137dfd --- /dev/null +++ b/daemon/tests/happy_path.rs @@ -0,0 +1,273 @@ +use anyhow::Result; +use bdk::bitcoin::util::psbt::PartiallySignedTransaction; +use bdk::bitcoin::{ecdsa, Txid}; +use cfd_protocol::secp256k1_zkp::{schnorrsig, Secp256k1}; +use cfd_protocol::PartyParams; +use daemon::model::cfd::Order; +use daemon::model::{Usd, WalletInfo}; +use daemon::{connection, db, logger, maker_cfd, maker_inc_connections, monitor, oracle, wallet}; +use rand::thread_rng; +use rust_decimal_macros::dec; +use sqlx::SqlitePool; +use std::net::SocketAddr; +use std::str::FromStr; +use std::task::Poll; +use std::time::SystemTime; +use tokio::sync::watch; +use tracing_subscriber::filter::LevelFilter; +use xtra::spawn::TokioGlobalSpawnExt; +use xtra::Actor; +use xtra_productivity::xtra_productivity; + +#[tokio::test] +async fn taker_receives_order_from_maker_on_publication() { + let (mut maker, mut taker) = start_both().await; + + assert!(is_next_none(&mut taker.order_feed).await); + + let (published, received) = tokio::join!( + maker.publish_order(new_dummy_order()), + next_some(&mut taker.order_feed) + ); + + // TODO: Add assertion function so we can assert on the other order values + assert_eq!(published.id, received.id); +} + +fn new_dummy_order() -> maker_cfd::NewOrder { + maker_cfd::NewOrder { + price: Usd::new(dec!(50_000)), + min_quantity: Usd::new(dec!(10)), + max_quantity: Usd::new(dec!(100)), + } +} + +/// Returns the value if the next Option received on the stream is Some +/// +/// Panics if None is received on the stream. +async fn next_some(rx: &mut watch::Receiver>) -> T +where + T: Clone, +{ + if let Some(value) = next(rx).await { + value + } else { + panic!("Received None when Some was expected") + } +} + +/// Returns true if the next Option received on the stream is None +/// +/// Returns false if Some is received. +async fn is_next_none(rx: &mut watch::Receiver>) -> bool +where + T: Clone, +{ + next(rx).await.is_none() +} + +/// Returns watch channel value upon change +async fn next(rx: &mut watch::Receiver) -> T +where + T: Clone, +{ + rx.changed().await.unwrap(); + rx.borrow().clone() +} + +fn init_tracing() { + logger::init(LevelFilter::DEBUG, false).unwrap(); + tracing::info!("Running version: {}", env!("VERGEN_GIT_SEMVER_LIGHTWEIGHT")); +} + +/// Test Stub simulating the Oracle actor +struct Oracle; +impl xtra::Actor for Oracle {} + +#[xtra_productivity(message_impl = false)] +impl Oracle { + async fn handle_fetch_announcement(&mut self, _msg: oracle::FetchAnnouncement) {} + + async fn handle_get_announcement( + &mut self, + _msg: oracle::GetAnnouncement, + ) -> Option { + todo!("stub this if needed") + } + + async fn handle(&mut self, _msg: oracle::MonitorAttestation) { + todo!("stub this if needed") + } + + async fn handle(&mut self, _msg: oracle::Sync) {} +} + +/// Test Stub simulating the Monitor actor +struct Monitor; +impl xtra::Actor for Monitor {} + +#[xtra_productivity(message_impl = false)] +impl Monitor { + async fn handle(&mut self, _msg: monitor::Sync) {} + + async fn handle(&mut self, _msg: monitor::StartMonitoring) { + todo!("stub this if needed") + } + + async fn handle(&mut self, _msg: monitor::CollaborativeSettlement) { + todo!("stub this if needed") + } + + async fn handle(&mut self, _msg: oracle::Attestation) { + todo!("stub this if needed") + } +} + +/// Test Stub simulating the Wallet actor +struct Wallet; +impl xtra::Actor for Wallet {} + +#[xtra_productivity(message_impl = false)] +impl Wallet { + async fn handle(&mut self, _msg: wallet::BuildPartyParams) -> Result { + todo!("stub this if needed") + } + async fn handle(&mut self, _msg: wallet::Sync) -> Result { + let s = Secp256k1::new(); + let public_key = ecdsa::PublicKey::new(s.generate_keypair(&mut thread_rng()).1); + let address = bdk::bitcoin::Address::p2pkh(&public_key, bdk::bitcoin::Network::Testnet); + + Ok(WalletInfo { + balance: bdk::bitcoin::Amount::ONE_BTC, + address, + last_updated_at: SystemTime::now(), + }) + } + async fn handle(&mut self, _msg: wallet::Sign) -> Result { + todo!("stub this if needed") + } + async fn handle(&mut self, _msg: wallet::TryBroadcastTransaction) -> Result { + todo!("stub this if needed") + } +} + +/// Maker Test Setup +struct Maker { + cfd_actor_addr: + xtra::Address>, + order_feed_receiver: watch::Receiver>, + #[allow(dead_code)] // we need to keep the xtra::Address for refcounting + inc_conn_addr: xtra::Address, + address: SocketAddr, +} + +impl Maker { + async fn start(oracle_pk: schnorrsig::PublicKey) -> Self { + let db = in_memory_db().await; + + let wallet_addr = Wallet {}.create(None).spawn_global(); + + let term = time::Duration::hours(24); + + let maker = daemon::MakerActorSystem::new( + db, + wallet_addr, + oracle_pk, + |_, _| Oracle, + |_, _| async { Ok(Monitor) }, + |channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1), + term, + ) + .await + .unwrap(); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + + let address = listener.local_addr().unwrap(); + + let listener_stream = futures::stream::poll_fn(move |ctx| { + let message = match futures::ready!(listener.poll_accept(ctx)) { + Ok((stream, address)) => { + dbg!("new connection"); + maker_inc_connections::ListenerMessage::NewConnection { stream, address } + } + Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, + }; + + Poll::Ready(Some(message)) + }); + + tokio::spawn(maker.inc_conn_addr.clone().attach_stream(listener_stream)); + + Self { + cfd_actor_addr: maker.cfd_actor_addr, + order_feed_receiver: maker.order_feed_receiver, + inc_conn_addr: maker.inc_conn_addr, + address, + } + } + + async fn publish_order(&mut self, new_order_params: maker_cfd::NewOrder) -> Order { + self.cfd_actor_addr.send(new_order_params).await.unwrap(); + let next_order = self.order_feed_receiver.borrow().clone().unwrap(); + + next_order + } +} + +/// Taker Test Setup +struct Taker { + order_feed: watch::Receiver>, +} + +impl Taker { + async fn start(oracle_pk: schnorrsig::PublicKey, maker_address: SocketAddr) -> Self { + let connection::Actor { + send_to_maker, + read_from_maker, + } = connection::Actor::new(maker_address).await; + + let db = in_memory_db().await; + + let wallet_addr = Wallet {}.create(None).spawn_global(); + + let taker = daemon::TakerActorSystem::new( + db, + wallet_addr, + oracle_pk, + send_to_maker, + read_from_maker, + |_, _| Oracle, + |_, _| async { Ok(Monitor) }, + ) + .await + .unwrap(); + + Self { + order_feed: taker.order_feed_receiver, + } + } +} + +async fn start_both() -> (Maker, Taker) { + init_tracing(); + let oracle_pk: schnorrsig::PublicKey = schnorrsig::PublicKey::from_str( + "ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7", + ) + .unwrap(); + + let maker = Maker::start(oracle_pk).await; + let taker = Taker::start(oracle_pk, maker.address).await; + (maker, taker) +} + +async fn in_memory_db() -> SqlitePool { + // Note: Every :memory: database is distinct from every other. So, opening two database + // connections each with the filename ":memory:" will create two independent in-memory + // databases. see: https://www.sqlite.org/inmemorydb.html + let pool = SqlitePool::connect(":memory:").await.unwrap(); + + db::run_migrations(&pool).await.unwrap(); + + pool +} From 6ead1af2c2610d0f1af164209e354c79abd6281f Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Tue, 26 Oct 2021 11:36:58 +1030 Subject: [PATCH 7/7] Remove unwrap Spotted by clippy. --- daemon/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 95daf54..b7bbb0b 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -226,7 +226,7 @@ where tokio::spawn( oracle_ctx .notify_interval(Duration::from_secs(5), || oracle::Sync) - .unwrap(), + .map_err(|e| anyhow::anyhow!(e))?, ); let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])