Browse Source

Watch relevant olivia announcements and attestations

fix-olivia-event-id
Lucas Soriano del Pino 3 years ago
parent
commit
23d7c6110f
No known key found for this signature in database GPG Key ID: EE611E973A1530E7
  1. 76
      Cargo.lock
  2. 3
      daemon/Cargo.toml
  3. 17
      daemon/src/maker.rs
  4. 56
      daemon/src/maker_cfd.rs
  5. 3
      daemon/src/model/cfd.rs
  6. 46
      daemon/src/monitor.rs
  7. 366
      daemon/src/oracle.rs
  8. 4
      daemon/src/setup_contract.rs
  9. 16
      daemon/src/taker.rs
  10. 56
      daemon/src/taker_cfd.rs

76
Cargo.lock

@ -479,6 +479,7 @@ dependencies = [
"hex",
"hkdf",
"rand 0.6.5",
"reqwest",
"rocket",
"rocket-basicauth",
"rocket_db_pools",
@ -493,6 +494,7 @@ dependencies = [
"sqlx",
"tempfile",
"thiserror",
"time",
"tokio",
"tokio-tungstenite",
"tokio-util",
@ -1044,6 +1046,12 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "ipnet"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9"
[[package]]
name = "itertools"
version = "0.10.1"
@ -1753,6 +1761,38 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "reqwest"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "246e9f61b9bb77df069a947682be06e31ac43ea37862e244a69f177694ea6d22"
dependencies = [
"base64 0.13.0",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"http",
"http-body",
"hyper",
"ipnet",
"js-sys",
"lazy_static",
"log",
"mime",
"percent-encoding",
"pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg",
]
[[package]]
name = "ring"
version = "0.16.20"
@ -2143,6 +2183,18 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edfa57a7f8d9c1d260a549e7224100f6c43d43f9103e06dd8b4095a9b2b43ce9"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "serde_with"
version = "1.10.0"
@ -2496,6 +2548,7 @@ checksum = "cde1cf55178e0293453ba2cca0d5f8392a922e52aa958aee9c28ed02becc6d03"
dependencies = [
"itoa",
"libc",
"serde",
"time-macros",
]
@ -2892,6 +2945,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce"
dependencies = [
"cfg-if 1.0.0",
"serde",
"serde_json",
"wasm-bindgen-macro",
]
@ -2910,6 +2965,18 @@ dependencies = [
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e8d7523cb1f2a4c96c1317ca690031b714a51cc14e05f712446691f413f5d39"
dependencies = [
"cfg-if 1.0.0",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.78"
@ -3030,6 +3097,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "winreg"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "ws2_32-sys"
version = "0.2.1"

3
daemon/Cargo.toml

@ -15,6 +15,7 @@ futures = { version = "0.3", default-features = false }
hex = "0.4"
hkdf = "0.11"
rand = "0.6"
reqwest = { version = "0.11", default-features = false, features = ["json"] }
rocket = { version = "0.5.0-rc.1", features = ["json"] }
rocket-basicauth = { version = "2", default-features = false }
rocket_db_pools = { git = "https://github.com/SergioBenitez/Rocket", features = ["sqlx_sqlite"] }
@ -28,6 +29,7 @@ serde_with = { version = "1", features = ["macros"] }
sha2 = "0.9"
sqlx = { version = "0.5", features = ["offline"] }
thiserror = "1"
time = { version = "0.3", features = ["serde"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"] }
tokio-tungstenite = { version = "0.15", features = ["rustls-tls"] }
tokio-util = { version = "0.6", features = ["codec"] }
@ -46,3 +48,4 @@ path = "src/maker.rs"
[dev-dependencies]
tempfile = "3"
time = { version = "0.3", features = ["std"] }

17
daemon/src/maker.rs

@ -12,6 +12,7 @@ use rocket::fairing::AdHoc;
use rocket_db_pools::Database;
use std::path::PathBuf;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*;
@ -28,6 +29,7 @@ mod maker_cfd;
mod maker_inc_connections;
mod model;
mod monitor;
mod oracle;
mod routes;
mod routes_maker;
mod seed;
@ -158,6 +160,8 @@ async fn main() -> Result<()> {
let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None);
let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None);
let mut conn = db.acquire().await.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap();
let cfd_maker_actor_inbox = maker_cfd::Actor::new(
@ -167,8 +171,9 @@ async fn main() -> Result<()> {
cfd_feed_sender,
order_feed_sender,
maker_inc_connections_address.clone(),
monitor_actor_address,
monitor_actor_address.clone(),
cfds.clone(),
oracle_actor_address,
)
.await
.unwrap()
@ -184,6 +189,16 @@ async fn main() -> Result<()> {
monitor::Actor::new(&opts.electrum, cfd_maker_actor_inbox.clone(), cfds).await,
));
tokio::spawn(
oracle_actor_context
.notify_interval(Duration::from_secs(60), || oracle::Sync)
.unwrap(),
);
tokio::spawn(oracle_actor_context.run(oracle::Actor::new(
cfd_maker_actor_inbox.clone(),
monitor_actor_address,
)));
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => {

56
daemon/src/maker_cfd.rs

@ -8,7 +8,7 @@ use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc,
use crate::model::{TakerId, Usd};
use crate::monitor::MonitorParams;
use crate::wallet::Wallet;
use crate::{maker_inc_connections, monitor, setup_contract, wire};
use crate::{maker_inc_connections, monitor, oracle, setup_contract, wire};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
@ -53,6 +53,8 @@ pub struct Actor {
current_order_id: Option<OrderId>,
monitor_actor: Address<monitor::Actor<Actor>>,
setup_state: SetupState,
latest_announcement: Option<oracle::Announcement>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
}
enum SetupState {
@ -74,6 +76,7 @@ impl Actor {
takers: Address<maker_inc_connections::Actor>,
monitor_actor: Address<monitor::Actor<Actor>>,
cfds: Vec<Cfd>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
) -> Result<Self> {
// populate the CFD feed with existing CFDs
cfd_feed_actor_inbox.send(cfds.clone())?;
@ -101,6 +104,8 @@ impl Actor {
current_order_id: None,
monitor_actor,
setup_state: SetupState::None,
latest_announcement: None,
oracle_actor,
})
}
@ -311,6 +316,18 @@ impl Actor {
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let latest_announcement = self
.latest_announcement
.to_owned()
.context("Unaware of oracle's latest announcement.")?;
let nonce_pks = latest_announcement.nonce_pks;
self.oracle_actor
.do_send_async(oracle::MonitorEvent {
event_id: latest_announcement.id,
})
.await?;
let contract_future = setup_contract::new(
self.takers.clone().into_sink().with(move |msg| {
future::ok(maker_inc_connections::TakerMessage {
@ -319,7 +336,7 @@ impl Actor {
})
}),
receiver,
self.oracle_pk,
(self.oracle_pk, nonce_pks),
cfd,
self.wallet.clone(),
setup_contract::Role::Maker,
@ -417,6 +434,27 @@ impl Actor {
Ok(())
}
async fn handle_oracle_announcements(
&mut self,
announcements: oracle::Announcements,
) -> Result<()> {
tracing::debug!("Updating latest oracle announcements");
self.latest_announcement = Some(announcements.0.last().unwrap().clone());
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
attestation.id
);
todo!(
"Update all CFDs which care about this particular attestation, based on the event ID"
);
}
}
#[async_trait]
@ -493,6 +531,20 @@ impl Handler<TakerStreamMessage> for Actor {
}
}
#[async_trait]
impl Handler<oracle::Announcements> for Actor {
async fn handle(&mut self, msg: oracle::Announcements, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_announcements(msg))
}
}
#[async_trait]
impl Handler<oracle::Attestation> for Actor {
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_attestation(msg))
}
}
impl Message for NewOrder {
type Result = ();
}

3
daemon/src/model/cfd.rs

@ -569,6 +569,9 @@ impl Cfd {
},
}
}
monitor::Event::CetFinality(_) => {
todo!("Implement state transition")
}
},
};

