Browse Source

Create cfds from the local state kept in projection actor

chore/leaner-release-process
Mariusz Klochowicz 3 years ago
parent
commit
0fb4457f37
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 73
      daemon/src/projection.rs
  2. 8
      daemon/src/to_sse_event.rs

73
daemon/src/projection.rs

@ -13,7 +13,7 @@ use xtra_productivity::xtra_productivity;
pub struct Actor { pub struct Actor {
tx: Tx, tx: Tx,
_state: State, state: State,
} }
pub struct Feeds { pub struct Feeds {
@ -35,7 +35,7 @@ impl Actor {
let (tx_cfds, rx_cfds) = watch::channel(init_cfds); let (tx_cfds, rx_cfds) = watch::channel(init_cfds);
let (tx_order, rx_order) = watch::channel(None); let (tx_order, rx_order) = watch::channel(None);
let (tx_update_cfd_feed, rx_update_cfd_feed) = watch::channel(HashMap::new()); let (tx_update_cfd_feed, rx_update_cfd_feed) = watch::channel(HashMap::new());
let (tx_quote, rx_quote) = watch::channel(init_quote.into()); let (tx_quote, rx_quote) = watch::channel(init_quote.clone().into());
let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new()); let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new());
( (
@ -47,9 +47,12 @@ impl Actor {
settlements: tx_update_cfd_feed, settlements: tx_update_cfd_feed,
connected_takers: tx_connected_takers, connected_takers: tx_connected_takers,
}, },
_state: State { state: State {
_role: role, role,
_network: network, network,
cfds: vec![],
proposals: HashMap::new(),
quote: init_quote,
}, },
}, },
Feeds { Feeds {
@ -76,8 +79,37 @@ struct Tx {
/// Internal struct to keep state in one place /// Internal struct to keep state in one place
struct State { struct State {
pub _role: Role, role: Role,
pub _network: Network, network: Network,
quote: bitmex_price_feed::Quote,
proposals: UpdateCfdProposals,
cfds: Vec<ModelCfd>,
}
impl State {
pub fn to_cfd(&self) -> Vec<Cfd> {
// FIXME: starting with the intermediate struct, only temporarily
let temp = CfdsWithAuxData::new(
self.cfds.clone(),
self.quote.clone(),
self.proposals.clone(),
self.role,
self.network,
);
temp.into()
}
pub fn update_proposals(&mut self, proposals: UpdateCfdProposals) {
let _ = std::mem::replace(&mut self.proposals, proposals);
}
pub fn update_quote(&mut self, quote: bitmex_price_feed::Quote) {
let _ = std::mem::replace(&mut self.quote, quote);
}
pub fn update_cfds(&mut self, cfds: Vec<ModelCfd>) {
let _ = std::mem::replace(&mut self.cfds, cfds);
}
} }
pub struct Update<T>(pub T); pub struct Update<T>(pub T);
@ -88,13 +120,19 @@ impl Actor {
let _ = self.tx.order.send(msg.0.map(|x| x.into())); let _ = self.tx.order.send(msg.0.map(|x| x.into()));
} }
fn handle(&mut self, msg: Update<bitmex_price_feed::Quote>) { fn handle(&mut self, msg: Update<bitmex_price_feed::Quote>) {
let _ = self.tx.quote.send(msg.0.into()); let quote = msg.0;
self.state.update_quote(quote.clone());
let _ = self.tx.quote.send(quote.into());
} }
fn handle(&mut self, msg: Update<Vec<ModelCfd>>) { fn handle(&mut self, msg: Update<Vec<ModelCfd>>) {
let _ = self.tx.cfds.send(msg.0); let cfds = msg.0;
self.state.update_cfds(cfds.clone());
let _ = self.tx.cfds.send(cfds);
} }
fn handle(&mut self, msg: Update<UpdateCfdProposals>) { fn handle(&mut self, msg: Update<UpdateCfdProposals>) {
let _ = self.tx.settlements.send(msg.0); let proposals = msg.0;
self.state.update_proposals(proposals.clone());
let _ = self.tx.settlements.send(proposals);
} }
fn handle(&mut self, msg: Update<Vec<model::Identity>>) { fn handle(&mut self, msg: Update<Vec<model::Identity>>) {
let _ = self let _ = self
@ -412,8 +450,8 @@ pub struct Cfd {
pub expiry_timestamp: OffsetDateTime, pub expiry_timestamp: OffsetDateTime,
} }
impl From<&CfdsWithAuxData> for Vec<Cfd> { impl From<CfdsWithAuxData> for Vec<Cfd> {
fn from(input: &CfdsWithAuxData) -> Self { fn from(input: CfdsWithAuxData) -> Self {
let current_price = input.current_price; let current_price = input.current_price;
let network = input.network; let network = input.network;
@ -476,22 +514,19 @@ pub struct CfdsWithAuxData {
impl CfdsWithAuxData { impl CfdsWithAuxData {
pub fn new( pub fn new(
rx_cfds: &watch::Receiver<Vec<model::cfd::Cfd>>, cfds: Vec<model::cfd::Cfd>,
rx_quote: &watch::Receiver<Quote>, quote: bitmex_price_feed::Quote,
rx_updates: &watch::Receiver<UpdateCfdProposals>, pending_proposals: UpdateCfdProposals,
role: Role, role: Role,
network: Network, network: Network,
) -> Self { ) -> Self {
let quote: bitmex_price_feed::Quote = rx_quote.borrow().clone().into();
let current_price = match role { let current_price = match role {
Role::Maker => quote.for_maker(), Role::Maker => quote.for_maker(),
Role::Taker => quote.for_taker(), Role::Taker => quote.for_taker(),
}; };
let pending_proposals = rx_updates.borrow().clone();
CfdsWithAuxData { CfdsWithAuxData {
cfds: rx_cfds.borrow().clone(), cfds,
current_price, current_price,
pending_proposals, pending_proposals,
network, network,

8
daemon/src/to_sse_event.rs

@ -1,7 +1,7 @@
use crate::connection::ConnectionStatus; use crate::connection::ConnectionStatus;
use crate::model; use crate::model;
use crate::model::Timestamp; use crate::model::Timestamp;
use crate::projection::{Cfd, CfdAction, CfdOrder, CfdsWithAuxData, Identity, Quote}; use crate::projection::{Cfd, CfdAction, CfdOrder, Identity, Quote};
use bdk::bitcoin::Amount; use bdk::bitcoin::Amount;
use rocket::request::FromParam; use rocket::request::FromParam;
use rocket::response::stream::Event; use rocket::response::stream::Event;
@ -20,11 +20,9 @@ pub trait ToSseEvent {
fn to_sse_event(&self) -> Event; fn to_sse_event(&self) -> Event;
} }
impl ToSseEvent for CfdsWithAuxData { impl ToSseEvent for Vec<Cfd> {
// TODO: This conversion can fail, we might want to change the API
fn to_sse_event(&self) -> Event { fn to_sse_event(&self) -> Event {
let cfds: Vec<Cfd> = self.into(); Event::json(&self).event("cfds")
Event::json(&cfds).event("cfds")
} }
} }

Loading…
Cancel
Save