From 1dcb23bdf670a342e7e90de5624bd733cb979d8c Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Thu, 7 Oct 2021 15:36:35 +1100 Subject: [PATCH] Handle oracle actor pending attestations upon startup We `insert` - which returns true if the value was not present in the set yet and false if it already was (does not fail if already in set). --- daemon/src/maker.rs | 3 ++- daemon/src/oracle.rs | 31 ++++++++++++++++++++++++++++++- daemon/src/taker.rs | 11 ++++++++--- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 7602a89..7611b8d 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -247,7 +247,7 @@ async fn main() -> Result<()> { monitor::Actor::new( opts.network.electrum(), cfd_maker_actor_inbox.clone(), - cfds, + cfds.clone(), ) .await .unwrap(), @@ -262,6 +262,7 @@ async fn main() -> Result<()> { tokio::spawn(oracle_actor_context.run(oracle::Actor::new( cfd_maker_actor_inbox.clone(), monitor_actor_address, + cfds, ))); let listener_stream = futures::stream::poll_fn(move |ctx| { diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index c7e5169..7839817 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -1,4 +1,5 @@ use crate::actors::log_error; +use crate::model::cfd::{Cfd, CfdState}; use crate::model::OracleEventId; use anyhow::{Context, Result}; use async_trait::async_trait; @@ -37,10 +38,38 @@ where pub fn new( cfd_actor_address: xtra::Address, monitor_actor_address: xtra::Address, + cfds: Vec, ) -> Self { + let mut pending_attestations = HashSet::new(); + + for cfd in cfds { + match cfd.state.clone() { + CfdState::PendingOpen { .. } + | CfdState::Open { .. } + | CfdState::PendingCommit { .. } + | CfdState::OpenCommitted { .. } + | CfdState::PendingCet { .. } => { + pending_attestations.insert(cfd.order.oracle_event_id); + } + + // Irrelevant for restart + CfdState::OutgoingOrderRequest { .. } + | CfdState::IncomingOrderRequest { .. } + | CfdState::Accepted { .. } + | CfdState::Rejected { .. } + | CfdState::ContractSetup { .. } + + // Final states + | CfdState::Closed { .. } + | CfdState::MustRefund { .. } + | CfdState::Refunded { .. } + | CfdState::SetupFailed { .. } => () + } + } + Self { latest_announcements: None, - pending_attestations: HashSet::new(), + pending_attestations, cfd_actor_address, monitor_actor_address, } diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index cf9c74e..f17fa25 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -243,9 +243,13 @@ async fn main() -> Result<()> { ); tokio::spawn( monitor_actor_context.run( - monitor::Actor::new(opts.network.electrum(), cfd_actor_inbox.clone(), cfds) - .await - .unwrap(), + monitor::Actor::new( + opts.network.electrum(), + cfd_actor_inbox.clone(), + cfds.clone(), + ) + .await + .unwrap(), ), ); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); @@ -257,6 +261,7 @@ async fn main() -> Result<()> { tokio::spawn(oracle_actor_context.run(oracle::Actor::new( cfd_actor_inbox.clone(), monitor_actor_address, + cfds, ))); Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver))