Browse Source

Use xtra_productivity in `oracle::Actor`

contact-taker-before-changing-cfd-state
Thomas Eizinger 3 years ago
parent
commit
01546cdd4f
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 66
      daemon/src/oracle.rs

66
daemon/src/oracle.rs

@ -10,6 +10,7 @@ use std::collections::{HashMap, HashSet};
use std::ops::Add;
use time::ext::NumericalDuration;
use xtra::prelude::StrongMessageChannel;
use xtra_productivity::xtra_productivity;
pub struct Actor {
announcements: HashMap<BitMexPriceEventId, (OffsetDateTime, Vec<schnorrsig::PublicKey>)>,
@ -195,27 +196,29 @@ impl Actor {
}
}
#[async_trait]
impl xtra::Handler<MonitorAttestation> for Actor {
async fn handle(&mut self, msg: MonitorAttestation, _ctx: &mut xtra::Context<Self>) {
#[xtra_productivity]
impl Actor {
fn handle_monitor_attestation(
&mut self,
msg: MonitorAttestation,
_ctx: &mut xtra::Context<Self>,
) {
if !self.pending_attestations.insert(msg.event_id) {
tracing::trace!("Attestation {} already being monitored", msg.event_id);
}
}
}
#[async_trait]
impl xtra::Handler<FetchAnnouncement> for Actor {
async fn handle(&mut self, msg: FetchAnnouncement, _ctx: &mut xtra::Context<Self>) {
fn handle_fetch_announcement(
&mut self,
msg: FetchAnnouncement,
_ctx: &mut xtra::Context<Self>,
) {
if !self.pending_announcements.insert(msg.0) {
tracing::trace!("Announcement {} already being fetched", msg.0);
}
}
}
#[async_trait]
impl xtra::Handler<GetAnnouncement> for Actor {
async fn handle(
fn handle_get_announcement(
&mut self,
msg: GetAnnouncement,
_ctx: &mut xtra::Context<Self>,
@ -235,15 +238,21 @@ impl xtra::Handler<GetAnnouncement> for Actor {
announcement
}
}
#[async_trait]
impl xtra::Handler<NewAnnouncementFetched> for Actor {
async fn handle(&mut self, msg: NewAnnouncementFetched, _ctx: &mut xtra::Context<Self>) {
fn handle_new_announcement_fetched(
&mut self,
msg: NewAnnouncementFetched,
_ctx: &mut xtra::Context<Self>,
) {
self.pending_announcements.remove(&msg.id);
self.announcements
.insert(msg.id, (msg.expected_outcome_time, msg.nonce_pks));
}
fn handle_sync(&mut self, _: Sync, ctx: &mut xtra::Context<Self>) {
self.update_pending_announcements(ctx);
self.update_pending_attestations(ctx);
}
}
#[async_trait]
@ -291,37 +300,10 @@ impl From<Announcement> for cfd_protocol::Announcement {
impl xtra::Actor for Actor {}
#[async_trait]
impl xtra::Handler<Sync> for Actor {
async fn handle(&mut self, _: Sync, ctx: &mut xtra::Context<Self>) {
self.update_pending_announcements(ctx);
self.update_pending_attestations(ctx);
}
}
impl xtra::Message for Sync {
type Result = ();
}
impl xtra::Message for MonitorAttestation {
type Result = ();
}
impl xtra::Message for FetchAnnouncement {
type Result = ();
}
impl xtra::Message for GetAnnouncement {
type Result = Option<Announcement>;
}
impl xtra::Message for Attestation {
type Result = ();
}
impl xtra::Message for NewAnnouncementFetched {
type Result = ();
}
impl xtra::Message for NewAttestationFetched {
type Result = ();
}

Loading…
Cancel
Save