diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index de4f6a3..5fb064b 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -6,12 +6,14 @@ use crate::model::cfd::{ }; use crate::model::{Identity, Price, Timestamp, Usd}; use crate::monitor::MonitorParams; -use crate::projection::Update; +use crate::projection::{ + try_into_update_rollover_proposal, try_into_update_settlement_proposal, Update, + UpdateRollOverProposal, UpdateSettlementProposal, +}; use crate::setup_contract::{RolloverParams, SetupParams}; use crate::tokio_ext::FutureExt; use crate::{ log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, wallet, wire, - UpdateCfdProposals, }; use anyhow::{Context as _, Result}; use async_trait::async_trait; @@ -176,17 +178,16 @@ impl Actor { } }; - self.current_pending_proposals.insert( - proposal.order_id, - ( - UpdateCfdProposal::RollOverProposal { - proposal, - direction: SettlementKind::Incoming, - }, - taker_id, - ), - ); - self.send_pending_proposals().await?; + let new_proposal = UpdateCfdProposal::RollOverProposal { + proposal: proposal.clone(), + direction: SettlementKind::Incoming, + }; + + self.current_pending_proposals + .insert(proposal.order_id, (new_proposal.clone(), taker_id)); + self.projection_actor + .send(try_into_update_rollover_proposal(new_proposal)?) + .await??; Ok(()) } @@ -200,17 +201,17 @@ impl Actor { "Received settlement proposal from the taker: {:?}", proposal ); - self.current_pending_proposals.insert( - proposal.order_id, - ( - UpdateCfdProposal::Settlement { - proposal, - direction: SettlementKind::Incoming, - }, - taker_id, - ), - ); - self.send_pending_proposals().await?; + + let new_proposal = UpdateCfdProposal::Settlement { + proposal: proposal.clone(), + direction: SettlementKind::Incoming, + }; + + self.current_pending_proposals + .insert(proposal.order_id, (new_proposal.clone(), taker_id)); + self.projection_actor + .send(try_into_update_settlement_proposal(new_proposal)?) + .await??; Ok(()) } @@ -253,28 +254,35 @@ impl Actor { Ok(()) } - /// Send pending proposals for the purposes of UI updates. - /// Filters out the Identities, as they are an implementation detail inside of - /// the actor - async fn send_pending_proposals(&self) -> Result<()> { - let pending_proposal = self - .current_pending_proposals - .iter() - .map(|(order_id, (update_cfd, _))| (*order_id, (update_cfd.clone()))) - .collect::(); - let _ = self - .projection_actor - .send(projection::Update(pending_proposal)) - .await?; - Ok(()) - } - /// Removes a proposal and updates the update cfd proposals' feed async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { - if self.current_pending_proposals.remove(order_id).is_none() { - anyhow::bail!("Could not find proposal with order id: {}", &order_id) + let removed_proposal = self.current_pending_proposals.remove(order_id); + + // Strip the identity, ID doesn't care about this implementation detail + let removed_proposal = removed_proposal.map(|(proposal, _)| proposal); + + if let Some(removed_proposal) = removed_proposal { + match removed_proposal { + UpdateCfdProposal::Settlement { .. } => { + self.projection_actor + .send(UpdateSettlementProposal { + order: *order_id, + proposal: None, + }) + .await?? + } + UpdateCfdProposal::RollOverProposal { .. } => { + self.projection_actor + .send(UpdateRollOverProposal { + order: *order_id, + proposal: None, + }) + .await?? + } + } + } else { + anyhow::bail!("Could not find proposal with order id: {}", &order_id); } - self.send_pending_proposals().await?; Ok(()) } diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs index a04c659..f5fefd2 100644 --- a/daemon/src/projection.rs +++ b/daemon/src/projection.rs @@ -1,8 +1,12 @@ use std::collections::HashMap; -use crate::model::cfd::{Cfd as ModelCfd, OrderId, Role, SettlementKind, UpdateCfdProposal}; +use crate::model::cfd::{ + Cfd as ModelCfd, OrderId, Role, RollOverProposal, SettlementKind, SettlementProposal, + UpdateCfdProposal, +}; use crate::model::{Leverage, Position, Timestamp, TradingPair}; use crate::{bitmex_price_feed, model, tx, Order, UpdateCfdProposals}; +use anyhow::Result; use bdk::bitcoin::{Amount, Network, SignedAmount}; use itertools::Itertools; use rust_decimal::Decimal; @@ -11,6 +15,22 @@ use time::OffsetDateTime; use tokio::sync::watch; use xtra_productivity::xtra_productivity; +/// Amend a given settlement proposal (if `proposal.is_none()`, it should be removed) +pub struct UpdateSettlementProposal { + pub order: OrderId, + pub proposal: Option<(SettlementProposal, SettlementKind)>, +} + +/// Amend a given rollover proposal (if `proposal.is_none()`, it should be removed) +pub struct UpdateRollOverProposal { + pub order: OrderId, + pub proposal: Option<(RollOverProposal, SettlementKind)>, +} + +/// Store the latest state of `T` for display purposes +/// (replaces previously stored values) +pub struct Update(pub T); + pub struct Actor { tx: Tx, state: State, @@ -95,8 +115,14 @@ impl State { temp.into() } - pub fn update_proposals(&mut self, proposals: UpdateCfdProposals) { - let _ = std::mem::replace(&mut self.proposals, proposals); + pub fn amend_settlement_proposal(&mut self, proposal: UpdateSettlementProposal) -> Result<()> { + let order = proposal.order; + self.amend_cfd_proposal(order, proposal.into()) + } + + pub fn amend_rollover_proposal(&mut self, proposal: UpdateRollOverProposal) -> Result<()> { + let order = proposal.order; + self.amend_cfd_proposal(order, proposal.into()) } pub fn update_quote(&mut self, quote: bitmex_price_feed::Quote) { @@ -106,9 +132,24 @@ impl State { pub fn update_cfds(&mut self, cfds: Vec) { let _ = std::mem::replace(&mut self.cfds, cfds); } -} -pub struct Update(pub T); + fn amend_cfd_proposal( + &mut self, + order: OrderId, + proposal: Option, + ) -> Result<()> { + if let Some(proposal) = proposal { + self.proposals.insert(order, proposal); + tracing::trace!(%order, "Cfd proposal got updated"); + } else { + if self.proposals.remove(&order).is_none() { + anyhow::bail!("Could not find proposal with order id: {}", &order) + } + tracing::trace!(%order, "Removed cfd proposal"); + }; + Ok(()) + } +} #[xtra_productivity] impl Actor { @@ -125,16 +166,22 @@ impl Actor { self.state.update_cfds(msg.0); let _ = self.tx.cfds.send(self.state.to_cfds()); } - fn handle(&mut self, msg: Update) { - self.state.update_proposals(msg.0); - let _ = self.tx.cfds.send(self.state.to_cfds()); - } fn handle(&mut self, msg: Update>) { let _ = self .tx .connected_takers .send(msg.0.iter().map(|x| x.into()).collect_vec()); } + fn handle(&mut self, msg: UpdateSettlementProposal) -> Result<()> { + self.state.amend_settlement_proposal(msg)?; + let _ = self.tx.cfds.send(self.state.to_cfds()); + Ok(()) + } + fn handle(&mut self, msg: UpdateRollOverProposal) -> Result<()> { + self.state.amend_rollover_proposal(msg)?; + let _ = self.tx.cfds.send(self.state.to_cfds()); + Ok(()) + } } impl xtra::Actor for Actor {} @@ -578,3 +625,65 @@ mod tests { assert_eq!(json, "\"SetupFailed\""); } } + +pub fn try_into_update_settlement_proposal( + cfd_update_proposal: UpdateCfdProposal, +) -> Result { + match cfd_update_proposal { + UpdateCfdProposal::Settlement { + proposal, + direction, + } => Ok(UpdateSettlementProposal { + order: proposal.order_id, + proposal: Some((proposal, direction)), + }), + UpdateCfdProposal::RollOverProposal { .. } => { + anyhow::bail!("Can't convert a RollOver proposal") + } + } +} + +pub fn try_into_update_rollover_proposal( + cfd_update_proposal: UpdateCfdProposal, +) -> Result { + match cfd_update_proposal { + UpdateCfdProposal::RollOverProposal { + proposal, + direction, + } => Ok(UpdateRollOverProposal { + order: proposal.order_id, + proposal: Some((proposal, direction)), + }), + UpdateCfdProposal::Settlement { .. } => { + anyhow::bail!("Can't convert a Settlement proposal") + } + } +} + +impl From for Option { + fn from(proposal: UpdateSettlementProposal) -> Self { + let UpdateSettlementProposal { order: _, proposal } = proposal; + if let Some((proposal, kind)) = proposal { + Some(UpdateCfdProposal::Settlement { + proposal, + direction: kind, + }) + } else { + None + } + } +} + +impl From for Option { + fn from(proposal: UpdateRollOverProposal) -> Self { + let UpdateRollOverProposal { order: _, proposal } = proposal; + if let Some((proposal, kind)) = proposal { + Some(UpdateCfdProposal::RollOverProposal { + proposal, + direction: kind, + }) + } else { + None + } + } +} diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index fc21f80..ddc6a15 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -8,6 +8,10 @@ use crate::model::cfd::{ }; use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd}; use crate::monitor::{self, MonitorParams}; +use crate::projection::{ + try_into_update_rollover_proposal, try_into_update_settlement_proposal, UpdateRollOverProposal, + UpdateSettlementProposal, +}; use crate::setup_contract::RolloverParams; use crate::tokio_ext::FutureExt; use crate::wire::RollOverMsg; @@ -106,19 +110,32 @@ where } impl Actor { - async fn send_pending_update_proposals(&self) -> Result<()> { - Ok(self - .projection_actor - .send(projection::Update(self.current_pending_proposals.clone())) - .await?) - } - /// Removes a proposal and updates the update cfd proposals' feed async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { - if self.current_pending_proposals.remove(order_id).is_none() { - anyhow::bail!("Could not find proposal with order id: {}", &order_id) + let removed_proposal = self.current_pending_proposals.remove(order_id); + + if let Some(removed_proposal) = removed_proposal { + match removed_proposal { + UpdateCfdProposal::Settlement { .. } => { + self.projection_actor + .send(UpdateSettlementProposal { + order: *order_id, + proposal: None, + }) + .await?? + } + UpdateCfdProposal::RollOverProposal { .. } => { + self.projection_actor + .send(UpdateRollOverProposal { + order: *order_id, + proposal: None, + }) + .await?? + } + } + } else { + anyhow::bail!("Could not find proposal with order id: {}", &order_id); } - self.send_pending_update_proposals().await?; Ok(()) } @@ -162,14 +179,16 @@ where timestamp: Timestamp::now(), }; - self.current_pending_proposals.insert( - proposal.order_id, - UpdateCfdProposal::RollOverProposal { - proposal: proposal.clone(), - direction: SettlementKind::Outgoing, - }, - ); - self.send_pending_update_proposals().await?; + let new_proposal = UpdateCfdProposal::RollOverProposal { + proposal: proposal.clone(), + direction: SettlementKind::Outgoing, + }; + + self.current_pending_proposals + .insert(proposal.order_id, new_proposal.clone()); + self.projection_actor + .send(try_into_update_rollover_proposal(new_proposal)?) + .await??; self.conn_actor .send(wire::TakerToMaker::ProposeRollOver { @@ -212,14 +231,16 @@ impl Actor { ) } - self.current_pending_proposals.insert( - proposal.order_id, - UpdateCfdProposal::Settlement { - proposal: proposal.clone(), - direction: SettlementKind::Outgoing, - }, - ); - self.send_pending_update_proposals().await?; + let new_proposal = UpdateCfdProposal::Settlement { + proposal: proposal.clone(), + direction: SettlementKind::Outgoing, + }; + + self.current_pending_proposals + .insert(proposal.order_id, new_proposal.clone()); + self.projection_actor + .send(try_into_update_settlement_proposal(new_proposal)?) + .await??; self.conn_actor .send(wire::TakerToMaker::ProposeSettlement {