From eb33172e9f12fd96363c3df97f01a3c3abd4ce96 Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Wed, 6 Oct 2021 15:51:59 +1030 Subject: [PATCH] Handle collab settlement response from the maker - store TakerId for proposals to be able to respond to specific taker about settlement proposal when the user takes action - send the collab settlement response (accept or reject) from the maker - handle rejection (remove from the map), CFD state in the UI returns to "Open" --- daemon/src/maker_cfd.rs | 160 ++++++++++++++++++---------- daemon/src/maker_inc_connections.rs | 10 ++ daemon/src/taker_cfd.rs | 37 +++++++ daemon/src/wire.rs | 4 + 4 files changed, 157 insertions(+), 54 deletions(-) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 00bd933..144f926 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -85,7 +85,8 @@ pub struct Actor { setup_state: SetupState, latest_announcements: Option>, oracle_actor: Address>>, - current_pending_proposals: UpdateCfdProposals, + // Maker needs to also store TakerId to be able to send a reply back + current_pending_proposals: HashMap, } enum SetupState { @@ -126,10 +127,33 @@ impl Actor { } } + /// Send pending proposals for the purposes of UI updates. + /// Filters out the TakerIds, as they are an implementation detail inside of + /// the actor fn send_pending_proposals(&self) -> Result<()> { - Ok(self - .update_cfd_feed_sender - .send(self.current_pending_proposals.clone())?) + Ok(self.update_cfd_feed_sender.send( + self.current_pending_proposals + .iter() + .map(|(order_id, (update_cfd, _))| (*order_id, (update_cfd.clone()))) + .collect(), + )?) + } + + fn get_taker_id_of_proposal(&self, order_id: &OrderId) -> Result { + let (_, taker_id) = self + .current_pending_proposals + .get(order_id) + .context("Could not find proposal for given order id")?; + Ok(*taker_id) + } + + /// Removes a proposal and updates the update cfd proposals' feed + fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { + if self.current_pending_proposals.remove(order_id).is_none() { + anyhow::bail!("Could not find proposal with order id: {}", &order_id) + } + self.send_pending_proposals()?; + Ok(()) } async fn handle_new_order( @@ -193,35 +217,50 @@ impl Actor { Ok(()) } - async fn handle_propose_settlement(&mut self, proposal: SettlementProposal) -> Result<()> { + async fn handle_propose_settlement( + &mut self, + taker_id: TakerId, + proposal: SettlementProposal, + ) -> Result<()> { tracing::info!( "Received settlement proposal from the taker: {:?}", proposal ); self.current_pending_proposals.insert( proposal.order_id, - UpdateCfdProposal::Settlement { - proposal, - direction: SettlementKind::Incoming, - }, + ( + UpdateCfdProposal::Settlement { + proposal, + direction: SettlementKind::Incoming, + }, + taker_id, + ), ); self.send_pending_proposals()?; Ok(()) } - async fn handle_propose_roll_over(&mut self, proposal: RollOverProposal) -> Result<()> { + async fn handle_propose_roll_over( + &mut self, + taker_id: TakerId, + proposal: RollOverProposal, + ) -> Result<()> { tracing::info!( - "Received proposal from the taker: {:?} to roll over order {}", + "Received proposal from the taker {}: {:?} to roll over order {}", + taker_id, proposal, proposal.order_id ); self.current_pending_proposals.insert( proposal.order_id, - UpdateCfdProposal::RollOverProposal { - proposal, - direction: SettlementKind::Incoming, - }, + ( + UpdateCfdProposal::RollOverProposal { + proposal, + direction: SettlementKind::Incoming, + }, + taker_id, + ), ); self.send_pending_proposals()?; @@ -516,47 +555,57 @@ impl Actor { async fn handle_accept_settlement(&mut self, order_id: OrderId) -> Result<()> { tracing::debug!(%order_id, "Maker accepts a settlement proposal" ); - // TODO: Initiate the settlement - self.current_pending_proposals - .remove(&order_id) - .context("Could not find proposal for given order id")?; - self.send_pending_proposals()?; + let taker_id = self.get_taker_id_of_proposal(&order_id)?; + + // TODO: Initiate the settlement - should we start calculating the + // signature here? + + self.takers + .do_send_async(maker_inc_connections::TakerMessage { + taker_id, + command: TakerCommand::NotifySettlementAccepted { id: order_id }, + }) + .await?; + + self.remove_pending_proposal(&order_id) + .context("accepted settlement")?; Ok(()) } async fn handle_reject_settlement(&mut self, order_id: OrderId) -> Result<()> { tracing::debug!(%order_id, "Maker rejects a settlement proposal" ); - // TODO: Handle rejection offer: - // - notify the taker that the settlement was rejected - self.current_pending_proposals - .remove(&order_id) - .context("Could not find proposal for given order id")?; - self.send_pending_proposals()?; + let taker_id = self.get_taker_id_of_proposal(&order_id)?; + + self.takers + .do_send_async(maker_inc_connections::TakerMessage { + taker_id, + command: TakerCommand::NotifySettlementRejected { id: order_id }, + }) + .await?; + + self.remove_pending_proposal(&order_id) + .context("rejected settlement")?; Ok(()) } async fn handle_accept_roll_over(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Maker accepts a roll over proposal" ); + tracing::debug!(%order_id, "Maker accepts a rollover proposal" ); // TODO: Initiate the roll over logic - self.current_pending_proposals - .remove(&order_id) - .context("Could not find roll over proposal for given order id")?; - self.send_pending_proposals()?; + self.remove_pending_proposal(&order_id) + .context("accepted rollover")?; Ok(()) } async fn handle_reject_roll_over(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Maker rejects a roll over proposal" ); - // TODO: Handle rejection and notify the taker that the roll over was rejected + tracing::debug!(%order_id, "Maker rejects a rollover proposal" ); + // TODO: Handle rejection and notify the taker that the rollover was rejected - self.current_pending_proposals - .remove(&order_id) - .context("Could not find roll over proposal for given order id")?; - self.send_pending_proposals()?; + self.remove_pending_proposal(&order_id) + .context("rejected rollover")?; Ok(()) } @@ -726,16 +775,13 @@ impl Handler for Actor { #[async_trait] impl Handler for Actor { async fn handle(&mut self, msg: TakerStreamMessage, _ctx: &mut Context) -> KeepRunning { - let TakerStreamMessage { - taker_id: taker, - item, - } = msg; + let TakerStreamMessage { taker_id, item } = msg; let msg = match item { Ok(msg) => msg, Err(e) => { tracing::warn!( "Error while receiving message from taker {}: {:#}", - taker, + taker_id, e ); return KeepRunning::Yes; @@ -744,7 +790,7 @@ impl Handler for Actor { match msg { wire::TakerToMaker::TakeOrder { order_id, quantity } => { - log_error!(self.handle_take_order(taker, order_id, quantity)) + log_error!(self.handle_take_order(taker_id, order_id, quantity)) } wire::TakerToMaker::ProposeSettlement { order_id, @@ -752,24 +798,30 @@ impl Handler for Actor { taker, maker, } => { - log_error!(self.handle_propose_settlement(SettlementProposal { - order_id, - timestamp, - taker, - maker - })) + log_error!(self.handle_propose_settlement( + taker_id, + SettlementProposal { + order_id, + timestamp, + taker, + maker + } + )) } wire::TakerToMaker::Protocol(msg) => { - log_error!(self.handle_inc_protocol_msg(taker, msg)) + log_error!(self.handle_inc_protocol_msg(taker_id, msg)) } TakerToMaker::ProposeRollOver { order_id, timestamp, } => { - log_error!(self.handle_propose_roll_over(RollOverProposal { - order_id, - timestamp, - })) + log_error!(self.handle_propose_roll_over( + taker_id, + RollOverProposal { + order_id, + timestamp, + } + )) } } diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index e840d8d..e4ab843 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -22,6 +22,8 @@ pub enum TakerCommand { NotifyInvalidOrderId { id: OrderId }, NotifyOrderAccepted { id: OrderId }, NotifyOrderRejected { id: OrderId }, + NotifySettlementAccepted { id: OrderId }, + NotifySettlementRejected { id: OrderId }, Protocol(wire::SetupMsg), } @@ -94,6 +96,14 @@ impl Actor { self.send_to_taker(msg.taker_id, wire::MakerToTaker::RejectOrder(id)) .await?; } + TakerCommand::NotifySettlementAccepted { id } => { + self.send_to_taker(msg.taker_id, wire::MakerToTaker::ConfirmSettlement(id)) + .await?; + } + TakerCommand::NotifySettlementRejected { id } => { + self.send_to_taker(msg.taker_id, wire::MakerToTaker::RejectSettlement(id)) + .await?; + } TakerCommand::Protocol(setup_msg) => { self.send_to_taker(msg.taker_id, wire::MakerToTaker::Protocol(setup_msg)) .await?; diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 82f3827..c14d198 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -107,6 +107,15 @@ impl Actor { .send(self.current_pending_proposals.clone())?) } + /// Removes a proposal and updates the update cfd proposals' feed + fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { + if self.current_pending_proposals.remove(order_id).is_none() { + anyhow::bail!("Could not find proposal with order id: {}", &order_id) + } + self.send_pending_update_proposals()?; + Ok(()) + } + async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> { let mut conn = self.db.acquire().await?; @@ -311,6 +320,28 @@ impl Actor { Ok(()) } + async fn handle_settlement_accepted( + &mut self, + order_id: OrderId, + _ctx: &mut Context, + ) -> Result<()> { + tracing::info!(%order_id, "Settlement proposal got accepted"); + + // TODO: Initiate collaborative settlement + + self.remove_pending_proposal(&order_id)?; + + Ok(()) + } + + async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> { + tracing::info!(%order_id, "Settlement proposal got rejected"); + + self.remove_pending_proposal(&order_id)?; + + Ok(()) + } + async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> { match &mut self.setup_state { SetupState::Active { sender } => { @@ -530,6 +561,12 @@ impl Handler for Actor { wire::MakerToTaker::RejectOrder(order_id) => { log_error!(self.handle_order_rejected(order_id)) } + wire::MakerToTaker::ConfirmSettlement(order_id) => { + log_error!(self.handle_settlement_accepted(order_id, ctx)) + } + wire::MakerToTaker::RejectSettlement(order_id) => { + log_error!(self.handle_settlement_rejected(order_id)) + } wire::MakerToTaker::InvalidOrderId(_) => todo!(), wire::MakerToTaker::Protocol(setup_msg) => { log_error!(self.handle_inc_protocol_msg(setup_msg)) diff --git a/daemon/src/wire.rs b/daemon/src/wire.rs index e0f5dd2..070a601 100644 --- a/daemon/src/wire.rs +++ b/daemon/src/wire.rs @@ -58,6 +58,8 @@ pub enum MakerToTaker { CurrentOrder(Option), ConfirmOrder(OrderId), // TODO: Include payout curve in "accept" message from maker RejectOrder(OrderId), + ConfirmSettlement(OrderId), + RejectSettlement(OrderId), InvalidOrderId(OrderId), Protocol(SetupMsg), } @@ -68,6 +70,8 @@ impl fmt::Display for MakerToTaker { MakerToTaker::CurrentOrder(_) => write!(f, "CurrentOrder"), MakerToTaker::ConfirmOrder(_) => write!(f, "ConfirmOrder"), MakerToTaker::RejectOrder(_) => write!(f, "RejectOrder"), + MakerToTaker::ConfirmSettlement(_) => write!(f, "ConfirmSettlement"), + MakerToTaker::RejectSettlement(_) => write!(f, "RejectSettlement"), MakerToTaker::InvalidOrderId(_) => write!(f, "InvalidOrderId"), MakerToTaker::Protocol(_) => write!(f, "Protocol"), }