From ee9d81e7bc716bedf65dd9944a34d70c9ab1f248 Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Thu, 25 Nov 2021 14:12:05 +1030 Subject: [PATCH] Refactor projection actor before adding new functionality Create watch channels within the actor, returning a single struct called `Feeds`. Manage the struct with Rocket and use it in SSE events. --- daemon/src/maker.rs | 30 +++----------- daemon/src/projection.rs | 78 ++++++++++++++++++++++++------------- daemon/src/routes_maker.rs | 24 ++++++------ daemon/src/routes_taker.rs | 23 ++++++----- daemon/src/taker.rs | 34 +++------------- daemon/tests/harness/mod.rs | 62 +++++++---------------------- 6 files changed, 99 insertions(+), 152 deletions(-) diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 0b74080..75c8db6 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -4,10 +4,8 @@ use bdk::bitcoin::Amount; use bdk::{bitcoin, FeeRate}; use clap::{Parser, Subcommand}; use daemon::auth::{self, MAKER_USERNAME}; -use daemon::bitmex_price_feed::Quote; use daemon::db::load_all_cfds; -use daemon::model::cfd::{Order, UpdateCfdProposals}; -use daemon::model::{TakerId, WalletInfo}; +use daemon::model::WalletInfo; use daemon::seed::Seed; use daemon::tokio_ext::FutureExt; use daemon::{ @@ -17,7 +15,6 @@ use daemon::{ }; use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; -use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; @@ -272,27 +269,14 @@ async fn main() -> Result<()> { let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?; tasks.add(task); - // TODO: Move to projection actor let cfds = { let mut conn = db.acquire().await?; load_all_cfds(&mut conn).await? }; - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); - let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); - let (update_cfd_feed_sender, update_cfd_feed_receiver) = - watch::channel::(HashMap::new()); - let (quote_sender, quote_receiver) = watch::channel::(init_quote); - let (connected_takers_feed_sender, connected_takers_feed_receiver) = - watch::channel::>(Vec::new()); - - tasks.add(projection_context.run(projection::Actor::new( - cfd_feed_sender, - order_feed_sender, - quote_sender, - update_cfd_feed_sender, - connected_takers_feed_sender, - ))); + + let (proj_actor, projection_feeds) = projection::Actor::new(cfds, init_quote); + tasks.add(projection_context.run(proj_actor)); let listener_stream = futures::stream::poll_fn(move |ctx| { let message = match futures::ready!(listener.poll_accept(ctx)) { @@ -309,14 +293,10 @@ async fn main() -> Result<()> { tasks.add(wallet_sync::new(wallet.clone(), wallet_feed_sender)); rocket::custom(figment) - .manage(order_feed_receiver) - .manage(update_cfd_feed_receiver) + .manage(projection_feeds) .manage(cfd_actor_addr) - .manage(cfd_feed_receiver) - .manage(connected_takers_feed_receiver) .manage(wallet_feed_receiver) .manage(auth_password) - .manage(quote_receiver) .manage(bitcoin_network) .manage(wallet) .mount( diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs index 5d69af8..ede1e52 100644 --- a/daemon/src/projection.rs +++ b/daemon/src/projection.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use crate::bitmex_price_feed::Quote; use crate::model::TakerId; use crate::{Cfd, Order, UpdateCfdProposals}; @@ -5,51 +7,75 @@ use tokio::sync::watch; use xtra_productivity::xtra_productivity; pub struct Actor { - tx_cfds: watch::Sender>, - tx_order: watch::Sender>, - tx_quote: watch::Sender, - tx_settlements: watch::Sender, - // TODO: Use this channel to communicate maker status as well with generic - // ID of connected counterparties - tx_connected_takers: watch::Sender>, + tx: Tx, +} + +pub struct Feeds { + pub cfds: watch::Receiver>, + pub order: watch::Receiver>, + pub quote: watch::Receiver, + pub settlements: watch::Receiver, + pub connected_takers: watch::Receiver>, } impl Actor { - pub fn new( - tx_cfds: watch::Sender>, - tx_order: watch::Sender>, - tx_quote: watch::Sender, - tx_settlements: watch::Sender, - tx_connected_takers: watch::Sender>, - ) -> Self { - Self { - tx_cfds, - tx_order, - tx_quote, - tx_settlements, - tx_connected_takers, - } + pub fn new(init_cfds: Vec, init_quote: Quote) -> (Self, Feeds) { + let (tx_cfds, rx_cfds) = watch::channel(init_cfds); + let (tx_order, rx_order) = watch::channel(None); + let (tx_update_cfd_feed, rx_update_cfd_feed) = watch::channel(HashMap::new()); + let (tx_quote, rx_quote) = watch::channel(init_quote); + let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new()); + + ( + Self { + tx: Tx { + cfds: tx_cfds, + order: tx_order, + quote: tx_quote, + settlements: tx_update_cfd_feed, + connected_takers: tx_connected_takers, + }, + }, + Feeds { + cfds: rx_cfds, + order: rx_order, + quote: rx_quote, + settlements: rx_update_cfd_feed, + connected_takers: rx_connected_takers, + }, + ) } } +/// Internal struct to keep all the senders around in one place +struct Tx { + pub cfds: watch::Sender>, + pub order: watch::Sender>, + pub quote: watch::Sender, + pub settlements: watch::Sender, + // TODO: Use this channel to communicate maker status as well with generic + // ID of connected counterparties + pub connected_takers: watch::Sender>, +} + pub struct Update(pub T); #[xtra_productivity] impl Actor { fn handle(&mut self, msg: Update>) { - let _ = self.tx_cfds.send(msg.0); + let _ = self.tx.cfds.send(msg.0); } fn handle(&mut self, msg: Update>) { - let _ = self.tx_order.send(msg.0); + let _ = self.tx.order.send(msg.0); } fn handle(&mut self, msg: Update) { - let _ = self.tx_quote.send(msg.0); + let _ = self.tx.quote.send(msg.0); } fn handle(&mut self, msg: Update) { - let _ = self.tx_settlements.send(msg.0); + let _ = self.tx.settlements.send(msg.0); } fn handle(&mut self, msg: Update>) { - let _ = self.tx_connected_takers.send(msg.0); + let _ = self.tx.connected_takers.send(msg.0); } } diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index a209280..3e06b9a 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -1,11 +1,12 @@ use anyhow::Result; use bdk::bitcoin::Network; use daemon::auth::Authenticated; -use daemon::model::cfd::{Cfd, Order, OrderId, Role, UpdateCfdProposals}; -use daemon::model::{Price, TakerId, Usd, WalletInfo}; +use daemon::model::cfd::{OrderId, Role}; +use daemon::model::{Price, Usd, WalletInfo}; +use daemon::projection::Feeds; use daemon::routes::EmbeddedFileExt; use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent}; -use daemon::{bitmex_price_feed, maker_cfd, maker_inc_connections, monitor, oracle, wallet}; +use daemon::{maker_cfd, maker_inc_connections, monitor, oracle, wallet}; use http_api_problem::{HttpApiProblem, StatusCode}; use rocket::http::{ContentType, Header, Status}; use rocket::response::stream::EventStream; @@ -27,21 +28,18 @@ pub type Maker = xtra::Address< #[allow(clippy::too_many_arguments)] #[rocket::get("/feed")] pub async fn maker_feed( - rx_cfds: &State>>, - rx_order: &State>>, + rx: &State, rx_wallet: &State>, - rx_quote: &State>, - rx_settlements: &State>, - rx_connected_takers: &State>>, network: &State, _auth: Authenticated, ) -> EventStream![] { - let mut rx_cfds = rx_cfds.inner().clone(); - let mut rx_order = rx_order.inner().clone(); + let rx = rx.inner(); + let mut rx_cfds = rx.cfds.clone(); + let mut rx_order = rx.order.clone(); let mut rx_wallet = rx_wallet.inner().clone(); - let mut rx_quote = rx_quote.inner().clone(); - let mut rx_settlements = rx_settlements.inner().clone(); - let mut rx_connected_takers = rx_connected_takers.inner().clone(); + let mut rx_quote = rx.quote.clone(); + let mut rx_settlements = rx.settlements.clone(); + let mut rx_connected_takers = rx.connected_takers.clone(); let network = *network.inner(); EventStream! { diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index ec916b5..0fc32ea 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -1,10 +1,11 @@ use bdk::bitcoin::{Amount, Network}; use daemon::connection::ConnectionStatus; -use daemon::model::cfd::{calculate_long_margin, Cfd, Order, OrderId, Role, UpdateCfdProposals}; +use daemon::model::cfd::{calculate_long_margin, OrderId, Role}; use daemon::model::{Leverage, Price, Usd, WalletInfo}; +use daemon::projection::Feeds; use daemon::routes::EmbeddedFileExt; use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent}; -use daemon::{bitmex_price_feed, taker_cfd, wallet}; +use daemon::{taker_cfd, wallet}; use http_api_problem::{HttpApiProblem, StatusCode}; use rocket::http::{ContentType, Status}; use rocket::response::stream::EventStream; @@ -21,19 +22,17 @@ use xtra::prelude::*; #[rocket::get("/feed")] pub async fn feed( - rx_cfds: &State>>, - rx_order: &State>>, + rx: &State, rx_wallet: &State>, - rx_quote: &State>, - rx_settlements: &State>, rx_maker_status: &State>, network: &State, ) -> EventStream![] { - let mut rx_cfds = rx_cfds.inner().clone(); - let mut rx_order = rx_order.inner().clone(); + let rx = rx.inner(); + let mut rx_cfds = rx.cfds.clone(); + let mut rx_order = rx.order.clone(); + let mut rx_quote = rx.quote.clone(); + let mut rx_settlements = rx.settlements.clone(); let mut rx_wallet = rx_wallet.inner().clone(); - let mut rx_quote = rx_quote.inner().clone(); - let mut rx_settlements = rx_settlements.inner().clone(); let mut rx_maker_status = rx_maker_status.inner().clone(); let network = *network.inner(); @@ -138,7 +137,7 @@ pub async fn post_cfd_action( id: OrderId, action: CfdAction, cfd_action_channel: &State>>, - quote_updates: &State>, + feeds: &State, ) -> Result, HttpApiProblem> { use taker_cfd::CfdAction::*; let result = match action { @@ -153,7 +152,7 @@ pub async fn post_cfd_action( } CfdAction::Commit => cfd_action_channel.send(Commit { order_id: id }), CfdAction::Settle => { - let current_price = quote_updates.borrow().for_taker(); + let current_price = feeds.quote.borrow().for_taker(); cfd_action_channel.send(ProposeSettlement { order_id: id, current_price, diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index ddc5872..a2db57f 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -3,11 +3,9 @@ use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::{Address, Amount}; use bdk::{bitcoin, FeeRate}; use clap::{Parser, Subcommand}; -use daemon::bitmex_price_feed::Quote; use daemon::connection::connect; use daemon::db::load_all_cfds; -use daemon::model::cfd::{Order, UpdateCfdProposals}; -use daemon::model::{TakerId, WalletInfo}; +use daemon::model::WalletInfo; use daemon::seed::Seed; use daemon::tokio_ext::FutureExt; use daemon::{ @@ -16,13 +14,11 @@ use daemon::{ }; use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; -use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; -use watch::channel; use xtra::prelude::MessageChannel; use xtra::Actor; @@ -254,31 +250,14 @@ async fn main() -> Result<()> { let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?; tasks.add(task); - // TODO: Move to projection actor let cfds = { let mut conn = db.acquire().await?; load_all_cfds(&mut conn).await? }; - let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone()); - let (order_feed_sender, order_feed_receiver) = channel::>(None); - let (update_cfd_feed_sender, update_cfd_feed_receiver) = - channel::(HashMap::new()); - let (quote_sender, quote_receiver) = channel::(init_quote); - - // TODO: Use this channel to convey maker status. - // For now, the receiver is dropped instead of managed by Rocket to - // highlight that we're not using it - let (connected_takers_feed_sender, _connected_takers_feed_receiver) = - watch::channel::>(vec![]); - - tasks.add(projection_context.run(projection::Actor::new( - cfd_feed_sender, - order_feed_sender, - quote_sender, - update_cfd_feed_sender, - connected_takers_feed_sender, - ))); + + let (proj_actor, projection_feeds) = projection::Actor::new(cfds, init_quote); + tasks.add(projection_context.run(proj_actor)); let possible_addresses = resolve_maker_addresses(&opts.maker).await?; @@ -294,13 +273,10 @@ async fn main() -> Result<()> { let cfd_action_channel = MessageChannel::::clone_channel(&cfd_actor_addr); let rocket = rocket::custom(figment) - .manage(order_feed_receiver) - .manage(update_cfd_feed_receiver) + .manage(projection_feeds) .manage(take_offer_channel) .manage(cfd_action_channel) - .manage(cfd_feed_receiver) .manage(wallet_feed_receiver) - .manage(quote_receiver) .manage(bitcoin_network) .manage(wallet) .manage(maker_online_status_feed_receiver) diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 9fea018..f423111 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -4,15 +4,15 @@ use crate::harness::mocks::wallet::WalletActor; use crate::schnorrsig; use daemon::bitmex_price_feed::Quote; use daemon::connection::{connect, ConnectionStatus}; -use daemon::model::cfd::{Cfd, Order, Origin, UpdateCfdProposals}; +use daemon::model::cfd::{Cfd, Order, Origin}; use daemon::model::{Price, TakerId, Timestamp, Usd}; +use daemon::projection::Feeds; use daemon::seed::Seed; use daemon::{ db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks, }; use rust_decimal_macros::dec; use sqlx::SqlitePool; -use std::collections::HashMap; use std::net::SocketAddr; use std::str::FromStr; use std::task::Poll; @@ -94,25 +94,23 @@ pub struct Maker { pub system: MakerActorSystem, pub mocks: mocks::Mocks, + pub feeds: Feeds, pub listen_addr: SocketAddr, pub identity_pk: x25519_dalek::PublicKey, - cfd_feed_receiver: watch::Receiver>, - order_feed_receiver: watch::Receiver>, - connected_takers_feed_receiver: watch::Receiver>, _tasks: Tasks, } impl Maker { pub fn cfd_feed(&mut self) -> &mut watch::Receiver> { - &mut self.cfd_feed_receiver + &mut self.feeds.cfds } pub fn order_feed(&mut self) -> &mut watch::Receiver> { - &mut self.order_feed_receiver + &mut self.feeds.order } pub fn connected_takers_feed(&mut self) -> &mut watch::Receiver> { - &mut self.connected_takers_feed_receiver + &mut self.feeds.connected_takers } pub async fn start(config: &MakerConfig, listener: TcpListener) -> Self { @@ -162,21 +160,8 @@ impl Maker { ask: Price::new(dec!(10000)).unwrap(), }; - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(vec![]); - let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); - let (update_cfd_feed_sender, _update_cfd_feed_receiver) = - watch::channel::(HashMap::new()); - let (quote_sender, _) = watch::channel::(dummy_quote); - let (connected_takers_feed_sender, connected_takers_feed_receiver) = - watch::channel::>(vec![]); - - tasks.add(projection_context.run(projection::Actor::new( - cfd_feed_sender, - order_feed_sender, - quote_sender, - update_cfd_feed_sender, - connected_takers_feed_sender, - ))); + let (proj_actor, feeds) = projection::Actor::new(vec![], dummy_quote); + tasks.add(projection_context.run(proj_actor)); let address = listener.local_addr().unwrap(); @@ -195,13 +180,11 @@ impl Maker { Self { system: maker, + feeds, identity_pk, listen_addr: address, mocks, _tasks: tasks, - cfd_feed_receiver, - order_feed_receiver, - connected_takers_feed_receiver, } } @@ -240,18 +223,17 @@ pub struct Taker { pub id: TakerId, pub system: daemon::TakerActorSystem, pub mocks: mocks::Mocks, - cfd_feed_receiver: watch::Receiver>, - order_feed_receiver: watch::Receiver>, + pub feeds: Feeds, _tasks: Tasks, } impl Taker { pub fn cfd_feed(&mut self) -> &mut watch::Receiver> { - &mut self.cfd_feed_receiver + &mut self.feeds.cfds } pub fn order_feed(&mut self) -> &mut watch::Receiver> { - &mut self.order_feed_receiver + &mut self.feeds.order } pub fn maker_status_feed(&mut self) -> &mut watch::Receiver { @@ -299,21 +281,8 @@ impl Taker { ask: Price::new(dec!(10000)).unwrap(), }; - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(vec![]); - let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); - let (update_cfd_feed_sender, _) = watch::channel::(HashMap::new()); - let (quote_sender, _) = watch::channel::(dummy_quote); - - let (connected_takers_feed_sender, _connected_takers_feed_receiver) = - watch::channel::>(vec![]); - - tasks.add(projection_context.run(projection::Actor::new( - cfd_feed_sender, - order_feed_sender, - quote_sender, - update_cfd_feed_sender, - connected_takers_feed_sender, - ))); + let (proj_actor, feeds) = projection::Actor::new(vec![], dummy_quote); + tasks.add(projection_context.run(proj_actor)); tasks.add(connect( taker.maker_online_status_feed_receiver.clone(), @@ -325,10 +294,9 @@ impl Taker { Self { id: TakerId::new(identity_pk), system: taker, + feeds, mocks, _tasks: tasks, - order_feed_receiver, - cfd_feed_receiver, } }