From 532c56acb29075ebafcfd3f7369241497ebe50d7 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Fri, 22 Oct 2021 14:58:37 +1100 Subject: [PATCH] Move maker's `ActorSystem` to `lib` as `Maker` --- daemon/src/lib.rs | 132 ++++++++++++++++++++++++++++++++++++++++ daemon/src/maker.rs | 142 ++++---------------------------------------- 2 files changed, 143 insertions(+), 131 deletions(-) diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index f2bf2af..f278eeb 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -1,4 +1,21 @@ #![cfg_attr(not(test), warn(clippy::unwrap_used))] +use crate::db::load_all_cfds; +use crate::maker_cfd::{FromTaker, NewTakerOnline}; +use crate::model::cfd::{Cfd, Order, UpdateCfdProposals}; +use crate::oracle::Attestation; +use crate::wallet::Wallet; +use anyhow::Result; +use cfd_protocol::secp256k1_zkp::schnorrsig; +use sqlx::SqlitePool; +use std::collections::HashMap; +use std::future::Future; +use std::task::Poll; +use std::time::Duration; +use tokio::net::TcpListener; +use tokio::sync::watch; +use xtra::message_channel::{MessageChannel, StrongMessageChannel}; +use xtra::spawn::TokioGlobalSpawnExt; +use xtra::{Actor, Address}; pub mod actors; pub mod auth; @@ -28,3 +45,118 @@ pub mod try_continue; pub mod wallet; pub mod wallet_sync; pub mod wire; + +pub struct Maker { + pub cfd_actor_addr: Address>, + pub cfd_feed_receiver: watch::Receiver>, + pub order_feed_receiver: watch::Receiver>, + pub update_cfd_feed_receiver: watch::Receiver, +} + +impl Maker +where + O: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, + M: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, + T: xtra::Handler + + xtra::Handler + + xtra::Handler, +{ + pub async fn new( + db: SqlitePool, + wallet: Wallet, + oracle_pk: schnorrsig::PublicKey, + oracle_constructor: impl Fn(Vec, Box>) -> O, + monitor_constructor: impl Fn(Box>, Vec) -> F, + inc_conn_constructor: impl Fn( + Box>, + Box>, + ) -> T, + listener: TcpListener, + term: time::Duration, + ) -> Result + where + F: Future>, + { + let mut conn = db.acquire().await?; + + let cfds = load_all_cfds(&mut conn).await?; + + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); + let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); + let (update_cfd_feed_sender, update_cfd_feed_receiver) = + watch::channel::(HashMap::new()); + + let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None); + let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None); + let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None); + + let cfd_actor_addr = maker_cfd::Actor::new( + db, + wallet, + term, + oracle_pk, + cfd_feed_sender, + order_feed_sender, + update_cfd_feed_sender, + inc_conn_addr.clone(), + monitor_addr.clone(), + oracle_addr.clone(), + ) + .create(None) + .spawn_global(); + + tokio::spawn(inc_conn_ctx.run(inc_conn_constructor( + Box::new(cfd_actor_addr.clone()), + Box::new(cfd_actor_addr.clone()), + ))); + + tokio::spawn( + monitor_ctx + .notify_interval(Duration::from_secs(20), || monitor::Sync) + .map_err(|e| anyhow::anyhow!(e))?, + ); + tokio::spawn( + monitor_ctx + .run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?), + ); + + tokio::spawn( + oracle_ctx + .notify_interval(Duration::from_secs(5), || oracle::Sync) + .map_err(|e| anyhow::anyhow!(e))?, + ); + let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr]) + .create(None) + .spawn_global(); + + tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); + + oracle_addr.do_send_async(oracle::Sync).await?; + + let listener_stream = futures::stream::poll_fn(move |ctx| { + let message = match futures::ready!(listener.poll_accept(ctx)) { + Ok((stream, address)) => { + maker_inc_connections::ListenerMessage::NewConnection { stream, address } + } + Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, + }; + + Poll::Ready(Some(message)) + }); + + tokio::spawn(inc_conn_addr.attach_stream(listener_stream)); + + Ok(Self { + cfd_actor_addr, + cfd_feed_receiver, + order_feed_receiver, + update_cfd_feed_receiver, + }) + } +} diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index eac861e..5575419 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -3,31 +3,27 @@ use bdk::bitcoin; use bdk::bitcoin::secp256k1::schnorrsig; use clap::Clap; use daemon::auth::{self, MAKER_USERNAME}; -use daemon::db::{self, load_all_cfds}; -use daemon::maker_cfd::{FromTaker, NewTakerOnline}; -use daemon::model::cfd::{Cfd, Order, UpdateCfdProposals}; +use daemon::db::{self}; + use daemon::model::WalletInfo; -use daemon::oracle::Attestation; + use daemon::seed::Seed; use daemon::wallet::Wallet; use daemon::{ - bitmex_price_feed, fan_out, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, - oracle, wallet_sync, + bitmex_price_feed, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle, + wallet_sync, Maker, }; -use futures::Future; + use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; -use std::collections::HashMap; +use xtra::prelude::MessageChannel; + use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; -use std::task::Poll; -use std::time::Duration; -use tokio::net::TcpListener; + use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; -use xtra::prelude::*; -use xtra::spawn::TokioGlobalSpawnExt; mod routes_maker; @@ -189,12 +185,12 @@ async fn main() -> Result<()> { housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?; housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?; - let ActorSystem { + let Maker { cfd_actor_addr, cfd_feed_receiver, order_feed_receiver, update_cfd_feed_receiver, - } = ActorSystem::new( + } = Maker::new( db.clone(), wallet.clone(), oracle, @@ -248,119 +244,3 @@ async fn main() -> Result<()> { Ok(()) } - -pub struct ActorSystem { - cfd_actor_addr: Address>, - cfd_feed_receiver: watch::Receiver>, - order_feed_receiver: watch::Receiver>, - update_cfd_feed_receiver: watch::Receiver, -} - -impl ActorSystem -where - O: xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, - M: xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, - T: xtra::Handler - + xtra::Handler - + xtra::Handler, -{ - #[allow(clippy::too_many_arguments)] - pub async fn new( - db: SqlitePool, - wallet: Wallet, - oracle_pk: schnorrsig::PublicKey, - oracle_constructor: impl Fn(Vec, Box>) -> O, - monitor_constructor: impl Fn(Box>, Vec) -> F, - inc_conn_constructor: impl Fn( - Box>, - Box>, - ) -> T, - listener: TcpListener, - term: time::Duration, - ) -> Result - where - F: Future>, - { - let mut conn = db.acquire().await?; - - let cfds = load_all_cfds(&mut conn).await?; - - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); - let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); - let (update_cfd_feed_sender, update_cfd_feed_receiver) = - watch::channel::(HashMap::new()); - - let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None); - let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None); - let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None); - - let cfd_actor_addr = maker_cfd::Actor::new( - db, - wallet, - term, - oracle_pk, - cfd_feed_sender, - order_feed_sender, - update_cfd_feed_sender, - inc_conn_addr.clone(), - monitor_addr.clone(), - oracle_addr.clone(), - ) - .create(None) - .spawn_global(); - - tokio::spawn(inc_conn_ctx.run(inc_conn_constructor( - Box::new(cfd_actor_addr.clone()), - Box::new(cfd_actor_addr.clone()), - ))); - - tokio::spawn( - monitor_ctx - .notify_interval(Duration::from_secs(20), || monitor::Sync) - .map_err(|e| anyhow::anyhow!(e))?, - ); - tokio::spawn( - monitor_ctx - .run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?), - ); - - tokio::spawn( - oracle_ctx - .notify_interval(Duration::from_secs(5), || oracle::Sync) - .map_err(|e| anyhow::anyhow!(e))?, - ); - let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr]) - .create(None) - .spawn_global(); - - tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); - - oracle_addr.do_send_async(oracle::Sync).await?; - - let listener_stream = futures::stream::poll_fn(move |ctx| { - let message = match futures::ready!(listener.poll_accept(ctx)) { - Ok((stream, address)) => { - maker_inc_connections::ListenerMessage::NewConnection { stream, address } - } - Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, - }; - - Poll::Ready(Some(message)) - }); - - tokio::spawn(inc_conn_addr.attach_stream(listener_stream)); - - Ok(Self { - cfd_actor_addr, - cfd_feed_receiver, - order_feed_receiver, - update_cfd_feed_receiver, - }) - } -}