Browse Source

Move taker's `ActorSystem` to `lib` as `Taker`

contact-taker-before-changing-cfd-state
Daniel Karzel 3 years ago
committed by Mariusz Klochowicz
parent
commit
1d6140c516
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 89
      daemon/src/lib.rs
  2. 110
      daemon/src/taker.rs

89
daemon/src/lib.rs

@ -6,6 +6,7 @@ use crate::oracle::Attestation;
use crate::wallet::Wallet;
use anyhow::Result;
use cfd_protocol::secp256k1_zkp::schnorrsig;
use futures::Stream;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::future::Future;
@ -160,3 +161,91 @@ where
})
}
}
pub struct Taker<O, M> {
pub cfd_actor_addr: Address<taker_cfd::Actor<O, M>>,
pub cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
pub order_feed_receiver: watch::Receiver<Option<Order>>,
pub update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
}
impl<O, M> Taker<O, M>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
{
pub async fn new<F>(
db: SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
read_from_maker: Box<dyn Stream<Item = taker_cfd::MakerStreamMessage> + Unpin + Send>,
oracle_constructor: impl Fn(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl Fn(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
) -> Result<Self>
where
F: Future<Output = Result<M>>,
{
let mut conn = db.acquire().await?;
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let cfd_actor_addr = taker_cfd::Actor::new(
db,
wallet,
oracle_pk,
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
send_to_maker,
monitor_addr.clone(),
oracle_addr,
)
.create(None)
.spawn_global();
tokio::spawn(cfd_actor_addr.clone().attach_stream(read_from_maker));
tokio::spawn(
monitor_ctx
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.map_err(|e| anyhow::anyhow!(e))?,
);
tokio::spawn(
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
tokio::spawn(
oracle_ctx
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.unwrap(),
);
let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
.create(None)
.spawn_global();
tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
Ok(Self {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
})
}
}

110
daemon/src/taker.rs

@ -2,28 +2,26 @@ use anyhow::{Context, Result};
use bdk::bitcoin;
use bdk::bitcoin::secp256k1::schnorrsig;
use clap::Clap;
use daemon::db::{self, load_all_cfds};
use daemon::model::cfd::{Cfd, Order, UpdateCfdProposals};
use daemon::db::{self};
use daemon::model::WalletInfo;
use daemon::oracle::Attestation;
use daemon::seed::Seed;
use daemon::wallet::Wallet;
use daemon::{
bitmex_price_feed, fan_out, housekeeping, logger, monitor, oracle, taker_cfd, wallet_sync, wire,
bitmex_price_feed, housekeeping, logger, monitor, oracle, taker_cfd, wallet_sync, Taker,
};
use futures::{Future, Stream};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::{MessageChannel, StrongMessageChannel};
use xtra::spawn::TokioGlobalSpawnExt;
use xtra::{Actor, Address};
use xtra::prelude::MessageChannel;
mod connection;
mod routes_taker;
@ -167,12 +165,12 @@ async fn main() -> Result<()> {
read_from_maker,
} = connection::Actor::new(opts.maker).await;
let ActorSystem {
let Taker {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
} = ActorSystem::new(
} = Taker::new(
db.clone(),
wallet.clone(),
oracle,
@ -222,91 +220,3 @@ async fn main() -> Result<()> {
Ok(())
}
pub struct ActorSystem<O, M> {
cfd_actor_addr: Address<taker_cfd::Actor<O, M>>,
cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
order_feed_receiver: watch::Receiver<Option<Order>>,
update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
}
impl<O, M> ActorSystem<O, M>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
{
pub async fn new<F>(
db: SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
read_from_maker: Box<dyn Stream<Item = taker_cfd::MakerStreamMessage> + Unpin + Send>,
oracle_constructor: impl Fn(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl Fn(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
) -> Result<Self>
where
F: Future<Output = Result<M>>,
{
let mut conn = db.acquire().await?;
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let cfd_actor_addr = taker_cfd::Actor::new(
db,
wallet,
oracle_pk,
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
send_to_maker,
monitor_addr.clone(),
oracle_addr,
)
.create(None)
.spawn_global();
tokio::spawn(cfd_actor_addr.clone().attach_stream(read_from_maker));
tokio::spawn(
monitor_ctx
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.map_err(|e| anyhow::anyhow!(e))?,
);
tokio::spawn(
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
tokio::spawn(
oracle_ctx
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.unwrap(),
);
let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
.create(None)
.spawn_global();
tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
Ok(Self {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
})
}
}

Loading…
Cancel
Save