@ -329,6 +329,9 @@ where
None = > None ,
} ;
// Need to use `do_send_async` here because we are being invoked from the
// `maker_inc_connections::Actor`. Using `send` would result in a deadlock.
#[ allow(clippy::disallowed_method) ]
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
taker_id ,
@ -346,17 +349,27 @@ where
let taker_id = self . get_taker_id_of_proposal ( & order_id ) ? ;
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
match self
. takers
. send ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifySettlementAccepted { id : order_id } ,
} )
. await ? ;
. await ?
{
Ok ( _ ) = > {
self . current_agreed_proposals
. insert ( order_id , self . get_settlement_proposal ( order_id ) ? ) ;
self . remove_pending_proposal ( & order_id )
. context ( "accepted settlement" ) ? ;
}
Err ( e ) = > {
tracing ::warn ! ( "Failed to notify taker of accepted settlement: {}" , e ) ;
self . remove_pending_proposal ( & order_id )
. context ( "accepted settlement" ) ? ;
}
}
self . current_agreed_proposals
. insert ( order_id , self . get_settlement_proposal ( order_id ) ? ) ;
self . remove_pending_proposal ( & order_id )
. context ( "accepted settlement" ) ? ;
Ok ( ( ) )
}
@ -365,15 +378,18 @@ where
let taker_id = self . get_taker_id_of_proposal ( & order_id ) ? ;
// clean-up state ahead of sending to ensure consistency in case we fail to deliver the
// message
self . remove_pending_proposal ( & order_id )
. context ( "rejected settlement" ) ? ;
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
. send ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifySettlementRejected { id : order_id } ,
} )
. await ? ;
. await ? ? ;
self . remove_pending_proposal ( & order_id )
. context ( "rejected settlement" ) ? ;
Ok ( ( ) )
}
@ -394,15 +410,18 @@ where
}
} ;
// clean-up state ahead of sending to ensure consistency in case we fail to deliver the
// message
self . remove_pending_proposal ( & order_id )
. context ( "rejected roll_over" ) ? ;
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
. send ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifyRollOverRejected { id : order_id } ,
} )
. await ? ;
. await ? ? ;
self . remove_pending_proposal ( & order_id )
. context ( "rejected roll_over" ) ? ;
Ok ( ( ) )
}
}
@ -428,19 +447,19 @@ where
load_order_by_id ( current_order_id , & mut conn ) . await ?
}
_ = > {
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifyInvalidOrderId { id : order_id } ,
} )
. await ? ;
// An outdated order on the taker side does not require any state change on the
// maker. notifying the taker with a specific message should be sufficient.
// Since this is a scenario that we should rarely see we log
// a warning to be sure we don't trigger this code path frequently.
tracing ::warn ! ( "Taker tried to take order with outdated id {}" , order_id ) ;
self . takers
. send ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifyInvalidOrderId { id : order_id } ,
} )
. await ? ? ;
return Ok ( ( ) ) ;
}
} ;
@ -451,7 +470,7 @@ where
// have to remove the current order.
self . current_order_id = None ;
self . takers
. do_ send_async ( maker_inc_connections ::BroadcastOrder ( None ) )
. send ( maker_inc_connections ::BroadcastOrder ( None ) )
. await ? ;
self . order_feed_sender . send ( None ) ? ;
@ -517,11 +536,11 @@ where
append_cfd_state ( & cfd , & mut conn , & self . cfd_feed_actor_inbox ) . await ? ;
self . takers
. do_ send_async ( maker_inc_connections ::TakerMessage {
. send ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifyOrderRejected { id : cfd . order . id } ,
} )
. await ? ;
. await ? ? ;
Ok ( ( ) )
}
@ -606,8 +625,9 @@ where
tokio ::spawn ( async move {
let dlc = contract_future . await ;
this . do_ send_async ( CfdSetupCompleted { order_id , dlc } )
this . send ( CfdSetupCompleted { order_id , dlc } )
. await
. expect ( "always connected to ourselves" ) ;
} ) ;
// 6. Record that we are in an active contract setup
@ -668,7 +688,7 @@ where
tracing ::info ! ( "Lock transaction published with txid {}" , txid ) ;
self . monitor_actor
. do_ send_async ( monitor ::StartMonitoring {
. send ( monitor ::StartMonitoring {
id : order_id ,
params : MonitorParams ::new (
dlc ,
@ -679,7 +699,7 @@ where
. await ? ;
self . oracle_actor
. do_ send_async ( oracle ::MonitorAttestation {
. send ( oracle ::MonitorAttestation {
event_id : cfd . order . oracle_event_id ,
} )
. await ? ;
@ -740,7 +760,7 @@ where
. await ? ? ;
self . oracle_actor
. do_ send_async ( oracle ::MonitorAttestation {
. send ( oracle ::MonitorAttestation {
event_id : announcement . id ,
} )
. await ? ;
@ -773,8 +793,9 @@ where
tokio ::spawn ( async move {
let dlc = contract_future . await ;
this . do_ send_async ( CfdRollOverCompleted { order_id , dlc } )
this . send ( CfdRollOverCompleted { order_id , dlc } )
. await
. expect ( "always connected to ourselves" )
} ) ;
self . remove_pending_proposal ( & order_id )
@ -806,7 +827,7 @@ where
append_cfd_state ( & cfd , & mut conn , & self . cfd_feed_actor_inbox ) . await ? ;
self . monitor_actor
. do_ send_async ( monitor ::StartMonitoring {
. send ( monitor ::StartMonitoring {
id : order_id ,
params : MonitorParams ::new (
dlc ,
@ -875,7 +896,7 @@ where
tracing ::info ! ( "Close transaction published with txid {}" , txid ) ;
self . monitor_actor
. do_ send_async ( monitor ::CollaborativeSettlement {
. send ( monitor ::CollaborativeSettlement {
order_id ,
tx : ( txid , own_script_pubkey ) ,
} )
@ -954,7 +975,7 @@ where
// 4. Inform connected takers
self . takers
. do_ send_async ( maker_inc_connections ::BroadcastOrder ( Some ( order ) ) )
. send ( maker_inc_connections ::BroadcastOrder ( Some ( order ) ) )
. await ? ;
Ok ( ( ) )