Browse Source

Merge #237

237: Oracle actor startup behavior r=da-kami a=da-kami



Co-authored-by: Daniel Karzel <daniel@comit.network>
feature/integration-tests
bors[bot] 3 years ago
committed by GitHub
parent
commit
8be1e46374
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      daemon/src/maker.rs
  2. 53
      daemon/src/oracle.rs
  3. 16
      daemon/src/taker.rs

8
daemon/src/maker.rs

@ -227,7 +227,7 @@ async fn main() -> Result<()> {
update_cfd_feed_sender, update_cfd_feed_sender,
maker_inc_connections_address.clone(), maker_inc_connections_address.clone(),
monitor_actor_address.clone(), monitor_actor_address.clone(),
oracle_actor_address, oracle_actor_address.clone(),
) )
.create(None) .create(None)
.spawn_global(); .spawn_global();
@ -247,7 +247,7 @@ async fn main() -> Result<()> {
monitor::Actor::new( monitor::Actor::new(
opts.network.electrum(), opts.network.electrum(),
cfd_maker_actor_inbox.clone(), cfd_maker_actor_inbox.clone(),
cfds, cfds.clone(),
) )
.await .await
.unwrap(), .unwrap(),
@ -262,8 +262,12 @@ async fn main() -> Result<()> {
tokio::spawn(oracle_actor_context.run(oracle::Actor::new( tokio::spawn(oracle_actor_context.run(oracle::Actor::new(
cfd_maker_actor_inbox.clone(), cfd_maker_actor_inbox.clone(),
monitor_actor_address, monitor_actor_address,
cfds,
))); )));
// use `.send` here to ensure we only continue once the update was processed
oracle_actor_address.send(oracle::Sync).await.unwrap();
let listener_stream = futures::stream::poll_fn(move |ctx| { let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) { let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => { Ok((stream, address)) => {

53
daemon/src/oracle.rs

@ -1,10 +1,12 @@
use crate::actors::log_error; use crate::actors::log_error;
use crate::model::cfd::{Cfd, CfdState};
use crate::model::OracleEventId; use crate::model::OracleEventId;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use async_trait::async_trait; use async_trait::async_trait;
use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey}; use cfd_protocol::secp256k1_zkp::{schnorrsig, SecretKey};
use futures::stream::FuturesOrdered; use futures::stream::FuturesOrdered;
use futures::TryStreamExt; use futures::TryStreamExt;
use reqwest::StatusCode;
use rocket::time::format_description::FormatItem; use rocket::time::format_description::FormatItem;
use rocket::time::macros::format_description; use rocket::time::macros::format_description;
use rocket::time::{Duration, OffsetDateTime, Time}; use rocket::time::{Duration, OffsetDateTime, Time};
@ -24,7 +26,7 @@ where
M: xtra::Handler<Attestation>, M: xtra::Handler<Attestation>,
{ {
latest_announcements: Option<[Announcement; 24]>, latest_announcements: Option<[Announcement; 24]>,
pending_attestations: HashSet<String>, pending_attestations: HashSet<OracleEventId>,
cfd_actor_address: xtra::Address<CFD>, cfd_actor_address: xtra::Address<CFD>,
monitor_actor_address: xtra::Address<M>, monitor_actor_address: xtra::Address<M>,
} }
@ -37,10 +39,38 @@ where
pub fn new( pub fn new(
cfd_actor_address: xtra::Address<CFD>, cfd_actor_address: xtra::Address<CFD>,
monitor_actor_address: xtra::Address<M>, monitor_actor_address: xtra::Address<M>,
cfds: Vec<Cfd>,
) -> Self { ) -> Self {
let mut pending_attestations = HashSet::new();
for cfd in cfds {
match cfd.state.clone() {
CfdState::PendingOpen { .. }
| CfdState::Open { .. }
| CfdState::PendingCommit { .. }
| CfdState::OpenCommitted { .. }
| CfdState::PendingCet { .. } => {
pending_attestations.insert(cfd.order.oracle_event_id);
}
// Irrelevant for restart
CfdState::OutgoingOrderRequest { .. }
| CfdState::IncomingOrderRequest { .. }
| CfdState::Accepted { .. }
| CfdState::Rejected { .. }
| CfdState::ContractSetup { .. }
// Final states
| CfdState::Closed { .. }
| CfdState::MustRefund { .. }
| CfdState::Refunded { .. }
| CfdState::SetupFailed { .. } => ()
}
}
Self { Self {
latest_announcements: None, latest_announcements: None,
pending_attestations: HashSet::new(), pending_attestations,
cfd_actor_address, cfd_actor_address,
monitor_actor_address, monitor_actor_address,
} }
@ -97,15 +127,22 @@ where
for event_id in pending_attestations.into_iter() { for event_id in pending_attestations.into_iter() {
{ {
let res = match reqwest::get(format!("{}{}", OLIVIA_URL, event_id)).await { let res = match reqwest::get(format!("{}{}", OLIVIA_URL, event_id)).await {
Ok(res) => res, Ok(res) if res.status().is_success() => res,
Ok(res) if res.status() == StatusCode::NOT_FOUND => {
tracing::trace!("Attestation not ready yet");
continue;
}
Ok(res) => {
tracing::warn!("Unexpected response, status {}", res.status());
continue;
}
Err(e) => { Err(e) => {
// TODO: Can we differentiate between errors? tracing::warn!(%event_id, "Failed to fetch attestation: {}", e);
tracing::warn!(%event_id, "Attestation not available: {}", e);
continue; continue;
} }
}; };
let attestation = res.json::<Attestation>().await?; let attestation = dbg!(res).json::<Attestation>().await?;
self.cfd_actor_address self.cfd_actor_address
.clone() .clone()
@ -123,7 +160,7 @@ where
Ok(()) Ok(())
} }
fn monitor_event(&mut self, event_id: String) { fn monitor_event(&mut self, event_id: OracleEventId) {
if !self.pending_attestations.insert(event_id.clone()) { if !self.pending_attestations.insert(event_id.clone()) {
tracing::trace!("Event {} already being monitored", event_id); tracing::trace!("Event {} already being monitored", event_id);
} }
@ -169,7 +206,7 @@ where
M: xtra::Handler<Attestation>, M: xtra::Handler<Attestation>,
{ {
async fn handle(&mut self, msg: MonitorEvent, _ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, msg: MonitorEvent, _ctx: &mut xtra::Context<Self>) {
self.monitor_event(msg.event_id.0) self.monitor_event(msg.event_id)
} }
} }

16
daemon/src/taker.rs

@ -227,7 +227,7 @@ async fn main() -> Result<()> {
update_cfd_feed_sender, update_cfd_feed_sender,
send_to_maker, send_to_maker,
monitor_actor_address.clone(), monitor_actor_address.clone(),
oracle_actor_address, oracle_actor_address.clone(),
) )
.create(None) .create(None)
.spawn_global(); .spawn_global();
@ -243,9 +243,13 @@ async fn main() -> Result<()> {
); );
tokio::spawn( tokio::spawn(
monitor_actor_context.run( monitor_actor_context.run(
monitor::Actor::new(opts.network.electrum(), cfd_actor_inbox.clone(), cfds) monitor::Actor::new(
.await opts.network.electrum(),
.unwrap(), cfd_actor_inbox.clone(),
cfds.clone(),
)
.await
.unwrap(),
), ),
); );
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
@ -257,8 +261,12 @@ async fn main() -> Result<()> {
tokio::spawn(oracle_actor_context.run(oracle::Actor::new( tokio::spawn(oracle_actor_context.run(oracle::Actor::new(
cfd_actor_inbox.clone(), cfd_actor_inbox.clone(),
monitor_actor_address, monitor_actor_address,
cfds,
))); )));
// use `.send` here to ensure we only continue once the update was processed
oracle_actor_address.send(oracle::Sync).await.unwrap();
Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver)) Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver))
}, },
)) ))

Loading…
Cancel
Save