Browse Source

Merge #287

287: Fetch attestations only when it is time r=thomaseizinger a=thomaseizinger

- Rename `OracleEventId`
- Refer to types within `fmt` module via module qualifier
- fixup! Refer to types within `fmt` module via module qualifier
- Prefer `.parse` over `FromStr` to avoid an import
- Properly model event IDs
- Fix typo
- Only fetch attestation that are likely to have occured
- Introduce drop-in replacement for tokio::spawn that logs errors


Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
refactor/no-log-handler
bors[bot] 3 years ago
committed by GitHub
parent
commit
1507da93dd
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 47
      daemon/src/db.rs
  2. 11
      daemon/src/maker.rs
  3. 14
      daemon/src/maker_cfd.rs
  4. 4
      daemon/src/maker_inc_connections.rs
  5. 134
      daemon/src/model.rs
  6. 12
      daemon/src/model/cfd.rs
  7. 10
      daemon/src/monitor.rs
  8. 5
      daemon/src/olivia.rs
  9. 225
      daemon/src/oracle.rs
  10. 7
      daemon/src/setup_contract.rs
  11. 2
      daemon/src/taker.rs
  12. 18
      daemon/src/taker_cfd.rs
  13. 14
      daemon/src/tokio_ext.rs
  14. 4
      daemon/src/wallet.rs
  15. 4
      daemon/src/wire.rs

47
daemon/src/db.rs

