Browse Source

Move Cfd types into the projection actor

Move the logic of creating Cfd representation suitable for UI into the
projection actor

For the time being, keep the logic as free functions that are still used in the
"old" way (inside ToSseEvent)
chore/leaner-release-process
Mariusz Klochowicz 3 years ago
parent
commit
7464ababc8
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 210
      daemon/src/projection.rs
  2. 4
      daemon/src/routes_maker.rs
  3. 4
      daemon/src/routes_taker.rs
  4. 158
      daemon/src/to_sse_event.rs

210
daemon/src/projection.rs

@ -1,12 +1,13 @@
use std::collections::HashMap; use std::collections::HashMap;
use crate::model::cfd::{OrderId, Role, SettlementKind, UpdateCfdProposal}; use crate::model::cfd::{Cfd as ModelCfd, OrderId, Role, SettlementKind, UpdateCfdProposal};
use crate::model::{Leverage, Position, Timestamp, TradingPair}; use crate::model::{Leverage, Position, Timestamp, TradingPair};
use crate::{bitmex_price_feed, model, tx, Cfd, Order, UpdateCfdProposals}; use crate::{bitmex_price_feed, model, tx, Order, UpdateCfdProposals};
use bdk::bitcoin::{Amount, Network}; use bdk::bitcoin::{Amount, Network, SignedAmount};
use itertools::Itertools; use itertools::Itertools;
use rust_decimal::Decimal; use rust_decimal::Decimal;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tokio::sync::watch; use tokio::sync::watch;
use xtra_productivity::xtra_productivity; use xtra_productivity::xtra_productivity;
@ -19,12 +20,12 @@ pub struct Feeds {
pub order: watch::Receiver<Option<CfdOrder>>, pub order: watch::Receiver<Option<CfdOrder>>,
pub connected_takers: watch::Receiver<Vec<Identity>>, pub connected_takers: watch::Receiver<Vec<Identity>>,
// TODO: Convert items below here into projections // TODO: Convert items below here into projections
pub cfds: watch::Receiver<Vec<Cfd>>, pub cfds: watch::Receiver<Vec<ModelCfd>>,
pub settlements: watch::Receiver<UpdateCfdProposals>, pub settlements: watch::Receiver<UpdateCfdProposals>,
} }
impl Actor { impl Actor {
pub fn new(init_cfds: Vec<Cfd>, init_quote: bitmex_price_feed::Quote) -> (Self, Feeds) { pub fn new(init_cfds: Vec<ModelCfd>, init_quote: bitmex_price_feed::Quote) -> (Self, Feeds) {
let (tx_cfds, rx_cfds) = watch::channel(init_cfds); let (tx_cfds, rx_cfds) = watch::channel(init_cfds);
let (tx_order, rx_order) = watch::channel(None); let (tx_order, rx_order) = watch::channel(None);
let (tx_update_cfd_feed, rx_update_cfd_feed) = watch::channel(HashMap::new()); let (tx_update_cfd_feed, rx_update_cfd_feed) = watch::channel(HashMap::new());
@ -54,7 +55,7 @@ impl Actor {
/// Internal struct to keep all the senders around in one place /// Internal struct to keep all the senders around in one place
struct Tx { struct Tx {
pub cfds: watch::Sender<Vec<Cfd>>, pub cfds: watch::Sender<Vec<ModelCfd>>,
pub order: watch::Sender<Option<CfdOrder>>, pub order: watch::Sender<Option<CfdOrder>>,
pub quote: watch::Sender<Quote>, pub quote: watch::Sender<Quote>,
pub settlements: watch::Sender<UpdateCfdProposals>, pub settlements: watch::Sender<UpdateCfdProposals>,
@ -73,7 +74,7 @@ impl Actor {
fn handle(&mut self, msg: Update<bitmex_price_feed::Quote>) { fn handle(&mut self, msg: Update<bitmex_price_feed::Quote>) {
let _ = self.tx.quote.send(msg.0.into()); let _ = self.tx.quote.send(msg.0.into());
} }
fn handle(&mut self, msg: Update<Vec<Cfd>>) { fn handle(&mut self, msg: Update<Vec<ModelCfd>>) {
let _ = self.tx.cfds.send(msg.0); let _ = self.tx.cfds.send(msg.0);
} }
fn handle(&mut self, msg: Update<UpdateCfdProposals>) { fn handle(&mut self, msg: Update<UpdateCfdProposals>) {
@ -184,28 +185,6 @@ impl From<Quote> for bitmex_price_feed::Quote {
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use rust_decimal_macros::dec;
use serde_test::{assert_ser_tokens, Token};
#[test]
fn usd_serializes_with_only_cents() {
let usd = Usd::new(model::Usd::new(dec!(1000.12345)));
assert_ser_tokens(&usd, &[Token::Str("1000.12")]);
}
#[test]
fn price_serializes_with_only_cents() {
let price = Price::new(model::Price::new(dec!(1000.12345)).unwrap());
assert_ser_tokens(&price, &[Token::Str("1000.12")]);
}
}
#[derive(Debug, Clone, PartialEq, Serialize)] #[derive(Debug, Clone, PartialEq, Serialize)]
pub struct CfdOrder { pub struct CfdOrder {
pub id: OrderId, pub id: OrderId,
@ -337,8 +316,7 @@ pub struct CfdDetails {
payout: Option<Amount>, payout: Option<Amount>,
} }
// TODO: don't expose fn to_cfd_details(cfd: &model::cfd::Cfd, network: Network) -> CfdDetails {
pub fn to_cfd_details(cfd: &model::cfd::Cfd, network: Network) -> CfdDetails {
CfdDetails { CfdDetails {
tx_url_list: tx::to_tx_url_list(cfd.state.clone(), network), tx_url_list: tx::to_tx_url_list(cfd.state.clone(), network),
payout: cfd.payout(), payout: cfd.payout(),
@ -359,7 +337,7 @@ pub enum CfdAction {
RejectRollOver, RejectRollOver,
} }
pub fn available_actions(state: CfdState, role: Role) -> Vec<CfdAction> { fn available_actions(state: CfdState, role: Role) -> Vec<CfdAction> {
match (state, role) { match (state, role) {
(CfdState::IncomingOrderRequest { .. }, Role::Maker) => { (CfdState::IncomingOrderRequest { .. }, Role::Maker) => {
vec![CfdAction::AcceptOrder, CfdAction::RejectOrder] vec![CfdAction::AcceptOrder, CfdAction::RejectOrder]
@ -386,3 +364,171 @@ pub fn available_actions(state: CfdState, role: Role) -> Vec<CfdAction> {
_ => vec![], _ => vec![],
} }
} }
#[derive(Debug, Clone, Serialize)]
pub struct Cfd {
pub order_id: OrderId,
pub initial_price: Price,
pub leverage: Leverage,
pub trading_pair: TradingPair,
pub position: Position,
pub liquidation_price: Price,
pub quantity_usd: Usd,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub margin: Amount,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub margin_counterparty: Amount,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub profit_btc: SignedAmount,
pub profit_in_percent: String,
pub state: CfdState,
pub actions: Vec<CfdAction>,
pub state_transition_timestamp: i64,
pub details: CfdDetails,
#[serde(with = "::time::serde::timestamp")]
pub expiry_timestamp: OffsetDateTime,
}
impl From<&CfdsWithAuxData> for Vec<Cfd> {
fn from(input: &CfdsWithAuxData) -> Self {
let current_price = input.current_price;
let network = input.network;
let cfds = input
.cfds
.iter()
.map(|cfd| {
let (profit_btc, profit_in_percent) =
cfd.profit(current_price).unwrap_or_else(|error| {
tracing::warn!(
"Calculating profit/loss failed. Falling back to 0. {:#}",
error
);
(SignedAmount::ZERO, Decimal::ZERO.into())
});
let pending_proposal = input.pending_proposals.get(&cfd.order.id);
let state = to_cfd_state(&cfd.state, pending_proposal);
Cfd {
order_id: cfd.order.id,
initial_price: cfd.order.price.into(),
leverage: cfd.order.leverage,
trading_pair: cfd.order.trading_pair.clone(),
position: cfd.position(),
liquidation_price: cfd.order.liquidation_price.into(),
quantity_usd: cfd.quantity_usd.into(),
profit_btc,
profit_in_percent: profit_in_percent.round_dp(1).to_string(),
state: state.clone(),
actions: available_actions(state, cfd.role()),
state_transition_timestamp: cfd.state.get_transition_timestamp().seconds(),
// TODO: Depending on the state the margin might be set (i.e. in Open we save it
// in the DB internally) and does not have to be calculated
margin: cfd.margin().expect("margin to be available"),
margin_counterparty: cfd.counterparty_margin().expect("margin to be available"),
details: to_cfd_details(cfd, network),
expiry_timestamp: match cfd.expiry_timestamp() {
None => cfd.order.oracle_event_id.timestamp(),
Some(timestamp) => timestamp,
},
}
})
.collect::<Vec<Cfd>>();
cfds
}
}
/// Intermediate struct to able to piggy-back additional information along with
/// cfds, so we can avoid a 1:1 mapping between the states in the model and seen
/// by UI
// TODO: Remove this struct out of existence
pub struct CfdsWithAuxData {
pub cfds: Vec<model::cfd::Cfd>,
pub current_price: model::Price,
pub pending_proposals: UpdateCfdProposals,
pub network: Network,
}
impl CfdsWithAuxData {
pub fn new(
rx_cfds: &watch::Receiver<Vec<model::cfd::Cfd>>,
rx_quote: &watch::Receiver<Quote>,
rx_updates: &watch::Receiver<UpdateCfdProposals>,
role: Role,
network: Network,
) -> Self {
let quote: bitmex_price_feed::Quote = rx_quote.borrow().clone().into();
let current_price = match role {
Role::Maker => quote.for_maker(),
Role::Taker => quote.for_taker(),
};
let pending_proposals = rx_updates.borrow().clone();
CfdsWithAuxData {
cfds: rx_cfds.borrow().clone(),
current_price,
pending_proposals,
network,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use rust_decimal_macros::dec;
use serde_test::{assert_ser_tokens, Token};
#[test]
fn usd_serializes_with_only_cents() {
let usd = Usd::new(model::Usd::new(dec!(1000.12345)));
assert_ser_tokens(&usd, &[Token::Str("1000.12")]);
}
#[test]
fn price_serializes_with_only_cents() {
let price = Price::new(model::Price::new(dec!(1000.12345)).unwrap());
assert_ser_tokens(&price, &[Token::Str("1000.12")]);
}
#[test]
fn state_snapshot_test() {
// Make sure to update the UI after changing this test!
let json = serde_json::to_string(&CfdState::OutgoingOrderRequest).unwrap();
assert_eq!(json, "\"OutgoingOrderRequest\"");
let json = serde_json::to_string(&CfdState::IncomingOrderRequest).unwrap();
assert_eq!(json, "\"IncomingOrderRequest\"");
let json = serde_json::to_string(&CfdState::Accepted).unwrap();
assert_eq!(json, "\"Accepted\"");
let json = serde_json::to_string(&CfdState::Rejected).unwrap();
assert_eq!(json, "\"Rejected\"");
let json = serde_json::to_string(&CfdState::ContractSetup).unwrap();
assert_eq!(json, "\"ContractSetup\"");
let json = serde_json::to_string(&CfdState::PendingOpen).unwrap();
assert_eq!(json, "\"PendingOpen\"");
let json = serde_json::to_string(&CfdState::Open).unwrap();
assert_eq!(json, "\"Open\"");
let json = serde_json::to_string(&CfdState::OpenCommitted).unwrap();
assert_eq!(json, "\"OpenCommitted\"");
let json = serde_json::to_string(&CfdState::PendingRefund).unwrap();
assert_eq!(json, "\"PendingRefund\"");
let json = serde_json::to_string(&CfdState::Refunded).unwrap();
assert_eq!(json, "\"Refunded\"");
let json = serde_json::to_string(&CfdState::SetupFailed).unwrap();
assert_eq!(json, "\"SetupFailed\"");
}
}

4
daemon/src/routes_maker.rs

@ -3,9 +3,9 @@ use bdk::bitcoin::Network;
use daemon::auth::Authenticated; use daemon::auth::Authenticated;
use daemon::model::cfd::{OrderId, Role}; use daemon::model::cfd::{OrderId, Role};
use daemon::model::{Price, Usd, WalletInfo}; use daemon::model::{Price, Usd, WalletInfo};
use daemon::projection::{CfdAction, Feeds}; use daemon::projection::{CfdAction, CfdsWithAuxData, Feeds};
use daemon::routes::EmbeddedFileExt; use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::{CfdsWithAuxData, ToSseEvent}; use daemon::to_sse_event::ToSseEvent;
use daemon::{maker_cfd, maker_inc_connections, monitor, oracle, wallet}; use daemon::{maker_cfd, maker_inc_connections, monitor, oracle, wallet};
use http_api_problem::{HttpApiProblem, StatusCode}; use http_api_problem::{HttpApiProblem, StatusCode};
use rocket::http::{ContentType, Header, Status}; use rocket::http::{ContentType, Header, Status};

4
daemon/src/routes_taker.rs

@ -2,9 +2,9 @@ use bdk::bitcoin::{Amount, Network};
use daemon::connection::ConnectionStatus; use daemon::connection::ConnectionStatus;
use daemon::model::cfd::{calculate_long_margin, OrderId, Role}; use daemon::model::cfd::{calculate_long_margin, OrderId, Role};
use daemon::model::{Leverage, Price, Usd, WalletInfo}; use daemon::model::{Leverage, Price, Usd, WalletInfo};
use daemon::projection::{CfdAction, Feeds}; use daemon::projection::{CfdAction, CfdsWithAuxData, Feeds};
use daemon::routes::EmbeddedFileExt; use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::{CfdsWithAuxData, ToSseEvent}; use daemon::to_sse_event::ToSseEvent;
use daemon::{bitmex_price_feed, monitor, oracle, taker_cfd, tx, wallet}; use daemon::{bitmex_price_feed, monitor, oracle, taker_cfd, tx, wallet};
use http_api_problem::{HttpApiProblem, StatusCode}; use http_api_problem::{HttpApiProblem, StatusCode};
use rocket::http::{ContentType, Status}; use rocket::http::{ContentType, Status};

158
daemon/src/to_sse_event.rs

@ -1,46 +1,11 @@
use crate::connection::ConnectionStatus; use crate::connection::ConnectionStatus;
use crate::model::cfd::{OrderId, Role, UpdateCfdProposals}; use crate::model;
use crate::model::{Leverage, Position, Timestamp, TradingPair}; use crate::model::Timestamp;
use crate::projection::{self, CfdAction, CfdOrder, CfdState, Identity, Price, Quote, Usd}; use crate::projection::{Cfd, CfdAction, CfdOrder, CfdsWithAuxData, Identity, Quote};
use crate::{bitmex_price_feed, model}; use bdk::bitcoin::Amount;
use bdk::bitcoin::{Amount, Network, SignedAmount};
use rocket::request::FromParam; use rocket::request::FromParam;
use rocket::response::stream::Event; use rocket::response::stream::Event;
use rust_decimal::Decimal;
use serde::Serialize; use serde::Serialize;
use time::OffsetDateTime;
use tokio::sync::watch;
#[derive(Debug, Clone, Serialize)]
pub struct Cfd {
pub order_id: OrderId,
pub initial_price: Price,
pub leverage: Leverage,
pub trading_pair: TradingPair,
pub position: Position,
pub liquidation_price: Price,
pub quantity_usd: Usd,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub margin: Amount,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub margin_counterparty: Amount,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub profit_btc: SignedAmount,
pub profit_in_percent: String,
pub state: CfdState,
pub actions: Vec<CfdAction>,
pub state_transition_timestamp: i64,
pub details: projection::CfdDetails,
#[serde(with = "::time::serde::timestamp")]
pub expiry_timestamp: OffsetDateTime,
}
impl<'v> FromParam<'v> for CfdAction { impl<'v> FromParam<'v> for CfdAction {
type Error = serde_plain::Error; type Error = serde_plain::Error;
@ -55,90 +20,10 @@ pub trait ToSseEvent {
fn to_sse_event(&self) -> Event; fn to_sse_event(&self) -> Event;
} }
/// Intermediate struct to able to piggy-back additional information along with
/// cfds, so we can avoid a 1:1 mapping between the states in the model and seen
/// by UI
pub struct CfdsWithAuxData {
pub cfds: Vec<model::cfd::Cfd>,
pub current_price: model::Price,
pub pending_proposals: UpdateCfdProposals,
pub network: Network,
}
impl CfdsWithAuxData {
pub fn new(
rx_cfds: &watch::Receiver<Vec<model::cfd::Cfd>>,
rx_quote: &watch::Receiver<Quote>,
rx_updates: &watch::Receiver<UpdateCfdProposals>,
role: Role,
network: Network,
) -> Self {
let quote: bitmex_price_feed::Quote = rx_quote.borrow().clone().into();
let current_price = match role {
Role::Maker => quote.for_maker(),
Role::Taker => quote.for_taker(),
};
let pending_proposals = rx_updates.borrow().clone();
CfdsWithAuxData {
cfds: rx_cfds.borrow().clone(),
current_price,
pending_proposals,
network,
}
}
}
impl ToSseEvent for CfdsWithAuxData { impl ToSseEvent for CfdsWithAuxData {
// TODO: This conversion can fail, we might want to change the API // TODO: This conversion can fail, we might want to change the API
fn to_sse_event(&self) -> Event { fn to_sse_event(&self) -> Event {
let current_price = self.current_price; let cfds: Vec<Cfd> = self.into();
let network = self.network;
let cfds = self
.cfds
.iter()
.map(|cfd| {
let (profit_btc, profit_in_percent) =
cfd.profit(current_price).unwrap_or_else(|error| {
tracing::warn!(
"Calculating profit/loss failed. Falling back to 0. {:#}",
error
);
(SignedAmount::ZERO, Decimal::ZERO.into())
});
let pending_proposal = self.pending_proposals.get(&cfd.order.id);
let state = projection::to_cfd_state(&cfd.state, pending_proposal);
Cfd {
order_id: cfd.order.id,
initial_price: cfd.order.price.into(),
leverage: cfd.order.leverage,
trading_pair: cfd.order.trading_pair.clone(),
position: cfd.position(),
liquidation_price: cfd.order.liquidation_price.into(),
quantity_usd: cfd.quantity_usd.into(),
profit_btc,
profit_in_percent: profit_in_percent.round_dp(1).to_string(),
state: state.clone(),
actions: projection::available_actions(state, cfd.role()),
state_transition_timestamp: cfd.state.get_transition_timestamp().seconds(),
// TODO: Depending on the state the margin might be set (i.e. in Open we save it
// in the DB internally) and does not have to be calculated
margin: cfd.margin().expect("margin to be available"),
margin_counterparty: cfd.counterparty_margin().expect("margin to be available"),
details: projection::to_cfd_details(cfd, network),
expiry_timestamp: match cfd.expiry_timestamp() {
None => cfd.order.oracle_event_id.timestamp(),
Some(timestamp) => timestamp,
},
}
})
.collect::<Vec<Cfd>>();
Event::json(&cfds).event("cfds") Event::json(&cfds).event("cfds")
} }
} }
@ -191,36 +76,3 @@ impl ToSseEvent for Quote {
Event::json(self).event("quote") Event::json(self).event("quote")
} }
} }
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn state_snapshot_test() {
// Make sure to update the UI after changing this test!
let json = serde_json::to_string(&CfdState::OutgoingOrderRequest).unwrap();
assert_eq!(json, "\"OutgoingOrderRequest\"");
let json = serde_json::to_string(&CfdState::IncomingOrderRequest).unwrap();
assert_eq!(json, "\"IncomingOrderRequest\"");
let json = serde_json::to_string(&CfdState::Accepted).unwrap();
assert_eq!(json, "\"Accepted\"");
let json = serde_json::to_string(&CfdState::Rejected).unwrap();
assert_eq!(json, "\"Rejected\"");
let json = serde_json::to_string(&CfdState::ContractSetup).unwrap();
assert_eq!(json, "\"ContractSetup\"");
let json = serde_json::to_string(&CfdState::PendingOpen).unwrap();
assert_eq!(json, "\"PendingOpen\"");
let json = serde_json::to_string(&CfdState::Open).unwrap();
assert_eq!(json, "\"Open\"");
let json = serde_json::to_string(&CfdState::OpenCommitted).unwrap();
assert_eq!(json, "\"OpenCommitted\"");
let json = serde_json::to_string(&CfdState::PendingRefund).unwrap();
assert_eq!(json, "\"PendingRefund\"");
let json = serde_json::to_string(&CfdState::Refunded).unwrap();
assert_eq!(json, "\"Refunded\"");
let json = serde_json::to_string(&CfdState::SetupFailed).unwrap();
assert_eq!(json, "\"SetupFailed\"");
}
}

Loading…
Cancel
Save