Browse Source

Pass changes in cfd proposals into projection actor

Instead of passing the whole HashMap to the projection actor, only send what has
actually changed.

Provide separate messages to update settlement proposal and roll over proposals,
as they will be sent from different actors.
no-buy-button-while-setting-up-cfd
Mariusz Klochowicz 3 years ago
parent
commit
2ee97c41fe
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 90
      daemon/src/maker_cfd.rs
  2. 127
      daemon/src/projection.rs
  3. 65
      daemon/src/taker_cfd.rs

90
daemon/src/maker_cfd.rs

@ -6,12 +6,14 @@ use crate::model::cfd::{
}; };
use crate::model::{Identity, Price, Timestamp, Usd}; use crate::model::{Identity, Price, Timestamp, Usd};
use crate::monitor::MonitorParams; use crate::monitor::MonitorParams;
use crate::projection::Update; use crate::projection::{
try_into_update_rollover_proposal, try_into_update_settlement_proposal, Update,
UpdateRollOverProposal, UpdateSettlementProposal,
};
use crate::setup_contract::{RolloverParams, SetupParams}; use crate::setup_contract::{RolloverParams, SetupParams};
use crate::tokio_ext::FutureExt; use crate::tokio_ext::FutureExt;
use crate::{ use crate::{
log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, wallet, wire, log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, wallet, wire,
UpdateCfdProposals,
}; };
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
@ -176,17 +178,16 @@ impl<O, M, T, W> Actor<O, M, T, W> {
} }
}; };
self.current_pending_proposals.insert( let new_proposal = UpdateCfdProposal::RollOverProposal {
proposal.order_id, proposal: proposal.clone(),
(
UpdateCfdProposal::RollOverProposal {
proposal,
direction: SettlementKind::Incoming, direction: SettlementKind::Incoming,
}, };
taker_id,
), self.current_pending_proposals
); .insert(proposal.order_id, (new_proposal.clone(), taker_id));
self.send_pending_proposals().await?; self.projection_actor
.send(try_into_update_rollover_proposal(new_proposal)?)
.await??;
Ok(()) Ok(())
} }
@ -200,17 +201,17 @@ impl<O, M, T, W> Actor<O, M, T, W> {
"Received settlement proposal from the taker: {:?}", "Received settlement proposal from the taker: {:?}",
proposal proposal
); );
self.current_pending_proposals.insert(
proposal.order_id, let new_proposal = UpdateCfdProposal::Settlement {
( proposal: proposal.clone(),
UpdateCfdProposal::Settlement {
proposal,
direction: SettlementKind::Incoming, direction: SettlementKind::Incoming,
}, };
taker_id,
), self.current_pending_proposals
); .insert(proposal.order_id, (new_proposal.clone(), taker_id));
self.send_pending_proposals().await?; self.projection_actor
.send(try_into_update_settlement_proposal(new_proposal)?)
.await??;
Ok(()) Ok(())
} }
@ -253,28 +254,35 @@ impl<O, M, T, W> Actor<O, M, T, W> {
Ok(()) Ok(())
} }
/// Send pending proposals for the purposes of UI updates.
/// Filters out the Identities, as they are an implementation detail inside of
/// the actor
async fn send_pending_proposals(&self) -> Result<()> {
let pending_proposal = self
.current_pending_proposals
.iter()
.map(|(order_id, (update_cfd, _))| (*order_id, (update_cfd.clone())))
.collect::<UpdateCfdProposals>();
let _ = self
.projection_actor
.send(projection::Update(pending_proposal))
.await?;
Ok(())
}
/// Removes a proposal and updates the update cfd proposals' feed /// Removes a proposal and updates the update cfd proposals' feed
async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> {
if self.current_pending_proposals.remove(order_id).is_none() { let removed_proposal = self.current_pending_proposals.remove(order_id);
anyhow::bail!("Could not find proposal with order id: {}", &order_id)
// Strip the identity, ID doesn't care about this implementation detail
let removed_proposal = removed_proposal.map(|(proposal, _)| proposal);
if let Some(removed_proposal) = removed_proposal {
match removed_proposal {
UpdateCfdProposal::Settlement { .. } => {
self.projection_actor
.send(UpdateSettlementProposal {
order: *order_id,
proposal: None,
})
.await??
}
UpdateCfdProposal::RollOverProposal { .. } => {
self.projection_actor
.send(UpdateRollOverProposal {
order: *order_id,
proposal: None,
})
.await??
}
}
} else {
anyhow::bail!("Could not find proposal with order id: {}", &order_id);
} }
self.send_pending_proposals().await?;
Ok(()) Ok(())
} }

127
daemon/src/projection.rs

