@ -8,7 +8,6 @@ use crate::model::Timestamp;
use crate ::model ::Usd ;
use crate ::model ::Usd ;
use crate ::noise ;
use crate ::noise ;
use crate ::rollover_taker ;
use crate ::rollover_taker ;
use crate ::send_to_socket ;
use crate ::setup_taker ;
use crate ::setup_taker ;
use crate ::taker_cfd ::CurrentOrder ;
use crate ::taker_cfd ::CurrentOrder ;
use crate ::tokio_ext ::FutureExt ;
use crate ::tokio_ext ::FutureExt ;
@ -38,21 +37,73 @@ use xtra_productivity::xtra_productivity;
/// Time between reconnection attempts
/// Time between reconnection attempts
const CONNECT_TO_MAKER_INTERVAL : Duration = Duration ::from_secs ( 5 ) ;
const CONNECT_TO_MAKER_INTERVAL : Duration = Duration ::from_secs ( 5 ) ;
struct ConnectedState {
/// The "Connected" state of our connection with the maker.
last_heartbeat : SystemTime ,
#[ allow(clippy::large_enum_variant) ]
_tasks : Tasks ,
enum State {
Connected {
last_heartbeat : SystemTime ,
write : wire ::Write < wire ::MakerToTaker , wire ::TakerToMaker > ,
_tasks : Tasks ,
} ,
Disconnected ,
}
impl State {
async fn send ( & mut self , msg : wire ::TakerToMaker ) -> Result < ( ) > {
let msg_str = msg . to_string ( ) ;
let write = match self {
State ::Connected { write , . . } = > write ,
State ::Disconnected = > {
bail ! ( "Cannot send {}, not connected to maker" , msg_str ) ;
}
} ;
write
. send ( msg )
. await
. with_context ( | | format ! ( "Failed to send message {} to maker" , msg_str ) ) ? ;
Ok ( ( ) )
}
fn handle_incoming_heartbeat ( & mut self ) {
match self {
State ::Connected { last_heartbeat , . . } = > {
* last_heartbeat = SystemTime ::now ( ) ;
}
State ::Disconnected = > {
debug_assert ! ( false , "Received heartbeat in disconnected state" )
}
}
}
fn disconnect_if_last_heartbeat_older_than ( & mut self , timeout : Duration ) -> bool {
let duration_since_last_heartbeat = match self {
State ::Connected { last_heartbeat , . . } = > SystemTime ::now ( )
. duration_since ( * last_heartbeat )
. expect ( "clock is monotonic" ) ,
State ::Disconnected = > return false ,
} ;
if duration_since_last_heartbeat < timeout {
return false ;
}
* self = State ::Disconnected ;
true
}
}
}
pub struct Actor {
pub struct Actor {
status_sender : watch ::Sender < ConnectionStatus > ,
status_sender : watch ::Sender < ConnectionStatus > ,
send_to_maker : Box < dyn MessageChannel < wire ::TakerToMaker > > ,
send_to_maker_ctx : xtra ::Context < send_to_socket ::Actor < wire ::MakerToTaker , wire ::TakerToMaker > > ,
identity_sk : x25519_dalek ::StaticSecret ,
identity_sk : x25519_dalek ::StaticSecret ,
current_order : Box < dyn MessageChannel < CurrentOrder > > ,
current_order : Box < dyn MessageChannel < CurrentOrder > > ,
/// Max duration since the last heartbeat until we die.
/// Max duration since the last heartbeat until we die.
heartbeat_timeout : Duration ,
heartbeat_timeout : Duration ,
connect_timeout : Duration ,
connect_timeout : Duration ,
connected_state : Option < ConnectedState > ,
state : State ,
setup_actors : AddressMap < OrderId , setup_taker ::Actor > ,
setup_actors : AddressMap < OrderId , setup_taker ::Actor > ,
collab_settlement_actors : AddressMap < OrderId , collab_settlement_taker ::Actor > ,
collab_settlement_actors : AddressMap < OrderId , collab_settlement_taker ::Actor > ,
rollover_actors : AddressMap < OrderId , rollover_taker ::Actor > ,
rollover_actors : AddressMap < OrderId , rollover_taker ::Actor > ,
@ -122,16 +173,12 @@ impl Actor {
hearthbeat_timeout : Duration ,
hearthbeat_timeout : Duration ,
connect_timeout : Duration ,
connect_timeout : Duration ,
) -> Self {
) -> Self {
let ( send_to_maker_addr , send_to_maker_ctx ) = xtra ::Context ::new ( None ) ;
Self {
Self {
status_sender ,
status_sender ,
send_to_maker : Box ::new ( send_to_maker_addr ) ,
send_to_maker_ctx ,
identity_sk ,
identity_sk ,
current_order : current_order . clone_channel ( ) ,
current_order : current_order . clone_channel ( ) ,
heartbeat_timeout : hearthbeat_timeout ,
heartbeat_timeout : hearthbeat_timeout ,
connected_state : None ,
state : State ::Disconnected ,
setup_actors : AddressMap ::default ( ) ,
setup_actors : AddressMap ::default ( ) ,
connect_timeout ,
connect_timeout ,
collab_settlement_actors : AddressMap ::default ( ) ,
collab_settlement_actors : AddressMap ::default ( ) ,
@ -142,13 +189,6 @@ impl Actor {
#[ xtra_productivity(message_impl = false) ]
#[ xtra_productivity(message_impl = false) ]
impl Actor {
impl Actor {
async fn handle_taker_to_maker ( & mut self , message : wire ::TakerToMaker ) {
let msg_str = message . to_string ( ) ;
if self . send_to_maker . send ( message ) . await . is_err ( ) {
tracing ::warn ! ( "Failed to send wire message {} to maker" , msg_str ) ;
}
}
async fn handle_collab_settlement_actor_stopping (
async fn handle_collab_settlement_actor_stopping (
& mut self ,
& mut self ,
message : Stopping < collab_settlement_taker ::Actor > ,
message : Stopping < collab_settlement_taker ::Actor > ,
@ -163,8 +203,14 @@ impl Actor {
#[ xtra_productivity ]
#[ xtra_productivity ]
impl Actor {
impl Actor {
async fn handle_taker_to_maker ( & mut self , message : wire ::TakerToMaker ) {
if let Err ( e ) = self . state . send ( message ) . await {
tracing ::warn ! ( "{:#}" , e ) ;
}
}
async fn handle_take_order ( & mut self , msg : TakeOrder ) -> Result < ( ) > {
async fn handle_take_order ( & mut self , msg : TakeOrder ) -> Result < ( ) > {
self . send_to_maker
self . state
. send ( wire ::TakerToMaker ::TakeOrder {
. send ( wire ::TakerToMaker ::TakeOrder {
order_id : msg . order_id ,
order_id : msg . order_id ,
quantity : msg . quantity ,
quantity : msg . quantity ,
@ -186,7 +232,7 @@ impl Actor {
address ,
address ,
} = msg ;
} = msg ;
self . send_to_maker
self . state
. send ( wire ::TakerToMaker ::Settlement {
. send ( wire ::TakerToMaker ::Settlement {
order_id ,
order_id ,
msg : wire ::taker_to_maker ::Settlement ::Propose {
msg : wire ::taker_to_maker ::Settlement ::Propose {
@ -210,7 +256,7 @@ impl Actor {
address ,
address ,
} = msg ;
} = msg ;
self . send_to_maker
self . state
. send ( wire ::TakerToMaker ::ProposeRollOver {
. send ( wire ::TakerToMaker ::ProposeRollOver {
order_id ,
order_id ,
timestamp ,
timestamp ,
@ -301,20 +347,18 @@ impl Actor {
let this = ctx . address ( ) . expect ( "self to be alive" ) ;
let this = ctx . address ( ) . expect ( "self to be alive" ) ;
let send_to_socket = send_to_socket ::Actor ::new ( write ) ;
let mut tasks = Tasks ::default ( ) ;
let mut tasks = Tasks ::default ( ) ;
tasks . add ( self . send_to_maker_ctx . attach ( send_to_socket ) ) ;
tasks . add ( this . attach_stream ( read . map ( move | item | MakerStreamMessage { item } ) ) ) ;
tasks . add ( this . attach_stream ( read . map ( move | item | MakerStreamMessage { item } ) ) ) ;
tasks . add (
tasks . add (
ctx . notify_interval ( self . heartbeat_timeout , | | MeasurePulse )
ctx . notify_interval ( self . heartbeat_timeout , | | MeasurePulse )
. expect ( "we just started" ) ,
. expect ( "we just started" ) ,
) ;
) ;
self . connected_ state = Some ( ConnectedState {
self . state = State ::Connected {
last_heartbeat : SystemTime ::now ( ) ,
last_heartbeat : SystemTime ::now ( ) ,
write ,
_tasks : tasks ,
_tasks : tasks ,
} ) ;
} ;
self . status_sender
self . status_sender
. send ( ConnectionStatus ::Online )
. send ( ConnectionStatus ::Online )
. expect ( "receiver to outlive the actor" ) ;
. expect ( "receiver to outlive the actor" ) ;
@ -339,10 +383,7 @@ impl Actor {
match msg {
match msg {
wire ::MakerToTaker ::Heartbeat = > {
wire ::MakerToTaker ::Heartbeat = > {
self . connected_state
self . state . handle_incoming_heartbeat ( ) ;
. as_mut ( )
. expect ( "wire messages only to arrive in connected state" )
. last_heartbeat = SystemTime ::now ( ) ;
}
}
wire ::MakerToTaker ::ConfirmOrder ( order_id ) = > {
wire ::MakerToTaker ::ConfirmOrder ( order_id ) = > {
if self
if self
@ -440,20 +481,13 @@ impl Actor {
}
}
fn handle_measure_pulse ( & mut self , _ : MeasurePulse ) {
fn handle_measure_pulse ( & mut self , _ : MeasurePulse ) {
let time_since_last_heartbeat = SystemTime ::now ( )
if self
. duration_since (
. state
self . connected_state
. disconnect_if_last_heartbeat_older_than ( self . heartbeat_timeout )
. as_ref ( )
{
. expect ( "only run pulse measurements if connected" )
. last_heartbeat ,
)
. expect ( "now is always later than heartbeat" ) ;
if time_since_last_heartbeat > self . heartbeat_timeout {
self . status_sender
self . status_sender
. send ( ConnectionStatus ::Offline { reason : None } )
. send ( ConnectionStatus ::Offline { reason : None } )
. expect ( "watch receiver to outlive the actor" ) ;
. expect ( "watch receiver to outlive the actor" ) ;
self . connected_state = None ;
}
}
}
}
}
}