From 5fde06500b7c55644de50c15b9e23ec670009c77 Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Thu, 25 Nov 2021 15:22:06 +1030 Subject: [PATCH 1/2] Move Quote inside projection actor --- daemon/src/projection.rs | 133 +++++++++++++++++++++++++++++++++++-- daemon/src/routes_taker.rs | 5 +- daemon/src/to_sse_event.rs | 96 ++------------------------ 3 files changed, 134 insertions(+), 100 deletions(-) diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs index ede1e52..a589f15 100644 --- a/daemon/src/projection.rs +++ b/daemon/src/projection.rs @@ -1,8 +1,10 @@ use std::collections::HashMap; -use crate::bitmex_price_feed::Quote; -use crate::model::TakerId; +use crate::model::{TakerId, Timestamp}; +use crate::{bitmex_price_feed, model}; use crate::{Cfd, Order, UpdateCfdProposals}; +use rust_decimal::Decimal; +use serde::Serialize; use tokio::sync::watch; use xtra_productivity::xtra_productivity; @@ -11,19 +13,19 @@ pub struct Actor { } pub struct Feeds { - pub cfds: watch::Receiver>, pub order: watch::Receiver>, + pub cfds: watch::Receiver>, pub quote: watch::Receiver, pub settlements: watch::Receiver, pub connected_takers: watch::Receiver>, } impl Actor { - pub fn new(init_cfds: Vec, init_quote: Quote) -> (Self, Feeds) { + pub fn new(init_cfds: Vec, init_quote: bitmex_price_feed::Quote) -> (Self, Feeds) { let (tx_cfds, rx_cfds) = watch::channel(init_cfds); let (tx_order, rx_order) = watch::channel(None); let (tx_update_cfd_feed, rx_update_cfd_feed) = watch::channel(HashMap::new()); - let (tx_quote, rx_quote) = watch::channel(init_quote); + let (tx_quote, rx_quote) = watch::channel(init_quote.into()); let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new()); ( @@ -68,8 +70,8 @@ impl Actor { fn handle(&mut self, msg: Update>) { let _ = self.tx.order.send(msg.0); } - fn handle(&mut self, msg: Update) { - let _ = self.tx.quote.send(msg.0); + fn handle(&mut self, msg: Update) { + let _ = self.tx.quote.send(msg.0.into()); } fn handle(&mut self, msg: Update) { let _ = self.tx.settlements.send(msg.0); @@ -80,3 +82,120 @@ impl Actor { } impl xtra::Actor for Actor {} + +/// Types + +#[derive(Debug, Clone)] +pub struct Usd { + inner: model::Usd, +} + +impl Usd { + fn new(usd: model::Usd) -> Self { + Self { + inner: model::Usd::new(usd.into_decimal().round_dp(2)), + } + } +} + +impl From for Usd { + fn from(usd: model::Usd) -> Self { + Self::new(usd) + } +} + +impl Serialize for Usd { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + ::serialize(&self.inner.into_decimal(), serializer) + } +} + +impl Serialize for Price { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + ::serialize(&self.inner.into_decimal(), serializer) + } +} + +#[derive(Debug, Clone)] +pub struct Price { + inner: model::Price, +} + +impl Price { + fn new(price: model::Price) -> Self { + Self { + inner: model::Price::new(price.into_decimal().round_dp(2)).expect( + "rounding a valid price to 2 decimal places should still result in a valid price", + ), + } + } +} + +impl From for Price { + fn from(price: model::Price) -> Self { + Self::new(price) + } +} + +// TODO: Remove this after CfdsWithAuxData is removed +impl From for model::Price { + fn from(price: Price) -> Self { + price.inner + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct Quote { + bid: Price, + ask: Price, + last_updated_at: Timestamp, +} + +impl From for Quote { + fn from(quote: bitmex_price_feed::Quote) -> Self { + Quote { + bid: quote.bid.into(), + ask: quote.ask.into(), + last_updated_at: quote.timestamp, + } + } +} + +// TODO: Remove this after CfdsWithAuxData is removed +impl From for bitmex_price_feed::Quote { + fn from(quote: Quote) -> Self { + Self { + timestamp: quote.last_updated_at, + bid: quote.bid.into(), + ask: quote.ask.into(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use rust_decimal_macros::dec; + use serde_test::{assert_ser_tokens, Token}; + + #[test] + fn usd_serializes_with_only_cents() { + let usd = Usd::new(model::Usd::new(dec!(1000.12345))); + + assert_ser_tokens(&usd, &[Token::Str("1000.12")]); + } + + #[test] + fn price_serializes_with_only_cents() { + let price = Price::new(model::Price::new(dec!(1000.12345)).unwrap()); + + assert_ser_tokens(&price, &[Token::Str("1000.12")]); + } +} diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index 0fc32ea..994322b 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -5,7 +5,7 @@ use daemon::model::{Leverage, Price, Usd, WalletInfo}; use daemon::projection::Feeds; use daemon::routes::EmbeddedFileExt; use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent}; -use daemon::{taker_cfd, wallet}; +use daemon::{bitmex_price_feed, taker_cfd, wallet}; use http_api_problem::{HttpApiProblem, StatusCode}; use rocket::http::{ContentType, Status}; use rocket::response::stream::EventStream; @@ -152,7 +152,8 @@ pub async fn post_cfd_action( } CfdAction::Commit => cfd_action_channel.send(Commit { order_id: id }), CfdAction::Settle => { - let current_price = feeds.quote.borrow().for_taker(); + let quote: bitmex_price_feed::Quote = feeds.quote.borrow().clone().into(); + let current_price = quote.for_taker(); cfd_action_channel.send(ProposeSettlement { order_id: id, current_price, diff --git a/daemon/src/to_sse_event.rs b/daemon/src/to_sse_event.rs index 32be7c6..b7570fb 100644 --- a/daemon/src/to_sse_event.rs +++ b/daemon/src/to_sse_event.rs @@ -3,6 +3,7 @@ use crate::model::cfd::{ Dlc, OrderId, Payout, Role, SettlementKind, UpdateCfdProposal, UpdateCfdProposals, }; use crate::model::{Leverage, Position, TakerId, Timestamp, TradingPair}; +use crate::projection::{Price, Quote, Usd}; use crate::{bitmex_price_feed, model}; use bdk::bitcoin::{Amount, Network, SignedAmount, Txid}; use rocket::request::FromParam; @@ -13,64 +14,6 @@ use std::convert::TryInto; use time::OffsetDateTime; use tokio::sync::watch; -#[derive(Debug, Clone)] -pub struct Usd { - inner: model::Usd, -} - -impl Usd { - fn new(usd: model::Usd) -> Self { - Self { - inner: model::Usd::new(usd.into_decimal().round_dp(2)), - } - } -} - -impl From for Usd { - fn from(usd: model::Usd) -> Self { - Self::new(usd) - } -} - -impl Serialize for Usd { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - ::serialize(&self.inner.into_decimal(), serializer) - } -} - -#[derive(Debug, Clone)] -pub struct Price { - inner: model::Price, -} - -impl Price { - fn new(price: model::Price) -> Self { - Self { - inner: model::Price::new(price.into_decimal().round_dp(2)).expect( - "rounding a valid price to 2 decimal places should still result in a valid price", - ), - } - } -} - -impl From for Price { - fn from(price: model::Price) -> Self { - Self::new(price) - } -} - -impl Serialize for Price { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - ::serialize(&self.inner.into_decimal(), serializer) - } -} - #[derive(Debug, Clone, Serialize)] pub struct Cfd { pub order_id: OrderId, @@ -250,12 +193,12 @@ pub struct CfdsWithAuxData { impl CfdsWithAuxData { pub fn new( rx_cfds: &watch::Receiver>, - rx_quote: &watch::Receiver, + rx_quote: &watch::Receiver, rx_updates: &watch::Receiver, role: Role, network: Network, ) -> Self { - let quote = rx_quote.borrow().clone(); + let quote: bitmex_price_feed::Quote = rx_quote.borrow().clone().into(); let current_price = match role { Role::Maker => quote.for_maker(), Role::Taker => quote.for_taker(), @@ -487,21 +430,9 @@ fn to_tx_url_list(state: model::cfd::CfdState, network: Network) -> Vec { } } -#[derive(Debug, Clone, Serialize)] -pub struct Quote { - bid: Price, - ask: Price, - last_updated_at: Timestamp, -} - -impl ToSseEvent for bitmex_price_feed::Quote { +impl ToSseEvent for Quote { fn to_sse_event(&self) -> Event { - let quote = Quote { - bid: self.bid.into(), - ask: self.ask.into(), - last_updated_at: self.timestamp, - }; - Event::json("e).event("quote") + Event::json(self).event("quote") } } @@ -537,9 +468,6 @@ fn available_actions(state: CfdState, role: Role) -> Vec { mod tests { use super::*; - use rust_decimal_macros::dec; - use serde_test::{assert_ser_tokens, Token}; - #[test] fn state_snapshot_test() { // Make sure to update the UI after changing this test! @@ -567,18 +495,4 @@ mod tests { let json = serde_json::to_string(&CfdState::SetupFailed).unwrap(); assert_eq!(json, "\"SetupFailed\""); } - - #[test] - fn usd_serializes_with_only_cents() { - let usd = Usd::new(model::Usd::new(dec!(1000.12345))); - - assert_ser_tokens(&usd, &[Token::Str("1000.12")]); - } - - #[test] - fn price_serializes_with_only_cents() { - let price = Price::new(model::Price::new(dec!(1000.12345)).unwrap()); - - assert_ser_tokens(&price, &[Token::Str("1000.12")]); - } } From 73fb28ddba53c5d0c1d2688c9c3c01563481d2ca Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Thu, 25 Nov 2021 19:07:51 +1030 Subject: [PATCH 2/2] Move CfdOrder inside projection actor --- daemon/src/projection.rs | 65 +++++++++++++++++++++++++++++------- daemon/src/to_sse_event.rs | 43 ++---------------------- daemon/tests/happy_path.rs | 12 +++---- daemon/tests/harness/flow.rs | 9 ++--- daemon/tests/harness/mod.rs | 30 ++++++----------- 5 files changed, 78 insertions(+), 81 deletions(-) diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs index a589f15..3f4e8f0 100644 --- a/daemon/src/projection.rs +++ b/daemon/src/projection.rs @@ -1,8 +1,8 @@ use std::collections::HashMap; -use crate::model::{TakerId, Timestamp}; -use crate::{bitmex_price_feed, model}; -use crate::{Cfd, Order, UpdateCfdProposals}; +use crate::model::cfd::OrderId; +use crate::model::{Leverage, Position, TakerId, Timestamp, TradingPair}; +use crate::{bitmex_price_feed, model, Cfd, Order, UpdateCfdProposals}; use rust_decimal::Decimal; use serde::Serialize; use tokio::sync::watch; @@ -13,9 +13,10 @@ pub struct Actor { } pub struct Feeds { - pub order: watch::Receiver>, - pub cfds: watch::Receiver>, pub quote: watch::Receiver, + pub order: watch::Receiver>, + // TODO: Convert items below here into projections + pub cfds: watch::Receiver>, pub settlements: watch::Receiver, pub connected_takers: watch::Receiver>, } @@ -52,7 +53,7 @@ impl Actor { /// Internal struct to keep all the senders around in one place struct Tx { pub cfds: watch::Sender>, - pub order: watch::Sender>, + pub order: watch::Sender>, pub quote: watch::Sender, pub settlements: watch::Sender, // TODO: Use this channel to communicate maker status as well with generic @@ -64,15 +65,15 @@ pub struct Update(pub T); #[xtra_productivity] impl Actor { - fn handle(&mut self, msg: Update>) { - let _ = self.tx.cfds.send(msg.0); - } fn handle(&mut self, msg: Update>) { - let _ = self.tx.order.send(msg.0); + let _ = self.tx.order.send(msg.0.map(|x| x.into())); } fn handle(&mut self, msg: Update) { let _ = self.tx.quote.send(msg.0.into()); } + fn handle(&mut self, msg: Update>) { + let _ = self.tx.cfds.send(msg.0); + } fn handle(&mut self, msg: Update) { let _ = self.tx.settlements.send(msg.0); } @@ -85,7 +86,7 @@ impl xtra::Actor for Actor {} /// Types -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Usd { inner: model::Usd, } @@ -122,7 +123,7 @@ impl Serialize for Price { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Price { inner: model::Price, } @@ -199,3 +200,43 @@ mod tests { assert_ser_tokens(&price, &[Token::Str("1000.12")]); } } + +#[derive(Debug, Clone, PartialEq, Serialize)] +pub struct CfdOrder { + pub id: OrderId, + + pub trading_pair: TradingPair, + pub position: Position, + + pub price: Price, + + pub min_quantity: Usd, + pub max_quantity: Usd, + + pub leverage: Leverage, + pub liquidation_price: Price, + + pub creation_timestamp: Timestamp, + pub settlement_time_interval_in_secs: u64, +} + +impl From for CfdOrder { + fn from(order: Order) -> Self { + Self { + id: order.id, + trading_pair: order.trading_pair, + position: order.position, + price: order.price.into(), + min_quantity: order.min_quantity.into(), + max_quantity: order.max_quantity.into(), + leverage: order.leverage, + liquidation_price: order.liquidation_price.into(), + creation_timestamp: order.creation_timestamp, + settlement_time_interval_in_secs: order + .settlement_interval + .whole_seconds() + .try_into() + .expect("settlement_time_interval_hours is always positive number"), + } + } +} diff --git a/daemon/src/to_sse_event.rs b/daemon/src/to_sse_event.rs index b7570fb..d9b260f 100644 --- a/daemon/src/to_sse_event.rs +++ b/daemon/src/to_sse_event.rs @@ -3,14 +3,13 @@ use crate::model::cfd::{ Dlc, OrderId, Payout, Role, SettlementKind, UpdateCfdProposal, UpdateCfdProposals, }; use crate::model::{Leverage, Position, TakerId, Timestamp, TradingPair}; -use crate::projection::{Price, Quote, Usd}; +use crate::projection::{CfdOrder, Price, Quote, Usd}; use crate::{bitmex_price_feed, model}; use bdk::bitcoin::{Amount, Network, SignedAmount, Txid}; use rocket::request::FromParam; use rocket::response::stream::Event; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; -use std::convert::TryInto; use time::OffsetDateTime; use tokio::sync::watch; @@ -157,25 +156,6 @@ pub enum CfdState { SetupFailed, } -#[derive(Debug, Clone, Serialize)] -pub struct CfdOrder { - pub id: OrderId, - - pub trading_pair: TradingPair, - pub position: Position, - - pub price: Price, - - pub min_quantity: Usd, - pub max_quantity: Usd, - - pub leverage: Leverage, - pub liquidation_price: Price, - - pub creation_timestamp: Timestamp, - pub settlement_time_interval_in_secs: u64, -} - pub trait ToSseEvent { fn to_sse_event(&self) -> Event; } @@ -280,26 +260,9 @@ impl ToSseEvent for Vec { } } -impl ToSseEvent for Option { +impl ToSseEvent for Option { fn to_sse_event(&self) -> Event { - let order = self.clone().map(|order| CfdOrder { - id: order.id, - trading_pair: order.trading_pair, - position: order.position, - price: order.price.into(), - min_quantity: order.min_quantity.into(), - max_quantity: order.max_quantity.into(), - leverage: order.leverage, - liquidation_price: order.liquidation_price.into(), - creation_timestamp: order.creation_timestamp, - settlement_time_interval_in_secs: order - .settlement_interval - .whole_seconds() - .try_into() - .expect("settlement_time_interval_hours is always positive number"), - }); - - Event::json(&order).event("order") + Event::json(&self).event("order") } } diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index 55bdc5b..b9730a5 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -2,7 +2,7 @@ use std::time::Duration; use crate::harness::flow::{is_next_none, next, next_cfd, next_order, next_some}; use crate::harness::{ - assert_is_same_order, dummy_new_order, init_tracing, start_both, Maker, MakerConfig, Taker, + dummy_new_order, init_tracing, order_from_cfd, start_both, Maker, MakerConfig, Taker, TakerConfig, }; use daemon::connection::ConnectionStatus; @@ -25,7 +25,7 @@ async fn taker_receives_order_from_maker_on_publication() { let (published, received) = tokio::join!(next_some(maker.order_feed()), next_some(taker.order_feed())); - assert_is_same_order(&published.unwrap(), &received.unwrap()); + assert_eq!(published.unwrap(), received.unwrap()); } #[tokio::test] @@ -46,8 +46,8 @@ async fn taker_takes_order_and_maker_rejects() { taker.take_order(received.clone(), Usd::new(dec!(10))).await; let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); - assert_is_same_order(&taker_cfd.order, &received); - assert_is_same_order(&maker_cfd.order, &received); + assert_eq!(order_from_cfd(&taker_cfd), received); + assert_eq!(order_from_cfd(&maker_cfd), received); assert!(matches!( taker_cfd.state, CfdState::OutgoingOrderRequest { .. } @@ -61,8 +61,8 @@ async fn taker_takes_order_and_maker_rejects() { let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); // TODO: More elaborate Cfd assertions - assert_is_same_order(&taker_cfd.order, &received); - assert_is_same_order(&maker_cfd.order, &received); + assert_eq!(order_from_cfd(&taker_cfd), received); + assert_eq!(order_from_cfd(&maker_cfd), received); assert!(matches!(taker_cfd.state, CfdState::Rejected { .. })); assert!(matches!(maker_cfd.state, CfdState::Rejected { .. })); } diff --git a/daemon/tests/harness/flow.rs b/daemon/tests/harness/flow.rs index 4fd965f..4a0cabe 100644 --- a/daemon/tests/harness/flow.rs +++ b/daemon/tests/harness/flow.rs @@ -1,5 +1,6 @@ use anyhow::{Context, Result}; -use daemon::model::cfd::{Cfd, Order}; +use daemon::model::cfd::Cfd; +use daemon::projection::CfdOrder; use daemon::tokio_ext::FutureExt; use std::time::Duration; use tokio::sync::watch; @@ -24,9 +25,9 @@ pub async fn next_cfd( } pub async fn next_order( - rx_a: &mut watch::Receiver>, - rx_b: &mut watch::Receiver>, -) -> Result<(Order, Order)> { + rx_a: &mut watch::Receiver>, + rx_b: &mut watch::Receiver>, +) -> Result<(CfdOrder, CfdOrder)> { let (a, b) = tokio::join!(next_some(rx_a), next_some(rx_b)); Ok((a?, b?)) diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 8c564cc..0040b2e 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -4,9 +4,9 @@ use crate::harness::mocks::wallet::WalletActor; use crate::schnorrsig; use daemon::bitmex_price_feed::Quote; use daemon::connection::{connect, ConnectionStatus}; -use daemon::model::cfd::{Cfd, Order, Origin}; +use daemon::model::cfd::Cfd; use daemon::model::{Price, TakerId, Timestamp, Usd}; -use daemon::projection::Feeds; +use daemon::projection::{CfdOrder, Feeds}; use daemon::seed::Seed; use daemon::{ db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks, @@ -124,7 +124,7 @@ impl Maker { &mut self.feeds.cfds } - pub fn order_feed(&mut self) -> &mut watch::Receiver> { + pub fn order_feed(&mut self) -> &mut watch::Receiver> { &mut self.feeds.order } @@ -218,7 +218,7 @@ impl Maker { .unwrap(); } - pub async fn reject_take_request(&self, order: Order) { + pub async fn reject_take_request(&self, order: CfdOrder) { self.system .cfd_actor_addr .send(maker_cfd::RejectOrder { order_id: order.id }) @@ -227,7 +227,7 @@ impl Maker { .unwrap(); } - pub async fn accept_take_request(&self, order: Order) { + pub async fn accept_take_request(&self, order: CfdOrder) { self.system .cfd_actor_addr .send(maker_cfd::AcceptOrder { order_id: order.id }) @@ -251,7 +251,7 @@ impl Taker { &mut self.feeds.cfds } - pub fn order_feed(&mut self) -> &mut watch::Receiver> { + pub fn order_feed(&mut self) -> &mut watch::Receiver> { &mut self.feeds.order } @@ -319,7 +319,7 @@ impl Taker { } } - pub async fn take_order(&self, order: Order, quantity: Usd) { + pub async fn take_order(&self, order: CfdOrder, quantity: Usd) { self.system .cfd_actor_addr .send(taker_cfd::TakeOffer { @@ -343,18 +343,6 @@ async fn in_memory_db() -> SqlitePool { pool } -/// The order cannot be directly compared in tests as the origin is different, -/// therefore wrap the assertion macro in a code that unifies the 'Origin' -pub fn assert_is_same_order(a: &Order, b: &Order) { - // Assume the same origin - let mut a = a.clone(); - let mut b = b.clone(); - a.origin = Origin::Ours; - b.origin = Origin::Ours; - - assert_eq!(a, b); -} - pub fn dummy_new_order() -> maker_cfd::NewOrder { maker_cfd::NewOrder { price: Price::new(dec!(50_000)).expect("unexpected failure"), @@ -387,3 +375,7 @@ pub fn init_tracing() -> DefaultGuard { guard } + +pub fn order_from_cfd(cfd: &Cfd) -> CfdOrder { + cfd.order.clone().into() +}