@ -1,7 +1,8 @@
use crate ::model ::cfd ::OrderId ;
use crate ::model ::cfd ::OrderId ;
use crate ::model ::Usd ;
use crate ::model ::Usd ;
use crate ::tokio_ext ::FutureExt ;
use crate ::{ log_error , noise , send_to_socket , setup_taker , wire , Tasks } ;
use crate ::{ log_error , noise , send_to_socket , setup_taker , wire , Tasks } ;
use anyhow ::Result ;
use anyhow ::{ Context , Result } ;
use futures ::StreamExt ;
use futures ::StreamExt ;
use std ::collections ::HashMap ;
use std ::collections ::HashMap ;
use std ::net ::SocketAddr ;
use std ::net ::SocketAddr ;
@ -29,7 +30,8 @@ pub struct Actor {
identity_sk : x25519_dalek ::StaticSecret ,
identity_sk : x25519_dalek ::StaticSecret ,
maker_to_taker : Box < dyn MessageChannel < wire ::MakerToTaker > > ,
maker_to_taker : Box < dyn MessageChannel < wire ::MakerToTaker > > ,
/// Max duration since the last heartbeat until we die.
/// Max duration since the last heartbeat until we die.
timeout : Duration ,
heartbeat_timeout : Duration ,
connect_timeout : Duration ,
connected_state : Option < ConnectedState > ,
connected_state : Option < ConnectedState > ,
setup_actors : HashMap < OrderId , xtra ::Address < setup_taker ::Actor > > ,
setup_actors : HashMap < OrderId , xtra ::Address < setup_taker ::Actor > > ,
}
}
@ -70,7 +72,8 @@ impl Actor {
status_sender : watch ::Sender < ConnectionStatus > ,
status_sender : watch ::Sender < ConnectionStatus > ,
maker_to_taker : Box < dyn MessageChannel < wire ::MakerToTaker > > ,
maker_to_taker : Box < dyn MessageChannel < wire ::MakerToTaker > > ,
identity_sk : x25519_dalek ::StaticSecret ,
identity_sk : x25519_dalek ::StaticSecret ,
timeout : Duration ,
hearthbeat_timeout : Duration ,
connect_timeout : Duration ,
) -> Self {
) -> Self {
let ( send_to_maker_addr , send_to_maker_ctx ) = xtra ::Context ::new ( None ) ;
let ( send_to_maker_addr , send_to_maker_ctx ) = xtra ::Context ::new ( None ) ;
@ -80,9 +83,10 @@ impl Actor {
send_to_maker_ctx ,
send_to_maker_ctx ,
identity_sk ,
identity_sk ,
maker_to_taker ,
maker_to_taker ,
timeout ,
heartbeat_timeout : hearthbeat_ timeout,
connected_state : None ,
connected_state : None ,
setup_actors : HashMap ::new ( ) ,
setup_actors : HashMap ::new ( ) ,
connect_timeout ,
}
}
}
}
}
}
@ -120,8 +124,20 @@ impl Actor {
} : Connect ,
} : Connect ,
ctx : & mut xtra ::Context < Self > ,
ctx : & mut xtra ::Context < Self > ,
) -> Result < ( ) > {
) -> Result < ( ) > {
tracing ::debug ! ( address = % maker_addr , "Connecting to maker" ) ;
let ( read , write , noise ) = {
let ( read , write , noise ) = {
let mut connection = TcpStream ::connect ( & maker_addr ) . await ? ;
let mut connection = TcpStream ::connect ( & maker_addr )
. timeout ( self . connect_timeout )
. await
. with_context ( | | {
format ! (
"Connection attempt to {} timed out after {}s" ,
maker_addr ,
self . connect_timeout . as_secs ( )
)
} ) ?
. with_context ( | | format ! ( "Failed to connect to {}" , maker_addr ) ) ? ;
let noise =
let noise =
noise ::initiator_handshake ( & mut connection , & self . identity_sk , & maker_identity_pk )
noise ::initiator_handshake ( & mut connection , & self . identity_sk , & maker_identity_pk )
. await ? ;
. await ? ;
@ -142,7 +158,7 @@ impl Actor {
tasks . add ( this . attach_stream ( read ) ) ;
tasks . add ( this . attach_stream ( read ) ) ;
tasks . add (
tasks . add (
ctx . notify_interval ( self . timeout , | | MeasurePulse )
ctx . notify_interval ( self . heartbeat_ timeout, | | MeasurePulse )
. expect ( "we just started" ) ,
. expect ( "we just started" ) ,
) ;
) ;
@ -235,7 +251,7 @@ impl Actor {
)
)
. expect ( "now is always later than heartbeat" ) ;
. expect ( "now is always later than heartbeat" ) ;
if time_since_last_heartbeat > self . timeout {
if time_since_last_heartbeat > self . heartbeat_ timeout {
self . status_sender
self . status_sender
. send ( ConnectionStatus ::Offline )
. send ( ConnectionStatus ::Offline )
. expect ( "watch receiver to outlive the actor" ) ;
. expect ( "watch receiver to outlive the actor" ) ;
@ -256,11 +272,9 @@ pub async fn connect(
) {
) {
loop {
loop {
if maker_online_status_feed_receiver . borrow ( ) . clone ( ) = = ConnectionStatus ::Offline {
if maker_online_status_feed_receiver . borrow ( ) . clone ( ) = = ConnectionStatus ::Offline {
tracing ::info ! ( "No connection to the maker, attempting to connect: " ) ;
tracing ::debug ! ( "No connection to the maker" ) ;
'connect : loop {
'connect : loop {
for address in & maker_addresses {
for address in & maker_addresses {
tracing ::trace ! ( "Connecting to {}" , address ) ;
let connect_msg = Connect {
let connect_msg = Connect {
maker_identity_pk ,
maker_identity_pk ,
maker_addr : * address ,
maker_addr : * address ,
@ -274,11 +288,10 @@ pub async fn connect(
tracing ::trace ! ( % address , "Failed to establish connection: {:#}" , e ) ;
tracing ::trace ! ( % address , "Failed to establish connection: {:#}" , e ) ;
continue ;
continue ;
}
}
tracing ::debug ! ( "Connection established" ) ;
break 'connect ;
break 'connect ;
}
}
tracing ::debug ! (
tracing ::warn ! (
"Tried connecting to {} addresses without success, retrying in {} seconds" ,
"Tried connecting to {} addresses without success, retrying in {} seconds" ,
maker_addresses . len ( ) ,
maker_addresses . len ( ) ,
CONNECT_TO_MAKER_INTERVAL . as_secs ( )
CONNECT_TO_MAKER_INTERVAL . as_secs ( )