@ -12,7 +12,6 @@ use daemon::{
bitmex_price_feed , fan_out , housekeeping , logger , monitor , oracle , taker_cfd , wallet_sync , wire ,
bitmex_price_feed , fan_out , housekeeping , logger , monitor , oracle , taker_cfd , wallet_sync , wire ,
} ;
} ;
use futures ::{ Future , Stream } ;
use futures ::{ Future , Stream } ;
use rocket ::fairing ::AdHoc ;
use sqlx ::sqlite ::SqliteConnectOptions ;
use sqlx ::sqlite ::SqliteConnectOptions ;
use sqlx ::SqlitePool ;
use sqlx ::SqlitePool ;
use std ::collections ::HashMap ;
use std ::collections ::HashMap ;
@ -153,72 +152,55 @@ async fn main() -> Result<()> {
)
)
. await ? ;
. await ? ;
db ::run_migrations ( & db )
. await
. context ( "Db migrations failed" ) ? ;
// Create actors
let mut conn = db . acquire ( ) . await ? ;
housekeeping ::transition_non_continue_cfds_to_setup_failed ( & mut conn ) . await ? ;
housekeeping ::rebroadcast_transactions ( & mut conn , & wallet ) . await ? ;
let connection ::Actor {
send_to_maker ,
read_from_maker ,
} = connection ::Actor ::new ( opts . maker ) . await ;
let ActorSystem {
cfd_actor_addr ,
cfd_feed_receiver ,
order_feed_receiver ,
update_cfd_feed_receiver ,
} = ActorSystem ::new (
db . clone ( ) ,
wallet . clone ( ) ,
oracle ,
send_to_maker ,
read_from_maker ,
| cfds , channel | oracle ::Actor ::new ( cfds , channel ) ,
{
| channel , cfds | {
let electrum = opts . network . electrum ( ) . to_string ( ) ;
monitor ::Actor ::new ( electrum , channel , cfds )
}
} ,
)
. await ? ;
tokio ::spawn ( wallet_sync ::new ( wallet , wallet_feed_sender ) ) ;
let take_offer_channel = MessageChannel ::< taker_cfd ::TakeOffer > ::clone_channel ( & cfd_actor_addr ) ;
let cfd_action_channel = MessageChannel ::< taker_cfd ::CfdAction > ::clone_channel ( & cfd_actor_addr ) ;
rocket ::custom ( figment )
rocket ::custom ( figment )
. manage ( order_feed_receiver )
. manage ( update_cfd_feed_receiver )
. manage ( take_offer_channel )
. manage ( cfd_action_channel )
. manage ( cfd_feed_receiver )
. manage ( wallet_feed_receiver )
. manage ( wallet_feed_receiver )
. manage ( quote_updates )
. manage ( quote_updates )
. manage ( bitcoin_network )
. manage ( bitcoin_network )
. attach ( AdHoc ::try_on_ignite ( "SQL migrations" , {
let db = db . clone ( ) ;
move | rocket | async move {
match db ::run_migrations ( & db ) . await {
Ok ( _ ) = > Ok ( rocket ) ,
Err ( _ ) = > Err ( rocket ) ,
}
}
} ) )
. attach ( AdHoc ::try_on_ignite ( "Create actors" , {
let db = db . clone ( ) ;
move | rocket | async move {
let mut conn = db . acquire ( ) . await . unwrap ( ) ;
housekeeping ::transition_non_continue_cfds_to_setup_failed ( & mut conn )
. await
. unwrap ( ) ;
housekeeping ::rebroadcast_transactions ( & mut conn , & wallet )
. await
. unwrap ( ) ;
let connection ::Actor {
send_to_maker ,
read_from_maker ,
} = connection ::Actor ::new ( opts . maker ) . await ;
let ActorSystem {
cfd_actor_addr ,
cfd_feed_receiver ,
order_feed_receiver ,
update_cfd_feed_receiver ,
} = ActorSystem ::new (
db ,
wallet . clone ( ) ,
oracle ,
send_to_maker ,
read_from_maker ,
| cfds , channel | oracle ::Actor ::new ( cfds , channel ) ,
{
| channel , cfds | {
let electrum = opts . network . electrum ( ) . to_string ( ) ;
monitor ::Actor ::new ( electrum , channel , cfds )
}
} ,
)
. await ;
tokio ::spawn ( wallet_sync ::new ( wallet , wallet_feed_sender ) ) ;
let take_offer_channel =
MessageChannel ::< taker_cfd ::TakeOffer > ::clone_channel ( & cfd_actor_addr ) ;
let cfd_action_channel =
MessageChannel ::< taker_cfd ::CfdAction > ::clone_channel ( & cfd_actor_addr ) ;
Ok ( rocket
. manage ( order_feed_receiver )
. manage ( update_cfd_feed_receiver )
. manage ( take_offer_channel )
. manage ( cfd_action_channel )
. manage ( cfd_feed_receiver ) )
}
} ) )
. mount (
. mount (
"/api" ,
"/api" ,
rocket ::routes ! [
rocket ::routes ! [
@ -267,13 +249,13 @@ where
read_from_maker : Box < dyn Stream < Item = taker_cfd ::MakerStreamMessage > + Unpin + Send > ,
read_from_maker : Box < dyn Stream < Item = taker_cfd ::MakerStreamMessage > + Unpin + Send > ,
oracle_constructor : impl Fn ( Vec < Cfd > , Box < dyn StrongMessageChannel < Attestation > > ) -> O ,
oracle_constructor : impl Fn ( Vec < Cfd > , Box < dyn StrongMessageChannel < Attestation > > ) -> O ,
monitor_constructor : impl Fn ( Box < dyn StrongMessageChannel < monitor ::Event > > , Vec < Cfd > ) -> F ,
monitor_constructor : impl Fn ( Box < dyn StrongMessageChannel < monitor ::Event > > , Vec < Cfd > ) -> F ,
) -> Self
) -> Result < Self >
where
where
F : Future < Output = Result < M > > ,
F : Future < Output = Result < M > > ,
{
{
let mut conn = db . acquire ( ) . await . unwrap ( ) ;
let mut conn = db . acquire ( ) . await ? ;
let cfds = load_all_cfds ( & mut conn ) . await . unwrap ( ) ;
let cfds = load_all_cfds ( & mut conn ) . await ? ;
let ( cfd_feed_sender , cfd_feed_receiver ) = watch ::channel ( cfds . clone ( ) ) ;
let ( cfd_feed_sender , cfd_feed_receiver ) = watch ::channel ( cfds . clone ( ) ) ;
let ( order_feed_sender , order_feed_receiver ) = watch ::channel ::< Option < Order > > ( None ) ;
let ( order_feed_sender , order_feed_receiver ) = watch ::channel ::< Option < Order > > ( None ) ;
@ -302,14 +284,11 @@ where
tokio ::spawn (
tokio ::spawn (
monitor_ctx
monitor_ctx
. notify_interval ( Duration ::from_secs ( 20 ) , | | monitor ::Sync )
. notify_interval ( Duration ::from_secs ( 20 ) , | | monitor ::Sync )
. unwrap ( ) ,
. map_err ( | e | anyhow ::anyhow ! ( e ) ) ? ,
) ;
) ;
tokio ::spawn (
tokio ::spawn (
monitor_ctx . run (
monitor_ctx
monitor_constructor ( Box ::new ( cfd_actor_addr . clone ( ) ) , cfds . clone ( ) )
. run ( monitor_constructor ( Box ::new ( cfd_actor_addr . clone ( ) ) , cfds . clone ( ) ) . await ? ) ,
. await
. unwrap ( ) ,
) ,
) ;
) ;
tokio ::spawn (
tokio ::spawn (
@ -323,11 +302,11 @@ where
tokio ::spawn ( oracle_ctx . run ( oracle_constructor ( cfds , Box ::new ( fan_out_actor ) ) ) ) ;
tokio ::spawn ( oracle_ctx . run ( oracle_constructor ( cfds , Box ::new ( fan_out_actor ) ) ) ) ;
Self {
Ok ( Self {
cfd_actor_addr ,
cfd_actor_addr ,
cfd_feed_receiver ,
cfd_feed_receiver ,
order_feed_receiver ,
order_feed_receiver ,
update_cfd_feed_receiver ,
update_cfd_feed_receiver ,
}
} )
}
}
}
}