From f04392825047a3ff80457869932e8497b2962422 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 14 Oct 2021 16:21:01 +1100 Subject: [PATCH] Introduce a `fan_out::Actor` instead of iterating in the `oracle::Actor` A fan-out actor takes an incoming message and forwards it to a set of actors. This allows our `oracle::Actor` to be unaware, how many other actors are interested in the attestation, thus simplifying it. --- daemon/src/fan_out.rs | 29 +++++++++++++++++++++++++++++ daemon/src/lib.rs | 1 + daemon/src/maker.rs | 16 +++++++--------- daemon/src/oracle.rs | 11 ++++------- daemon/src/taker.rs | 14 ++++++-------- 5 files changed, 47 insertions(+), 24 deletions(-) create mode 100644 daemon/src/fan_out.rs diff --git a/daemon/src/fan_out.rs b/daemon/src/fan_out.rs new file mode 100644 index 0000000..04e1236 --- /dev/null +++ b/daemon/src/fan_out.rs @@ -0,0 +1,29 @@ +use xtra::prelude::MessageChannel; + +/// A fan-out actor takes every incoming message and forwards it to a set of other actors. +pub struct Actor> { + receivers: Vec>>, +} + +impl> Actor { + pub fn new(receivers: &[&dyn MessageChannel]) -> Self { + Self { + receivers: receivers.iter().map(|c| c.clone_channel()).collect(), + } + } +} + +impl xtra::Actor for Actor where M: xtra::Message {} + +#[async_trait::async_trait] +impl xtra::Handler for Actor +where + M: xtra::Message + Clone + Sync + 'static, +{ + async fn handle(&mut self, message: M, _: &mut xtra::Context) { + for receiver in &self.receivers { + // Not sure why here is no `do_send_async` ... + let _ = receiver.do_send(message.clone()); + } + } +} diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index ad71158..b401d9d 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -2,6 +2,7 @@ pub mod actors; pub mod auth; pub mod bitmex_price_feed; pub mod db; +pub mod fan_out; pub mod housekeeping; pub mod keypair; pub mod logger; diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 97edb62..029a0fb 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -9,8 +9,8 @@ use daemon::model::WalletInfo; use daemon::seed::Seed; use daemon::wallet::Wallet; use daemon::{ - bitmex_price_feed, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle, - wallet_sync, + bitmex_price_feed, fan_out, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, + oracle, wallet_sync, }; use rocket::fairing::AdHoc; use rocket_db_pools::Database; @@ -259,13 +259,11 @@ async fn main() -> Result<()> { .notify_interval(Duration::from_secs(60), || oracle::Sync) .unwrap(), ); - tokio::spawn(oracle_actor_context.run(oracle::Actor::new( - cfds, - [ - Box::new(cfd_maker_actor_inbox.clone()), - Box::new(monitor_actor_address), - ], - ))); + let actor = fan_out::Actor::new(&[&cfd_maker_actor_inbox, &monitor_actor_address]) + .create(None) + .spawn_global(); + + tokio::spawn(oracle_actor_context.run(oracle::Actor::new(cfds, actor))); oracle_actor_address .do_send_async(oracle::Sync) diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index 91740a8..39e9b62 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -15,7 +15,7 @@ pub struct Actor { announcements: HashMap)>, pending_announcements: HashSet, pending_attestations: HashSet, - attestation_channels: [Box>; 2], + attestation_channel: Box>, } pub struct Sync; @@ -67,7 +67,7 @@ struct NewAttestationFetched { impl Actor { pub fn new( cfds: Vec, - attestation_channels: [Box>; 2], + attestation_channel: impl StrongMessageChannel + 'static, ) -> Self { let mut pending_attestations = HashSet::new(); @@ -102,7 +102,7 @@ impl Actor { announcements: HashMap::new(), pending_announcements: HashSet::new(), pending_attestations, - attestation_channels, + attestation_channel: Box::new(attestation_channel), } } } @@ -193,10 +193,7 @@ impl Actor { ) -> Result<()> { tracing::info!("Fetched new attestation for {}", id); - for channel in self.attestation_channels.iter() { - channel.do_send(attestation.clone())?; - } - + let _ = self.attestation_channel.send(attestation).await; self.pending_attestations.remove(&id); Ok(()) diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index aebc453..fa87db4 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -8,7 +8,7 @@ use daemon::model::WalletInfo; use daemon::seed::Seed; use daemon::wallet::Wallet; use daemon::{ - bitmex_price_feed, housekeeping, logger, monitor, oracle, send_to_socket, taker_cfd, + bitmex_price_feed, fan_out, housekeeping, logger, monitor, oracle, send_to_socket, taker_cfd, wallet_sync, wire, }; use futures::StreamExt; @@ -253,13 +253,11 @@ async fn main() -> Result<()> { .notify_interval(Duration::from_secs(60), || oracle::Sync) .unwrap(), ); - tokio::spawn(oracle_actor_context.run(oracle::Actor::new( - cfds, - [ - Box::new(cfd_actor_inbox.clone()), - Box::new(monitor_actor_address), - ], - ))); + let actor = fan_out::Actor::new(&[&cfd_actor_inbox, &monitor_actor_address]) + .create(None) + .spawn_global(); + + tokio::spawn(oracle_actor_context.run(oracle::Actor::new(cfds, actor))); oracle_actor_address .do_send_async(oracle::Sync)