46
daemon/src/monitor.rs

@ -1,6 +1,7 @@
use crate::actors::log_error;
use crate::model::cfd::{CetStatus, Cfd, CfdState, Dlc, OrderId};
use crate::monitor::subscription::Subscription;
use crate::oracle;
use anyhow::Result;
use async_trait::async_trait;
use bdk::bitcoin::{PublicKey, Script, Txid};
@ -265,6 +266,38 @@ where
}
});
}
pub async fn handle_oracle_attestation(&self, attestation: oracle::Attestation) -> Result<()> {
for (order_id, MonitorParams { cets, .. }) in self.cfds.clone().into_iter() {
let (txid, script_pubkey) =
match cets.iter().find_map(|(txid, script_pubkey, range)| {
range
.contains(&attestation.price)
.then(|| (txid, script_pubkey))
}) {
Some(cet) => cet,
None => continue,
};
tokio::spawn({
let cfd_actor_addr = self.cfd_actor_addr.clone();
let cet_subscription = self
.monitor
.subscribe_to((*txid, script_pubkey.clone()))
.await;
async move {
cet_subscription.wait_until_final().await.unwrap();
cfd_actor_addr
.do_send_async(Event::CetFinality(order_id))
.await
.unwrap();
}
});
}
Ok(())
}
}
pub struct StartMonitoring {
@ -281,6 +314,7 @@ pub enum Event {
LockFinality(OrderId),
CommitFinality(OrderId),
CetTimelockExpired(OrderId),
CetFinality(OrderId),
RefundTimelockExpired(OrderId),
RefundFinality(OrderId),
}
@ -293,6 +327,7 @@ impl Event {
Event::CetTimelockExpired(order_id) => order_id,
Event::RefundTimelockExpired(order_id) => order_id,
Event::RefundFinality(order_id) => order_id,
Event::CetFinality(order_id) => order_id,
};
*order_id
@ -314,7 +349,6 @@ where
impl<T> xtra::Actor for Actor<T> where T: xtra::Actor {}
// TODO: The traitbound for LockFinality should not be needed here, but we could not work around it
#[async_trait]
impl<T> xtra::Handler<StartMonitoring> for Actor<T>
where
@ -324,3 +358,13 @@ where
log_error!(self.handle_start_monitoring(msg));
}
}
#[async_trait]
impl<T> xtra::Handler<oracle::Attestation> for Actor<T>
where
T: xtra::Actor + xtra::Handler<Event>,
{
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut xtra::Context<Self>) {
log_error!(self.handle_oracle_attestation(msg));
}
}

