From 01a6dba79bf23b996cd0f385cab3de38a6140914 Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Mon, 29 Nov 2021 17:09:40 +1030 Subject: [PATCH] Use projection::Cfd in the feeds Roll-out its usage in the actor tests --- daemon/src/projection.rs | 43 ++++++++++++++++-------------------- daemon/src/routes_maker.rs | 40 +++++---------------------------- daemon/src/routes_taker.rs | 41 +++++----------------------------- daemon/tests/happy_path.rs | 22 +++++++++--------- daemon/tests/harness/flow.rs | 3 +-- daemon/tests/harness/mod.rs | 8 ++----- 6 files changed, 44 insertions(+), 113 deletions(-) diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs index f310ce3..a04c659 100644 --- a/daemon/src/projection.rs +++ b/daemon/src/projection.rs @@ -20,9 +20,7 @@ pub struct Feeds { pub quote: watch::Receiver, pub order: watch::Receiver>, pub connected_takers: watch::Receiver>, - // TODO: Convert items below here into projections - pub cfds: watch::Receiver>, - pub settlements: watch::Receiver, + pub cfds: watch::Receiver>, } impl Actor { @@ -32,10 +30,17 @@ impl Actor { init_cfds: Vec, init_quote: bitmex_price_feed::Quote, ) -> (Self, Feeds) { - let (tx_cfds, rx_cfds) = watch::channel(init_cfds); + let state = State { + role, + network, + cfds: init_cfds, + proposals: HashMap::new(), + quote: init_quote.clone(), + }; + + let (tx_cfds, rx_cfds) = watch::channel(state.to_cfds()); let (tx_order, rx_order) = watch::channel(None); - let (tx_update_cfd_feed, rx_update_cfd_feed) = watch::channel(HashMap::new()); - let (tx_quote, rx_quote) = watch::channel(init_quote.clone().into()); + let (tx_quote, rx_quote) = watch::channel(init_quote.into()); let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new()); ( @@ -44,22 +49,14 @@ impl Actor { cfds: tx_cfds, order: tx_order, quote: tx_quote, - settlements: tx_update_cfd_feed, connected_takers: tx_connected_takers, }, - state: State { - role, - network, - cfds: vec![], - proposals: HashMap::new(), - quote: init_quote, - }, + state, }, Feeds { cfds: rx_cfds, order: rx_order, quote: rx_quote, - settlements: rx_update_cfd_feed, connected_takers: rx_connected_takers, }, ) @@ -68,10 +65,9 @@ impl Actor { /// Internal struct to keep all the senders around in one place struct Tx { - pub cfds: watch::Sender>, + pub cfds: watch::Sender>, pub order: watch::Sender>, pub quote: watch::Sender, - pub settlements: watch::Sender, // TODO: Use this channel to communicate maker status as well with generic // ID of connected counterparties pub connected_takers: watch::Sender>, @@ -87,7 +83,7 @@ struct State { } impl State { - pub fn to_cfd(&self) -> Vec { + pub fn to_cfds(&self) -> Vec { // FIXME: starting with the intermediate struct, only temporarily let temp = CfdsWithAuxData::new( self.cfds.clone(), @@ -123,16 +119,15 @@ impl Actor { let quote = msg.0; self.state.update_quote(quote.clone()); let _ = self.tx.quote.send(quote.into()); + let _ = self.tx.cfds.send(self.state.to_cfds()); } fn handle(&mut self, msg: Update>) { - let cfds = msg.0; - self.state.update_cfds(cfds.clone()); - let _ = self.tx.cfds.send(cfds); + self.state.update_cfds(msg.0); + let _ = self.tx.cfds.send(self.state.to_cfds()); } fn handle(&mut self, msg: Update) { - let proposals = msg.0; - self.state.update_proposals(proposals.clone()); - let _ = self.tx.settlements.send(proposals); + self.state.update_proposals(msg.0); + let _ = self.tx.cfds.send(self.state.to_cfds()); } fn handle(&mut self, msg: Update>) { let _ = self diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 865a6af..4f0fdaa 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -1,9 +1,9 @@ use anyhow::Result; use bdk::bitcoin::Network; use daemon::auth::Authenticated; -use daemon::model::cfd::{OrderId, Role}; +use daemon::model::cfd::OrderId; use daemon::model::{Price, Usd, WalletInfo}; -use daemon::projection::{CfdAction, CfdsWithAuxData, Feeds}; +use daemon::projection::{CfdAction, Feeds}; use daemon::routes::EmbeddedFileExt; use daemon::to_sse_event::ToSseEvent; use daemon::{maker_cfd, maker_inc_connections, monitor, oracle, wallet}; @@ -30,7 +30,6 @@ pub type Maker = xtra::Address< pub async fn maker_feed( rx: &State, rx_wallet: &State>, - network: &State, _auth: Authenticated, ) -> EventStream![] { let rx = rx.inner(); @@ -38,9 +37,7 @@ pub async fn maker_feed( let mut rx_order = rx.order.clone(); let mut rx_wallet = rx_wallet.inner().clone(); let mut rx_quote = rx.quote.clone(); - let mut rx_settlements = rx.settlements.clone(); let mut rx_connected_takers = rx.connected_takers.clone(); - let network = *network.inner(); EventStream! { let wallet_info = rx_wallet.borrow().clone(); @@ -52,12 +49,8 @@ pub async fn maker_feed( let quote = rx_quote.borrow().clone(); yield quote.to_sse_event(); - yield CfdsWithAuxData::new( - &rx_cfds, - &rx_quote, - &rx_settlements, - Role::Maker, network - ).to_sse_event(); + let cfds = rx_cfds.borrow().clone(); + yield cfds.to_sse_event(); let takers = rx_connected_takers.borrow().clone(); yield takers.to_sse_event(); @@ -77,33 +70,12 @@ pub async fn maker_feed( yield takers.to_sse_event(); } Ok(()) = rx_cfds.changed() => { - yield CfdsWithAuxData::new( - &rx_cfds, - &rx_quote, - &rx_settlements, - Role::Maker, - network - ).to_sse_event(); - } - Ok(()) = rx_settlements.changed() => { - yield CfdsWithAuxData::new( - &rx_cfds, - &rx_quote, - &rx_settlements, - Role::Maker, - network - ).to_sse_event(); + let cfds = rx_cfds.borrow().clone(); + yield cfds.to_sse_event(); } Ok(()) = rx_quote.changed() => { let quote = rx_quote.borrow().clone(); yield quote.to_sse_event(); - yield CfdsWithAuxData::new( - &rx_cfds, - &rx_quote, - &rx_settlements, - Role::Maker, - network - ).to_sse_event(); } } } diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index 59812b5..a03b39a 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -1,8 +1,8 @@ use bdk::bitcoin::{Amount, Network}; use daemon::connection::ConnectionStatus; -use daemon::model::cfd::{calculate_long_margin, OrderId, Role}; +use daemon::model::cfd::{calculate_long_margin, OrderId}; use daemon::model::{Leverage, Price, Usd, WalletInfo}; -use daemon::projection::{CfdAction, CfdsWithAuxData, Feeds}; +use daemon::projection::{CfdAction, Feeds}; use daemon::routes::EmbeddedFileExt; use daemon::to_sse_event::ToSseEvent; use daemon::{bitmex_price_feed, monitor, oracle, taker_cfd, tx, wallet}; @@ -27,16 +27,13 @@ pub async fn feed( rx: &State, rx_wallet: &State>, rx_maker_status: &State>, - network: &State, ) -> EventStream![] { let rx = rx.inner(); let mut rx_cfds = rx.cfds.clone(); let mut rx_order = rx.order.clone(); let mut rx_quote = rx.quote.clone(); - let mut rx_settlements = rx.settlements.clone(); let mut rx_wallet = rx_wallet.inner().clone(); let mut rx_maker_status = rx_maker_status.inner().clone(); - let network = *network.inner(); EventStream! { let wallet_info = rx_wallet.borrow().clone(); @@ -51,13 +48,8 @@ pub async fn feed( let quote = rx_quote.borrow().clone(); yield quote.to_sse_event(); - yield CfdsWithAuxData::new( - &rx_cfds, - &rx_quote, - &rx_settlements, - Role::Taker, - network - ).to_sse_event(); + let cfds = rx_cfds.borrow().clone(); + yield cfds.to_sse_event(); loop{ select! { @@ -74,33 +66,12 @@ pub async fn feed( yield order.to_sse_event(); } Ok(()) = rx_cfds.changed() => { - yield CfdsWithAuxData::new( - &rx_cfds, - &rx_quote, - &rx_settlements, - Role::Taker, - network - ).to_sse_event(); - } - Ok(()) = rx_settlements.changed() => { - yield CfdsWithAuxData::new( - &rx_cfds, - &rx_quote, - &rx_settlements, - Role::Taker, - network - ).to_sse_event(); + let cfds = rx_cfds.borrow().clone(); + yield cfds.to_sse_event(); } Ok(()) = rx_quote.changed() => { let quote = rx_quote.borrow().clone(); yield quote.to_sse_event(); - yield CfdsWithAuxData::new( - &rx_cfds, - &rx_quote, - &rx_settlements, - Role::Taker, - network - ).to_sse_event(); } } } diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index e244806..4cfebdf 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -2,13 +2,11 @@ use std::time::Duration; use crate::harness::flow::{is_next_none, next, next_cfd, next_order, next_some}; use crate::harness::{ - dummy_new_order, init_tracing, order_from_cfd, start_both, Maker, MakerConfig, Taker, - TakerConfig, + dummy_new_order, init_tracing, start_both, Maker, MakerConfig, Taker, TakerConfig, }; use daemon::connection::ConnectionStatus; -use daemon::model::cfd::CfdState; use daemon::model::Usd; -use daemon::projection::Identity; +use daemon::projection::{CfdState, Identity}; use maia::secp256k1_zkp::schnorrsig; use rust_decimal_macros::dec; use tokio::time::sleep; @@ -47,8 +45,8 @@ async fn taker_takes_order_and_maker_rejects() { taker.take_order(received.clone(), Usd::new(dec!(10))).await; let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); - assert_eq!(order_from_cfd(&taker_cfd), received); - assert_eq!(order_from_cfd(&maker_cfd), received); + assert_eq!(taker_cfd.order_id, received.id); + assert_eq!(maker_cfd.order_id, received.id); assert!(matches!( taker_cfd.state, CfdState::OutgoingOrderRequest { .. } @@ -62,8 +60,8 @@ async fn taker_takes_order_and_maker_rejects() { let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); // TODO: More elaborate Cfd assertions - assert_eq!(order_from_cfd(&taker_cfd), received); - assert_eq!(order_from_cfd(&maker_cfd), received); + assert_eq!(taker_cfd.order_id, received.id); + assert_eq!(maker_cfd.order_id, received.id); assert!(matches!(taker_cfd.state, CfdState::Rejected { .. })); assert!(matches!(maker_cfd.state, CfdState::Rejected { .. })); } @@ -105,8 +103,8 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() { let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); // TODO: More elaborate Cfd assertions - assert_eq!(taker_cfd.order.id, received.id); - assert_eq!(maker_cfd.order.id, received.id); + assert_eq!(taker_cfd.order_id, received.id); + assert_eq!(maker_cfd.order_id, received.id); assert!(matches!(taker_cfd.state, CfdState::ContractSetup { .. })); assert!(matches!(maker_cfd.state, CfdState::ContractSetup { .. })); @@ -115,8 +113,8 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() { let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); // TODO: More elaborate Cfd assertions - assert_eq!(taker_cfd.order.id, received.id); - assert_eq!(maker_cfd.order.id, received.id); + assert_eq!(taker_cfd.order_id, received.id); + assert_eq!(maker_cfd.order_id, received.id); assert!(matches!(taker_cfd.state, CfdState::PendingOpen { .. })); assert!(matches!(maker_cfd.state, CfdState::PendingOpen { .. })); } diff --git a/daemon/tests/harness/flow.rs b/daemon/tests/harness/flow.rs index b98a786..d43dfa7 100644 --- a/daemon/tests/harness/flow.rs +++ b/daemon/tests/harness/flow.rs @@ -1,6 +1,5 @@ use anyhow::{Context, Result}; -use daemon::model::cfd::Cfd; -use daemon::projection::CfdOrder; +use daemon::projection::{Cfd, CfdOrder}; use daemon::tokio_ext::FutureExt; use std::time::Duration; use tokio::sync::watch; diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 70880b5..c990403 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -5,9 +5,9 @@ use crate::schnorrsig; use ::bdk::bitcoin::Network; use daemon::bitmex_price_feed::Quote; use daemon::connection::{connect, ConnectionStatus}; -use daemon::model::cfd::{Cfd, Role}; +use daemon::model::cfd::Role; use daemon::model::{self, Price, Timestamp, Usd}; -use daemon::projection::{CfdOrder, Feeds, Identity}; +use daemon::projection::{Cfd, CfdOrder, Feeds, Identity}; use daemon::seed::Seed; use daemon::{ db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks, @@ -378,7 +378,3 @@ pub fn init_tracing() -> DefaultGuard { guard } - -pub fn order_from_cfd(cfd: &Cfd) -> CfdOrder { - cfd.order.clone().into() -}