diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 9a290cd..514605b 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -16,8 +16,8 @@ use bdk::bitcoin::secp256k1::schnorrsig; use cfd_protocol::secp256k1_zkp::Signature; use futures::channel::mpsc; use futures::{future, SinkExt}; -use rocket_db_pools::sqlx::Sqlite; use sqlx::pool::PoolConnection; +use sqlx::Sqlite; use std::collections::HashMap; use std::time::SystemTime; use tokio::sync::watch; @@ -58,19 +58,19 @@ pub struct FromTaker { pub msg: wire::TakerToMaker, } -pub struct Actor { +pub struct Actor { db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, cfd_feed_actor_inbox: watch::Sender>, order_feed_sender: watch::Sender>, update_cfd_feed_sender: watch::Sender, - takers: Address, + takers: Address, current_order_id: Option, - monitor_actor: Address, + monitor_actor: Address, setup_state: SetupState, roll_over_state: RollOverState, - oracle_actor: Address, + oracle_actor: Address, // Maker needs to also store TakerId to be able to send a reply back current_pending_proposals: HashMap, // TODO: Persist instead of using an in-memory hashmap for resiliency? @@ -95,7 +95,7 @@ enum RollOverState { None, } -impl Actor { +impl Actor { #[allow(clippy::too_many_arguments)] pub fn new( db: sqlx::SqlitePool, @@ -104,9 +104,9 @@ impl Actor { cfd_feed_actor_inbox: watch::Sender>, order_feed_sender: watch::Sender>, update_cfd_feed_sender: watch::Sender, - takers: Address, - monitor_actor: Address, - oracle_actor: Address, + takers: Address, + monitor_actor: Address, + oracle_actor: Address, ) -> Self { Self { db, @@ -126,104 +126,55 @@ impl Actor { } } - /// Send pending proposals for the purposes of UI updates. - /// Filters out the TakerIds, as they are an implementation detail inside of - /// the actor - fn send_pending_proposals(&self) -> Result<()> { - Ok(self.update_cfd_feed_sender.send( - self.current_pending_proposals - .iter() - .map(|(order_id, (update_cfd, _))| (*order_id, (update_cfd.clone()))) - .collect(), - )?) - } - - fn get_taker_id_of_proposal(&self, order_id: &OrderId) -> Result { - let (_, taker_id) = self - .current_pending_proposals - .get(order_id) - .context("Could not find proposal for given order id")?; - Ok(*taker_id) - } + async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_commit( + order_id, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; - /// Removes a proposal and updates the update cfd proposals' feed - fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { - if self.current_pending_proposals.remove(order_id).is_none() { - anyhow::bail!("Could not find proposal with order id: {}", &order_id) - } - self.send_pending_proposals()?; Ok(()) } - fn get_settlement_proposal(&self, order_id: OrderId) -> Result<(SettlementProposal, TakerId)> { - let (update_proposal, taker_id) = self - .current_pending_proposals - .get(&order_id) - .context("have a proposal that is about to be accepted")?; - - let proposal = match update_proposal { - UpdateCfdProposal::Settlement { proposal, .. } => proposal, - UpdateCfdProposal::RollOverProposal { .. } => { - anyhow::bail!("did not expect a rollover proposal"); - } - }; - Ok((proposal.clone(), *taker_id)) - } - - async fn handle_new_order( + async fn handle_propose_roll_over( &mut self, - price: Usd, - min_quantity: Usd, - max_quantity: Usd, + proposal: RollOverProposal, + taker_id: TakerId, ) -> Result<()> { - let oracle_event_id = - oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM)?; - - self.oracle_actor - .do_send_async(oracle::FetchAnnouncement(oracle_event_id)) - .await?; - - let order = Order::new( - price, - min_quantity, - max_quantity, - Origin::Ours, - oracle_event_id, - )?; - - // 1. Save to DB - let mut conn = self.db.acquire().await?; - insert_order(&order, &mut conn).await?; - - // 2. Update actor state to current order - self.current_order_id.replace(order.id); - - // 3. Notify UI via feed - self.order_feed_sender.send(Some(order.clone()))?; - - // 4. Inform connected takers - self.takers - .do_send_async(maker_inc_connections::BroadcastOrder(Some(order))) - .await?; - Ok(()) - } + tracing::info!( + "Received proposal from the taker {}: {:?} to roll over order {}", + taker_id, + proposal, + proposal.order_id + ); - async fn handle_new_taker_online(&mut self, taker_id: TakerId) -> Result<()> { + // check if CFD is in open state, otherwise we should not proceed let mut conn = self.db.acquire().await?; - - let current_order = match self.current_order_id { - Some(current_order_id) => Some(load_order_by_id(current_order_id, &mut conn).await?), - None => None, + let cfd = load_cfd_by_order_id(proposal.order_id, &mut conn).await?; + match cfd { + Cfd { + state: CfdState::Open { .. }, + .. + } => (), + _ => { + anyhow::bail!("Order is in invalid state. Cannot propose roll over.") + } }; - self.takers - .do_send_async(maker_inc_connections::TakerMessage { - taker_id, - command: TakerCommand::SendOrder { - order: current_order, + self.current_pending_proposals.insert( + proposal.order_id, + ( + UpdateCfdProposal::RollOverProposal { + proposal, + direction: SettlementKind::Incoming, }, - }) - .await?; + taker_id, + ), + ); + self.send_pending_proposals()?; Ok(()) } @@ -309,43 +260,27 @@ impl Actor { Ok(()) } - async fn handle_propose_roll_over( - &mut self, - proposal: RollOverProposal, - taker_id: TakerId, - ) -> Result<()> { - tracing::info!( - "Received proposal from the taker {}: {:?} to roll over order {}", - taker_id, - proposal, - proposal.order_id - ); - - // check if CFD is in open state, otherwise we should not proceed + async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(proposal.order_id, &mut conn).await?; - match cfd { - Cfd { - state: CfdState::Open { .. }, - .. - } => (), - _ => { - anyhow::bail!("Order is in invalid state. Cannot propose roll over.") - } - }; - - self.current_pending_proposals.insert( - proposal.order_id, - ( - UpdateCfdProposal::RollOverProposal { - proposal, - direction: SettlementKind::Incoming, - }, - taker_id, - ), - ); - self.send_pending_proposals()?; + cfd_actors::handle_monitoring_event( + event, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + Ok(()) + } + async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_oracle_attestation( + attestation, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; Ok(()) } @@ -387,78 +322,146 @@ impl Actor { Ok(()) } - async fn handle_cfd_setup_completed( - &mut self, - order_id: OrderId, - dlc: Result, - ) -> Result<()> { - self.setup_state = SetupState::None; + /// Send pending proposals for the purposes of UI updates. + /// Filters out the TakerIds, as they are an implementation detail inside of + /// the actor + fn send_pending_proposals(&self) -> Result<()> { + Ok(self.update_cfd_feed_sender.send( + self.current_pending_proposals + .iter() + .map(|(order_id, (update_cfd, _))| (*order_id, (update_cfd.clone()))) + .collect(), + )?) + } - let dlc = dlc.context("Failed to setup contract with taker")?; + /// Removes a proposal and updates the update cfd proposals' feed + fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { + if self.current_pending_proposals.remove(order_id).is_none() { + anyhow::bail!("Could not find proposal with order id: {}", &order_id) + } + self.send_pending_proposals()?; + Ok(()) + } - let mut conn = self.db.acquire().await?; + fn get_taker_id_of_proposal(&self, order_id: &OrderId) -> Result { + let (_, taker_id) = self + .current_pending_proposals + .get(order_id) + .context("Could not find proposal for given order id")?; + Ok(*taker_id) + } - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - cfd.state = CfdState::PendingOpen { - common: CfdStateCommon::default(), - dlc: dlc.clone(), - attestation: None, + fn get_settlement_proposal(&self, order_id: OrderId) -> Result<(SettlementProposal, TakerId)> { + let (update_proposal, taker_id) = self + .current_pending_proposals + .get(&order_id) + .context("have a proposal that is about to be accepted")?; + + let proposal = match update_proposal { + UpdateCfdProposal::Settlement { proposal, .. } => proposal, + UpdateCfdProposal::RollOverProposal { .. } => { + anyhow::bail!("did not expect a rollover proposal"); + } }; + Ok((proposal.clone(), *taker_id)) + } +} - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; +impl Actor +where + T: xtra::Handler, +{ + async fn handle_new_taker_online(&mut self, taker_id: TakerId) -> Result<()> { + let mut conn = self.db.acquire().await?; - let txid = self - .wallet - .try_broadcast_transaction(dlc.lock.0.clone()) + let current_order = match self.current_order_id { + Some(current_order_id) => Some(load_order_by_id(current_order_id, &mut conn).await?), + None => None, + }; + + self.takers + .do_send_async(maker_inc_connections::TakerMessage { + taker_id, + command: TakerCommand::SendOrder { + order: current_order, + }, + }) .await?; - tracing::info!("Lock transaction published with txid {}", txid); + Ok(()) + } - self.monitor_actor - .do_send_async(monitor::StartMonitoring { - id: order_id, - params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()), + async fn handle_accept_settlement(&mut self, order_id: OrderId) -> Result<()> { + tracing::debug!(%order_id, "Maker accepts a settlement proposal" ); + + let taker_id = self.get_taker_id_of_proposal(&order_id)?; + + self.takers + .do_send_async(maker_inc_connections::TakerMessage { + taker_id, + command: TakerCommand::NotifySettlementAccepted { id: order_id }, }) .await?; - self.oracle_actor - .do_send_async(oracle::MonitorAttestation { - event_id: cfd.order.oracle_event_id, + self.current_agreed_proposals + .insert(order_id, self.get_settlement_proposal(order_id)?); + self.remove_pending_proposal(&order_id) + .context("accepted settlement")?; + Ok(()) + } + + async fn handle_reject_settlement(&mut self, order_id: OrderId) -> Result<()> { + tracing::debug!(%order_id, "Maker rejects a settlement proposal" ); + + let taker_id = self.get_taker_id_of_proposal(&order_id)?; + + self.takers + .do_send_async(maker_inc_connections::TakerMessage { + taker_id, + command: TakerCommand::NotifySettlementRejected { id: order_id }, }) .await?; + self.remove_pending_proposal(&order_id) + .context("rejected settlement")?; Ok(()) } - async fn handle_cfd_roll_over_completed( - &mut self, - order_id: OrderId, - dlc: Result, - ) -> Result<()> { - let dlc = dlc.context("Failed to roll over contract with taker")?; - self.roll_over_state = RollOverState::None; + async fn handle_reject_roll_over(&mut self, order_id: OrderId) -> Result<()> { + tracing::debug!(%order_id, "Maker rejects a roll_over proposal" ); - let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - cfd.state = CfdState::Open { - common: CfdStateCommon::default(), - dlc: dlc.clone(), - attestation: None, - collaborative_close: None, + // Validate if order is actually being requested to be extended + let (_, taker_id) = match self.current_pending_proposals.get(&order_id) { + Some(( + UpdateCfdProposal::RollOverProposal { + proposal, + direction: SettlementKind::Incoming, + }, + taker_id, + )) => (proposal, *taker_id), + _ => { + anyhow::bail!("Order is in invalid state. Ignoring reject roll over request.") + } }; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; - - self.monitor_actor - .do_send_async(monitor::StartMonitoring { - id: order_id, - params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()), + self.takers + .do_send_async(maker_inc_connections::TakerMessage { + taker_id, + command: TakerCommand::NotifyRollOverRejected { id: order_id }, }) .await?; + self.remove_pending_proposal(&order_id) + .context("rejected roll_over")?; Ok(()) } +} +impl Actor +where + T: xtra::Handler + + xtra::Handler, +{ async fn handle_take_order( &mut self, taker_id: TakerId, @@ -492,7 +495,7 @@ impl Actor { } }; - // 2. Create a new CFD + // 2. Insert CFD in DB let cfd = Cfd::new( current_order.clone(), quantity, @@ -515,6 +518,7 @@ impl Actor { ); self.reject_order(taker_id, cfd, conn).await?; + return Ok(()); } @@ -528,6 +532,66 @@ impl Actor { Ok(()) } + async fn handle_reject_order(&mut self, order_id: OrderId) -> Result<()> { + tracing::debug!(%order_id, "Maker rejects an order" ); + + let mut conn = self.db.acquire().await?; + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + let taker_id = match cfd { + Cfd { + state: CfdState::IncomingOrderRequest { taker_id, .. }, + .. + } => taker_id, + _ => { + anyhow::bail!("Order is in invalid state. Ignoring trying to accept it.") + } + }; + + self.reject_order(taker_id, cfd, conn).await?; + + Ok(()) + } + + /// Reject an order + /// + /// Rejection includes removing the order and saving in the db that it was rejected. + /// In the current model it is essential to remove the order because a taker + /// that received a rejection cannot communicate with the maker until a new order is published. + async fn reject_order( + &mut self, + taker_id: TakerId, + mut cfd: Cfd, + mut conn: PoolConnection, + ) -> Result<()> { + cfd.state = CfdState::rejected(); + + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + + self.takers + .do_send_async(maker_inc_connections::TakerMessage { + taker_id, + command: TakerCommand::NotifyOrderRejected { id: cfd.order.id }, + }) + .await?; + + // Remove order for all + self.current_order_id = None; + self.takers + .do_send_async(maker_inc_connections::BroadcastOrder(None)) + .await?; + self.order_feed_sender.send(None)?; + + Ok(()) + } +} + +impl Actor +where + Self: xtra::Handler, + O: xtra::Handler, + T: xtra::Handler, +{ async fn handle_accept_order( &mut self, order_id: OrderId, @@ -612,108 +676,107 @@ impl Actor { Ok(()) } +} - async fn handle_reject_order(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Maker rejects an order" ); +impl Actor +where + O: xtra::Handler, + T: xtra::Handler, +{ + async fn handle_new_order( + &mut self, + price: Usd, + min_quantity: Usd, + max_quantity: Usd, + ) -> Result<()> { + let oracle_event_id = + oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM)?; + + self.oracle_actor + .do_send_async(oracle::FetchAnnouncement(oracle_event_id)) + .await?; + + let order = Order::new( + price, + min_quantity, + max_quantity, + Origin::Ours, + oracle_event_id, + )?; + // 1. Save to DB let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + insert_order(&order, &mut conn).await?; - let taker_id = match cfd { - Cfd { - state: CfdState::IncomingOrderRequest { taker_id, .. }, - .. - } => taker_id, - _ => { - anyhow::bail!("Order is in invalid state. Ignoring trying to accept it.") - } - }; + // 2. Update actor state to current order + self.current_order_id.replace(order.id); - self.reject_order(taker_id, cfd, conn).await?; + // 3. Notify UI via feed + self.order_feed_sender.send(Some(order.clone()))?; + // 4. Inform connected takers + self.takers + .do_send_async(maker_inc_connections::BroadcastOrder(Some(order))) + .await?; Ok(()) } +} - /// Reject an order - /// - /// Rejection includes removing the order and saving in the db that it was rejected. - /// In the current model it is essential to remove the order because a taker - /// that received a rejection cannot communicate with the maker until a new order is published. - async fn reject_order( +impl Actor +where + O: xtra::Handler, + M: xtra::Handler, +{ + async fn handle_cfd_setup_completed( &mut self, - taker_id: TakerId, - mut cfd: Cfd, - mut conn: PoolConnection, + order_id: OrderId, + dlc: Result, ) -> Result<()> { - // Update order in db - cfd.state = CfdState::rejected(); - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; - - self.takers - .do_send_async(maker_inc_connections::TakerMessage { - taker_id, - command: TakerCommand::NotifyOrderRejected { id: cfd.order.id }, - }) - .await?; - - // Remove order for all - self.current_order_id = None; - self.takers - .do_send_async(maker_inc_connections::BroadcastOrder(None)) - .await?; - self.order_feed_sender.send(None)?; + self.setup_state = SetupState::None; - Ok(()) - } + let dlc = dlc.context("Failed to setup contract with taker")?; - async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { let mut conn = self.db.acquire().await?; - cfd_actors::handle_commit( - order_id, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } - async fn handle_accept_settlement(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Maker accepts a settlement proposal" ); + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::PendingOpen { + common: CfdStateCommon::default(), + dlc: dlc.clone(), + attestation: None, + }; - let taker_id = self.get_taker_id_of_proposal(&order_id)?; + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; - self.takers - .do_send_async(maker_inc_connections::TakerMessage { - taker_id, - command: TakerCommand::NotifySettlementAccepted { id: order_id }, - }) + let txid = self + .wallet + .try_broadcast_transaction(dlc.lock.0.clone()) .await?; - self.current_agreed_proposals - .insert(order_id, self.get_settlement_proposal(order_id)?); - self.remove_pending_proposal(&order_id) - .context("accepted settlement")?; - Ok(()) - } - - async fn handle_reject_settlement(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Maker rejects a settlement proposal" ); + tracing::info!("Lock transaction published with txid {}", txid); - let taker_id = self.get_taker_id_of_proposal(&order_id)?; + self.monitor_actor + .do_send_async(monitor::StartMonitoring { + id: order_id, + params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()), + }) + .await?; - self.takers - .do_send_async(maker_inc_connections::TakerMessage { - taker_id, - command: TakerCommand::NotifySettlementRejected { id: order_id }, + self.oracle_actor + .do_send_async(oracle::MonitorAttestation { + event_id: cfd.order.oracle_event_id, }) .await?; - self.remove_pending_proposal(&order_id) - .context("rejected settlement")?; Ok(()) } +} +impl Actor +where + Self: xtra::Handler, + O: xtra::Handler + xtra::Handler, + T: xtra::Handler, +{ async fn handle_accept_roll_over( &mut self, order_id: OrderId, @@ -801,63 +864,49 @@ impl Actor { .context("accepted roll_over")?; Ok(()) } +} - async fn handle_reject_roll_over(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Maker rejects a roll_over proposal" ); +impl Actor +where + M: xtra::Handler, +{ + async fn handle_cfd_roll_over_completed( + &mut self, + order_id: OrderId, + dlc: Result, + ) -> Result<()> { + let dlc = dlc.context("Failed to roll over contract with taker")?; + self.roll_over_state = RollOverState::None; - // Validate if order is actually being requested to be extended - let (_, taker_id) = match self.current_pending_proposals.get(&order_id) { - Some(( - UpdateCfdProposal::RollOverProposal { - proposal, - direction: SettlementKind::Incoming, - }, - taker_id, - )) => (proposal, *taker_id), - _ => { - anyhow::bail!("Order is in invalid state. Ignoring reject roll over request.") - } + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::Open { + common: CfdStateCommon::default(), + dlc: dlc.clone(), + attestation: None, + collaborative_close: None, }; + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; - self.takers - .do_send_async(maker_inc_connections::TakerMessage { - taker_id, - command: TakerCommand::NotifyRollOverRejected { id: order_id }, + self.monitor_actor + .do_send_async(monitor::StartMonitoring { + id: order_id, + params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()), }) .await?; - self.remove_pending_proposal(&order_id) - .context("rejected roll_over")?; - Ok(()) - } - - async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_monitoring_event( - event, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } - - async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_oracle_attestation( - attestation, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; Ok(()) } } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + Self: xtra::Handler + xtra::Handler, + O: xtra::Handler + xtra::Handler, + T: xtra::Handler + + xtra::Handler, +{ async fn handle(&mut self, msg: CfdAction, ctx: &mut Context) { use CfdAction::*; if let Err(e) = match msg { @@ -875,42 +924,60 @@ impl Handler for Actor { } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + O: xtra::Handler, + T: xtra::Handler, +{ async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context) { log_error!(self.handle_new_order(msg.price, msg.min_quantity, msg.max_quantity)); } } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + T: xtra::Handler, +{ async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context) { log_error!(self.handle_new_taker_online(msg.id)); } } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + O: xtra::Handler, + M: xtra::Handler, +{ async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context) { log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc)); } } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + M: xtra::Handler, +{ async fn handle(&mut self, msg: CfdRollOverCompleted, _ctx: &mut Context) { log_error!(self.handle_cfd_roll_over_completed(msg.order_id, msg.dlc)); } } #[async_trait] -impl Handler for Actor { +impl Handler for Actor { async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) { log_error!(self.handle_monitoring_event(msg)) } } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + T: xtra::Handler + + xtra::Handler, +{ async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, _ctx: &mut Context) { match msg { wire::TakerToMaker::TakeOrder { order_id, quantity } => { @@ -964,7 +1031,7 @@ impl Handler for Actor { } #[async_trait] -impl Handler for Actor { +impl Handler for Actor { async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context) { log_error!(self.handle_oracle_attestation(msg)) } @@ -994,4 +1061,4 @@ impl Message for FromTaker { type Result = (); } -impl xtra::Actor for Actor {} +impl xtra::Actor for Actor {}