366
daemon/src/oracle.rs

@ -0,0 +1,366 @@
use crate::actors::log_error;
use anyhow::Result;
use async_trait::async_trait;
use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey};
use futures::stream::FuturesOrdered;
use futures::TryStreamExt;
use rocket::time::{format_description, Duration, OffsetDateTime, Time};
use std::collections::HashSet;
use std::convert::TryFrom;
/// Where `olivia` is located.
const OLIVIA_URL: &str = "https://h00.ooo/";
const OLIVIA_EVENT_TIME_FORMAT: &str = "[year]-[month]-[day]T[hour]:[minute]:[second]";
pub struct Actor<CFD, M>
where
CFD: xtra::Handler<Announcements> + xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
latest_announcements: Option<[Announcement; 24]>,
pending_attestations: HashSet<String>,
cfd_actor_address: xtra::Address<CFD>,
monitor_actor_address: xtra::Address<M>,
}
impl<CFD, M> Actor<CFD, M>
where
CFD: xtra::Handler<Announcements> + xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
pub fn new(
cfd_actor_address: xtra::Address<CFD>,
monitor_actor_address: xtra::Address<M>,
) -> Self {
Self {
latest_announcements: None,
pending_attestations: HashSet::new(),
cfd_actor_address,
monitor_actor_address,
}
}
async fn update_state(&mut self) -> Result<()> {
self.update_latest_announcements().await?;
self.update_pending_attestations().await?;
Ok(())
}
async fn update_latest_announcements(&mut self) -> Result<()> {
let new_announcements = next_urls()
.into_iter()
.map(|event_url| async {
let announcement = reqwest::get(event_url)
.await?
.json::<Announcement>()
.await?;
Result::<_, anyhow::Error>::Ok(announcement)
})
.collect::<FuturesOrdered<_>>()
.try_collect::<Vec<_>>()
.await?;
let new_announcements = <[Announcement; 24]>::try_from(new_announcements)
.map_err(|vec| anyhow::anyhow!("wrong number of announcements: {}", vec.len()))?;
if self.latest_announcements.as_ref() != Some(&new_announcements) {
self.latest_announcements = Some(new_announcements.clone());
self.cfd_actor_address
.do_send_async(Announcements(new_announcements))
.await?;
}
Ok(())
}
async fn update_pending_attestations(&mut self) -> Result<()> {
let pending_attestations = self.pending_attestations.clone();
for event_id in pending_attestations.into_iter() {
{
let res = match reqwest::get(format!("{}{}", OLIVIA_URL, event_id)).await {
Ok(res) => res,
Err(e) => {
// TODO: Can we differentiate between errors?
tracing::warn!(%event_id, "Attestation not available: {}", e);
continue;
}
};
let attestation = res.json::<Attestation>().await?;
self.cfd_actor_address
.clone()
.do_send_async(attestation.clone())
.await?;
self.monitor_actor_address
.clone()
.do_send_async(attestation)
.await?;
self.pending_attestations.remove(&event_id);
}
}
Ok(())
}
fn monitor_event(&mut self, event_id: String) {
if !self.pending_attestations.insert(event_id.clone()) {
tracing::trace!("Event {} already being monitored", event_id);
}
}
}
impl<CFD, M> xtra::Actor for Actor<CFD, M>
where
CFD: xtra::Handler<Announcements> + xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
}
pub struct Sync;
impl xtra::Message for Sync {
type Result = ();
}
#[async_trait]
impl<CFD, M> xtra::Handler<Sync> for Actor<CFD, M>
where
CFD: xtra::Handler<Announcements> + xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn handle(&mut self, _: Sync, _ctx: &mut xtra::Context<Self>) {
log_error!(self.update_state())
}
}
pub struct MonitorEvent {
pub event_id: String,
}
impl xtra::Message for MonitorEvent {
type Result = ();
}
#[async_trait]
impl<CFD, M> xtra::Handler<MonitorEvent> for Actor<CFD, M>
where
CFD: xtra::Handler<Announcements> + xtra::Handler<Attestation>,
M: xtra::Handler<Attestation>,
{
async fn handle(&mut self, msg: MonitorEvent, _ctx: &mut xtra::Context<Self>) {
self.monitor_event(msg.event_id)
}
}
/// Construct the URL of the next 24 `BitMEX/BXBT` hourly events
/// `olivia` will attest to.
fn next_urls() -> Vec<String> {
next_24_hours(OffsetDateTime::now_utc())
.into_iter()
.map(event_url)
.collect()
}
fn next_24_hours(datetime: OffsetDateTime) -> Vec<OffsetDateTime> {
let adjusted = datetime.replace_time(Time::from_hms(datetime.hour(), 0, 0).expect("in_range"));
(1..=24).map(|i| adjusted + Duration::hours(i)).collect()
}
/// Construct the URL of `olivia`'s `BitMEX/BXBT` event to be attested
/// for at the time indicated by the argument `datetime`.
fn event_url(datetime: OffsetDateTime) -> String {
let formatter = format_description::parse(OLIVIA_EVENT_TIME_FORMAT).expect("valid formatter");
datetime
.format(&formatter)
.expect("valid formatter for datetime");
format!("{}/BitMEX/BXBT/{}/price", OLIVIA_URL, datetime)
}
#[derive(Debug, Clone, serde::Deserialize, PartialEq)]
#[serde(from = "olivia_api::Announcement")]
pub struct Announcement {
/// Identifier for an oracle event.
///
/// Doubles up as the path of the URL for this event i.e.
/// https://h00.ooo/{id}.
pub id: String,
pub expected_outcome_time: OffsetDateTime,
pub nonce_pks: Vec<schnorrsig::PublicKey>,
}
pub struct Announcements(pub [Announcement; 24]);
// TODO: Implement real deserialization once price attestation is
// implemented in `olivia`
#[derive(Debug, Clone, serde::Deserialize)]
pub struct Attestation {
pub id: String,
pub price: u64,
pub scalars: Vec<SecretKey>,
}
impl xtra::Message for Announcements {
type Result = ();
}
impl xtra::Message for Attestation {
type Result = ();
}
mod olivia_api {
use cfd_protocol::secp256k1_zkp::schnorrsig;
use time::OffsetDateTime;
impl From<Announcement> for super::Announcement {
fn from(announcement: Announcement) -> Self {
Self {
id: announcement.id,
expected_outcome_time: announcement.expected_outcome_time,
nonce_pks: announcement.schemes.olivia_v1.nonces,
}
}
}
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct Announcement {
id: String,
#[serde(rename = "expected-outcome-time")]
#[serde(with = "timestamp")]
expected_outcome_time: OffsetDateTime,
schemes: Schemes,
}
#[derive(Debug, Clone, serde::Deserialize)]
struct Schemes {
#[serde(rename = "olivia-v1")]
olivia_v1: OliviaV1,
}
#[derive(Debug, Clone, serde::Deserialize)]
struct OliviaV1 {
nonces: Vec<schnorrsig::PublicKey>,
}
mod timestamp {
use crate::oracle::OLIVIA_EVENT_TIME_FORMAT;
use serde::de::Error as _;
use serde::{Deserialize, Deserializer};
use time::{format_description, OffsetDateTime, PrimitiveDateTime};
pub fn deserialize<'a, D>(deserializer: D) -> Result<OffsetDateTime, D::Error>
where
D: Deserializer<'a>,
{
let string = String::deserialize(deserializer)?;
let format = format_description::parse(OLIVIA_EVENT_TIME_FORMAT).expect("valid format");
let date_time = PrimitiveDateTime::parse(&string, &format).map_err(D::Error::custom)?;
Ok(date_time.assume_utc())
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use crate::oracle;
use cfd_protocol::secp256k1_zkp::schnorrsig;
use time::macros::datetime;
#[test]
fn deserialize_announcement() {
let json = r#"
{
"id": "/BitMEX/BXBT/2021-09-28T03:20:00/price/",
"expected-outcome-time": "2021-09-28T03:20:00",
"descriptor": {
"type": "enum",
"outcomes": [
"0",
"1"
]
},
"schemes": {
"olivia-v1": {
"nonces": [
"41a26c4853f2edc5604069541fdd1df103b52c31e959451d115f8220aeb8b414"
]
},
"ecdsa-v1": {}
}
}
"#;
let deserialized = serde_json::from_str::<oracle::Announcement>(json).unwrap();
let expected = oracle::Announcement {
id: "/BitMEX/BXBT/2021-09-28T03:20:00/price/".to_string(),
expected_outcome_time: datetime!(2021-09-28 03:20:00).assume_utc(),
nonce_pks: vec![schnorrsig::PublicKey::from_str(
"41a26c4853f2edc5604069541fdd1df103b52c31e959451d115f8220aeb8b414",
)
.unwrap()],
};
assert_eq!(deserialized, expected)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use time::format_description;
use time::macros::datetime;
#[test]
fn next_event_url_is_correct() {
let datetime = datetime!(2021-09-23 10:43:00);
let format = format_description::parse(OLIVIA_EVENT_TIME_FORMAT).unwrap();
let date_time_formatted = datetime.format(&format).unwrap();
let expected = "2021-09-23T10:43:00";
assert_eq!(date_time_formatted, expected);
}
#[test]
fn next_24() {
let datetime = datetime!(2021-09-23 10:43:12);
let next_24_hours = next_24_hours(datetime.assume_utc());
let expected = vec![
datetime!(2021-09-23 11:00:00).assume_utc(),
datetime!(2021-09-23 12:00:00).assume_utc(),
datetime!(2021-09-23 13:00:00).assume_utc(),
datetime!(2021-09-23 14:00:00).assume_utc(),
datetime!(2021-09-23 15:00:00).assume_utc(),
datetime!(2021-09-23 16:00:00).assume_utc(),
datetime!(2021-09-23 17:00:00).assume_utc(),
datetime!(2021-09-23 18:00:00).assume_utc(),
datetime!(2021-09-23 19:00:00).assume_utc(),
datetime!(2021-09-23 20:00:00).assume_utc(),
datetime!(2021-09-23 21:00:00).assume_utc(),
datetime!(2021-09-23 22:00:00).assume_utc(),
datetime!(2021-09-23 23:00:00).assume_utc(),
datetime!(2021-09-24 00:00:00).assume_utc(),
datetime!(2021-09-24 01:00:00).assume_utc(),
datetime!(2021-09-24 02:00:00).assume_utc(),
datetime!(2021-09-24 03:00:00).assume_utc(),
datetime!(2021-09-24 04:00:00).assume_utc(),
datetime!(2021-09-24 05:00:00).assume_utc(),
datetime!(2021-09-24 06:00:00).assume_utc(),
datetime!(2021-09-24 07:00:00).assume_utc(),
datetime!(2021-09-24 08:00:00).assume_utc(),
datetime!(2021-09-24 09:00:00).assume_utc(),
datetime!(2021-09-24 10:00:00).assume_utc(),
];
assert_eq!(next_24_hours, expected)
}
}

4
daemon/src/setup_contract.rs

@ -20,7 +20,7 @@ use std::ops::RangeInclusive;
pub async fn new(
mut sink: impl Sink<SetupMsg, Error = anyhow::Error> + Unpin,
mut stream: impl FusedStream<Item = SetupMsg> + Unpin,
oracle_pk: schnorrsig::PublicKey,
(oracle_pk, nonce_pks): (schnorrsig::PublicKey, Vec<schnorrsig::PublicKey>),
cfd: Cfd,
wallet: Wallet,
role: Role,
@ -64,7 +64,7 @@ pub async fn new(
let own_cfd_txs = create_cfd_transactions(
(params.maker().clone(), *params.maker_punish()),
(params.taker().clone(), *params.taker_punish()),
(oracle_pk, &[]),
(oracle_pk, &nonce_pks),
(
model::cfd::Cfd::CET_TIMELOCK,
cfd.refund_timelock_in_blocks(),

16
daemon/src/taker.rs

@ -29,6 +29,7 @@ mod keypair;
mod logger;
mod model;
mod monitor;
mod oracle;
mod routes;
mod routes_taker;
mod seed;
@ -162,6 +163,8 @@ async fn main() -> Result<()> {
let (monitor_actor_address, monitor_actor_context) = xtra::Context::new(None);
let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None);
let mut conn = db.acquire().await.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap();
let cfd_actor_inbox = taker_cfd::Actor::new(
@ -171,8 +174,9 @@ async fn main() -> Result<()> {
cfd_feed_sender,
order_feed_sender,
send_to_maker,
monitor_actor_address,
monitor_actor_address.clone(),
cfds.clone(),
oracle_actor_address,
)
.await
.unwrap()
@ -190,6 +194,16 @@ async fn main() -> Result<()> {
);
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
tokio::spawn(
oracle_actor_context
.notify_interval(Duration::from_secs(60), || oracle::Sync)
.unwrap(),
);
tokio::spawn(oracle_actor_context.run(oracle::Actor::new(
cfd_actor_inbox.clone(),
monitor_actor_address,
)));
Ok(rocket.manage(cfd_actor_inbox))
},
))

56
daemon/src/taker_cfd.rs

@ -10,7 +10,7 @@ use crate::model::Usd;
use crate::monitor::{self, MonitorParams};
use crate::wallet::Wallet;
use crate::wire::SetupMsg;
use crate::{send_to_socket, setup_contract, wire};
use crate::{oracle, send_to_socket, setup_contract, wire};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
@ -51,6 +51,8 @@ pub struct Actor {
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>,
setup_state: SetupState,
latest_announcement: Option<oracle::Announcement>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
}
impl Actor {
@ -64,6 +66,7 @@ impl Actor {
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>,
cfds: Vec<Cfd>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
) -> Result<Self> {
// populate the CFD feed with existing CFDs
cfd_feed_actor_inbox.send(cfds.clone())?;
@ -90,6 +93,8 @@ impl Actor {
send_to_maker,
monitor_actor,
setup_state: SetupState::None,
latest_announcement: None,
oracle_actor,
})
}
@ -166,13 +171,25 @@ impl Actor {
.send(load_all_cfds(&mut conn).await?)?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let latest_announcement = self
.latest_announcement
.to_owned()
.context("Unaware of oracle's latest announcement.")?;
let nonce_pks = latest_announcement.nonce_pks;
self.oracle_actor
.do_send_async(oracle::MonitorEvent {
event_id: latest_announcement.id,
})
.await?;
let contract_future = setup_contract::new(
self.send_to_maker
.clone()
.into_sink()
.with(|msg| future::ok(wire::TakerToMaker::Protocol(msg))),
receiver,
self.oracle_pk,
(self.oracle_pk, nonce_pks),
cfd,
self.wallet.clone(),
setup_contract::Role::Taker,
@ -301,6 +318,27 @@ impl Actor {
Ok(())
}
async fn handle_oracle_announcements(
&mut self,
announcements: oracle::Announcements,
) -> Result<()> {
tracing::debug!("Updating latest oracle announcements");
self.latest_announcement = Some(announcements.0.last().unwrap().clone());
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
attestation.id
);
todo!(
"Update all CFDs which care about this particular attestation, based on the event ID"
);
}
}
#[async_trait]
@ -359,6 +397,20 @@ impl Handler<monitor::Event> for Actor {
}
}
#[async_trait]
impl Handler<oracle::Announcements> for Actor {
async fn handle(&mut self, msg: oracle::Announcements, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_announcements(msg))
}
}
#[async_trait]
impl Handler<oracle::Attestation> for Actor {
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_attestation(msg))
}
}
impl Message for TakeOffer {
type Result = ();
}

Loading…
Cancel
Save