diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index 27479cc..299a567 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -4,12 +4,12 @@ use crate::model::OracleEventId; use anyhow::{Context, Result}; use async_trait::async_trait; use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey}; -use futures::stream::FuturesOrdered; +use futures::stream::FuturesUnordered; use futures::TryStreamExt; use reqwest::StatusCode; use rocket::time::format_description::FormatItem; use rocket::time::macros::format_description; -use rocket::time::{Duration, OffsetDateTime, Time}; +use rocket::time::{OffsetDateTime, Time}; use serde::Deserialize; use std::collections::{HashMap, HashSet}; use std::ops::Add; @@ -84,10 +84,14 @@ where } async fn update_latest_announcements(&mut self) -> Result<()> { - self.latest_announcements = next_ids() + let new_announcements = next_ids()? .into_iter() + .filter(|event_id| !self.latest_announcements.contains_key(event_id)) .map(|event_id| async move { let url = event_id.to_olivia_url(); + + tracing::debug!("Fetching attestation for {}", event_id); + let response = reqwest::get(url.clone()) .await .with_context(|| format!("Failed to GET {}", url))?; @@ -102,10 +106,12 @@ where .context("Failed to deserialize as Announcement")?; Result::<_, anyhow::Error>::Ok((event_id, announcement)) }) - .collect::>() - .try_collect() + .collect::>() + .try_collect::>() .await?; + self.latest_announcements.extend(new_announcements); // FIXME: This results in linear memory growth. + Ok(()) } @@ -200,30 +206,36 @@ impl xtra::Handler for Actor /// Construct the URL of the next 24 `BitMEX/BXBT` hourly events /// `olivia` will attest to. -fn next_ids() -> Vec { - next_24_hours(OffsetDateTime::now_utc()) +fn next_ids() -> Result> { + let ids = next_24_hours(OffsetDateTime::now_utc())? .into_iter() .map(event_id) - .collect() + .collect(); + + Ok(ids) } -fn next_24_hours(datetime: OffsetDateTime) -> Vec { - let adjusted = datetime.replace_time(Time::from_hms(datetime.hour(), 0, 0).expect("in_range")); - (1..=24).map(|i| adjusted + Duration::hours(i)).collect() +fn next_24_hours(datetime: OffsetDateTime) -> Result> { + let adjusted = ceil_to_next_hour(datetime)?; + let timestamps = (0..=24).map(|i| adjusted + i.hours()).collect(); + + Ok(timestamps) } #[allow(dead_code)] pub fn next_announcement_after(timestamp: OffsetDateTime) -> Result { - // always ceil to next hour and roll over to next day automatically + let adjusted = ceil_to_next_hour(timestamp)?; - let timestamp = timestamp.add(1.hours()); + Ok(event_id(adjusted)) +} - let adjusted = timestamp.replace_time( - Time::from_hms(timestamp.hour(), 0, 0) - .context("Could not adjust time for next announcement")?, - ); +fn ceil_to_next_hour(original: OffsetDateTime) -> Result { + let timestamp = original.add(1.hours()); + let exact_hour = Time::from_hms(timestamp.hour(), 0, 0) + .context("Could not adjust time for next announcement")?; + let adjusted = timestamp.replace_time(exact_hour); - Ok(event_id(adjusted)) + Ok(adjusted) } /// Construct the URL of `olivia`'s `BitMEX/BXBT` event to be attested @@ -575,7 +587,7 @@ mod tests { fn next_24() { let datetime = datetime!(2021-09-23 10:43:12); - let next_24_hours = next_24_hours(datetime.assume_utc()); + let next_24_hours = next_24_hours(datetime.assume_utc()).unwrap(); let expected = vec![ datetime!(2021-09-23 11:00:00).assume_utc(), datetime!(2021-09-23 12:00:00).assume_utc(), @@ -601,6 +613,7 @@ mod tests { datetime!(2021-09-24 08:00:00).assume_utc(), datetime!(2021-09-24 09:00:00).assume_utc(), datetime!(2021-09-24 10:00:00).assume_utc(), + datetime!(2021-09-24 11:00:00).assume_utc(), ]; assert_eq!(next_24_hours, expected)