diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 6dc0594..97edb62 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -260,9 +260,11 @@ async fn main() -> Result<()> { .unwrap(), ); tokio::spawn(oracle_actor_context.run(oracle::Actor::new( - cfd_maker_actor_inbox.clone(), - monitor_actor_address, cfds, + [ + Box::new(cfd_maker_actor_inbox.clone()), + Box::new(monitor_actor_address), + ], ))); oracle_actor_address diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index aa80248..5295cb9 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -86,10 +86,10 @@ pub struct Actor { update_cfd_feed_sender: watch::Sender, takers: Address, current_order_id: Option, - monitor_actor: Address>, + monitor_actor: Address, setup_state: SetupState, roll_over_state: RollOverState, - oracle_actor: Address>>, + oracle_actor: Address, // Maker needs to also store TakerId to be able to send a reply back current_pending_proposals: HashMap, // TODO: Persist instead of using an in-memory hashmap for resiliency? @@ -124,8 +124,8 @@ impl Actor { order_feed_sender: watch::Sender>, update_cfd_feed_sender: watch::Sender, takers: Address, - monitor_actor: Address>, - oracle_actor: Address>>, + monitor_actor: Address, + oracle_actor: Address, ) -> Self { Self { db, diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index 50d600b..a5c4c35 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -14,6 +14,7 @@ use std::convert::{TryFrom, TryInto}; use std::fmt; use std::marker::Send; use std::ops::{Add, RangeInclusive}; +use xtra::prelude::StrongMessageChannel; const FINALITY_CONFIRMATIONS: u32 = 1; @@ -33,26 +34,19 @@ pub struct MonitorParams { pub struct Sync; -pub struct Actor -where - T: xtra::Actor, -{ +pub struct Actor { cfds: HashMap, - cfd_actor_addr: xtra::Address, - + event_channel: Box>, client: C, latest_block_height: BlockHeight, current_status: BTreeMap<(Txid, Script), ScriptStatus>, awaiting_status: HashMap<(Txid, Script), Vec<(ScriptStatus, Event)>>, } -impl Actor -where - T: xtra::Actor + xtra::Handler, -{ +impl Actor { pub async fn new( electrum_rpc_url: &str, - cfd_actor_addr: xtra::Address, + event_channel: impl StrongMessageChannel + 'static, cfds: Vec, ) -> Result { let client = bdk::electrum_client::Client::new(electrum_rpc_url) @@ -66,7 +60,7 @@ where let mut actor = Self { cfds: HashMap::new(), - cfd_actor_addr, + event_channel: Box::new(event_channel), client, latest_block_height: BlockHeight::try_from(latest_block)?, current_status: BTreeMap::default(), @@ -131,7 +125,7 @@ where actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.order.id)?; actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); actor.monitor_refund_finality(¶ms,cfd.order.id); - } + } CfdState::PendingClose { collaborative_close, .. } => { let transaction = collaborative_close.tx; let close_params = (transaction.txid(), @@ -164,9 +158,8 @@ where } } -impl Actor +impl Actor where - T: xtra::Actor + xtra::Handler, C: bdk::electrum_client::ElectrumApi, { fn monitor_all(&mut self, params: &MonitorParams, order_id: OrderId) { @@ -415,7 +408,7 @@ where for (target_status, event) in reached_monitoring_target { tracing::info!(%txid, target = %target_status, current = %status, "Bitcoin transaction reached monitoring target"); - self.cfd_actor_addr.send(event).await?; + self.event_channel.send(event).await?; } } } @@ -622,18 +615,11 @@ impl xtra::Message for Sync { type Result = (); } -impl xtra::Actor for Actor -where - T: xtra::Actor, - C: Send, - C: 'static, -{ -} +impl xtra::Actor for Actor where C: Send + 'static {} #[async_trait] -impl xtra::Handler for Actor +impl xtra::Handler for Actor where - T: xtra::Actor + xtra::Handler, C: bdk::electrum_client::ElectrumApi + Send + 'static, { async fn handle(&mut self, msg: StartMonitoring, _ctx: &mut xtra::Context) { @@ -644,9 +630,8 @@ where } } #[async_trait] -impl xtra::Handler for Actor +impl xtra::Handler for Actor where - T: xtra::Actor + xtra::Handler, C: bdk::electrum_client::ElectrumApi + Send + 'static, { async fn handle(&mut self, _: Sync, _ctx: &mut xtra::Context) { @@ -655,10 +640,7 @@ where } #[async_trait] -impl xtra::Handler for Actor -where - T: xtra::Actor + xtra::Handler, -{ +impl xtra::Handler for Actor { async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut xtra::Context) { log_error!(self.handle_oracle_attestation(msg)); } @@ -690,7 +672,7 @@ mod tests { let refund_expired = Event::RefundTimelockExpired(OrderId::default()); let mut monitor = Actor::for_test( - recorder_address, + Box::new(recorder_address), [( (txid1(), script1()), vec![ @@ -736,7 +718,7 @@ mod tests { let refund_finality = Event::RefundFinality(OrderId::default()); let mut monitor = Actor::for_test( - recorder_address, + Box::new(recorder_address), [ ( (txid1(), script1()), @@ -773,7 +755,7 @@ mod tests { let cet_finality = Event::CetFinality(OrderId::default()); let mut monitor = Actor::for_test( - recorder_address, + Box::new(recorder_address), [( (txid1(), script1()), vec![(ScriptStatus::finality(), cet_finality.clone())], @@ -790,18 +772,15 @@ mod tests { assert!(monitor.awaiting_status.is_empty()); } - impl Actor - where - A: xtra::Actor + xtra::Handler, - { + impl Actor { #[allow(clippy::type_complexity)] fn for_test( - address: xtra::Address, + event_channel: Box>, subscriptions: [((Txid, Script), Vec<(ScriptStatus, Event)>); N], ) -> Self { Actor { cfds: HashMap::default(), - cfd_actor_addr: address, + event_channel, client: stub::Client::default(), latest_block_height: BlockHeight(0), current_status: BTreeMap::default(), diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index 770b3eb..91740a8 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -9,13 +9,13 @@ use serde::Deserialize; use std::collections::{HashMap, HashSet}; use std::ops::Add; use time::ext::NumericalDuration; +use xtra::prelude::StrongMessageChannel; -pub struct Actor { +pub struct Actor { announcements: HashMap)>, pending_announcements: HashSet, pending_attestations: HashSet, - cfd_actor_address: xtra::Address, - monitor_actor_address: xtra::Address, + attestation_channels: [Box>; 2], } pub struct Sync; @@ -64,11 +64,10 @@ struct NewAttestationFetched { attestation: Attestation, } -impl Actor { +impl Actor { pub fn new( - cfd_actor_address: xtra::Address, - monitor_actor_address: xtra::Address, cfds: Vec, + attestation_channels: [Box>; 2], ) -> Self { let mut pending_attestations = HashSet::new(); @@ -103,17 +102,12 @@ impl Actor { announcements: HashMap::new(), pending_announcements: HashSet::new(), pending_attestations, - cfd_actor_address, - monitor_actor_address, + attestation_channels, } } } -impl Actor -where - CFD: 'static, - M: 'static, -{ +impl Actor { fn update_pending_announcements(&mut self, ctx: &mut xtra::Context) { for event_id in self.pending_announcements.iter().cloned() { let this = ctx.address().expect("self to be alive"); @@ -149,11 +143,7 @@ where } } -impl Actor -where - CFD: xtra::Handler, - M: xtra::Handler, -{ +impl Actor { fn update_pending_attestations(&mut self, ctx: &mut xtra::Context) { for event_id in self.pending_attestations.iter().copied() { if !event_id.has_likely_occured() { @@ -203,14 +193,9 @@ where ) -> Result<()> { tracing::info!("Fetched new attestation for {}", id); - self.cfd_actor_address - .clone() - .do_send_async(attestation.clone()) - .await?; - self.monitor_actor_address - .clone() - .do_send_async(attestation) - .await?; + for channel in self.attestation_channels.iter() { + channel.do_send(attestation.clone())?; + } self.pending_attestations.remove(&id); @@ -219,7 +204,7 @@ where } #[async_trait] -impl xtra::Handler for Actor { +impl xtra::Handler for Actor { async fn handle(&mut self, msg: MonitorAttestation, _ctx: &mut xtra::Context) { if !self.pending_attestations.insert(msg.event_id) { tracing::trace!("Attestation {} already being monitored", msg.event_id); @@ -228,7 +213,7 @@ impl xtra::Handler for Actor xtra::Handler for Actor { +impl xtra::Handler for Actor { async fn handle(&mut self, msg: FetchAnnouncement, _ctx: &mut xtra::Context) { if !self.pending_announcements.insert(msg.0) { tracing::trace!("Announcement {} already being fetched", msg.0); @@ -237,7 +222,7 @@ impl xtra::Handler for Actor xtra::Handler for Actor { +impl xtra::Handler for Actor { async fn handle( &mut self, msg: GetAnnouncement, @@ -254,7 +239,7 @@ impl xtra::Handler for Actor } #[async_trait] -impl xtra::Handler for Actor { +impl xtra::Handler for Actor { async fn handle(&mut self, msg: NewAnnouncementFetched, _ctx: &mut xtra::Context) { self.pending_announcements.remove(&msg.id); self.announcements @@ -263,11 +248,7 @@ impl xtra::Handler for Actor xtra::Handler for Actor -where - CFD: xtra::Handler, - M: xtra::Handler, -{ +impl xtra::Handler for Actor { async fn handle(&mut self, msg: NewAttestationFetched, _ctx: &mut xtra::Context) { log_error!(self.handle_new_attestation_fetched(msg.id, msg.attestation)); } @@ -310,14 +291,10 @@ impl From for cfd_protocol::Announcement { } } -impl xtra::Actor for Actor {} +impl xtra::Actor for Actor {} #[async_trait] -impl xtra::Handler for Actor -where - CFD: xtra::Handler, - M: xtra::Handler, -{ +impl xtra::Handler for Actor { async fn handle(&mut self, _: Sync, ctx: &mut xtra::Context) { self.update_pending_announcements(ctx); self.update_pending_attestations(ctx); diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index d77246f..aebc453 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -254,9 +254,11 @@ async fn main() -> Result<()> { .unwrap(), ); tokio::spawn(oracle_actor_context.run(oracle::Actor::new( - cfd_actor_inbox.clone(), - monitor_actor_address, cfds, + [ + Box::new(cfd_actor_inbox.clone()), + Box::new(monitor_actor_address), + ], ))); oracle_actor_address diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 5870670..3cc3d7f 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -77,10 +77,10 @@ pub struct Actor { order_feed_actor_inbox: watch::Sender>, update_cfd_feed_sender: watch::Sender, send_to_maker: Address>, - monitor_actor: Address>, + monitor_actor: Address, setup_state: SetupState, roll_over_state: RollOverState, - oracle_actor: Address>>, + oracle_actor: Address, current_pending_proposals: UpdateCfdProposals, } @@ -94,8 +94,8 @@ impl Actor { order_feed_actor_inbox: watch::Sender>, update_cfd_feed_sender: watch::Sender, send_to_maker: Address>, - monitor_actor: Address>, - oracle_actor: Address>>, + monitor_actor: Address, + oracle_actor: Address, ) -> Self { Self { db,