Browse Source

Merge #917

917: Make oracle and monitor actor load CFDs from DB r=thomaseizinger a=thomaseizinger

With event sourcing, these actors will use a different model. Passing
them the DB connection directly now makes this change easier.

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
update-blockstream-electrum-server-url
bors[bot] 3 years ago
committed by GitHub
parent
commit
0a23346ad0
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 41
      daemon/src/lib.rs
  2. 6
      daemon/src/maker.rs
  3. 6
      daemon/src/monitor.rs
  4. 15
      daemon/src/oracle.rs
  5. 6
      daemon/src/taker.rs
  6. 8
      daemon/tests/harness/mod.rs

41
daemon/src/lib.rs

@ -1,6 +1,5 @@
#![cfg_attr(not(test), warn(clippy::unwrap_used))]
#![warn(clippy::disallowed_method)]
use crate::db::load_all_cfds;
use crate::maker_cfd::FromTaker;
use crate::maker_cfd::TakerConnected;
use crate::model::cfd::Cfd;
@ -133,12 +132,12 @@ where
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{
#[allow(clippy::too_many_arguments)]
pub async fn new<F>(
pub async fn new<FO, FM>(
db: SqlitePool,
wallet_addr: Address<W>,
oracle_pk: schnorrsig::PublicKey,
oracle_constructor: impl FnOnce(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
oracle_constructor: impl FnOnce(Box<dyn StrongMessageChannel<Attestation>>) -> FO,
monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>) -> FM,
inc_conn_constructor: impl FnOnce(
Box<dyn MessageChannel<TakerConnected>>,
Box<dyn MessageChannel<TakerDisconnected>>,
@ -149,12 +148,9 @@ where
projection_actor: Address<projection::Actor>,
) -> Result<Self>
where
F: Future<Output = Result<M>>,
FO: Future<Output = Result<O>>,
FM: Future<Output = Result<M>>,
{
let mut conn = db.acquire().await?;
let cfds = load_all_cfds(&mut conn).await?;
let (monitor_addr, monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, oracle_ctx) = xtra::Context::new(None);
let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None);
@ -183,10 +179,7 @@ where
Box::new(cfd_actor_addr.clone()),
)));
tasks.add(
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
tasks.add(monitor_ctx.run(monitor_constructor(Box::new(cfd_actor_addr.clone())).await?));
let (fan_out_actor, fan_out_actor_fut) =
fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
@ -194,7 +187,7 @@ where
.run();
tasks.add(fan_out_actor_fut);
tasks.add(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
tasks.add(oracle_ctx.run(oracle_constructor(Box::new(fan_out_actor)).await?));
oracle_addr.send(oracle::Sync).await?;
@ -229,13 +222,13 @@ where
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{
#[allow(clippy::too_many_arguments)]
pub async fn new<F>(
pub async fn new<FM, FO>(
db: SqlitePool,
wallet_addr: Address<W>,
oracle_pk: schnorrsig::PublicKey,
identity_sk: x25519_dalek::StaticSecret,
oracle_constructor: impl FnOnce(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
oracle_constructor: impl FnOnce(Box<dyn StrongMessageChannel<Attestation>>) -> FO,
monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>) -> FM,
n_payouts: usize,
maker_heartbeat_interval: Duration,
connect_timeout: Duration,
@ -243,12 +236,9 @@ where
maker_identity: Identity,
) -> Result<Self>
where
F: Future<Output = Result<M>>,
FO: Future<Output = Result<O>>,
FM: Future<Output = Result<M>>,
{
let mut conn = db.acquire().await?;
let cfds = load_all_cfds(&mut conn).await?;
let (maker_online_status_feed_sender, maker_online_status_feed_receiver) =
watch::channel(ConnectionStatus::Offline { reason: None });
@ -295,10 +285,7 @@ where
connect_timeout,
)));
tasks.add(
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
tasks.add(monitor_ctx.run(monitor_constructor(Box::new(cfd_actor_addr.clone())).await?));
let (fan_out_actor, fan_out_actor_fut) =
fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
@ -307,7 +294,7 @@ where
tasks.add(fan_out_actor_fut);
tasks.add(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
tasks.add(oracle_ctx.run(oracle_constructor(Box::new(fan_out_actor)).await?));
tracing::debug!("Taker actor system ready");

6
daemon/src/maker.rs

@ -249,11 +249,11 @@ async fn main() -> Result<()> {
db.clone(),
wallet.clone(),
oracle,
|cfds, channel| oracle::Actor::new(cfds, channel, SETTLEMENT_INTERVAL),
|channel| oracle::Actor::new(db.clone(), channel, SETTLEMENT_INTERVAL),
{
|channel, cfds| {
|channel| {
let electrum = opts.network.electrum().to_string();
monitor::Actor::new(electrum, channel, cfds)
monitor::Actor::new(db.clone(), electrum, channel)
}
},
|channel0, channel1, channel2| {

6
daemon/src/monitor.rs

@ -1,3 +1,4 @@
use crate::db;
use crate::model;
use crate::model::cfd::CetStatus;
use crate::model::cfd::Cfd;
@ -20,6 +21,7 @@ use bdk::electrum_client::ElectrumApi;
use bdk::electrum_client::GetHistoryRes;
use bdk::electrum_client::HeaderNotification;
use bdk::miniscript::DescriptorTrait;
use sqlx::SqlitePool;
use std::collections::hash_map::Entry;
use std::collections::BTreeMap;
use std::collections::HashMap;
@ -69,10 +71,12 @@ pub struct Actor<C = bdk::electrum_client::Client> {
impl Actor<bdk::electrum_client::Client> {
pub async fn new(
db: SqlitePool,
electrum_rpc_url: String,
event_channel: Box<dyn StrongMessageChannel<Event>>,
cfds: Vec<Cfd>,
) -> Result<Self> {
let cfds = db::load_all_cfds(&mut db.acquire().await?).await?;
let client = bdk::electrum_client::Client::new(&electrum_rpc_url)
.context("Failed to initialize Electrum RPC client")?;

15
daemon/src/oracle.rs

@ -1,4 +1,4 @@
use crate::model::cfd::Cfd;
use crate::db;
use crate::model::cfd::CfdState;
use crate::model::BitMexPriceEventId;
use crate::tokio_ext;
@ -13,6 +13,7 @@ use maia::secp256k1_zkp::SecretKey;
use rocket::time::OffsetDateTime;
use rocket::time::Time;
use serde::Deserialize;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::collections::HashSet;
use std::ops::Add;
@ -68,11 +69,13 @@ struct NewAttestationFetched {
}
impl Actor {
pub fn new(
cfds: Vec<Cfd>,
pub async fn new(
db: SqlitePool,
attestation_channel: Box<dyn StrongMessageChannel<Attestation>>,
announcement_lookahead: Duration,
) -> Self {
) -> Result<Self> {
let cfds = db::load_all_cfds(&mut db.acquire().await?).await?;
let mut pending_attestations = HashSet::new();
for cfd in cfds {
@ -101,13 +104,13 @@ impl Actor {
}
}
Self {
Ok(Self {
announcements: HashMap::new(),
pending_attestations,
attestation_channel,
announcement_lookahead,
tasks: Tasks::default(),
}
})
}
fn ensure_having_announcements(

6
daemon/src/taker.rs

@ -243,11 +243,11 @@ async fn main() -> Result<()> {
wallet.clone(),
oracle,
identity_sk,
|cfds, channel| oracle::Actor::new(cfds, channel, SETTLEMENT_INTERVAL),
|channel| oracle::Actor::new(db.clone(), channel, SETTLEMENT_INTERVAL),
{
|channel, cfds| {
|channel| {
let electrum = opts.network.electrum().to_string();
monitor::Actor::new(electrum, channel, cfds)
monitor::Actor::new(db.clone(), electrum, channel)
}
},
N_PAYOUTS,

8
daemon/tests/harness/mod.rs

@ -162,8 +162,8 @@ impl Maker {
db.clone(),
wallet_addr,
config.oracle_pk,
|_, _| oracle,
|_, _| async { Ok(monitor) },
|_| async { Ok(oracle) },
|_| async { Ok(monitor) },
|channel0, channel1, channel2| {
maker_inc_connections::Actor::new(
channel0,
@ -297,8 +297,8 @@ impl Taker {
wallet_addr,
config.oracle_pk,
identity_sk,
|_, _| oracle,
|_, _| async { Ok(monitor) },
|_| async { Ok(oracle) },
|_| async { Ok(monitor) },
config.n_payouts,
config.heartbeat_timeout,
Duration::from_secs(10),

Loading…
Cancel
Save