diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index dbb45bd..6663b24 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -1,9 +1,13 @@ use crate::address_map::{AddressMap, Stopping}; use crate::model::cfd::OrderId; use crate::model::{Identity, Price, Timestamp, Usd}; +use crate::taker_cfd::CurrentOrder; use crate::tokio_ext::FutureExt; use crate::wire::{EncryptedJsonCodec, TakerToMaker, Version}; -use crate::{collab_settlement_taker, log_error, noise, send_to_socket, setup_taker, wire, Tasks}; +use crate::{ + collab_settlement_taker, log_error, noise, rollover_taker, send_to_socket, setup_taker, wire, + Tasks, +}; use anyhow::{bail, Context, Result}; use bdk::bitcoin::Amount; use futures::{SinkExt, StreamExt, TryStreamExt}; @@ -31,13 +35,14 @@ pub struct Actor { send_to_maker: Box>, send_to_maker_ctx: xtra::Context>, identity_sk: x25519_dalek::StaticSecret, - maker_to_taker: Box>, + current_order: Box>, /// Max duration since the last heartbeat until we die. heartbeat_timeout: Duration, connect_timeout: Duration, connected_state: Option, setup_actors: HashMap>, collab_settlement_actors: AddressMap, + rollover_actors: AddressMap, } pub struct Connect { @@ -90,10 +95,16 @@ pub struct ProposeSettlement { pub address: xtra::Address, } +pub struct ProposeRollOver { + pub order_id: OrderId, + pub timestamp: Timestamp, + pub address: xtra::Address, +} + impl Actor { pub fn new( status_sender: watch::Sender, - maker_to_taker: Box>, + current_order: &(impl MessageChannel + 'static), identity_sk: x25519_dalek::StaticSecret, hearthbeat_timeout: Duration, connect_timeout: Duration, @@ -105,12 +116,13 @@ impl Actor { send_to_maker: Box::new(send_to_maker_addr), send_to_maker_ctx, identity_sk, - maker_to_taker, + current_order: current_order.clone_channel(), heartbeat_timeout: hearthbeat_timeout, connected_state: None, setup_actors: HashMap::new(), connect_timeout, collab_settlement_actors: AddressMap::default(), + rollover_actors: AddressMap::default(), } } } @@ -127,6 +139,10 @@ impl Actor { ) { self.collab_settlement_actors.gc(message); } + + async fn handle_rollover_actor_stopping(&mut self, message: Stopping) { + self.rollover_actors.gc(message); + } } #[xtra_productivity] @@ -170,6 +186,25 @@ impl Actor { Ok(()) } + + async fn handle_propose_roll_over(&mut self, msg: ProposeRollOver) -> Result<()> { + let ProposeRollOver { + order_id, + timestamp, + address, + } = msg; + + self.send_to_maker + .send(wire::TakerToMaker::ProposeRollOver { + order_id, + timestamp, + }) + .await?; + + self.rollover_actors.insert(order_id, address); + + Ok(()) + } } #[xtra_productivity] @@ -338,13 +373,41 @@ impl Actor { tracing::warn!(%order_id, "No active collaborative settlement"); } } + wire::MakerToTaker::ConfirmRollOver { + order_id, + oracle_event_id, + } => { + if !self + .rollover_actors + .send( + &order_id, + rollover_taker::RollOverAccepted { oracle_event_id }, + ) + .await + { + tracing::warn!(%order_id, "No active rollover"); + } + } + wire::MakerToTaker::RejectRollOver(order_id) => { + if !self + .rollover_actors + .send(&order_id, rollover_taker::RollOverRejected) + .await + { + tracing::warn!(%order_id, "No active rollover"); + } + } + wire::MakerToTaker::RollOverProtocol { order_id, msg } => { + if !self.rollover_actors.send(&order_id, msg).await { + tracing::warn!(%order_id, "No active rollover"); + } + } + wire::MakerToTaker::CurrentOrder(msg) => { + log_error!(self.current_order.send(CurrentOrder(msg))); + } wire::MakerToTaker::Hello(_) => { tracing::warn!("Ignoring unexpected Hello message from maker. Hello is only expected when opening a new connection.") } - other => { - // this one should go to the taker cfd actor - log_error!(self.maker_to_taker.send(other)); - } } KeepRunning::Yes } diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 57e7e22..55a0d93 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -41,6 +41,7 @@ pub mod olivia; pub mod oracle; pub mod payout_curve; pub mod projection; +pub mod rollover_taker; pub mod routes; pub mod seed; pub mod send_to_socket; @@ -270,7 +271,7 @@ where tasks.add(connection_actor_ctx.run(connection::Actor::new( maker_online_status_feed_sender, - Box::new(cfd_actor_addr.clone()), + &cfd_actor_addr, identity_sk, maker_heartbeat_interval, connect_timeout, diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 0b0227d..500dadb 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -729,7 +729,7 @@ where self.takers.clone().into_sink().with(move |msg| { future::ok(maker_inc_connections::TakerMessage { taker_id, - msg: wire::MakerToTaker::RollOverProtocol(msg), + msg: wire::MakerToTaker::RollOverProtocol { order_id, msg }, }) }), receiver, diff --git a/daemon/src/rollover_taker.rs b/daemon/src/rollover_taker.rs new file mode 100644 index 0000000..36744be --- /dev/null +++ b/daemon/src/rollover_taker.rs @@ -0,0 +1,318 @@ +use crate::address_map::Stopping; +use crate::connection; +use crate::model::cfd::{Cfd, Dlc, OrderId, Role, RollOverProposal, SettlementKind}; +use crate::model::{BitMexPriceEventId, Timestamp}; +use crate::oracle::{self, GetAnnouncement}; +use crate::projection::{self, UpdateRollOverProposal}; +use crate::setup_contract::{self, RolloverParams}; +use crate::tokio_ext::spawn_fallible; +use crate::wire::{self, RollOverMsg}; +use anyhow::{Context, Result}; +use async_trait::async_trait; +use futures::channel::mpsc::{self, UnboundedSender}; +use futures::{future, SinkExt}; +use maia::secp256k1_zkp::schnorrsig; +use xtra::prelude::MessageChannel; +use xtra_productivity::xtra_productivity; + +pub struct Actor { + cfd: Cfd, + n_payouts: usize, + oracle_pk: schnorrsig::PublicKey, + timestamp: Timestamp, + maker: xtra::Address, + get_announcement: Box>, + projection: xtra::Address, + on_completed: Box>, + on_stopping: Vec>>>, + rollover_msg_sender: Option>, +} + +impl Actor { + pub fn new( + (cfd, n_payouts): (Cfd, usize), + oracle_pk: schnorrsig::PublicKey, + maker: xtra::Address, + get_announcement: &(impl MessageChannel + 'static), + projection: xtra::Address, + on_completed: &(impl MessageChannel + 'static), + (on_stopping0, on_stopping1): ( + &(impl MessageChannel> + 'static), + &(impl MessageChannel> + 'static), + ), + ) -> Self { + Self { + cfd, + n_payouts, + oracle_pk, + timestamp: Timestamp::now(), + maker, + get_announcement: get_announcement.clone_channel(), + projection, + on_completed: on_completed.clone_channel(), + on_stopping: vec![on_stopping0.clone_channel(), on_stopping1.clone_channel()], + rollover_msg_sender: None, + } + } + + async fn propose(&self, this: xtra::Address) -> Result<()> { + self.maker + .send(connection::ProposeRollOver { + order_id: self.cfd.order.id, + timestamp: self.timestamp, + address: this, + }) + .await??; + + self.update_proposal(Some(( + RollOverProposal { + order_id: self.cfd.order.id, + timestamp: self.timestamp, + }, + SettlementKind::Outgoing, + ))) + .await?; + + Ok(()) + } + + async fn handle_confirmed( + &mut self, + msg: RollOverAccepted, + ctx: &mut xtra::Context, + ) -> Result<()> { + let RollOverAccepted { oracle_event_id } = msg; + let announcement = self + .get_announcement + .send(oracle::GetAnnouncement(oracle_event_id)) + .await? + .with_context(|| format!("Announcement {} not found", oracle_event_id))?; + + let order_id = self.cfd.order.id; + tracing::info!(%order_id, "Rollover proposal got accepted"); + + self.update_proposal(None).await?; + + let (sender, receiver) = mpsc::unbounded::(); + // store the writing end to forward messages from the maker to + // the spawned rollover task + self.rollover_msg_sender = Some(sender); + + let rollover_fut = setup_contract::roll_over( + xtra::message_channel::MessageChannel::sink(&self.maker) + .with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))), + receiver, + (self.oracle_pk, announcement), + RolloverParams::new( + self.cfd.order.price, + self.cfd.quantity_usd, + self.cfd.order.leverage, + self.cfd.refund_timelock_in_blocks(), + ), + Role::Taker, + self.cfd.dlc().context("No DLC in CFD")?, + self.n_payouts, + ); + + let this = ctx.address().expect("self to be alive"); + spawn_fallible::<_, anyhow::Error>(async move { + let _ = match rollover_fut.await { + Ok(dlc) => this.send(RolloverSucceeded { dlc }).await?, + Err(error) => this.send(RolloverFailed { error }).await?, + }; + + Ok(()) + }); + + Ok(()) + } + + async fn handle_rejected(&self) -> Result<()> { + let order_id = self.cfd.order.id; + tracing::info!(%order_id, "Rollover proposal got rejected"); + + self.update_proposal(None).await?; + + Ok(()) + } + + pub async fn forward_protocol_msg(&mut self, msg: wire::RollOverMsg) -> Result<()> { + let sender = self + .rollover_msg_sender + .as_mut() + .context("Cannot forward message to rollover task")?; + sender.send(msg).await?; + + Ok(()) + } + + async fn update_proposal( + &self, + proposal: Option<(RollOverProposal, SettlementKind)>, + ) -> Result<()> { + self.projection + .send(UpdateRollOverProposal { + order: self.cfd.order.id, + proposal, + }) + .await?; + + Ok(()) + } + + async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context) { + let _ = self.on_completed.send(completed).await; + + ctx.stop(); + } +} + +#[async_trait] +impl xtra::Actor for Actor { + async fn started(&mut self, ctx: &mut xtra::Context) { + let this = ctx.address().expect("self to be alive"); + if let Err(e) = self.propose(this).await { + self.complete( + Completed::Failed { + order_id: self.cfd.order.id, + error: e, + }, + ctx, + ) + .await; + } + } + + async fn stopping(&mut self, ctx: &mut xtra::Context) -> xtra::KeepRunning { + // inform other actors that we are stopping so that our + // address can be GCd from their AddressMaps + let me = ctx.address().expect("we are still alive"); + + for channel in self.on_stopping.iter() { + let _ = channel.send(Stopping { me: me.clone() }).await; + } + + xtra::KeepRunning::StopAll + } +} + +#[xtra_productivity] +impl Actor { + pub async fn handle_confirm_rollover( + &mut self, + msg: RollOverAccepted, + ctx: &mut xtra::Context, + ) { + if let Err(error) = self.handle_confirmed(msg, ctx).await { + self.complete( + Completed::Failed { + order_id: self.cfd.order.id, + error, + }, + ctx, + ) + .await; + } + } + + pub async fn reject_rollover(&mut self, _: RollOverRejected, ctx: &mut xtra::Context) { + let order_id = self.cfd.order.id; + let completed = if let Err(error) = self.handle_rejected().await { + Completed::Failed { order_id, error } + } else { + Completed::Rejected { order_id } + }; + + self.complete(completed, ctx).await; + } + + pub async fn handle_rollover_succeeded( + &mut self, + msg: RolloverSucceeded, + ctx: &mut xtra::Context, + ) { + self.complete( + Completed::UpdatedContract { + order_id: self.cfd.order.id, + dlc: msg.dlc, + }, + ctx, + ) + .await; + } + + pub async fn handle_rollover_failed( + &mut self, + msg: RolloverFailed, + ctx: &mut xtra::Context, + ) { + self.complete( + Completed::Failed { + order_id: self.cfd.order.id, + error: msg.error, + }, + ctx, + ) + .await; + } + + pub async fn handle_protocol_msg( + &mut self, + msg: wire::RollOverMsg, + ctx: &mut xtra::Context, + ) { + if let Err(error) = self.forward_protocol_msg(msg).await { + self.complete( + Completed::Failed { + order_id: self.cfd.order.id, + error, + }, + ctx, + ) + .await; + } + } +} + +/// Message sent from the `connection::Actor` to the +/// `rollover_taker::Actor` to notify that the maker has accepted the +/// rollover proposal. +pub struct RollOverAccepted { + pub oracle_event_id: BitMexPriceEventId, +} + +/// Message sent from the `connection::Actor` to the +/// `rollover_taker::Actor` to notify that the maker has rejected the +/// rollover proposal. +pub struct RollOverRejected; + +/// Message sent from the spawned task to `rollover_taker::Actor` to +/// notify that rollover has finished successfully. +pub struct RolloverSucceeded { + dlc: Dlc, +} + +/// Message sent from the spawned task to `rollover_taker::Actor` to +/// notify that rollover has failed. +pub struct RolloverFailed { + error: anyhow::Error, +} + +#[allow(clippy::large_enum_variant)] +pub enum Completed { + UpdatedContract { + order_id: OrderId, + dlc: Dlc, + }, + Rejected { + order_id: OrderId, + }, + Failed { + order_id: OrderId, + error: anyhow::Error, + }, +} + +impl xtra::Message for Completed { + type Result = Result<()>; +} diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index be53d63..2614895 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -1,33 +1,22 @@ -use crate::address_map::AddressMap; +use crate::address_map::{AddressMap, Stopping}; use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; -use crate::model::cfd::{ - Cfd, CfdState, CfdStateCommon, Completed, Dlc, Order, OrderId, Origin, Role, RollOverProposal, - SettlementKind, UpdateCfdProposal, UpdateCfdProposals, -}; -use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd}; +use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Completed, Order, OrderId, Origin, Role}; +use crate::model::{Price, Usd}; use crate::monitor::{self, MonitorParams}; -use crate::projection::{ - try_into_update_rollover_proposal, UpdateRollOverProposal, UpdateSettlementProposal, -}; -use crate::setup_contract::RolloverParams; -use crate::tokio_ext::FutureExt; -use crate::wire::RollOverMsg; use crate::{ - collab_settlement_taker, connection, log_error, oracle, projection, setup_contract, - setup_taker, wallet, wire, Tasks, + collab_settlement_taker, connection, log_error, oracle, projection, rollover_taker, + setup_taker, wallet, Tasks, }; use anyhow::{bail, Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; -use futures::channel::mpsc; -use futures::future::RemoteHandle; -use futures::{future, SinkExt}; -use std::collections::HashMap; use xtra::prelude::*; use xtra::Actor as _; use xtra_productivity::xtra_productivity; +pub struct CurrentOrder(pub Option); + pub struct TakeOffer { pub order_id: OrderId, pub quantity: Usd, @@ -46,19 +35,6 @@ pub struct Commit { pub order_id: OrderId, } -pub struct CfdRollOverCompleted { - pub order_id: OrderId, - pub dlc: Result, -} - -enum RollOverState { - Active { - sender: mpsc::UnboundedSender, - _task: RemoteHandle<()>, - }, - None, -} - pub struct Actor { db: sqlx::SqlitePool, wallet: Address, @@ -68,9 +44,8 @@ pub struct Actor { monitor_actor: Address, setup_actors: AddressMap, collab_settlement_actors: AddressMap, - roll_over_state: RollOverState, + rollover_actors: AddressMap, oracle_actor: Address, - current_pending_proposals: UpdateCfdProposals, n_payouts: usize, tasks: Tasks, } @@ -99,48 +74,16 @@ where projection_actor, conn_actor, monitor_actor, - roll_over_state: RollOverState::None, oracle_actor, - current_pending_proposals: HashMap::new(), n_payouts, setup_actors: AddressMap::default(), collab_settlement_actors: AddressMap::default(), + rollover_actors: AddressMap::default(), tasks: Tasks::default(), } } } -impl Actor { - /// Removes a proposal and updates the update cfd proposals' feed - async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { - let removed_proposal = self.current_pending_proposals.remove(order_id); - - if let Some(removed_proposal) = removed_proposal { - match removed_proposal { - UpdateCfdProposal::Settlement { .. } => { - self.projection_actor - .send(UpdateSettlementProposal { - order: *order_id, - proposal: None, - }) - .await? - } - UpdateCfdProposal::RollOverProposal { .. } => { - self.projection_actor - .send(UpdateRollOverProposal { - order: *order_id, - proposal: None, - }) - .await? - } - } - } else { - anyhow::bail!("Could not find proposal with order id: {}", &order_id); - } - Ok(()) - } -} - #[xtra_productivity] impl Actor where @@ -156,38 +99,6 @@ where Ok(()) } - async fn handle_propose_roll_over(&mut self, msg: ProposeRollOver) -> Result<()> { - let ProposeRollOver { order_id } = msg; - - if self.current_pending_proposals.contains_key(&order_id) { - anyhow::bail!("An update for order id {} is already in progress", order_id) - } - - let proposal = RollOverProposal { - order_id, - timestamp: Timestamp::now(), - }; - - let new_proposal = UpdateCfdProposal::RollOverProposal { - proposal: proposal.clone(), - direction: SettlementKind::Outgoing, - }; - - self.current_pending_proposals - .insert(proposal.order_id, new_proposal.clone()); - self.projection_actor - .send(try_into_update_rollover_proposal(new_proposal)?) - .await?; - - self.conn_actor - .send(wire::TakerToMaker::ProposeRollOver { - order_id: proposal.order_id, - timestamp: proposal.timestamp, - }) - .await?; - Ok(()) - } - async fn handle_propose_settlement( &mut self, msg: ProposeSettlement, @@ -263,36 +174,6 @@ where } } -impl Actor -where - W: xtra::Handler - + xtra::Handler - + xtra::Handler, -{ - async fn handle_roll_over_rejected(&mut self, order_id: OrderId) -> Result<()> { - tracing::info!(%order_id, "Roll over proposal got rejected"); - - self.remove_pending_proposal(&order_id) - .await - .context("rejected settlement")?; - - Ok(()) - } - - async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> { - match &mut self.roll_over_state { - RollOverState::Active { sender, .. } => { - sender.send(msg).await?; - } - RollOverState::None => { - anyhow::bail!("Received message without an active roll_over setup") - } - } - - Ok(()) - } -} - impl Actor { async fn handle_new_order(&mut self, order: Option) -> Result<()> { tracing::trace!("new order {:?}", order); @@ -510,92 +391,67 @@ where } } -impl Actor +#[xtra_productivity] +impl Actor where - Self: xtra::Handler, - O: xtra::Handler, - W: xtra::Handler - + xtra::Handler - + xtra::Handler, + M: xtra::Handler, + O: xtra::Handler + xtra::Handler, { - async fn handle_roll_over_accepted( + async fn handle_propose_rollover( &mut self, - order_id: OrderId, - oracle_event_id: BitMexPriceEventId, + msg: ProposeRollOver, ctx: &mut Context, ) -> Result<()> { - tracing::info!(%order_id, "Roll; over request got accepted"); - - let (sender, receiver) = mpsc::unbounded(); + let ProposeRollOver { order_id } = msg; - if let RollOverState::Active { .. } = self.roll_over_state { - anyhow::bail!("Already rolling over a contract!") - } + let disconnected = self + .rollover_actors + .get_disconnected(order_id) + .with_context(|| format!("Rollover for order {} is already in progress", order_id))?; let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let dlc = cfd.open_dlc().context("CFD was in wrong state")?; - - let announcement = self - .oracle_actor - .send(oracle::GetAnnouncement(oracle_event_id)) - .await? - .with_context(|| format!("Announcement {} not found", oracle_event_id))?; - - let contract_future = setup_contract::roll_over( - xtra::message_channel::MessageChannel::sink(&self.conn_actor) - .with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))), - receiver, - (self.oracle_pk, announcement), - RolloverParams::new( - cfd.order.price, - cfd.quantity_usd, - cfd.order.leverage, - cfd.refund_timelock_in_blocks(), - ), - Role::Taker, - dlc, - self.n_payouts, - ); let this = ctx .address() .expect("actor to be able to give address to itself"); + let (addr, fut) = rollover_taker::Actor::new( + (cfd, self.n_payouts), + self.oracle_pk, + self.conn_actor.clone(), + &self.oracle_actor, + self.projection_actor.clone(), + &this, + (&this, &self.conn_actor), + ) + .create(None) + .run(); - let task = async move { - let dlc = contract_future.await; - - this.send(CfdRollOverCompleted { order_id, dlc }) - .await - .expect("always connected to ourselves") - } - .spawn_with_handle(); - - self.roll_over_state = RollOverState::Active { - sender, - _task: task, - }; + disconnected.insert(addr); + self.tasks.add(fut); - self.remove_pending_proposal(&order_id) - .await - .context("Could not remove accepted roll over")?; Ok(()) } } -impl Actor +#[xtra_productivity(message_impl = false)] +impl Actor where M: xtra::Handler, O: xtra::Handler, { - async fn handle_roll_over_completed( - &mut self, - order_id: OrderId, - dlc: Result, - ) -> Result<()> { - let dlc = dlc.context("Failed to roll over contract with maker")?; - self.roll_over_state = RollOverState::None; + async fn handle_rollover_completed(&mut self, msg: rollover_taker::Completed) -> Result<()> { + use rollover_taker::Completed::*; + let (order_id, dlc) = match msg { + UpdatedContract { order_id, dlc } => (order_id, dlc), + Rejected { .. } => { + return Ok(()); + } + Failed { order_id, error } => { + tracing::warn!(%order_id, "Rollover failed: {:#}", error); + return Ok(()); + } + }; let mut conn = self.db.acquire().await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; @@ -625,48 +481,17 @@ where } } -#[async_trait] -impl Handler for Actor -where - Self: xtra::Handler, - O: xtra::Handler + xtra::Handler, - W: xtra::Handler - + xtra::Handler - + xtra::Handler, -{ - async fn handle(&mut self, msg: wire::MakerToTaker, ctx: &mut Context) { - match msg { - wire::MakerToTaker::CurrentOrder(current_order) => { - log_error!(self.handle_new_order(current_order)) - } - wire::MakerToTaker::ConfirmRollOver { - order_id, - oracle_event_id, - } => { - log_error!(self.handle_roll_over_accepted(order_id, oracle_event_id, ctx)) - } - wire::MakerToTaker::RejectRollOver(order_id) => { - log_error!(self.handle_roll_over_rejected(order_id)) - } - wire::MakerToTaker::RollOverProtocol(roll_over_msg) => { - log_error!(self.handle_inc_roll_over_msg(roll_over_msg)) - } - wire::MakerToTaker::Heartbeat => { - unreachable!("Heartbeats should be handled somewhere else") - } - wire::MakerToTaker::ConfirmOrder(_) - | wire::MakerToTaker::RejectOrder(_) - | wire::MakerToTaker::Protocol { .. } - | wire::MakerToTaker::InvalidOrderId(_) => { - unreachable!("These messages should be sent to the `setup_taker::Actor`") - } - wire::MakerToTaker::Settlement { .. } => { - unreachable!("These messages should be sent to the `collab_settlement::Actor`") - } - wire::MakerToTaker::Hello(_) => { - unreachable!("Connection related messages are handled in the connection actor") - } - } +#[xtra_productivity(message_impl = false)] +impl Actor { + async fn handle_rollover_actor_stopping(&mut self, msg: Stopping) { + self.rollover_actors.gc(msg); + } +} + +#[xtra_productivity] +impl Actor { + async fn handle_current_order(&mut self, msg: CurrentOrder) { + log_error!(self.handle_new_order(msg.0)); } } @@ -682,17 +507,6 @@ where } } -#[async_trait] -impl Handler for Actor -where - M: xtra::Handler, - O: xtra::Handler, -{ - async fn handle(&mut self, msg: CfdRollOverCompleted, _ctx: &mut Context) { - log_error!(self.handle_roll_over_completed(msg.order_id, msg.dlc)); - } -} - #[async_trait] impl Handler for Actor where @@ -720,8 +534,4 @@ impl Handler for Actor } } -impl Message for CfdRollOverCompleted { - type Result = (); -} - impl xtra::Actor for Actor {} diff --git a/daemon/src/wire.rs b/daemon/src/wire.rs index 1595f56..acbe859 100644 --- a/daemon/src/wire.rs +++ b/daemon/src/wire.rs @@ -106,7 +106,10 @@ pub enum MakerToTaker { order_id: OrderId, msg: SetupMsg, }, - RollOverProtocol(RollOverMsg), + RollOverProtocol { + order_id: OrderId, + msg: RollOverMsg, + }, ConfirmRollOver { order_id: OrderId, oracle_event_id: BitMexPriceEventId, @@ -141,7 +144,7 @@ impl fmt::Display for MakerToTaker { MakerToTaker::Protocol { .. } => write!(f, "Protocol"), MakerToTaker::ConfirmRollOver { .. } => write!(f, "ConfirmRollOver"), MakerToTaker::RejectRollOver(_) => write!(f, "RejectRollOver"), - MakerToTaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"), + MakerToTaker::RollOverProtocol { .. } => write!(f, "RollOverProtocol"), MakerToTaker::Settlement { .. } => write!(f, "Settlement"), } }