From 23d7c6110f81f19e777ff612e250ed40bb5f5e8e Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Thu, 30 Sep 2021 18:20:23 +1000 Subject: [PATCH] Watch relevant olivia announcements and attestations --- Cargo.lock | 76 ++++++++ daemon/Cargo.toml | 3 + daemon/src/maker.rs | 17 +- daemon/src/maker_cfd.rs | 56 +++++- daemon/src/model/cfd.rs | 3 + daemon/src/monitor.rs | 46 ++++- daemon/src/oracle.rs | 366 +++++++++++++++++++++++++++++++++++ daemon/src/setup_contract.rs | 4 +- daemon/src/taker.rs | 16 +- daemon/src/taker_cfd.rs | 56 +++++- 10 files changed, 634 insertions(+), 9 deletions(-) create mode 100644 daemon/src/oracle.rs diff --git a/Cargo.lock b/Cargo.lock index 3760e5c..ec8d4b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -479,6 +479,7 @@ dependencies = [ "hex", "hkdf", "rand 0.6.5", + "reqwest", "rocket", "rocket-basicauth", "rocket_db_pools", @@ -493,6 +494,7 @@ dependencies = [ "sqlx", "tempfile", "thiserror", + "time", "tokio", "tokio-tungstenite", "tokio-util", @@ -1044,6 +1046,12 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "ipnet" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" + [[package]] name = "itertools" version = "0.10.1" @@ -1753,6 +1761,38 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "reqwest" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "246e9f61b9bb77df069a947682be06e31ac43ea37862e244a69f177694ea6d22" +dependencies = [ + "base64 0.13.0", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "http", + "http-body", + "hyper", + "ipnet", + "js-sys", + "lazy_static", + "log", + "mime", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "ring" version = "0.16.20" @@ -2143,6 +2183,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edfa57a7f8d9c1d260a549e7224100f6c43d43f9103e06dd8b4095a9b2b43ce9" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_with" version = "1.10.0" @@ -2496,6 +2548,7 @@ checksum = "cde1cf55178e0293453ba2cca0d5f8392a922e52aa958aee9c28ed02becc6d03" dependencies = [ "itoa", "libc", + "serde", "time-macros", ] @@ -2892,6 +2945,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce" dependencies = [ "cfg-if 1.0.0", + "serde", + "serde_json", "wasm-bindgen-macro", ] @@ -2910,6 +2965,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e8d7523cb1f2a4c96c1317ca690031b714a51cc14e05f712446691f413f5d39" +dependencies = [ + "cfg-if 1.0.0", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.78" @@ -3030,6 +3097,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "winreg" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "ws2_32-sys" version = "0.2.1" diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index fa2bb47..4758bc6 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -15,6 +15,7 @@ futures = { version = "0.3", default-features = false } hex = "0.4" hkdf = "0.11" rand = "0.6" +reqwest = { version = "0.11", default-features = false, features = ["json"] } rocket = { version = "0.5.0-rc.1", features = ["json"] } rocket-basicauth = { version = "2", default-features = false } rocket_db_pools = { git = "https://github.com/SergioBenitez/Rocket", features = ["sqlx_sqlite"] } @@ -28,6 +29,7 @@ serde_with = { version = "1", features = ["macros"] } sha2 = "0.9" sqlx = { version = "0.5", features = ["offline"] } thiserror = "1" +time = { version = "0.3", features = ["serde"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"] } tokio-tungstenite = { version = "0.15", features = ["rustls-tls"] } tokio-util = { version = "0.6", features = ["codec"] } @@ -46,3 +48,4 @@ path = "src/maker.rs" [dev-dependencies] tempfile = "3" +time = { version = "0.3", features = ["std"] } diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index fbb0eea..9fedf66 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -12,6 +12,7 @@ use rocket::fairing::AdHoc; use rocket_db_pools::Database; use std::path::PathBuf; use std::task::Poll; +use std::time::Duration; use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; use xtra::prelude::*; @@ -28,6 +29,7 @@ mod maker_cfd; mod maker_inc_connections; mod model; mod monitor; +mod oracle; mod routes; mod routes_maker; mod seed; @@ -158,6 +160,8 @@ async fn main() -> Result<()> { let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None); + let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None); + let mut conn = db.acquire().await.unwrap(); let cfds = load_all_cfds(&mut conn).await.unwrap(); let cfd_maker_actor_inbox = maker_cfd::Actor::new( @@ -167,8 +171,9 @@ async fn main() -> Result<()> { cfd_feed_sender, order_feed_sender, maker_inc_connections_address.clone(), - monitor_actor_address, + monitor_actor_address.clone(), cfds.clone(), + oracle_actor_address, ) .await .unwrap() @@ -184,6 +189,16 @@ async fn main() -> Result<()> { monitor::Actor::new(&opts.electrum, cfd_maker_actor_inbox.clone(), cfds).await, )); + tokio::spawn( + oracle_actor_context + .notify_interval(Duration::from_secs(60), || oracle::Sync) + .unwrap(), + ); + tokio::spawn(oracle_actor_context.run(oracle::Actor::new( + cfd_maker_actor_inbox.clone(), + monitor_actor_address, + ))); + let listener_stream = futures::stream::poll_fn(move |ctx| { let message = match futures::ready!(listener.poll_accept(ctx)) { Ok((stream, address)) => { diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 2b285a1..62656ed 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -8,7 +8,7 @@ use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, use crate::model::{TakerId, Usd}; use crate::monitor::MonitorParams; use crate::wallet::Wallet; -use crate::{maker_inc_connections, monitor, setup_contract, wire}; +use crate::{maker_inc_connections, monitor, oracle, setup_contract, wire}; use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -53,6 +53,8 @@ pub struct Actor { current_order_id: Option, monitor_actor: Address>, setup_state: SetupState, + latest_announcement: Option, + oracle_actor: Address>>, } enum SetupState { @@ -74,6 +76,7 @@ impl Actor { takers: Address, monitor_actor: Address>, cfds: Vec, + oracle_actor: Address>>, ) -> Result { // populate the CFD feed with existing CFDs cfd_feed_actor_inbox.send(cfds.clone())?; @@ -101,6 +104,8 @@ impl Actor { current_order_id: None, monitor_actor, setup_state: SetupState::None, + latest_announcement: None, + oracle_actor, }) } @@ -311,6 +316,18 @@ impl Actor { self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; + let latest_announcement = self + .latest_announcement + .to_owned() + .context("Unaware of oracle's latest announcement.")?; + let nonce_pks = latest_announcement.nonce_pks; + + self.oracle_actor + .do_send_async(oracle::MonitorEvent { + event_id: latest_announcement.id, + }) + .await?; + let contract_future = setup_contract::new( self.takers.clone().into_sink().with(move |msg| { future::ok(maker_inc_connections::TakerMessage { @@ -319,7 +336,7 @@ impl Actor { }) }), receiver, - self.oracle_pk, + (self.oracle_pk, nonce_pks), cfd, self.wallet.clone(), setup_contract::Role::Maker, @@ -417,6 +434,27 @@ impl Actor { Ok(()) } + + async fn handle_oracle_announcements( + &mut self, + announcements: oracle::Announcements, + ) -> Result<()> { + tracing::debug!("Updating latest oracle announcements"); + self.latest_announcement = Some(announcements.0.last().unwrap().clone()); + + Ok(()) + } + + async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { + tracing::debug!( + "Learnt latest oracle attestation for event: {}", + attestation.id + ); + + todo!( + "Update all CFDs which care about this particular attestation, based on the event ID" + ); + } } #[async_trait] @@ -493,6 +531,20 @@ impl Handler for Actor { } } +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: oracle::Announcements, _ctx: &mut Context) { + log_error!(self.handle_oracle_announcements(msg)) + } +} + +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context) { + log_error!(self.handle_oracle_attestation(msg)) + } +} + impl Message for NewOrder { type Result = (); } diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index 71264f5..c404ccb 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -569,6 +569,9 @@ impl Cfd { }, } } + monitor::Event::CetFinality(_) => { + todo!("Implement state transition") + } }, }; diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index 1057d49..9ab6913 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -1,6 +1,7 @@ use crate::actors::log_error; use crate::model::cfd::{CetStatus, Cfd, CfdState, Dlc, OrderId}; use crate::monitor::subscription::Subscription; +use crate::oracle; use anyhow::Result; use async_trait::async_trait; use bdk::bitcoin::{PublicKey, Script, Txid}; @@ -265,6 +266,38 @@ where } }); } + + pub async fn handle_oracle_attestation(&self, attestation: oracle::Attestation) -> Result<()> { + for (order_id, MonitorParams { cets, .. }) in self.cfds.clone().into_iter() { + let (txid, script_pubkey) = + match cets.iter().find_map(|(txid, script_pubkey, range)| { + range + .contains(&attestation.price) + .then(|| (txid, script_pubkey)) + }) { + Some(cet) => cet, + None => continue, + }; + + tokio::spawn({ + let cfd_actor_addr = self.cfd_actor_addr.clone(); + let cet_subscription = self + .monitor + .subscribe_to((*txid, script_pubkey.clone())) + .await; + async move { + cet_subscription.wait_until_final().await.unwrap(); + + cfd_actor_addr + .do_send_async(Event::CetFinality(order_id)) + .await + .unwrap(); + } + }); + } + + Ok(()) + } } pub struct StartMonitoring { @@ -281,6 +314,7 @@ pub enum Event { LockFinality(OrderId), CommitFinality(OrderId), CetTimelockExpired(OrderId), + CetFinality(OrderId), RefundTimelockExpired(OrderId), RefundFinality(OrderId), } @@ -293,6 +327,7 @@ impl Event { Event::CetTimelockExpired(order_id) => order_id, Event::RefundTimelockExpired(order_id) => order_id, Event::RefundFinality(order_id) => order_id, + Event::CetFinality(order_id) => order_id, }; *order_id @@ -314,7 +349,6 @@ where impl xtra::Actor for Actor where T: xtra::Actor {} -// TODO: The traitbound for LockFinality should not be needed here, but we could not work around it #[async_trait] impl xtra::Handler for Actor where @@ -324,3 +358,13 @@ where log_error!(self.handle_start_monitoring(msg)); } } + +#[async_trait] +impl xtra::Handler for Actor +where + T: xtra::Actor + xtra::Handler, +{ + async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut xtra::Context) { + log_error!(self.handle_oracle_attestation(msg)); + } +} diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs new file mode 100644 index 0000000..be944d0 --- /dev/null +++ b/daemon/src/oracle.rs @@ -0,0 +1,366 @@ +use crate::actors::log_error; +use anyhow::Result; +use async_trait::async_trait; +use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey}; +use futures::stream::FuturesOrdered; +use futures::TryStreamExt; +use rocket::time::{format_description, Duration, OffsetDateTime, Time}; +use std::collections::HashSet; +use std::convert::TryFrom; + +/// Where `olivia` is located. +const OLIVIA_URL: &str = "https://h00.ooo/"; + +const OLIVIA_EVENT_TIME_FORMAT: &str = "[year]-[month]-[day]T[hour]:[minute]:[second]"; + +pub struct Actor +where + CFD: xtra::Handler + xtra::Handler, + M: xtra::Handler, +{ + latest_announcements: Option<[Announcement; 24]>, + pending_attestations: HashSet, + cfd_actor_address: xtra::Address, + monitor_actor_address: xtra::Address, +} + +impl Actor +where + CFD: xtra::Handler + xtra::Handler, + M: xtra::Handler, +{ + pub fn new( + cfd_actor_address: xtra::Address, + monitor_actor_address: xtra::Address, + ) -> Self { + Self { + latest_announcements: None, + pending_attestations: HashSet::new(), + cfd_actor_address, + monitor_actor_address, + } + } + + async fn update_state(&mut self) -> Result<()> { + self.update_latest_announcements().await?; + self.update_pending_attestations().await?; + + Ok(()) + } + + async fn update_latest_announcements(&mut self) -> Result<()> { + let new_announcements = next_urls() + .into_iter() + .map(|event_url| async { + let announcement = reqwest::get(event_url) + .await? + .json::() + .await?; + Result::<_, anyhow::Error>::Ok(announcement) + }) + .collect::>() + .try_collect::>() + .await?; + + let new_announcements = <[Announcement; 24]>::try_from(new_announcements) + .map_err(|vec| anyhow::anyhow!("wrong number of announcements: {}", vec.len()))?; + + if self.latest_announcements.as_ref() != Some(&new_announcements) { + self.latest_announcements = Some(new_announcements.clone()); + self.cfd_actor_address + .do_send_async(Announcements(new_announcements)) + .await?; + } + + Ok(()) + } + + async fn update_pending_attestations(&mut self) -> Result<()> { + let pending_attestations = self.pending_attestations.clone(); + for event_id in pending_attestations.into_iter() { + { + let res = match reqwest::get(format!("{}{}", OLIVIA_URL, event_id)).await { + Ok(res) => res, + Err(e) => { + // TODO: Can we differentiate between errors? + tracing::warn!(%event_id, "Attestation not available: {}", e); + continue; + } + }; + + let attestation = res.json::().await?; + + self.cfd_actor_address + .clone() + .do_send_async(attestation.clone()) + .await?; + self.monitor_actor_address + .clone() + .do_send_async(attestation) + .await?; + + self.pending_attestations.remove(&event_id); + } + } + + Ok(()) + } + + fn monitor_event(&mut self, event_id: String) { + if !self.pending_attestations.insert(event_id.clone()) { + tracing::trace!("Event {} already being monitored", event_id); + } + } +} + +impl xtra::Actor for Actor +where + CFD: xtra::Handler + xtra::Handler, + M: xtra::Handler, +{ +} + +pub struct Sync; + +impl xtra::Message for Sync { + type Result = (); +} + +#[async_trait] +impl xtra::Handler for Actor +where + CFD: xtra::Handler + xtra::Handler, + M: xtra::Handler, +{ + async fn handle(&mut self, _: Sync, _ctx: &mut xtra::Context) { + log_error!(self.update_state()) + } +} + +pub struct MonitorEvent { + pub event_id: String, +} + +impl xtra::Message for MonitorEvent { + type Result = (); +} + +#[async_trait] +impl xtra::Handler for Actor +where + CFD: xtra::Handler + xtra::Handler, + M: xtra::Handler, +{ + async fn handle(&mut self, msg: MonitorEvent, _ctx: &mut xtra::Context) { + self.monitor_event(msg.event_id) + } +} + +/// Construct the URL of the next 24 `BitMEX/BXBT` hourly events +/// `olivia` will attest to. +fn next_urls() -> Vec { + next_24_hours(OffsetDateTime::now_utc()) + .into_iter() + .map(event_url) + .collect() +} + +fn next_24_hours(datetime: OffsetDateTime) -> Vec { + let adjusted = datetime.replace_time(Time::from_hms(datetime.hour(), 0, 0).expect("in_range")); + (1..=24).map(|i| adjusted + Duration::hours(i)).collect() +} + +/// Construct the URL of `olivia`'s `BitMEX/BXBT` event to be attested +/// for at the time indicated by the argument `datetime`. +fn event_url(datetime: OffsetDateTime) -> String { + let formatter = format_description::parse(OLIVIA_EVENT_TIME_FORMAT).expect("valid formatter"); + datetime + .format(&formatter) + .expect("valid formatter for datetime"); + + format!("{}/BitMEX/BXBT/{}/price", OLIVIA_URL, datetime) +} + +#[derive(Debug, Clone, serde::Deserialize, PartialEq)] +#[serde(from = "olivia_api::Announcement")] +pub struct Announcement { + /// Identifier for an oracle event. + /// + /// Doubles up as the path of the URL for this event i.e. + /// https://h00.ooo/{id}. + pub id: String, + pub expected_outcome_time: OffsetDateTime, + pub nonce_pks: Vec, +} + +pub struct Announcements(pub [Announcement; 24]); + +// TODO: Implement real deserialization once price attestation is +// implemented in `olivia` +#[derive(Debug, Clone, serde::Deserialize)] +pub struct Attestation { + pub id: String, + pub price: u64, + pub scalars: Vec, +} + +impl xtra::Message for Announcements { + type Result = (); +} + +impl xtra::Message for Attestation { + type Result = (); +} + +mod olivia_api { + use cfd_protocol::secp256k1_zkp::schnorrsig; + use time::OffsetDateTime; + + impl From for super::Announcement { + fn from(announcement: Announcement) -> Self { + Self { + id: announcement.id, + expected_outcome_time: announcement.expected_outcome_time, + nonce_pks: announcement.schemes.olivia_v1.nonces, + } + } + } + + #[derive(Debug, Clone, serde::Deserialize)] + pub(crate) struct Announcement { + id: String, + #[serde(rename = "expected-outcome-time")] + #[serde(with = "timestamp")] + expected_outcome_time: OffsetDateTime, + schemes: Schemes, + } + + #[derive(Debug, Clone, serde::Deserialize)] + struct Schemes { + #[serde(rename = "olivia-v1")] + olivia_v1: OliviaV1, + } + + #[derive(Debug, Clone, serde::Deserialize)] + struct OliviaV1 { + nonces: Vec, + } + + mod timestamp { + use crate::oracle::OLIVIA_EVENT_TIME_FORMAT; + use serde::de::Error as _; + use serde::{Deserialize, Deserializer}; + use time::{format_description, OffsetDateTime, PrimitiveDateTime}; + + pub fn deserialize<'a, D>(deserializer: D) -> Result + where + D: Deserializer<'a>, + { + let string = String::deserialize(deserializer)?; + let format = format_description::parse(OLIVIA_EVENT_TIME_FORMAT).expect("valid format"); + + let date_time = PrimitiveDateTime::parse(&string, &format).map_err(D::Error::custom)?; + + Ok(date_time.assume_utc()) + } + } + + #[cfg(test)] + mod tests { + use std::str::FromStr; + + use crate::oracle; + use cfd_protocol::secp256k1_zkp::schnorrsig; + use time::macros::datetime; + + #[test] + fn deserialize_announcement() { + let json = r#" + { + "id": "/BitMEX/BXBT/2021-09-28T03:20:00/price/", + "expected-outcome-time": "2021-09-28T03:20:00", + "descriptor": { + "type": "enum", + "outcomes": [ + "0", + "1" + ] + }, + "schemes": { + "olivia-v1": { + "nonces": [ + "41a26c4853f2edc5604069541fdd1df103b52c31e959451d115f8220aeb8b414" + ] + }, + "ecdsa-v1": {} + } +} +"#; + + let deserialized = serde_json::from_str::(json).unwrap(); + let expected = oracle::Announcement { + id: "/BitMEX/BXBT/2021-09-28T03:20:00/price/".to_string(), + expected_outcome_time: datetime!(2021-09-28 03:20:00).assume_utc(), + nonce_pks: vec![schnorrsig::PublicKey::from_str( + "41a26c4853f2edc5604069541fdd1df103b52c31e959451d115f8220aeb8b414", + ) + .unwrap()], + }; + + assert_eq!(deserialized, expected) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use time::format_description; + use time::macros::datetime; + + #[test] + fn next_event_url_is_correct() { + let datetime = datetime!(2021-09-23 10:43:00); + let format = format_description::parse(OLIVIA_EVENT_TIME_FORMAT).unwrap(); + + let date_time_formatted = datetime.format(&format).unwrap(); + let expected = "2021-09-23T10:43:00"; + + assert_eq!(date_time_formatted, expected); + } + + #[test] + fn next_24() { + let datetime = datetime!(2021-09-23 10:43:12); + + let next_24_hours = next_24_hours(datetime.assume_utc()); + let expected = vec![ + datetime!(2021-09-23 11:00:00).assume_utc(), + datetime!(2021-09-23 12:00:00).assume_utc(), + datetime!(2021-09-23 13:00:00).assume_utc(), + datetime!(2021-09-23 14:00:00).assume_utc(), + datetime!(2021-09-23 15:00:00).assume_utc(), + datetime!(2021-09-23 16:00:00).assume_utc(), + datetime!(2021-09-23 17:00:00).assume_utc(), + datetime!(2021-09-23 18:00:00).assume_utc(), + datetime!(2021-09-23 19:00:00).assume_utc(), + datetime!(2021-09-23 20:00:00).assume_utc(), + datetime!(2021-09-23 21:00:00).assume_utc(), + datetime!(2021-09-23 22:00:00).assume_utc(), + datetime!(2021-09-23 23:00:00).assume_utc(), + datetime!(2021-09-24 00:00:00).assume_utc(), + datetime!(2021-09-24 01:00:00).assume_utc(), + datetime!(2021-09-24 02:00:00).assume_utc(), + datetime!(2021-09-24 03:00:00).assume_utc(), + datetime!(2021-09-24 04:00:00).assume_utc(), + datetime!(2021-09-24 05:00:00).assume_utc(), + datetime!(2021-09-24 06:00:00).assume_utc(), + datetime!(2021-09-24 07:00:00).assume_utc(), + datetime!(2021-09-24 08:00:00).assume_utc(), + datetime!(2021-09-24 09:00:00).assume_utc(), + datetime!(2021-09-24 10:00:00).assume_utc(), + ]; + + assert_eq!(next_24_hours, expected) + } +} diff --git a/daemon/src/setup_contract.rs b/daemon/src/setup_contract.rs index 375099b..15e9d32 100644 --- a/daemon/src/setup_contract.rs +++ b/daemon/src/setup_contract.rs @@ -20,7 +20,7 @@ use std::ops::RangeInclusive; pub async fn new( mut sink: impl Sink + Unpin, mut stream: impl FusedStream + Unpin, - oracle_pk: schnorrsig::PublicKey, + (oracle_pk, nonce_pks): (schnorrsig::PublicKey, Vec), cfd: Cfd, wallet: Wallet, role: Role, @@ -64,7 +64,7 @@ pub async fn new( let own_cfd_txs = create_cfd_transactions( (params.maker().clone(), *params.maker_punish()), (params.taker().clone(), *params.taker_punish()), - (oracle_pk, &[]), + (oracle_pk, &nonce_pks), ( model::cfd::Cfd::CET_TIMELOCK, cfd.refund_timelock_in_blocks(), diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 77f1f49..2e1c9b4 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -29,6 +29,7 @@ mod keypair; mod logger; mod model; mod monitor; +mod oracle; mod routes; mod routes_taker; mod seed; @@ -162,6 +163,8 @@ async fn main() -> Result<()> { let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None); + let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None); + let mut conn = db.acquire().await.unwrap(); let cfds = load_all_cfds(&mut conn).await.unwrap(); let cfd_actor_inbox = taker_cfd::Actor::new( @@ -171,8 +174,9 @@ async fn main() -> Result<()> { cfd_feed_sender, order_feed_sender, send_to_maker, - monitor_actor_address, + monitor_actor_address.clone(), cfds.clone(), + oracle_actor_address, ) .await .unwrap() @@ -190,6 +194,16 @@ async fn main() -> Result<()> { ); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); + tokio::spawn( + oracle_actor_context + .notify_interval(Duration::from_secs(60), || oracle::Sync) + .unwrap(), + ); + tokio::spawn(oracle_actor_context.run(oracle::Actor::new( + cfd_actor_inbox.clone(), + monitor_actor_address, + ))); + Ok(rocket.manage(cfd_actor_inbox)) }, )) diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index ba1b59a..4db44aa 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -10,7 +10,7 @@ use crate::model::Usd; use crate::monitor::{self, MonitorParams}; use crate::wallet::Wallet; use crate::wire::SetupMsg; -use crate::{send_to_socket, setup_contract, wire}; +use crate::{oracle, send_to_socket, setup_contract, wire}; use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -51,6 +51,8 @@ pub struct Actor { send_to_maker: Address>, monitor_actor: Address>, setup_state: SetupState, + latest_announcement: Option, + oracle_actor: Address>>, } impl Actor { @@ -64,6 +66,7 @@ impl Actor { send_to_maker: Address>, monitor_actor: Address>, cfds: Vec, + oracle_actor: Address>>, ) -> Result { // populate the CFD feed with existing CFDs cfd_feed_actor_inbox.send(cfds.clone())?; @@ -90,6 +93,8 @@ impl Actor { send_to_maker, monitor_actor, setup_state: SetupState::None, + latest_announcement: None, + oracle_actor, }) } @@ -166,13 +171,25 @@ impl Actor { .send(load_all_cfds(&mut conn).await?)?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + let latest_announcement = self + .latest_announcement + .to_owned() + .context("Unaware of oracle's latest announcement.")?; + let nonce_pks = latest_announcement.nonce_pks; + + self.oracle_actor + .do_send_async(oracle::MonitorEvent { + event_id: latest_announcement.id, + }) + .await?; + let contract_future = setup_contract::new( self.send_to_maker .clone() .into_sink() .with(|msg| future::ok(wire::TakerToMaker::Protocol(msg))), receiver, - self.oracle_pk, + (self.oracle_pk, nonce_pks), cfd, self.wallet.clone(), setup_contract::Role::Taker, @@ -301,6 +318,27 @@ impl Actor { Ok(()) } + + async fn handle_oracle_announcements( + &mut self, + announcements: oracle::Announcements, + ) -> Result<()> { + tracing::debug!("Updating latest oracle announcements"); + self.latest_announcement = Some(announcements.0.last().unwrap().clone()); + + Ok(()) + } + + async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { + tracing::debug!( + "Learnt latest oracle attestation for event: {}", + attestation.id + ); + + todo!( + "Update all CFDs which care about this particular attestation, based on the event ID" + ); + } } #[async_trait] @@ -359,6 +397,20 @@ impl Handler for Actor { } } +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: oracle::Announcements, _ctx: &mut Context) { + log_error!(self.handle_oracle_announcements(msg)) + } +} + +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context) { + log_error!(self.handle_oracle_attestation(msg)) + } +} + impl Message for TakeOffer { type Result = (); }