Browse Source

Introduce maker ActorSystem

debug-statements
Lucas Soriano del Pino 3 years ago
parent
commit
79c377a7b8
No known key found for this signature in database GPG Key ID: EE611E973A1530E7
  1. 234
      daemon/src/maker.rs
  2. 4
      daemon/src/maker_inc_connections.rs
  3. 8
      daemon/src/monitor.rs
  4. 4
      daemon/src/oracle.rs
  5. 6
      daemon/src/taker.rs

234
daemon/src/maker.rs

@ -4,23 +4,28 @@ use bdk::bitcoin::secp256k1::schnorrsig;
use clap::Clap;
use daemon::auth::{self, MAKER_USERNAME};
use daemon::db::{self, load_all_cfds};
use daemon::model::cfd::{Order, UpdateCfdProposals};
use daemon::maker_cfd::{FromTaker, NewTakerOnline};
use daemon::model::cfd::{Cfd, Order, UpdateCfdProposals};
use daemon::model::WalletInfo;
use daemon::oracle::Attestation;
use daemon::seed::Seed;
use daemon::wallet::Wallet;
use daemon::{
bitmex_price_feed, fan_out, housekeeping, logger, maker_cfd, maker_inc_connections, monitor,
oracle, wallet_sync,
};
use futures::Future;
use rocket::fairing::AdHoc;
use rocket_db_pools::Database;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::watch;
use tokio::net::TcpListener;
use tokio::sync::watch::{self, Receiver};
use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*;
use xtra::spawn::TokioGlobalSpawnExt;
@ -148,10 +153,7 @@ async fn main() -> Result<()> {
"ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7",
)?;
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let figment = rocket::Config::figment()
.merge(("databases.maker.url", data_dir.join("maker.sqlite")))
@ -172,9 +174,7 @@ async fn main() -> Result<()> {
tokio::spawn(task);
rocket::custom(figment)
.manage(order_feed_receiver)
.manage(wallet_feed_receiver)
.manage(update_cfd_feed_receiver)
.manage(auth_password)
.manage(quote_updates)
.manage(bitcoin_network)
@ -191,13 +191,13 @@ async fn main() -> Result<()> {
}
},
))
.attach(AdHoc::try_on_ignite(
"Create actors",
.attach(AdHoc::try_on_ignite("Create actors", {
move |rocket| async move {
let db = match Db::fetch(&rocket) {
Some(db) => (**db).clone(),
None => return Err(rocket),
};
let mut conn = db.acquire().await.unwrap();
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn)
@ -206,98 +206,45 @@ async fn main() -> Result<()> {
housekeeping::rebroadcast_transactions(&mut conn, &wallet)
.await
.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap();
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (maker_inc_connections_address, maker_inc_connections_context) =
xtra::Context::new(None);
let (monitor_actor_address, mut monitor_actor_context) = xtra::Context::new(None);
let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None);
let mut conn = db.acquire().await.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap();
let cfd_maker_actor_inbox = maker_cfd::Actor::new(
let ActorSystem {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
} = ActorSystem::new(
db,
wallet.clone(),
oracle,
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
maker_inc_connections_address.clone(),
monitor_actor_address.clone(),
oracle_actor_address.clone(),
)
.create(None)
.spawn_global();
tokio::spawn(
maker_inc_connections_context.run(maker_inc_connections::Actor::new(
&cfd_maker_actor_inbox,
&cfd_maker_actor_inbox,
)),
);
tokio::spawn(
monitor_actor_context
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.unwrap(),
);
tokio::spawn(
monitor_actor_context.run(
monitor::Actor::new(
opts.network.electrum(),
cfd_maker_actor_inbox.clone(),
cfds.clone(),
)
.await
.unwrap(),
),
);
tokio::spawn(
oracle_actor_context
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.unwrap(),
);
let actor = fan_out::Actor::new(&[&cfd_maker_actor_inbox, &monitor_actor_address])
.create(None)
.spawn_global();
tokio::spawn(oracle_actor_context.run(oracle::Actor::new(cfds, actor)));
oracle_actor_address
.do_send_async(oracle::Sync)
.await
.unwrap();
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => {
maker_inc_connections::ListenerMessage::NewConnection {
stream,
address,
|cfds, channel| oracle::Actor::new(cfds, channel),
{
|channel, cfds| {
let electrum = opts.network.electrum().to_string();
async move {
monitor::Actor::new(electrum, channel, cfds.clone()).await
}
}
Err(e) => maker_inc_connections::ListenerMessage::Error { source: e },
};
Poll::Ready(Some(message))
});
},
|channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1),
listener,
)
.await;
tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream));
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
let cfd_action_channel =
MessageChannel::<maker_cfd::CfdAction>::clone_channel(&cfd_maker_actor_inbox);
MessageChannel::<maker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);
let new_order_channel =
MessageChannel::<maker_cfd::NewOrder>::clone_channel(&cfd_maker_actor_inbox);
MessageChannel::<maker_cfd::NewOrder>::clone_channel(&cfd_actor_addr);
Ok(rocket
.manage(order_feed_receiver)
.manage(update_cfd_feed_receiver)
.manage(cfd_action_channel)
.manage(new_order_channel)
.manage(cfd_feed_receiver))
},
))
}
}))
.mount(
"/api",
rocket::routes![
@ -318,3 +265,118 @@ async fn main() -> Result<()> {
Ok(())
}
pub struct ActorSystem<O, M, T> {
cfd_actor_addr: Address<maker_cfd::Actor<O, M, T>>,
cfd_feed_receiver: Receiver<Vec<Cfd>>,
order_feed_receiver: Receiver<Option<Order>>,
update_cfd_feed_receiver: Receiver<UpdateCfdProposals>,
}
impl<O, M, T> ActorSystem<O, M, T>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<maker_inc_connections::ListenerMessage>,
{
pub async fn new<F>(
db: SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
oracle_constructor: impl Fn(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl Fn(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
inc_conn_constructor: impl Fn(
Box<dyn MessageChannel<NewTakerOnline>>,
Box<dyn MessageChannel<FromTaker>>,
) -> T,
listener: TcpListener,
) -> Self
where
F: Future<Output = Result<M>>,
{
let mut conn = db.acquire().await.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap();
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None);
let cfd_actor_addr = maker_cfd::Actor::new(
db,
wallet,
oracle_pk,
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
inc_conn_addr.clone(),
monitor_addr,
oracle_addr.clone(),
)
.create(None)
.spawn_global();
tokio::spawn(inc_conn_ctx.run(inc_conn_constructor(
Box::new(cfd_actor_addr.clone()),
Box::new(cfd_actor_addr.clone()),
)));
tokio::spawn(
monitor_ctx
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.unwrap(),
);
tokio::spawn(
monitor_ctx.run(
monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone())
.await
.unwrap(),
),
);
tokio::spawn(
oracle_ctx
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.unwrap(),
);
let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &cfd_actor_addr])
.create(None)
.spawn_global();
tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
oracle_addr.do_send_async(oracle::Sync).await.unwrap();
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => {
maker_inc_connections::ListenerMessage::NewConnection { stream, address }
}
Err(e) => maker_inc_connections::ListenerMessage::Error { source: e },
};
Poll::Ready(Some(message))
});
tokio::spawn(inc_conn_addr.attach_stream(listener_stream));
Self {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
}
}
}

