From 810011d645cc9e21bf0f946ec5c7a3ae5002861c Mon Sep 17 00:00:00 2001 From: rishflab Date: Mon, 6 Dec 2021 19:16:30 +1100 Subject: [PATCH] Move rollover logic to dedicated actor The rollover actor is spawned the the maker receives a rollover proposal message from the taker. After the rollover is complete, it sends itself a completion message triggering a "cleanup" handler. Cleanup involves sending messages to maker_cfd actor and the maker_inc_connections to trigger removal of stored references to the rollover actor. --- daemon/src/lib.rs | 5 +- daemon/src/maker_cfd.rs | 403 +++++++++------------------- daemon/src/maker_inc_connections.rs | 22 ++ daemon/src/rollover_maker.rs | 317 ++++++++++++++++++++++ daemon/src/rollover_taker.rs | 5 +- daemon/src/wire.rs | 7 +- 6 files changed, 478 insertions(+), 281 deletions(-) create mode 100644 daemon/src/rollover_maker.rs diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 053378d..3b10ca9 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -49,6 +49,7 @@ pub mod olivia; pub mod oracle; pub mod payout_curve; pub mod projection; +pub mod rollover_maker; pub mod rollover_taker; pub mod routes; pub mod seed; @@ -122,7 +123,9 @@ where + xtra::Handler + xtra::Handler> + xtra::Handler - + xtra::Handler>, + + xtra::Handler> + + xtra::Handler> + + xtra::Handler, W: xtra::Handler + xtra::Handler + xtra::Handler, diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 604646b..137d247 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -10,15 +10,11 @@ use crate::maker_inc_connections; use crate::model::cfd::Cfd; use crate::model::cfd::CfdState; use crate::model::cfd::CfdStateCommon; -use crate::model::cfd::Dlc; use crate::model::cfd::Order; use crate::model::cfd::OrderId; use crate::model::cfd::Origin; -use crate::model::cfd::Role; use crate::model::cfd::RollOverProposal; -use crate::model::cfd::SettlementKind; use crate::model::cfd::SettlementProposal; -use crate::model::cfd::UpdateCfdProposal; use crate::model::Identity; use crate::model::Price; use crate::model::Timestamp; @@ -27,14 +23,10 @@ use crate::monitor::MonitorParams; use crate::monitor::{self}; use crate::oracle; use crate::projection; -use crate::projection::try_into_update_rollover_proposal; use crate::projection::Update; -use crate::projection::UpdateRollOverProposal; -use crate::projection::UpdateSettlementProposal; -use crate::setup_contract; -use crate::setup_contract::RolloverParams; +use crate::rollover_maker; +use crate::rollover_maker::Completed; use crate::setup_maker; -use crate::tokio_ext::FutureExt; use crate::wallet; use crate::wire; use crate::wire::TakerToMaker; @@ -43,13 +35,8 @@ use anyhow::Context as _; use anyhow::Result; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; -use futures::channel::mpsc; -use futures::future; -use futures::future::RemoteHandle; -use futures::SinkExt; use sqlx::pool::PoolConnection; use sqlx::Sqlite; -use std::collections::HashMap; use std::collections::HashSet; use time::Duration; use xtra::prelude::*; @@ -74,6 +61,10 @@ pub struct AcceptRollOver { pub struct RejectRollOver { pub order_id: OrderId, } +pub struct RollOverProposed { + pub order_id: OrderId, + pub address: xtra::Address, +} pub struct Commit { pub order_id: OrderId, } @@ -92,11 +83,6 @@ pub struct TakerDisconnected { pub id: Identity, } -pub struct CfdRollOverCompleted { - pub order_id: OrderId, - pub dlc: Result, -} - pub struct FromTaker { pub taker_id: Identity, pub msg: wire::TakerToMaker, @@ -108,29 +94,18 @@ pub struct Actor { settlement_interval: Duration, oracle_pk: schnorrsig::PublicKey, projection_actor: Address, + rollover_actors: AddressMap, takers: Address, current_order: Option, monitor_actor: Address, setup_actors: AddressMap, settlement_actors: AddressMap, - roll_over_state: RollOverState, oracle_actor: Address, - // Maker needs to also store Identity to be able to send a reply back - current_pending_proposals: HashMap, connected_takers: HashSet, n_payouts: usize, tasks: Tasks, } -enum RollOverState { - Active { - taker: Identity, - sender: mpsc::UnboundedSender, - _task: RemoteHandle<()>, - }, - None, -} - impl Actor { #[allow(clippy::too_many_arguments)] pub fn new( @@ -150,13 +125,12 @@ impl Actor { settlement_interval, oracle_pk, projection_actor, + rollover_actors: AddressMap::default(), takers, current_order: None, monitor_actor, setup_actors: AddressMap::default(), - roll_over_state: RollOverState::None, oracle_actor, - current_pending_proposals: HashMap::new(), n_payouts, connected_takers: HashSet::new(), settlement_actors: AddressMap::default(), @@ -164,95 +138,6 @@ impl Actor { } } - async fn handle_propose_roll_over( - &mut self, - proposal: RollOverProposal, - taker_id: Identity, - ) -> Result<()> { - tracing::info!( - "Received proposal from the taker {}: {:?} to roll over order {}", - taker_id, - proposal, - proposal.order_id - ); - - // check if CFD is in open state, otherwise we should not proceed - let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(proposal.order_id, &mut conn).await?; - match cfd { - Cfd { - state: CfdState::Open { .. }, - .. - } => (), - _ => { - anyhow::bail!("Order is in invalid state. Cannot propose roll over.") - } - }; - - let new_proposal = UpdateCfdProposal::RollOverProposal { - proposal: proposal.clone(), - direction: SettlementKind::Incoming, - }; - - self.current_pending_proposals - .insert(proposal.order_id, (new_proposal.clone(), taker_id)); - self.projection_actor - .send(try_into_update_rollover_proposal(new_proposal)?) - .await?; - - Ok(()) - } - - async fn handle_inc_roll_over_protocol_msg( - &mut self, - taker_id: Identity, - msg: wire::RollOverMsg, - ) -> Result<()> { - match &mut self.roll_over_state { - RollOverState::Active { taker, sender, .. } if taker_id == *taker => { - sender.send(msg).await?; - } - RollOverState::Active { taker, .. } => { - anyhow::bail!("Currently rolling over with different taker {}", taker) - } - RollOverState::None => {} - } - - Ok(()) - } - - /// Removes a proposal and updates the update cfd proposals' feed - async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { - let removed_proposal = self.current_pending_proposals.remove(order_id); - - // Strip the identity, ID doesn't care about this implementation detail - let removed_proposal = removed_proposal.map(|(proposal, _)| proposal); - - if let Some(removed_proposal) = removed_proposal { - match removed_proposal { - UpdateCfdProposal::Settlement { .. } => { - self.projection_actor - .send(UpdateSettlementProposal { - order: *order_id, - proposal: None, - }) - .await? - } - UpdateCfdProposal::RollOverProposal { .. } => { - self.projection_actor - .send(UpdateRollOverProposal { - order: *order_id, - proposal: None, - }) - .await? - } - } - } else { - anyhow::bail!("Could not find proposal with order id: {}", &order_id); - } - Ok(()) - } - async fn update_connected_takers(&mut self) -> Result<()> { self.projection_actor .send(Update( @@ -365,6 +250,111 @@ where } } +#[xtra_productivity] +impl Actor { + async fn handle_accept_rollover(&mut self, msg: AcceptRollOver) -> Result<()> { + if self + .rollover_actors + .send(&msg.order_id, rollover_maker::AcceptRollOver) + .await + .is_err() + { + tracing::warn!(%msg.order_id, "No active rollover"); + } + + Ok(()) + } + + async fn handle_reject_rollover(&mut self, msg: RejectRollOver) -> Result<()> { + if self + .rollover_actors + .send(&msg.order_id, rollover_maker::RejectRollOver) + .await + .is_err() + { + tracing::warn!(%msg.order_id, "No active rollover"); + } + + Ok(()) + } +} + +impl Actor +where + O: xtra::Handler + xtra::Handler, + M: xtra::Handler, + T: xtra::Handler + + xtra::Handler> + + xtra::Handler, + W: 'static, + Self: xtra::Handler>, +{ + async fn handle_propose_roll_over( + &mut self, + proposal: RollOverProposal, + taker_id: Identity, + ctx: &mut Context, + ) -> Result<()> { + tracing::info!( + "Received proposal from the taker {}: {:?} to roll over order {}", + taker_id, + proposal, + proposal.order_id + ); + + // check if CFD is in open state, otherwise we should not proceed + let mut conn = self.db.acquire().await?; + let cfd = load_cfd_by_order_id(proposal.order_id, &mut conn).await?; + match cfd { + Cfd { + state: CfdState::Open { .. }, + .. + } => (), + _ => { + anyhow::bail!("Order is in invalid state. Cannot propose roll over.") + } + }; + + let this = ctx.address().expect("acquired own address"); + + let (rollover_actor_addr, rollover_actor_future) = rollover_maker::Actor::new( + &self.takers, + cfd, + taker_id, + self.oracle_pk, + &this, + &self.oracle_actor, + (&self.takers, &this), + self.projection_actor.clone(), + proposal.clone(), + self.n_payouts, + ) + .create(None) + .run(); + + self.tasks.add(rollover_actor_future); + + self.takers + .send(RollOverProposed { + order_id: proposal.order_id, + address: rollover_actor_addr.clone(), + }) + .await?; + + self.rollover_actors + .insert(proposal.order_id, rollover_actor_addr); + + Ok(()) + } +} + +#[xtra_productivity(message_impl = false)] +impl Actor { + async fn handle_rollover_actor_stopping(&mut self, msg: Stopping) { + self.rollover_actors.gc(msg); + } +} + impl Actor where O: xtra::Handler + xtra::Handler, @@ -569,41 +559,6 @@ where Ok(()) } - - async fn handle_reject_roll_over(&mut self, msg: RejectRollOver) -> Result<()> { - let RejectRollOver { order_id } = msg; - - tracing::debug!(%order_id, "Maker rejects a roll_over proposal" ); - - // Validate if order is actually being requested to be extended - let (_, taker_id) = match self.current_pending_proposals.get(&order_id) { - Some(( - UpdateCfdProposal::RollOverProposal { - proposal, - direction: SettlementKind::Incoming, - }, - taker_id, - )) => (proposal, *taker_id), - _ => { - anyhow::bail!("Order is in invalid state. Ignoring reject roll over request.") - } - }; - - // clean-up state ahead of sending to ensure consistency in case we fail to deliver the - // message - self.remove_pending_proposal(&order_id) - .await - .context("rejected roll_over")?; - - self.takers - .send(maker_inc_connections::TakerMessage { - taker_id, - msg: wire::MakerToTaker::RejectRollOver(order_id), - }) - .await??; - - Ok(()) - } } #[xtra_productivity] @@ -658,109 +613,6 @@ where } } -#[xtra_productivity] -impl Actor -where - Self: xtra::Handler, - O: xtra::Handler + xtra::Handler, - T: xtra::Handler, - W: xtra::Handler + xtra::Handler, -{ - async fn handle_accept_roll_over( - &mut self, - msg: AcceptRollOver, - ctx: &mut Context, - ) -> Result<()> { - let AcceptRollOver { order_id } = msg; - - tracing::debug!(%order_id, "Maker accepts a roll_over proposal" ); - - let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - // Validate if order is actually being requested to be extended - let (proposal, taker_id) = match self.current_pending_proposals.get(&order_id) { - Some(( - UpdateCfdProposal::RollOverProposal { - proposal, - direction: SettlementKind::Incoming, - }, - taker_id, - )) => (proposal, *taker_id), - _ => { - anyhow::bail!("Order is in invalid state. Ignoring trying to accept the roll over request it.") - } - }; - - let dlc = cfd.open_dlc().context("CFD was in wrong state")?; - - let oracle_event_id = oracle::next_announcement_after( - time::OffsetDateTime::now_utc() + cfd.settlement_interval, - )?; - let announcement = self - .oracle_actor - .send(oracle::GetAnnouncement(oracle_event_id)) - .await? - .with_context(|| format!("Announcement {} not found", oracle_event_id))?; - - self.takers - .send(maker_inc_connections::TakerMessage { - taker_id, - msg: wire::MakerToTaker::ConfirmRollOver { - order_id: proposal.order_id, - oracle_event_id, - }, - }) - .await??; - - let (sender, receiver) = mpsc::unbounded(); - let contract_future = setup_contract::roll_over( - self.takers.clone().into_sink().with(move |msg| { - future::ok(maker_inc_connections::TakerMessage { - taker_id, - msg: wire::MakerToTaker::RollOverProtocol { order_id, msg }, - }) - }), - receiver, - (self.oracle_pk, announcement), - RolloverParams::new( - cfd.price, - cfd.quantity_usd, - cfd.leverage, - cfd.refund_timelock_in_blocks(), - cfd.fee_rate, - ), - Role::Maker, - dlc, - self.n_payouts, - ); - - let this = ctx - .address() - .expect("actor to be able to give address to itself"); - - let task = async move { - let dlc = contract_future.await; - - this.send(CfdRollOverCompleted { order_id, dlc }) - .await - .expect("always connected to ourselves") - } - .spawn_with_handle(); - - self.roll_over_state = RollOverState::Active { - sender, - taker: taker_id, - _task: task, - }; - - self.remove_pending_proposal(&order_id) - .await - .context("accepted roll_over")?; - Ok(()) - } -} - #[xtra_productivity] impl Actor where @@ -782,13 +634,10 @@ where M: xtra::Handler, O: xtra::Handler, { - async fn handle_roll_over_completed( - &mut self, - order_id: OrderId, - dlc: Result, - ) -> Result<()> { - let dlc = dlc.context("Failed to roll over contract with taker")?; - self.roll_over_state = RollOverState::None; + async fn handle_roll_over_completed(&mut self, msg: Completed) -> Result<()> { + // We handle rollover success in the maker_cfd::Actor instead of the rollover_maker::Actor + // because we do not have access to the DB in the rollover_maker::Actor + let Completed { order_id, dlc } = msg; let mut conn = self.db.acquire().await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; @@ -990,14 +839,13 @@ where } #[async_trait] -impl Handler - for Actor +impl Handler for Actor where M: xtra::Handler, O: xtra::Handler, { - async fn handle(&mut self, msg: CfdRollOverCompleted, _ctx: &mut Context) { - log_error!(self.handle_roll_over_completed(msg.order_id, msg.dlc)); + async fn handle(&mut self, msg: Completed, _ctx: &mut Context) { + log_error!(self.handle_roll_over_completed(msg)); } } @@ -1020,8 +868,10 @@ where + xtra::Handler + xtra::Handler + xtra::Handler> + + xtra::Handler> + xtra::Handler - + xtra::Handler>, + + xtra::Handler> + + xtra::Handler, W: xtra::Handler + xtra::Handler + xtra::Handler, @@ -1069,10 +919,11 @@ where timestamp, }, taker_id, + ctx )) } - wire::TakerToMaker::RollOverProtocol(msg) => { - log_error!(self.handle_inc_roll_over_protocol_msg(taker_id, msg)) + wire::TakerToMaker::RollOverProtocol { .. } => { + unreachable!("This kind of message should be sent to the rollover_maker::Actor`") } wire::TakerToMaker::Protocol { .. } => { unreachable!("This kind of message should be sent to the `setup_maker::Actor`") @@ -1103,7 +954,7 @@ impl Message for TakerDisconnected { type Result = (); } -impl Message for CfdRollOverCompleted { +impl Message for Completed { type Result = (); } diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index b84faf7..cc78c3b 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -10,6 +10,7 @@ use crate::model::cfd::OrderId; use crate::model::Identity; use crate::noise; use crate::noise::TransportStateExt; +use crate::rollover_maker; use crate::send_to_socket; use crate::setup_maker; use crate::tokio_ext::FutureExt; @@ -105,6 +106,7 @@ pub struct Actor { heartbeat_interval: Duration, setup_actors: AddressMap, settlement_actors: AddressMap, + rollover_actors: AddressMap, connection_tasks: HashMap, } @@ -125,6 +127,7 @@ impl Actor { heartbeat_interval, setup_actors: AddressMap::default(), settlement_actors: AddressMap::default(), + rollover_actors: AddressMap::default(), connection_tasks: HashMap::new(), } } @@ -330,6 +333,11 @@ impl Actor { self.drop_taker_connection(&taker_id).await; } + + async fn handle_rollover_proposed(&mut self, message: maker_cfd::RollOverProposed) { + self.rollover_actors + .insert(message.order_id, message.address); + } } #[xtra_productivity(message_impl = false)] @@ -345,6 +353,16 @@ impl Actor { tracing::error!(%order_id, "No active contract setup"); } }, + RollOverProtocol { order_id, msg } => { + if self + .rollover_actors + .send(&order_id, rollover_maker::ProtocolMsg(msg)) + .await + .is_err() + { + tracing::warn!(%order_id, "No active rollover actor") + } + } Settlement { order_id, msg: taker_to_maker::Settlement::Initiate { sig_taker }, @@ -368,6 +386,10 @@ impl Actor { self.setup_actors.gc(message); } + async fn handle_rollover_actor_stopping(&mut self, message: Stopping) { + self.rollover_actors.gc(message); + } + async fn handle_settlement_actor_stopping( &mut self, message: Stopping, diff --git a/daemon/src/rollover_maker.rs b/daemon/src/rollover_maker.rs new file mode 100644 index 0000000..0a8a403 --- /dev/null +++ b/daemon/src/rollover_maker.rs @@ -0,0 +1,317 @@ +use crate::address_map::ActorName; +use crate::maker_inc_connections; +use crate::maker_inc_connections::TakerMessage; +use crate::model::cfd::Dlc; +use crate::model::cfd::OrderId; +use crate::model::cfd::Role; +use crate::model::cfd::RollOverProposal; +use crate::model::cfd::SettlementKind; +use crate::model::cfd::UpdateCfdProposal; +use crate::model::Identity; +use crate::oracle; +use crate::oracle::GetAnnouncement; +use crate::projection; +use crate::projection::try_into_update_rollover_proposal; +use crate::projection::UpdateRollOverProposal; +use crate::schnorrsig; +use crate::setup_contract; +use crate::setup_contract::RolloverParams; +use crate::tokio_ext::spawn_fallible; +use crate::wire; +use crate::wire::MakerToTaker; +use crate::wire::RollOverMsg; +use crate::Cfd; +use crate::Stopping; +use anyhow::Context as _; +use anyhow::Result; +use futures::channel::mpsc; +use futures::channel::mpsc::UnboundedSender; +use futures::future; +use futures::SinkExt; +use xtra::prelude::MessageChannel; +use xtra::Context; +use xtra::KeepRunning; +use xtra_productivity::xtra_productivity; + +pub struct AcceptRollOver; + +pub struct RejectRollOver; + +pub struct ProtocolMsg(pub wire::RollOverMsg); + +/// Message sent from the spawned task to `rollover_taker::Actor` to +/// notify that rollover has finished successfully. +pub struct RolloverSucceeded { + dlc: Dlc, +} + +/// Message sent from the spawned task to `rollover_taker::Actor` to +/// notify that rollover has failed. +pub struct RolloverFailed { + error: anyhow::Error, +} + +#[allow(clippy::large_enum_variant)] +pub struct Completed { + pub order_id: OrderId, + pub dlc: Dlc, +} + +pub struct Actor { + send_to_taker_actor: Box>, + cfd: Cfd, + taker_id: Identity, + n_payouts: usize, + oracle_pk: schnorrsig::PublicKey, + sent_from_taker: Option>, + maker_cfd_actor: Box>, + oracle_actor: Box>, + on_stopping: Vec>>>, + projection_actor: xtra::Address, + proposal: RollOverProposal, +} + +#[async_trait::async_trait] +impl xtra::Actor for Actor { + async fn stopping(&mut self, ctx: &mut Context) -> KeepRunning { + let address = ctx.address().expect("acquired own actor address"); + + for channel in self.on_stopping.iter() { + let _ = channel + .send(Stopping { + me: address.clone(), + }) + .await; + } + + KeepRunning::StopAll + } + + async fn started(&mut self, _ctx: &mut Context) { + let new_proposal = UpdateCfdProposal::RollOverProposal { + proposal: self.proposal.clone(), + direction: SettlementKind::Incoming, + }; + + self.projection_actor + .send( + try_into_update_rollover_proposal(new_proposal) + .expect("update cfd proposal is rollover proposal"), + ) + .await + .expect("projection actor is running"); + } +} + +impl Actor { + #[allow(clippy::too_many_arguments)] + pub fn new( + send_to_taker_actor: &(impl MessageChannel + 'static), + cfd: Cfd, + taker_id: Identity, + oracle_pk: schnorrsig::PublicKey, + maker_cfd_actor: &(impl MessageChannel + 'static), + oracle_actor: &(impl MessageChannel + 'static), + (on_stopping0, on_stopping1): ( + &(impl MessageChannel> + 'static), + &(impl MessageChannel> + 'static), + ), + projection_actor: xtra::Address, + proposal: RollOverProposal, + n_payouts: usize, + ) -> Self { + Self { + send_to_taker_actor: send_to_taker_actor.clone_channel(), + cfd, + taker_id, + n_payouts, + oracle_pk, + sent_from_taker: None, + maker_cfd_actor: maker_cfd_actor.clone_channel(), + oracle_actor: oracle_actor.clone_channel(), + on_stopping: vec![on_stopping0.clone_channel(), on_stopping1.clone_channel()], + projection_actor, + proposal, + } + } + + async fn update_contract(&mut self, dlc: Dlc, ctx: &mut xtra::Context) -> Result<()> { + let msg = Completed { + order_id: self.cfd.id, + dlc, + }; + self.maker_cfd_actor.send(msg).await?; + ctx.stop(); + Ok(()) + } + + async fn fail(&mut self, ctx: &mut xtra::Context, error: anyhow::Error) { + tracing::info!(%self.cfd.id, %error, "Rollover failed"); + if let Err(err) = self + .projection_actor + .send(projection::UpdateRollOverProposal { + order: self.cfd.id, + proposal: None, + }) + .await + { + tracing::error!(%err, "projection actor unreachable when attempting to fail rollover"); + } + ctx.stop(); + } + + async fn accept(&mut self, ctx: &mut xtra::Context) -> Result<()> { + let order_id = self.cfd.id; + + let (sender, receiver) = mpsc::unbounded(); + + self.sent_from_taker = Some(sender); + + tracing::debug!(%order_id, "Maker accepts a roll_over proposal" ); + + let cfd = self.cfd.clone(); + + let dlc = cfd.open_dlc().expect("CFD was in wrong state"); + + let oracle_event_id = oracle::next_announcement_after( + time::OffsetDateTime::now_utc() + cfd.settlement_interval, + )?; + + let taker_id = self.taker_id; + + self.send_to_taker_actor + .send(maker_inc_connections::TakerMessage { + taker_id, + msg: wire::MakerToTaker::ConfirmRollOver { + order_id, + oracle_event_id, + }, + }) + .await??; + + self.projection_actor + .send(UpdateRollOverProposal { + order: order_id, + proposal: None, + }) + .await?; + + let announcement = self + .oracle_actor + .send(oracle::GetAnnouncement(oracle_event_id)) + .await? + .with_context(|| format!("Announcement {} not found", oracle_event_id))?; + + let rollover_fut = setup_contract::roll_over( + self.send_to_taker_actor.sink().with(move |msg| { + future::ok(maker_inc_connections::TakerMessage { + taker_id, + msg: wire::MakerToTaker::RollOverProtocol { order_id, msg }, + }) + }), + receiver, + (self.oracle_pk, announcement), + RolloverParams::new( + cfd.price, + cfd.quantity_usd, + cfd.leverage, + cfd.refund_timelock_in_blocks(), + cfd.fee_rate, + ), + Role::Maker, + dlc, + self.n_payouts, + ); + + let this = ctx.address().expect("self to be alive"); + + spawn_fallible::<_, anyhow::Error>(async move { + let _ = match rollover_fut.await { + Ok(dlc) => this.send(RolloverSucceeded { dlc }).await?, + Err(error) => this.send(RolloverFailed { error }).await?, + }; + + Ok(()) + }); + + Ok(()) + } + + async fn reject(&mut self, ctx: &mut xtra::Context) -> Result<()> { + tracing::info!(%self.cfd.id, "Maker rejects a roll_over proposal" ); + + self.send_to_taker_actor + .send(TakerMessage { + taker_id: self.taker_id, + msg: MakerToTaker::RejectRollOver(self.cfd.id), + }) + .await??; + self.projection_actor + .send(UpdateRollOverProposal { + order: self.cfd.id, + proposal: None, + }) + .await?; + ctx.stop(); + + Ok(()) + } + + pub async fn forward_protocol_msg(&mut self, msg: ProtocolMsg) -> Result<()> { + let sender = self + .sent_from_taker + .as_mut() + .context("cannot forward message to rollover task")?; + sender.send(msg.0).await?; + Ok(()) + } +} + +#[xtra_productivity] +impl Actor { + async fn handle_accept_rollover( + &mut self, + _msg: AcceptRollOver, + ctx: &mut xtra::Context, + ) { + if let Err(err) = self.accept(ctx).await { + self.fail(ctx, err).await; + }; + } + + async fn handle_reject_rollover( + &mut self, + _msg: RejectRollOver, + ctx: &mut xtra::Context, + ) { + if let Err(err) = self.reject(ctx).await { + self.fail(ctx, err).await; + }; + } + + async fn handle_protocol_msg(&mut self, msg: ProtocolMsg, ctx: &mut xtra::Context) { + if let Err(err) = self.forward_protocol_msg(msg).await { + self.fail(ctx, err).await; + }; + } + + async fn handle_rollover_failed(&mut self, msg: RolloverFailed, ctx: &mut xtra::Context) { + self.fail(ctx, msg.error).await; + } + + async fn handle_rollover_succeeded( + &mut self, + msg: RolloverSucceeded, + ctx: &mut xtra::Context, + ) { + if let Err(err) = self.update_contract(msg.dlc.clone(), ctx).await { + self.fail(ctx, err).await; + } + } +} + +impl ActorName for Actor { + fn actor_name() -> String { + "Maker rollover".to_string() + } +} diff --git a/daemon/src/rollover_taker.rs b/daemon/src/rollover_taker.rs index bf9df21..87cc805 100644 --- a/daemon/src/rollover_taker.rs +++ b/daemon/src/rollover_taker.rs @@ -113,8 +113,9 @@ impl Actor { self.rollover_msg_sender = Some(sender); let rollover_fut = setup_contract::roll_over( - xtra::message_channel::MessageChannel::sink(&self.maker) - .with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))), + xtra::message_channel::MessageChannel::sink(&self.maker).with(move |msg| { + future::ok(wire::TakerToMaker::RollOverProtocol { order_id, msg }) + }), receiver, (self.oracle_pk, announcement), RolloverParams::new( diff --git a/daemon/src/wire.rs b/daemon/src/wire.rs index dbcf7cb..24b0f30 100644 --- a/daemon/src/wire.rs +++ b/daemon/src/wire.rs @@ -86,7 +86,10 @@ pub enum TakerToMaker { order_id: OrderId, msg: SetupMsg, }, - RollOverProtocol(RollOverMsg), + RollOverProtocol { + order_id: OrderId, + msg: RollOverMsg, + }, Settlement { order_id: OrderId, msg: taker_to_maker::Settlement, @@ -99,7 +102,7 @@ impl fmt::Display for TakerToMaker { TakerToMaker::TakeOrder { .. } => write!(f, "TakeOrder"), TakerToMaker::Protocol { .. } => write!(f, "Protocol"), TakerToMaker::ProposeRollOver { .. } => write!(f, "ProposeRollOver"), - TakerToMaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"), + TakerToMaker::RollOverProtocol { .. } => write!(f, "RollOverProtocol"), TakerToMaker::Settlement { .. } => write!(f, "Settlement"), TakerToMaker::Hello(_) => write!(f, "Hello"), }