Browse Source

Move init of syncing task to respective actor

This reduces code duplication across maker and taker.
update-blockstream-electrum-server-url
Thomas Eizinger 3 years ago
parent
commit
a5b5dd66d8
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 29
      daemon/src/lib.rs
  2. 20
      daemon/src/monitor.rs
  3. 14
      daemon/src/oracle.rs

29
daemon/src/lib.rs

@ -155,8 +155,8 @@ where
let cfds = load_all_cfds(&mut conn).await?;
let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let (monitor_addr, monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, oracle_ctx) = xtra::Context::new(None);
let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None);
let mut tasks = Tasks::default();
@ -183,21 +183,11 @@ where
Box::new(cfd_actor_addr.clone()),
)));
tasks.add(
monitor_ctx
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.map_err(|e| anyhow::anyhow!(e))?,
);
tasks.add(
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
tasks.add(
oracle_ctx
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.map_err(|e| anyhow::anyhow!(e))?,
);
let (fan_out_actor, fan_out_actor_fut) =
fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
.create(None)
@ -262,8 +252,8 @@ where
let (maker_online_status_feed_sender, maker_online_status_feed_receiver) =
watch::channel(ConnectionStatus::Offline { reason: None });
let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let (monitor_addr, monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, oracle_ctx) = xtra::Context::new(None);
let mut tasks = Tasks::default();
@ -305,22 +295,11 @@ where
connect_timeout,
)));
tasks.add(
monitor_ctx
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.map_err(|e| anyhow::anyhow!(e))?,
);
tasks.add(
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
tasks.add(
oracle_ctx
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.map_err(|e| anyhow::anyhow!(e))?,
);
let (fan_out_actor, fan_out_actor_fut) =
fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
.create(None)

20
daemon/src/monitor.rs

@ -8,6 +8,7 @@ use crate::model::BitMexPriceEventId;
use crate::oracle;
use crate::oracle::Attestation;
use crate::try_continue;
use crate::Tasks;
use anyhow::Context;
use anyhow::Result;
use async_trait::async_trait;
@ -28,6 +29,7 @@ use std::fmt;
use std::marker::Send;
use std::ops::Add;
use std::ops::RangeInclusive;
use std::time::Duration;
use xtra::prelude::StrongMessageChannel;
use xtra_productivity::xtra_productivity;
@ -62,6 +64,7 @@ pub struct Actor<C = bdk::electrum_client::Client> {
latest_block_height: BlockHeight,
current_status: BTreeMap<(Txid, Script), ScriptStatus>,
awaiting_status: HashMap<(Txid, Script), Vec<(ScriptStatus, Event)>>,
tasks: Tasks,
}
impl Actor<bdk::electrum_client::Client> {
@ -86,6 +89,7 @@ impl Actor<bdk::electrum_client::Client> {
latest_block_height: BlockHeight::try_from(latest_block)?,
current_status: BTreeMap::default(),
awaiting_status: HashMap::default(),
tasks: Tasks::default(),
};
for cfd in cfds {
@ -632,7 +636,20 @@ impl xtra::Message for Sync {
type Result = ();
}
impl<C> xtra::Actor for Actor<C> where C: Send + 'static {}
#[async_trait]
impl<C> xtra::Actor for Actor<C>
where
C: Send + 'static,
Self: xtra::Handler<Sync>,
{
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
let fut = ctx
.notify_interval(Duration::from_secs(20), || Sync)
.expect("we just started");
self.tasks.add(fut);
}
}
#[xtra_productivity]
impl<C> Actor<C>
@ -822,6 +839,7 @@ mod tests {
latest_block_height: BlockHeight(0),
current_status: BTreeMap::default(),
awaiting_status: HashMap::from_iter(subscriptions),
tasks: Tasks::default(),
}
}
}

14
daemon/src/oracle.rs

@ -4,6 +4,7 @@ use crate::model::BitMexPriceEventId;
use crate::tokio_ext;
use crate::try_continue;
use crate::xtra_ext::LogFailure;
use crate::Tasks;
use anyhow::Context;
use anyhow::Result;
use async_trait::async_trait;
@ -25,6 +26,7 @@ pub struct Actor {
pending_attestations: HashSet<BitMexPriceEventId>,
attestation_channel: Box<dyn StrongMessageChannel<Attestation>>,
announcement_lookahead: Duration,
tasks: Tasks,
}
pub struct Sync;
@ -104,6 +106,7 @@ impl Actor {
pending_attestations,
attestation_channel,
announcement_lookahead,
tasks: Tasks::default(),
}
}
@ -304,7 +307,16 @@ impl From<Announcement> for maia::Announcement {
}
}
impl xtra::Actor for Actor {}
#[async_trait]
impl xtra::Actor for Actor {
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
let fut = ctx
.notify_interval(std::time::Duration::from_secs(5), || Sync)
.expect("we just started");
self.tasks.add(fut);
}
}
impl xtra::Message for Attestation {
type Result = ();

Loading…
Cancel
Save