@ -1,8 +1,12 @@
use std::collections::HashMap; use std::collections::HashMap;
use crate::model::cfd::{Cfd as ModelCfd, OrderId, Role, SettlementKind, UpdateCfdProposal}; use crate::model::cfd::{
Cfd as ModelCfd, OrderId, Role, RollOverProposal, SettlementKind, SettlementProposal,
UpdateCfdProposal,
};
use crate::model::{Leverage, Position, Timestamp, TradingPair}; use crate::model::{Leverage, Position, Timestamp, TradingPair};
use crate::{bitmex_price_feed, model, tx, Order, UpdateCfdProposals}; use crate::{bitmex_price_feed, model, tx, Order, UpdateCfdProposals};
use anyhow::Result;
use bdk::bitcoin::{Amount, Network, SignedAmount}; use bdk::bitcoin::{Amount, Network, SignedAmount};
use itertools::Itertools; use itertools::Itertools;
use rust_decimal::Decimal; use rust_decimal::Decimal;
@ -11,6 +15,22 @@ use time::OffsetDateTime;
use tokio::sync::watch; use tokio::sync::watch;
use xtra_productivity::xtra_productivity; use xtra_productivity::xtra_productivity;
/// Amend a given settlement proposal (if `proposal.is_none()`, it should be removed)
pub struct UpdateSettlementProposal {
pub order: OrderId,
pub proposal: Option<(SettlementProposal, SettlementKind)>,
}
/// Amend a given rollover proposal (if `proposal.is_none()`, it should be removed)
pub struct UpdateRollOverProposal {
pub order: OrderId,
pub proposal: Option<(RollOverProposal, SettlementKind)>,
}
/// Store the latest state of `T` for display purposes
/// (replaces previously stored values)
pub struct Update<T>(pub T);
pub struct Actor { pub struct Actor {
tx: Tx, tx: Tx,
state: State, state: State,
@ -95,8 +115,14 @@ impl State {
temp.into() temp.into()
} }
pub fn update_proposals(&mut self, proposals: UpdateCfdProposals) { pub fn amend_settlement_proposal(&mut self, proposal: UpdateSettlementProposal) -> Result<()> {
let _ = std::mem::replace(&mut self.proposals, proposals); let order = proposal.order;
self.amend_cfd_proposal(order, proposal.into())
}
pub fn amend_rollover_proposal(&mut self, proposal: UpdateRollOverProposal) -> Result<()> {
let order = proposal.order;
self.amend_cfd_proposal(order, proposal.into())
} }
pub fn update_quote(&mut self, quote: bitmex_price_feed::Quote) { pub fn update_quote(&mut self, quote: bitmex_price_feed::Quote) {
@ -106,9 +132,24 @@ impl State {
pub fn update_cfds(&mut self, cfds: Vec<ModelCfd>) { pub fn update_cfds(&mut self, cfds: Vec<ModelCfd>) {
let _ = std::mem::replace(&mut self.cfds, cfds); let _ = std::mem::replace(&mut self.cfds, cfds);
} }
}
pub struct Update<T>(pub T); fn amend_cfd_proposal(
&mut self,
order: OrderId,
proposal: Option<UpdateCfdProposal>,
) -> Result<()> {
if let Some(proposal) = proposal {
self.proposals.insert(order, proposal);
tracing::trace!(%order, "Cfd proposal got updated");
} else {
if self.proposals.remove(&order).is_none() {
anyhow::bail!("Could not find proposal with order id: {}", &order)
}
tracing::trace!(%order, "Removed cfd proposal");
};
Ok(())
}
}
#[xtra_productivity] #[xtra_productivity]
impl Actor { impl Actor {
@ -125,16 +166,22 @@ impl Actor {
self.state.update_cfds(msg.0); self.state.update_cfds(msg.0);
let _ = self.tx.cfds.send(self.state.to_cfds()); let _ = self.tx.cfds.send(self.state.to_cfds());
} }
fn handle(&mut self, msg: Update<UpdateCfdProposals>) {
self.state.update_proposals(msg.0);
let _ = self.tx.cfds.send(self.state.to_cfds());
}
fn handle(&mut self, msg: Update<Vec<model::Identity>>) { fn handle(&mut self, msg: Update<Vec<model::Identity>>) {
let _ = self let _ = self
.tx .tx
.connected_takers .connected_takers
.send(msg.0.iter().map(|x| x.into()).collect_vec()); .send(msg.0.iter().map(|x| x.into()).collect_vec());
} }
fn handle(&mut self, msg: UpdateSettlementProposal) -> Result<()> {
self.state.amend_settlement_proposal(msg)?;
let _ = self.tx.cfds.send(self.state.to_cfds());
Ok(())
}
fn handle(&mut self, msg: UpdateRollOverProposal) -> Result<()> {
self.state.amend_rollover_proposal(msg)?;
let _ = self.tx.cfds.send(self.state.to_cfds());
Ok(())
}
} }
impl xtra::Actor for Actor {} impl xtra::Actor for Actor {}
@ -578,3 +625,65 @@ mod tests {
assert_eq!(json, "\"SetupFailed\""); assert_eq!(json, "\"SetupFailed\"");
} }
} }
pub fn try_into_update_settlement_proposal(
cfd_update_proposal: UpdateCfdProposal,
) -> Result<UpdateSettlementProposal> {
match cfd_update_proposal {
UpdateCfdProposal::Settlement {
proposal,
direction,
} => Ok(UpdateSettlementProposal {
order: proposal.order_id,
proposal: Some((proposal, direction)),
}),
UpdateCfdProposal::RollOverProposal { .. } => {
anyhow::bail!("Can't convert a RollOver proposal")
}
}
}
pub fn try_into_update_rollover_proposal(
cfd_update_proposal: UpdateCfdProposal,
) -> Result<UpdateRollOverProposal> {
match cfd_update_proposal {
UpdateCfdProposal::RollOverProposal {
proposal,
direction,
} => Ok(UpdateRollOverProposal {
order: proposal.order_id,
proposal: Some((proposal, direction)),
}),
UpdateCfdProposal::Settlement { .. } => {
anyhow::bail!("Can't convert a Settlement proposal")
}
}
}
impl From<UpdateSettlementProposal> for Option<UpdateCfdProposal> {
fn from(proposal: UpdateSettlementProposal) -> Self {
let UpdateSettlementProposal { order: _, proposal } = proposal;
if let Some((proposal, kind)) = proposal {
Some(UpdateCfdProposal::Settlement {
proposal,
direction: kind,
})
} else {
None
}
}
}
impl From<UpdateRollOverProposal> for Option<UpdateCfdProposal> {
fn from(proposal: UpdateRollOverProposal) -> Self {
let UpdateRollOverProposal { order: _, proposal } = proposal;
if let Some((proposal, kind)) = proposal {
Some(UpdateCfdProposal::RollOverProposal {
proposal,
direction: kind,
})
} else {
None
}
}
}

