Browse Source

Parallelize fetching of announcements via self messages

This significantly improves startup time.
refactor/no-log-handler
Thomas Eizinger 3 years ago
parent
commit
c1c3f9e0d9
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 6
      daemon/src/maker.rs
  2. 57
      daemon/src/oracle.rs
  3. 6
      daemon/src/taker.rs

6
daemon/src/maker.rs

@ -276,8 +276,10 @@ async fn main() -> Result<()> {
cfds, cfds,
))); )));
// use `.send` here to ensure we only continue once the update was processed oracle_actor_address
oracle_actor_address.send(oracle::Sync).await.unwrap(); .do_send_async(oracle::Sync)
.await
.unwrap();
let listener_stream = futures::stream::poll_fn(move |ctx| { let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) { let message = match futures::ready!(listener.poll_accept(ctx)) {

57
daemon/src/oracle.rs

@ -4,8 +4,6 @@ use crate::model::OracleEventId;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey}; use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey};
use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
use reqwest::StatusCode; use reqwest::StatusCode;
use rocket::time::format_description::FormatItem; use rocket::time::format_description::FormatItem;
use rocket::time::macros::format_description; use rocket::time::macros::format_description;
@ -43,6 +41,13 @@ pub struct Attestation {
pub scalars: Vec<SecretKey>, pub scalars: Vec<SecretKey>,
} }
/// A module-private message to allow parallelization of fetching announcements.
#[derive(Debug)]
struct NewAnnouncementFetched {
id: OracleEventId,
announcement: Announcement,
}
impl<CFD, M> Actor<CFD, M> { impl<CFD, M> Actor<CFD, M> {
pub fn new( pub fn new(
cfd_actor_address: xtra::Address<CFD>, cfd_actor_address: xtra::Address<CFD>,
@ -90,8 +95,8 @@ where
CFD: xtra::Handler<Attestation>, CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>, M: xtra::Handler<Attestation>,
{ {
async fn update_state(&mut self) -> Result<()> { async fn update_state(&mut self, ctx: &mut xtra::Context<Self>) -> Result<()> {
self.update_latest_announcements() self.update_latest_announcements(ctx)
.await .await
.context("failed to update announcements")?; .context("failed to update announcements")?;
self.update_pending_attestations() self.update_pending_attestations()
@ -101,11 +106,15 @@ where
Ok(()) Ok(())
} }
async fn update_latest_announcements(&mut self) -> Result<()> { async fn update_latest_announcements(&mut self, ctx: &mut xtra::Context<Self>) -> Result<()> {
let new_announcements = next_ids()? for event_id in next_ids()? {
.into_iter() if self.latest_announcements.contains_key(&event_id) {
.filter(|event_id| !self.latest_announcements.contains_key(event_id)) continue;
.map(|event_id| async move { }
let this = ctx.address().expect("self to be alive");
tokio::spawn(async move {
let url = event_id.to_olivia_url(); let url = event_id.to_olivia_url();
tracing::debug!("Fetching attestation for {}", event_id); tracing::debug!("Fetching attestation for {}", event_id);
@ -122,13 +131,16 @@ where
.json::<Announcement>() .json::<Announcement>()
.await .await
.context("Failed to deserialize as Announcement")?; .context("Failed to deserialize as Announcement")?;
Result::<_, anyhow::Error>::Ok((event_id, announcement))
})
.collect::<FuturesUnordered<_>>()
.try_collect::<HashMap<_, _>>()
.await?;
self.latest_announcements.extend(new_announcements); // FIXME: This results in linear memory growth. this.send(NewAnnouncementFetched {
id: event_id,
announcement,
})
.await?;
Ok(())
});
}
Ok(()) Ok(())
} }
@ -192,6 +204,13 @@ impl<CFD: 'static, M: 'static> xtra::Handler<GetAnnouncement> for Actor<CFD, M>
} }
} }
#[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<NewAnnouncementFetched> for Actor<CFD, M> {
async fn handle(&mut self, msg: NewAnnouncementFetched, _ctx: &mut xtra::Context<Self>) {
self.latest_announcements.insert(msg.id, msg.announcement);
}
}
/// Construct the URL of the next 24 `BitMEX/BXBT` hourly events /// Construct the URL of the next 24 `BitMEX/BXBT` hourly events
/// `olivia` will attest to. /// `olivia` will attest to.
fn next_ids() -> Result<Vec<OracleEventId>> { fn next_ids() -> Result<Vec<OracleEventId>> {
@ -265,8 +284,8 @@ where
CFD: xtra::Handler<Attestation>, CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>, M: xtra::Handler<Attestation>,
{ {
async fn handle(&mut self, _: Sync, _ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, _: Sync, ctx: &mut xtra::Context<Self>) {
log_error!(self.update_state()) log_error!(self.update_state(ctx))
} }
} }
@ -285,6 +304,10 @@ impl xtra::Message for Attestation {
type Result = (); type Result = ();
} }
impl xtra::Message for NewAnnouncementFetched {
type Result = ();
}
mod olivia_api { mod olivia_api {
use crate::model::OracleEventId; use crate::model::OracleEventId;
use anyhow::Context; use anyhow::Context;

6
daemon/src/taker.rs

@ -274,8 +274,10 @@ async fn main() -> Result<()> {
cfds, cfds,
))); )));
// use `.send` here to ensure we only continue once the update was processed oracle_actor_address
oracle_actor_address.send(oracle::Sync).await.unwrap(); .do_send_async(oracle::Sync)
.await
.unwrap();
Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver)) Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver))
}, },

Loading…
Cancel
Save