Browse Source

Introduce a `fan_out::Actor` instead of iterating in the `oracle::Actor`

A fan-out actor takes an incoming message and forwards it to a set
of actors. This allows our `oracle::Actor` to be unaware, how many
other actors are interested in the attestation, thus simplifying it.
refactor/no-log-handler
Thomas Eizinger 3 years ago
parent
commit
f043928250
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 29
      daemon/src/fan_out.rs
  2. 1
      daemon/src/lib.rs
  3. 16
      daemon/src/maker.rs
  4. 11
      daemon/src/oracle.rs
  5. 14
      daemon/src/taker.rs

29
daemon/src/fan_out.rs

@ -0,0 +1,29 @@
use xtra::prelude::MessageChannel;
/// A fan-out actor takes every incoming message and forwards it to a set of other actors.
pub struct Actor<M: xtra::Message<Result = ()>> {
receivers: Vec<Box<dyn MessageChannel<M>>>,
}
impl<M: xtra::Message<Result = ()>> Actor<M> {
pub fn new(receivers: &[&dyn MessageChannel<M>]) -> Self {
Self {
receivers: receivers.iter().map(|c| c.clone_channel()).collect(),
}
}
}
impl<M> xtra::Actor for Actor<M> where M: xtra::Message<Result = ()> {}
#[async_trait::async_trait]
impl<M> xtra::Handler<M> for Actor<M>
where
M: xtra::Message<Result = ()> + Clone + Sync + 'static,
{
async fn handle(&mut self, message: M, _: &mut xtra::Context<Self>) {
for receiver in &self.receivers {
// Not sure why here is no `do_send_async` ...
let _ = receiver.do_send(message.clone());
}
}
}

1
daemon/src/lib.rs

@ -2,6 +2,7 @@ pub mod actors;
pub mod auth; pub mod auth;
pub mod bitmex_price_feed; pub mod bitmex_price_feed;
pub mod db; pub mod db;
pub mod fan_out;
pub mod housekeeping; pub mod housekeeping;
pub mod keypair; pub mod keypair;
pub mod logger; pub mod logger;

16
daemon/src/maker.rs

@ -9,8 +9,8 @@ use daemon::model::WalletInfo;
use daemon::seed::Seed; use daemon::seed::Seed;
use daemon::wallet::Wallet; use daemon::wallet::Wallet;
use daemon::{ use daemon::{
bitmex_price_feed, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle, bitmex_price_feed, fan_out, housekeeping, logger, maker_cfd, maker_inc_connections, monitor,
wallet_sync, oracle, wallet_sync,
}; };
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
@ -259,13 +259,11 @@ async fn main() -> Result<()> {
.notify_interval(Duration::from_secs(60), || oracle::Sync) .notify_interval(Duration::from_secs(60), || oracle::Sync)
.unwrap(), .unwrap(),
); );
tokio::spawn(oracle_actor_context.run(oracle::Actor::new( let actor = fan_out::Actor::new(&[&cfd_maker_actor_inbox, &monitor_actor_address])
cfds, .create(None)
[ .spawn_global();
Box::new(cfd_maker_actor_inbox.clone()),
Box::new(monitor_actor_address), tokio::spawn(oracle_actor_context.run(oracle::Actor::new(cfds, actor)));
],
)));
oracle_actor_address oracle_actor_address
.do_send_async(oracle::Sync) .do_send_async(oracle::Sync)

11
daemon/src/oracle.rs

@ -15,7 +15,7 @@ pub struct Actor {
announcements: HashMap<BitMexPriceEventId, (OffsetDateTime, Vec<schnorrsig::PublicKey>)>, announcements: HashMap<BitMexPriceEventId, (OffsetDateTime, Vec<schnorrsig::PublicKey>)>,
pending_announcements: HashSet<BitMexPriceEventId>, pending_announcements: HashSet<BitMexPriceEventId>,
pending_attestations: HashSet<BitMexPriceEventId>, pending_attestations: HashSet<BitMexPriceEventId>,
attestation_channels: [Box<dyn StrongMessageChannel<Attestation>>; 2], attestation_channel: Box<dyn StrongMessageChannel<Attestation>>,
} }
pub struct Sync; pub struct Sync;
@ -67,7 +67,7 @@ struct NewAttestationFetched {
impl Actor { impl Actor {
pub fn new( pub fn new(
cfds: Vec<Cfd>, cfds: Vec<Cfd>,
attestation_channels: [Box<dyn StrongMessageChannel<Attestation>>; 2], attestation_channel: impl StrongMessageChannel<Attestation> + 'static,
) -> Self { ) -> Self {
let mut pending_attestations = HashSet::new(); let mut pending_attestations = HashSet::new();
@ -102,7 +102,7 @@ impl Actor {
announcements: HashMap::new(), announcements: HashMap::new(),
pending_announcements: HashSet::new(), pending_announcements: HashSet::new(),
pending_attestations, pending_attestations,
attestation_channels, attestation_channel: Box::new(attestation_channel),
} }
} }
} }
@ -193,10 +193,7 @@ impl Actor {
) -> Result<()> { ) -> Result<()> {
tracing::info!("Fetched new attestation for {}", id); tracing::info!("Fetched new attestation for {}", id);
for channel in self.attestation_channels.iter() { let _ = self.attestation_channel.send(attestation).await;
channel.do_send(attestation.clone())?;
}
self.pending_attestations.remove(&id); self.pending_attestations.remove(&id);
Ok(()) Ok(())

14
daemon/src/taker.rs

@ -8,7 +8,7 @@ use daemon::model::WalletInfo;
use daemon::seed::Seed; use daemon::seed::Seed;
use daemon::wallet::Wallet; use daemon::wallet::Wallet;
use daemon::{ use daemon::{
bitmex_price_feed, housekeeping, logger, monitor, oracle, send_to_socket, taker_cfd, bitmex_price_feed, fan_out, housekeeping, logger, monitor, oracle, send_to_socket, taker_cfd,
wallet_sync, wire, wallet_sync, wire,
}; };
use futures::StreamExt; use futures::StreamExt;
@ -253,13 +253,11 @@ async fn main() -> Result<()> {
.notify_interval(Duration::from_secs(60), || oracle::Sync) .notify_interval(Duration::from_secs(60), || oracle::Sync)
.unwrap(), .unwrap(),
); );
tokio::spawn(oracle_actor_context.run(oracle::Actor::new( let actor = fan_out::Actor::new(&[&cfd_actor_inbox, &monitor_actor_address])
cfds, .create(None)
[ .spawn_global();
Box::new(cfd_actor_inbox.clone()),
Box::new(monitor_actor_address), tokio::spawn(oracle_actor_context.run(oracle::Actor::new(cfds, actor)));
],
)));
oracle_actor_address oracle_actor_address
.do_send_async(oracle::Sync) .do_send_async(oracle::Sync)

Loading…
Cancel
Save