Browse Source

Modify AddressMap::send API

resilient-broadcast
luckysori 3 years ago
committed by Lucas Soriano del Pino
parent
commit
1595cb0949
No known key found for this signature in database GPG Key ID: 89CE0DB40A19D524
  1. 12
      daemon/src/address_map.rs
  2. 15
      daemon/src/connection.rs
  3. 7
      daemon/src/maker_cfd.rs

12
daemon/src/address_map.rs

@ -46,7 +46,7 @@ where
} }
/// Sends a message to the actor stored with the given key. /// Sends a message to the actor stored with the given key.
pub async fn send<M>(&self, key: &K, msg: M) -> bool pub async fn send<M>(&self, key: &K, msg: M) -> Result<(), NotConnected>
where where
M: Message<Result = ()>, M: Message<Result = ()>,
A: Handler<M>, A: Handler<M>,
@ -56,15 +56,17 @@ where
addr.send(msg) addr.send(msg)
.await .await
.expect("we checked that we are connected"); .expect("we checked that we are connected");
Ok(())
true
} }
Some(_) => false, _ => Err(NotConnected),
None => false,
} }
} }
} }
#[derive(thiserror::Error, Debug)]
#[error("Receiving actor is down")]
pub struct NotConnected;
/// A message to notify that an actor instance is stopping. /// A message to notify that an actor instance is stopping.
pub struct Stopping<A> { pub struct Stopping<A> {
pub me: Address<A>, pub me: Address<A>,

15
daemon/src/connection.rs

@ -369,7 +369,12 @@ impl Actor {
} }
} }
wire::MakerToTaker::Settlement { order_id, msg } => { wire::MakerToTaker::Settlement { order_id, msg } => {
if !self.collab_settlement_actors.send(&order_id, msg).await { if self
.collab_settlement_actors
.send(&order_id, msg)
.await
.is_err()
{
tracing::warn!(%order_id, "No active collaborative settlement"); tracing::warn!(%order_id, "No active collaborative settlement");
} }
} }
@ -377,28 +382,30 @@ impl Actor {
order_id, order_id,
oracle_event_id, oracle_event_id,
} => { } => {
if !self if self
.rollover_actors .rollover_actors
.send( .send(
&order_id, &order_id,
rollover_taker::RollOverAccepted { oracle_event_id }, rollover_taker::RollOverAccepted { oracle_event_id },
) )
.await .await
.is_err()
{ {
tracing::warn!(%order_id, "No active rollover"); tracing::warn!(%order_id, "No active rollover");
} }
} }
wire::MakerToTaker::RejectRollOver(order_id) => { wire::MakerToTaker::RejectRollOver(order_id) => {
if !self if self
.rollover_actors .rollover_actors
.send(&order_id, rollover_taker::RollOverRejected) .send(&order_id, rollover_taker::RollOverRejected)
.await .await
.is_err()
{ {
tracing::warn!(%order_id, "No active rollover"); tracing::warn!(%order_id, "No active rollover");
} }
} }
wire::MakerToTaker::RollOverProtocol { order_id, msg } => { wire::MakerToTaker::RollOverProtocol { order_id, msg } => {
if !self.rollover_actors.send(&order_id, msg).await { if self.rollover_actors.send(&order_id, msg).await.is_err() {
tracing::warn!(%order_id, "No active rollover"); tracing::warn!(%order_id, "No active rollover");
} }
} }

7
daemon/src/maker_cfd.rs

@ -523,13 +523,10 @@ impl<O, M, T, W> Actor<O, M, T, W> {
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
if !self self.setup_actors
.setup_actors
.send(&order_id, setup_maker::Accepted) .send(&order_id, setup_maker::Accepted)
.await .await
{ .with_context(|| format!("No active contract setup for order {}", order_id))?;
anyhow::bail!("No active contract setup for order {}", order_id);
}
cfd.state = CfdState::contract_setup(); cfd.state = CfdState::contract_setup();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;

Loading…
Cancel
Save