Browse Source

Make oracle and monitor actor load CFDs from DB

With event sourcing, these actors will use a different model. Passing
them the DB connection directly now makes this change easier.
update-blockstream-electrum-server-url
Thomas Eizinger 3 years ago
parent
commit
739a4a40f5
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 41
      daemon/src/lib.rs
  2. 6
      daemon/src/maker.rs
  3. 6
      daemon/src/monitor.rs
  4. 15
      daemon/src/oracle.rs
  5. 6
      daemon/src/taker.rs
  6. 8
      daemon/tests/harness/mod.rs

41
daemon/src/lib.rs

@ -1,6 +1,5 @@
#![cfg_attr(not(test), warn(clippy::unwrap_used))] #![cfg_attr(not(test), warn(clippy::unwrap_used))]
#![warn(clippy::disallowed_method)] #![warn(clippy::disallowed_method)]
use crate::db::load_all_cfds;
use crate::maker_cfd::FromTaker; use crate::maker_cfd::FromTaker;
use crate::maker_cfd::TakerConnected; use crate::maker_cfd::TakerConnected;
use crate::model::cfd::Cfd; use crate::model::cfd::Cfd;
@ -133,12 +132,12 @@ where
+ xtra::Handler<wallet::TryBroadcastTransaction>, + xtra::Handler<wallet::TryBroadcastTransaction>,
{ {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn new<F>( pub async fn new<FO, FM>(
db: SqlitePool, db: SqlitePool,
wallet_addr: Address<W>, wallet_addr: Address<W>,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
oracle_constructor: impl FnOnce(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O, oracle_constructor: impl FnOnce(Box<dyn StrongMessageChannel<Attestation>>) -> FO,
monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F, monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>) -> FM,
inc_conn_constructor: impl FnOnce( inc_conn_constructor: impl FnOnce(
Box<dyn MessageChannel<TakerConnected>>, Box<dyn MessageChannel<TakerConnected>>,
Box<dyn MessageChannel<TakerDisconnected>>, Box<dyn MessageChannel<TakerDisconnected>>,
@ -149,12 +148,9 @@ where
projection_actor: Address<projection::Actor>, projection_actor: Address<projection::Actor>,
) -> Result<Self> ) -> Result<Self>
where where
F: Future<Output = Result<M>>, FO: Future<Output = Result<O>>,
FM: Future<Output = Result<M>>,
{ {
let mut conn = db.acquire().await?;
let cfds = load_all_cfds(&mut conn).await?;
let (monitor_addr, monitor_ctx) = xtra::Context::new(None); let (monitor_addr, monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, oracle_ctx) = xtra::Context::new(None); let (oracle_addr, oracle_ctx) = xtra::Context::new(None);
let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None); let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None);
@ -183,10 +179,7 @@ where
Box::new(cfd_actor_addr.clone()), Box::new(cfd_actor_addr.clone()),
))); )));
tasks.add( tasks.add(monitor_ctx.run(monitor_constructor(Box::new(cfd_actor_addr.clone())).await?));
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
let (fan_out_actor, fan_out_actor_fut) = let (fan_out_actor, fan_out_actor_fut) =
fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr]) fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
@ -194,7 +187,7 @@ where
.run(); .run();
tasks.add(fan_out_actor_fut); tasks.add(fan_out_actor_fut);
tasks.add(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); tasks.add(oracle_ctx.run(oracle_constructor(Box::new(fan_out_actor)).await?));
oracle_addr.send(oracle::Sync).await?; oracle_addr.send(oracle::Sync).await?;
@ -229,13 +222,13 @@ where
+ xtra::Handler<wallet::TryBroadcastTransaction>, + xtra::Handler<wallet::TryBroadcastTransaction>,
{ {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn new<F>( pub async fn new<FM, FO>(
db: SqlitePool, db: SqlitePool,
wallet_addr: Address<W>, wallet_addr: Address<W>,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
identity_sk: x25519_dalek::StaticSecret, identity_sk: x25519_dalek::StaticSecret,
oracle_constructor: impl FnOnce(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O, oracle_constructor: impl FnOnce(Box<dyn StrongMessageChannel<Attestation>>) -> FO,
monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F, monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>) -> FM,
n_payouts: usize, n_payouts: usize,
maker_heartbeat_interval: Duration, maker_heartbeat_interval: Duration,
connect_timeout: Duration, connect_timeout: Duration,
@ -243,12 +236,9 @@ where
maker_identity: Identity, maker_identity: Identity,
) -> Result<Self> ) -> Result<Self>
where where
F: Future<Output = Result<M>>, FO: Future<Output = Result<O>>,
FM: Future<Output = Result<M>>,
{ {
let mut conn = db.acquire().await?;
let cfds = load_all_cfds(&mut conn).await?;
let (maker_online_status_feed_sender, maker_online_status_feed_receiver) = let (maker_online_status_feed_sender, maker_online_status_feed_receiver) =
watch::channel(ConnectionStatus::Offline { reason: None }); watch::channel(ConnectionStatus::Offline { reason: None });
@ -295,10 +285,7 @@ where
connect_timeout, connect_timeout,
))); )));
tasks.add( tasks.add(monitor_ctx.run(monitor_constructor(Box::new(cfd_actor_addr.clone())).await?));
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
let (fan_out_actor, fan_out_actor_fut) = let (fan_out_actor, fan_out_actor_fut) =
fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr]) fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
@ -307,7 +294,7 @@ where
tasks.add(fan_out_actor_fut); tasks.add(fan_out_actor_fut);
tasks.add(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); tasks.add(oracle_ctx.run(oracle_constructor(Box::new(fan_out_actor)).await?));
tracing::debug!("Taker actor system ready"); tracing::debug!("Taker actor system ready");

