From 1114a90e3477cbb6ee3fc4d2dc4125190ae8a4ce Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Thu, 7 Oct 2021 15:26:26 +1100 Subject: [PATCH 1/4] Use `OracleEventId` for pending attestations --- daemon/src/oracle.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index fbf05d4..c7e5169 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -24,7 +24,7 @@ where M: xtra::Handler, { latest_announcements: Option<[Announcement; 24]>, - pending_attestations: HashSet, + pending_attestations: HashSet, cfd_actor_address: xtra::Address, monitor_actor_address: xtra::Address, } @@ -123,7 +123,7 @@ where Ok(()) } - fn monitor_event(&mut self, event_id: String) { + fn monitor_event(&mut self, event_id: OracleEventId) { if !self.pending_attestations.insert(event_id.clone()) { tracing::trace!("Event {} already being monitored", event_id); } @@ -169,7 +169,7 @@ where M: xtra::Handler, { async fn handle(&mut self, msg: MonitorEvent, _ctx: &mut xtra::Context) { - self.monitor_event(msg.event_id.0) + self.monitor_event(msg.event_id) } } From 1dcb23bdf670a342e7e90de5624bd733cb979d8c Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Thu, 7 Oct 2021 15:36:35 +1100 Subject: [PATCH 2/4] Handle oracle actor pending attestations upon startup We `insert` - which returns true if the value was not present in the set yet and false if it already was (does not fail if already in set). --- daemon/src/maker.rs | 3 ++- daemon/src/oracle.rs | 31 ++++++++++++++++++++++++++++++- daemon/src/taker.rs | 11 ++++++++--- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 7602a89..7611b8d 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -247,7 +247,7 @@ async fn main() -> Result<()> { monitor::Actor::new( opts.network.electrum(), cfd_maker_actor_inbox.clone(), - cfds, + cfds.clone(), ) .await .unwrap(), @@ -262,6 +262,7 @@ async fn main() -> Result<()> { tokio::spawn(oracle_actor_context.run(oracle::Actor::new( cfd_maker_actor_inbox.clone(), monitor_actor_address, + cfds, ))); let listener_stream = futures::stream::poll_fn(move |ctx| { diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index c7e5169..7839817 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -1,4 +1,5 @@ use crate::actors::log_error; +use crate::model::cfd::{Cfd, CfdState}; use crate::model::OracleEventId; use anyhow::{Context, Result}; use async_trait::async_trait; @@ -37,10 +38,38 @@ where pub fn new( cfd_actor_address: xtra::Address, monitor_actor_address: xtra::Address, + cfds: Vec, ) -> Self { + let mut pending_attestations = HashSet::new(); + + for cfd in cfds { + match cfd.state.clone() { + CfdState::PendingOpen { .. } + | CfdState::Open { .. } + | CfdState::PendingCommit { .. } + | CfdState::OpenCommitted { .. } + | CfdState::PendingCet { .. } => { + pending_attestations.insert(cfd.order.oracle_event_id); + } + + // Irrelevant for restart + CfdState::OutgoingOrderRequest { .. } + | CfdState::IncomingOrderRequest { .. } + | CfdState::Accepted { .. } + | CfdState::Rejected { .. } + | CfdState::ContractSetup { .. } + + // Final states + | CfdState::Closed { .. } + | CfdState::MustRefund { .. } + | CfdState::Refunded { .. } + | CfdState::SetupFailed { .. } => () + } + } + Self { latest_announcements: None, - pending_attestations: HashSet::new(), + pending_attestations, cfd_actor_address, monitor_actor_address, } diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index cf9c74e..f17fa25 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -243,9 +243,13 @@ async fn main() -> Result<()> { ); tokio::spawn( monitor_actor_context.run( - monitor::Actor::new(opts.network.electrum(), cfd_actor_inbox.clone(), cfds) - .await - .unwrap(), + monitor::Actor::new( + opts.network.electrum(), + cfd_actor_inbox.clone(), + cfds.clone(), + ) + .await + .unwrap(), ), ); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); @@ -257,6 +261,7 @@ async fn main() -> Result<()> { tokio::spawn(oracle_actor_context.run(oracle::Actor::new( cfd_actor_inbox.clone(), monitor_actor_address, + cfds, ))); Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver)) From 0660e02b1fb63772d1cf64cd86c9f85687388e7a Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Thu, 7 Oct 2021 15:40:12 +1100 Subject: [PATCH 3/4] Oracle actor initial state update Block until we are sure that the update was done, because otherwise we might have weird side effects in the UI because there is a dependency on the announcements being available when creating offers. --- daemon/src/maker.rs | 5 ++++- daemon/src/taker.rs | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 7611b8d..df837e3 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -227,7 +227,7 @@ async fn main() -> Result<()> { update_cfd_feed_sender, maker_inc_connections_address.clone(), monitor_actor_address.clone(), - oracle_actor_address, + oracle_actor_address.clone(), ) .create(None) .spawn_global(); @@ -265,6 +265,9 @@ async fn main() -> Result<()> { cfds, ))); + // use `.send` here to ensure we only continue once the update was processed + oracle_actor_address.send(oracle::Sync).await.unwrap(); + let listener_stream = futures::stream::poll_fn(move |ctx| { let message = match futures::ready!(listener.poll_accept(ctx)) { Ok((stream, address)) => { diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index f17fa25..0e7358f 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -227,7 +227,7 @@ async fn main() -> Result<()> { update_cfd_feed_sender, send_to_maker, monitor_actor_address.clone(), - oracle_actor_address, + oracle_actor_address.clone(), ) .create(None) .spawn_global(); @@ -264,6 +264,9 @@ async fn main() -> Result<()> { cfds, ))); + // use `.send` here to ensure we only continue once the update was processed + oracle_actor_address.send(oracle::Sync).await.unwrap(); + Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver)) }, )) From 6bb4824c2206d84b5442990768717ee80f2c20a4 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Thu, 7 Oct 2021 16:03:49 +1100 Subject: [PATCH 4/4] Handle attestation not ready response ... and other minor cleanups concerning Olivia's response handling. --- daemon/src/oracle.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index 7839817..be78ff7 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -6,6 +6,7 @@ use async_trait::async_trait; use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey}; use futures::stream::FuturesOrdered; use futures::TryStreamExt; +use reqwest::StatusCode; use rocket::time::format_description::FormatItem; use rocket::time::macros::format_description; use rocket::time::{Duration, OffsetDateTime, Time}; @@ -126,15 +127,22 @@ where for event_id in pending_attestations.into_iter() { { let res = match reqwest::get(format!("{}{}", OLIVIA_URL, event_id)).await { - Ok(res) => res, + Ok(res) if res.status().is_success() => res, + Ok(res) if res.status() == StatusCode::NOT_FOUND => { + tracing::trace!("Attestation not ready yet"); + continue; + } + Ok(res) => { + tracing::warn!("Unexpected response, status {}", res.status()); + continue; + } Err(e) => { - // TODO: Can we differentiate between errors? - tracing::warn!(%event_id, "Attestation not available: {}", e); + tracing::warn!(%event_id, "Failed to fetch attestation: {}", e); continue; } }; - let attestation = res.json::().await?; + let attestation = dbg!(res).json::().await?; self.cfd_actor_address .clone()