diff --git a/daemon/src/db.rs b/daemon/src/db.rs index 20e1d73..cfe0f73 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -24,7 +24,7 @@ pub async fn insert_order(order: &Order, conn: &mut PoolConnection) -> a let creation_timestamp = serde_json::to_string(&order.creation_timestamp).unwrap(); let term = serde_json::to_string(&order.term).unwrap(); let origin = serde_json::to_string(&order.origin).unwrap(); - let oracle_event_id = order.oracle_event_id.0.clone(); + let oracle_event_id = order.oracle_event_id.to_string(); sqlx::query!( r#" @@ -88,7 +88,6 @@ pub async fn load_order_by_id( let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap(); let term = serde_json::from_str(row.term.as_str()).unwrap(); let origin = serde_json::from_str(row.origin.as_str()).unwrap(); - let oracle_event_id = BitMexPriceEventId(row.oracle_event_id); Ok(Order { id: uuid, @@ -102,7 +101,7 @@ pub async fn load_order_by_id( creation_timestamp, term, origin, - oracle_event_id, + oracle_event_id: row.oracle_event_id.parse().unwrap(), }) } @@ -299,7 +298,7 @@ pub async fn load_cfd_by_order_id( let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap(); let term = serde_json::from_str(row.term.as_str()).unwrap(); let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap(); - let oracle_event_id = BitMexPriceEventId(row.oracle_event_id.clone()); + let oracle_event_id = row.oracle_event_id.parse().unwrap(); let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap(); let latest_state = serde_json::from_str(row.state.as_str()).unwrap(); @@ -377,7 +376,7 @@ pub async fn load_all_cfds(conn: &mut PoolConnection) -> anyhow::Result< let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap(); let term = serde_json::from_str(row.term.as_str()).unwrap(); let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap(); - let oracle_event_id = BitMexPriceEventId(row.oracle_event_id.clone()); + let oracle_event_id = row.oracle_event_id.clone().parse().unwrap(); let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap(); let latest_state = serde_json::from_str(row.state.as_str()).unwrap(); @@ -413,6 +412,8 @@ pub async fn load_cfds_by_oracle_event_id( oracle_event_id: BitMexPriceEventId, conn: &mut PoolConnection, ) -> anyhow::Result> { + let oracle_event_id = oracle_event_id.to_string(); + let rows = sqlx::query!( r#" select @@ -443,7 +444,7 @@ pub async fn load_cfds_by_oracle_event_id( ) and orders.oracle_event_id = ? "#, - oracle_event_id.0 + oracle_event_id ) .fetch_all(conn) .await?; @@ -462,7 +463,7 @@ pub async fn load_cfds_by_oracle_event_id( let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap(); let term = serde_json::from_str(row.term.as_str()).unwrap(); let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap(); - let oracle_event_id = BitMexPriceEventId(row.oracle_event_id.clone()); + let oracle_event_id = row.oracle_event_id.parse().unwrap(); let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap(); let latest_state = serde_json::from_str(row.state.as_str()).unwrap(); @@ -501,6 +502,8 @@ mod tests { use rust_decimal_macros::dec; use sqlx::SqlitePool; use tempfile::tempdir; + use time::macros::datetime; + use time::OffsetDateTime; use crate::db::insert_order; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Order}; @@ -580,22 +583,24 @@ mod tests { let pool = setup_test_db().await; let mut conn = pool.acquire().await.unwrap(); - let oracle_event_id_1 = BitMexPriceEventId("dummy_1".to_string()); - let oracle_event_id_2 = BitMexPriceEventId("dummy_2".to_string()); + let oracle_event_id_1 = + BitMexPriceEventId::with_20_digits(datetime!(2021-10-13 10:00:00).assume_utc()); + let oracle_event_id_2 = + BitMexPriceEventId::with_20_digits(datetime!(2021-10-25 18:00:00).assume_utc()); - let cfd_1 = Cfd::default() - .with_order(Order::default().with_oracle_event_id(oracle_event_id_1.clone())); + let cfd_1 = + Cfd::default().with_order(Order::default().with_oracle_event_id(oracle_event_id_1)); insert_order(&cfd_1.order, &mut conn).await.unwrap(); insert_cfd(cfd_1.clone(), &mut conn).await.unwrap(); - let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_1.clone(), &mut conn) + let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_1, &mut conn) .await .unwrap(); assert_eq!(vec![cfd_1.clone()], cfd_from_db); - let cfd_2 = Cfd::default() - .with_order(Order::default().with_oracle_event_id(oracle_event_id_1.clone())); + let cfd_2 = + Cfd::default().with_order(Order::default().with_oracle_event_id(oracle_event_id_1)); insert_order(&cfd_2.order, &mut conn).await.unwrap(); insert_cfd(cfd_2.clone(), &mut conn).await.unwrap(); @@ -605,8 +610,8 @@ mod tests { .unwrap(); assert_eq!(vec![cfd_1, cfd_2], cfd_from_db); - let cfd_3 = Cfd::default() - .with_order(Order::default().with_oracle_event_id(oracle_event_id_2.clone())); + let cfd_3 = + Cfd::default().with_order(Order::default().with_oracle_event_id(oracle_event_id_2)); insert_order(&cfd_3.order, &mut conn).await.unwrap(); insert_cfd(cfd_3.clone(), &mut conn).await.unwrap(); @@ -686,7 +691,7 @@ mod tests { Usd(dec!(100)), Usd(dec!(1000)), Origin::Theirs, - BitMexPriceEventId("Dummy".to_string()), + BitMexPriceEventId::with_20_digits(OffsetDateTime::now_utc()), ) .unwrap() } diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index ed76532..94fe34b 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -33,6 +33,7 @@ mod maker_cfd; mod maker_inc_connections; mod model; mod monitor; +mod olivia; mod oracle; mod payout_curve; mod routes; diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 426b1e2..33de35b 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -200,7 +200,7 @@ impl Actor { oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM)?; self.oracle_actor - .do_send_async(oracle::FetchAnnouncement(oracle_event_id.clone())) + .do_send_async(oracle::FetchAnnouncement(oracle_event_id)) .await?; let order = Order::new( @@ -586,13 +586,13 @@ impl Actor { let offer_announcement = self .oracle_actor - .send(oracle::GetAnnouncement(cfd.order.oracle_event_id.clone())) + .send(oracle::GetAnnouncement(cfd.order.oracle_event_id)) .await? .with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?; self.oracle_actor .do_send_async(oracle::MonitorAttestation { - event_id: offer_announcement.id.clone(), + event_id: offer_announcement.id, }) .await?; @@ -767,7 +767,7 @@ impl Actor { oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM)?; let announcement = self .oracle_actor - .send(oracle::GetAnnouncement(oracle_event_id.clone())) + .send(oracle::GetAnnouncement(oracle_event_id)) .await? .with_context(|| format!("Announcement {} not found", oracle_event_id))?; @@ -783,7 +783,7 @@ impl Actor { self.oracle_actor .do_send_async(oracle::MonitorAttestation { - event_id: announcement.id.clone(), + event_id: announcement.id, }) .await?; @@ -892,12 +892,12 @@ impl Actor { ); let mut conn = self.db.acquire().await?; - let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?; + let cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?; for mut cfd in cfds { if cfd .handle(CfdStateChangeEvent::OracleAttestation(Attestation::new( - attestation.id.clone(), + attestation.id, attestation.price, attestation.scalars.clone(), cfd.dlc() diff --git a/daemon/src/model.rs b/daemon/src/model.rs index 0d6b898..520cea7 100644 --- a/daemon/src/model.rs +++ b/daemon/src/model.rs @@ -1,11 +1,14 @@ +use crate::olivia; use anyhow::{Context, Result}; use bdk::bitcoin::{Address, Amount}; use reqwest::Url; use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; -use std::fmt; +use serde_with::{DeserializeFromStr, SerializeDisplay}; use std::time::SystemTime; +use std::{fmt, str}; +use time::{OffsetDateTime, PrimitiveDateTime, Time}; use uuid::Uuid; pub mod cfd; @@ -105,32 +108,81 @@ pub struct WalletInfo { pub last_updated_at: SystemTime, } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)] -pub struct BitMexPriceEventId(pub String); +#[derive( + Debug, Clone, Copy, SerializeDisplay, DeserializeFromStr, PartialEq, Eq, Hash, PartialOrd, Ord, +)] +pub struct BitMexPriceEventId { + /// The timestamp this price event refers to. + timestamp: OffsetDateTime, + digits: usize, +} impl BitMexPriceEventId { - pub fn to_olivia_url(&self) -> Url { + pub fn new(timestamp: OffsetDateTime, digits: usize) -> Self { + let (hours, minutes, seconds) = timestamp.time().as_hms(); + let time_without_nanos = + Time::from_hms(hours, minutes, seconds).expect("original timestamp was valid"); + + let timestamp_without_nanos = timestamp.replace_time(time_without_nanos); + + Self { + timestamp: timestamp_without_nanos, + digits, + } + } + + pub fn with_20_digits(timestamp: OffsetDateTime) -> Self { + Self::new(timestamp, 20) + } + + pub fn to_olivia_url(self) -> Url { "https://h00.ooo" .parse::() .expect("valid URL from constant") - .join(self.0.as_str()) + .join(&self.to_string()) .expect("Event id can be joined") } } impl fmt::Display for BitMexPriceEventId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) + write!( + f, + "/x/BitMEX/BXBT/{}.price?n={}", + self.timestamp + .format(&olivia::EVENT_TIME_FORMAT) + .expect("should always format and we can't return an error here"), + self.digits + ) + } +} + +impl str::FromStr for BitMexPriceEventId { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let remaining = s.trim_start_matches("/x/BitMEX/BXBT/"); + let (timestamp, rest) = remaining.split_at(19); + let digits = rest.trim_start_matches(".price?n="); + + Ok(Self { + timestamp: PrimitiveDateTime::parse(timestamp, &olivia::EVENT_TIME_FORMAT) + .with_context(|| format!("Failed to parse {} as timestamp", timestamp))? + .assume_utc(), + digits: digits.parse()?, + }) } } #[cfg(test)] mod tests { + use time::macros::datetime; + use super::*; #[test] fn to_olivia_url() { - let url = BitMexPriceEventId("/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20".to_string()) + let url = BitMexPriceEventId::with_20_digits(datetime!(2021-09-23 10:00:00).assume_utc()) .to_olivia_url(); assert_eq!( @@ -140,4 +192,22 @@ mod tests { .unwrap() ); } + + #[test] + fn parse_event_id() { + let parsed = "/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20" + .parse::() + .unwrap(); + let expected = + BitMexPriceEventId::with_20_digits(datetime!(2021-09-23 10:00:00).assume_utc()); + + assert_eq!(parsed, expected); + } + + #[test] + fn new_event_has_no_nanos() { + let now = BitMexPriceEventId::with_20_digits(OffsetDateTime::now_utc()); + + assert_eq!(now.timestamp.nanosecond(), 0); + } } diff --git a/daemon/src/olivia.rs b/daemon/src/olivia.rs new file mode 100644 index 0000000..e948ed3 --- /dev/null +++ b/daemon/src/olivia.rs @@ -0,0 +1,5 @@ +use time::format_description::FormatItem; +use time::macros::format_description; + +pub const EVENT_TIME_FORMAT: &[FormatItem] = + format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]"); diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index baebca5..991cabf 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -5,17 +5,12 @@ use anyhow::{Context, Result}; use async_trait::async_trait; use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey}; use reqwest::StatusCode; -use rocket::time::format_description::FormatItem; -use rocket::time::macros::format_description; use rocket::time::{OffsetDateTime, Time}; use serde::Deserialize; use std::collections::{HashMap, HashSet}; use std::ops::Add; use time::ext::NumericalDuration; -const OLIVIA_EVENT_TIME_FORMAT: &[FormatItem] = - format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]"); - pub struct Actor { announcements: HashMap)>, pending_announcements: HashSet, @@ -213,7 +208,7 @@ where #[async_trait] impl xtra::Handler for Actor { async fn handle(&mut self, msg: MonitorAttestation, _ctx: &mut xtra::Context) { - if !self.pending_attestations.insert(msg.event_id.clone()) { + if !self.pending_attestations.insert(msg.event_id) { tracing::trace!("Attestation {} already being monitored", msg.event_id); } } @@ -222,7 +217,7 @@ impl xtra::Handler for Actor xtra::Handler for Actor { async fn handle(&mut self, msg: FetchAnnouncement, _ctx: &mut xtra::Context) { - if !self.pending_announcements.insert(msg.0.clone()) { + if !self.pending_announcements.insert(msg.0) { tracing::trace!("Announcement {} already being fetched", msg.0); } } @@ -238,7 +233,7 @@ impl xtra::Handler for Actor self.announcements .get_key_value(&msg.0) .map(|(id, (time, nonce_pks))| Announcement { - id: id.clone(), + id: *id, expected_outcome_time: *time, nonce_pks: nonce_pks.clone(), }) @@ -258,7 +253,7 @@ impl xtra::Handler for Actor Result { let adjusted = ceil_to_next_hour(timestamp)?; - Ok(event_id(adjusted)) + Ok(BitMexPriceEventId::with_20_digits(adjusted)) } fn ceil_to_next_hour(original: OffsetDateTime) -> Result { @@ -270,16 +265,6 @@ fn ceil_to_next_hour(original: OffsetDateTime) -> Result BitMexPriceEventId { - let datetime = datetime - .format(&OLIVIA_EVENT_TIME_FORMAT) - .expect("valid formatter for datetime"); - - BitMexPriceEventId(format!("/x/BitMEX/BXBT/{}.price?n=20", datetime)) -} - #[derive(Debug, Clone, serde::Deserialize, PartialEq)] #[serde(try_from = "olivia_api::Response")] pub struct Announcement { @@ -295,7 +280,7 @@ pub struct Announcement { impl From for cfd_protocol::Announcement { fn from(announcement: Announcement) -> Self { cfd_protocol::Announcement { - id: announcement.id.0, + id: announcement.id.to_string(), nonce_pks: announcement.nonce_pks, } } @@ -360,7 +345,7 @@ mod olivia_api { serde_json::from_str::(&response.announcement.oracle_event.data)?; Ok(Self { - id: BitMexPriceEventId(data.id), + id: data.id, expected_outcome_time: data.expected_outcome_time, nonce_pks: data.schemes.olivia_v1.nonces, }) @@ -378,7 +363,7 @@ mod olivia_api { let attestation = response.attestation.context("attestation missing")?; Ok(Self { - id: BitMexPriceEventId(data.id), + id: data.id, price: attestation.outcome.parse()?, scalars: attestation.schemes.olivia_v1.scalars, }) @@ -399,7 +384,7 @@ mod olivia_api { #[derive(Debug, Clone, serde::Deserialize)] #[serde(rename_all = "kebab-case")] struct AnnouncementData { - id: String, + id: BitMexPriceEventId, #[serde(with = "timestamp")] expected_outcome_time: OffsetDateTime, schemes: Schemes, @@ -428,7 +413,7 @@ mod olivia_api { } mod timestamp { - use crate::oracle::OLIVIA_EVENT_TIME_FORMAT; + use crate::olivia; use serde::de::Error as _; use serde::{Deserialize, Deserializer}; use time::{OffsetDateTime, PrimitiveDateTime}; @@ -438,7 +423,7 @@ mod olivia_api { D: Deserializer<'a>, { let string = String::deserialize(deserializer)?; - let date_time = PrimitiveDateTime::parse(&string, &OLIVIA_EVENT_TIME_FORMAT) + let date_time = PrimitiveDateTime::parse(&string, &olivia::EVENT_TIME_FORMAT) .map_err(D::Error::custom)?; Ok(date_time.assume_utc()) @@ -459,7 +444,7 @@ mod olivia_api { let deserialized = serde_json::from_str::(json).unwrap(); let expected = oracle::Announcement { - id: BitMexPriceEventId("/x/BitMEX/BXBT/2021-10-04T22:00:00.price?n=20".to_string()), + id: BitMexPriceEventId::with_20_digits(datetime!(2021-10-04 22:00:00).assume_utc()), expected_outcome_time: datetime!(2021-10-04 22:00:00).assume_utc(), nonce_pks: vec![ "8d72028eeaf4b85aec0f750f05a4a320cac193f5d8494bfe05cd4b29f3df4239" @@ -534,7 +519,7 @@ mod olivia_api { let deserialized = serde_json::from_str::(json).unwrap(); let expected = oracle::Attestation { - id: BitMexPriceEventId("/x/BitMEX/BXBT/2021-10-04T22:00:00.price?n=20".to_string()), + id: BitMexPriceEventId::with_20_digits(datetime!(2021-10-04 22:00:00).assume_utc()), price: 48935, scalars: vec![ "1327b3bd0f1faf45d6fed6c96d0c158da22a2033a6fed98bed036df0a4eef484" @@ -610,19 +595,15 @@ mod tests { use super::*; use time::macros::datetime; - #[test] - fn next_event_id_is_correct() { - let event_id = event_id(datetime!(2021-09-23 10:00:00).assume_utc()); - - assert_eq!(event_id.0, "/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20"); - } - #[test] fn next_event_id_after_timestamp() { let event_id = next_announcement_after(datetime!(2021-09-23 10:40:00).assume_utc()).unwrap(); - assert_eq!(event_id.0, "/x/BitMEX/BXBT/2021-09-23T11:00:00.price?n=20"); + assert_eq!( + event_id.to_string(), + "/x/BitMEX/BXBT/2021-09-23T11:00:00.price?n=20" + ); } #[test] @@ -630,6 +611,9 @@ mod tests { let event_id = next_announcement_after(datetime!(2021-09-23 23:40:00).assume_utc()).unwrap(); - assert_eq!(event_id.0, "/x/BitMEX/BXBT/2021-09-24T00:00:00.price?n=20"); + assert_eq!( + event_id.to_string(), + "/x/BitMEX/BXBT/2021-09-24T00:00:00.price?n=20" + ); } } diff --git a/daemon/src/setup_contract.rs b/daemon/src/setup_contract.rs index d849569..0ef9ba3 100644 --- a/daemon/src/setup_contract.rs +++ b/daemon/src/setup_contract.rs @@ -1,5 +1,4 @@ use crate::model::cfd::{Cet, Cfd, Dlc, RevokedCommit, Role}; -use crate::model::BitMexPriceEventId; use crate::wallet::Wallet; use crate::wire::{ Msg0, Msg1, Msg2, RollOverMsg, RollOverMsg0, RollOverMsg1, RollOverMsg2, SetupMsg, @@ -213,7 +212,7 @@ pub async fn new( }) }) .collect::>>()?; - Ok((BitMexPriceEventId(event_id), cets)) + Ok((event_id.parse()?, cets)) }) .collect::>>()?; @@ -272,7 +271,7 @@ pub async fn roll_over( let payouts = HashMap::from_iter([( // TODO : we want to support multiple announcements Announcement { - id: announcement.id.0, + id: announcement.id.to_string(), nonce_pks: announcement.nonce_pks.clone(), }, payout_curve::calculate(cfd.order.price, cfd.quantity_usd, cfd.order.leverage)?, @@ -435,7 +434,7 @@ pub async fn roll_over( }) }) .collect::>>()?; - Ok((BitMexPriceEventId(event_id), cets)) + Ok((event_id.parse()?, cets)) }) .collect::>>()?; diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 2dd4d86..9ba4646 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -32,6 +32,7 @@ mod keypair; mod logger; mod model; mod monitor; +mod olivia; mod oracle; mod payout_curve; mod routes; diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 8a9a0fe..928249d 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -244,7 +244,7 @@ impl Actor { order.origin = Origin::Theirs; self.oracle_actor - .do_send_async(oracle::FetchAnnouncement(order.oracle_event_id.clone())) + .do_send_async(oracle::FetchAnnouncement(order.oracle_event_id)) .await?; let mut conn = self.db.acquire().await?; @@ -289,13 +289,13 @@ impl Actor { let offer_announcement = self .oracle_actor - .send(oracle::GetAnnouncement(cfd.order.oracle_event_id.clone())) + .send(oracle::GetAnnouncement(cfd.order.oracle_event_id)) .await? .with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?; self.oracle_actor .do_send_async(oracle::MonitorAttestation { - event_id: offer_announcement.id.clone(), + event_id: offer_announcement.id, }) .await?; @@ -398,13 +398,13 @@ impl Actor { let announcement = self .oracle_actor - .send(oracle::GetAnnouncement(oracle_event_id.clone())) + .send(oracle::GetAnnouncement(oracle_event_id)) .await? .with_context(|| format!("Announcement {} not found", oracle_event_id))?; self.oracle_actor .do_send_async(oracle::MonitorAttestation { - event_id: announcement.id.clone(), + event_id: announcement.id, }) .await?; @@ -633,12 +633,12 @@ impl Actor { ); let mut conn = self.db.acquire().await?; - let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?; + let cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?; for mut cfd in cfds { if cfd .handle(CfdStateChangeEvent::OracleAttestation(Attestation::new( - attestation.id.clone(), + attestation.id, attestation.price, attestation.scalars.clone(), cfd.dlc()