Browse Source

Only fetch announcements when needed

Instead of always keeping track of the next 24 hours of announcements,
we instruct the `oracle::Actor` to fetch (if necessary) announcements
based on new orders.

At the moment this results in removing code which allowed us to fetch
several announcements in parallel, but we will need to reintroduce it
once we let the `daemon`s create CFDs with multiple attestation
events.
refactor/no-log-handler
Lucas Soriano del Pino 3 years ago
parent
commit
5b9a9db6df
No known key found for this signature in database GPG Key ID: EE611E973A1530E7
  1. 6
      daemon/src/maker_cfd.rs
  2. 150
      daemon/src/oracle.rs
  3. 4
      daemon/src/setup_contract.rs
  4. 6
      daemon/src/taker_cfd.rs

6
daemon/src/maker_cfd.rs

@ -198,6 +198,10 @@ impl Actor {
let oracle_event_id =
oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM)?;
self.oracle_actor
.do_send_async(oracle::FetchAnnouncement(oracle_event_id.clone()))
.await?;
let order = Order::new(
price,
min_quantity,
@ -599,7 +603,7 @@ impl Actor {
})
}),
receiver,
(self.oracle_pk, offer_announcement.clone().into()),
(self.oracle_pk, offer_announcement),
cfd,
self.wallet.clone(),
Role::Maker,

150
daemon/src/oracle.rs