65
daemon/src/taker_cfd.rs

@ -8,6 +8,10 @@ use crate::model::cfd::{
}; };
use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd}; use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd};
use crate::monitor::{self, MonitorParams}; use crate::monitor::{self, MonitorParams};
use crate::projection::{
try_into_update_rollover_proposal, try_into_update_settlement_proposal, UpdateRollOverProposal,
UpdateSettlementProposal,
};
use crate::setup_contract::RolloverParams; use crate::setup_contract::RolloverParams;
use crate::tokio_ext::FutureExt; use crate::tokio_ext::FutureExt;
use crate::wire::RollOverMsg; use crate::wire::RollOverMsg;
@ -106,19 +110,32 @@ where
} }
impl<O, M, W> Actor<O, M, W> { impl<O, M, W> Actor<O, M, W> {
async fn send_pending_update_proposals(&self) -> Result<()> {
Ok(self
.projection_actor
.send(projection::Update(self.current_pending_proposals.clone()))
.await?)
}
/// Removes a proposal and updates the update cfd proposals' feed /// Removes a proposal and updates the update cfd proposals' feed
async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> {
if self.current_pending_proposals.remove(order_id).is_none() { let removed_proposal = self.current_pending_proposals.remove(order_id);
anyhow::bail!("Could not find proposal with order id: {}", &order_id)
if let Some(removed_proposal) = removed_proposal {
match removed_proposal {
UpdateCfdProposal::Settlement { .. } => {
self.projection_actor
.send(UpdateSettlementProposal {
order: *order_id,
proposal: None,
})
.await??
}
UpdateCfdProposal::RollOverProposal { .. } => {
self.projection_actor
.send(UpdateRollOverProposal {
order: *order_id,
proposal: None,
})
.await??
}
}
} else {
anyhow::bail!("Could not find proposal with order id: {}", &order_id);
} }
self.send_pending_update_proposals().await?;
Ok(()) Ok(())
} }
@ -162,14 +179,16 @@ where
timestamp: Timestamp::now(), timestamp: Timestamp::now(),
}; };
self.current_pending_proposals.insert( let new_proposal = UpdateCfdProposal::RollOverProposal {
proposal.order_id,
UpdateCfdProposal::RollOverProposal {
proposal: proposal.clone(), proposal: proposal.clone(),
direction: SettlementKind::Outgoing, direction: SettlementKind::Outgoing,
}, };
);
self.send_pending_update_proposals().await?; self.current_pending_proposals
.insert(proposal.order_id, new_proposal.clone());
self.projection_actor
.send(try_into_update_rollover_proposal(new_proposal)?)
.await??;
self.conn_actor self.conn_actor
.send(wire::TakerToMaker::ProposeRollOver { .send(wire::TakerToMaker::ProposeRollOver {
@ -212,14 +231,16 @@ impl<O, M, W> Actor<O, M, W> {
) )
} }
self.current_pending_proposals.insert( let new_proposal = UpdateCfdProposal::Settlement {
proposal.order_id,
UpdateCfdProposal::Settlement {
proposal: proposal.clone(), proposal: proposal.clone(),
direction: SettlementKind::Outgoing, direction: SettlementKind::Outgoing,
}, };
);
self.send_pending_update_proposals().await?; self.current_pending_proposals
.insert(proposal.order_id, new_proposal.clone());
self.projection_actor
.send(try_into_update_settlement_proposal(new_proposal)?)
.await??;
self.conn_actor self.conn_actor
.send(wire::TakerToMaker::ProposeSettlement { .send(wire::TakerToMaker::ProposeSettlement {

Loading…
Cancel
Save