diff --git a/daemon/src/address_map.rs b/daemon/src/address_map.rs index 0ea15a5..7425b6e 100644 --- a/daemon/src/address_map.rs +++ b/daemon/src/address_map.rs @@ -1,3 +1,4 @@ +use anyhow::Result; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::hash::Hash; @@ -61,6 +62,24 @@ where _ => Err(NotConnected::new::()), } } + + /// Sends a message to the actor stored with the given key. + pub async fn send_fallible(&self, key: &K, msg: M) -> Result, NotConnected> + where + M: Message>, + A: Handler + ActorName, + { + match self.inner.get(key) { + Some(addr) if addr.is_connected() => { + let res = addr + .send(msg) + .await + .expect("we checked that we are connected"); + Ok(res) + } + _ => Err(NotConnected::new::()), + } + } } #[derive(thiserror::Error, Debug)] diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index 0130ad6..44acab1 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -11,7 +11,6 @@ use crate::{ use anyhow::{bail, Context, Result}; use bdk::bitcoin::Amount; use futures::{SinkExt, StreamExt, TryStreamExt}; -use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; @@ -40,7 +39,7 @@ pub struct Actor { heartbeat_timeout: Duration, connect_timeout: Duration, connected_state: Option, - setup_actors: HashMap>, + setup_actors: AddressMap, collab_settlement_actors: AddressMap, rollover_actors: AddressMap, } @@ -119,7 +118,7 @@ impl Actor { current_order: current_order.clone_channel(), heartbeat_timeout: hearthbeat_timeout, connected_state: None, - setup_actors: HashMap::new(), + setup_actors: AddressMap::default(), connect_timeout, collab_settlement_actors: AddressMap::default(), rollover_actors: AddressMap::default(), @@ -332,40 +331,44 @@ impl Actor { .expect("wire messages only to arrive in connected state") .last_heartbeat = SystemTime::now(); } - wire::MakerToTaker::ConfirmOrder(order_id) => match self.setup_actors.get(&order_id) { - Some(addr) => { - let _ = addr.send(setup_taker::Accepted).await; - } - None => { + wire::MakerToTaker::ConfirmOrder(order_id) => { + if self + .setup_actors + .send_fallible(&order_id, setup_taker::Accepted) + .await + .is_err() + { tracing::warn!(%order_id, "No active contract setup"); } - }, - wire::MakerToTaker::RejectOrder(order_id) => match self.setup_actors.get(&order_id) { - Some(addr) => { - let _ = addr.send(setup_taker::Rejected::without_reason()).await; - } - None => { + } + wire::MakerToTaker::RejectOrder(order_id) => { + if self + .setup_actors + .send_fallible(&order_id, setup_taker::Rejected::without_reason()) + .await + .is_err() + { tracing::warn!(%order_id, "No active contract setup"); } - }, + } wire::MakerToTaker::Protocol { order_id, msg } => { - match self.setup_actors.get(&order_id) { - Some(addr) => { - let _ = addr.send(msg).await; - } - None => { - tracing::warn!(%order_id, "No active contract setup"); - } + if self + .setup_actors + .send_fallible(&order_id, msg) + .await + .is_err() + { + tracing::warn!(%order_id, "No active contract setup"); } } wire::MakerToTaker::InvalidOrderId(order_id) => { - match self.setup_actors.get(&order_id) { - Some(addr) => { - let _ = addr.send(setup_taker::Rejected::invalid_order_id()).await; - } - None => { - tracing::warn!(%order_id, "No active contract setup"); - } + if self + .setup_actors + .send_fallible(&order_id, setup_taker::Rejected::invalid_order_id()) + .await + .is_err() + { + tracing::warn!(%order_id, "No active contract setup"); } } wire::MakerToTaker::Settlement { order_id, msg } => { diff --git a/daemon/src/setup_taker.rs b/daemon/src/setup_taker.rs index 4c7ea85..7005cf6 100644 --- a/daemon/src/setup_taker.rs +++ b/daemon/src/setup_taker.rs @@ -4,7 +4,7 @@ use crate::oracle::Announcement; use crate::setup_contract::{self, SetupParams}; use crate::tokio_ext::spawn_fallible; use crate::wire::{self, SetupMsg}; -use crate::{connection, wallet}; +use crate::{address_map, connection, wallet}; use anyhow::{Context, Result}; use async_trait::async_trait; use futures::channel::mpsc::{self, UnboundedSender}; @@ -244,3 +244,9 @@ impl xtra::Message for Started { impl xtra::Message for Completed { type Result = (); } + +impl address_map::ActorName for Actor { + fn actor_name() -> String { + "Taker contract setup".to_string() + } +}