diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index d5fa294..7b3e158 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -15,7 +15,6 @@ use daemon::{ oracle, wallet_sync, }; use futures::Future; -use rocket::fairing::AdHoc; use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; use std::collections::HashMap; @@ -176,70 +175,52 @@ async fn main() -> Result<()> { ) .await?; + db::run_migrations(&db) + .await + .context("Db migrations failed")?; + + // Create actors + let mut conn = db.acquire().await?; + + housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?; + housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?; + + let ActorSystem { + cfd_actor_addr, + cfd_feed_receiver, + order_feed_receiver, + update_cfd_feed_receiver, + } = ActorSystem::new( + db.clone(), + wallet.clone(), + oracle, + |cfds, channel| oracle::Actor::new(cfds, channel), + { + |channel, cfds| { + let electrum = opts.network.electrum().to_string(); + monitor::Actor::new(electrum, channel, cfds) + } + }, + |channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1), + listener, + ) + .await?; + + tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); + + let cfd_action_channel = MessageChannel::::clone_channel(&cfd_actor_addr); + let new_order_channel = MessageChannel::::clone_channel(&cfd_actor_addr); + rocket::custom(figment) + .manage(order_feed_receiver) + .manage(update_cfd_feed_receiver) + .manage(cfd_action_channel) + .manage(new_order_channel) + .manage(cfd_feed_receiver) .manage(wallet_feed_receiver) .manage(auth_password) .manage(quote_updates) .manage(bitcoin_network) - .attach(AdHoc::try_on_ignite("SQL migrations", { - let db = db.clone(); - - move |rocket| async move { - match db::run_migrations(&db).await { - Ok(_) => Ok(rocket), - Err(_) => Err(rocket), - } - } - })) - .attach(AdHoc::try_on_ignite("Create actors", { - let db = db.clone(); - - move |rocket| async move { - let mut conn = db.acquire().await.unwrap(); - - housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn) - .await - .unwrap(); - housekeeping::rebroadcast_transactions(&mut conn, &wallet) - .await - .unwrap(); - - let ActorSystem { - cfd_actor_addr, - cfd_feed_receiver, - order_feed_receiver, - update_cfd_feed_receiver, - } = ActorSystem::new( - db, - wallet.clone(), - oracle, - |cfds, channel| oracle::Actor::new(cfds, channel), - { - |channel, cfds| { - let electrum = opts.network.electrum().to_string(); - monitor::Actor::new(electrum, channel, cfds) - } - }, - |channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1), - listener, - ) - .await; - - tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); - - let cfd_action_channel = - MessageChannel::::clone_channel(&cfd_actor_addr); - let new_order_channel = - MessageChannel::::clone_channel(&cfd_actor_addr); - - Ok(rocket - .manage(order_feed_receiver) - .manage(update_cfd_feed_receiver) - .manage(cfd_action_channel) - .manage(new_order_channel) - .manage(cfd_feed_receiver)) - } - })) .mount( "/api", rocket::routes![ @@ -295,13 +276,13 @@ where Box>, ) -> T, listener: TcpListener, - ) -> Self + ) -> Result where F: Future>, { - let mut conn = db.acquire().await.unwrap(); + let mut conn = db.acquire().await?; - let cfds = load_all_cfds(&mut conn).await.unwrap(); + let cfds = load_all_cfds(&mut conn).await?; let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); @@ -334,20 +315,17 @@ where tokio::spawn( monitor_ctx .notify_interval(Duration::from_secs(20), || monitor::Sync) - .unwrap(), + .map_err(|e| anyhow::anyhow!(e))?, ); tokio::spawn( - monitor_ctx.run( - monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()) - .await - .unwrap(), - ), + monitor_ctx + .run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?), ); tokio::spawn( oracle_ctx .notify_interval(Duration::from_secs(5), || oracle::Sync) - .unwrap(), + .map_err(|e| anyhow::anyhow!(e))?, ); let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr]) .create(None) @@ -355,7 +333,7 @@ where tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); - oracle_addr.do_send_async(oracle::Sync).await.unwrap(); + oracle_addr.do_send_async(oracle::Sync).await?; let listener_stream = futures::stream::poll_fn(move |ctx| { let message = match futures::ready!(listener.poll_accept(ctx)) { @@ -370,11 +348,11 @@ where tokio::spawn(inc_conn_addr.attach_stream(listener_stream)); - Self { + Ok(Self { cfd_actor_addr, cfd_feed_receiver, order_feed_receiver, update_cfd_feed_receiver, - } + }) } } diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 79da2c8..cb211ac 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -12,7 +12,6 @@ use daemon::{ bitmex_price_feed, fan_out, housekeeping, logger, monitor, oracle, taker_cfd, wallet_sync, wire, }; use futures::{Future, Stream}; -use rocket::fairing::AdHoc; use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; use std::collections::HashMap; @@ -153,72 +152,55 @@ async fn main() -> Result<()> { ) .await?; + db::run_migrations(&db) + .await + .context("Db migrations failed")?; + + // Create actors + let mut conn = db.acquire().await?; + + housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?; + housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?; + + let connection::Actor { + send_to_maker, + read_from_maker, + } = connection::Actor::new(opts.maker).await; + + let ActorSystem { + cfd_actor_addr, + cfd_feed_receiver, + order_feed_receiver, + update_cfd_feed_receiver, + } = ActorSystem::new( + db.clone(), + wallet.clone(), + oracle, + send_to_maker, + read_from_maker, + |cfds, channel| oracle::Actor::new(cfds, channel), + { + |channel, cfds| { + let electrum = opts.network.electrum().to_string(); + monitor::Actor::new(electrum, channel, cfds) + } + }, + ) + .await?; + + tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); + let take_offer_channel = MessageChannel::::clone_channel(&cfd_actor_addr); + let cfd_action_channel = MessageChannel::::clone_channel(&cfd_actor_addr); + rocket::custom(figment) + .manage(order_feed_receiver) + .manage(update_cfd_feed_receiver) + .manage(take_offer_channel) + .manage(cfd_action_channel) + .manage(cfd_feed_receiver) .manage(wallet_feed_receiver) .manage(quote_updates) .manage(bitcoin_network) - .attach(AdHoc::try_on_ignite("SQL migrations", { - let db = db.clone(); - - move |rocket| async move { - match db::run_migrations(&db).await { - Ok(_) => Ok(rocket), - Err(_) => Err(rocket), - } - } - })) - .attach(AdHoc::try_on_ignite("Create actors", { - let db = db.clone(); - - move |rocket| async move { - let mut conn = db.acquire().await.unwrap(); - - housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn) - .await - .unwrap(); - housekeeping::rebroadcast_transactions(&mut conn, &wallet) - .await - .unwrap(); - - let connection::Actor { - send_to_maker, - read_from_maker, - } = connection::Actor::new(opts.maker).await; - - let ActorSystem { - cfd_actor_addr, - cfd_feed_receiver, - order_feed_receiver, - update_cfd_feed_receiver, - } = ActorSystem::new( - db, - wallet.clone(), - oracle, - send_to_maker, - read_from_maker, - |cfds, channel| oracle::Actor::new(cfds, channel), - { - |channel, cfds| { - let electrum = opts.network.electrum().to_string(); - monitor::Actor::new(electrum, channel, cfds) - } - }, - ) - .await; - - tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); - let take_offer_channel = - MessageChannel::::clone_channel(&cfd_actor_addr); - let cfd_action_channel = - MessageChannel::::clone_channel(&cfd_actor_addr); - Ok(rocket - .manage(order_feed_receiver) - .manage(update_cfd_feed_receiver) - .manage(take_offer_channel) - .manage(cfd_action_channel) - .manage(cfd_feed_receiver)) - } - })) .mount( "/api", rocket::routes![ @@ -267,13 +249,13 @@ where read_from_maker: Box + Unpin + Send>, oracle_constructor: impl Fn(Vec, Box>) -> O, monitor_constructor: impl Fn(Box>, Vec) -> F, - ) -> Self + ) -> Result where F: Future>, { - let mut conn = db.acquire().await.unwrap(); + let mut conn = db.acquire().await?; - let cfds = load_all_cfds(&mut conn).await.unwrap(); + let cfds = load_all_cfds(&mut conn).await?; let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); @@ -302,14 +284,11 @@ where tokio::spawn( monitor_ctx .notify_interval(Duration::from_secs(20), || monitor::Sync) - .unwrap(), + .map_err(|e| anyhow::anyhow!(e))?, ); tokio::spawn( - monitor_ctx.run( - monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()) - .await - .unwrap(), - ), + monitor_ctx + .run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?), ); tokio::spawn( @@ -323,11 +302,11 @@ where tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); - Self { + Ok(Self { cfd_actor_addr, cfd_feed_receiver, order_feed_receiver, update_cfd_feed_receiver, - } + }) } }