Browse Source

Merge #862

862: Cleanup ephemeral r=luckysori a=luckysori

Trying to align setup actors with the rest of the ephemeral actors. Let me know if there's anything else.

Co-authored-by: Lucas Soriano del Pino <lucas_soriano@fastmail.com>
resilient-broadcast
bors[bot] 3 years ago
committed by GitHub
parent
commit
4562702961
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      daemon/src/address_map.rs
  2. 61
      daemon/src/connection.rs
  3. 4
      daemon/src/setup_maker.rs
  4. 12
      daemon/src/setup_taker.rs

19
daemon/src/address_map.rs

@ -1,3 +1,4 @@
use anyhow::Result;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::hash::Hash; use std::hash::Hash;
@ -61,6 +62,24 @@ where
_ => Err(NotConnected::new::<A>()), _ => Err(NotConnected::new::<A>()),
} }
} }
/// Sends a message to the actor stored with the given key.
pub async fn send_fallible<M>(&self, key: &K, msg: M) -> Result<Result<()>, NotConnected>
where
M: Message<Result = anyhow::Result<()>>,
A: Handler<M> + ActorName,
{
match self.inner.get(key) {
Some(addr) if addr.is_connected() => {
let res = addr
.send(msg)
.await
.expect("we checked that we are connected");
Ok(res)
}
_ => Err(NotConnected::new::<A>()),
}
}
} }
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]

61
daemon/src/connection.rs

