Browse Source

Only fetch attestation that are likely to have occured

Fixes #280.
refactor/no-log-handler
Thomas Eizinger 3 years ago
parent
commit
a416283094
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 17
      daemon/src/model.rs
  2. 133
      daemon/src/oracle.rs

17
daemon/src/model.rs

@ -135,6 +135,15 @@ impl BitMexPriceEventId {
Self::new(timestamp, 20)
}
/// Checks whether this event has likely already occurred.
///
/// We can't be sure about it because our local clock might be off from the oracle's clock.
pub fn has_likely_occured(&self) -> bool {
let now = OffsetDateTime::now_utc();
now > self.timestamp
}
pub fn to_olivia_url(self) -> Url {
"https://h00.ooo"
.parse::<Url>()
@ -210,4 +219,12 @@ mod tests {
assert_eq!(now.timestamp.nanosecond(), 0);
}
#[test]
fn has_occured_if_in_the_past() {
let past_event =
BitMexPriceEventId::with_20_digits(datetime!(2021-09-23 10:00:00).assume_utc());
assert!(past_event.has_likely_occured());
}
}

133
daemon/src/oracle.rs

@ -5,7 +5,6 @@ use crate::tokio_ext;
use anyhow::{Context, Result};
use async_trait::async_trait;
use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey};
use reqwest::StatusCode;
use rocket::time::{OffsetDateTime, Time};
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
@ -59,6 +58,13 @@ struct NewAnnouncementFetched {
nonce_pks: Vec<schnorrsig::PublicKey>,
}
/// A module-private message to allow parallelization of fetching attestations.
#[derive(Debug)]
struct NewAttestationFetched {
id: BitMexPriceEventId,
attestation: Attestation,
}
impl<CFD, M> Actor<CFD, M> {
pub fn new(
cfd_actor_address: xtra::Address<CFD>,
@ -108,10 +114,9 @@ where
M: 'static,
{
fn update_pending_announcements(&mut self, ctx: &mut xtra::Context<Self>) {
let this = ctx.address().expect("self to be alive");
for event_id in self.pending_announcements.iter().cloned() {
let this = this.clone();
let this = ctx.address().expect("self to be alive");
tokio_ext::spawn_fallible(async move {
let url = event_id.to_olivia_url();
@ -148,59 +153,65 @@ where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn update_state(&mut self, ctx: &mut xtra::Context<Self>) -> Result<()> {
self.update_pending_announcements(ctx);
self.update_pending_attestations()
.await
.context("failed to update pending attestations")?;
fn update_pending_attestations(&mut self, ctx: &mut xtra::Context<Self>) {
for event_id in self.pending_attestations.iter().copied() {
if !event_id.has_likely_occured() {
tracing::trace!(
"Skipping {} because it likely hasn't occurred yet",
event_id
);
continue;
}
Ok(())
}
let this = ctx.address().expect("self to be alive");
tokio_ext::spawn_fallible(async move {
let url = event_id.to_olivia_url();
tracing::debug!("Fetching attestation for {}", event_id);
let response = reqwest::get(url.clone())
.await
.with_context(|| format!("Failed to GET {}", url))?;
async fn update_pending_attestations(&mut self) -> Result<()> {
let pending_attestations = self.pending_attestations.clone();
for event_id in pending_attestations.into_iter() {
{
let res = match reqwest::get(event_id.to_olivia_url()).await {
Ok(res) if res.status().is_success() => res,
Ok(res) if res.status() == StatusCode::NOT_FOUND => {
tracing::trace!("Attestation not ready yet");
continue;
}
Ok(res) => {
tracing::warn!("Unexpected response, status {}", res.status());
continue;
}
Err(e) => {
tracing::warn!(%event_id, "Failed to fetch attestation: {}", e);
continue;
}
};
let attestation = match res
if !response.status().is_success() {
anyhow::bail!("GET {} responded with {}", url, response.status());
}
let attestation = response
.json::<Attestation>()
.await
.with_context(|| format!("Failed to decode body for event {}", event_id))
{
Ok(attestation) => attestation,
Err(e) => {
tracing::debug!("{:#}", e);
continue;
}
};
self.cfd_actor_address
.clone()
.do_send_async(attestation.clone())
.await?;
self.monitor_actor_address
.clone()
.do_send_async(attestation)
.await?;
self.pending_attestations.remove(&event_id);
}
.context("Failed to deserialize as Attestation")?;
this.send(NewAttestationFetched {
id: event_id,
attestation,
})
.await?;
Ok(())
});
}
}
async fn handle_new_attestation_fetched(
&mut self,
id: BitMexPriceEventId,
attestation: Attestation,
) -> Result<()> {
tracing::info!("Fetched new attestation for {}", id);
self.cfd_actor_address
.clone()
.do_send_async(attestation.clone())
.await?;
self.monitor_actor_address
.clone()
.do_send_async(attestation)
.await?;
self.pending_attestations.remove(&id);
Ok(())
}
@ -250,6 +261,17 @@ impl<CFD: 'static, M: 'static> xtra::Handler<NewAnnouncementFetched> for Actor<C
}
}
#[async_trait]
impl<CFD, M> xtra::Handler<NewAttestationFetched> for Actor<CFD, M>
where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn handle(&mut self, msg: NewAttestationFetched, _ctx: &mut xtra::Context<Self>) {
log_error!(self.handle_new_attestation_fetched(msg.id, msg.attestation));
}
}
#[allow(dead_code)]
pub fn next_announcement_after(timestamp: OffsetDateTime) -> Result<BitMexPriceEventId> {
let adjusted = ceil_to_next_hour(timestamp)?;
@ -296,7 +318,8 @@ where
M: xtra::Handler<Attestation>,
{
async fn handle(&mut self, _: Sync, ctx: &mut xtra::Context<Self>) {
log_error!(self.update_state(ctx))
self.update_pending_announcements(ctx);
self.update_pending_attestations(ctx);
}
}
@ -323,6 +346,10 @@ impl xtra::Message for NewAnnouncementFetched {
type Result = ();
}
impl xtra::Message for NewAttestationFetched {
type Result = ();
}
mod olivia_api {
use crate::model::BitMexPriceEventId;
use anyhow::Context;

Loading…
Cancel
Save