diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 40d0234..236445b 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -10,7 +10,6 @@ use connection::ConnectionStatus; use futures::future::RemoteHandle; use maia::secp256k1_zkp::schnorrsig; use maker_cfd::TakerDisconnected; -use model::TakerId; use sqlx::SqlitePool; use std::future::Future; use std::time::Duration; @@ -91,7 +90,6 @@ impl Default for Tasks { pub struct MakerActorSystem { pub cfd_actor_addr: Address>, - pub connected_takers_feed_receiver: watch::Receiver>, pub inc_conn_addr: Address, pub tasks: Tasks, } @@ -135,9 +133,6 @@ where let cfds = load_all_cfds(&mut conn).await?; - let (connected_takers_feed_sender, connected_takers_feed_receiver) = - watch::channel::>(Vec::new()); - let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None); let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None); let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None); @@ -150,7 +145,6 @@ where settlement_interval, oracle_pk, projection_actor, - connected_takers_feed_sender, inc_conn_addr.clone(), monitor_addr.clone(), oracle_addr.clone(), @@ -196,7 +190,6 @@ where Ok(Self { cfd_actor_addr, - connected_takers_feed_receiver, inc_conn_addr, tasks, }) diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 11e83ab..19eb9bd 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -7,7 +7,7 @@ use daemon::auth::{self, MAKER_USERNAME}; use daemon::bitmex_price_feed::Quote; use daemon::db::load_all_cfds; use daemon::model::cfd::{Order, UpdateCfdProposals}; -use daemon::model::WalletInfo; +use daemon::model::{TakerId, WalletInfo}; use daemon::seed::Seed; use daemon::tokio_ext::FutureExt; use daemon::{ @@ -23,7 +23,6 @@ use std::path::PathBuf; use std::str::FromStr; use std::task::Poll; use tokio::sync::watch; -use tokio::sync::watch::channel; use tracing_subscriber::filter::LevelFilter; use xtra::prelude::*; use xtra::Actor; @@ -246,7 +245,6 @@ async fn main() -> Result<()> { let MakerActorSystem { cfd_actor_addr, - connected_takers_feed_receiver, inc_conn_addr: incoming_connection_addr, tasks: _tasks, } = MakerActorSystem::new( @@ -279,17 +277,20 @@ async fn main() -> Result<()> { tasks.add(task); let cfds = load_all_cfds(&mut conn).await?; - let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone()); - let (order_feed_sender, order_feed_receiver) = channel::>(None); + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); + let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); let (update_cfd_feed_sender, update_cfd_feed_receiver) = - channel::(HashMap::new()); - let (quote_sender, quote_receiver) = channel::(init_quote); + watch::channel::(HashMap::new()); + let (quote_sender, quote_receiver) = watch::channel::(init_quote); + let (connected_takers_feed_sender, connected_takers_feed_receiver) = + watch::channel::>(Vec::new()); tasks.add(projection_context.run(projection::Actor::new( cfd_feed_sender, order_feed_sender, quote_sender, update_cfd_feed_sender, + connected_takers_feed_sender, ))); let listener_stream = futures::stream::poll_fn(move |ctx| { diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 63fa866..10fcc2c 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -6,6 +6,7 @@ use crate::model::cfd::{ }; use crate::model::{Price, TakerId, Timestamp, Usd}; use crate::monitor::MonitorParams; +use crate::projection::Update; use crate::setup_contract::{RolloverParams, SetupParams}; use crate::tokio_ext::FutureExt; use crate::{ @@ -23,7 +24,6 @@ use sqlx::pool::PoolConnection; use sqlx::Sqlite; use std::collections::{HashMap, HashSet}; use time::Duration; -use tokio::sync::watch; use xtra::prelude::*; pub enum CfdAction { @@ -71,7 +71,6 @@ pub struct Actor { settlement_interval: Duration, oracle_pk: schnorrsig::PublicKey, projection_actor: Address, - connected_takers_feed_sender: watch::Sender>, takers: Address, current_order_id: Option, monitor_actor: Address, @@ -81,7 +80,6 @@ pub struct Actor { // Maker needs to also store TakerId to be able to send a reply back current_pending_proposals: HashMap, current_agreed_proposals: HashMap, - // TODO: Keep in projection actor connected_takers: HashSet, n_payouts: usize, } @@ -112,7 +110,6 @@ impl Actor { settlement_interval: Duration, oracle_pk: schnorrsig::PublicKey, projection_actor: Address, - connected_takers_feed_sender: watch::Sender>, takers: Address, monitor_actor: Address, oracle_actor: Address, @@ -124,7 +121,6 @@ impl Actor { settlement_interval, oracle_pk, projection_actor, - connected_takers_feed_sender, takers, current_order_id: None, monitor_actor, @@ -287,6 +283,18 @@ impl Actor { }; Ok((proposal.clone(), *taker_id)) } + + async fn update_connected_takers(&mut self) -> Result<()> { + self.projection_actor + .send(Update( + self.connected_takers + .clone() + .into_iter() + .collect::>(), + )) + .await?; + Ok(()) + } } impl Actor @@ -346,9 +354,7 @@ where if !self.connected_takers.insert(taker_id) { tracing::warn!("Taker already connected: {:?}", &taker_id); } - self.connected_takers_feed_sender - .send(self.connected_takers.clone().into_iter().collect())?; - + self.update_connected_takers().await?; Ok(()) } @@ -356,8 +362,7 @@ where if !self.connected_takers.remove(&taker_id) { tracing::warn!("Removed unknown taker: {:?}", &taker_id); } - self.connected_takers_feed_sender - .send(self.connected_takers.clone().into_iter().collect())?; + self.update_connected_takers().await?; Ok(()) } diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs index 1af3eac..5d69af8 100644 --- a/daemon/src/projection.rs +++ b/daemon/src/projection.rs @@ -1,4 +1,5 @@ use crate::bitmex_price_feed::Quote; +use crate::model::TakerId; use crate::{Cfd, Order, UpdateCfdProposals}; use tokio::sync::watch; use xtra_productivity::xtra_productivity; @@ -8,6 +9,9 @@ pub struct Actor { tx_order: watch::Sender>, tx_quote: watch::Sender, tx_settlements: watch::Sender, + // TODO: Use this channel to communicate maker status as well with generic + // ID of connected counterparties + tx_connected_takers: watch::Sender>, } impl Actor { @@ -16,12 +20,14 @@ impl Actor { tx_order: watch::Sender>, tx_quote: watch::Sender, tx_settlements: watch::Sender, + tx_connected_takers: watch::Sender>, ) -> Self { Self { tx_cfds, tx_order, tx_quote, tx_settlements, + tx_connected_takers, } } } @@ -42,6 +48,9 @@ impl Actor { fn handle(&mut self, msg: Update) { let _ = self.tx_settlements.send(msg.0); } + fn handle(&mut self, msg: Update>) { + let _ = self.tx_connected_takers.send(msg.0); + } } impl xtra::Actor for Actor {} diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 28c424e..b070b84 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -7,7 +7,7 @@ use daemon::bitmex_price_feed::Quote; use daemon::connection::connect; use daemon::db::load_all_cfds; use daemon::model::cfd::{Order, UpdateCfdProposals}; -use daemon::model::WalletInfo; +use daemon::model::{TakerId, WalletInfo}; use daemon::seed::Seed; use daemon::tokio_ext::FutureExt; use daemon::{ @@ -264,11 +264,18 @@ async fn main() -> Result<()> { channel::(HashMap::new()); let (quote_sender, quote_receiver) = channel::(init_quote); + // TODO: Use this channel to convey maker status. + // For now, the receiver is dropped instead of managed by Rocket to + // highlight that we're not using it + let (connected_takers_feed_sender, _connected_takers_feed_receiver) = + watch::channel::>(vec![]); + tasks.add(projection_context.run(projection::Actor::new( cfd_feed_sender, order_feed_sender, quote_sender, update_cfd_feed_sender, + connected_takers_feed_sender, ))); let possible_addresses = resolve_maker_addresses(&opts.maker).await?; diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 600e252..857bc82 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -20,7 +20,6 @@ use std::task::Poll; use std::time::Duration; use tokio::net::TcpListener; use tokio::sync::watch; -use tokio::sync::watch::channel; use tracing::subscriber::DefaultGuard; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::util::SubscriberInitExt; @@ -59,6 +58,7 @@ pub struct Maker { pub identity_pk: x25519_dalek::PublicKey, cfd_feed_receiver: watch::Receiver>, order_feed_receiver: watch::Receiver>, + connected_takers_feed_receiver: watch::Receiver>, _tasks: Tasks, } @@ -72,7 +72,7 @@ impl Maker { } pub fn connected_takers_feed(&mut self) -> &mut watch::Receiver> { - &mut self.system.connected_takers_feed_receiver + &mut self.connected_takers_feed_receiver } pub async fn start( @@ -126,17 +126,20 @@ impl Maker { ask: Price::new(dec!(10000)).unwrap(), }; - let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]); - let (order_feed_sender, order_feed_receiver) = channel::>(None); + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(vec![]); + let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); let (update_cfd_feed_sender, _update_cfd_feed_receiver) = - channel::(HashMap::new()); - let (quote_sender, _) = channel::(dummy_quote); + watch::channel::(HashMap::new()); + let (quote_sender, _) = watch::channel::(dummy_quote); + let (connected_takers_feed_sender, connected_takers_feed_receiver) = + watch::channel::>(vec![]); tasks.add(projection_context.run(projection::Actor::new( cfd_feed_sender, order_feed_sender, quote_sender, update_cfd_feed_sender, + connected_takers_feed_sender, ))); let address = listener.local_addr().unwrap(); @@ -162,6 +165,7 @@ impl Maker { _tasks: tasks, cfd_feed_receiver, order_feed_receiver, + connected_takers_feed_receiver, } } @@ -261,16 +265,20 @@ impl Taker { ask: Price::new(dec!(10000)).unwrap(), }; - let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]); - let (order_feed_sender, order_feed_receiver) = channel::>(None); - let (update_cfd_feed_sender, _) = channel::(HashMap::new()); - let (quote_sender, _) = channel::(dummy_quote); + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(vec![]); + let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); + let (update_cfd_feed_sender, _) = watch::channel::(HashMap::new()); + let (quote_sender, _) = watch::channel::(dummy_quote); + + let (connected_takers_feed_sender, _connected_takers_feed_receiver) = + watch::channel::>(vec![]); tasks.add(projection_context.run(projection::Actor::new( cfd_feed_sender, order_feed_sender, quote_sender, update_cfd_feed_sender, + connected_takers_feed_sender, ))); tasks.add(connect(