From 971b9f9368c40f0da12de3d4e8f4ad1b7de60b84 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Fri, 8 Oct 2021 11:39:03 +1100 Subject: [PATCH] Replace `Announcements` with requesting specific `Announcement` Instead of the oracle actor sending out the `Announcements` we only store them internally and let the cfd actors chose what `Announcement` is relevant given the term of the offer. The specific `Announcement` is then fetched from the oracle actor. The monitoring logic for pending attestations remains the same. --- daemon/src/maker_cfd.rs | 54 ++++++++++--------------------------- daemon/src/oracle.rs | 60 ++++++++++++++++++++++++----------------- daemon/src/taker_cfd.rs | 49 +++++++-------------------------- 3 files changed, 59 insertions(+), 104 deletions(-) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index d01eeaa..4f1405c 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -8,7 +8,7 @@ use crate::model::cfd::{ Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, UpdateCfdProposals, }; -use crate::model::{OracleEventId, TakerId, Usd}; +use crate::model::{TakerId, Usd}; use crate::monitor::MonitorParams; use crate::wallet::Wallet; use crate::{maker_inc_connections, monitor, oracle, setup_contract, wire}; @@ -17,7 +17,7 @@ use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; use futures::channel::mpsc; use futures::{future, SinkExt}; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::time::SystemTime; use tokio::sync::watch; use xtra::prelude::*; @@ -88,7 +88,6 @@ pub struct Actor { monitor_actor: Address>, setup_state: SetupState, roll_over_state: RollOverState, - latest_announcements: Option>, oracle_actor: Address>>, // Maker needs to also store TakerId to be able to send a reply back current_pending_proposals: HashMap, @@ -135,7 +134,6 @@ impl Actor { monitor_actor, setup_state: SetupState::None, roll_over_state: RollOverState::None, - latest_announcements: None, oracle_actor, current_pending_proposals: HashMap::new(), } @@ -513,13 +511,11 @@ impl Actor { self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; - let offer_announcements = self - .latest_announcements - .clone() - .context("No oracle announcements available")?; - let offer_announcement = offer_announcements - .get(&cfd.order.oracle_event_id) - .context("Order's announcement not found in current oracle announcements")?; + let offer_announcement = self + .oracle_actor + .send(oracle::GetAnnouncement(cfd.order.oracle_event_id.clone())) + .await? + .with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?; self.oracle_actor .do_send_async(oracle::MonitorEvent { @@ -694,13 +690,13 @@ impl Actor { // TODO: do we want to store in the db that we rolled over? - let (oracle_event_id, announcement) = self - .latest_announcements - .clone() - .context("Cannot roll over because no announcement from oracle was found")? - .into_iter() - .next_back() - .context("Empty list of announcements")?; + let oracle_event_id = + oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM); + let announcement = self + .oracle_actor + .send(oracle::GetAnnouncement(oracle_event_id.clone())) + .await? + .with_context(|| format!("Announcement {} not found", oracle_event_id))?; self.takers .send(maker_inc_connections::TakerMessage { @@ -810,21 +806,6 @@ impl Actor { Ok(()) } - async fn handle_oracle_announcements( - &mut self, - announcements: oracle::Announcements, - ) -> Result<()> { - self.latest_announcements.replace( - announcements - .0 - .iter() - .map(|announcement| (announcement.id.clone(), announcement.clone())) - .collect(), - ); - - Ok(()) - } - async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { tracing::debug!( "Learnt latest oracle attestation for event: {}", @@ -1014,13 +995,6 @@ impl Handler for Actor { } } -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: oracle::Announcements, _ctx: &mut Context) { - log_error!(self.handle_oracle_announcements(msg)) - } -} - #[async_trait] impl Handler for Actor { async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context) { diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index 0a6afdb..5d55c24 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -11,8 +11,7 @@ use rocket::time::format_description::FormatItem; use rocket::time::macros::format_description; use rocket::time::{Duration, OffsetDateTime, Time}; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; -use std::convert::TryFrom; +use std::collections::{HashMap, HashSet}; /// Where `olivia` is located. const OLIVIA_URL: &str = "https://h00.ooo"; @@ -22,10 +21,10 @@ const OLIVIA_EVENT_TIME_FORMAT: &[FormatItem] = pub struct Actor where - CFD: xtra::Handler + xtra::Handler, + CFD: xtra::Handler, M: xtra::Handler, { - latest_announcements: Option<[Announcement; 24]>, + latest_announcements: HashMap, pending_attestations: HashSet, cfd_actor_address: xtra::Address, monitor_actor_address: xtra::Address, @@ -33,7 +32,7 @@ where impl Actor where - CFD: xtra::Handler + xtra::Handler, + CFD: xtra::Handler, M: xtra::Handler, { pub fn new( @@ -69,7 +68,7 @@ where } Self { - latest_announcements: None, + latest_announcements: HashMap::new(), pending_attestations, cfd_actor_address, monitor_actor_address, @@ -103,21 +102,13 @@ where .json::() .await .context("Failed to deserialize as Announcement")?; - Result::<_, anyhow::Error>::Ok(announcement) + Result::<_, anyhow::Error>::Ok((OracleEventId(event_url), announcement)) }) .collect::>() - .try_collect::>() + .try_collect::>() .await?; - let new_announcements = <[Announcement; 24]>::try_from(new_announcements) - .map_err(|vec| anyhow::anyhow!("wrong number of announcements: {}", vec.len()))?; - - if self.latest_announcements.as_ref() != Some(&new_announcements) { - self.latest_announcements = Some(new_announcements.clone()); - self.cfd_actor_address - .do_send_async(Announcements(new_announcements)) - .await?; - } + self.latest_announcements = new_announcements; Ok(()) } @@ -165,11 +156,15 @@ where tracing::trace!("Event {} already being monitored", event_id); } } + + pub fn handle_get_announcement(&self, event_id: OracleEventId) -> Option { + self.latest_announcements.get(&event_id).cloned() + } } impl xtra::Actor for Actor where - CFD: xtra::Handler + xtra::Handler, + CFD: xtra::Handler, M: xtra::Handler, { } @@ -183,7 +178,7 @@ impl xtra::Message for Sync { #[async_trait] impl xtra::Handler for Actor where - CFD: xtra::Handler + xtra::Handler, + CFD: xtra::Handler, M: xtra::Handler, { async fn handle(&mut self, _: Sync, _ctx: &mut xtra::Context) { @@ -202,7 +197,7 @@ impl xtra::Message for MonitorEvent { #[async_trait] impl xtra::Handler for Actor where - CFD: xtra::Handler + xtra::Handler, + CFD: xtra::Handler, M: xtra::Handler, { async fn handle(&mut self, msg: MonitorEvent, _ctx: &mut xtra::Context) { @@ -210,6 +205,24 @@ where } } +#[derive(Debug, Clone)] +pub struct GetAnnouncement(pub OracleEventId); + +#[async_trait] +impl xtra::Handler for Actor +where + CFD: xtra::Handler, + M: xtra::Handler, +{ + async fn handle( + &mut self, + msg: GetAnnouncement, + _ctx: &mut xtra::Context, + ) -> Option { + self.handle_get_announcement(msg.0) + } +} + /// Construct the URL of the next 24 `BitMEX/BXBT` hourly events /// `olivia` will attest to. fn next_urls() -> Vec { @@ -265,9 +278,6 @@ impl From for cfd_protocol::Announcement { } } -#[derive(Debug, Clone)] -pub struct Announcements(pub [Announcement; 24]); - // TODO: Implement real deserialization once price attestation is // implemented in `olivia` #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -278,8 +288,8 @@ pub struct Attestation { pub scalars: Vec, } -impl xtra::Message for Announcements { - type Result = (); +impl xtra::Message for GetAnnouncement { + type Result = Option; } impl xtra::Message for Attestation { diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index c6a5231..9ffe227 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -17,7 +17,7 @@ use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; use futures::channel::mpsc; use futures::{future, SinkExt}; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::time::SystemTime; use tokio::sync::watch; use xtra::prelude::*; @@ -80,7 +80,6 @@ pub struct Actor { monitor_actor: Address>, setup_state: SetupState, roll_over_state: RollOverState, - latest_announcements: Option>, oracle_actor: Address>>, current_pending_proposals: UpdateCfdProposals, } @@ -110,7 +109,6 @@ impl Actor { setup_state: SetupState::None, roll_over_state: RollOverState::None, oracle_actor, - latest_announcements: None, current_pending_proposals: HashMap::new(), } } @@ -271,13 +269,11 @@ impl Actor { .send(load_all_cfds(&mut conn).await?)?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let offer_announcements = self - .latest_announcements - .clone() - .context("No oracle announcements available")?; - let offer_announcement = offer_announcements - .get(&cfd.order.oracle_event_id) - .context("Order's announcement not found in current oracle announcements")?; + let offer_announcement = self + .oracle_actor + .send(oracle::GetAnnouncement(cfd.order.oracle_event_id.clone())) + .await? + .with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?; self.oracle_actor .do_send_async(oracle::MonitorEvent { @@ -367,14 +363,11 @@ impl Actor { let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let dlc = cfd.open_dlc().context("CFD was in wrong state")?; - // TODO: we need to get multiple announcements for the next 24h let announcement = self - .latest_announcements - .clone() - .context("Cannot roll over because no announcement from oracle was found")? - .get(&oracle_event_id) - .context("Empty list of announcements")? - .clone(); + .oracle_actor + .send(oracle::GetAnnouncement(oracle_event_id.clone())) + .await? + .with_context(|| format!("Announcement {} not found", oracle_event_id))?; self.oracle_actor .do_send_async(oracle::MonitorEvent { @@ -594,21 +587,6 @@ impl Actor { Ok(()) } - async fn handle_oracle_announcements( - &mut self, - announcements: oracle::Announcements, - ) -> Result<()> { - self.latest_announcements.replace( - announcements - .0 - .iter() - .map(|announcement| (announcement.id.clone(), announcement.clone())) - .collect(), - ); - - Ok(()) - } - async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { tracing::debug!( "Learnt latest oracle attestation for event: {}", @@ -748,13 +726,6 @@ impl Handler for Actor { } } -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: oracle::Announcements, _ctx: &mut Context) { - log_error!(self.handle_oracle_announcements(msg)) - } -} - #[async_trait] impl Handler for Actor { async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context) {