diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index f278eeb..c4fac5b 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -6,6 +6,7 @@ use crate::oracle::Attestation; use crate::wallet::Wallet; use anyhow::Result; use cfd_protocol::secp256k1_zkp::schnorrsig; +use futures::Stream; use sqlx::SqlitePool; use std::collections::HashMap; use std::future::Future; @@ -160,3 +161,91 @@ where }) } } + +pub struct Taker { + pub cfd_actor_addr: Address>, + pub cfd_feed_receiver: watch::Receiver>, + pub order_feed_receiver: watch::Receiver>, + pub update_cfd_feed_receiver: watch::Receiver, +} + +impl Taker +where + O: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, + M: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler, +{ + pub async fn new( + db: SqlitePool, + wallet: Wallet, + oracle_pk: schnorrsig::PublicKey, + send_to_maker: Box>, + read_from_maker: Box + Unpin + Send>, + oracle_constructor: impl Fn(Vec, Box>) -> O, + monitor_constructor: impl Fn(Box>, Vec) -> F, + ) -> Result + where + F: Future>, + { + let mut conn = db.acquire().await?; + + let cfds = load_all_cfds(&mut conn).await?; + + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); + let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); + let (update_cfd_feed_sender, update_cfd_feed_receiver) = + watch::channel::(HashMap::new()); + + let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None); + let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None); + + let cfd_actor_addr = taker_cfd::Actor::new( + db, + wallet, + oracle_pk, + cfd_feed_sender, + order_feed_sender, + update_cfd_feed_sender, + send_to_maker, + monitor_addr.clone(), + oracle_addr, + ) + .create(None) + .spawn_global(); + + tokio::spawn(cfd_actor_addr.clone().attach_stream(read_from_maker)); + + tokio::spawn( + monitor_ctx + .notify_interval(Duration::from_secs(20), || monitor::Sync) + .map_err(|e| anyhow::anyhow!(e))?, + ); + tokio::spawn( + monitor_ctx + .run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?), + ); + + tokio::spawn( + oracle_ctx + .notify_interval(Duration::from_secs(5), || oracle::Sync) + .unwrap(), + ); + let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr]) + .create(None) + .spawn_global(); + + tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); + + Ok(Self { + cfd_actor_addr, + cfd_feed_receiver, + order_feed_receiver, + update_cfd_feed_receiver, + }) + } +} diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index cb211ac..41f3e8c 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -2,28 +2,26 @@ use anyhow::{Context, Result}; use bdk::bitcoin; use bdk::bitcoin::secp256k1::schnorrsig; use clap::Clap; -use daemon::db::{self, load_all_cfds}; -use daemon::model::cfd::{Cfd, Order, UpdateCfdProposals}; +use daemon::db::{self}; + use daemon::model::WalletInfo; -use daemon::oracle::Attestation; + use daemon::seed::Seed; use daemon::wallet::Wallet; use daemon::{ - bitmex_price_feed, fan_out, housekeeping, logger, monitor, oracle, taker_cfd, wallet_sync, wire, + bitmex_price_feed, housekeeping, logger, monitor, oracle, taker_cfd, wallet_sync, Taker, }; -use futures::{Future, Stream}; + use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; -use std::collections::HashMap; + use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; -use std::time::Duration; + use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; -use xtra::prelude::{MessageChannel, StrongMessageChannel}; -use xtra::spawn::TokioGlobalSpawnExt; -use xtra::{Actor, Address}; +use xtra::prelude::MessageChannel; mod connection; mod routes_taker; @@ -167,12 +165,12 @@ async fn main() -> Result<()> { read_from_maker, } = connection::Actor::new(opts.maker).await; - let ActorSystem { + let Taker { cfd_actor_addr, cfd_feed_receiver, order_feed_receiver, update_cfd_feed_receiver, - } = ActorSystem::new( + } = Taker::new( db.clone(), wallet.clone(), oracle, @@ -222,91 +220,3 @@ async fn main() -> Result<()> { Ok(()) } - -pub struct ActorSystem { - cfd_actor_addr: Address>, - cfd_feed_receiver: watch::Receiver>, - order_feed_receiver: watch::Receiver>, - update_cfd_feed_receiver: watch::Receiver, -} - -impl ActorSystem -where - O: xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, - M: xtra::Handler - + xtra::Handler - + xtra::Handler - + xtra::Handler, -{ - pub async fn new( - db: SqlitePool, - wallet: Wallet, - oracle_pk: schnorrsig::PublicKey, - send_to_maker: Box>, - read_from_maker: Box + Unpin + Send>, - oracle_constructor: impl Fn(Vec, Box>) -> O, - monitor_constructor: impl Fn(Box>, Vec) -> F, - ) -> Result - where - F: Future>, - { - let mut conn = db.acquire().await?; - - let cfds = load_all_cfds(&mut conn).await?; - - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); - let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); - let (update_cfd_feed_sender, update_cfd_feed_receiver) = - watch::channel::(HashMap::new()); - - let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None); - let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None); - - let cfd_actor_addr = taker_cfd::Actor::new( - db, - wallet, - oracle_pk, - cfd_feed_sender, - order_feed_sender, - update_cfd_feed_sender, - send_to_maker, - monitor_addr.clone(), - oracle_addr, - ) - .create(None) - .spawn_global(); - - tokio::spawn(cfd_actor_addr.clone().attach_stream(read_from_maker)); - - tokio::spawn( - monitor_ctx - .notify_interval(Duration::from_secs(20), || monitor::Sync) - .map_err(|e| anyhow::anyhow!(e))?, - ); - tokio::spawn( - monitor_ctx - .run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?), - ); - - tokio::spawn( - oracle_ctx - .notify_interval(Duration::from_secs(5), || oracle::Sync) - .unwrap(), - ); - let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr]) - .create(None) - .spawn_global(); - - tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); - - Ok(Self { - cfd_actor_addr, - cfd_feed_receiver, - order_feed_receiver, - update_cfd_feed_receiver, - }) - } -}