Browse Source

Handle oracle actor pending attestations upon startup

We `insert` - which returns true if the value was not present in the set yet and false if it already was (does not fail if already in set).
feature/integration-tests
Daniel Karzel 3 years ago
parent
commit
1dcb23bdf6
No known key found for this signature in database GPG Key ID: 30C3FC2E438ADB6E
  1. 3
      daemon/src/maker.rs
  2. 31
      daemon/src/oracle.rs
  3. 11
      daemon/src/taker.rs

3
daemon/src/maker.rs

@ -247,7 +247,7 @@ async fn main() -> Result<()> {
monitor::Actor::new( monitor::Actor::new(
opts.network.electrum(), opts.network.electrum(),
cfd_maker_actor_inbox.clone(), cfd_maker_actor_inbox.clone(),
cfds, cfds.clone(),
) )
.await .await
.unwrap(), .unwrap(),
@ -262,6 +262,7 @@ async fn main() -> Result<()> {
tokio::spawn(oracle_actor_context.run(oracle::Actor::new( tokio::spawn(oracle_actor_context.run(oracle::Actor::new(
cfd_maker_actor_inbox.clone(), cfd_maker_actor_inbox.clone(),
monitor_actor_address, monitor_actor_address,
cfds,
))); )));
let listener_stream = futures::stream::poll_fn(move |ctx| { let listener_stream = futures::stream::poll_fn(move |ctx| {

31
daemon/src/oracle.rs

@ -1,4 +1,5 @@
use crate::actors::log_error; use crate::actors::log_error;
use crate::model::cfd::{Cfd, CfdState};
use crate::model::OracleEventId; use crate::model::OracleEventId;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
@ -37,10 +38,38 @@ where
pub fn new( pub fn new(
cfd_actor_address: xtra::Address<CFD>, cfd_actor_address: xtra::Address<CFD>,
monitor_actor_address: xtra::Address<M>, monitor_actor_address: xtra::Address<M>,
cfds: Vec<Cfd>,
) -> Self { ) -> Self {
let mut pending_attestations = HashSet::new();
for cfd in cfds {
match cfd.state.clone() {
CfdState::PendingOpen { .. }
| CfdState::Open { .. }
| CfdState::PendingCommit { .. }
| CfdState::OpenCommitted { .. }
| CfdState::PendingCet { .. } => {
pending_attestations.insert(cfd.order.oracle_event_id);
}
// Irrelevant for restart
CfdState::OutgoingOrderRequest { .. }
| CfdState::IncomingOrderRequest { .. }
| CfdState::Accepted { .. }
| CfdState::Rejected { .. }
| CfdState::ContractSetup { .. }
// Final states
| CfdState::Closed { .. }
| CfdState::MustRefund { .. }
| CfdState::Refunded { .. }
| CfdState::SetupFailed { .. } => ()
}
}
Self { Self {
latest_announcements: None, latest_announcements: None,
pending_attestations: HashSet::new(), pending_attestations,
cfd_actor_address, cfd_actor_address,
monitor_actor_address, monitor_actor_address,
} }

11
daemon/src/taker.rs

@ -243,9 +243,13 @@ async fn main() -> Result<()> {
); );
tokio::spawn( tokio::spawn(
monitor_actor_context.run( monitor_actor_context.run(
monitor::Actor::new(opts.network.electrum(), cfd_actor_inbox.clone(), cfds) monitor::Actor::new(
.await opts.network.electrum(),
.unwrap(), cfd_actor_inbox.clone(),
cfds.clone(),
)
.await
.unwrap(),
), ),
); );
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
@ -257,6 +261,7 @@ async fn main() -> Result<()> {
tokio::spawn(oracle_actor_context.run(oracle::Actor::new( tokio::spawn(oracle_actor_context.run(oracle::Actor::new(
cfd_actor_inbox.clone(), cfd_actor_inbox.clone(),
monitor_actor_address, monitor_actor_address,
cfds,
))); )));
Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver)) Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver))

Loading…
Cancel
Save