diff --git a/daemon/src/db.rs b/daemon/src/db.rs index b45d75f..cf96ac8 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -1,5 +1,5 @@ use crate::model::cfd::{Cfd, CfdState, Order, OrderId, Origin}; -use crate::model::{Leverage, OracleEventId, Position}; +use crate::model::{BitMexPriceEventId, Leverage, Position}; use anyhow::{Context, Result}; use rocket_db_pools::sqlx; use sqlx::pool::PoolConnection; @@ -24,7 +24,7 @@ pub async fn insert_order(order: &Order, conn: &mut PoolConnection) -> a let creation_timestamp = serde_json::to_string(&order.creation_timestamp).unwrap(); let term = serde_json::to_string(&order.term).unwrap(); let origin = serde_json::to_string(&order.origin).unwrap(); - let oracle_event_id = order.oracle_event_id.0.clone(); + let oracle_event_id = order.oracle_event_id.to_string(); sqlx::query!( r#" @@ -88,7 +88,6 @@ pub async fn load_order_by_id( let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap(); let term = serde_json::from_str(row.term.as_str()).unwrap(); let origin = serde_json::from_str(row.origin.as_str()).unwrap(); - let oracle_event_id = OracleEventId(row.oracle_event_id); Ok(Order { id: uuid, @@ -102,7 +101,7 @@ pub async fn load_order_by_id( creation_timestamp, term, origin, - oracle_event_id, + oracle_event_id: row.oracle_event_id.parse().unwrap(), }) } @@ -299,7 +298,7 @@ pub async fn load_cfd_by_order_id( let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap(); let term = serde_json::from_str(row.term.as_str()).unwrap(); let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap(); - let oracle_event_id = OracleEventId(row.oracle_event_id.clone()); + let oracle_event_id = row.oracle_event_id.parse().unwrap(); let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap(); let latest_state = serde_json::from_str(row.state.as_str()).unwrap(); @@ -377,7 +376,7 @@ pub async fn load_all_cfds(conn: &mut PoolConnection) -> anyhow::Result< let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap(); let term = serde_json::from_str(row.term.as_str()).unwrap(); let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap(); - let oracle_event_id = OracleEventId(row.oracle_event_id.clone()); + let oracle_event_id = row.oracle_event_id.clone().parse().unwrap(); let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap(); let latest_state = serde_json::from_str(row.state.as_str()).unwrap(); @@ -410,9 +409,11 @@ pub async fn load_all_cfds(conn: &mut PoolConnection) -> anyhow::Result< /// Loads all CFDs with the latest state as the CFD state pub async fn load_cfds_by_oracle_event_id( - oracle_event_id: OracleEventId, + oracle_event_id: BitMexPriceEventId, conn: &mut PoolConnection, ) -> anyhow::Result> { + let oracle_event_id = oracle_event_id.to_string(); + let rows = sqlx::query!( r#" select @@ -443,7 +444,7 @@ pub async fn load_cfds_by_oracle_event_id( ) and orders.oracle_event_id = ? "#, - oracle_event_id.0 + oracle_event_id ) .fetch_all(conn) .await?; @@ -462,7 +463,7 @@ pub async fn load_cfds_by_oracle_event_id( let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap(); let term = serde_json::from_str(row.term.as_str()).unwrap(); let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap(); - let oracle_event_id = OracleEventId(row.oracle_event_id.clone()); + let oracle_event_id = row.oracle_event_id.parse().unwrap(); let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap(); let latest_state = serde_json::from_str(row.state.as_str()).unwrap(); @@ -501,6 +502,8 @@ mod tests { use rust_decimal_macros::dec; use sqlx::SqlitePool; use tempfile::tempdir; + use time::macros::datetime; + use time::OffsetDateTime; use crate::db::insert_order; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Order}; @@ -580,22 +583,24 @@ mod tests { let pool = setup_test_db().await; let mut conn = pool.acquire().await.unwrap(); - let oracle_event_id_1 = OracleEventId("dummy_1".to_string()); - let oracle_event_id_2 = OracleEventId("dummy_2".to_string()); + let oracle_event_id_1 = + BitMexPriceEventId::with_20_digits(datetime!(2021-10-13 10:00:00).assume_utc()); + let oracle_event_id_2 = + BitMexPriceEventId::with_20_digits(datetime!(2021-10-25 18:00:00).assume_utc()); - let cfd_1 = Cfd::default() - .with_order(Order::default().with_oracle_event_id(oracle_event_id_1.clone())); + let cfd_1 = + Cfd::default().with_order(Order::default().with_oracle_event_id(oracle_event_id_1)); insert_order(&cfd_1.order, &mut conn).await.unwrap(); insert_cfd(cfd_1.clone(), &mut conn).await.unwrap(); - let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_1.clone(), &mut conn) + let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_1, &mut conn) .await .unwrap(); assert_eq!(vec![cfd_1.clone()], cfd_from_db); - let cfd_2 = Cfd::default() - .with_order(Order::default().with_oracle_event_id(oracle_event_id_1.clone())); + let cfd_2 = + Cfd::default().with_order(Order::default().with_oracle_event_id(oracle_event_id_1)); insert_order(&cfd_2.order, &mut conn).await.unwrap(); insert_cfd(cfd_2.clone(), &mut conn).await.unwrap(); @@ -605,8 +610,8 @@ mod tests { .unwrap(); assert_eq!(vec![cfd_1, cfd_2], cfd_from_db); - let cfd_3 = Cfd::default() - .with_order(Order::default().with_oracle_event_id(oracle_event_id_2.clone())); + let cfd_3 = + Cfd::default().with_order(Order::default().with_oracle_event_id(oracle_event_id_2)); insert_order(&cfd_3.order, &mut conn).await.unwrap(); insert_cfd(cfd_3.clone(), &mut conn).await.unwrap(); @@ -647,8 +652,6 @@ mod tests { // file has to exist in order to connect with sqlite File::create(temp_db.clone()).unwrap(); - dbg!(&temp_db); - let pool = SqlitePool::connect(format!("sqlite:{}", temp_db.display()).as_str()) .await .unwrap(); @@ -686,14 +689,14 @@ mod tests { Usd(dec!(100)), Usd(dec!(1000)), Origin::Theirs, - OracleEventId("Dummy".to_string()), + BitMexPriceEventId::with_20_digits(OffsetDateTime::now_utc()), ) .unwrap() } } impl Order { - pub fn with_oracle_event_id(mut self, oracle_event_id: OracleEventId) -> Self { + pub fn with_oracle_event_id(mut self, oracle_event_id: BitMexPriceEventId) -> Self { self.oracle_event_id = oracle_event_id; self } diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index ed76532..7a18042 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -33,6 +33,7 @@ mod maker_cfd; mod maker_inc_connections; mod model; mod monitor; +mod olivia; mod oracle; mod payout_curve; mod routes; @@ -41,6 +42,7 @@ mod seed; mod send_to_socket; mod setup_contract; mod to_sse_event; +mod tokio_ext; mod wallet; mod wallet_sync; mod wire; @@ -151,7 +153,7 @@ async fn main() -> Result<()> { ext_priv_key, ) .await?; - let wallet_info = wallet.sync().await.unwrap(); + let wallet_info = wallet.sync().await?; let auth_password = seed.derive_auth_password::(); @@ -176,7 +178,12 @@ async fn main() -> Result<()> { .merge(("address", opts.http_address.ip())) .merge(("port", opts.http_address.port())); - let listener = tokio::net::TcpListener::bind(&format!("0.0.0.0:{}", opts.p2p_port)).await?; + let p2p_socket = format!("0.0.0.0:{}", opts.p2p_port) + .parse::() + .unwrap(); + let listener = tokio::net::TcpListener::bind(p2p_socket) + .await + .with_context(|| format!("Failed to listen on {}", p2p_socket))?; let local_addr = listener.local_addr().unwrap(); tracing::info!("Listening on {}", local_addr); diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 426b1e2..33de35b 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -200,7 +200,7 @@ impl Actor { oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM)?; self.oracle_actor - .do_send_async(oracle::FetchAnnouncement(oracle_event_id.clone())) + .do_send_async(oracle::FetchAnnouncement(oracle_event_id)) .await?; let order = Order::new( @@ -586,13 +586,13 @@ impl Actor { let offer_announcement = self .oracle_actor - .send(oracle::GetAnnouncement(cfd.order.oracle_event_id.clone())) + .send(oracle::GetAnnouncement(cfd.order.oracle_event_id)) .await? .with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?; self.oracle_actor .do_send_async(oracle::MonitorAttestation { - event_id: offer_announcement.id.clone(), + event_id: offer_announcement.id, }) .await?; @@ -767,7 +767,7 @@ impl Actor { oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM)?; let announcement = self .oracle_actor - .send(oracle::GetAnnouncement(oracle_event_id.clone())) + .send(oracle::GetAnnouncement(oracle_event_id)) .await? .with_context(|| format!("Announcement {} not found", oracle_event_id))?; @@ -783,7 +783,7 @@ impl Actor { self.oracle_actor .do_send_async(oracle::MonitorAttestation { - event_id: announcement.id.clone(), + event_id: announcement.id, }) .await?; @@ -892,12 +892,12 @@ impl Actor { ); let mut conn = self.db.acquire().await?; - let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?; + let cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?; for mut cfd in cfds { if cfd .handle(CfdStateChangeEvent::OracleAttestation(Attestation::new( - attestation.id.clone(), + attestation.id, attestation.price, attestation.scalars.clone(), cfd.dlc() diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 83b575b..9233b85 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -1,6 +1,6 @@ use crate::actors::log_error; use crate::model::cfd::{Order, OrderId}; -use crate::model::{OracleEventId, TakerId}; +use crate::model::{BitMexPriceEventId, TakerId}; use crate::{maker_cfd, send_to_socket, wire}; use anyhow::{Context as AnyhowContext, Result}; use async_trait::async_trait; @@ -38,7 +38,7 @@ pub enum TakerCommand { }, NotifyRollOverAccepted { id: OrderId, - oracle_event_id: OracleEventId, + oracle_event_id: BitMexPriceEventId, }, NotifyRollOverRejected { id: OrderId, diff --git a/daemon/src/model.rs b/daemon/src/model.rs index 3a7ddbf..cec96b5 100644 --- a/daemon/src/model.rs +++ b/daemon/src/model.rs @@ -1,15 +1,14 @@ -use std::fmt::{Display, Formatter}; - +use crate::olivia; use anyhow::{Context, Result}; +use bdk::bitcoin::{Address, Amount}; +use reqwest::Url; use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; - -use bdk::bitcoin::{Address, Amount}; -use reqwest::Url; -use std::fmt; -use std::str::FromStr; +use serde_with::{DeserializeFromStr, SerializeDisplay}; use std::time::SystemTime; +use std::{fmt, str}; +use time::{OffsetDateTime, PrimitiveDateTime, Time}; use uuid::Uuid; pub mod cfd; @@ -46,8 +45,8 @@ impl Usd { } } -impl Display for Usd { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for Usd { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f) } } @@ -61,8 +60,8 @@ impl From for Usd { #[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)] pub struct Percent(pub Decimal); -impl Display for Percent { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for Percent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.round_dp(2).fmt(f) } } @@ -96,8 +95,8 @@ impl Default for TakerId { } } -impl Display for TakerId { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for TakerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.0.fmt(f) } } @@ -109,36 +108,123 @@ pub struct WalletInfo { pub last_updated_at: SystemTime, } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct OracleEventId(pub String); +#[derive( + Debug, Clone, Copy, SerializeDisplay, DeserializeFromStr, PartialEq, Eq, Hash, PartialOrd, Ord, +)] +pub struct BitMexPriceEventId { + /// The timestamp this price event refers to. + timestamp: OffsetDateTime, + digits: usize, +} + +impl BitMexPriceEventId { + pub fn new(timestamp: OffsetDateTime, digits: usize) -> Self { + let (hours, minutes, seconds) = timestamp.time().as_hms(); + let time_without_nanos = + Time::from_hms(hours, minutes, seconds).expect("original timestamp was valid"); + + let timestamp_without_nanos = timestamp.replace_time(time_without_nanos); -impl OracleEventId { - pub fn to_olivia_url(&self) -> Url { - Url::from_str("https://h00.ooo") + Self { + timestamp: timestamp_without_nanos, + digits, + } + } + + pub fn with_20_digits(timestamp: OffsetDateTime) -> Self { + Self::new(timestamp, 20) + } + + /// Checks whether this event has likely already occurred. + /// + /// We can't be sure about it because our local clock might be off from the oracle's clock. + pub fn has_likely_occured(&self) -> bool { + let now = OffsetDateTime::now_utc(); + + now > self.timestamp + } + + pub fn to_olivia_url(self) -> Url { + "https://h00.ooo" + .parse::() .expect("valid URL from constant") - .join(self.0.as_str()) + .join(&self.to_string()) .expect("Event id can be joined") } } -impl Display for OracleEventId { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - self.0.fmt(f) +impl fmt::Display for BitMexPriceEventId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "/x/BitMEX/BXBT/{}.price?n={}", + self.timestamp + .format(&olivia::EVENT_TIME_FORMAT) + .expect("should always format and we can't return an error here"), + self.digits + ) + } +} + +impl str::FromStr for BitMexPriceEventId { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let remaining = s.trim_start_matches("/x/BitMEX/BXBT/"); + let (timestamp, rest) = remaining.split_at(19); + let digits = rest.trim_start_matches(".price?n="); + + Ok(Self { + timestamp: PrimitiveDateTime::parse(timestamp, &olivia::EVENT_TIME_FORMAT) + .with_context(|| format!("Failed to parse {} as timestamp", timestamp))? + .assume_utc(), + digits: digits.parse()?, + }) } } #[cfg(test)] mod tests { + use time::macros::datetime; + use super::*; #[test] fn to_olivia_url() { - let url = OracleEventId("/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20".to_string()) + let url = BitMexPriceEventId::with_20_digits(datetime!(2021-09-23 10:00:00).assume_utc()) .to_olivia_url(); assert_eq!( url, - Url::from_str("https://h00.ooo/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20").unwrap() + "https://h00.ooo/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20" + .parse() + .unwrap() ); } + + #[test] + fn parse_event_id() { + let parsed = "/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20" + .parse::() + .unwrap(); + let expected = + BitMexPriceEventId::with_20_digits(datetime!(2021-09-23 10:00:00).assume_utc()); + + assert_eq!(parsed, expected); + } + + #[test] + fn new_event_has_no_nanos() { + let now = BitMexPriceEventId::with_20_digits(OffsetDateTime::now_utc()); + + assert_eq!(now.timestamp.nanosecond(), 0); + } + + #[test] + fn has_occured_if_in_the_past() { + let past_event = + BitMexPriceEventId::with_20_digits(datetime!(2021-09-23 10:00:00).assume_utc()); + + assert!(past_event.has_likely_occured()); + } } diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index 681359c..5772d4b 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -1,4 +1,4 @@ -use crate::model::{Leverage, OracleEventId, Percent, Position, TakerId, TradingPair, Usd}; +use crate::model::{BitMexPriceEventId, Leverage, Percent, Position, TakerId, TradingPair, Usd}; use crate::{monitor, oracle}; use anyhow::{bail, Context, Result}; use bdk::bitcoin::secp256k1::{SecretKey, Signature}; @@ -97,7 +97,7 @@ pub struct Order { /// The id of the event to be used for price attestation /// /// The maker includes this into the Order based on the Oracle announcement to be used. - pub oracle_event_id: OracleEventId, + pub oracle_event_id: BitMexPriceEventId, } #[allow(dead_code)] // Only one binary and the tests use this. @@ -109,7 +109,7 @@ impl Order { min_quantity: Usd, max_quantity: Usd, origin: Origin, - oracle_event_id: OracleEventId, + oracle_event_id: BitMexPriceEventId, ) -> Result { let leverage = Leverage(2); let maintenance_margin_rate = dec!(0.005); @@ -286,7 +286,7 @@ pub enum CfdState { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct Attestation { - pub id: OracleEventId, + pub id: BitMexPriceEventId, pub scalars: Vec, #[serde(with = "::bdk::bitcoin::util::amount::serde::as_sat")] payout: Amount, @@ -296,7 +296,7 @@ pub struct Attestation { impl Attestation { pub fn new( - id: OracleEventId, + id: BitMexPriceEventId, price: u64, scalars: Vec, dlc: Dlc, @@ -1483,7 +1483,7 @@ pub struct Dlc { /// The fully signed lock transaction ready to be published on chain pub lock: (Transaction, Descriptor), pub commit: (Transaction, EcdsaAdaptorSignature, Descriptor), - pub cets: HashMap>, + pub cets: HashMap>, pub refund: (Transaction, Signature), #[serde(with = "::bdk::bitcoin::util::amount::serde::as_sat")] diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index 51222b0..5cecc65 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -1,6 +1,6 @@ use crate::actors::log_error; use crate::model::cfd::{CetStatus, Cfd, CfdState, Dlc, OrderId}; -use crate::model::OracleEventId; +use crate::model::BitMexPriceEventId; use crate::oracle::Attestation; use crate::{model, oracle}; use anyhow::{Context, Result}; @@ -27,7 +27,7 @@ pub struct StartMonitoring { pub struct MonitorParams { lock: (Txid, Descriptor), commit: (Txid, Descriptor), - cets: HashMap>, + cets: HashMap>, refund: (Txid, Script, u32), revoked_commits: Vec<(Txid, Script)>, } @@ -209,7 +209,7 @@ where fn monitor_cet_finality( &mut self, - cets: HashMap>, + cets: HashMap>, attestation: Attestation, order_id: OrderId, ) -> Result<()> { @@ -581,8 +581,8 @@ impl From for Cet { } fn map_cets( - cets: HashMap>, -) -> HashMap> { + cets: HashMap>, +) -> HashMap> { cets.into_iter() .map(|(event_id, cets)| { ( diff --git a/daemon/src/olivia.rs b/daemon/src/olivia.rs new file mode 100644 index 0000000..e948ed3 --- /dev/null +++ b/daemon/src/olivia.rs @@ -0,0 +1,5 @@ +use time::format_description::FormatItem; +use time::macros::format_description; + +pub const EVENT_TIME_FORMAT: &[FormatItem] = + format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]"); diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index d311df7..d96f6f0 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -1,25 +1,20 @@ use crate::actors::log_error; use crate::model::cfd::{Cfd, CfdState}; -use crate::model::OracleEventId; +use crate::model::BitMexPriceEventId; +use crate::tokio_ext; use anyhow::{Context, Result}; use async_trait::async_trait; use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey}; -use reqwest::StatusCode; -use rocket::time::format_description::FormatItem; -use rocket::time::macros::format_description; use rocket::time::{OffsetDateTime, Time}; use serde::Deserialize; use std::collections::{HashMap, HashSet}; use std::ops::Add; use time::ext::NumericalDuration; -const OLIVIA_EVENT_TIME_FORMAT: &[FormatItem] = - format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]"); - pub struct Actor { - announcements: HashMap)>, - pending_announcements: HashSet, - pending_attestations: HashSet, + announcements: HashMap)>, + pending_announcements: HashSet, + pending_attestations: HashSet, cfd_actor_address: xtra::Address, monitor_actor_address: xtra::Address, } @@ -32,25 +27,25 @@ pub struct Sync; /// The `Announcement` corresponds to the `OracleEventId` included in /// the message. #[derive(Debug, Clone)] -pub struct FetchAnnouncement(pub OracleEventId); +pub struct FetchAnnouncement(pub BitMexPriceEventId); pub struct MonitorAttestation { - pub event_id: OracleEventId, + pub event_id: BitMexPriceEventId, } /// Message used to request the `Announcement` from the /// `oracle::Actor`'s local state. /// -/// The `Announcement` corresponds to the `OracleEventId` included in +/// The `Announcement` corresponds to the [`BitMexPriceEventId`] included in /// the message. #[derive(Debug, Clone)] -pub struct GetAnnouncement(pub OracleEventId); +pub struct GetAnnouncement(pub BitMexPriceEventId); // TODO: Split xtra::Message and API object #[derive(Debug, Clone, Deserialize, PartialEq)] #[serde(try_from = "olivia_api::Response")] pub struct Attestation { - pub id: OracleEventId, + pub id: BitMexPriceEventId, pub price: u64, pub scalars: Vec, } @@ -58,11 +53,18 @@ pub struct Attestation { /// A module-private message to allow parallelization of fetching announcements. #[derive(Debug)] struct NewAnnouncementFetched { - id: OracleEventId, + id: BitMexPriceEventId, expected_outcome_time: OffsetDateTime, nonce_pks: Vec, } +/// A module-private message to allow parallelization of fetching attestations. +#[derive(Debug)] +struct NewAttestationFetched { + id: BitMexPriceEventId, + attestation: Attestation, +} + impl Actor { pub fn new( cfd_actor_address: xtra::Address, @@ -111,11 +113,11 @@ where CFD: 'static, M: 'static, { - async fn update_pending_announcements(&mut self, ctx: &mut xtra::Context) -> Result<()> { - let this = ctx.address().expect("self to be alive"); + fn update_pending_announcements(&mut self, ctx: &mut xtra::Context) { for event_id in self.pending_announcements.iter().cloned() { - let this = this.clone(); - tokio::spawn(async move { + let this = ctx.address().expect("self to be alive"); + + tokio_ext::spawn_fallible(async move { let url = event_id.to_olivia_url(); tracing::debug!("Fetching announcement for {}", event_id); @@ -143,8 +145,6 @@ where Ok(()) }); } - - Ok(()) } } @@ -153,61 +153,65 @@ where CFD: xtra::Handler, M: xtra::Handler, { - async fn update_state(&mut self, ctx: &mut xtra::Context) -> Result<()> { - self.update_pending_announcements(ctx) - .await - .context("failed to update pending announcements")?; - self.update_pending_attestations() - .await - .context("failed to update pending attestations")?; + fn update_pending_attestations(&mut self, ctx: &mut xtra::Context) { + for event_id in self.pending_attestations.iter().copied() { + if !event_id.has_likely_occured() { + tracing::trace!( + "Skipping {} because it likely hasn't occurred yet", + event_id + ); + + continue; + } - Ok(()) - } + let this = ctx.address().expect("self to be alive"); + + tokio_ext::spawn_fallible(async move { + let url = event_id.to_olivia_url(); - async fn update_pending_attestations(&mut self) -> Result<()> { - let pending_attestations = self.pending_attestations.clone(); - for event_id in pending_attestations.into_iter() { - { - let res = match reqwest::get(event_id.to_olivia_url()).await { - Ok(res) if res.status().is_success() => res, - Ok(res) if res.status() == StatusCode::NOT_FOUND => { - tracing::trace!("Attestation not ready yet"); - continue; - } - Ok(res) => { - tracing::warn!("Unexpected response, status {}", res.status()); - continue; - } - Err(e) => { - tracing::warn!(%event_id, "Failed to fetch attestation: {}", e); - continue; - } - }; - - let attestation = match res + tracing::debug!("Fetching attestation for {}", event_id); + + let response = reqwest::get(url.clone()) + .await + .with_context(|| format!("Failed to GET {}", url))?; + + if !response.status().is_success() { + anyhow::bail!("GET {} responded with {}", url, response.status()); + } + + let attestation = response .json::() .await - .with_context(|| format!("Failed to decode body for event {}", event_id)) - { - Ok(attestation) => attestation, - Err(e) => { - tracing::debug!("{:#}", e); - continue; - } - }; - - self.cfd_actor_address - .clone() - .do_send_async(attestation.clone()) - .await?; - self.monitor_actor_address - .clone() - .do_send_async(attestation) - .await?; - - self.pending_attestations.remove(&event_id); - } + .context("Failed to deserialize as Attestation")?; + + this.send(NewAttestationFetched { + id: event_id, + attestation, + }) + .await?; + + Ok(()) + }); } + } + + async fn handle_new_attestation_fetched( + &mut self, + id: BitMexPriceEventId, + attestation: Attestation, + ) -> Result<()> { + tracing::info!("Fetched new attestation for {}", id); + + self.cfd_actor_address + .clone() + .do_send_async(attestation.clone()) + .await?; + self.monitor_actor_address + .clone() + .do_send_async(attestation) + .await?; + + self.pending_attestations.remove(&id); Ok(()) } @@ -216,7 +220,7 @@ where #[async_trait] impl xtra::Handler for Actor { async fn handle(&mut self, msg: MonitorAttestation, _ctx: &mut xtra::Context) { - if !self.pending_attestations.insert(msg.event_id.clone()) { + if !self.pending_attestations.insert(msg.event_id) { tracing::trace!("Attestation {} already being monitored", msg.event_id); } } @@ -225,7 +229,7 @@ impl xtra::Handler for Actor xtra::Handler for Actor { async fn handle(&mut self, msg: FetchAnnouncement, _ctx: &mut xtra::Context) { - if !self.pending_announcements.insert(msg.0.clone()) { + if !self.pending_announcements.insert(msg.0) { tracing::trace!("Announcement {} already being fetched", msg.0); } } @@ -241,7 +245,7 @@ impl xtra::Handler for Actor self.announcements .get_key_value(&msg.0) .map(|(id, (time, nonce_pks))| Announcement { - id: id.clone(), + id: *id, expected_outcome_time: *time, nonce_pks: nonce_pks.clone(), }) @@ -257,11 +261,22 @@ impl xtra::Handler for Actor xtra::Handler for Actor +where + CFD: xtra::Handler, + M: xtra::Handler, +{ + async fn handle(&mut self, msg: NewAttestationFetched, _ctx: &mut xtra::Context) { + log_error!(self.handle_new_attestation_fetched(msg.id, msg.attestation)); + } +} + #[allow(dead_code)] -pub fn next_announcement_after(timestamp: OffsetDateTime) -> Result { +pub fn next_announcement_after(timestamp: OffsetDateTime) -> Result { let adjusted = ceil_to_next_hour(timestamp)?; - Ok(event_id(adjusted)) + Ok(BitMexPriceEventId::with_20_digits(adjusted)) } fn ceil_to_next_hour(original: OffsetDateTime) -> Result { @@ -273,16 +288,6 @@ fn ceil_to_next_hour(original: OffsetDateTime) -> Result OracleEventId { - let datetime = datetime - .format(&OLIVIA_EVENT_TIME_FORMAT) - .expect("valid formatter for datetime"); - - OracleEventId(format!("/x/BitMEX/BXBT/{}.price?n=20", datetime)) -} - #[derive(Debug, Clone, serde::Deserialize, PartialEq)] #[serde(try_from = "olivia_api::Response")] pub struct Announcement { @@ -290,7 +295,7 @@ pub struct Announcement { /// /// Doubles up as the path of the URL for this event i.e. /// https://h00.ooo/{id}. - pub id: OracleEventId, + pub id: BitMexPriceEventId, pub expected_outcome_time: OffsetDateTime, pub nonce_pks: Vec, } @@ -298,7 +303,7 @@ pub struct Announcement { impl From for cfd_protocol::Announcement { fn from(announcement: Announcement) -> Self { cfd_protocol::Announcement { - id: announcement.id.0, + id: announcement.id.to_string(), nonce_pks: announcement.nonce_pks, } } @@ -313,7 +318,8 @@ where M: xtra::Handler, { async fn handle(&mut self, _: Sync, ctx: &mut xtra::Context) { - log_error!(self.update_state(ctx)) + self.update_pending_announcements(ctx); + self.update_pending_attestations(ctx); } } @@ -340,8 +346,12 @@ impl xtra::Message for NewAnnouncementFetched { type Result = (); } +impl xtra::Message for NewAttestationFetched { + type Result = (); +} + mod olivia_api { - use crate::model::OracleEventId; + use crate::model::BitMexPriceEventId; use anyhow::Context; use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey}; use std::convert::TryFrom; @@ -363,7 +373,7 @@ mod olivia_api { serde_json::from_str::(&response.announcement.oracle_event.data)?; Ok(Self { - id: OracleEventId(data.id), + id: data.id, expected_outcome_time: data.expected_outcome_time, nonce_pks: data.schemes.olivia_v1.nonces, }) @@ -381,7 +391,7 @@ mod olivia_api { let attestation = response.attestation.context("attestation missing")?; Ok(Self { - id: OracleEventId(data.id), + id: data.id, price: attestation.outcome.parse()?, scalars: attestation.schemes.olivia_v1.scalars, }) @@ -402,7 +412,7 @@ mod olivia_api { #[derive(Debug, Clone, serde::Deserialize)] #[serde(rename_all = "kebab-case")] struct AnnouncementData { - id: String, + id: BitMexPriceEventId, #[serde(with = "timestamp")] expected_outcome_time: OffsetDateTime, schemes: Schemes, @@ -431,7 +441,7 @@ mod olivia_api { } mod timestamp { - use crate::oracle::OLIVIA_EVENT_TIME_FORMAT; + use crate::olivia; use serde::de::Error as _; use serde::{Deserialize, Deserializer}; use time::{OffsetDateTime, PrimitiveDateTime}; @@ -441,7 +451,7 @@ mod olivia_api { D: Deserializer<'a>, { let string = String::deserialize(deserializer)?; - let date_time = PrimitiveDateTime::parse(&string, &OLIVIA_EVENT_TIME_FORMAT) + let date_time = PrimitiveDateTime::parse(&string, &olivia::EVENT_TIME_FORMAT) .map_err(D::Error::custom)?; Ok(date_time.assume_utc()) @@ -452,7 +462,7 @@ mod olivia_api { mod tests { use std::vec; - use crate::model::OracleEventId; + use crate::model::BitMexPriceEventId; use crate::oracle; use time::macros::datetime; @@ -462,7 +472,7 @@ mod olivia_api { let deserialized = serde_json::from_str::(json).unwrap(); let expected = oracle::Announcement { - id: OracleEventId("/x/BitMEX/BXBT/2021-10-04T22:00:00.price?n=20".to_string()), + id: BitMexPriceEventId::with_20_digits(datetime!(2021-10-04 22:00:00).assume_utc()), expected_outcome_time: datetime!(2021-10-04 22:00:00).assume_utc(), nonce_pks: vec![ "8d72028eeaf4b85aec0f750f05a4a320cac193f5d8494bfe05cd4b29f3df4239" @@ -537,7 +547,7 @@ mod olivia_api { let deserialized = serde_json::from_str::(json).unwrap(); let expected = oracle::Attestation { - id: OracleEventId("/x/BitMEX/BXBT/2021-10-04T22:00:00.price?n=20".to_string()), + id: BitMexPriceEventId::with_20_digits(datetime!(2021-10-04 22:00:00).assume_utc()), price: 48935, scalars: vec![ "1327b3bd0f1faf45d6fed6c96d0c158da22a2033a6fed98bed036df0a4eef484" @@ -613,19 +623,15 @@ mod tests { use super::*; use time::macros::datetime; - #[test] - fn next_event_id_is_correct() { - let event_id = event_id(datetime!(2021-09-23 10:00:00).assume_utc()); - - assert_eq!(event_id.0, "/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20"); - } - #[test] fn next_event_id_after_timestamp() { let event_id = next_announcement_after(datetime!(2021-09-23 10:40:00).assume_utc()).unwrap(); - assert_eq!(event_id.0, "/x/BitMEX/BXBT/2021-09-23T11:00:00.price?n=20"); + assert_eq!( + event_id.to_string(), + "/x/BitMEX/BXBT/2021-09-23T11:00:00.price?n=20" + ); } #[test] @@ -633,6 +639,9 @@ mod tests { let event_id = next_announcement_after(datetime!(2021-09-23 23:40:00).assume_utc()).unwrap(); - assert_eq!(event_id.0, "/x/BitMEX/BXBT/2021-09-24T00:00:00.price?n=20"); + assert_eq!( + event_id.to_string(), + "/x/BitMEX/BXBT/2021-09-24T00:00:00.price?n=20" + ); } } diff --git a/daemon/src/setup_contract.rs b/daemon/src/setup_contract.rs index 48f4738..0ef9ba3 100644 --- a/daemon/src/setup_contract.rs +++ b/daemon/src/setup_contract.rs @@ -1,5 +1,4 @@ use crate::model::cfd::{Cet, Cfd, Dlc, RevokedCommit, Role}; -use crate::model::OracleEventId; use crate::wallet::Wallet; use crate::wire::{ Msg0, Msg1, Msg2, RollOverMsg, RollOverMsg0, RollOverMsg1, RollOverMsg2, SetupMsg, @@ -213,7 +212,7 @@ pub async fn new( }) }) .collect::>>()?; - Ok((OracleEventId(event_id), cets)) + Ok((event_id.parse()?, cets)) }) .collect::>>()?; @@ -272,7 +271,7 @@ pub async fn roll_over( let payouts = HashMap::from_iter([( // TODO : we want to support multiple announcements Announcement { - id: announcement.id.0, + id: announcement.id.to_string(), nonce_pks: announcement.nonce_pks.clone(), }, payout_curve::calculate(cfd.order.price, cfd.quantity_usd, cfd.order.leverage)?, @@ -435,7 +434,7 @@ pub async fn roll_over( }) }) .collect::>>()?; - Ok((OracleEventId(event_id), cets)) + Ok((event_id.parse()?, cets)) }) .collect::>>()?; diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 2dd4d86..7dfae56 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -32,6 +32,7 @@ mod keypair; mod logger; mod model; mod monitor; +mod olivia; mod oracle; mod payout_curve; mod routes; @@ -41,6 +42,7 @@ mod send_to_socket; mod setup_contract; mod taker_cfd; mod to_sse_event; +mod tokio_ext; mod wallet; mod wallet_sync; mod wire; diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 8f53986..928249d 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -8,7 +8,7 @@ use crate::model::cfd::{ Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, UpdateCfdProposals, }; -use crate::model::{OracleEventId, Usd}; +use crate::model::{BitMexPriceEventId, Usd}; use crate::monitor::{self, MonitorParams}; use crate::wallet::Wallet; use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; @@ -244,7 +244,7 @@ impl Actor { order.origin = Origin::Theirs; self.oracle_actor - .do_send_async(oracle::FetchAnnouncement(order.oracle_event_id.clone())) + .do_send_async(oracle::FetchAnnouncement(order.oracle_event_id)) .await?; let mut conn = self.db.acquire().await?; @@ -289,13 +289,13 @@ impl Actor { let offer_announcement = self .oracle_actor - .send(oracle::GetAnnouncement(cfd.order.oracle_event_id.clone())) + .send(oracle::GetAnnouncement(cfd.order.oracle_event_id)) .await? .with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?; self.oracle_actor .do_send_async(oracle::MonitorAttestation { - event_id: offer_announcement.id.clone(), + event_id: offer_announcement.id, }) .await?; @@ -380,7 +380,7 @@ impl Actor { async fn handle_roll_over_accepted( &mut self, order_id: OrderId, - oracle_event_id: OracleEventId, + oracle_event_id: BitMexPriceEventId, ctx: &mut Context, ) -> Result<()> { tracing::info!(%order_id, "Roll; over request got accepted"); @@ -398,13 +398,13 @@ impl Actor { let announcement = self .oracle_actor - .send(oracle::GetAnnouncement(oracle_event_id.clone())) + .send(oracle::GetAnnouncement(oracle_event_id)) .await? .with_context(|| format!("Announcement {} not found", oracle_event_id))?; self.oracle_actor .do_send_async(oracle::MonitorAttestation { - event_id: announcement.id.clone(), + event_id: announcement.id, }) .await?; @@ -633,12 +633,12 @@ impl Actor { ); let mut conn = self.db.acquire().await?; - let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?; + let cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?; for mut cfd in cfds { if cfd .handle(CfdStateChangeEvent::OracleAttestation(Attestation::new( - attestation.id.clone(), + attestation.id, attestation.price, attestation.scalars.clone(), cfd.dlc() diff --git a/daemon/src/tokio_ext.rs b/daemon/src/tokio_ext.rs new file mode 100644 index 0000000..a0c4f0b --- /dev/null +++ b/daemon/src/tokio_ext.rs @@ -0,0 +1,14 @@ +use std::fmt; +use std::future::Future; + +pub fn spawn_fallible(future: F) +where + F: Future> + Send + 'static, + E: fmt::Display, +{ + tokio::spawn(async move { + if let Err(e) = future.await { + tracing::warn!("Task failed: {:#}", e); + } + }); +} diff --git a/daemon/src/wallet.rs b/daemon/src/wallet.rs index e7c4008..5f1d7ff 100644 --- a/daemon/src/wallet.rs +++ b/daemon/src/wallet.rs @@ -57,7 +57,9 @@ impl Wallet { pub async fn sync(&self) -> Result { let wallet = self.wallet.lock().await; - wallet.sync(NoopProgress, None)?; + wallet + .sync(NoopProgress, None) + .context("Failed to sync wallet")?; let balance = wallet.get_balance()?; diff --git a/daemon/src/wire.rs b/daemon/src/wire.rs index 7619e2d..627394d 100644 --- a/daemon/src/wire.rs +++ b/daemon/src/wire.rs @@ -1,5 +1,5 @@ use crate::model::cfd::OrderId; -use crate::model::{OracleEventId, Usd}; +use crate::model::{BitMexPriceEventId, Usd}; use crate::Order; use anyhow::{bail, Result}; use bdk::bitcoin::secp256k1::Signature; @@ -72,7 +72,7 @@ pub enum MakerToTaker { RollOverProtocol(RollOverMsg), ConfirmRollOver { order_id: OrderId, - oracle_event_id: OracleEventId, + oracle_event_id: BitMexPriceEventId, }, RejectRollOver(OrderId), }