Browse Source

Introduce SendAsyncSafe trait

remove-long-heartbeat-interval-in-debug-mode
Lucas Soriano del Pino 3 years ago
parent
commit
c09b8c4e2d
No known key found for this signature in database GPG Key ID: 89CE0DB40A19D524
  1. 7
      daemon/src/collab_settlement_taker.rs
  2. 1
      daemon/src/lib.rs
  3. 12
      daemon/src/maker_cfd.rs
  4. 57
      daemon/src/send_async_safe.rs

7
daemon/src/collab_settlement_taker.rs

@ -8,6 +8,7 @@ use crate::model::cfd::SettlementKind;
use crate::model::cfd::SettlementProposal;
use crate::model::Price;
use crate::projection;
use crate::send_async_safe::SendAsyncSafe;
use crate::wire;
use anyhow::Context;
use anyhow::Result;
@ -80,12 +81,8 @@ impl Actor {
let (tx, sig) = dlc.close_transaction(&self.proposal)?;
// Need to use `do_send_async` here because this handler is called in
// context of a message arriving over the wire, and would result in a
// deadlock otherwise.
#[allow(clippy::disallowed_method)]
self.connection
.do_send_async(wire::TakerToMaker::Settlement {
.send_async_safe(wire::TakerToMaker::Settlement {
order_id,
msg: wire::taker_to_maker::Settlement::Initiate { sig_taker: sig },
})

1
daemon/src/lib.rs

@ -53,6 +53,7 @@ pub mod rollover_maker;
pub mod rollover_taker;
pub mod routes;
pub mod seed;
pub mod send_async_safe;
pub mod send_to_socket;
pub mod setup_contract;
pub mod setup_maker;

12
daemon/src/maker_cfd.rs

@ -27,6 +27,7 @@ use crate::projection;
use crate::projection::Update;
use crate::rollover_maker;
use crate::rollover_maker::Completed;
use crate::send_async_safe::SendAsyncSafe;
use crate::setup_maker;
use crate::wallet;
use crate::wire;
@ -205,11 +206,8 @@ where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
async fn handle_taker_connected(&mut self, taker_id: Identity) -> Result<()> {
// Need to use `do_send_async` here because we are being invoked from the
// `maker_inc_connections::Actor`. Using `send` would result in a deadlock.
#[allow(clippy::disallowed_method)]
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
.send_async_safe(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::CurrentOrder(self.current_order.clone()),
})
@ -413,12 +411,8 @@ where
// have to remove the current order.
self.current_order = None;
// Need to use `do_send_async` here because invoking the
// corresponding handler can result in a deadlock with another
// invocation in `maker_inc_connections.rs`
#[allow(clippy::disallowed_method)]
self.takers
.do_send_async(maker_inc_connections::BroadcastOrder(None))
.send_async_safe(maker_inc_connections::BroadcastOrder(None))
.await?;
self.projection_actor.send(projection::Update(None)).await?;

57
daemon/src/send_async_safe.rs

@ -0,0 +1,57 @@
use async_trait::async_trait;
#[async_trait]
pub trait SendAsyncSafe<M, R>
where
M: xtra::Message<Result = R>,
{
/// Send a message to an actor without waiting for them to handle
/// it.
///
/// As soon as this method returns, we know if the receiving actor
/// is alive. If they are, the message we are sending will
/// eventually be handled by them, but we don't wait for them to
/// do so.
async fn send_async_safe(&self, msg: M) -> Result<(), xtra::Disconnected>;
}
#[async_trait]
impl<A, M> SendAsyncSafe<M, ()> for xtra::Address<A>
where
A: xtra::Handler<M>,
M: xtra::Message<Result = ()>,
{
async fn send_async_safe(&self, msg: M) -> Result<(), xtra::Disconnected> {
#[allow(clippy::disallowed_method)]
self.do_send_async(msg).await
}
}
#[async_trait]
impl<A, M, E> SendAsyncSafe<M, Result<(), E>> for xtra::Address<A>
where
A: xtra::Handler<M>,
M: xtra::Message<Result = Result<(), E>>,
E: std::error::Error + Send,
{
async fn send_async_safe(&self, msg: M) -> Result<(), xtra::Disconnected> {
if !self.is_connected() {
return Err(xtra::Disconnected);
}
let send_fut = self.send(msg);
#[allow(clippy::disallowed_method)]
tokio::spawn(async {
let e = match send_fut.await {
Ok(Err(e)) => format!("{:#}", e),
Err(e) => format!("{:#}", e),
Ok(Ok(())) => return,
};
tracing::warn!("Async message invocation failed: {:#}", e)
});
Ok(())
}
}
Loading…
Cancel
Save