6
daemon/src/maker.rs

@ -249,11 +249,11 @@ async fn main() -> Result<()> {
db.clone(), db.clone(),
wallet.clone(), wallet.clone(),
oracle, oracle,
|cfds, channel| oracle::Actor::new(cfds, channel, SETTLEMENT_INTERVAL), |channel| oracle::Actor::new(db.clone(), channel, SETTLEMENT_INTERVAL),
{ {
|channel, cfds| { |channel| {
let electrum = opts.network.electrum().to_string(); let electrum = opts.network.electrum().to_string();
monitor::Actor::new(electrum, channel, cfds) monitor::Actor::new(db.clone(), electrum, channel)
} }
}, },
|channel0, channel1, channel2| { |channel0, channel1, channel2| {

6
daemon/src/monitor.rs

@ -1,3 +1,4 @@
use crate::db;
use crate::model; use crate::model;
use crate::model::cfd::CetStatus; use crate::model::cfd::CetStatus;
use crate::model::cfd::Cfd; use crate::model::cfd::Cfd;
@ -20,6 +21,7 @@ use bdk::electrum_client::ElectrumApi;
use bdk::electrum_client::GetHistoryRes; use bdk::electrum_client::GetHistoryRes;
use bdk::electrum_client::HeaderNotification; use bdk::electrum_client::HeaderNotification;
use bdk::miniscript::DescriptorTrait; use bdk::miniscript::DescriptorTrait;
use sqlx::SqlitePool;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::collections::HashMap; use std::collections::HashMap;
@ -69,10 +71,12 @@ pub struct Actor<C = bdk::electrum_client::Client> {
impl Actor<bdk::electrum_client::Client> { impl Actor<bdk::electrum_client::Client> {
pub async fn new( pub async fn new(
db: SqlitePool,
electrum_rpc_url: String, electrum_rpc_url: String,
event_channel: Box<dyn StrongMessageChannel<Event>>, event_channel: Box<dyn StrongMessageChannel<Event>>,
cfds: Vec<Cfd>,
) -> Result<Self> { ) -> Result<Self> {
let cfds = db::load_all_cfds(&mut db.acquire().await?).await?;
let client = bdk::electrum_client::Client::new(&electrum_rpc_url) let client = bdk::electrum_client::Client::new(&electrum_rpc_url)
.context("Failed to initialize Electrum RPC client")?; .context("Failed to initialize Electrum RPC client")?;

15
daemon/src/oracle.rs

@ -1,4 +1,4 @@
use crate::model::cfd::Cfd; use crate::db;
use crate::model::cfd::CfdState; use crate::model::cfd::CfdState;
use crate::model::BitMexPriceEventId; use crate::model::BitMexPriceEventId;
use crate::tokio_ext; use crate::tokio_ext;
@ -13,6 +13,7 @@ use maia::secp256k1_zkp::SecretKey;
use rocket::time::OffsetDateTime; use rocket::time::OffsetDateTime;
use rocket::time::Time; use rocket::time::Time;
use serde::Deserialize; use serde::Deserialize;
use sqlx::SqlitePool;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::ops::Add; use std::ops::Add;
@ -68,11 +69,13 @@ struct NewAttestationFetched {
} }
impl Actor { impl Actor {
pub fn new( pub async fn new(
cfds: Vec<Cfd>, db: SqlitePool,
attestation_channel: Box<dyn StrongMessageChannel<Attestation>>, attestation_channel: Box<dyn StrongMessageChannel<Attestation>>,
announcement_lookahead: Duration, announcement_lookahead: Duration,
) -> Self { ) -> Result<Self> {
let cfds = db::load_all_cfds(&mut db.acquire().await?).await?;
let mut pending_attestations = HashSet::new(); let mut pending_attestations = HashSet::new();
for cfd in cfds { for cfd in cfds {
@ -101,13 +104,13 @@ impl Actor {
} }
} }
Self { Ok(Self {
announcements: HashMap::new(), announcements: HashMap::new(),
pending_attestations, pending_attestations,
attestation_channel, attestation_channel,
announcement_lookahead, announcement_lookahead,
tasks: Tasks::default(), tasks: Tasks::default(),
} })
} }
fn ensure_having_announcements( fn ensure_having_announcements(

6
daemon/src/taker.rs

@ -243,11 +243,11 @@ async fn main() -> Result<()> {
wallet.clone(), wallet.clone(),
oracle, oracle,
identity_sk, identity_sk,
|cfds, channel| oracle::Actor::new(cfds, channel, SETTLEMENT_INTERVAL), |channel| oracle::Actor::new(db.clone(), channel, SETTLEMENT_INTERVAL),
{ {
|channel, cfds| { |channel| {
let electrum = opts.network.electrum().to_string(); let electrum = opts.network.electrum().to_string();
monitor::Actor::new(electrum, channel, cfds) monitor::Actor::new(db.clone(), electrum, channel)
} }
}, },
N_PAYOUTS, N_PAYOUTS,

8
daemon/tests/harness/mod.rs

@ -162,8 +162,8 @@ impl Maker {
db.clone(), db.clone(),
wallet_addr, wallet_addr,
config.oracle_pk, config.oracle_pk,
|_, _| oracle, |_| async { Ok(oracle) },
|_, _| async { Ok(monitor) }, |_| async { Ok(monitor) },
|channel0, channel1, channel2| { |channel0, channel1, channel2| {
maker_inc_connections::Actor::new( maker_inc_connections::Actor::new(
channel0, channel0,
@ -297,8 +297,8 @@ impl Taker {
wallet_addr, wallet_addr,
config.oracle_pk, config.oracle_pk,
identity_sk, identity_sk,
|_, _| oracle, |_| async { Ok(oracle) },
|_, _| async { Ok(monitor) }, |_| async { Ok(monitor) },
config.n_payouts, config.n_payouts,
config.heartbeat_timeout, config.heartbeat_timeout,
Duration::from_secs(10), Duration::from_secs(10),

Loading…
Cancel
Save