From cf004aa84e9a3835991f2955275661f8e395f23e Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Fri, 10 Dec 2021 11:49:53 +1100 Subject: [PATCH] Phase out log_error macro In general, we would like to let the sender of an `xtra::Message` to be able to handle all the possible errors. The `log_error` macro catered to the complete opposite use case, so we remove it. In order to make error handling at the sending site more ergonomic, we introduce a `LogFailure` trait designed for fallible `xtra::Messages`. With this patch we've tried to change _where_ we handle errors, but not _what_ we do with them. --- daemon/src/actors.rs | 9 - daemon/src/collab_settlement_maker.rs | 7 +- daemon/src/connection.rs | 13 +- daemon/src/lib.rs | 2 +- daemon/src/maker_cfd.rs | 247 ++++++++++++++------------ daemon/src/maker_inc_connections.rs | 9 +- daemon/src/monitor.rs | 19 +- daemon/src/oracle.rs | 14 +- daemon/src/rollover_maker.rs | 6 +- daemon/src/setup_maker.rs | 7 +- daemon/src/setup_taker.rs | 13 +- daemon/src/taker_cfd.rs | 21 +-- daemon/src/xtra_ext.rs | 50 ++++++ daemon/tests/harness/mocks/monitor.rs | 5 +- daemon/tests/harness/mod.rs | 10 +- 15 files changed, 267 insertions(+), 165 deletions(-) delete mode 100644 daemon/src/actors.rs create mode 100644 daemon/src/xtra_ext.rs diff --git a/daemon/src/actors.rs b/daemon/src/actors.rs deleted file mode 100644 index c97159d..0000000 --- a/daemon/src/actors.rs +++ /dev/null @@ -1,9 +0,0 @@ -/// Wrapper for handlers to log errors -#[macro_export] -macro_rules! log_error { - ($future:expr) => { - if let Err(e) = $future.await { - tracing::error!("Message handler failed: {:#}", e); - } - }; -} diff --git a/daemon/src/collab_settlement_maker.rs b/daemon/src/collab_settlement_maker.rs index 407eb4b..30da89c 100644 --- a/daemon/src/collab_settlement_maker.rs +++ b/daemon/src/collab_settlement_maker.rs @@ -9,6 +9,7 @@ use crate::model::cfd::SettlementKind; use crate::model::cfd::SettlementProposal; use crate::model::Identity; use crate::projection; +use crate::xtra_ext::LogFailure; use anyhow::Context; use async_trait::async_trait; use bdk::bitcoin::Script; @@ -172,7 +173,11 @@ impl Actor { } async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context) { - let _ = self.on_completed.send(completed).await; + let _ = self + .on_completed + .send(completed) + .log_failure("Failed to inform about collab settlement completion") + .await; ctx.stop(); } diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index dfe0598..855db38 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -1,7 +1,6 @@ use crate::address_map::AddressMap; use crate::address_map::Stopping; use crate::collab_settlement_taker; -use crate::log_error; use crate::model::cfd::OrderId; use crate::model::Identity; use crate::model::Price; @@ -17,6 +16,7 @@ use crate::wire; use crate::wire::EncryptedJsonCodec; use crate::wire::TakerToMaker; use crate::wire::Version; +use crate::xtra_ext::LogFailure; use crate::Tasks; use anyhow::bail; use anyhow::Context; @@ -146,7 +146,10 @@ impl Actor { #[xtra_productivity(message_impl = false)] impl Actor { async fn handle_taker_to_maker(&mut self, message: wire::TakerToMaker) { - log_error!(self.send_to_maker.send(message)); + let msg_str = message.to_string(); + if self.send_to_maker.send(message).await.is_err() { + tracing::warn!("Failed to send wire message {} to maker", msg_str); + } } async fn handle_collab_settlement_actor_stopping( @@ -430,7 +433,11 @@ impl Actor { } } wire::MakerToTaker::CurrentOrder(msg) => { - log_error!(self.current_order.send(CurrentOrder(msg))); + let _ = self + .current_order + .send(CurrentOrder(msg)) + .log_failure("Failed to forward current order from maker") + .await; } wire::MakerToTaker::Hello(_) => { tracing::warn!("Ignoring unexpected Hello message from maker. Hello is only expected when opening a new connection.") diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index b415051..5f393f4 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -26,7 +26,6 @@ use xtra::Address; pub mod sqlx_ext; // Must come first because it is a macro. -pub mod actors; pub mod address_map; pub mod auth; pub mod auto_rollover; @@ -67,6 +66,7 @@ pub mod try_continue; pub mod tx; pub mod wallet; pub mod wire; +pub mod xtra_ext; // Certain operations (e.g. contract setup) take long time in debug mode, // causing us to lag behind in processing heartbeats. diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index ecb56b9..f1bc4d9 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -5,7 +5,6 @@ use crate::cfd_actors::insert_cfd_and_update_feed; use crate::cfd_actors::{self}; use crate::collab_settlement_maker; use crate::db::load_cfd; -use crate::log_error; use crate::maker_inc_connections; use crate::model::cfd::Cfd; use crate::model::cfd::CfdState; @@ -557,49 +556,50 @@ where M: xtra::Handler, W: xtra::Handler, { - async fn handle_settlement_completed(&mut self, msg: collab_settlement_maker::Completed) { - log_error!(async { - use collab_settlement_maker::Completed::*; - let (order_id, settlement, script_pubkey) = match msg { - Confirmed { - order_id, - settlement, - script_pubkey, - } => (order_id, settlement, script_pubkey), - Rejected { .. } => { - return Ok(()); - } - Failed { order_id, error } => { - tracing::warn!(%order_id, "Collaborative settlement failed: {:#}", error); - return Ok(()); - } - }; + async fn handle_settlement_completed( + &mut self, + msg: collab_settlement_maker::Completed, + ) -> Result<()> { + use collab_settlement_maker::Completed::*; + let (order_id, settlement, script_pubkey) = match msg { + Confirmed { + order_id, + settlement, + script_pubkey, + } => (order_id, settlement, script_pubkey), + Rejected { .. } => { + return Ok(()); + } + Failed { order_id, error } => { + tracing::warn!(%order_id, "Collaborative settlement failed: {:#}", error); + return Ok(()); + } + }; - let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd(order_id, &mut conn).await?; + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd(order_id, &mut conn).await?; - let tx = settlement.tx.clone(); - cfd.handle_proposal_signed(settlement) - .context("Failed to update state with collaborative settlement")?; + let tx = settlement.tx.clone(); + cfd.handle_proposal_signed(settlement) + .context("Failed to update state with collaborative settlement")?; - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - let txid = self - .wallet - .send(wallet::TryBroadcastTransaction { tx }) - .await? - .context("Broadcasting close transaction")?; - tracing::info!(%order_id, "Close transaction published with txid {}", txid); + let txid = self + .wallet + .send(wallet::TryBroadcastTransaction { tx }) + .await? + .context("Broadcasting close transaction")?; + tracing::info!(%order_id, "Close transaction published with txid {}", txid); - self.monitor_actor - .send(monitor::CollaborativeSettlement { - order_id, - tx: (txid, script_pubkey), - }) - .await?; + self.monitor_actor + .send(monitor::CollaborativeSettlement { + order_id, + tx: (txid, script_pubkey), + }) + .await?; - anyhow::Ok(()) - }); + Ok(()) } } @@ -754,56 +754,54 @@ where M: xtra::Handler, W: xtra::Handler, { - async fn handle_setup_completed(&mut self, msg: setup_maker::Completed) { - log_error!(async { - use setup_maker::Completed::*; - let (order_id, dlc) = match msg { - NewContract { order_id, dlc } => (order_id, dlc), - Failed { order_id, error } => { - self.append_cfd_state_setup_failed(order_id, error).await?; - return anyhow::Ok(()); - } - Rejected(order_id) => { - self.append_cfd_state_rejected(order_id).await?; - return anyhow::Ok(()); - } - }; + async fn handle_setup_completed(&mut self, msg: setup_maker::Completed) -> Result<()> { + use setup_maker::Completed::*; + let (order_id, dlc) = match msg { + NewContract { order_id, dlc } => (order_id, dlc), + Failed { order_id, error } => { + self.append_cfd_state_setup_failed(order_id, error).await?; + return anyhow::Ok(()); + } + Rejected(order_id) => { + self.append_cfd_state_rejected(order_id).await?; + return anyhow::Ok(()); + } + }; - let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd(order_id, &mut conn).await?; + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd(order_id, &mut conn).await?; - *cfd.state_mut() = CfdState::PendingOpen { - common: CfdStateCommon::default(), - dlc: dlc.clone(), - attestation: None, - }; + *cfd.state_mut() = CfdState::PendingOpen { + common: CfdStateCommon::default(), + dlc: dlc.clone(), + attestation: None, + }; - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - let txid = self - .wallet - .send(wallet::TryBroadcastTransaction { - tx: dlc.lock.0.clone(), - }) - .await??; + let txid = self + .wallet + .send(wallet::TryBroadcastTransaction { + tx: dlc.lock.0.clone(), + }) + .await??; - tracing::info!("Lock transaction published with txid {}", txid); + tracing::info!("Lock transaction published with txid {}", txid); - self.monitor_actor - .send(monitor::StartMonitoring { - id: order_id, - params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()), - }) - .await?; + self.monitor_actor + .send(monitor::StartMonitoring { + id: order_id, + params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()), + }) + .await?; - self.oracle_actor - .send(oracle::MonitorAttestation { - event_id: dlc.settlement_event_id, - }) - .await?; + self.oracle_actor + .send(oracle::MonitorAttestation { + event_id: dlc.settlement_event_id, + }) + .await?; - Ok(()) - }); + Ok(()) } } @@ -812,8 +810,8 @@ impl Handler for where T: xtra::Handler, { - async fn handle(&mut self, msg: TakerConnected, _ctx: &mut Context) { - log_error!(self.handle_taker_connected(msg.id)); + async fn handle(&mut self, msg: TakerConnected, _ctx: &mut Context) -> Result<()> { + self.handle_taker_connected(msg.id).await } } @@ -823,8 +821,8 @@ impl Handler where T: xtra::Handler, { - async fn handle(&mut self, msg: TakerDisconnected, _ctx: &mut Context) { - log_error!(self.handle_taker_disconnected(msg.id)); + async fn handle(&mut self, msg: TakerDisconnected, _ctx: &mut Context) -> Result<()> { + self.handle_taker_disconnected(msg.id).await } } @@ -834,8 +832,8 @@ where M: xtra::Handler, O: xtra::Handler, { - async fn handle(&mut self, msg: Completed, _ctx: &mut Context) { - log_error!(self.handle_roll_over_completed(msg)); + async fn handle(&mut self, msg: Completed, _ctx: &mut Context) -> Result<()> { + self.handle_roll_over_completed(msg).await } } @@ -844,8 +842,8 @@ impl Handler for where W: xtra::Handler, { - async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) { - log_error!(self.handle_monitoring_event(msg)) + async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) -> Result<()> { + self.handle_monitoring_event(msg).await } } @@ -866,10 +864,15 @@ where + xtra::Handler + xtra::Handler, { - async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, ctx: &mut Context) { + async fn handle( + &mut self, + FromTaker { taker_id, msg }: FromTaker, + ctx: &mut Context, + ) -> Result<()> { match msg { wire::TakerToMaker::TakeOrder { order_id, quantity } => { - log_error!(self.handle_take_order(taker_id, order_id, quantity, ctx)) + self.handle_take_order(taker_id, order_id, quantity, ctx) + .await? } wire::TakerToMaker::Settlement { order_id, @@ -881,17 +884,22 @@ where price, }, } => { - log_error!(self.handle_propose_settlement( - taker_id, - SettlementProposal { - order_id, - timestamp, - taker, - maker, - price - }, - ctx - )) + if let Err(e) = self + .handle_propose_settlement( + taker_id, + SettlementProposal { + order_id, + timestamp, + taker, + maker, + price, + }, + ctx, + ) + .await + { + tracing::warn!("Failed ot handle settlement proposal: {:#}", e); + } } wire::TakerToMaker::Settlement { msg: wire::taker_to_maker::Settlement::Initiate { .. }, @@ -903,14 +911,19 @@ where order_id, timestamp, } => { - log_error!(self.handle_propose_roll_over( - RolloverProposal { - order_id, - timestamp, - }, - taker_id, - ctx - )) + if let Err(e) = self + .handle_propose_roll_over( + RolloverProposal { + order_id, + timestamp, + }, + taker_id, + ctx, + ) + .await + { + tracing::warn!("Failed to handle rollover proposal: {:#}", e); + } } wire::TakerToMaker::RollOverProtocol { .. } => { unreachable!("This kind of message should be sent to the rollover_maker::Actor`") @@ -921,7 +934,9 @@ where TakerToMaker::Hello(_) => { unreachable!("The Hello message is not sent to the cfd actor") } - } + }; + + Ok(()) } } @@ -932,24 +947,26 @@ where W: xtra::Handler, { async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context) { - log_error!(self.handle_oracle_attestation(msg)) + if let Err(e) = self.handle_oracle_attestation(msg).await { + tracing::warn!("Failed to handle oracle attestation: {:#}", e) + } } } impl Message for TakerConnected { - type Result = (); + type Result = Result<()>; } impl Message for TakerDisconnected { - type Result = (); + type Result = Result<()>; } impl Message for Completed { - type Result = (); + type Result = Result<()>; } impl Message for FromTaker { - type Result = (); + type Result = Result<()>; } impl xtra::Actor for Actor {} diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index cc78c3b..c7b84ee 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -20,6 +20,7 @@ use crate::wire::EncryptedJsonCodec; use crate::wire::MakerToTaker; use crate::wire::TakerToMaker; use crate::wire::Version; +use crate::xtra_ext::LogFailure; use crate::Tasks; use anyhow::bail; use anyhow::Context; @@ -138,6 +139,7 @@ impl Actor { let _ = self .taker_disconnected_channel .send(maker_cfd::TakerDisconnected { id: *taker_id }) + .log_failure("Failed to inform about taker disconnect") .await; let _ = self.connection_tasks.remove(taker_id); } @@ -219,7 +221,7 @@ impl Actor { let this = ctx.address().expect("self to be alive"); let read_fut = async move { while let Ok(Some(msg)) = read.try_next().await { - let res = this.send(FromTaker { taker_id, msg }).await; + let res = this.send(FromTaker { taker_id, msg }).log_failure("").await; if res.is_err() { break; @@ -249,6 +251,7 @@ impl Actor { let _ = self .taker_connected_channel .send(maker_cfd::TakerConnected { id: taker_id }) + .log_failure("Failed to report new taker connection") .await; Ok(()) @@ -342,7 +345,7 @@ impl Actor { #[xtra_productivity(message_impl = false)] impl Actor { - async fn handle_msg_from_taker(&mut self, msg: FromTaker) { + async fn handle_msg_from_taker(&mut self, msg: FromTaker) -> Result<()> { use wire::TakerToMaker::*; match msg.msg { Protocol { order_id, msg } => match self.setup_actors.get_connected(&order_id) { @@ -380,6 +383,8 @@ impl Actor { let _ = self.taker_msg_channel.send(msg); } } + + Ok(()) } async fn handle_setup_actor_stopping(&mut self, message: Stopping) { diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index 34b4e84..a1bce8f 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -1,4 +1,3 @@ -use crate::log_error; use crate::model; use crate::model::cfd::CetStatus; use crate::model::cfd::Cfd; @@ -333,7 +332,7 @@ where Ok(()) } - async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { + async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) { for (order_id, MonitorParams { cets, .. }) in self .cfds .clone() @@ -342,8 +341,6 @@ where { try_continue!(self.monitor_cet_finality(cets, attestation.clone(), order_id)) } - - Ok(()) } async fn update_state( @@ -433,7 +430,7 @@ where for (target_status, event) in reached_monitoring_target { tracing::info!(%txid, target = %target_status, current = %status, "Bitcoin transaction reached monitoring target"); - self.event_channel.send(event).await?; + self.event_channel.send(event).await??; } } } @@ -628,7 +625,7 @@ fn map_cets( } impl xtra::Message for Event { - type Result = (); + type Result = Result<()>; } impl xtra::Message for Sync { @@ -669,14 +666,16 @@ where C: bdk::electrum_client::ElectrumApi + Send + 'static, { async fn handle(&mut self, _: Sync, _ctx: &mut xtra::Context) { - log_error!(self.sync()); + if let Err(e) = self.sync().await { + tracing::warn!("Sync failed: {:#}", e); + } } } #[async_trait] impl xtra::Handler for Actor { async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut xtra::Context) { - log_error!(self.handle_oracle_attestation(msg)); + self.handle_oracle_attestation(msg).await } } @@ -852,8 +851,10 @@ mod tests { #[async_trait] impl xtra::Handler for MessageRecordingActor { - async fn handle(&mut self, message: Event, _ctx: &mut xtra::Context) { + async fn handle(&mut self, message: Event, _ctx: &mut xtra::Context) -> Result<()> { self.events.push(message); + + Ok(()) } } diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index 820e0f4..90c0f19 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -1,9 +1,9 @@ -use crate::log_error; use crate::model::cfd::Cfd; use crate::model::cfd::CfdState; use crate::model::BitMexPriceEventId; use crate::tokio_ext; use crate::try_continue; +use crate::xtra_ext::LogFailure; use anyhow::Context; use anyhow::Result; use async_trait::async_trait; @@ -188,6 +188,7 @@ impl Actor { id: event_id, attestation, }) + .log_failure("Failed to send attestation to oracle::Actor") .await?; Ok(()) @@ -257,8 +258,13 @@ pub struct NoAnnouncement(pub BitMexPriceEventId); #[async_trait] impl xtra::Handler for Actor { - async fn handle(&mut self, msg: NewAttestationFetched, _ctx: &mut xtra::Context) { - log_error!(self.handle_new_attestation_fetched(msg.id, msg.attestation)); + async fn handle( + &mut self, + msg: NewAttestationFetched, + _ctx: &mut xtra::Context, + ) -> Result<()> { + self.handle_new_attestation_fetched(msg.id, msg.attestation) + .await } } @@ -305,7 +311,7 @@ impl xtra::Message for Attestation { } impl xtra::Message for NewAttestationFetched { - type Result = (); + type Result = Result<()>; } mod olivia_api { diff --git a/daemon/src/rollover_maker.rs b/daemon/src/rollover_maker.rs index 4e07d8a..47ab247 100644 --- a/daemon/src/rollover_maker.rs +++ b/daemon/src/rollover_maker.rs @@ -20,6 +20,7 @@ use crate::tokio_ext::spawn_fallible; use crate::wire; use crate::wire::MakerToTaker; use crate::wire::RollOverMsg; +use crate::xtra_ext::LogFailure; use crate::Cfd; use crate::Stopping; use anyhow::Context as _; @@ -140,7 +141,10 @@ impl Actor { order_id: self.cfd.id(), dlc, }; - self.maker_cfd_actor.send(msg).await?; + self.maker_cfd_actor + .send(msg) + .log_failure("Failed to report rollover completion") + .await?; ctx.stop(); Ok(()) } diff --git a/daemon/src/setup_maker.rs b/daemon/src/setup_maker.rs index 69db083..57dcddf 100644 --- a/daemon/src/setup_maker.rs +++ b/daemon/src/setup_maker.rs @@ -14,6 +14,7 @@ use crate::tokio_ext::spawn_fallible; use crate::wallet; use crate::wire; use crate::wire::SetupMsg; +use crate::xtra_ext::LogFailure; use anyhow::Context; use anyhow::Result; use async_trait::async_trait; @@ -121,7 +122,11 @@ impl Actor { } async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context) { - let _ = self.on_completed.send(completed).await; + let _ = self + .on_completed + .send(completed) + .log_failure("Failed to inform about contract setup completion") + .await; ctx.stop(); } diff --git a/daemon/src/setup_taker.rs b/daemon/src/setup_taker.rs index 1ca4296..bc5d4ec 100644 --- a/daemon/src/setup_taker.rs +++ b/daemon/src/setup_taker.rs @@ -13,6 +13,7 @@ use crate::tokio_ext::spawn_fallible; use crate::wallet; use crate::wire; use crate::wire::SetupMsg; +use crate::xtra_ext::LogFailure; use anyhow::Context; use anyhow::Result; use async_trait::async_trait; @@ -74,7 +75,10 @@ impl Actor { // inform the `taker_cfd::Actor` about the start of contract // setup, so that the db and UI can be updated accordingly - self.on_accepted.send(Started(order_id)).await?; + self.on_accepted + .send(Started(order_id)) + .log_failure("Failed to inform about contract setup start") + .await?; let (sender, receiver) = mpsc::unbounded::(); // store the writing end to forward messages from the maker to @@ -124,6 +128,7 @@ impl Actor { self.on_completed .send(Completed::Rejected { order_id }) + .log_failure("Failed to inform about contract setup rejection") .await?; ctx.stop(); @@ -147,6 +152,7 @@ impl Actor { order_id: msg.order_id, dlc: msg.dlc, }) + .log_failure("Failed to inform about contract setup completion") .await?; ctx.stop(); @@ -160,6 +166,7 @@ impl Actor { order_id: msg.order_id, error: msg.error, }) + .log_failure("Failed to inform about contract setup failure") .await?; ctx.stop(); @@ -240,11 +247,11 @@ impl Rejected { } impl xtra::Message for Started { - type Result = (); + type Result = Result<()>; } impl xtra::Message for Completed { - type Result = (); + type Result = Result<()>; } impl address_map::ActorName for Actor { diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 27cbea3..17b132c 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -5,7 +5,6 @@ use crate::cfd_actors::{self}; use crate::collab_settlement_taker; use crate::connection; use crate::db::load_cfd; -use crate::log_error; use crate::model::cfd::Cfd; use crate::model::cfd::CfdState; use crate::model::cfd::CfdStateCommon; @@ -412,8 +411,8 @@ where #[xtra_productivity] impl Actor { - async fn handle_current_order(&mut self, msg: CurrentOrder) { - log_error!(self.handle_new_order(msg.0)); + async fn handle_current_order(&mut self, msg: CurrentOrder) -> Result<()> { + self.handle_new_order(msg.0).await } } @@ -424,8 +423,8 @@ where M: xtra::Handler, W: xtra::Handler, { - async fn handle(&mut self, msg: Completed, _ctx: &mut Context) { - log_error!(self.handle_setup_completed(msg)) + async fn handle(&mut self, msg: Completed, _ctx: &mut Context) -> Result<()> { + self.handle_setup_completed(msg).await } } @@ -434,8 +433,8 @@ impl Handler for Actor, { - async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) { - log_error!(self.handle_monitoring_event(msg)) + async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) -> Result<()> { + self.handle_monitoring_event(msg).await } } @@ -445,14 +444,16 @@ where W: xtra::Handler, { async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context) { - log_error!(self.handle_oracle_attestation(msg)) + if let Err(e) = self.handle_oracle_attestation(msg).await { + tracing::warn!("Failed to handle oracle attestation: {:#}", e) + } } } #[async_trait] impl Handler for Actor { - async fn handle(&mut self, msg: setup_taker::Started, _ctx: &mut Context) { - log_error!(self.handle_setup_started(msg.0)) + async fn handle(&mut self, msg: setup_taker::Started, _ctx: &mut Context) -> Result<()> { + self.handle_setup_started(msg.0).await } } diff --git a/daemon/src/xtra_ext.rs b/daemon/src/xtra_ext.rs new file mode 100644 index 0000000..d05110e --- /dev/null +++ b/daemon/src/xtra_ext.rs @@ -0,0 +1,50 @@ +use async_trait::async_trait; +use xtra::address; +use xtra::message_channel; +use xtra::Actor; +use xtra::Disconnected; +use xtra::Message; + +#[async_trait] +pub trait LogFailure { + async fn log_failure(self, context: &str) -> Result<(), Disconnected>; +} + +#[async_trait] +impl LogFailure for address::SendFuture +where + A: Actor, + M: Message>, +{ + async fn log_failure(self, context: &str) -> Result<(), Disconnected> { + if let Err(e) = self.await? { + tracing::warn!( + "{}: Message handler for message {} failed: {:#}", + context, + std::any::type_name::(), + e + ); + } + + Ok(()) + } +} + +#[async_trait] +impl LogFailure for message_channel::SendFuture +where + M: xtra::Message>, +{ + async fn log_failure(self, context: &str) -> Result<(), Disconnected> { + if let Err(e) = self.await? { + tracing::warn!( + "{}: Message handler for message {} failed: {:#}", + context, + std::any::type_name::(), + e + ); + } + + Ok(()) + } +} diff --git a/daemon/tests/harness/mocks/monitor.rs b/daemon/tests/harness/mocks/monitor.rs index b3934ee..5af2fcd 100644 --- a/daemon/tests/harness/mocks/monitor.rs +++ b/daemon/tests/harness/mocks/monitor.rs @@ -1,8 +1,7 @@ -use std::sync::Arc; - use daemon::monitor; use daemon::oracle; use mockall::*; +use std::sync::Arc; use tokio::sync::Mutex; use xtra_productivity::xtra_productivity; @@ -30,7 +29,7 @@ impl MonitorActor { } async fn handle(&mut self, msg: oracle::Attestation) { - self.mock.lock().await.oracle_attestation(msg) + self.mock.lock().await.oracle_attestation(msg); } } diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index c448dd2..d5b48b5 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -367,9 +367,13 @@ impl Taker { #[macro_export] macro_rules! deliver_event { ($maker:expr, $taker:expr, $event:expr) => { - tracing::debug!("Delivering event: {:?}", $event); - $taker.system.cfd_actor_addr.send($event).await.unwrap(); - $maker.system.cfd_actor_addr.send($event).await.unwrap(); + #[allow(unused_must_use)] + { + tracing::debug!("Delivering event: {:?}", $event); + + $taker.system.cfd_actor_addr.send($event).await.unwrap(); + $maker.system.cfd_actor_addr.send($event).await.unwrap(); + } }; }