@ -1,5 +1,5 @@
use crate::model::cfd::{Cfd, CfdState, Order, OrderId, Origin};
use crate::model::{Leverage, OracleEventId, Position};
use crate::model::{BitMexPriceEventId, Leverage, Position};
use anyhow::{Context, Result};
use rocket_db_pools::sqlx;
use sqlx::pool::PoolConnection;
@ -24,7 +24,7 @@ pub async fn insert_order(order: &Order, conn: &mut PoolConnection<Sqlite>) -> a
let creation_timestamp = serde_json::to_string(&order.creation_timestamp).unwrap();
let term = serde_json::to_string(&order.term).unwrap();
let origin = serde_json::to_string(&order.origin).unwrap();
let oracle_event_id = order.oracle_event_id.0.clone();
let oracle_event_id = order.oracle_event_id.to_string();
sqlx::query!(
r#"
@ -88,7 +88,6 @@ pub async fn load_order_by_id(
let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap();
let term = serde_json::from_str(row.term.as_str()).unwrap();
let origin = serde_json::from_str(row.origin.as_str()).unwrap();
let oracle_event_id = OracleEventId(row.oracle_event_id);
Ok(Order {
id: uuid,
@ -102,7 +101,7 @@ pub async fn load_order_by_id(
creation_timestamp,
term,
origin,
oracle_event_id,
oracle_event_id: row.oracle_event_id.parse().unwrap(),
})
}
@ -299,7 +298,7 @@ pub async fn load_cfd_by_order_id(
let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap();
let term = serde_json::from_str(row.term.as_str()).unwrap();
let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap();
let oracle_event_id = OracleEventId(row.oracle_event_id.clone());
let oracle_event_id = row.oracle_event_id.parse().unwrap();
let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap();
let latest_state = serde_json::from_str(row.state.as_str()).unwrap();
@ -377,7 +376,7 @@ pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<
let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap();
let term = serde_json::from_str(row.term.as_str()).unwrap();
let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap();
let oracle_event_id = OracleEventId(row.oracle_event_id.clone());
let oracle_event_id = row.oracle_event_id.clone().parse().unwrap();
let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap();
let latest_state = serde_json::from_str(row.state.as_str()).unwrap();
@ -410,9 +409,11 @@ pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<
/// Loads all CFDs with the latest state as the CFD state
pub async fn load_cfds_by_oracle_event_id(
oracle_event_id: OracleEventId,
oracle_event_id: BitMexPriceEventId,
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<Vec<Cfd>> {
let oracle_event_id = oracle_event_id.to_string();
let rows = sqlx::query!(
r#"
select
@ -443,7 +444,7 @@ pub async fn load_cfds_by_oracle_event_id(
)
and orders.oracle_event_id = ?
"#,
oracle_event_id.0
oracle_event_id
)
.fetch_all(conn)
.await?;
@ -462,7 +463,7 @@ pub async fn load_cfds_by_oracle_event_id(
let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap();
let term = serde_json::from_str(row.term.as_str()).unwrap();
let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap();
let oracle_event_id = OracleEventId(row.oracle_event_id.clone());
let oracle_event_id = row.oracle_event_id.parse().unwrap();
let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap();
let latest_state = serde_json::from_str(row.state.as_str()).unwrap();
@ -501,6 +502,8 @@ mod tests {
use rust_decimal_macros::dec;
use sqlx::SqlitePool;
use tempfile::tempdir;
use time::macros::datetime;
use time::OffsetDateTime;
use crate::db::insert_order;
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Order};
@ -580,22 +583,24 @@ mod tests {
let pool = setup_test_db().await;
let mut conn = pool.acquire().await.unwrap();
let oracle_event_id_1 = OracleEventId("dummy_1".to_string());
let oracle_event_id_2 = OracleEventId("dummy_2".to_string());
let oracle_event_id_1 =
BitMexPriceEventId::with_20_digits(datetime!(2021-10-13 10:00:00).assume_utc());
let oracle_event_id_2 =
BitMexPriceEventId::with_20_digits(datetime!(2021-10-25 18:00:00).assume_utc());
let cfd_1 = Cfd::default()
.with_order(Order::default().with_oracle_event_id(oracle_event_id_1.clone()));
let cfd_1 =
Cfd::default().with_order(Order::default().with_oracle_event_id(oracle_event_id_1));
insert_order(&cfd_1.order, &mut conn).await.unwrap();
insert_cfd(cfd_1.clone(), &mut conn).await.unwrap();
let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_1.clone(), &mut conn)
let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_1, &mut conn)
.await
.unwrap();
assert_eq!(vec![cfd_1.clone()], cfd_from_db);
let cfd_2 = Cfd::default()
.with_order(Order::default().with_oracle_event_id(oracle_event_id_1.clone()));
let cfd_2 =
Cfd::default().with_order(Order::default().with_oracle_event_id(oracle_event_id_1));
insert_order(&cfd_2.order, &mut conn).await.unwrap();
insert_cfd(cfd_2.clone(), &mut conn).await.unwrap();
@ -605,8 +610,8 @@ mod tests {
.unwrap();
assert_eq!(vec![cfd_1, cfd_2], cfd_from_db);
let cfd_3 = Cfd::default()
.with_order(Order::default().with_oracle_event_id(oracle_event_id_2.clone()));
let cfd_3 =
Cfd::default().with_order(Order::default().with_oracle_event_id(oracle_event_id_2));
insert_order(&cfd_3.order, &mut conn).await.unwrap();
insert_cfd(cfd_3.clone(), &mut conn).await.unwrap();
@ -647,8 +652,6 @@ mod tests {
// file has to exist in order to connect with sqlite
File::create(temp_db.clone()).unwrap();
dbg!(&temp_db);
let pool = SqlitePool::connect(format!("sqlite:{}", temp_db.display()).as_str())
.await
.unwrap();
@ -686,14 +689,14 @@ mod tests {
Usd(dec!(100)),
Usd(dec!(1000)),
Origin::Theirs,
OracleEventId("Dummy".to_string()),
BitMexPriceEventId::with_20_digits(OffsetDateTime::now_utc()),
)
.unwrap()
}
}
impl Order {
pub fn with_oracle_event_id(mut self, oracle_event_id: OracleEventId) -> Self {
pub fn with_oracle_event_id(mut self, oracle_event_id: BitMexPriceEventId) -> Self {
self.oracle_event_id = oracle_event_id;
self
}

11
daemon/src/maker.rs

@ -33,6 +33,7 @@ mod maker_cfd;
mod maker_inc_connections;
mod model;
mod monitor;
mod olivia;
mod oracle;
mod payout_curve;
mod routes;
@ -41,6 +42,7 @@ mod seed;
mod send_to_socket;
mod setup_contract;
mod to_sse_event;
mod tokio_ext;
mod wallet;
mod wallet_sync;
mod wire;
@ -151,7 +153,7 @@ async fn main() -> Result<()> {
ext_priv_key,
)
.await?;
let wallet_info = wallet.sync().await.unwrap();
let wallet_info = wallet.sync().await?;
let auth_password = seed.derive_auth_password::<auth::Password>();
@ -176,7 +178,12 @@ async fn main() -> Result<()> {
.merge(("address", opts.http_address.ip()))
.merge(("port", opts.http_address.port()));
let listener = tokio::net::TcpListener::bind(&format!("0.0.0.0:{}", opts.p2p_port)).await?;
let p2p_socket = format!("0.0.0.0:{}", opts.p2p_port)
.parse::<SocketAddr>()
.unwrap();
let listener = tokio::net::TcpListener::bind(p2p_socket)
.await
.with_context(|| format!("Failed to listen on {}", p2p_socket))?;
let local_addr = listener.local_addr().unwrap();
tracing::info!("Listening on {}", local_addr);

14
daemon/src/maker_cfd.rs

@ -200,7 +200,7 @@ impl Actor {
oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM)?;
self.oracle_actor
.do_send_async(oracle::FetchAnnouncement(oracle_event_id.clone()))
.do_send_async(oracle::FetchAnnouncement(oracle_event_id))
.await?;
let order = Order::new(
@ -586,13 +586,13 @@ impl Actor {
let offer_announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id.clone()))
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?;
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {
event_id: offer_announcement.id.clone(),
event_id: offer_announcement.id,
})
.await?;
@ -767,7 +767,7 @@ impl Actor {
oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM)?;
let announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(oracle_event_id.clone()))
.send(oracle::GetAnnouncement(oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", oracle_event_id))?;
@ -783,7 +783,7 @@ impl Actor {
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {
event_id: announcement.id.clone(),
event_id: announcement.id,
})
.await?;
@ -892,12 +892,12 @@ impl Actor {
);
let mut conn = self.db.acquire().await?;
let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?;
let cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?;
for mut cfd in cfds {
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(Attestation::new(
attestation.id.clone(),
attestation.id,
attestation.price,
attestation.scalars.clone(),
cfd.dlc()

4
daemon/src/maker_inc_connections.rs

@ -1,6 +1,6 @@
use crate::actors::log_error;
use crate::model::cfd::{Order, OrderId};
use crate::model::{OracleEventId, TakerId};
use crate::model::{BitMexPriceEventId, TakerId};
use crate::{maker_cfd, send_to_socket, wire};
use anyhow::{Context as AnyhowContext, Result};
use async_trait::async_trait;
@ -38,7 +38,7 @@ pub enum TakerCommand {
},
NotifyRollOverAccepted {
id: OrderId,
oracle_event_id: OracleEventId,
oracle_event_id: BitMexPriceEventId,
},
NotifyRollOverRejected {
id: OrderId,

134
daemon/src/model.rs

@ -1,15 +1,14 @@
use std::fmt::{Display, Formatter};
use crate::olivia;
use anyhow::{Context, Result};
use bdk::bitcoin::{Address, Amount};
use reqwest::Url;
use rust_decimal::prelude::ToPrimitive;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use bdk::bitcoin::{Address, Amount};
use reqwest::Url;
use std::fmt;
use std::str::FromStr;
use serde_with::{DeserializeFromStr, SerializeDisplay};
use std::time::SystemTime;
use std::{fmt, str};
use time::{OffsetDateTime, PrimitiveDateTime, Time};
use uuid::Uuid;
pub mod cfd;
@ -46,8 +45,8 @@ impl Usd {
}
}
impl Display for Usd {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for Usd {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
@ -61,8 +60,8 @@ impl From<Decimal> for Usd {
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)]
pub struct Percent(pub Decimal);
impl Display for Percent {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for Percent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.round_dp(2).fmt(f)
}
}
@ -96,8 +95,8 @@ impl Default for TakerId {
}
}
impl Display for TakerId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for TakerId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
@ -109,36 +108,123 @@ pub struct WalletInfo {
pub last_updated_at: SystemTime,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OracleEventId(pub String);
#[derive(
Debug, Clone, Copy, SerializeDisplay, DeserializeFromStr, PartialEq, Eq, Hash, PartialOrd, Ord,
)]
pub struct BitMexPriceEventId {
/// The timestamp this price event refers to.
timestamp: OffsetDateTime,
digits: usize,
}
impl BitMexPriceEventId {
pub fn new(timestamp: OffsetDateTime, digits: usize) -> Self {
let (hours, minutes, seconds) = timestamp.time().as_hms();
let time_without_nanos =
Time::from_hms(hours, minutes, seconds).expect("original timestamp was valid");
let timestamp_without_nanos = timestamp.replace_time(time_without_nanos);
impl OracleEventId {
pub fn to_olivia_url(&self) -> Url {
Url::from_str("https://h00.ooo")
Self {
timestamp: timestamp_without_nanos,
digits,
}
}
pub fn with_20_digits(timestamp: OffsetDateTime) -> Self {
Self::new(timestamp, 20)
}
/// Checks whether this event has likely already occurred.
///
/// We can't be sure about it because our local clock might be off from the oracle's clock.
pub fn has_likely_occured(&self) -> bool {
let now = OffsetDateTime::now_utc();
now > self.timestamp
}
pub fn to_olivia_url(self) -> Url {
"https://h00.ooo"
.parse::<Url>()
.expect("valid URL from constant")
.join(self.0.as_str())
.join(&self.to_string())
.expect("Event id can be joined")
}
}
impl Display for OracleEventId {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
impl fmt::Display for BitMexPriceEventId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"/x/BitMEX/BXBT/{}.price?n={}",
self.timestamp
.format(&olivia::EVENT_TIME_FORMAT)
.expect("should always format and we can't return an error here"),
self.digits
)
}
}
impl str::FromStr for BitMexPriceEventId {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let remaining = s.trim_start_matches("/x/BitMEX/BXBT/");
let (timestamp, rest) = remaining.split_at(19);
let digits = rest.trim_start_matches(".price?n=");
Ok(Self {
timestamp: PrimitiveDateTime::parse(timestamp, &olivia::EVENT_TIME_FORMAT)
.with_context(|| format!("Failed to parse {} as timestamp", timestamp))?
.assume_utc(),
digits: digits.parse()?,
})
}
}
#[cfg(test)]
mod tests {
use time::macros::datetime;
use super::*;
#[test]
fn to_olivia_url() {
let url = OracleEventId("/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20".to_string())
let url = BitMexPriceEventId::with_20_digits(datetime!(2021-09-23 10:00:00).assume_utc())
.to_olivia_url();
assert_eq!(
url,
Url::from_str("https://h00.ooo/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20").unwrap()
"https://h00.ooo/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20"
.parse()
.unwrap()
);
}
#[test]
fn parse_event_id() {
let parsed = "/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20"
.parse::<BitMexPriceEventId>()
.unwrap();
let expected =
BitMexPriceEventId::with_20_digits(datetime!(2021-09-23 10:00:00).assume_utc());
assert_eq!(parsed, expected);
}
#[test]
fn new_event_has_no_nanos() {
let now = BitMexPriceEventId::with_20_digits(OffsetDateTime::now_utc());
assert_eq!(now.timestamp.nanosecond(), 0);
}
#[test]
fn has_occured_if_in_the_past() {
let past_event =
BitMexPriceEventId::with_20_digits(datetime!(2021-09-23 10:00:00).assume_utc());
assert!(past_event.has_likely_occured());
}
}

12
daemon/src/model/cfd.rs

@ -1,4 +1,4 @@
use crate::model::{Leverage, OracleEventId, Percent, Position, TakerId, TradingPair, Usd};
use crate::model::{BitMexPriceEventId, Leverage, Percent, Position, TakerId, TradingPair, Usd};
use crate::{monitor, oracle};
use anyhow::{bail, Context, Result};
use bdk::bitcoin::secp256k1::{SecretKey, Signature};
@ -97,7 +97,7 @@ pub struct Order {
/// The id of the event to be used for price attestation
///
/// The maker includes this into the Order based on the Oracle announcement to be used.
pub oracle_event_id: OracleEventId,
pub oracle_event_id: BitMexPriceEventId,
}
#[allow(dead_code)] // Only one binary and the tests use this.
@ -109,7 +109,7 @@ impl Order {
min_quantity: Usd,
max_quantity: Usd,
origin: Origin,
oracle_event_id: OracleEventId,
oracle_event_id: BitMexPriceEventId,
) -> Result<Self> {
let leverage = Leverage(2);
let maintenance_margin_rate = dec!(0.005);
@ -286,7 +286,7 @@ pub enum CfdState {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Attestation {
pub id: OracleEventId,
pub id: BitMexPriceEventId,
pub scalars: Vec<SecretKey>,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_sat")]
payout: Amount,
@ -296,7 +296,7 @@ pub struct Attestation {
impl Attestation {
pub fn new(
id: OracleEventId,
id: BitMexPriceEventId,
price: u64,
scalars: Vec<SecretKey>,
dlc: Dlc,
@ -1483,7 +1483,7 @@ pub struct Dlc {
/// The fully signed lock transaction ready to be published on chain
pub lock: (Transaction, Descriptor<PublicKey>),
pub commit: (Transaction, EcdsaAdaptorSignature, Descriptor<PublicKey>),
pub cets: HashMap<OracleEventId, Vec<Cet>>,
pub cets: HashMap<BitMexPriceEventId, Vec<Cet>>,
pub refund: (Transaction, Signature),
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_sat")]

10
daemon/src/monitor.rs

@ -1,6 +1,6 @@
use crate::actors::log_error;
use crate::model::cfd::{CetStatus, Cfd, CfdState, Dlc, OrderId};
use crate::model::OracleEventId;
use crate::model::BitMexPriceEventId;
use crate::oracle::Attestation;
use crate::{model, oracle};
use anyhow::{Context, Result};
@ -27,7 +27,7 @@ pub struct StartMonitoring {
pub struct MonitorParams {
lock: (Txid, Descriptor<PublicKey>),
commit: (Txid, Descriptor<PublicKey>),
cets: HashMap<OracleEventId, Vec<Cet>>,
cets: HashMap<BitMexPriceEventId, Vec<Cet>>,
refund: (Txid, Script, u32),
revoked_commits: Vec<(Txid, Script)>,
}
@ -209,7 +209,7 @@ where
fn monitor_cet_finality(
&mut self,
cets: HashMap<OracleEventId, Vec<Cet>>,
cets: HashMap<BitMexPriceEventId, Vec<Cet>>,
attestation: Attestation,
order_id: OrderId,
) -> Result<()> {
@ -581,8 +581,8 @@ impl From<model::cfd::Cet> for Cet {
}
fn map_cets(
cets: HashMap<OracleEventId, Vec<model::cfd::Cet>>,
) -> HashMap<OracleEventId, Vec<Cet>> {
cets: HashMap<BitMexPriceEventId, Vec<model::cfd::Cet>>,
) -> HashMap<BitMexPriceEventId, Vec<Cet>> {
cets.into_iter()
.map(|(event_id, cets)| {
(

5
daemon/src/olivia.rs

@ -0,0 +1,5 @@
use time::format_description::FormatItem;
use time::macros::format_description;
pub const EVENT_TIME_FORMAT: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]");

225
daemon/src/oracle.rs

@ -1,25 +1,20 @@
use crate::actors::log_error;
use crate::model::cfd::{Cfd, CfdState};
use crate::model::OracleEventId;
use crate::model::BitMexPriceEventId;
use crate::tokio_ext;
use anyhow::{Context, Result};
use async_trait::async_trait;
use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey};
use reqwest::StatusCode;
use rocket::time::format_description::FormatItem;
use rocket::time::macros::format_description;
use rocket::time::{OffsetDateTime, Time};
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::ops::Add;
use time::ext::NumericalDuration;
const OLIVIA_EVENT_TIME_FORMAT: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]");
pub struct Actor<CFD, M> {
announcements: HashMap<OracleEventId, (OffsetDateTime, Vec<schnorrsig::PublicKey>)>,
pending_announcements: HashSet<OracleEventId>,
pending_attestations: HashSet<OracleEventId>,
announcements: HashMap<BitMexPriceEventId, (OffsetDateTime, Vec<schnorrsig::PublicKey>)>,
pending_announcements: HashSet<BitMexPriceEventId>,
pending_attestations: HashSet<BitMexPriceEventId>,
cfd_actor_address: xtra::Address<CFD>,
monitor_actor_address: xtra::Address<M>,
}
@ -32,25 +27,25 @@ pub struct Sync;
/// The `Announcement` corresponds to the `OracleEventId` included in
/// the message.
#[derive(Debug, Clone)]
pub struct FetchAnnouncement(pub OracleEventId);
pub struct FetchAnnouncement(pub BitMexPriceEventId);
pub struct MonitorAttestation {
pub event_id: OracleEventId,
pub event_id: BitMexPriceEventId,
}
/// Message used to request the `Announcement` from the
/// `oracle::Actor`'s local state.
///
/// The `Announcement` corresponds to the `OracleEventId` included in
/// The `Announcement` corresponds to the [`BitMexPriceEventId`] included in
/// the message.
#[derive(Debug, Clone)]
pub struct GetAnnouncement(pub OracleEventId);
pub struct GetAnnouncement(pub BitMexPriceEventId);
// TODO: Split xtra::Message and API object
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(try_from = "olivia_api::Response")]
pub struct Attestation {
pub id: OracleEventId,
pub id: BitMexPriceEventId,
pub price: u64,
pub scalars: Vec<SecretKey>,
}
@ -58,11 +53,18 @@ pub struct Attestation {
/// A module-private message to allow parallelization of fetching announcements.
#[derive(Debug)]
struct NewAnnouncementFetched {
id: OracleEventId,
id: BitMexPriceEventId,
expected_outcome_time: OffsetDateTime,
nonce_pks: Vec<schnorrsig::PublicKey>,
}
/// A module-private message to allow parallelization of fetching attestations.
#[derive(Debug)]
struct NewAttestationFetched {
id: BitMexPriceEventId,
attestation: Attestation,
}
impl<CFD, M> Actor<CFD, M> {
pub fn new(
cfd_actor_address: xtra::Address<CFD>,
@ -111,11 +113,11 @@ where
CFD: 'static,
M: 'static,
{
async fn update_pending_announcements(&mut self, ctx: &mut xtra::Context<Self>) -> Result<()> {
let this = ctx.address().expect("self to be alive");
fn update_pending_announcements(&mut self, ctx: &mut xtra::Context<Self>) {
for event_id in self.pending_announcements.iter().cloned() {
let this = this.clone();
tokio::spawn(async move {
let this = ctx.address().expect("self to be alive");
tokio_ext::spawn_fallible(async move {
let url = event_id.to_olivia_url();
tracing::debug!("Fetching announcement for {}", event_id);
@ -143,8 +145,6 @@ where
Ok(())
});
}
Ok(())
}
}
@ -153,61 +153,65 @@ where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn update_state(&mut self, ctx: &mut xtra::Context<Self>) -> Result<()> {
self.update_pending_announcements(ctx)
.await
.context("failed to update pending announcements")?;
self.update_pending_attestations()
.await
.context("failed to update pending attestations")?;
fn update_pending_attestations(&mut self, ctx: &mut xtra::Context<Self>) {
for event_id in self.pending_attestations.iter().copied() {
if !event_id.has_likely_occured() {
tracing::trace!(
"Skipping {} because it likely hasn't occurred yet",
event_id
);
continue;
}
Ok(())
}
let this = ctx.address().expect("self to be alive");
tokio_ext::spawn_fallible(async move {
let url = event_id.to_olivia_url();
async fn update_pending_attestations(&mut self) -> Result<()> {
let pending_attestations = self.pending_attestations.clone();
for event_id in pending_attestations.into_iter() {
{
let res = match reqwest::get(event_id.to_olivia_url()).await {
Ok(res) if res.status().is_success() => res,
Ok(res) if res.status() == StatusCode::NOT_FOUND => {
tracing::trace!("Attestation not ready yet");
continue;
}
Ok(res) => {
tracing::warn!("Unexpected response, status {}", res.status());
continue;
}
Err(e) => {
tracing::warn!(%event_id, "Failed to fetch attestation: {}", e);
continue;
}
};
let attestation = match res
tracing::debug!("Fetching attestation for {}", event_id);
let response = reqwest::get(url.clone())
.await
.with_context(|| format!("Failed to GET {}", url))?;
if !response.status().is_success() {
anyhow::bail!("GET {} responded with {}", url, response.status());
}
let attestation = response
.json::<Attestation>()
.await
.with_context(|| format!("Failed to decode body for event {}", event_id))
{
Ok(attestation) => attestation,
Err(e) => {
tracing::debug!("{:#}", e);
continue;
}
};
self.cfd_actor_address
.clone()
.do_send_async(attestation.clone())
.await?;
self.monitor_actor_address
.clone()
.do_send_async(attestation)
.await?;
self.pending_attestations.remove(&event_id);
}
.context("Failed to deserialize as Attestation")?;
this.send(NewAttestationFetched {
id: event_id,
attestation,
})
.await?;
Ok(())
});
}
}
async fn handle_new_attestation_fetched(
&mut self,
id: BitMexPriceEventId,
attestation: Attestation,
) -> Result<()> {
tracing::info!("Fetched new attestation for {}", id);
self.cfd_actor_address
.clone()
.do_send_async(attestation.clone())
.await?;
self.monitor_actor_address
.clone()
.do_send_async(attestation)
.await?;
self.pending_attestations.remove(&id);
Ok(())
}
@ -216,7 +220,7 @@ where
#[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<MonitorAttestation> for Actor<CFD, M> {
async fn handle(&mut self, msg: MonitorAttestation, _ctx: &mut xtra::Context<Self>) {
if !self.pending_attestations.insert(msg.event_id.clone()) {
if !self.pending_attestations.insert(msg.event_id) {
tracing::trace!("Attestation {} already being monitored", msg.event_id);
}
}
@ -225,7 +229,7 @@ impl<CFD: 'static, M: 'static> xtra::Handler<MonitorAttestation> for Actor<CFD,
#[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<FetchAnnouncement> for Actor<CFD, M> {
async fn handle(&mut self, msg: FetchAnnouncement, _ctx: &mut xtra::Context<Self>) {
if !self.pending_announcements.insert(msg.0.clone()) {
if !self.pending_announcements.insert(msg.0) {
tracing::trace!("Announcement {} already being fetched", msg.0);
}
}
@ -241,7 +245,7 @@ impl<CFD: 'static, M: 'static> xtra::Handler<GetAnnouncement> for Actor<CFD, M>
self.announcements
.get_key_value(&msg.0)
.map(|(id, (time, nonce_pks))| Announcement {
id: id.clone(),
id: *id,
expected_outcome_time: *time,
nonce_pks: nonce_pks.clone(),
})
@ -257,11 +261,22 @@ impl<CFD: 'static, M: 'static> xtra::Handler<NewAnnouncementFetched> for Actor<C
}
}
#[async_trait]
impl<CFD, M> xtra::Handler<NewAttestationFetched> for Actor<CFD, M>
where
CFD: xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn handle(&mut self, msg: NewAttestationFetched, _ctx: &mut xtra::Context<Self>) {
log_error!(self.handle_new_attestation_fetched(msg.id, msg.attestation));
}
}
#[allow(dead_code)]
pub fn next_announcement_after(timestamp: OffsetDateTime) -> Result<OracleEventId> {
pub fn next_announcement_after(timestamp: OffsetDateTime) -> Result<BitMexPriceEventId> {
let adjusted = ceil_to_next_hour(timestamp)?;
Ok(event_id(adjusted))
Ok(BitMexPriceEventId::with_20_digits(adjusted))
}
fn ceil_to_next_hour(original: OffsetDateTime) -> Result<OffsetDateTime, anyhow::Error> {
@ -273,16 +288,6 @@ fn ceil_to_next_hour(original: OffsetDateTime) -> Result<OffsetDateTime, anyhow:
Ok(adjusted)
}
/// Construct the URL of `olivia`'s `BitMEX/BXBT` event to be attested
/// for at the time indicated by the argument `datetime`.
fn event_id(datetime: OffsetDateTime) -> OracleEventId {
let datetime = datetime
.format(&OLIVIA_EVENT_TIME_FORMAT)
.expect("valid formatter for datetime");
OracleEventId(format!("/x/BitMEX/BXBT/{}.price?n=20", datetime))
}
#[derive(Debug, Clone, serde::Deserialize, PartialEq)]
#[serde(try_from = "olivia_api::Response")]
pub struct Announcement {
@ -290,7 +295,7 @@ pub struct Announcement {
///
/// Doubles up as the path of the URL for this event i.e.
/// https://h00.ooo/{id}.
pub id: OracleEventId,
pub id: BitMexPriceEventId,
pub expected_outcome_time: OffsetDateTime,
pub nonce_pks: Vec<schnorrsig::PublicKey>,
}
@ -298,7 +303,7 @@ pub struct Announcement {
impl From<Announcement> for cfd_protocol::Announcement {
fn from(announcement: Announcement) -> Self {
cfd_protocol::Announcement {
id: announcement.id.0,
id: announcement.id.to_string(),
nonce_pks: announcement.nonce_pks,
}
}
@ -313,7 +318,8 @@ where
M: xtra::Handler<Attestation>,
{
async fn handle(&mut self, _: Sync, ctx: &mut xtra::Context<Self>) {
log_error!(self.update_state(ctx))
self.update_pending_announcements(ctx);
self.update_pending_attestations(ctx);
}
}
@ -340,8 +346,12 @@ impl xtra::Message for NewAnnouncementFetched {
type Result = ();
}
impl xtra::Message for NewAttestationFetched {
type Result = ();
}
mod olivia_api {
use crate::model::OracleEventId;
use crate::model::BitMexPriceEventId;
use anyhow::Context;
use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey};
use std::convert::TryFrom;
@ -363,7 +373,7 @@ mod olivia_api {
serde_json::from_str::<AnnouncementData>(&response.announcement.oracle_event.data)?;
Ok(Self {
id: OracleEventId(data.id),
id: data.id,
expected_outcome_time: data.expected_outcome_time,
nonce_pks: data.schemes.olivia_v1.nonces,
})
@ -381,7 +391,7 @@ mod olivia_api {
let attestation = response.attestation.context("attestation missing")?;
Ok(Self {
id: OracleEventId(data.id),
id: data.id,
price: attestation.outcome.parse()?,
scalars: attestation.schemes.olivia_v1.scalars,
})
@ -402,7 +412,7 @@ mod olivia_api {
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
struct AnnouncementData {
id: String,
id: BitMexPriceEventId,
#[serde(with = "timestamp")]
expected_outcome_time: OffsetDateTime,
schemes: Schemes,
@ -431,7 +441,7 @@ mod olivia_api {
}
mod timestamp {
use crate::oracle::OLIVIA_EVENT_TIME_FORMAT;
use crate::olivia;
use serde::de::Error as _;
use serde::{Deserialize, Deserializer};
use time::{OffsetDateTime, PrimitiveDateTime};
@ -441,7 +451,7 @@ mod olivia_api {
D: Deserializer<'a>,
{
let string = String::deserialize(deserializer)?;
let date_time = PrimitiveDateTime::parse(&string, &OLIVIA_EVENT_TIME_FORMAT)
let date_time = PrimitiveDateTime::parse(&string, &olivia::EVENT_TIME_FORMAT)
.map_err(D::Error::custom)?;
Ok(date_time.assume_utc())
@ -452,7 +462,7 @@ mod olivia_api {
mod tests {
use std::vec;
use crate::model::OracleEventId;
use crate::model::BitMexPriceEventId;
use crate::oracle;
use time::macros::datetime;
@ -462,7 +472,7 @@ mod olivia_api {
let deserialized = serde_json::from_str::<oracle::Announcement>(json).unwrap();
let expected = oracle::Announcement {
id: OracleEventId("/x/BitMEX/BXBT/2021-10-04T22:00:00.price?n=20".to_string()),
id: BitMexPriceEventId::with_20_digits(datetime!(2021-10-04 22:00:00).assume_utc()),
expected_outcome_time: datetime!(2021-10-04 22:00:00).assume_utc(),
nonce_pks: vec![
"8d72028eeaf4b85aec0f750f05a4a320cac193f5d8494bfe05cd4b29f3df4239"
@ -537,7 +547,7 @@ mod olivia_api {
let deserialized = serde_json::from_str::<oracle::Attestation>(json).unwrap();
let expected = oracle::Attestation {
id: OracleEventId("/x/BitMEX/BXBT/2021-10-04T22:00:00.price?n=20".to_string()),
id: BitMexPriceEventId::with_20_digits(datetime!(2021-10-04 22:00:00).assume_utc()),
price: 48935,
scalars: vec![
"1327b3bd0f1faf45d6fed6c96d0c158da22a2033a6fed98bed036df0a4eef484"
@ -613,19 +623,15 @@ mod tests {
use super::*;
use time::macros::datetime;
#[test]
fn next_event_id_is_correct() {
let event_id = event_id(datetime!(2021-09-23 10:00:00).assume_utc());
assert_eq!(event_id.0, "/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20");
}
#[test]
fn next_event_id_after_timestamp() {
let event_id =
next_announcement_after(datetime!(2021-09-23 10:40:00).assume_utc()).unwrap();
assert_eq!(event_id.0, "/x/BitMEX/BXBT/2021-09-23T11:00:00.price?n=20");
assert_eq!(
event_id.to_string(),
"/x/BitMEX/BXBT/2021-09-23T11:00:00.price?n=20"
);
}
#[test]
@ -633,6 +639,9 @@ mod tests {
let event_id =
next_announcement_after(datetime!(2021-09-23 23:40:00).assume_utc()).unwrap();
assert_eq!(event_id.0, "/x/BitMEX/BXBT/2021-09-24T00:00:00.price?n=20");
assert_eq!(
event_id.to_string(),
"/x/BitMEX/BXBT/2021-09-24T00:00:00.price?n=20"
);
}
}

7
daemon/src/setup_contract.rs

@ -1,5 +1,4 @@
use crate::model::cfd::{Cet, Cfd, Dlc, RevokedCommit, Role};
use crate::model::OracleEventId;
use crate::wallet::Wallet;
use crate::wire::{
Msg0, Msg1, Msg2, RollOverMsg, RollOverMsg0, RollOverMsg1, RollOverMsg2, SetupMsg,
@ -213,7 +212,7 @@ pub async fn new(
})
})
.collect::<Result<Vec<_>>>()?;
Ok((OracleEventId(event_id), cets))
Ok((event_id.parse()?, cets))
})
.collect::<Result<HashMap<_, _>>>()?;
@ -272,7 +271,7 @@ pub async fn roll_over(
let payouts = HashMap::from_iter([(
// TODO : we want to support multiple announcements
Announcement {
id: announcement.id.0,
id: announcement.id.to_string(),
nonce_pks: announcement.nonce_pks.clone(),
},
payout_curve::calculate(cfd.order.price, cfd.quantity_usd, cfd.order.leverage)?,
@ -435,7 +434,7 @@ pub async fn roll_over(
})
})
.collect::<Result<Vec<_>>>()?;
Ok((OracleEventId(event_id), cets))
Ok((event_id.parse()?, cets))
})
.collect::<Result<HashMap<_, _>>>()?;

2
daemon/src/taker.rs

@ -32,6 +32,7 @@ mod keypair;
mod logger;
mod model;
mod monitor;
mod olivia;
mod oracle;
mod payout_curve;
mod routes;
@ -41,6 +42,7 @@ mod send_to_socket;
mod setup_contract;
mod taker_cfd;
mod to_sse_event;
mod tokio_ext;
mod wallet;
mod wallet_sync;
mod wire;

18
daemon/src/taker_cfd.rs

@ -8,7 +8,7 @@ use crate::model::cfd::{
Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal,
UpdateCfdProposals,
};
use crate::model::{OracleEventId, Usd};
use crate::model::{BitMexPriceEventId, Usd};
use crate::monitor::{self, MonitorParams};
use crate::wallet::Wallet;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg};
@ -244,7 +244,7 @@ impl Actor {
order.origin = Origin::Theirs;
self.oracle_actor
.do_send_async(oracle::FetchAnnouncement(order.oracle_event_id.clone()))
.do_send_async(oracle::FetchAnnouncement(order.oracle_event_id))
.await?;
let mut conn = self.db.acquire().await?;
@ -289,13 +289,13 @@ impl Actor {
let offer_announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id.clone()))
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?;
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {
event_id: offer_announcement.id.clone(),
event_id: offer_announcement.id,
})
.await?;
@ -380,7 +380,7 @@ impl Actor {
async fn handle_roll_over_accepted(
&mut self,
order_id: OrderId,
oracle_event_id: OracleEventId,
oracle_event_id: BitMexPriceEventId,
ctx: &mut Context<Self>,
) -> Result<()> {
tracing::info!(%order_id, "Roll; over request got accepted");
@ -398,13 +398,13 @@ impl Actor {
let announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(oracle_event_id.clone()))
.send(oracle::GetAnnouncement(oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", oracle_event_id))?;
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {
event_id: announcement.id.clone(),
event_id: announcement.id,
})
.await?;
@ -633,12 +633,12 @@ impl Actor {
);
let mut conn = self.db.acquire().await?;
let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?;
let cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?;
for mut cfd in cfds {
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(Attestation::new(
attestation.id.clone(),
attestation.id,
attestation.price,
attestation.scalars.clone(),
cfd.dlc()

14
daemon/src/tokio_ext.rs

@ -0,0 +1,14 @@
use std::fmt;
use std::future::Future;
pub fn spawn_fallible<F, E>(future: F)
where
F: Future<Output = Result<(), E>> + Send + 'static,
E: fmt::Display,
{
tokio::spawn(async move {
if let Err(e) = future.await {
tracing::warn!("Task failed: {:#}", e);
}
});
}

4
daemon/src/wallet.rs

@ -57,7 +57,9 @@ impl Wallet {
pub async fn sync(&self) -> Result<WalletInfo> {
let wallet = self.wallet.lock().await;
wallet.sync(NoopProgress, None)?;
wallet
.sync(NoopProgress, None)
.context("Failed to sync wallet")?;
let balance = wallet.get_balance()?;

4
daemon/src/wire.rs

@ -1,5 +1,5 @@
use crate::model::cfd::OrderId;
use crate::model::{OracleEventId, Usd};
use crate::model::{BitMexPriceEventId, Usd};
use crate::Order;
use anyhow::{bail, Result};
use bdk::bitcoin::secp256k1::Signature;
@ -72,7 +72,7 @@ pub enum MakerToTaker {
RollOverProtocol(RollOverMsg),
ConfirmRollOver {
order_id: OrderId,
oracle_event_id: OracleEventId,
oracle_event_id: BitMexPriceEventId,
},
RejectRollOver(OrderId),
}

Loading…
Cancel
Save