From a416283094a796525980ce2170304890992b74fa Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 12 Oct 2021 18:06:17 +1100 Subject: [PATCH] Only fetch attestation that are likely to have occured Fixes #280. --- daemon/src/model.rs | 17 ++++++ daemon/src/oracle.rs | 133 ++++++++++++++++++++++++++----------------- 2 files changed, 97 insertions(+), 53 deletions(-) diff --git a/daemon/src/model.rs b/daemon/src/model.rs index 520cea7..cec96b5 100644 --- a/daemon/src/model.rs +++ b/daemon/src/model.rs @@ -135,6 +135,15 @@ impl BitMexPriceEventId { Self::new(timestamp, 20) } + /// Checks whether this event has likely already occurred. + /// + /// We can't be sure about it because our local clock might be off from the oracle's clock. + pub fn has_likely_occured(&self) -> bool { + let now = OffsetDateTime::now_utc(); + + now > self.timestamp + } + pub fn to_olivia_url(self) -> Url { "https://h00.ooo" .parse::() @@ -210,4 +219,12 @@ mod tests { assert_eq!(now.timestamp.nanosecond(), 0); } + + #[test] + fn has_occured_if_in_the_past() { + let past_event = + BitMexPriceEventId::with_20_digits(datetime!(2021-09-23 10:00:00).assume_utc()); + + assert!(past_event.has_likely_occured()); + } } diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index 2810e5d..d96f6f0 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -5,7 +5,6 @@ use crate::tokio_ext; use anyhow::{Context, Result}; use async_trait::async_trait; use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey}; -use reqwest::StatusCode; use rocket::time::{OffsetDateTime, Time}; use serde::Deserialize; use std::collections::{HashMap, HashSet}; @@ -59,6 +58,13 @@ struct NewAnnouncementFetched { nonce_pks: Vec, } +/// A module-private message to allow parallelization of fetching attestations. +#[derive(Debug)] +struct NewAttestationFetched { + id: BitMexPriceEventId, + attestation: Attestation, +} + impl Actor { pub fn new( cfd_actor_address: xtra::Address, @@ -108,10 +114,9 @@ where M: 'static, { fn update_pending_announcements(&mut self, ctx: &mut xtra::Context) { - let this = ctx.address().expect("self to be alive"); - for event_id in self.pending_announcements.iter().cloned() { - let this = this.clone(); + let this = ctx.address().expect("self to be alive"); + tokio_ext::spawn_fallible(async move { let url = event_id.to_olivia_url(); @@ -148,59 +153,65 @@ where CFD: xtra::Handler, M: xtra::Handler, { - async fn update_state(&mut self, ctx: &mut xtra::Context) -> Result<()> { - self.update_pending_announcements(ctx); - self.update_pending_attestations() - .await - .context("failed to update pending attestations")?; + fn update_pending_attestations(&mut self, ctx: &mut xtra::Context) { + for event_id in self.pending_attestations.iter().copied() { + if !event_id.has_likely_occured() { + tracing::trace!( + "Skipping {} because it likely hasn't occurred yet", + event_id + ); + + continue; + } - Ok(()) - } + let this = ctx.address().expect("self to be alive"); + + tokio_ext::spawn_fallible(async move { + let url = event_id.to_olivia_url(); + + tracing::debug!("Fetching attestation for {}", event_id); + + let response = reqwest::get(url.clone()) + .await + .with_context(|| format!("Failed to GET {}", url))?; - async fn update_pending_attestations(&mut self) -> Result<()> { - let pending_attestations = self.pending_attestations.clone(); - for event_id in pending_attestations.into_iter() { - { - let res = match reqwest::get(event_id.to_olivia_url()).await { - Ok(res) if res.status().is_success() => res, - Ok(res) if res.status() == StatusCode::NOT_FOUND => { - tracing::trace!("Attestation not ready yet"); - continue; - } - Ok(res) => { - tracing::warn!("Unexpected response, status {}", res.status()); - continue; - } - Err(e) => { - tracing::warn!(%event_id, "Failed to fetch attestation: {}", e); - continue; - } - }; - - let attestation = match res + if !response.status().is_success() { + anyhow::bail!("GET {} responded with {}", url, response.status()); + } + + let attestation = response .json::() .await - .with_context(|| format!("Failed to decode body for event {}", event_id)) - { - Ok(attestation) => attestation, - Err(e) => { - tracing::debug!("{:#}", e); - continue; - } - }; - - self.cfd_actor_address - .clone() - .do_send_async(attestation.clone()) - .await?; - self.monitor_actor_address - .clone() - .do_send_async(attestation) - .await?; - - self.pending_attestations.remove(&event_id); - } + .context("Failed to deserialize as Attestation")?; + + this.send(NewAttestationFetched { + id: event_id, + attestation, + }) + .await?; + + Ok(()) + }); } + } + + async fn handle_new_attestation_fetched( + &mut self, + id: BitMexPriceEventId, + attestation: Attestation, + ) -> Result<()> { + tracing::info!("Fetched new attestation for {}", id); + + self.cfd_actor_address + .clone() + .do_send_async(attestation.clone()) + .await?; + self.monitor_actor_address + .clone() + .do_send_async(attestation) + .await?; + + self.pending_attestations.remove(&id); Ok(()) } @@ -250,6 +261,17 @@ impl xtra::Handler for Actor xtra::Handler for Actor +where + CFD: xtra::Handler, + M: xtra::Handler, +{ + async fn handle(&mut self, msg: NewAttestationFetched, _ctx: &mut xtra::Context) { + log_error!(self.handle_new_attestation_fetched(msg.id, msg.attestation)); + } +} + #[allow(dead_code)] pub fn next_announcement_after(timestamp: OffsetDateTime) -> Result { let adjusted = ceil_to_next_hour(timestamp)?; @@ -296,7 +318,8 @@ where M: xtra::Handler, { async fn handle(&mut self, _: Sync, ctx: &mut xtra::Context) { - log_error!(self.update_state(ctx)) + self.update_pending_announcements(ctx); + self.update_pending_attestations(ctx); } } @@ -323,6 +346,10 @@ impl xtra::Message for NewAnnouncementFetched { type Result = (); } +impl xtra::Message for NewAttestationFetched { + type Result = (); +} + mod olivia_api { use crate::model::BitMexPriceEventId; use anyhow::Context;