4
daemon/src/maker_inc_connections.rs

@ -70,8 +70,8 @@ pub struct Actor {
impl Actor {
pub fn new(
new_taker_channel: &impl MessageChannel<NewTakerOnline>,
taker_msg_channel: &impl MessageChannel<FromTaker>,
new_taker_channel: Box<dyn MessageChannel<NewTakerOnline>>,
taker_msg_channel: Box<dyn MessageChannel<FromTaker>>,
) -> Self {
Self {
write_connections: HashMap::new(),

8
daemon/src/monitor.rs

@ -50,11 +50,11 @@ pub struct Actor<C = bdk::electrum_client::Client> {
impl Actor<bdk::electrum_client::Client> {
pub async fn new(
electrum_rpc_url: &str,
event_channel: impl StrongMessageChannel<Event> + 'static,
electrum_rpc_url: String,
event_channel: Box<dyn StrongMessageChannel<Event>>,
cfds: Vec<Cfd>,
) -> Result<Self> {
let client = bdk::electrum_client::Client::new(electrum_rpc_url)
let client = bdk::electrum_client::Client::new(&electrum_rpc_url)
.context("Failed to initialize Electrum RPC client")?;
// Initially fetch the latest block for storing the height.
@ -65,7 +65,7 @@ impl Actor<bdk::electrum_client::Client> {
let mut actor = Self {
cfds: HashMap::new(),
event_channel: Box::new(event_channel),
event_channel,
client,
latest_block_height: BlockHeight::try_from(latest_block)?,
current_status: BTreeMap::default(),

4
daemon/src/oracle.rs

@ -67,7 +67,7 @@ struct NewAttestationFetched {
impl Actor {
pub fn new(
cfds: Vec<Cfd>,
attestation_channel: impl StrongMessageChannel<Attestation> + 'static,
attestation_channel: Box<dyn StrongMessageChannel<Attestation>>,
) -> Self {
let mut pending_attestations = HashSet::new();
@ -101,7 +101,7 @@ impl Actor {
announcements: HashMap::new(),
pending_announcements: HashSet::new(),
pending_attestations,
attestation_channel: Box::new(attestation_channel),
attestation_channel,
}
}
}

6
daemon/src/taker.rs

@ -240,8 +240,8 @@ async fn main() -> Result<()> {
tokio::spawn(
monitor_actor_context.run(
monitor::Actor::new(
opts.network.electrum(),
cfd_actor_inbox.clone(),
opts.network.electrum().to_string(),
Box::new(cfd_actor_inbox.clone()),
cfds.clone(),
)
.await
@ -258,7 +258,7 @@ async fn main() -> Result<()> {
.create(None)
.spawn_global();
tokio::spawn(oracle_actor_context.run(oracle::Actor::new(cfds, actor)));
tokio::spawn(oracle_actor_context.run(oracle::Actor::new(cfds, Box::new(actor))));
oracle_actor_address
.do_send_async(oracle::Sync)

Loading…
Cancel
Save