Browse Source

Replace HashMap in connection::Actor with AddressMap

resilient-broadcast
Lucas Soriano del Pino 3 years ago
parent
commit
f41fbba91c
No known key found for this signature in database GPG Key ID: 89CE0DB40A19D524
  1. 19
      daemon/src/address_map.rs
  2. 61
      daemon/src/connection.rs
  3. 8
      daemon/src/setup_taker.rs

19
daemon/src/address_map.rs

@ -1,3 +1,4 @@
use anyhow::Result;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::hash::Hash; use std::hash::Hash;
@ -61,6 +62,24 @@ where
_ => Err(NotConnected::new::<A>()), _ => Err(NotConnected::new::<A>()),
} }
} }
/// Sends a message to the actor stored with the given key.
pub async fn send_fallible<M>(&self, key: &K, msg: M) -> Result<Result<()>, NotConnected>
where
M: Message<Result = anyhow::Result<()>>,
A: Handler<M> + ActorName,
{
match self.inner.get(key) {
Some(addr) if addr.is_connected() => {
let res = addr
.send(msg)
.await
.expect("we checked that we are connected");
Ok(res)
}
_ => Err(NotConnected::new::<A>()),
}
}
} }
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]

61
daemon/src/connection.rs

@ -11,7 +11,6 @@ use crate::{
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use bdk::bitcoin::Amount; use bdk::bitcoin::Amount;
use futures::{SinkExt, StreamExt, TryStreamExt}; use futures::{SinkExt, StreamExt, TryStreamExt};
use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
@ -40,7 +39,7 @@ pub struct Actor {
heartbeat_timeout: Duration, heartbeat_timeout: Duration,
connect_timeout: Duration, connect_timeout: Duration,
connected_state: Option<ConnectedState>, connected_state: Option<ConnectedState>,
setup_actors: HashMap<OrderId, xtra::Address<setup_taker::Actor>>, setup_actors: AddressMap<OrderId, setup_taker::Actor>,
collab_settlement_actors: AddressMap<OrderId, collab_settlement_taker::Actor>, collab_settlement_actors: AddressMap<OrderId, collab_settlement_taker::Actor>,
rollover_actors: AddressMap<OrderId, rollover_taker::Actor>, rollover_actors: AddressMap<OrderId, rollover_taker::Actor>,
} }
@ -119,7 +118,7 @@ impl Actor {
current_order: current_order.clone_channel(), current_order: current_order.clone_channel(),
heartbeat_timeout: hearthbeat_timeout, heartbeat_timeout: hearthbeat_timeout,
connected_state: None, connected_state: None,
setup_actors: HashMap::new(), setup_actors: AddressMap::default(),
connect_timeout, connect_timeout,
collab_settlement_actors: AddressMap::default(), collab_settlement_actors: AddressMap::default(),
rollover_actors: AddressMap::default(), rollover_actors: AddressMap::default(),
@ -332,40 +331,44 @@ impl Actor {
.expect("wire messages only to arrive in connected state") .expect("wire messages only to arrive in connected state")
.last_heartbeat = SystemTime::now(); .last_heartbeat = SystemTime::now();
} }
wire::MakerToTaker::ConfirmOrder(order_id) => match self.setup_actors.get(&order_id) { wire::MakerToTaker::ConfirmOrder(order_id) => {
Some(addr) => { if self
let _ = addr.send(setup_taker::Accepted).await; .setup_actors
} .send_fallible(&order_id, setup_taker::Accepted)
None => { .await
.is_err()
{
tracing::warn!(%order_id, "No active contract setup"); tracing::warn!(%order_id, "No active contract setup");
} }
}, }
wire::MakerToTaker::RejectOrder(order_id) => match self.setup_actors.get(&order_id) { wire::MakerToTaker::RejectOrder(order_id) => {
Some(addr) => { if self
let _ = addr.send(setup_taker::Rejected::without_reason()).await; .setup_actors
} .send_fallible(&order_id, setup_taker::Rejected::without_reason())
None => { .await
.is_err()
{
tracing::warn!(%order_id, "No active contract setup"); tracing::warn!(%order_id, "No active contract setup");
} }
}, }
wire::MakerToTaker::Protocol { order_id, msg } => { wire::MakerToTaker::Protocol { order_id, msg } => {
match self.setup_actors.get(&order_id) { if self
Some(addr) => { .setup_actors
let _ = addr.send(msg).await; .send_fallible(&order_id, msg)
} .await
None => { .is_err()
tracing::warn!(%order_id, "No active contract setup"); {
} tracing::warn!(%order_id, "No active contract setup");
} }
} }
wire::MakerToTaker::InvalidOrderId(order_id) => { wire::MakerToTaker::InvalidOrderId(order_id) => {
match self.setup_actors.get(&order_id) { if self
Some(addr) => { .setup_actors
let _ = addr.send(setup_taker::Rejected::invalid_order_id()).await; .send_fallible(&order_id, setup_taker::Rejected::invalid_order_id())
} .await
None => { .is_err()
tracing::warn!(%order_id, "No active contract setup"); {
} tracing::warn!(%order_id, "No active contract setup");
} }
} }
wire::MakerToTaker::Settlement { order_id, msg } => { wire::MakerToTaker::Settlement { order_id, msg } => {

8
daemon/src/setup_taker.rs

@ -4,7 +4,7 @@ use crate::oracle::Announcement;
use crate::setup_contract::{self, SetupParams}; use crate::setup_contract::{self, SetupParams};
use crate::tokio_ext::spawn_fallible; use crate::tokio_ext::spawn_fallible;
use crate::wire::{self, SetupMsg}; use crate::wire::{self, SetupMsg};
use crate::{connection, wallet}; use crate::{address_map, connection, wallet};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use futures::channel::mpsc::{self, UnboundedSender}; use futures::channel::mpsc::{self, UnboundedSender};
@ -244,3 +244,9 @@ impl xtra::Message for Started {
impl xtra::Message for Completed { impl xtra::Message for Completed {
type Result = (); type Result = ();
} }
impl address_map::ActorName for Actor {
fn actor_name() -> String {
"Taker contract setup".to_string()
}
}

Loading…
Cancel
Save