@ -11,7 +11,6 @@ use crate::{
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use bdk::bitcoin::Amount; use bdk::bitcoin::Amount;
use futures::{SinkExt, StreamExt, TryStreamExt}; use futures::{SinkExt, StreamExt, TryStreamExt};
use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
@ -40,7 +39,7 @@ pub struct Actor {
heartbeat_timeout: Duration, heartbeat_timeout: Duration,
connect_timeout: Duration, connect_timeout: Duration,
connected_state: Option<ConnectedState>, connected_state: Option<ConnectedState>,
setup_actors: HashMap<OrderId, xtra::Address<setup_taker::Actor>>, setup_actors: AddressMap<OrderId, setup_taker::Actor>,
collab_settlement_actors: AddressMap<OrderId, collab_settlement_taker::Actor>, collab_settlement_actors: AddressMap<OrderId, collab_settlement_taker::Actor>,
rollover_actors: AddressMap<OrderId, rollover_taker::Actor>, rollover_actors: AddressMap<OrderId, rollover_taker::Actor>,
} }
@ -119,7 +118,7 @@ impl Actor {
current_order: current_order.clone_channel(), current_order: current_order.clone_channel(),
heartbeat_timeout: hearthbeat_timeout, heartbeat_timeout: hearthbeat_timeout,
connected_state: None, connected_state: None,
setup_actors: HashMap::new(), setup_actors: AddressMap::default(),
connect_timeout, connect_timeout,
collab_settlement_actors: AddressMap::default(), collab_settlement_actors: AddressMap::default(),
rollover_actors: AddressMap::default(), rollover_actors: AddressMap::default(),
@ -332,40 +331,44 @@ impl Actor {
.expect("wire messages only to arrive in connected state") .expect("wire messages only to arrive in connected state")
.last_heartbeat = SystemTime::now(); .last_heartbeat = SystemTime::now();
} }
wire::MakerToTaker::ConfirmOrder(order_id) => match self.setup_actors.get(&order_id) { wire::MakerToTaker::ConfirmOrder(order_id) => {
Some(addr) => { if self
let _ = addr.send(setup_taker::Accepted).await; .setup_actors
} .send_fallible(&order_id, setup_taker::Accepted)
None => { .await
.is_err()
{
tracing::warn!(%order_id, "No active contract setup"); tracing::warn!(%order_id, "No active contract setup");
} }
}, }
wire::MakerToTaker::RejectOrder(order_id) => match self.setup_actors.get(&order_id) { wire::MakerToTaker::RejectOrder(order_id) => {
Some(addr) => { if self
let _ = addr.send(setup_taker::Rejected::without_reason()).await; .setup_actors
} .send_fallible(&order_id, setup_taker::Rejected::without_reason())
None => { .await
.is_err()
{
tracing::warn!(%order_id, "No active contract setup"); tracing::warn!(%order_id, "No active contract setup");
} }
}, }
wire::MakerToTaker::Protocol { order_id, msg } => { wire::MakerToTaker::Protocol { order_id, msg } => {
match self.setup_actors.get(&order_id) { if self
Some(addr) => { .setup_actors
let _ = addr.send(msg).await; .send_fallible(&order_id, msg)
} .await
None => { .is_err()
tracing::warn!(%order_id, "No active contract setup"); {
} tracing::warn!(%order_id, "No active contract setup");
} }
} }
wire::MakerToTaker::InvalidOrderId(order_id) => { wire::MakerToTaker::InvalidOrderId(order_id) => {
match self.setup_actors.get(&order_id) { if self
Some(addr) => { .setup_actors
let _ = addr.send(setup_taker::Rejected::invalid_order_id()).await; .send_fallible(&order_id, setup_taker::Rejected::invalid_order_id())
} .await
None => { .is_err()
tracing::warn!(%order_id, "No active contract setup"); {
} tracing::warn!(%order_id, "No active contract setup");
} }
} }
wire::MakerToTaker::Settlement { order_id, msg } => { wire::MakerToTaker::Settlement { order_id, msg } => {

4
daemon/src/setup_maker.rs

@ -150,7 +150,7 @@ impl Actor {
}; };
if let Err(error) = fut.await { if let Err(error) = fut.await {
tracing::error!(%order_id, "Stopping setup_maker actor: {}", error); tracing::warn!(%order_id, "Stopping setup_maker actor: {}", error);
self.complete(Completed::Failed { order_id, error }, ctx) self.complete(Completed::Failed { order_id, error }, ctx)
.await; .await;
@ -287,6 +287,6 @@ impl xtra::Message for Started {
impl ActorName for Actor { impl ActorName for Actor {
fn actor_name() -> String { fn actor_name() -> String {
"Taker contract setup".to_string() "Maker contract setup".to_string()
} }
} }

12
daemon/src/setup_taker.rs

@ -4,7 +4,7 @@ use crate::oracle::Announcement;
use crate::setup_contract::{self, SetupParams}; use crate::setup_contract::{self, SetupParams};
use crate::tokio_ext::spawn_fallible; use crate::tokio_ext::spawn_fallible;
use crate::wire::{self, SetupMsg}; use crate::wire::{self, SetupMsg};
use crate::{connection, wallet}; use crate::{address_map, connection, wallet};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use futures::channel::mpsc::{self, UnboundedSender}; use futures::channel::mpsc::{self, UnboundedSender};
@ -117,7 +117,7 @@ impl Actor {
tracing::info!(%order_id, "Order got rejected"); tracing::info!(%order_id, "Order got rejected");
if msg.is_invalid_order { if msg.is_invalid_order {
tracing::error!(%order_id, "Rejection reason: Invalid order ID"); tracing::warn!(%order_id, "Rejection reason: Invalid order ID");
} }
self.on_completed self.on_completed
@ -182,7 +182,7 @@ impl xtra::Actor for Actor {
.await; .await;
if let Err(e) = res { if let Err(e) = res {
tracing::error!(%self.order.id, "Stopping setup_taker actor: {}", e); tracing::warn!(%self.order.id, "Stopping setup_taker actor: {}", e);
ctx.stop() ctx.stop()
} }
} }
@ -244,3 +244,9 @@ impl xtra::Message for Started {
impl xtra::Message for Completed { impl xtra::Message for Completed {
type Result = (); type Result = ();
} }
impl address_map::ActorName for Actor {
fn actor_name() -> String {
"Taker contract setup".to_string()
}
}

Loading…
Cancel
Save