From 2ee97c41fe272c3f5b5b7db8725064627fce10e6 Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Tue, 30 Nov 2021 14:31:03 +1030 Subject: [PATCH] Pass changes in cfd proposals into projection actor Instead of passing the whole HashMap to the projection actor, only send what has actually changed. Provide separate messages to update settlement proposal and roll over proposals, as they will be sent from different actors. --- daemon/src/maker_cfd.rs | 94 ++++++++++++++++------------- daemon/src/projection.rs | 127 ++++++++++++++++++++++++++++++++++++--- daemon/src/taker_cfd.rs | 73 ++++++++++++++-------- 3 files changed, 216 insertions(+), 78 deletions(-) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index de4f6a3..5fb064b 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -6,12 +6,14 @@ use crate::model::cfd::{ }; use crate::model::{Identity, Price, Timestamp, Usd}; use crate::monitor::MonitorParams; -use crate::projection::Update; +use crate::projection::{ + try_into_update_rollover_proposal, try_into_update_settlement_proposal, Update, + UpdateRollOverProposal, UpdateSettlementProposal, +}; use crate::setup_contract::{RolloverParams, SetupParams}; use crate::tokio_ext::FutureExt; use crate::{ log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, wallet, wire, - UpdateCfdProposals, }; use anyhow::{Context as _, Result}; use async_trait::async_trait; @@ -176,17 +178,16 @@ impl Actor { } }; - self.current_pending_proposals.insert( - proposal.order_id, - ( - UpdateCfdProposal::RollOverProposal { - proposal, - direction: SettlementKind::Incoming, - }, - taker_id, - ), - ); - self.send_pending_proposals().await?; + let new_proposal = UpdateCfdProposal::RollOverProposal { + proposal: proposal.clone(), + direction: SettlementKind::Incoming, + }; + + self.current_pending_proposals + .insert(proposal.order_id, (new_proposal.clone(), taker_id)); + self.projection_actor + .send(try_into_update_rollover_proposal(new_proposal)?) + .await??; Ok(()) } @@ -200,17 +201,17 @@ impl Actor { "Received settlement proposal from the taker: {:?}", proposal ); - self.current_pending_proposals.insert( - proposal.order_id, - ( - UpdateCfdProposal::Settlement { - proposal, - direction: SettlementKind::Incoming, - }, - taker_id, - ), - ); - self.send_pending_proposals().await?; + + let new_proposal = UpdateCfdProposal::Settlement { + proposal: proposal.clone(), + direction: SettlementKind::Incoming, + }; + + self.current_pending_proposals + .insert(proposal.order_id, (new_proposal.clone(), taker_id)); + self.projection_actor + .send(try_into_update_settlement_proposal(new_proposal)?) + .await??; Ok(()) } @@ -253,28 +254,35 @@ impl Actor { Ok(()) } - /// Send pending proposals for the purposes of UI updates. - /// Filters out the Identities, as they are an implementation detail inside of - /// the actor - async fn send_pending_proposals(&self) -> Result<()> { - let pending_proposal = self - .current_pending_proposals - .iter() - .map(|(order_id, (update_cfd, _))| (*order_id, (update_cfd.clone()))) - .collect::(); - let _ = self - .projection_actor - .send(projection::Update(pending_proposal)) - .await?; - Ok(()) - } - /// Removes a proposal and updates the update cfd proposals' feed async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { - if self.current_pending_proposals.remove(order_id).is_none() { - anyhow::bail!("Could not find proposal with order id: {}", &order_id) + let removed_proposal = self.current_pending_proposals.remove(order_id); + + // Strip the identity, ID doesn't care about this implementation detail + let removed_proposal = removed_proposal.map(|(proposal, _)| proposal); + + if let Some(removed_proposal) = removed_proposal { + match removed_proposal { + UpdateCfdProposal::Settlement { .. } => { + self.projection_actor + .send(UpdateSettlementProposal { + order: *order_id, + proposal: None, + }) + .await?? + } + UpdateCfdProposal::RollOverProposal { .. } => { + self.projection_actor + .send(UpdateRollOverProposal { + order: *order_id, + proposal: None, + }) + .await?? + } + } + } else { + anyhow::bail!("Could not find proposal with order id: {}", &order_id); } - self.send_pending_proposals().await?; Ok(()) } diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs index a04c659..f5fefd2 100644 --- a/daemon/src/projection.rs +++ b/daemon/src/projection.rs @@ -1,8 +1,12 @@ use std::collections::HashMap; -use crate::model::cfd::{Cfd as ModelCfd, OrderId, Role, SettlementKind, UpdateCfdProposal}; +use crate::model::cfd::{ + Cfd as ModelCfd, OrderId, Role, RollOverProposal, SettlementKind, SettlementProposal, + UpdateCfdProposal, +}; use crate::model::{Leverage, Position, Timestamp, TradingPair}; use crate::{bitmex_price_feed, model, tx, Order, UpdateCfdProposals}; +use anyhow::Result; use bdk::bitcoin::{Amount, Network, SignedAmount}; use itertools::Itertools; use rust_decimal::Decimal; @@ -11,6 +15,22 @@ use time::OffsetDateTime; use tokio::sync::watch; use xtra_productivity::xtra_productivity; +/// Amend a given settlement proposal (if `proposal.is_none()`, it should be removed) +pub struct UpdateSettlementProposal { + pub order: OrderId, + pub proposal: Option<(SettlementProposal, SettlementKind)>, +} + +/// Amend a given rollover proposal (if `proposal.is_none()`, it should be removed) +pub struct UpdateRollOverProposal { + pub order: OrderId, + pub proposal: Option<(RollOverProposal, SettlementKind)>, +} + +/// Store the latest state of `T` for display purposes +/// (replaces previously stored values) +pub struct Update(pub T); + pub struct Actor { tx: Tx, state: State, @@ -95,8 +115,14 @@ impl State { temp.into() } - pub fn update_proposals(&mut self, proposals: UpdateCfdProposals) { - let _ = std::mem::replace(&mut self.proposals, proposals); + pub fn amend_settlement_proposal(&mut self, proposal: UpdateSettlementProposal) -> Result<()> { + let order = proposal.order; + self.amend_cfd_proposal(order, proposal.into()) + } + + pub fn amend_rollover_proposal(&mut self, proposal: UpdateRollOverProposal) -> Result<()> { + let order = proposal.order; + self.amend_cfd_proposal(order, proposal.into()) } pub fn update_quote(&mut self, quote: bitmex_price_feed::Quote) { @@ -106,9 +132,24 @@ impl State { pub fn update_cfds(&mut self, cfds: Vec) { let _ = std::mem::replace(&mut self.cfds, cfds); } -} -pub struct Update(pub T); + fn amend_cfd_proposal( + &mut self, + order: OrderId, + proposal: Option, + ) -> Result<()> { + if let Some(proposal) = proposal { + self.proposals.insert(order, proposal); + tracing::trace!(%order, "Cfd proposal got updated"); + } else { + if self.proposals.remove(&order).is_none() { + anyhow::bail!("Could not find proposal with order id: {}", &order) + } + tracing::trace!(%order, "Removed cfd proposal"); + }; + Ok(()) + } +} #[xtra_productivity] impl Actor { @@ -125,16 +166,22 @@ impl Actor { self.state.update_cfds(msg.0); let _ = self.tx.cfds.send(self.state.to_cfds()); } - fn handle(&mut self, msg: Update) { - self.state.update_proposals(msg.0); - let _ = self.tx.cfds.send(self.state.to_cfds()); - } fn handle(&mut self, msg: Update>) { let _ = self .tx .connected_takers .send(msg.0.iter().map(|x| x.into()).collect_vec()); } + fn handle(&mut self, msg: UpdateSettlementProposal) -> Result<()> { + self.state.amend_settlement_proposal(msg)?; + let _ = self.tx.cfds.send(self.state.to_cfds()); + Ok(()) + } + fn handle(&mut self, msg: UpdateRollOverProposal) -> Result<()> { + self.state.amend_rollover_proposal(msg)?; + let _ = self.tx.cfds.send(self.state.to_cfds()); + Ok(()) + } } impl xtra::Actor for Actor {} @@ -578,3 +625,65 @@ mod tests { assert_eq!(json, "\"SetupFailed\""); } } + +pub fn try_into_update_settlement_proposal( + cfd_update_proposal: UpdateCfdProposal, +) -> Result { + match cfd_update_proposal { + UpdateCfdProposal::Settlement { + proposal, + direction, + } => Ok(UpdateSettlementProposal { + order: proposal.order_id, + proposal: Some((proposal, direction)), + }), + UpdateCfdProposal::RollOverProposal { .. } => { + anyhow::bail!("Can't convert a RollOver proposal") + } + } +} + +pub fn try_into_update_rollover_proposal( + cfd_update_proposal: UpdateCfdProposal, +) -> Result { + match cfd_update_proposal { + UpdateCfdProposal::RollOverProposal { + proposal, + direction, + } => Ok(UpdateRollOverProposal { + order: proposal.order_id, + proposal: Some((proposal, direction)), + }), + UpdateCfdProposal::Settlement { .. } => { + anyhow::bail!("Can't convert a Settlement proposal") + } + } +} + +impl From for Option { + fn from(proposal: UpdateSettlementProposal) -> Self { + let UpdateSettlementProposal { order: _, proposal } = proposal; + if let Some((proposal, kind)) = proposal { + Some(UpdateCfdProposal::Settlement { + proposal, + direction: kind, + }) + } else { + None + } + } +} + +impl From for Option { + fn from(proposal: UpdateRollOverProposal) -> Self { + let UpdateRollOverProposal { order: _, proposal } = proposal; + if let Some((proposal, kind)) = proposal { + Some(UpdateCfdProposal::RollOverProposal { + proposal, + direction: kind, + }) + } else { + None + } + } +} diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index fc21f80..ddc6a15 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -8,6 +8,10 @@ use crate::model::cfd::{ }; use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd}; use crate::monitor::{self, MonitorParams}; +use crate::projection::{ + try_into_update_rollover_proposal, try_into_update_settlement_proposal, UpdateRollOverProposal, + UpdateSettlementProposal, +}; use crate::setup_contract::RolloverParams; use crate::tokio_ext::FutureExt; use crate::wire::RollOverMsg; @@ -106,19 +110,32 @@ where } impl Actor { - async fn send_pending_update_proposals(&self) -> Result<()> { - Ok(self - .projection_actor - .send(projection::Update(self.current_pending_proposals.clone())) - .await?) - } - /// Removes a proposal and updates the update cfd proposals' feed async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { - if self.current_pending_proposals.remove(order_id).is_none() { - anyhow::bail!("Could not find proposal with order id: {}", &order_id) + let removed_proposal = self.current_pending_proposals.remove(order_id); + + if let Some(removed_proposal) = removed_proposal { + match removed_proposal { + UpdateCfdProposal::Settlement { .. } => { + self.projection_actor + .send(UpdateSettlementProposal { + order: *order_id, + proposal: None, + }) + .await?? + } + UpdateCfdProposal::RollOverProposal { .. } => { + self.projection_actor + .send(UpdateRollOverProposal { + order: *order_id, + proposal: None, + }) + .await?? + } + } + } else { + anyhow::bail!("Could not find proposal with order id: {}", &order_id); } - self.send_pending_update_proposals().await?; Ok(()) } @@ -162,14 +179,16 @@ where timestamp: Timestamp::now(), }; - self.current_pending_proposals.insert( - proposal.order_id, - UpdateCfdProposal::RollOverProposal { - proposal: proposal.clone(), - direction: SettlementKind::Outgoing, - }, - ); - self.send_pending_update_proposals().await?; + let new_proposal = UpdateCfdProposal::RollOverProposal { + proposal: proposal.clone(), + direction: SettlementKind::Outgoing, + }; + + self.current_pending_proposals + .insert(proposal.order_id, new_proposal.clone()); + self.projection_actor + .send(try_into_update_rollover_proposal(new_proposal)?) + .await??; self.conn_actor .send(wire::TakerToMaker::ProposeRollOver { @@ -212,14 +231,16 @@ impl Actor { ) } - self.current_pending_proposals.insert( - proposal.order_id, - UpdateCfdProposal::Settlement { - proposal: proposal.clone(), - direction: SettlementKind::Outgoing, - }, - ); - self.send_pending_update_proposals().await?; + let new_proposal = UpdateCfdProposal::Settlement { + proposal: proposal.clone(), + direction: SettlementKind::Outgoing, + }; + + self.current_pending_proposals + .insert(proposal.order_id, new_proposal.clone()); + self.projection_actor + .send(try_into_update_settlement_proposal(new_proposal)?) + .await??; self.conn_actor .send(wire::TakerToMaker::ProposeSettlement {