diff --git a/daemon/src/collab_settlement_taker.rs b/daemon/src/collab_settlement_taker.rs index 4cf651a..8605646 100644 --- a/daemon/src/collab_settlement_taker.rs +++ b/daemon/src/collab_settlement_taker.rs @@ -8,6 +8,7 @@ use crate::model::cfd::SettlementKind; use crate::model::cfd::SettlementProposal; use crate::model::Price; use crate::projection; +use crate::send_async_safe::SendAsyncSafe; use crate::wire; use anyhow::Context; use anyhow::Result; @@ -80,12 +81,8 @@ impl Actor { let (tx, sig) = dlc.close_transaction(&self.proposal)?; - // Need to use `do_send_async` here because this handler is called in - // context of a message arriving over the wire, and would result in a - // deadlock otherwise. - #[allow(clippy::disallowed_method)] self.connection - .do_send_async(wire::TakerToMaker::Settlement { + .send_async_safe(wire::TakerToMaker::Settlement { order_id, msg: wire::taker_to_maker::Settlement::Initiate { sig_taker: sig }, }) diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 3b10ca9..d81d8b6 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -53,6 +53,7 @@ pub mod rollover_maker; pub mod rollover_taker; pub mod routes; pub mod seed; +pub mod send_async_safe; pub mod send_to_socket; pub mod setup_contract; pub mod setup_maker; diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 136abc9..55ba5f8 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -27,6 +27,7 @@ use crate::projection; use crate::projection::Update; use crate::rollover_maker; use crate::rollover_maker::Completed; +use crate::send_async_safe::SendAsyncSafe; use crate::setup_maker; use crate::wallet; use crate::wire; @@ -205,11 +206,8 @@ where T: xtra::Handler, { async fn handle_taker_connected(&mut self, taker_id: Identity) -> Result<()> { - // Need to use `do_send_async` here because we are being invoked from the - // `maker_inc_connections::Actor`. Using `send` would result in a deadlock. - #[allow(clippy::disallowed_method)] self.takers - .do_send_async(maker_inc_connections::TakerMessage { + .send_async_safe(maker_inc_connections::TakerMessage { taker_id, msg: wire::MakerToTaker::CurrentOrder(self.current_order.clone()), }) @@ -413,12 +411,8 @@ where // have to remove the current order. self.current_order = None; - // Need to use `do_send_async` here because invoking the - // corresponding handler can result in a deadlock with another - // invocation in `maker_inc_connections.rs` - #[allow(clippy::disallowed_method)] self.takers - .do_send_async(maker_inc_connections::BroadcastOrder(None)) + .send_async_safe(maker_inc_connections::BroadcastOrder(None)) .await?; self.projection_actor.send(projection::Update(None)).await?; diff --git a/daemon/src/send_async_safe.rs b/daemon/src/send_async_safe.rs new file mode 100644 index 0000000..ab5db2f --- /dev/null +++ b/daemon/src/send_async_safe.rs @@ -0,0 +1,57 @@ +use async_trait::async_trait; + +#[async_trait] +pub trait SendAsyncSafe +where + M: xtra::Message, +{ + /// Send a message to an actor without waiting for them to handle + /// it. + /// + /// As soon as this method returns, we know if the receiving actor + /// is alive. If they are, the message we are sending will + /// eventually be handled by them, but we don't wait for them to + /// do so. + async fn send_async_safe(&self, msg: M) -> Result<(), xtra::Disconnected>; +} + +#[async_trait] +impl SendAsyncSafe for xtra::Address +where + A: xtra::Handler, + M: xtra::Message, +{ + async fn send_async_safe(&self, msg: M) -> Result<(), xtra::Disconnected> { + #[allow(clippy::disallowed_method)] + self.do_send_async(msg).await + } +} + +#[async_trait] +impl SendAsyncSafe> for xtra::Address +where + A: xtra::Handler, + M: xtra::Message>, + E: std::error::Error + Send, +{ + async fn send_async_safe(&self, msg: M) -> Result<(), xtra::Disconnected> { + if !self.is_connected() { + return Err(xtra::Disconnected); + } + + let send_fut = self.send(msg); + + #[allow(clippy::disallowed_method)] + tokio::spawn(async { + let e = match send_fut.await { + Ok(Err(e)) => format!("{:#}", e), + Err(e) => format!("{:#}", e), + Ok(Ok(())) => return, + }; + + tracing::warn!("Async message invocation failed: {:#}", e) + }); + + Ok(()) + } +}