Browse Source

Initialize Cfd actors outside of Rocket

Allows us to use `?` instead of unwrapping.
feature/actor-custom-derive
Mariusz Klochowicz 3 years ago
parent
commit
8e2f3cfc95
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 76
      daemon/src/maker.rs
  2. 71
      daemon/src/taker.rs

76
daemon/src/maker.rs

@ -15,7 +15,6 @@ use daemon::{
oracle, wallet_sync,
};
use futures::Future;
use rocket::fairing::AdHoc;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use std::collections::HashMap;
@ -176,33 +175,15 @@ async fn main() -> Result<()> {
)
.await?;
rocket::custom(figment)
.manage(wallet_feed_receiver)
.manage(auth_password)
.manage(quote_updates)
.manage(bitcoin_network)
.attach(AdHoc::try_on_ignite("SQL migrations", {
let db = db.clone();
move |rocket| async move {
match db::run_migrations(&db).await {
Ok(_) => Ok(rocket),
Err(_) => Err(rocket),
}
}
}))
.attach(AdHoc::try_on_ignite("Create actors", {
let db = db.clone();
db::run_migrations(&db)
.await
.context("Db migrations failed")?;
move |rocket| async move {
let mut conn = db.acquire().await.unwrap();
// Create actors
let mut conn = db.acquire().await?;
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn)
.await
.unwrap();
housekeeping::rebroadcast_transactions(&mut conn, &wallet)
.await
.unwrap();
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?;
housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?;
let ActorSystem {
cfd_actor_addr,
@ -210,7 +191,7 @@ async fn main() -> Result<()> {
order_feed_receiver,
update_cfd_feed_receiver,
} = ActorSystem::new(
db,
db.clone(),
wallet.clone(),
oracle,
|cfds, channel| oracle::Actor::new(cfds, channel),
@ -223,23 +204,23 @@ async fn main() -> Result<()> {
|channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1),
listener,
)
.await;
.await?;
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
let cfd_action_channel =
MessageChannel::<maker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);
let new_order_channel =
MessageChannel::<maker_cfd::NewOrder>::clone_channel(&cfd_actor_addr);
let cfd_action_channel = MessageChannel::<maker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);
let new_order_channel = MessageChannel::<maker_cfd::NewOrder>::clone_channel(&cfd_actor_addr);
Ok(rocket
rocket::custom(figment)
.manage(order_feed_receiver)
.manage(update_cfd_feed_receiver)
.manage(cfd_action_channel)
.manage(new_order_channel)
.manage(cfd_feed_receiver))
}
}))
.manage(cfd_feed_receiver)
.manage(wallet_feed_receiver)
.manage(auth_password)
.manage(quote_updates)
.manage(bitcoin_network)
.mount(
"/api",
rocket::routes![
@ -295,13 +276,13 @@ where
Box<dyn MessageChannel<FromTaker>>,
) -> T,
listener: TcpListener,
) -> Self
) -> Result<Self>
where
F: Future<Output = Result<M>>,
{
let mut conn = db.acquire().await.unwrap();
let mut conn = db.acquire().await?;
let cfds = load_all_cfds(&mut conn).await.unwrap();
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
@ -334,20 +315,17 @@ where
tokio::spawn(
monitor_ctx
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.unwrap(),
.map_err(|e| anyhow::anyhow!(e))?,
);
tokio::spawn(
monitor_ctx.run(
monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone())
.await
.unwrap(),
),
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
tokio::spawn(
oracle_ctx
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.unwrap(),
.map_err(|e| anyhow::anyhow!(e))?,
);
let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
.create(None)
@ -355,7 +333,7 @@ where
tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
oracle_addr.do_send_async(oracle::Sync).await.unwrap();
oracle_addr.do_send_async(oracle::Sync).await?;
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
@ -370,11 +348,11 @@ where
tokio::spawn(inc_conn_addr.attach_stream(listener_stream));
Self {
Ok(Self {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
}
})
}
}

71
daemon/src/taker.rs

@ -12,7 +12,6 @@ use daemon::{
bitmex_price_feed, fan_out, housekeeping, logger, monitor, oracle, taker_cfd, wallet_sync, wire,
};
use futures::{Future, Stream};
use rocket::fairing::AdHoc;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use std::collections::HashMap;
@ -153,32 +152,15 @@ async fn main() -> Result<()> {
)
.await?;
rocket::custom(figment)
.manage(wallet_feed_receiver)
.manage(quote_updates)
.manage(bitcoin_network)
.attach(AdHoc::try_on_ignite("SQL migrations", {
let db = db.clone();
move |rocket| async move {
match db::run_migrations(&db).await {
Ok(_) => Ok(rocket),
Err(_) => Err(rocket),
}
}
}))
.attach(AdHoc::try_on_ignite("Create actors", {
let db = db.clone();
db::run_migrations(&db)
.await
.context("Db migrations failed")?;
move |rocket| async move {
let mut conn = db.acquire().await.unwrap();
// Create actors
let mut conn = db.acquire().await?;
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn)
.await
.unwrap();
housekeeping::rebroadcast_transactions(&mut conn, &wallet)
.await
.unwrap();
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?;
housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?;
let connection::Actor {
send_to_maker,
@ -191,7 +173,7 @@ async fn main() -> Result<()> {
order_feed_receiver,
update_cfd_feed_receiver,
} = ActorSystem::new(
db,
db.clone(),
wallet.clone(),
oracle,
send_to_maker,
@ -204,21 +186,21 @@ async fn main() -> Result<()> {
}
},
)
.await;
.await?;
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
let take_offer_channel =
MessageChannel::<taker_cfd::TakeOffer>::clone_channel(&cfd_actor_addr);
let cfd_action_channel =
MessageChannel::<taker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);
Ok(rocket
let take_offer_channel = MessageChannel::<taker_cfd::TakeOffer>::clone_channel(&cfd_actor_addr);
let cfd_action_channel = MessageChannel::<taker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);
rocket::custom(figment)
.manage(order_feed_receiver)
.manage(update_cfd_feed_receiver)
.manage(take_offer_channel)
.manage(cfd_action_channel)
.manage(cfd_feed_receiver))
}
}))
.manage(cfd_feed_receiver)
.manage(wallet_feed_receiver)
.manage(quote_updates)
.manage(bitcoin_network)
.mount(
"/api",
rocket::routes![
@ -267,13 +249,13 @@ where
read_from_maker: Box<dyn Stream<Item = taker_cfd::MakerStreamMessage> + Unpin + Send>,
oracle_constructor: impl Fn(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl Fn(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
) -> Self
) -> Result<Self>
where
F: Future<Output = Result<M>>,
{
let mut conn = db.acquire().await.unwrap();
let mut conn = db.acquire().await?;
let cfds = load_all_cfds(&mut conn).await.unwrap();
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
@ -302,14 +284,11 @@ where
tokio::spawn(
monitor_ctx
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.unwrap(),
.map_err(|e| anyhow::anyhow!(e))?,
);
tokio::spawn(
monitor_ctx.run(
monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone())
.await
.unwrap(),
),
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
tokio::spawn(
@ -323,11 +302,11 @@ where
tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
Self {
Ok(Self {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
}
})
}
}

Loading…
Cancel
Save