diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 7602a89..df837e3 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -227,7 +227,7 @@ async fn main() -> Result<()> { update_cfd_feed_sender, maker_inc_connections_address.clone(), monitor_actor_address.clone(), - oracle_actor_address, + oracle_actor_address.clone(), ) .create(None) .spawn_global(); @@ -247,7 +247,7 @@ async fn main() -> Result<()> { monitor::Actor::new( opts.network.electrum(), cfd_maker_actor_inbox.clone(), - cfds, + cfds.clone(), ) .await .unwrap(), @@ -262,8 +262,12 @@ async fn main() -> Result<()> { tokio::spawn(oracle_actor_context.run(oracle::Actor::new( cfd_maker_actor_inbox.clone(), monitor_actor_address, + cfds, ))); + // use `.send` here to ensure we only continue once the update was processed + oracle_actor_address.send(oracle::Sync).await.unwrap(); + let listener_stream = futures::stream::poll_fn(move |ctx| { let message = match futures::ready!(listener.poll_accept(ctx)) { Ok((stream, address)) => { diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index fbf05d4..be78ff7 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -1,10 +1,12 @@ use crate::actors::log_error; +use crate::model::cfd::{Cfd, CfdState}; use crate::model::OracleEventId; use anyhow::{Context, Result}; use async_trait::async_trait; use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey}; use futures::stream::FuturesOrdered; use futures::TryStreamExt; +use reqwest::StatusCode; use rocket::time::format_description::FormatItem; use rocket::time::macros::format_description; use rocket::time::{Duration, OffsetDateTime, Time}; @@ -24,7 +26,7 @@ where M: xtra::Handler, { latest_announcements: Option<[Announcement; 24]>, - pending_attestations: HashSet, + pending_attestations: HashSet, cfd_actor_address: xtra::Address, monitor_actor_address: xtra::Address, } @@ -37,10 +39,38 @@ where pub fn new( cfd_actor_address: xtra::Address, monitor_actor_address: xtra::Address, + cfds: Vec, ) -> Self { + let mut pending_attestations = HashSet::new(); + + for cfd in cfds { + match cfd.state.clone() { + CfdState::PendingOpen { .. } + | CfdState::Open { .. } + | CfdState::PendingCommit { .. } + | CfdState::OpenCommitted { .. } + | CfdState::PendingCet { .. } => { + pending_attestations.insert(cfd.order.oracle_event_id); + } + + // Irrelevant for restart + CfdState::OutgoingOrderRequest { .. } + | CfdState::IncomingOrderRequest { .. } + | CfdState::Accepted { .. } + | CfdState::Rejected { .. } + | CfdState::ContractSetup { .. } + + // Final states + | CfdState::Closed { .. } + | CfdState::MustRefund { .. } + | CfdState::Refunded { .. } + | CfdState::SetupFailed { .. } => () + } + } + Self { latest_announcements: None, - pending_attestations: HashSet::new(), + pending_attestations, cfd_actor_address, monitor_actor_address, } @@ -97,15 +127,22 @@ where for event_id in pending_attestations.into_iter() { { let res = match reqwest::get(format!("{}{}", OLIVIA_URL, event_id)).await { - Ok(res) => res, + Ok(res) if res.status().is_success() => res, + Ok(res) if res.status() == StatusCode::NOT_FOUND => { + tracing::trace!("Attestation not ready yet"); + continue; + } + Ok(res) => { + tracing::warn!("Unexpected response, status {}", res.status()); + continue; + } Err(e) => { - // TODO: Can we differentiate between errors? - tracing::warn!(%event_id, "Attestation not available: {}", e); + tracing::warn!(%event_id, "Failed to fetch attestation: {}", e); continue; } }; - let attestation = res.json::().await?; + let attestation = dbg!(res).json::().await?; self.cfd_actor_address .clone() @@ -123,7 +160,7 @@ where Ok(()) } - fn monitor_event(&mut self, event_id: String) { + fn monitor_event(&mut self, event_id: OracleEventId) { if !self.pending_attestations.insert(event_id.clone()) { tracing::trace!("Event {} already being monitored", event_id); } @@ -169,7 +206,7 @@ where M: xtra::Handler, { async fn handle(&mut self, msg: MonitorEvent, _ctx: &mut xtra::Context) { - self.monitor_event(msg.event_id.0) + self.monitor_event(msg.event_id) } } diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index cf9c74e..0e7358f 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -227,7 +227,7 @@ async fn main() -> Result<()> { update_cfd_feed_sender, send_to_maker, monitor_actor_address.clone(), - oracle_actor_address, + oracle_actor_address.clone(), ) .create(None) .spawn_global(); @@ -243,9 +243,13 @@ async fn main() -> Result<()> { ); tokio::spawn( monitor_actor_context.run( - monitor::Actor::new(opts.network.electrum(), cfd_actor_inbox.clone(), cfds) - .await - .unwrap(), + monitor::Actor::new( + opts.network.electrum(), + cfd_actor_inbox.clone(), + cfds.clone(), + ) + .await + .unwrap(), ), ); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); @@ -257,8 +261,12 @@ async fn main() -> Result<()> { tokio::spawn(oracle_actor_context.run(oracle::Actor::new( cfd_actor_inbox.clone(), monitor_actor_address, + cfds, ))); + // use `.send` here to ensure we only continue once the update was processed + oracle_actor_address.send(oracle::Sync).await.unwrap(); + Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver)) }, ))