diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index c5bf8d2..72d25e5 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -4,23 +4,28 @@ use bdk::bitcoin::secp256k1::schnorrsig; use clap::Clap; use daemon::auth::{self, MAKER_USERNAME}; use daemon::db::{self, load_all_cfds}; -use daemon::model::cfd::{Order, UpdateCfdProposals}; +use daemon::maker_cfd::{FromTaker, NewTakerOnline}; +use daemon::model::cfd::{Cfd, Order, UpdateCfdProposals}; use daemon::model::WalletInfo; +use daemon::oracle::Attestation; use daemon::seed::Seed; use daemon::wallet::Wallet; use daemon::{ bitmex_price_feed, fan_out, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle, wallet_sync, }; +use futures::Future; use rocket::fairing::AdHoc; use rocket_db_pools::Database; +use sqlx::SqlitePool; use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; use std::task::Poll; use std::time::Duration; -use tokio::sync::watch; +use tokio::net::TcpListener; +use tokio::sync::watch::{self, Receiver}; use tracing_subscriber::filter::LevelFilter; use xtra::prelude::*; use xtra::spawn::TokioGlobalSpawnExt; @@ -148,10 +153,7 @@ async fn main() -> Result<()> { "ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7", )?; - let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::(wallet_info); - let (update_cfd_feed_sender, update_cfd_feed_receiver) = - watch::channel::(HashMap::new()); let figment = rocket::Config::figment() .merge(("databases.maker.url", data_dir.join("maker.sqlite"))) @@ -172,9 +174,7 @@ async fn main() -> Result<()> { tokio::spawn(task); rocket::custom(figment) - .manage(order_feed_receiver) .manage(wallet_feed_receiver) - .manage(update_cfd_feed_receiver) .manage(auth_password) .manage(quote_updates) .manage(bitcoin_network) @@ -191,13 +191,13 @@ async fn main() -> Result<()> { } }, )) - .attach(AdHoc::try_on_ignite( - "Create actors", + .attach(AdHoc::try_on_ignite("Create actors", { move |rocket| async move { let db = match Db::fetch(&rocket) { Some(db) => (**db).clone(), None => return Err(rocket), }; + let mut conn = db.acquire().await.unwrap(); housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn) @@ -206,98 +206,45 @@ async fn main() -> Result<()> { housekeeping::rebroadcast_transactions(&mut conn, &wallet) .await .unwrap(); - let cfds = load_all_cfds(&mut conn).await.unwrap(); - - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); - let (maker_inc_connections_address, maker_inc_connections_context) = - xtra::Context::new(None); - - let (monitor_actor_address, mut monitor_actor_context) = xtra::Context::new(None); - let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None); - - let mut conn = db.acquire().await.unwrap(); - let cfds = load_all_cfds(&mut conn).await.unwrap(); - let cfd_maker_actor_inbox = maker_cfd::Actor::new( + let ActorSystem { + cfd_actor_addr, + cfd_feed_receiver, + order_feed_receiver, + update_cfd_feed_receiver, + } = ActorSystem::new( db, wallet.clone(), oracle, - cfd_feed_sender, - order_feed_sender, - update_cfd_feed_sender, - maker_inc_connections_address.clone(), - monitor_actor_address.clone(), - oracle_actor_address.clone(), - ) - .create(None) - .spawn_global(); - - tokio::spawn( - maker_inc_connections_context.run(maker_inc_connections::Actor::new( - &cfd_maker_actor_inbox, - &cfd_maker_actor_inbox, - )), - ); - tokio::spawn( - monitor_actor_context - .notify_interval(Duration::from_secs(20), || monitor::Sync) - .unwrap(), - ); - tokio::spawn( - monitor_actor_context.run( - monitor::Actor::new( - opts.network.electrum(), - cfd_maker_actor_inbox.clone(), - cfds.clone(), - ) - .await - .unwrap(), - ), - ); - - tokio::spawn( - oracle_actor_context - .notify_interval(Duration::from_secs(5), || oracle::Sync) - .unwrap(), - ); - let actor = fan_out::Actor::new(&[&cfd_maker_actor_inbox, &monitor_actor_address]) - .create(None) - .spawn_global(); - - tokio::spawn(oracle_actor_context.run(oracle::Actor::new(cfds, actor))); - - oracle_actor_address - .do_send_async(oracle::Sync) - .await - .unwrap(); - - let listener_stream = futures::stream::poll_fn(move |ctx| { - let message = match futures::ready!(listener.poll_accept(ctx)) { - Ok((stream, address)) => { - maker_inc_connections::ListenerMessage::NewConnection { - stream, - address, + |cfds, channel| oracle::Actor::new(cfds, channel), + { + |channel, cfds| { + let electrum = opts.network.electrum().to_string(); + async move { + monitor::Actor::new(electrum, channel, cfds.clone()).await } } - Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, - }; - - Poll::Ready(Some(message)) - }); + }, + |channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1), + listener, + ) + .await; - tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); let cfd_action_channel = - MessageChannel::::clone_channel(&cfd_maker_actor_inbox); + MessageChannel::::clone_channel(&cfd_actor_addr); let new_order_channel = - MessageChannel::::clone_channel(&cfd_maker_actor_inbox); + MessageChannel::::clone_channel(&cfd_actor_addr); + Ok(rocket + .manage(order_feed_receiver) + .manage(update_cfd_feed_receiver) .manage(cfd_action_channel) .manage(new_order_channel) .manage(cfd_feed_receiver)) - }, - )) + } + })) .mount( "/api", rocket::routes![ @@ -318,3 +265,118 @@ async fn main() -> Result<()> { Ok(()) } + +pub struct ActorSystem { + cfd_actor_addr: Address>, + cfd_feed_receiver: Receiver>, + order_feed_receiver: Receiver>, + update_cfd_feed_receiver: Receiver, +} + +impl ActorSystem +where + O: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, + M: xtra::Handler + + xtra::Handler + + xtra::Handler, + T: xtra::Handler + + xtra::Handler + + xtra::Handler, +{ + pub async fn new( + db: SqlitePool, + wallet: Wallet, + oracle_pk: schnorrsig::PublicKey, + oracle_constructor: impl Fn(Vec, Box>) -> O, + monitor_constructor: impl Fn(Box>, Vec) -> F, + inc_conn_constructor: impl Fn( + Box>, + Box>, + ) -> T, + listener: TcpListener, + ) -> Self + where + F: Future>, + { + let mut conn = db.acquire().await.unwrap(); + + let cfds = load_all_cfds(&mut conn).await.unwrap(); + + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); + let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); + let (update_cfd_feed_sender, update_cfd_feed_receiver) = + watch::channel::(HashMap::new()); + + let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None); + let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None); + let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None); + + let cfd_actor_addr = maker_cfd::Actor::new( + db, + wallet, + oracle_pk, + cfd_feed_sender, + order_feed_sender, + update_cfd_feed_sender, + inc_conn_addr.clone(), + monitor_addr, + oracle_addr.clone(), + ) + .create(None) + .spawn_global(); + + tokio::spawn(inc_conn_ctx.run(inc_conn_constructor( + Box::new(cfd_actor_addr.clone()), + Box::new(cfd_actor_addr.clone()), + ))); + + tokio::spawn( + monitor_ctx + .notify_interval(Duration::from_secs(20), || monitor::Sync) + .unwrap(), + ); + tokio::spawn( + monitor_ctx.run( + monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()) + .await + .unwrap(), + ), + ); + + tokio::spawn( + oracle_ctx + .notify_interval(Duration::from_secs(5), || oracle::Sync) + .unwrap(), + ); + let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &cfd_actor_addr]) + .create(None) + .spawn_global(); + + tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); + + oracle_addr.do_send_async(oracle::Sync).await.unwrap(); + + let listener_stream = futures::stream::poll_fn(move |ctx| { + let message = match futures::ready!(listener.poll_accept(ctx)) { + Ok((stream, address)) => { + maker_inc_connections::ListenerMessage::NewConnection { stream, address } + } + Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, + }; + + Poll::Ready(Some(message)) + }); + + tokio::spawn(inc_conn_addr.attach_stream(listener_stream)); + + Self { + cfd_actor_addr, + cfd_feed_receiver, + order_feed_receiver, + update_cfd_feed_receiver, + } + } +} diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 6af347f..4e2ef44 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -70,8 +70,8 @@ pub struct Actor { impl Actor { pub fn new( - new_taker_channel: &impl MessageChannel, - taker_msg_channel: &impl MessageChannel, + new_taker_channel: Box>, + taker_msg_channel: Box>, ) -> Self { Self { write_connections: HashMap::new(), diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index 8c0890c..0221557 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -50,11 +50,11 @@ pub struct Actor { impl Actor { pub async fn new( - electrum_rpc_url: &str, - event_channel: impl StrongMessageChannel + 'static, + electrum_rpc_url: String, + event_channel: Box>, cfds: Vec, ) -> Result { - let client = bdk::electrum_client::Client::new(electrum_rpc_url) + let client = bdk::electrum_client::Client::new(&electrum_rpc_url) .context("Failed to initialize Electrum RPC client")?; // Initially fetch the latest block for storing the height. @@ -65,7 +65,7 @@ impl Actor { let mut actor = Self { cfds: HashMap::new(), - event_channel: Box::new(event_channel), + event_channel, client, latest_block_height: BlockHeight::try_from(latest_block)?, current_status: BTreeMap::default(), diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index 3c5f63b..c95cfb6 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -67,7 +67,7 @@ struct NewAttestationFetched { impl Actor { pub fn new( cfds: Vec, - attestation_channel: impl StrongMessageChannel + 'static, + attestation_channel: Box>, ) -> Self { let mut pending_attestations = HashSet::new(); @@ -101,7 +101,7 @@ impl Actor { announcements: HashMap::new(), pending_announcements: HashSet::new(), pending_attestations, - attestation_channel: Box::new(attestation_channel), + attestation_channel, } } } diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index bc2acef..4bc6c12 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -240,8 +240,8 @@ async fn main() -> Result<()> { tokio::spawn( monitor_actor_context.run( monitor::Actor::new( - opts.network.electrum(), - cfd_actor_inbox.clone(), + opts.network.electrum().to_string(), + Box::new(cfd_actor_inbox.clone()), cfds.clone(), ) .await @@ -258,7 +258,7 @@ async fn main() -> Result<()> { .create(None) .spawn_global(); - tokio::spawn(oracle_actor_context.run(oracle::Actor::new(cfds, actor))); + tokio::spawn(oracle_actor_context.run(oracle::Actor::new(cfds, Box::new(actor)))); oracle_actor_address .do_send_async(oracle::Sync)