Browse Source

Merge #279

279: More oracle actor fixes r=thomaseizinger a=thomaseizinger

- Re-organize oracle module a bit
- Parallelize fetching of announcements via self messages
- Add error message to JSON deserializer
- Don't fail attestation sync just because it is not attested yet


Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
refactor/no-log-handler
bors[bot] 3 years ago
committed by GitHub
parent
commit
f526927f53
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      daemon/src/maker.rs
  2. 141
      daemon/src/oracle.rs
  3. 6
      daemon/src/taker.rs

6
daemon/src/maker.rs

@ -276,8 +276,10 @@ async fn main() -> Result<()> {
cfds,
)));
// use `.send` here to ensure we only continue once the update was processed
oracle_actor_address.send(oracle::Sync).await.unwrap();
oracle_actor_address
.do_send_async(oracle::Sync)
.await
.unwrap();
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {

141
daemon/src/oracle.rs

@ -4,8 +4,6 @@ use crate::model::OracleEventId;
use anyhow::{Context, Result};
use async_trait::async_trait;
use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey};
use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
use reqwest::StatusCode;
use rocket::time::format_description::FormatItem;
use rocket::time::macros::format_description;
@ -25,6 +23,31 @@ pub struct Actor<CFD, M> {
monitor_actor_address: xtra::Address<M>,
}
pub struct Sync;
pub struct MonitorEvent {
pub event_id: OracleEventId,
}
#[derive(Debug, Clone)]
pub struct GetAnnouncement(pub OracleEventId);
// TODO: Split xtra::Message and API object
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(try_from = "olivia_api::Response")]
pub struct Attestation {
pub id: OracleEventId,
pub price: u64,
pub scalars: Vec<SecretKey>,
}
/// A module-private message to allow parallelization of fetching announcements.
#[derive(Debug)]
struct NewAnnouncementFetched {
id: OracleEventId,
announcement: Announcement,
}
impl<CFD, M> Actor<CFD, M> {
pub fn new(
cfd_actor_address: xtra::Address<CFD>,
@ -72,8 +95,8 @@ where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn update_state(&mut self) -> Result<()> {
self.update_latest_announcements()
async fn update_state(&mut self, ctx: &mut xtra::Context<Self>) -> Result<()> {
self.update_latest_announcements(ctx)
.await
.context("failed to update announcements")?;
self.update_pending_attestations()
@ -83,11 +106,15 @@ where
Ok(())
}
async fn update_latest_announcements(&mut self) -> Result<()> {
let new_announcements = next_ids()?
.into_iter()
.filter(|event_id| !self.latest_announcements.contains_key(event_id))
.map(|event_id| async move {
async fn update_latest_announcements(&mut self, ctx: &mut xtra::Context<Self>) -> Result<()> {
for event_id in next_ids()? {
if self.latest_announcements.contains_key(&event_id) {
continue;
}
let this = ctx.address().expect("self to be alive");
tokio::spawn(async move {
let url = event_id.to_olivia_url();
tracing::debug!("Fetching attestation for {}", event_id);
@ -104,13 +131,16 @@ where
.json::<Announcement>()
.await
.context("Failed to deserialize as Announcement")?;
Result::<_, anyhow::Error>::Ok((event_id, announcement))
})
.collect::<FuturesUnordered<_>>()
.try_collect::<HashMap<_, _>>()
.await?;
self.latest_announcements.extend(new_announcements); // FIXME: This results in linear memory growth.
this.send(NewAnnouncementFetched {
id: event_id,
announcement,
})
.await?;
Ok(())
});
}
Ok(())
}
@ -135,7 +165,17 @@ where
}
};
let attestation = res.json::<Attestation>().await?;
let attestation = match res
.json::<Attestation>()
.await
.with_context(|| format!("Failed to decode body for event {}", event_id))
{
Ok(attestation) => attestation,
Err(e) => {
tracing::debug!("{:#}", e);
continue;
}
};
self.cfd_actor_address
.clone()
@ -154,33 +194,6 @@ where
}
}
impl<CFD: 'static, M: 'static> xtra::Actor for Actor<CFD, M> {}
pub struct Sync;
impl xtra::Message for Sync {
type Result = ();
}
#[async_trait]
impl<CFD, M> xtra::Handler<Sync> for Actor<CFD, M>
where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn handle(&mut self, _: Sync, _ctx: &mut xtra::Context<Self>) {
log_error!(self.update_state())
}
}
pub struct MonitorEvent {
pub event_id: OracleEventId,
}
impl xtra::Message for MonitorEvent {
type Result = ();
}
#[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<MonitorEvent> for Actor<CFD, M> {
async fn handle(&mut self, msg: MonitorEvent, _ctx: &mut xtra::Context<Self>) {
@ -190,9 +203,6 @@ impl<CFD: 'static, M: 'static> xtra::Handler<MonitorEvent> for Actor<CFD, M> {
}
}
#[derive(Debug, Clone)]
pub struct GetAnnouncement(pub OracleEventId);
#[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<GetAnnouncement> for Actor<CFD, M> {
async fn handle(
@ -204,6 +214,13 @@ impl<CFD: 'static, M: 'static> xtra::Handler<GetAnnouncement> for Actor<CFD, M>
}
}
#[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<NewAnnouncementFetched> for Actor<CFD, M> {
async fn handle(&mut self, msg: NewAnnouncementFetched, _ctx: &mut xtra::Context<Self>) {
self.latest_announcements.insert(msg.id, msg.announcement);
}
}
/// Construct the URL of the next 24 `BitMEX/BXBT` hourly events
/// `olivia` will attest to.
fn next_ids() -> Result<Vec<OracleEventId>> {
@ -269,14 +286,24 @@ impl From<Announcement> for cfd_protocol::Announcement {
}
}
// TODO: Implement real deserialization once price attestation is
// implemented in `olivia`
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(try_from = "olivia_api::Response")]
pub struct Attestation {
pub id: OracleEventId,
pub price: u64,
pub scalars: Vec<SecretKey>,
impl<CFD: 'static, M: 'static> xtra::Actor for Actor<CFD, M> {}
#[async_trait]
impl<CFD, M> xtra::Handler<Sync> for Actor<CFD, M>
where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn handle(&mut self, _: Sync, ctx: &mut xtra::Context<Self>) {
log_error!(self.update_state(ctx))
}
}
impl xtra::Message for Sync {
type Result = ();
}
impl xtra::Message for MonitorEvent {
type Result = ();
}
impl xtra::Message for GetAnnouncement {
@ -287,6 +314,10 @@ impl xtra::Message for Attestation {
type Result = ();
}
impl xtra::Message for NewAnnouncementFetched {
type Result = ();
}
mod olivia_api {
use crate::model::OracleEventId;
use anyhow::Context;
@ -325,7 +356,7 @@ mod olivia_api {
let data =
serde_json::from_str::<AnnouncementData>(&response.announcement.oracle_event.data)?;
let attestation = response.attestation.context("Missing attestation")?;
let attestation = response.attestation.context("attestation missing")?;
Ok(Self {
id: OracleEventId(data.id),

6
daemon/src/taker.rs

@ -274,8 +274,10 @@ async fn main() -> Result<()> {
cfds,
)));
// use `.send` here to ensure we only continue once the update was processed
oracle_actor_address.send(oracle::Sync).await.unwrap();
oracle_actor_address
.do_send_async(oracle::Sync)
.await
.unwrap();
Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver))
},

Loading…
Cancel
Save