diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index b086acc..5bbd20a 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -276,8 +276,10 @@ async fn main() -> Result<()> { cfds, ))); - // use `.send` here to ensure we only continue once the update was processed - oracle_actor_address.send(oracle::Sync).await.unwrap(); + oracle_actor_address + .do_send_async(oracle::Sync) + .await + .unwrap(); let listener_stream = futures::stream::poll_fn(move |ctx| { let message = match futures::ready!(listener.poll_accept(ctx)) { diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index 299a567..5399cef 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -4,8 +4,6 @@ use crate::model::OracleEventId; use anyhow::{Context, Result}; use async_trait::async_trait; use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey}; -use futures::stream::FuturesUnordered; -use futures::TryStreamExt; use reqwest::StatusCode; use rocket::time::format_description::FormatItem; use rocket::time::macros::format_description; @@ -25,6 +23,31 @@ pub struct Actor { monitor_actor_address: xtra::Address, } +pub struct Sync; + +pub struct MonitorEvent { + pub event_id: OracleEventId, +} + +#[derive(Debug, Clone)] +pub struct GetAnnouncement(pub OracleEventId); + +// TODO: Split xtra::Message and API object +#[derive(Debug, Clone, Deserialize, PartialEq)] +#[serde(try_from = "olivia_api::Response")] +pub struct Attestation { + pub id: OracleEventId, + pub price: u64, + pub scalars: Vec, +} + +/// A module-private message to allow parallelization of fetching announcements. +#[derive(Debug)] +struct NewAnnouncementFetched { + id: OracleEventId, + announcement: Announcement, +} + impl Actor { pub fn new( cfd_actor_address: xtra::Address, @@ -72,8 +95,8 @@ where CFD: xtra::Handler, M: xtra::Handler, { - async fn update_state(&mut self) -> Result<()> { - self.update_latest_announcements() + async fn update_state(&mut self, ctx: &mut xtra::Context) -> Result<()> { + self.update_latest_announcements(ctx) .await .context("failed to update announcements")?; self.update_pending_attestations() @@ -83,11 +106,15 @@ where Ok(()) } - async fn update_latest_announcements(&mut self) -> Result<()> { - let new_announcements = next_ids()? - .into_iter() - .filter(|event_id| !self.latest_announcements.contains_key(event_id)) - .map(|event_id| async move { + async fn update_latest_announcements(&mut self, ctx: &mut xtra::Context) -> Result<()> { + for event_id in next_ids()? { + if self.latest_announcements.contains_key(&event_id) { + continue; + } + + let this = ctx.address().expect("self to be alive"); + + tokio::spawn(async move { let url = event_id.to_olivia_url(); tracing::debug!("Fetching attestation for {}", event_id); @@ -104,13 +131,16 @@ where .json::() .await .context("Failed to deserialize as Announcement")?; - Result::<_, anyhow::Error>::Ok((event_id, announcement)) - }) - .collect::>() - .try_collect::>() - .await?; - self.latest_announcements.extend(new_announcements); // FIXME: This results in linear memory growth. + this.send(NewAnnouncementFetched { + id: event_id, + announcement, + }) + .await?; + + Ok(()) + }); + } Ok(()) } @@ -135,7 +165,17 @@ where } }; - let attestation = res.json::().await?; + let attestation = match res + .json::() + .await + .with_context(|| format!("Failed to decode body for event {}", event_id)) + { + Ok(attestation) => attestation, + Err(e) => { + tracing::debug!("{:#}", e); + continue; + } + }; self.cfd_actor_address .clone() @@ -154,33 +194,6 @@ where } } -impl xtra::Actor for Actor {} - -pub struct Sync; - -impl xtra::Message for Sync { - type Result = (); -} - -#[async_trait] -impl xtra::Handler for Actor -where - CFD: xtra::Handler, - M: xtra::Handler, -{ - async fn handle(&mut self, _: Sync, _ctx: &mut xtra::Context) { - log_error!(self.update_state()) - } -} - -pub struct MonitorEvent { - pub event_id: OracleEventId, -} - -impl xtra::Message for MonitorEvent { - type Result = (); -} - #[async_trait] impl xtra::Handler for Actor { async fn handle(&mut self, msg: MonitorEvent, _ctx: &mut xtra::Context) { @@ -190,9 +203,6 @@ impl xtra::Handler for Actor { } } -#[derive(Debug, Clone)] -pub struct GetAnnouncement(pub OracleEventId); - #[async_trait] impl xtra::Handler for Actor { async fn handle( @@ -204,6 +214,13 @@ impl xtra::Handler for Actor } } +#[async_trait] +impl xtra::Handler for Actor { + async fn handle(&mut self, msg: NewAnnouncementFetched, _ctx: &mut xtra::Context) { + self.latest_announcements.insert(msg.id, msg.announcement); + } +} + /// Construct the URL of the next 24 `BitMEX/BXBT` hourly events /// `olivia` will attest to. fn next_ids() -> Result> { @@ -269,14 +286,24 @@ impl From for cfd_protocol::Announcement { } } -// TODO: Implement real deserialization once price attestation is -// implemented in `olivia` -#[derive(Debug, Clone, Deserialize, PartialEq)] -#[serde(try_from = "olivia_api::Response")] -pub struct Attestation { - pub id: OracleEventId, - pub price: u64, - pub scalars: Vec, +impl xtra::Actor for Actor {} + +#[async_trait] +impl xtra::Handler for Actor +where + CFD: xtra::Handler, + M: xtra::Handler, +{ + async fn handle(&mut self, _: Sync, ctx: &mut xtra::Context) { + log_error!(self.update_state(ctx)) + } +} + +impl xtra::Message for Sync { + type Result = (); +} +impl xtra::Message for MonitorEvent { + type Result = (); } impl xtra::Message for GetAnnouncement { @@ -287,6 +314,10 @@ impl xtra::Message for Attestation { type Result = (); } +impl xtra::Message for NewAnnouncementFetched { + type Result = (); +} + mod olivia_api { use crate::model::OracleEventId; use anyhow::Context; @@ -325,7 +356,7 @@ mod olivia_api { let data = serde_json::from_str::(&response.announcement.oracle_event.data)?; - let attestation = response.attestation.context("Missing attestation")?; + let attestation = response.attestation.context("attestation missing")?; Ok(Self { id: OracleEventId(data.id), diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 0b29c91..c8bfc87 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -274,8 +274,10 @@ async fn main() -> Result<()> { cfds, ))); - // use `.send` here to ensure we only continue once the update was processed - oracle_actor_address.send(oracle::Sync).await.unwrap(); + oracle_actor_address + .do_send_async(oracle::Sync) + .await + .unwrap(); Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver)) },