From 09c331f5479c2bec3e91734706fe64242e2fd8f1 Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Thu, 14 Oct 2021 15:07:08 +1100 Subject: [PATCH] Replace CFD actor Address with MessageChannel in oracle::Actor The `oracle::Actor` only needs to be able to send the `oracle::Attestation` to an actor who can handle it. We were already able to express this using trait bounds, but it was blocking us from being able to make the `{maker_taker}_cfd::Actor` generic over different actors due to infinite, cyclic types. --- daemon/src/maker.rs | 6 +++-- daemon/src/maker_cfd.rs | 4 +-- daemon/src/oracle.rs | 59 +++++++++++++---------------------------- daemon/src/taker.rs | 6 +++-- daemon/src/taker_cfd.rs | 4 +-- 5 files changed, 30 insertions(+), 49 deletions(-) diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 6dc0594..97edb62 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -260,9 +260,11 @@ async fn main() -> Result<()> { .unwrap(), ); tokio::spawn(oracle_actor_context.run(oracle::Actor::new( - cfd_maker_actor_inbox.clone(), - monitor_actor_address, cfds, + [ + Box::new(cfd_maker_actor_inbox.clone()), + Box::new(monitor_actor_address), + ], ))); oracle_actor_address diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index f9e6e3f..5295cb9 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -89,7 +89,7 @@ pub struct Actor { monitor_actor: Address, setup_state: SetupState, roll_over_state: RollOverState, - oracle_actor: Address>, + oracle_actor: Address, // Maker needs to also store TakerId to be able to send a reply back current_pending_proposals: HashMap, // TODO: Persist instead of using an in-memory hashmap for resiliency? @@ -125,7 +125,7 @@ impl Actor { update_cfd_feed_sender: watch::Sender, takers: Address, monitor_actor: Address, - oracle_actor: Address>, + oracle_actor: Address, ) -> Self { Self { db, diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index 770b3eb..91740a8 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -9,13 +9,13 @@ use serde::Deserialize; use std::collections::{HashMap, HashSet}; use std::ops::Add; use time::ext::NumericalDuration; +use xtra::prelude::StrongMessageChannel; -pub struct Actor { +pub struct Actor { announcements: HashMap)>, pending_announcements: HashSet, pending_attestations: HashSet, - cfd_actor_address: xtra::Address, - monitor_actor_address: xtra::Address, + attestation_channels: [Box>; 2], } pub struct Sync; @@ -64,11 +64,10 @@ struct NewAttestationFetched { attestation: Attestation, } -impl Actor { +impl Actor { pub fn new( - cfd_actor_address: xtra::Address, - monitor_actor_address: xtra::Address, cfds: Vec, + attestation_channels: [Box>; 2], ) -> Self { let mut pending_attestations = HashSet::new(); @@ -103,17 +102,12 @@ impl Actor { announcements: HashMap::new(), pending_announcements: HashSet::new(), pending_attestations, - cfd_actor_address, - monitor_actor_address, + attestation_channels, } } } -impl Actor -where - CFD: 'static, - M: 'static, -{ +impl Actor { fn update_pending_announcements(&mut self, ctx: &mut xtra::Context) { for event_id in self.pending_announcements.iter().cloned() { let this = ctx.address().expect("self to be alive"); @@ -149,11 +143,7 @@ where } } -impl Actor -where - CFD: xtra::Handler, - M: xtra::Handler, -{ +impl Actor { fn update_pending_attestations(&mut self, ctx: &mut xtra::Context) { for event_id in self.pending_attestations.iter().copied() { if !event_id.has_likely_occured() { @@ -203,14 +193,9 @@ where ) -> Result<()> { tracing::info!("Fetched new attestation for {}", id); - self.cfd_actor_address - .clone() - .do_send_async(attestation.clone()) - .await?; - self.monitor_actor_address - .clone() - .do_send_async(attestation) - .await?; + for channel in self.attestation_channels.iter() { + channel.do_send(attestation.clone())?; + } self.pending_attestations.remove(&id); @@ -219,7 +204,7 @@ where } #[async_trait] -impl xtra::Handler for Actor { +impl xtra::Handler for Actor { async fn handle(&mut self, msg: MonitorAttestation, _ctx: &mut xtra::Context) { if !self.pending_attestations.insert(msg.event_id) { tracing::trace!("Attestation {} already being monitored", msg.event_id); @@ -228,7 +213,7 @@ impl xtra::Handler for Actor xtra::Handler for Actor { +impl xtra::Handler for Actor { async fn handle(&mut self, msg: FetchAnnouncement, _ctx: &mut xtra::Context) { if !self.pending_announcements.insert(msg.0) { tracing::trace!("Announcement {} already being fetched", msg.0); @@ -237,7 +222,7 @@ impl xtra::Handler for Actor xtra::Handler for Actor { +impl xtra::Handler for Actor { async fn handle( &mut self, msg: GetAnnouncement, @@ -254,7 +239,7 @@ impl xtra::Handler for Actor } #[async_trait] -impl xtra::Handler for Actor { +impl xtra::Handler for Actor { async fn handle(&mut self, msg: NewAnnouncementFetched, _ctx: &mut xtra::Context) { self.pending_announcements.remove(&msg.id); self.announcements @@ -263,11 +248,7 @@ impl xtra::Handler for Actor xtra::Handler for Actor -where - CFD: xtra::Handler, - M: xtra::Handler, -{ +impl xtra::Handler for Actor { async fn handle(&mut self, msg: NewAttestationFetched, _ctx: &mut xtra::Context) { log_error!(self.handle_new_attestation_fetched(msg.id, msg.attestation)); } @@ -310,14 +291,10 @@ impl From for cfd_protocol::Announcement { } } -impl xtra::Actor for Actor {} +impl xtra::Actor for Actor {} #[async_trait] -impl xtra::Handler for Actor -where - CFD: xtra::Handler, - M: xtra::Handler, -{ +impl xtra::Handler for Actor { async fn handle(&mut self, _: Sync, ctx: &mut xtra::Context) { self.update_pending_announcements(ctx); self.update_pending_attestations(ctx); diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index d77246f..aebc453 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -254,9 +254,11 @@ async fn main() -> Result<()> { .unwrap(), ); tokio::spawn(oracle_actor_context.run(oracle::Actor::new( - cfd_actor_inbox.clone(), - monitor_actor_address, cfds, + [ + Box::new(cfd_actor_inbox.clone()), + Box::new(monitor_actor_address), + ], ))); oracle_actor_address diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index cf94305..3cc3d7f 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -80,7 +80,7 @@ pub struct Actor { monitor_actor: Address, setup_state: SetupState, roll_over_state: RollOverState, - oracle_actor: Address>, + oracle_actor: Address, current_pending_proposals: UpdateCfdProposals, } @@ -95,7 +95,7 @@ impl Actor { update_cfd_feed_sender: watch::Sender, send_to_maker: Address>, monitor_actor: Address, - oracle_actor: Address>, + oracle_actor: Address, ) -> Self { Self { db,