Browse Source

Replace CFD actor Address with MessageChannel in oracle::Actor

The `oracle::Actor` only needs to be able to send the
`oracle::Attestation` to an actor who can handle it. We were already
able to express this using trait bounds, but it was blocking us from
being able to make the `{maker_taker}_cfd::Actor` generic over
different actors due to infinite, cyclic types.
refactor/no-log-handler
Lucas Soriano del Pino 3 years ago
parent
commit
09c331f547
No known key found for this signature in database GPG Key ID: EE611E973A1530E7
  1. 6
      daemon/src/maker.rs
  2. 4
      daemon/src/maker_cfd.rs
  3. 59
      daemon/src/oracle.rs
  4. 6
      daemon/src/taker.rs
  5. 4
      daemon/src/taker_cfd.rs

6
daemon/src/maker.rs

@ -260,9 +260,11 @@ async fn main() -> Result<()> {
.unwrap(),
);
tokio::spawn(oracle_actor_context.run(oracle::Actor::new(
cfd_maker_actor_inbox.clone(),
monitor_actor_address,
cfds,
[
Box::new(cfd_maker_actor_inbox.clone()),
Box::new(monitor_actor_address),
],
)));
oracle_actor_address

4
daemon/src/maker_cfd.rs

@ -89,7 +89,7 @@ pub struct Actor {
monitor_actor: Address<monitor::Actor>,
setup_state: SetupState,
roll_over_state: RollOverState,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor>>,
oracle_actor: Address<oracle::Actor>,
// Maker needs to also store TakerId to be able to send a reply back
current_pending_proposals: HashMap<OrderId, (UpdateCfdProposal, TakerId)>,
// TODO: Persist instead of using an in-memory hashmap for resiliency?
@ -125,7 +125,7 @@ impl Actor {
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
takers: Address<maker_inc_connections::Actor>,
monitor_actor: Address<monitor::Actor>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor>>,
oracle_actor: Address<oracle::Actor>,
) -> Self {
Self {
db,

59
daemon/src/oracle.rs

@ -9,13 +9,13 @@ use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::ops::Add;
use time::ext::NumericalDuration;
use xtra::prelude::StrongMessageChannel;
pub struct Actor<CFD, M> {
pub struct Actor {
announcements: HashMap<BitMexPriceEventId, (OffsetDateTime, Vec<schnorrsig::PublicKey>)>,
pending_announcements: HashSet<BitMexPriceEventId>,
pending_attestations: HashSet<BitMexPriceEventId>,
cfd_actor_address: xtra::Address<CFD>,
monitor_actor_address: xtra::Address<M>,
attestation_channels: [Box<dyn StrongMessageChannel<Attestation>>; 2],
}
pub struct Sync;
@ -64,11 +64,10 @@ struct NewAttestationFetched {
attestation: Attestation,
}
impl<CFD, M> Actor<CFD, M> {
impl Actor {
pub fn new(
cfd_actor_address: xtra::Address<CFD>,
monitor_actor_address: xtra::Address<M>,
cfds: Vec<Cfd>,
attestation_channels: [Box<dyn StrongMessageChannel<Attestation>>; 2],
) -> Self {
let mut pending_attestations = HashSet::new();
@ -103,17 +102,12 @@ impl<CFD, M> Actor<CFD, M> {
announcements: HashMap::new(),
pending_announcements: HashSet::new(),
pending_attestations,
cfd_actor_address,
monitor_actor_address,
attestation_channels,
}
}
}
impl<CFD, M> Actor<CFD, M>
where
CFD: 'static,
M: 'static,
{
impl Actor {
fn update_pending_announcements(&mut self, ctx: &mut xtra::Context<Self>) {
for event_id in self.pending_announcements.iter().cloned() {
let this = ctx.address().expect("self to be alive");
@ -149,11 +143,7 @@ where
}
}
impl<CFD, M> Actor<CFD, M>
where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
impl Actor {
fn update_pending_attestations(&mut self, ctx: &mut xtra::Context<Self>) {
for event_id in self.pending_attestations.iter().copied() {
if !event_id.has_likely_occured() {
@ -203,14 +193,9 @@ where
) -> Result<()> {
tracing::info!("Fetched new attestation for {}", id);
self.cfd_actor_address
.clone()
.do_send_async(attestation.clone())
.await?;
self.monitor_actor_address
.clone()
.do_send_async(attestation)
.await?;
for channel in self.attestation_channels.iter() {
channel.do_send(attestation.clone())?;
}
self.pending_attestations.remove(&id);
@ -219,7 +204,7 @@ where
}
#[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<MonitorAttestation> for Actor<CFD, M> {
impl xtra::Handler<MonitorAttestation> for Actor {
async fn handle(&mut self, msg: MonitorAttestation, _ctx: &mut xtra::Context<Self>) {
if !self.pending_attestations.insert(msg.event_id) {
tracing::trace!("Attestation {} already being monitored", msg.event_id);
@ -228,7 +213,7 @@ impl<CFD: 'static, M: 'static> xtra::Handler<MonitorAttestation> for Actor<CFD,
}
#[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<FetchAnnouncement> for Actor<CFD, M> {
impl xtra::Handler<FetchAnnouncement> for Actor {
async fn handle(&mut self, msg: FetchAnnouncement, _ctx: &mut xtra::Context<Self>) {
if !self.pending_announcements.insert(msg.0) {
tracing::trace!("Announcement {} already being fetched", msg.0);
@ -237,7 +222,7 @@ impl<CFD: 'static, M: 'static> xtra::Handler<FetchAnnouncement> for Actor<CFD, M
}
#[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<GetAnnouncement> for Actor<CFD, M> {
impl xtra::Handler<GetAnnouncement> for Actor {
async fn handle(
&mut self,
msg: GetAnnouncement,
@ -254,7 +239,7 @@ impl<CFD: 'static, M: 'static> xtra::Handler<GetAnnouncement> for Actor<CFD, M>
}
#[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<NewAnnouncementFetched> for Actor<CFD, M> {
impl xtra::Handler<NewAnnouncementFetched> for Actor {
async fn handle(&mut self, msg: NewAnnouncementFetched, _ctx: &mut xtra::Context<Self>) {
self.pending_announcements.remove(&msg.id);
self.announcements
@ -263,11 +248,7 @@ impl<CFD: 'static, M: 'static> xtra::Handler<NewAnnouncementFetched> for Actor<C
}
#[async_trait]
impl<CFD, M> xtra::Handler<NewAttestationFetched> for Actor<CFD, M>
where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
impl xtra::Handler<NewAttestationFetched> for Actor {
async fn handle(&mut self, msg: NewAttestationFetched, _ctx: &mut xtra::Context<Self>) {
log_error!(self.handle_new_attestation_fetched(msg.id, msg.attestation));
}
@ -310,14 +291,10 @@ impl From<Announcement> for cfd_protocol::Announcement {
}
}
impl<CFD: 'static, M: 'static> xtra::Actor for Actor<CFD, M> {}
impl xtra::Actor for Actor {}
#[async_trait]
impl<CFD, M> xtra::Handler<Sync> for Actor<CFD, M>
where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
impl xtra::Handler<Sync> for Actor {
async fn handle(&mut self, _: Sync, ctx: &mut xtra::Context<Self>) {
self.update_pending_announcements(ctx);
self.update_pending_attestations(ctx);

6
daemon/src/taker.rs

@ -254,9 +254,11 @@ async fn main() -> Result<()> {
.unwrap(),
);
tokio::spawn(oracle_actor_context.run(oracle::Actor::new(
cfd_actor_inbox.clone(),
monitor_actor_address,
cfds,
[
Box::new(cfd_actor_inbox.clone()),
Box::new(monitor_actor_address),
],
)));
oracle_actor_address

4
daemon/src/taker_cfd.rs

@ -80,7 +80,7 @@ pub struct Actor {
monitor_actor: Address<monitor::Actor>,
setup_state: SetupState,
roll_over_state: RollOverState,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor>>,
oracle_actor: Address<oracle::Actor>,
current_pending_proposals: UpdateCfdProposals,
}
@ -95,7 +95,7 @@ impl Actor {
update_cfd_feed_sender: watch::Sender<UpdateCfdProposals>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor>>,
oracle_actor: Address<oracle::Actor>,
) -> Self {
Self {
db,

Loading…
Cancel
Save