Browse Source

Move CfdOrder inside projection actor

debug-collab-settlement
Mariusz Klochowicz 3 years ago
parent
commit
73fb28ddba
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 65
      daemon/src/projection.rs
  2. 43
      daemon/src/to_sse_event.rs
  3. 12
      daemon/tests/happy_path.rs
  4. 9
      daemon/tests/harness/flow.rs
  5. 30
      daemon/tests/harness/mod.rs

65
daemon/src/projection.rs

@ -1,8 +1,8 @@
use std::collections::HashMap;
use crate::model::{TakerId, Timestamp};
use crate::{bitmex_price_feed, model};
use crate::{Cfd, Order, UpdateCfdProposals};
use crate::model::cfd::OrderId;
use crate::model::{Leverage, Position, TakerId, Timestamp, TradingPair};
use crate::{bitmex_price_feed, model, Cfd, Order, UpdateCfdProposals};
use rust_decimal::Decimal;
use serde::Serialize;
use tokio::sync::watch;
@ -13,9 +13,10 @@ pub struct Actor {
}
pub struct Feeds {
pub order: watch::Receiver<Option<Order>>,
pub cfds: watch::Receiver<Vec<Cfd>>,
pub quote: watch::Receiver<Quote>,
pub order: watch::Receiver<Option<CfdOrder>>,
// TODO: Convert items below here into projections
pub cfds: watch::Receiver<Vec<Cfd>>,
pub settlements: watch::Receiver<UpdateCfdProposals>,
pub connected_takers: watch::Receiver<Vec<TakerId>>,
}
@ -52,7 +53,7 @@ impl Actor {
/// Internal struct to keep all the senders around in one place
struct Tx {
pub cfds: watch::Sender<Vec<Cfd>>,
pub order: watch::Sender<Option<Order>>,
pub order: watch::Sender<Option<CfdOrder>>,
pub quote: watch::Sender<Quote>,
pub settlements: watch::Sender<UpdateCfdProposals>,
// TODO: Use this channel to communicate maker status as well with generic
@ -64,15 +65,15 @@ pub struct Update<T>(pub T);
#[xtra_productivity]
impl Actor {
fn handle(&mut self, msg: Update<Vec<Cfd>>) {
let _ = self.tx.cfds.send(msg.0);
}
fn handle(&mut self, msg: Update<Option<Order>>) {
let _ = self.tx.order.send(msg.0);
let _ = self.tx.order.send(msg.0.map(|x| x.into()));
}
fn handle(&mut self, msg: Update<bitmex_price_feed::Quote>) {
let _ = self.tx.quote.send(msg.0.into());
}
fn handle(&mut self, msg: Update<Vec<Cfd>>) {
let _ = self.tx.cfds.send(msg.0);
}
fn handle(&mut self, msg: Update<UpdateCfdProposals>) {
let _ = self.tx.settlements.send(msg.0);
}
@ -85,7 +86,7 @@ impl xtra::Actor for Actor {}
/// Types
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct Usd {
inner: model::Usd,
}
@ -122,7 +123,7 @@ impl Serialize for Price {
}
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct Price {
inner: model::Price,
}
@ -199,3 +200,43 @@ mod tests {
assert_ser_tokens(&price, &[Token::Str("1000.12")]);
}
}
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct CfdOrder {
pub id: OrderId,
pub trading_pair: TradingPair,
pub position: Position,
pub price: Price,
pub min_quantity: Usd,
pub max_quantity: Usd,
pub leverage: Leverage,
pub liquidation_price: Price,
pub creation_timestamp: Timestamp,
pub settlement_time_interval_in_secs: u64,
}
impl From<Order> for CfdOrder {
fn from(order: Order) -> Self {
Self {
id: order.id,
trading_pair: order.trading_pair,
position: order.position,
price: order.price.into(),
min_quantity: order.min_quantity.into(),
max_quantity: order.max_quantity.into(),
leverage: order.leverage,
liquidation_price: order.liquidation_price.into(),
creation_timestamp: order.creation_timestamp,
settlement_time_interval_in_secs: order
.settlement_interval
.whole_seconds()
.try_into()
.expect("settlement_time_interval_hours is always positive number"),
}
}
}

43
daemon/src/to_sse_event.rs

@ -3,14 +3,13 @@ use crate::model::cfd::{
Dlc, OrderId, Payout, Role, SettlementKind, UpdateCfdProposal, UpdateCfdProposals,
};
use crate::model::{Leverage, Position, TakerId, Timestamp, TradingPair};
use crate::projection::{Price, Quote, Usd};
use crate::projection::{CfdOrder, Price, Quote, Usd};
use crate::{bitmex_price_feed, model};
use bdk::bitcoin::{Amount, Network, SignedAmount, Txid};
use rocket::request::FromParam;
use rocket::response::stream::Event;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::convert::TryInto;
use time::OffsetDateTime;
use tokio::sync::watch;
@ -157,25 +156,6 @@ pub enum CfdState {
SetupFailed,
}
#[derive(Debug, Clone, Serialize)]
pub struct CfdOrder {
pub id: OrderId,
pub trading_pair: TradingPair,
pub position: Position,
pub price: Price,
pub min_quantity: Usd,
pub max_quantity: Usd,
pub leverage: Leverage,
pub liquidation_price: Price,
pub creation_timestamp: Timestamp,
pub settlement_time_interval_in_secs: u64,
}
pub trait ToSseEvent {
fn to_sse_event(&self) -> Event;
}
@ -280,26 +260,9 @@ impl ToSseEvent for Vec<TakerId> {
}
}
impl ToSseEvent for Option<model::cfd::Order> {
impl ToSseEvent for Option<CfdOrder> {
fn to_sse_event(&self) -> Event {
let order = self.clone().map(|order| CfdOrder {
id: order.id,
trading_pair: order.trading_pair,
position: order.position,
price: order.price.into(),
min_quantity: order.min_quantity.into(),
max_quantity: order.max_quantity.into(),
leverage: order.leverage,
liquidation_price: order.liquidation_price.into(),
creation_timestamp: order.creation_timestamp,
settlement_time_interval_in_secs: order
.settlement_interval
.whole_seconds()
.try_into()
.expect("settlement_time_interval_hours is always positive number"),
});
Event::json(&order).event("order")
Event::json(&self).event("order")
}
}

12
daemon/tests/happy_path.rs

@ -2,7 +2,7 @@ use std::time::Duration;
use crate::harness::flow::{is_next_none, next, next_cfd, next_order, next_some};
use crate::harness::{
assert_is_same_order, dummy_new_order, init_tracing, start_both, Maker, MakerConfig, Taker,
dummy_new_order, init_tracing, order_from_cfd, start_both, Maker, MakerConfig, Taker,
TakerConfig,
};
use daemon::connection::ConnectionStatus;
@ -25,7 +25,7 @@ async fn taker_receives_order_from_maker_on_publication() {
let (published, received) =
tokio::join!(next_some(maker.order_feed()), next_some(taker.order_feed()));
assert_is_same_order(&published.unwrap(), &received.unwrap());
assert_eq!(published.unwrap(), received.unwrap());
}
#[tokio::test]
@ -46,8 +46,8 @@ async fn taker_takes_order_and_maker_rejects() {
taker.take_order(received.clone(), Usd::new(dec!(10))).await;
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
assert_is_same_order(&taker_cfd.order, &received);
assert_is_same_order(&maker_cfd.order, &received);
assert_eq!(order_from_cfd(&taker_cfd), received);
assert_eq!(order_from_cfd(&maker_cfd), received);
assert!(matches!(
taker_cfd.state,
CfdState::OutgoingOrderRequest { .. }
@ -61,8 +61,8 @@ async fn taker_takes_order_and_maker_rejects() {
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
// TODO: More elaborate Cfd assertions
assert_is_same_order(&taker_cfd.order, &received);
assert_is_same_order(&maker_cfd.order, &received);
assert_eq!(order_from_cfd(&taker_cfd), received);
assert_eq!(order_from_cfd(&maker_cfd), received);
assert!(matches!(taker_cfd.state, CfdState::Rejected { .. }));
assert!(matches!(maker_cfd.state, CfdState::Rejected { .. }));
}

9
daemon/tests/harness/flow.rs

@ -1,5 +1,6 @@
use anyhow::{Context, Result};
use daemon::model::cfd::{Cfd, Order};
use daemon::model::cfd::Cfd;
use daemon::projection::CfdOrder;
use daemon::tokio_ext::FutureExt;
use std::time::Duration;
use tokio::sync::watch;
@ -24,9 +25,9 @@ pub async fn next_cfd(
}
pub async fn next_order(
rx_a: &mut watch::Receiver<Option<Order>>,
rx_b: &mut watch::Receiver<Option<Order>>,
) -> Result<(Order, Order)> {
rx_a: &mut watch::Receiver<Option<CfdOrder>>,
rx_b: &mut watch::Receiver<Option<CfdOrder>>,
) -> Result<(CfdOrder, CfdOrder)> {
let (a, b) = tokio::join!(next_some(rx_a), next_some(rx_b));
Ok((a?, b?))

30
daemon/tests/harness/mod.rs

@ -4,9 +4,9 @@ use crate::harness::mocks::wallet::WalletActor;
use crate::schnorrsig;
use daemon::bitmex_price_feed::Quote;
use daemon::connection::{connect, ConnectionStatus};
use daemon::model::cfd::{Cfd, Order, Origin};
use daemon::model::cfd::Cfd;
use daemon::model::{Price, TakerId, Timestamp, Usd};
use daemon::projection::Feeds;
use daemon::projection::{CfdOrder, Feeds};
use daemon::seed::Seed;
use daemon::{
db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks,
@ -124,7 +124,7 @@ impl Maker {
&mut self.feeds.cfds
}
pub fn order_feed(&mut self) -> &mut watch::Receiver<Option<Order>> {
pub fn order_feed(&mut self) -> &mut watch::Receiver<Option<CfdOrder>> {
&mut self.feeds.order
}
@ -218,7 +218,7 @@ impl Maker {
.unwrap();
}
pub async fn reject_take_request(&self, order: Order) {
pub async fn reject_take_request(&self, order: CfdOrder) {
self.system
.cfd_actor_addr
.send(maker_cfd::RejectOrder { order_id: order.id })
@ -227,7 +227,7 @@ impl Maker {
.unwrap();
}
pub async fn accept_take_request(&self, order: Order) {
pub async fn accept_take_request(&self, order: CfdOrder) {
self.system
.cfd_actor_addr
.send(maker_cfd::AcceptOrder { order_id: order.id })
@ -251,7 +251,7 @@ impl Taker {
&mut self.feeds.cfds
}
pub fn order_feed(&mut self) -> &mut watch::Receiver<Option<Order>> {
pub fn order_feed(&mut self) -> &mut watch::Receiver<Option<CfdOrder>> {
&mut self.feeds.order
}
@ -319,7 +319,7 @@ impl Taker {
}
}
pub async fn take_order(&self, order: Order, quantity: Usd) {
pub async fn take_order(&self, order: CfdOrder, quantity: Usd) {
self.system
.cfd_actor_addr
.send(taker_cfd::TakeOffer {
@ -343,18 +343,6 @@ async fn in_memory_db() -> SqlitePool {
pool
}
/// The order cannot be directly compared in tests as the origin is different,
/// therefore wrap the assertion macro in a code that unifies the 'Origin'
pub fn assert_is_same_order(a: &Order, b: &Order) {
// Assume the same origin
let mut a = a.clone();
let mut b = b.clone();
a.origin = Origin::Ours;
b.origin = Origin::Ours;
assert_eq!(a, b);
}
pub fn dummy_new_order() -> maker_cfd::NewOrder {
maker_cfd::NewOrder {
price: Price::new(dec!(50_000)).expect("unexpected failure"),
@ -387,3 +375,7 @@ pub fn init_tracing() -> DefaultGuard {
guard
}
pub fn order_from_cfd(cfd: &Cfd) -> CfdOrder {
cfd.order.clone().into()
}

Loading…
Cancel
Save