@ -17,7 +17,8 @@ const OLIVIA_EVENT_TIME_FORMAT: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]");
pub struct Actor<CFD, M> {
latest_announcements: HashMap<OracleEventId, Announcement>,
announcements: HashMap<OracleEventId, (OffsetDateTime, Vec<schnorrsig::PublicKey>)>,
pending_announcements: HashSet<OracleEventId>,
pending_attestations: HashSet<OracleEventId>,
cfd_actor_address: xtra::Address<CFD>,
monitor_actor_address: xtra::Address<M>,
@ -25,10 +26,23 @@ pub struct Actor<CFD, M> {
pub struct Sync;
/// Message used to tell the `oracle::Actor` to fetch an
/// `Announcement` from `olivia`.
///
/// The `Announcement` corresponds to the `OracleEventId` included in
/// the message.
#[derive(Debug, Clone)]
pub struct FetchAnnouncement(pub OracleEventId);
pub struct MonitorEvent {
pub event_id: OracleEventId,
}
/// Message used to request the `Announcement` from the
/// `oracle::Actor`'s local state.
///
/// The `Announcement` corresponds to the `OracleEventId` included in
/// the message.
#[derive(Debug, Clone)]
pub struct GetAnnouncement(pub OracleEventId);
@ -45,7 +59,8 @@ pub struct Attestation {
#[derive(Debug)]
struct NewAnnouncementFetched {
id: OracleEventId,
announcement: Announcement,
expected_outcome_time: OffsetDateTime,
nonce_pks: Vec<schnorrsig::PublicKey>,
}
impl<CFD, M> Actor<CFD, M> {
@ -82,7 +97,8 @@ impl<CFD, M> Actor<CFD, M> {
}
Self {
latest_announcements: HashMap::new(),
announcements: HashMap::new(),
pending_announcements: HashSet::new(),
pending_attestations,
cfd_actor_address,
monitor_actor_address,
@ -92,32 +108,17 @@ impl<CFD, M> Actor<CFD, M> {
impl<CFD, M> Actor<CFD, M>
where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
CFD: 'static,
M: 'static,
{
async fn update_state(&mut self, ctx: &mut xtra::Context<Self>) -> Result<()> {
self.update_latest_announcements(ctx)
.await
.context("failed to update announcements")?;
self.update_pending_attestations()
.await
.context("failed to update pending attestations")?;
Ok(())
}
async fn update_latest_announcements(&mut self, ctx: &mut xtra::Context<Self>) -> Result<()> {
for event_id in next_ids()? {
if self.latest_announcements.contains_key(&event_id) {
continue;
}
async fn update_pending_announcements(&mut self, ctx: &mut xtra::Context<Self>) -> Result<()> {
let this = ctx.address().expect("self to be alive");
for event_id in self.pending_announcements.iter().cloned() {
let this = this.clone();
tokio::spawn(async move {
let url = event_id.to_olivia_url();
tracing::debug!("Fetching attestation for {}", event_id);
tracing::debug!("Fetching announcement for {}", event_id);
let response = reqwest::get(url.clone())
.await
@ -134,7 +135,8 @@ where
this.send(NewAnnouncementFetched {
id: event_id,
announcement,
nonce_pks: announcement.nonce_pks,
expected_outcome_time: announcement.expected_outcome_time,
})
.await?;
@ -144,6 +146,23 @@ where
Ok(())
}
}
impl<CFD, M> Actor<CFD, M>
where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn update_state(&mut self, ctx: &mut xtra::Context<Self>) -> Result<()> {
self.update_pending_announcements(ctx)
.await
.context("failed to update pending announcements")?;
self.update_pending_attestations()
.await
.context("failed to update pending attestations")?;
Ok(())
}
async fn update_pending_attestations(&mut self) -> Result<()> {
let pending_attestations = self.pending_attestations.clone();
@ -198,7 +217,16 @@ where
impl<CFD: 'static, M: 'static> xtra::Handler<MonitorEvent> for Actor<CFD, M> {
async fn handle(&mut self, msg: MonitorEvent, _ctx: &mut xtra::Context<Self>) {
if !self.pending_attestations.insert(msg.event_id.clone()) {
tracing::trace!("Event {} already being monitored", msg.event_id);
tracing::trace!("Attestation {} already being monitored", msg.event_id);
}
}
}
#[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<FetchAnnouncement> for Actor<CFD, M> {
async fn handle(&mut self, msg: FetchAnnouncement, _ctx: &mut xtra::Context<Self>) {
if !self.pending_announcements.insert(msg.0.clone()) {
tracing::trace!("Announcement {} already being fetched", msg.0);
}
}
}
@ -210,33 +238,23 @@ impl<CFD: 'static, M: 'static> xtra::Handler<GetAnnouncement> for Actor<CFD, M>
msg: GetAnnouncement,
_ctx: &mut xtra::Context<Self>,
) -> Option<Announcement> {
self.latest_announcements.get(&msg.0).cloned()
self.announcements
.get_key_value(&msg.0)
.map(|(id, (time, nonce_pks))| Announcement {
id: id.clone(),
expected_outcome_time: *time,
nonce_pks: nonce_pks.clone(),
})
}
}
#[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<NewAnnouncementFetched> for Actor<CFD, M> {
async fn handle(&mut self, msg: NewAnnouncementFetched, _ctx: &mut xtra::Context<Self>) {
self.latest_announcements.insert(msg.id, msg.announcement);
}
self.pending_announcements.remove(&msg.id);
self.announcements
.insert(msg.id, (msg.expected_outcome_time, msg.nonce_pks));
}
/// Construct the URL of the next 24 `BitMEX/BXBT` hourly events
/// `olivia` will attest to.
fn next_ids() -> Result<Vec<OracleEventId>> {
let ids = next_24_hours(OffsetDateTime::now_utc())?
.into_iter()
.map(event_id)
.collect();
Ok(ids)
}
fn next_24_hours(datetime: OffsetDateTime) -> Result<Vec<OffsetDateTime>> {
let adjusted = ceil_to_next_hour(datetime)?;
let timestamps = (0..=24).map(|i| adjusted + i.hours()).collect();
Ok(timestamps)
}
#[allow(dead_code)]
@ -306,6 +324,10 @@ impl xtra::Message for MonitorEvent {
type Result = ();
}
impl xtra::Message for FetchAnnouncement {
type Result = ();
}
impl xtra::Message for GetAnnouncement {
type Result = Option<Announcement>;
}
@ -613,40 +635,4 @@ mod tests {
assert_eq!(event_id.0, "/x/BitMEX/BXBT/2021-09-24T00:00:00.price?n=20");
}
#[test]
fn next_24() {
let datetime = datetime!(2021-09-23 10:43:12);
let next_24_hours = next_24_hours(datetime.assume_utc()).unwrap();
let expected = vec![
datetime!(2021-09-23 11:00:00).assume_utc(),
datetime!(2021-09-23 12:00:00).assume_utc(),
datetime!(2021-09-23 13:00:00).assume_utc(),
datetime!(2021-09-23 14:00:00).assume_utc(),
datetime!(2021-09-23 15:00:00).assume_utc(),
datetime!(2021-09-23 16:00:00).assume_utc(),
datetime!(2021-09-23 17:00:00).assume_utc(),
datetime!(2021-09-23 18:00:00).assume_utc(),
datetime!(2021-09-23 19:00:00).assume_utc(),
datetime!(2021-09-23 20:00:00).assume_utc(),
datetime!(2021-09-23 21:00:00).assume_utc(),
datetime!(2021-09-23 22:00:00).assume_utc(),
datetime!(2021-09-23 23:00:00).assume_utc(),
datetime!(2021-09-24 00:00:00).assume_utc(),
datetime!(2021-09-24 01:00:00).assume_utc(),
datetime!(2021-09-24 02:00:00).assume_utc(),
datetime!(2021-09-24 03:00:00).assume_utc(),
datetime!(2021-09-24 04:00:00).assume_utc(),
datetime!(2021-09-24 05:00:00).assume_utc(),
datetime!(2021-09-24 06:00:00).assume_utc(),
datetime!(2021-09-24 07:00:00).assume_utc(),
datetime!(2021-09-24 08:00:00).assume_utc(),
datetime!(2021-09-24 09:00:00).assume_utc(),
datetime!(2021-09-24 10:00:00).assume_utc(),
datetime!(2021-09-24 11:00:00).assume_utc(),
];
assert_eq!(next_24_hours, expected)
}
}

4
daemon/src/setup_contract.rs

@ -32,7 +32,7 @@ use std::ops::RangeInclusive;
pub async fn new(
mut sink: impl Sink<SetupMsg, Error = anyhow::Error> + Unpin,
mut stream: impl FusedStream<Item = SetupMsg> + Unpin,
(oracle_pk, announcement): (schnorrsig::PublicKey, Announcement),
(oracle_pk, announcement): (schnorrsig::PublicKey, oracle::Announcement),
cfd: Cfd,
wallet: Wallet,
role: Role,
@ -74,7 +74,7 @@ pub async fn new(
}
let payouts = HashMap::from_iter([(
announcement.clone(),
announcement.into(),
payout_curve::calculate(cfd.order.price, cfd.quantity_usd, cfd.order.leverage)?,
)]);

6
daemon/src/taker_cfd.rs

@ -242,6 +242,10 @@ impl Actor {
Some(mut order) => {
order.origin = Origin::Theirs;
self.oracle_actor
.do_send_async(oracle::FetchAnnouncement(order.oracle_event_id.clone()))
.await?;
let mut conn = self.db.acquire().await?;
insert_order(&order, &mut conn).await?;
self.order_feed_actor_inbox.send(Some(order))?;
@ -300,7 +304,7 @@ impl Actor {
.into_sink()
.with(|msg| future::ok(wire::TakerToMaker::Protocol(msg))),
receiver,
(self.oracle_pk, offer_announcement.clone().into()),
(self.oracle_pk, offer_announcement),
cfd,
self.wallet.clone(),
Role::Taker,

Loading…
Cancel
Save