Browse Source

Properly model event IDs

refactor/no-log-handler
Thomas Eizinger 3 years ago
parent
commit
fbcb629a8e
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 39
      daemon/src/db.rs
  2. 1
      daemon/src/maker.rs
  3. 14
      daemon/src/maker_cfd.rs
  4. 84
      daemon/src/model.rs
  5. 5
      daemon/src/olivia.rs
  6. 56
      daemon/src/oracle.rs
  7. 7
      daemon/src/setup_contract.rs
  8. 1
      daemon/src/taker.rs
  9. 14
      daemon/src/taker_cfd.rs

39
daemon/src/db.rs

@ -24,7 +24,7 @@ pub async fn insert_order(order: &Order, conn: &mut PoolConnection<Sqlite>) -> a
let creation_timestamp = serde_json::to_string(&order.creation_timestamp).unwrap();
let term = serde_json::to_string(&order.term).unwrap();
let origin = serde_json::to_string(&order.origin).unwrap();
let oracle_event_id = order.oracle_event_id.0.clone();
let oracle_event_id = order.oracle_event_id.to_string();
sqlx::query!(
r#"
@ -88,7 +88,6 @@ pub async fn load_order_by_id(
let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap();
let term = serde_json::from_str(row.term.as_str()).unwrap();
let origin = serde_json::from_str(row.origin.as_str()).unwrap();
let oracle_event_id = BitMexPriceEventId(row.oracle_event_id);
Ok(Order {
id: uuid,
@ -102,7 +101,7 @@ pub async fn load_order_by_id(
creation_timestamp,
term,
origin,
oracle_event_id,
oracle_event_id: row.oracle_event_id.parse().unwrap(),
})
}
@ -299,7 +298,7 @@ pub async fn load_cfd_by_order_id(
let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap();
let term = serde_json::from_str(row.term.as_str()).unwrap();
let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap();
let oracle_event_id = BitMexPriceEventId(row.oracle_event_id.clone());
let oracle_event_id = row.oracle_event_id.parse().unwrap();
let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap();
let latest_state = serde_json::from_str(row.state.as_str()).unwrap();
@ -377,7 +376,7 @@ pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<
let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap();
let term = serde_json::from_str(row.term.as_str()).unwrap();
let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap();
let oracle_event_id = BitMexPriceEventId(row.oracle_event_id.clone());
let oracle_event_id = row.oracle_event_id.clone().parse().unwrap();
let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap();
let latest_state = serde_json::from_str(row.state.as_str()).unwrap();
@ -413,6 +412,8 @@ pub async fn load_cfds_by_oracle_event_id(
oracle_event_id: BitMexPriceEventId,
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<Vec<Cfd>> {
let oracle_event_id = oracle_event_id.to_string();
let rows = sqlx::query!(
r#"
select
@ -443,7 +444,7 @@ pub async fn load_cfds_by_oracle_event_id(
)
and orders.oracle_event_id = ?
"#,
oracle_event_id.0
oracle_event_id
)
.fetch_all(conn)
.await?;
@ -462,7 +463,7 @@ pub async fn load_cfds_by_oracle_event_id(
let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap();
let term = serde_json::from_str(row.term.as_str()).unwrap();
let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap();
let oracle_event_id = BitMexPriceEventId(row.oracle_event_id.clone());
let oracle_event_id = row.oracle_event_id.parse().unwrap();
let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap();
let latest_state = serde_json::from_str(row.state.as_str()).unwrap();
@ -501,6 +502,8 @@ mod tests {
use rust_decimal_macros::dec;
use sqlx::SqlitePool;
use tempfile::tempdir;
use time::macros::datetime;
use time::OffsetDateTime;
use crate::db::insert_order;
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Order};
@ -580,22 +583,24 @@ mod tests {
let pool = setup_test_db().await;
let mut conn = pool.acquire().await.unwrap();
let oracle_event_id_1 = BitMexPriceEventId("dummy_1".to_string());
let oracle_event_id_2 = BitMexPriceEventId("dummy_2".to_string());
let oracle_event_id_1 =
BitMexPriceEventId::with_20_digits(datetime!(2021-10-13 10:00:00).assume_utc());
let oracle_event_id_2 =
BitMexPriceEventId::with_20_digits(datetime!(2021-10-25 18:00:00).assume_utc());
let cfd_1 = Cfd::default()
.with_order(Order::default().with_oracle_event_id(oracle_event_id_1.clone()));
let cfd_1 =
Cfd::default().with_order(Order::default().with_oracle_event_id(oracle_event_id_1));
insert_order(&cfd_1.order, &mut conn).await.unwrap();
insert_cfd(cfd_1.clone(), &mut conn).await.unwrap();
let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_1.clone(), &mut conn)
let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_1, &mut conn)
.await
.unwrap();
assert_eq!(vec![cfd_1.clone()], cfd_from_db);
let cfd_2 = Cfd::default()
.with_order(Order::default().with_oracle_event_id(oracle_event_id_1.clone()));
let cfd_2 =
Cfd::default().with_order(Order::default().with_oracle_event_id(oracle_event_id_1));
insert_order(&cfd_2.order, &mut conn).await.unwrap();
insert_cfd(cfd_2.clone(), &mut conn).await.unwrap();
@ -605,8 +610,8 @@ mod tests {
.unwrap();
assert_eq!(vec![cfd_1, cfd_2], cfd_from_db);
let cfd_3 = Cfd::default()
.with_order(Order::default().with_oracle_event_id(oracle_event_id_2.clone()));
let cfd_3 =
Cfd::default().with_order(Order::default().with_oracle_event_id(oracle_event_id_2));
insert_order(&cfd_3.order, &mut conn).await.unwrap();
insert_cfd(cfd_3.clone(), &mut conn).await.unwrap();
@ -686,7 +691,7 @@ mod tests {
Usd(dec!(100)),
Usd(dec!(1000)),
Origin::Theirs,
BitMexPriceEventId("Dummy".to_string()),
BitMexPriceEventId::with_20_digits(OffsetDateTime::now_utc()),
)
.unwrap()
}

1
daemon/src/maker.rs

@ -33,6 +33,7 @@ mod maker_cfd;
mod maker_inc_connections;
mod model;
mod monitor;
mod olivia;
mod oracle;
mod payout_curve;
mod routes;

14
daemon/src/maker_cfd.rs

@ -200,7 +200,7 @@ impl Actor {
oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM)?;
self.oracle_actor
.do_send_async(oracle::FetchAnnouncement(oracle_event_id.clone()))
.do_send_async(oracle::FetchAnnouncement(oracle_event_id))
.await?;
let order = Order::new(
@ -586,13 +586,13 @@ impl Actor {
let offer_announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id.clone()))
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?;
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {
event_id: offer_announcement.id.clone(),
event_id: offer_announcement.id,
})
.await?;
@ -767,7 +767,7 @@ impl Actor {
oracle::next_announcement_after(time::OffsetDateTime::now_utc() + Order::TERM)?;
let announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(oracle_event_id.clone()))
.send(oracle::GetAnnouncement(oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", oracle_event_id))?;
@ -783,7 +783,7 @@ impl Actor {
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {
event_id: announcement.id.clone(),
event_id: announcement.id,
})
.await?;
@ -892,12 +892,12 @@ impl Actor {
);
let mut conn = self.db.acquire().await?;
let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?;
let cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?;
for mut cfd in cfds {
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(Attestation::new(
attestation.id.clone(),
attestation.id,
attestation.price,
attestation.scalars.clone(),
cfd.dlc()

84
daemon/src/model.rs

@ -1,11 +1,14 @@
use crate::olivia;
use anyhow::{Context, Result};
use bdk::bitcoin::{Address, Amount};
use reqwest::Url;
use rust_decimal::prelude::ToPrimitive;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::fmt;
use serde_with::{DeserializeFromStr, SerializeDisplay};
use std::time::SystemTime;
use std::{fmt, str};
use time::{OffsetDateTime, PrimitiveDateTime, Time};
use uuid::Uuid;
pub mod cfd;
@ -105,32 +108,81 @@ pub struct WalletInfo {
pub last_updated_at: SystemTime,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct BitMexPriceEventId(pub String);
#[derive(
Debug, Clone, Copy, SerializeDisplay, DeserializeFromStr, PartialEq, Eq, Hash, PartialOrd, Ord,
)]
pub struct BitMexPriceEventId {
/// The timestamp this price event refers to.
timestamp: OffsetDateTime,
digits: usize,
}
impl BitMexPriceEventId {
pub fn to_olivia_url(&self) -> Url {
pub fn new(timestamp: OffsetDateTime, digits: usize) -> Self {
let (hours, minutes, seconds) = timestamp.time().as_hms();
let time_without_nanos =
Time::from_hms(hours, minutes, seconds).expect("original timestamp was valid");
let timestamp_without_nanos = timestamp.replace_time(time_without_nanos);
Self {
timestamp: timestamp_without_nanos,
digits,
}
}
pub fn with_20_digits(timestamp: OffsetDateTime) -> Self {
Self::new(timestamp, 20)
}
pub fn to_olivia_url(self) -> Url {
"https://h00.ooo"
.parse::<Url>()
.expect("valid URL from constant")
.join(self.0.as_str())
.join(&self.to_string())
.expect("Event id can be joined")
}
}
impl fmt::Display for BitMexPriceEventId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
write!(
f,
"/x/BitMEX/BXBT/{}.price?n={}",
self.timestamp
.format(&olivia::EVENT_TIME_FORMAT)
.expect("should always format and we can't return an error here"),
self.digits
)
}
}
impl str::FromStr for BitMexPriceEventId {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let remaining = s.trim_start_matches("/x/BitMEX/BXBT/");
let (timestamp, rest) = remaining.split_at(19);
let digits = rest.trim_start_matches(".price?n=");
Ok(Self {
timestamp: PrimitiveDateTime::parse(timestamp, &olivia::EVENT_TIME_FORMAT)
.with_context(|| format!("Failed to parse {} as timestamp", timestamp))?
.assume_utc(),
digits: digits.parse()?,
})
}
}
#[cfg(test)]
mod tests {
use time::macros::datetime;
use super::*;
#[test]
fn to_olivia_url() {
let url = BitMexPriceEventId("/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20".to_string())
let url = BitMexPriceEventId::with_20_digits(datetime!(2021-09-23 10:00:00).assume_utc())
.to_olivia_url();
assert_eq!(
@ -140,4 +192,22 @@ mod tests {
.unwrap()
);
}
#[test]
fn parse_event_id() {
let parsed = "/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20"
.parse::<BitMexPriceEventId>()
.unwrap();
let expected =
BitMexPriceEventId::with_20_digits(datetime!(2021-09-23 10:00:00).assume_utc());
assert_eq!(parsed, expected);
}
#[test]
fn new_event_has_no_nanos() {
let now = BitMexPriceEventId::with_20_digits(OffsetDateTime::now_utc());
assert_eq!(now.timestamp.nanosecond(), 0);
}
}

5
daemon/src/olivia.rs

@ -0,0 +1,5 @@
use time::format_description::FormatItem;
use time::macros::format_description;
pub const EVENT_TIME_FORMAT: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]");

56
daemon/src/oracle.rs

@ -5,17 +5,12 @@ use anyhow::{Context, Result};
use async_trait::async_trait;
use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey};
use reqwest::StatusCode;
use rocket::time::format_description::FormatItem;
use rocket::time::macros::format_description;
use rocket::time::{OffsetDateTime, Time};
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::ops::Add;
use time::ext::NumericalDuration;
const OLIVIA_EVENT_TIME_FORMAT: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]:[minute]:[second]");
pub struct Actor<CFD, M> {
announcements: HashMap<BitMexPriceEventId, (OffsetDateTime, Vec<schnorrsig::PublicKey>)>,
pending_announcements: HashSet<BitMexPriceEventId>,
@ -213,7 +208,7 @@ where
#[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<MonitorAttestation> for Actor<CFD, M> {
async fn handle(&mut self, msg: MonitorAttestation, _ctx: &mut xtra::Context<Self>) {
if !self.pending_attestations.insert(msg.event_id.clone()) {
if !self.pending_attestations.insert(msg.event_id) {
tracing::trace!("Attestation {} already being monitored", msg.event_id);
}
}
@ -222,7 +217,7 @@ impl<CFD: 'static, M: 'static> xtra::Handler<MonitorAttestation> for Actor<CFD,
#[async_trait]
impl<CFD: 'static, M: 'static> xtra::Handler<FetchAnnouncement> for Actor<CFD, M> {
async fn handle(&mut self, msg: FetchAnnouncement, _ctx: &mut xtra::Context<Self>) {
if !self.pending_announcements.insert(msg.0.clone()) {
if !self.pending_announcements.insert(msg.0) {
tracing::trace!("Announcement {} already being fetched", msg.0);
}
}
@ -238,7 +233,7 @@ impl<CFD: 'static, M: 'static> xtra::Handler<GetAnnouncement> for Actor<CFD, M>
self.announcements
.get_key_value(&msg.0)
.map(|(id, (time, nonce_pks))| Announcement {
id: id.clone(),
id: *id,
expected_outcome_time: *time,
nonce_pks: nonce_pks.clone(),
})
@ -258,7 +253,7 @@ impl<CFD: 'static, M: 'static> xtra::Handler<NewAnnouncementFetched> for Actor<C
pub fn next_announcement_after(timestamp: OffsetDateTime) -> Result<BitMexPriceEventId> {
let adjusted = ceil_to_next_hour(timestamp)?;
Ok(event_id(adjusted))
Ok(BitMexPriceEventId::with_20_digits(adjusted))
}
fn ceil_to_next_hour(original: OffsetDateTime) -> Result<OffsetDateTime, anyhow::Error> {
@ -270,16 +265,6 @@ fn ceil_to_next_hour(original: OffsetDateTime) -> Result<OffsetDateTime, anyhow:
Ok(adjusted)
}
/// Construct the URL of `olivia`'s `BitMEX/BXBT` event to be attested
/// for at the time indicated by the argument `datetime`.
fn event_id(datetime: OffsetDateTime) -> BitMexPriceEventId {
let datetime = datetime
.format(&OLIVIA_EVENT_TIME_FORMAT)
.expect("valid formatter for datetime");
BitMexPriceEventId(format!("/x/BitMEX/BXBT/{}.price?n=20", datetime))
}
#[derive(Debug, Clone, serde::Deserialize, PartialEq)]
#[serde(try_from = "olivia_api::Response")]
pub struct Announcement {
@ -295,7 +280,7 @@ pub struct Announcement {
impl From<Announcement> for cfd_protocol::Announcement {
fn from(announcement: Announcement) -> Self {
cfd_protocol::Announcement {
id: announcement.id.0,
id: announcement.id.to_string(),
nonce_pks: announcement.nonce_pks,
}
}
@ -360,7 +345,7 @@ mod olivia_api {
serde_json::from_str::<AnnouncementData>(&response.announcement.oracle_event.data)?;
Ok(Self {
id: BitMexPriceEventId(data.id),
id: data.id,
expected_outcome_time: data.expected_outcome_time,
nonce_pks: data.schemes.olivia_v1.nonces,
})
@ -378,7 +363,7 @@ mod olivia_api {
let attestation = response.attestation.context("attestation missing")?;
Ok(Self {
id: BitMexPriceEventId(data.id),
id: data.id,
price: attestation.outcome.parse()?,
scalars: attestation.schemes.olivia_v1.scalars,
})
@ -399,7 +384,7 @@ mod olivia_api {
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
struct AnnouncementData {
id: String,
id: BitMexPriceEventId,
#[serde(with = "timestamp")]
expected_outcome_time: OffsetDateTime,
schemes: Schemes,
@ -428,7 +413,7 @@ mod olivia_api {
}
mod timestamp {
use crate::oracle::OLIVIA_EVENT_TIME_FORMAT;
use crate::olivia;
use serde::de::Error as _;
use serde::{Deserialize, Deserializer};
use time::{OffsetDateTime, PrimitiveDateTime};
@ -438,7 +423,7 @@ mod olivia_api {
D: Deserializer<'a>,
{
let string = String::deserialize(deserializer)?;
let date_time = PrimitiveDateTime::parse(&string, &OLIVIA_EVENT_TIME_FORMAT)
let date_time = PrimitiveDateTime::parse(&string, &olivia::EVENT_TIME_FORMAT)
.map_err(D::Error::custom)?;
Ok(date_time.assume_utc())
@ -459,7 +444,7 @@ mod olivia_api {
let deserialized = serde_json::from_str::<oracle::Announcement>(json).unwrap();
let expected = oracle::Announcement {
id: BitMexPriceEventId("/x/BitMEX/BXBT/2021-10-04T22:00:00.price?n=20".to_string()),
id: BitMexPriceEventId::with_20_digits(datetime!(2021-10-04 22:00:00).assume_utc()),
expected_outcome_time: datetime!(2021-10-04 22:00:00).assume_utc(),
nonce_pks: vec![
"8d72028eeaf4b85aec0f750f05a4a320cac193f5d8494bfe05cd4b29f3df4239"
@ -534,7 +519,7 @@ mod olivia_api {
let deserialized = serde_json::from_str::<oracle::Attestation>(json).unwrap();
let expected = oracle::Attestation {
id: BitMexPriceEventId("/x/BitMEX/BXBT/2021-10-04T22:00:00.price?n=20".to_string()),
id: BitMexPriceEventId::with_20_digits(datetime!(2021-10-04 22:00:00).assume_utc()),
price: 48935,
scalars: vec![
"1327b3bd0f1faf45d6fed6c96d0c158da22a2033a6fed98bed036df0a4eef484"
@ -610,19 +595,15 @@ mod tests {
use super::*;
use time::macros::datetime;
#[test]
fn next_event_id_is_correct() {
let event_id = event_id(datetime!(2021-09-23 10:00:00).assume_utc());
assert_eq!(event_id.0, "/x/BitMEX/BXBT/2021-09-23T10:00:00.price?n=20");
}
#[test]
fn next_event_id_after_timestamp() {
let event_id =
next_announcement_after(datetime!(2021-09-23 10:40:00).assume_utc()).unwrap();
assert_eq!(event_id.0, "/x/BitMEX/BXBT/2021-09-23T11:00:00.price?n=20");
assert_eq!(
event_id.to_string(),
"/x/BitMEX/BXBT/2021-09-23T11:00:00.price?n=20"
);
}
#[test]
@ -630,6 +611,9 @@ mod tests {
let event_id =
next_announcement_after(datetime!(2021-09-23 23:40:00).assume_utc()).unwrap();
assert_eq!(event_id.0, "/x/BitMEX/BXBT/2021-09-24T00:00:00.price?n=20");
assert_eq!(
event_id.to_string(),
"/x/BitMEX/BXBT/2021-09-24T00:00:00.price?n=20"
);
}
}

7
daemon/src/setup_contract.rs

@ -1,5 +1,4 @@
use crate::model::cfd::{Cet, Cfd, Dlc, RevokedCommit, Role};
use crate::model::BitMexPriceEventId;
use crate::wallet::Wallet;
use crate::wire::{
Msg0, Msg1, Msg2, RollOverMsg, RollOverMsg0, RollOverMsg1, RollOverMsg2, SetupMsg,
@ -213,7 +212,7 @@ pub async fn new(
})
})
.collect::<Result<Vec<_>>>()?;
Ok((BitMexPriceEventId(event_id), cets))
Ok((event_id.parse()?, cets))
})
.collect::<Result<HashMap<_, _>>>()?;
@ -272,7 +271,7 @@ pub async fn roll_over(
let payouts = HashMap::from_iter([(
// TODO : we want to support multiple announcements
Announcement {
id: announcement.id.0,
id: announcement.id.to_string(),
nonce_pks: announcement.nonce_pks.clone(),
},
payout_curve::calculate(cfd.order.price, cfd.quantity_usd, cfd.order.leverage)?,
@ -435,7 +434,7 @@ pub async fn roll_over(
})
})
.collect::<Result<Vec<_>>>()?;
Ok((BitMexPriceEventId(event_id), cets))
Ok((event_id.parse()?, cets))
})
.collect::<Result<HashMap<_, _>>>()?;

1
daemon/src/taker.rs

@ -32,6 +32,7 @@ mod keypair;
mod logger;
mod model;
mod monitor;
mod olivia;
mod oracle;
mod payout_curve;
mod routes;

14
daemon/src/taker_cfd.rs

@ -244,7 +244,7 @@ impl Actor {
order.origin = Origin::Theirs;
self.oracle_actor
.do_send_async(oracle::FetchAnnouncement(order.oracle_event_id.clone()))
.do_send_async(oracle::FetchAnnouncement(order.oracle_event_id))
.await?;
let mut conn = self.db.acquire().await?;
@ -289,13 +289,13 @@ impl Actor {
let offer_announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id.clone()))
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?;
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {
event_id: offer_announcement.id.clone(),
event_id: offer_announcement.id,
})
.await?;
@ -398,13 +398,13 @@ impl Actor {
let announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(oracle_event_id.clone()))
.send(oracle::GetAnnouncement(oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", oracle_event_id))?;
self.oracle_actor
.do_send_async(oracle::MonitorAttestation {
event_id: announcement.id.clone(),
event_id: announcement.id,
})
.await?;
@ -633,12 +633,12 @@ impl Actor {
);
let mut conn = self.db.acquire().await?;
let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?;
let cfds = load_cfds_by_oracle_event_id(attestation.id, &mut conn).await?;
for mut cfd in cfds {
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(Attestation::new(
attestation.id.clone(),
attestation.id,
attestation.price,
attestation.scalars.clone(),
cfd.dlc()

Loading…
Cancel
Save