From 73fb28ddba53c5d0c1d2688c9c3c01563481d2ca Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Thu, 25 Nov 2021 19:07:51 +1030 Subject: [PATCH] Move CfdOrder inside projection actor --- daemon/src/projection.rs | 65 +++++++++++++++++++++++++++++------- daemon/src/to_sse_event.rs | 43 ++---------------------- daemon/tests/happy_path.rs | 12 +++---- daemon/tests/harness/flow.rs | 9 ++--- daemon/tests/harness/mod.rs | 30 ++++++----------- 5 files changed, 78 insertions(+), 81 deletions(-) diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs index a589f15..3f4e8f0 100644 --- a/daemon/src/projection.rs +++ b/daemon/src/projection.rs @@ -1,8 +1,8 @@ use std::collections::HashMap; -use crate::model::{TakerId, Timestamp}; -use crate::{bitmex_price_feed, model}; -use crate::{Cfd, Order, UpdateCfdProposals}; +use crate::model::cfd::OrderId; +use crate::model::{Leverage, Position, TakerId, Timestamp, TradingPair}; +use crate::{bitmex_price_feed, model, Cfd, Order, UpdateCfdProposals}; use rust_decimal::Decimal; use serde::Serialize; use tokio::sync::watch; @@ -13,9 +13,10 @@ pub struct Actor { } pub struct Feeds { - pub order: watch::Receiver>, - pub cfds: watch::Receiver>, pub quote: watch::Receiver, + pub order: watch::Receiver>, + // TODO: Convert items below here into projections + pub cfds: watch::Receiver>, pub settlements: watch::Receiver, pub connected_takers: watch::Receiver>, } @@ -52,7 +53,7 @@ impl Actor { /// Internal struct to keep all the senders around in one place struct Tx { pub cfds: watch::Sender>, - pub order: watch::Sender>, + pub order: watch::Sender>, pub quote: watch::Sender, pub settlements: watch::Sender, // TODO: Use this channel to communicate maker status as well with generic @@ -64,15 +65,15 @@ pub struct Update(pub T); #[xtra_productivity] impl Actor { - fn handle(&mut self, msg: Update>) { - let _ = self.tx.cfds.send(msg.0); - } fn handle(&mut self, msg: Update>) { - let _ = self.tx.order.send(msg.0); + let _ = self.tx.order.send(msg.0.map(|x| x.into())); } fn handle(&mut self, msg: Update) { let _ = self.tx.quote.send(msg.0.into()); } + fn handle(&mut self, msg: Update>) { + let _ = self.tx.cfds.send(msg.0); + } fn handle(&mut self, msg: Update) { let _ = self.tx.settlements.send(msg.0); } @@ -85,7 +86,7 @@ impl xtra::Actor for Actor {} /// Types -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Usd { inner: model::Usd, } @@ -122,7 +123,7 @@ impl Serialize for Price { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Price { inner: model::Price, } @@ -199,3 +200,43 @@ mod tests { assert_ser_tokens(&price, &[Token::Str("1000.12")]); } } + +#[derive(Debug, Clone, PartialEq, Serialize)] +pub struct CfdOrder { + pub id: OrderId, + + pub trading_pair: TradingPair, + pub position: Position, + + pub price: Price, + + pub min_quantity: Usd, + pub max_quantity: Usd, + + pub leverage: Leverage, + pub liquidation_price: Price, + + pub creation_timestamp: Timestamp, + pub settlement_time_interval_in_secs: u64, +} + +impl From for CfdOrder { + fn from(order: Order) -> Self { + Self { + id: order.id, + trading_pair: order.trading_pair, + position: order.position, + price: order.price.into(), + min_quantity: order.min_quantity.into(), + max_quantity: order.max_quantity.into(), + leverage: order.leverage, + liquidation_price: order.liquidation_price.into(), + creation_timestamp: order.creation_timestamp, + settlement_time_interval_in_secs: order + .settlement_interval + .whole_seconds() + .try_into() + .expect("settlement_time_interval_hours is always positive number"), + } + } +} diff --git a/daemon/src/to_sse_event.rs b/daemon/src/to_sse_event.rs index b7570fb..d9b260f 100644 --- a/daemon/src/to_sse_event.rs +++ b/daemon/src/to_sse_event.rs @@ -3,14 +3,13 @@ use crate::model::cfd::{ Dlc, OrderId, Payout, Role, SettlementKind, UpdateCfdProposal, UpdateCfdProposals, }; use crate::model::{Leverage, Position, TakerId, Timestamp, TradingPair}; -use crate::projection::{Price, Quote, Usd}; +use crate::projection::{CfdOrder, Price, Quote, Usd}; use crate::{bitmex_price_feed, model}; use bdk::bitcoin::{Amount, Network, SignedAmount, Txid}; use rocket::request::FromParam; use rocket::response::stream::Event; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; -use std::convert::TryInto; use time::OffsetDateTime; use tokio::sync::watch; @@ -157,25 +156,6 @@ pub enum CfdState { SetupFailed, } -#[derive(Debug, Clone, Serialize)] -pub struct CfdOrder { - pub id: OrderId, - - pub trading_pair: TradingPair, - pub position: Position, - - pub price: Price, - - pub min_quantity: Usd, - pub max_quantity: Usd, - - pub leverage: Leverage, - pub liquidation_price: Price, - - pub creation_timestamp: Timestamp, - pub settlement_time_interval_in_secs: u64, -} - pub trait ToSseEvent { fn to_sse_event(&self) -> Event; } @@ -280,26 +260,9 @@ impl ToSseEvent for Vec { } } -impl ToSseEvent for Option { +impl ToSseEvent for Option { fn to_sse_event(&self) -> Event { - let order = self.clone().map(|order| CfdOrder { - id: order.id, - trading_pair: order.trading_pair, - position: order.position, - price: order.price.into(), - min_quantity: order.min_quantity.into(), - max_quantity: order.max_quantity.into(), - leverage: order.leverage, - liquidation_price: order.liquidation_price.into(), - creation_timestamp: order.creation_timestamp, - settlement_time_interval_in_secs: order - .settlement_interval - .whole_seconds() - .try_into() - .expect("settlement_time_interval_hours is always positive number"), - }); - - Event::json(&order).event("order") + Event::json(&self).event("order") } } diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index 55bdc5b..b9730a5 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -2,7 +2,7 @@ use std::time::Duration; use crate::harness::flow::{is_next_none, next, next_cfd, next_order, next_some}; use crate::harness::{ - assert_is_same_order, dummy_new_order, init_tracing, start_both, Maker, MakerConfig, Taker, + dummy_new_order, init_tracing, order_from_cfd, start_both, Maker, MakerConfig, Taker, TakerConfig, }; use daemon::connection::ConnectionStatus; @@ -25,7 +25,7 @@ async fn taker_receives_order_from_maker_on_publication() { let (published, received) = tokio::join!(next_some(maker.order_feed()), next_some(taker.order_feed())); - assert_is_same_order(&published.unwrap(), &received.unwrap()); + assert_eq!(published.unwrap(), received.unwrap()); } #[tokio::test] @@ -46,8 +46,8 @@ async fn taker_takes_order_and_maker_rejects() { taker.take_order(received.clone(), Usd::new(dec!(10))).await; let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); - assert_is_same_order(&taker_cfd.order, &received); - assert_is_same_order(&maker_cfd.order, &received); + assert_eq!(order_from_cfd(&taker_cfd), received); + assert_eq!(order_from_cfd(&maker_cfd), received); assert!(matches!( taker_cfd.state, CfdState::OutgoingOrderRequest { .. } @@ -61,8 +61,8 @@ async fn taker_takes_order_and_maker_rejects() { let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); // TODO: More elaborate Cfd assertions - assert_is_same_order(&taker_cfd.order, &received); - assert_is_same_order(&maker_cfd.order, &received); + assert_eq!(order_from_cfd(&taker_cfd), received); + assert_eq!(order_from_cfd(&maker_cfd), received); assert!(matches!(taker_cfd.state, CfdState::Rejected { .. })); assert!(matches!(maker_cfd.state, CfdState::Rejected { .. })); } diff --git a/daemon/tests/harness/flow.rs b/daemon/tests/harness/flow.rs index 4fd965f..4a0cabe 100644 --- a/daemon/tests/harness/flow.rs +++ b/daemon/tests/harness/flow.rs @@ -1,5 +1,6 @@ use anyhow::{Context, Result}; -use daemon::model::cfd::{Cfd, Order}; +use daemon::model::cfd::Cfd; +use daemon::projection::CfdOrder; use daemon::tokio_ext::FutureExt; use std::time::Duration; use tokio::sync::watch; @@ -24,9 +25,9 @@ pub async fn next_cfd( } pub async fn next_order( - rx_a: &mut watch::Receiver>, - rx_b: &mut watch::Receiver>, -) -> Result<(Order, Order)> { + rx_a: &mut watch::Receiver>, + rx_b: &mut watch::Receiver>, +) -> Result<(CfdOrder, CfdOrder)> { let (a, b) = tokio::join!(next_some(rx_a), next_some(rx_b)); Ok((a?, b?)) diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 8c564cc..0040b2e 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -4,9 +4,9 @@ use crate::harness::mocks::wallet::WalletActor; use crate::schnorrsig; use daemon::bitmex_price_feed::Quote; use daemon::connection::{connect, ConnectionStatus}; -use daemon::model::cfd::{Cfd, Order, Origin}; +use daemon::model::cfd::Cfd; use daemon::model::{Price, TakerId, Timestamp, Usd}; -use daemon::projection::Feeds; +use daemon::projection::{CfdOrder, Feeds}; use daemon::seed::Seed; use daemon::{ db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks, @@ -124,7 +124,7 @@ impl Maker { &mut self.feeds.cfds } - pub fn order_feed(&mut self) -> &mut watch::Receiver> { + pub fn order_feed(&mut self) -> &mut watch::Receiver> { &mut self.feeds.order } @@ -218,7 +218,7 @@ impl Maker { .unwrap(); } - pub async fn reject_take_request(&self, order: Order) { + pub async fn reject_take_request(&self, order: CfdOrder) { self.system .cfd_actor_addr .send(maker_cfd::RejectOrder { order_id: order.id }) @@ -227,7 +227,7 @@ impl Maker { .unwrap(); } - pub async fn accept_take_request(&self, order: Order) { + pub async fn accept_take_request(&self, order: CfdOrder) { self.system .cfd_actor_addr .send(maker_cfd::AcceptOrder { order_id: order.id }) @@ -251,7 +251,7 @@ impl Taker { &mut self.feeds.cfds } - pub fn order_feed(&mut self) -> &mut watch::Receiver> { + pub fn order_feed(&mut self) -> &mut watch::Receiver> { &mut self.feeds.order } @@ -319,7 +319,7 @@ impl Taker { } } - pub async fn take_order(&self, order: Order, quantity: Usd) { + pub async fn take_order(&self, order: CfdOrder, quantity: Usd) { self.system .cfd_actor_addr .send(taker_cfd::TakeOffer { @@ -343,18 +343,6 @@ async fn in_memory_db() -> SqlitePool { pool } -/// The order cannot be directly compared in tests as the origin is different, -/// therefore wrap the assertion macro in a code that unifies the 'Origin' -pub fn assert_is_same_order(a: &Order, b: &Order) { - // Assume the same origin - let mut a = a.clone(); - let mut b = b.clone(); - a.origin = Origin::Ours; - b.origin = Origin::Ours; - - assert_eq!(a, b); -} - pub fn dummy_new_order() -> maker_cfd::NewOrder { maker_cfd::NewOrder { price: Price::new(dec!(50_000)).expect("unexpected failure"), @@ -387,3 +375,7 @@ pub fn init_tracing() -> DefaultGuard { guard } + +pub fn order_from_cfd(cfd: &Cfd) -> CfdOrder { + cfd.order.clone().into() +}