@ -5,7 +5,6 @@ use crate::cfd_actors::insert_cfd_and_update_feed;
use crate ::cfd_actors ::{ self } ;
use crate ::cfd_actors ::{ self } ;
use crate ::collab_settlement_maker ;
use crate ::collab_settlement_maker ;
use crate ::db ::load_cfd ;
use crate ::db ::load_cfd ;
use crate ::log_error ;
use crate ::maker_inc_connections ;
use crate ::maker_inc_connections ;
use crate ::model ::cfd ::Cfd ;
use crate ::model ::cfd ::Cfd ;
use crate ::model ::cfd ::CfdState ;
use crate ::model ::cfd ::CfdState ;
@ -557,49 +556,50 @@ where
M : xtra ::Handler < monitor ::CollaborativeSettlement > ,
M : xtra ::Handler < monitor ::CollaborativeSettlement > ,
W : xtra ::Handler < wallet ::TryBroadcastTransaction > ,
W : xtra ::Handler < wallet ::TryBroadcastTransaction > ,
{
{
async fn handle_settlement_completed ( & mut self , msg : collab_settlement_maker ::Completed ) {
async fn handle_settlement_completed (
log_error ! ( async {
& mut self ,
use collab_settlement_maker ::Completed ::* ;
msg : collab_settlement_maker ::Completed ,
let ( order_id , settlement , script_pubkey ) = match msg {
) -> Result < ( ) > {
Confirmed {
use collab_settlement_maker ::Completed ::* ;
order_id ,
let ( order_id , settlement , script_pubkey ) = match msg {
settlement ,
Confirmed {
script_pubkey ,
order_id ,
} = > ( order_id , settlement , script_pubkey ) ,
settlement ,
Rejected { . . } = > {
script_pubkey ,
return Ok ( ( ) ) ;
} = > ( order_id , settlement , script_pubkey ) ,
}
Rejected { . . } = > {
Failed { order_id , error } = > {
return Ok ( ( ) ) ;
tracing ::warn ! ( % order_id , "Collaborative settlement failed: {:#}" , error ) ;
}
return Ok ( ( ) ) ;
Failed { order_id , error } = > {
}
tracing ::warn ! ( % order_id , "Collaborative settlement failed: {:#}" , error ) ;
} ;
return Ok ( ( ) ) ;
}
} ;
let mut conn = self . db . acquire ( ) . await ? ;
let mut conn = self . db . acquire ( ) . await ? ;
let mut cfd = load_cfd ( order_id , & mut conn ) . await ? ;
let mut cfd = load_cfd ( order_id , & mut conn ) . await ? ;
let tx = settlement . tx . clone ( ) ;
let tx = settlement . tx . clone ( ) ;
cfd . handle_proposal_signed ( settlement )
cfd . handle_proposal_signed ( settlement )
. context ( "Failed to update state with collaborative settlement" ) ? ;
. context ( "Failed to update state with collaborative settlement" ) ? ;
append_cfd_state ( & cfd , & mut conn , & self . projection_actor ) . await ? ;
append_cfd_state ( & cfd , & mut conn , & self . projection_actor ) . await ? ;
let txid = self
let txid = self
. wallet
. wallet
. send ( wallet ::TryBroadcastTransaction { tx } )
. send ( wallet ::TryBroadcastTransaction { tx } )
. await ?
. await ?
. context ( "Broadcasting close transaction" ) ? ;
. context ( "Broadcasting close transaction" ) ? ;
tracing ::info ! ( % order_id , "Close transaction published with txid {}" , txid ) ;
tracing ::info ! ( % order_id , "Close transaction published with txid {}" , txid ) ;
self . monitor_actor
self . monitor_actor
. send ( monitor ::CollaborativeSettlement {
. send ( monitor ::CollaborativeSettlement {
order_id ,
order_id ,
tx : ( txid , script_pubkey ) ,
tx : ( txid , script_pubkey ) ,
} )
} )
. await ? ;
. await ? ;
anyhow ::Ok ( ( ) )
Ok ( ( ) )
} ) ;
}
}
}
}
@ -754,56 +754,54 @@ where
M : xtra ::Handler < monitor ::StartMonitoring > ,
M : xtra ::Handler < monitor ::StartMonitoring > ,
W : xtra ::Handler < wallet ::TryBroadcastTransaction > ,
W : xtra ::Handler < wallet ::TryBroadcastTransaction > ,
{
{
async fn handle_setup_completed ( & mut self , msg : setup_maker ::Completed ) {
async fn handle_setup_completed ( & mut self , msg : setup_maker ::Completed ) -> Result < ( ) > {
log_error ! ( async {
use setup_maker ::Completed ::* ;
use setup_maker ::Completed ::* ;
let ( order_id , dlc ) = match msg {
let ( order_id , dlc ) = match msg {
NewContract { order_id , dlc } = > ( order_id , dlc ) ,
NewContract { order_id , dlc } = > ( order_id , dlc ) ,
Failed { order_id , error } = > {
Failed { order_id , error } = > {
self . append_cfd_state_setup_failed ( order_id , error ) . await ? ;
self . append_cfd_state_setup_failed ( order_id , error ) . await ? ;
return anyhow ::Ok ( ( ) ) ;
return anyhow ::Ok ( ( ) ) ;
}
}
Rejected ( order_id ) = > {
Rejected ( order_id ) = > {
self . append_cfd_state_rejected ( order_id ) . await ? ;
self . append_cfd_state_rejected ( order_id ) . await ? ;
return anyhow ::Ok ( ( ) ) ;
return anyhow ::Ok ( ( ) ) ;
}
}
} ;
} ;
let mut conn = self . db . acquire ( ) . await ? ;
let mut conn = self . db . acquire ( ) . await ? ;
let mut cfd = load_cfd ( order_id , & mut conn ) . await ? ;
let mut cfd = load_cfd ( order_id , & mut conn ) . await ? ;
* cfd . state_mut ( ) = CfdState ::PendingOpen {
* cfd . state_mut ( ) = CfdState ::PendingOpen {
common : CfdStateCommon ::default ( ) ,
common : CfdStateCommon ::default ( ) ,
dlc : dlc . clone ( ) ,
dlc : dlc . clone ( ) ,
attestation : None ,
attestation : None ,
} ;
} ;
append_cfd_state ( & cfd , & mut conn , & self . projection_actor ) . await ? ;
append_cfd_state ( & cfd , & mut conn , & self . projection_actor ) . await ? ;
let txid = self
let txid = self
. wallet
. wallet
. send ( wallet ::TryBroadcastTransaction {
. send ( wallet ::TryBroadcastTransaction {
tx : dlc . lock . 0. clone ( ) ,
tx : dlc . lock . 0. clone ( ) ,
} )
} )
. await ? ? ;
. await ? ? ;
tracing ::info ! ( "Lock transaction published with txid {}" , txid ) ;
tracing ::info ! ( "Lock transaction published with txid {}" , txid ) ;
self . monitor_actor
self . monitor_actor
. send ( monitor ::StartMonitoring {
. send ( monitor ::StartMonitoring {
id : order_id ,
id : order_id ,
params : MonitorParams ::new ( dlc . clone ( ) , cfd . refund_timelock_in_blocks ( ) ) ,
params : MonitorParams ::new ( dlc . clone ( ) , cfd . refund_timelock_in_blocks ( ) ) ,
} )
} )
. await ? ;
. await ? ;
self . oracle_actor
self . oracle_actor
. send ( oracle ::MonitorAttestation {
. send ( oracle ::MonitorAttestation {
event_id : dlc . settlement_event_id ,
event_id : dlc . settlement_event_id ,
} )
} )
. await ? ;
. await ? ;
Ok ( ( ) )
Ok ( ( ) )
} ) ;
}
}
}
}
@ -812,8 +810,8 @@ impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<TakerConnected> for
where
where
T : xtra ::Handler < maker_inc_connections ::TakerMessage > ,
T : xtra ::Handler < maker_inc_connections ::TakerMessage > ,
{
{
async fn handle ( & mut self , msg : TakerConnected , _ctx : & mut Context < Self > ) {
async fn handle ( & mut self , msg : TakerConnected , _ctx : & mut Context < Self > ) -> Result < ( ) > {
log_error ! ( self . handle_taker_connected ( msg . id ) ) ;
self . handle_taker_connected ( msg . id ) . await
}
}
}
}
@ -823,8 +821,8 @@ impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<TakerDisconnected>
where
where
T : xtra ::Handler < maker_inc_connections ::TakerMessage > ,
T : xtra ::Handler < maker_inc_connections ::TakerMessage > ,
{
{
async fn handle ( & mut self , msg : TakerDisconnected , _ctx : & mut Context < Self > ) {
async fn handle ( & mut self , msg : TakerDisconnected , _ctx : & mut Context < Self > ) -> Result < ( ) > {
log_error ! ( self . handle_taker_disconnected ( msg . id ) ) ;
self . handle_taker_disconnected ( msg . id ) . await
}
}
}
}
@ -834,8 +832,8 @@ where
M : xtra ::Handler < monitor ::StartMonitoring > ,
M : xtra ::Handler < monitor ::StartMonitoring > ,
O : xtra ::Handler < oracle ::MonitorAttestation > ,
O : xtra ::Handler < oracle ::MonitorAttestation > ,
{
{
async fn handle ( & mut self , msg : Completed , _ctx : & mut Context < Self > ) {
async fn handle ( & mut self , msg : Completed , _ctx : & mut Context < Self > ) -> Result < ( ) > {
log_error ! ( self . handle_roll_over_completed ( msg ) ) ;
self . handle_roll_over_completed ( msg ) . await
}
}
}
}
@ -844,8 +842,8 @@ impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<monitor::Event> for
where
where
W : xtra ::Handler < wallet ::TryBroadcastTransaction > ,
W : xtra ::Handler < wallet ::TryBroadcastTransaction > ,
{
{
async fn handle ( & mut self , msg : monitor ::Event , _ctx : & mut Context < Self > ) {
async fn handle ( & mut self , msg : monitor ::Event , _ctx : & mut Context < Self > ) -> Result < ( ) > {
log_error ! ( self . handle_monitoring_event ( msg ) )
self . handle_monitoring_event ( msg ) . await
}
}
}
}
@ -866,10 +864,15 @@ where
+ xtra ::Handler < wallet ::BuildPartyParams >
+ xtra ::Handler < wallet ::BuildPartyParams >
+ xtra ::Handler < wallet ::TryBroadcastTransaction > ,
+ xtra ::Handler < wallet ::TryBroadcastTransaction > ,
{
{
async fn handle ( & mut self , FromTaker { taker_id , msg } : FromTaker , ctx : & mut Context < Self > ) {
async fn handle (
& mut self ,
FromTaker { taker_id , msg } : FromTaker ,
ctx : & mut Context < Self > ,
) -> Result < ( ) > {
match msg {
match msg {
wire ::TakerToMaker ::TakeOrder { order_id , quantity } = > {
wire ::TakerToMaker ::TakeOrder { order_id , quantity } = > {
log_error ! ( self . handle_take_order ( taker_id , order_id , quantity , ctx ) )
self . handle_take_order ( taker_id , order_id , quantity , ctx )
. await ?
}
}
wire ::TakerToMaker ::Settlement {
wire ::TakerToMaker ::Settlement {
order_id ,
order_id ,
@ -881,17 +884,22 @@ where
price ,
price ,
} ,
} ,
} = > {
} = > {
log_error ! ( self . handle_propose_settlement (
if let Err ( e ) = self
taker_id ,
. handle_propose_settlement (
SettlementProposal {
taker_id ,
order_id ,
SettlementProposal {
timestamp ,
order_id ,
taker ,
timestamp ,
maker ,
taker ,
price
maker ,
} ,
price ,
ctx
} ,
) )
ctx ,
)
. await
{
tracing ::warn ! ( "Failed ot handle settlement proposal: {:#}" , e ) ;
}
}
}
wire ::TakerToMaker ::Settlement {
wire ::TakerToMaker ::Settlement {
msg : wire ::taker_to_maker ::Settlement ::Initiate { . . } ,
msg : wire ::taker_to_maker ::Settlement ::Initiate { . . } ,
@ -903,14 +911,19 @@ where
order_id ,
order_id ,
timestamp ,
timestamp ,
} = > {
} = > {
log_error ! ( self . handle_propose_roll_over (
if let Err ( e ) = self
RolloverProposal {
. handle_propose_roll_over (
order_id ,
RolloverProposal {
timestamp ,
order_id ,
} ,
timestamp ,
taker_id ,
} ,
ctx
taker_id ,
) )
ctx ,
)
. await
{
tracing ::warn ! ( "Failed to handle rollover proposal: {:#}" , e ) ;
}
}
}
wire ::TakerToMaker ::RollOverProtocol { . . } = > {
wire ::TakerToMaker ::RollOverProtocol { . . } = > {
unreachable ! ( "This kind of message should be sent to the rollover_maker::Actor`" )
unreachable ! ( "This kind of message should be sent to the rollover_maker::Actor`" )
@ -921,7 +934,9 @@ where
TakerToMaker ::Hello ( _ ) = > {
TakerToMaker ::Hello ( _ ) = > {
unreachable ! ( "The Hello message is not sent to the cfd actor" )
unreachable ! ( "The Hello message is not sent to the cfd actor" )
}
}
}
} ;
Ok ( ( ) )
}
}
}
}
@ -932,24 +947,26 @@ where
W : xtra ::Handler < wallet ::TryBroadcastTransaction > ,
W : xtra ::Handler < wallet ::TryBroadcastTransaction > ,
{
{
async fn handle ( & mut self , msg : oracle ::Attestation , _ctx : & mut Context < Self > ) {
async fn handle ( & mut self , msg : oracle ::Attestation , _ctx : & mut Context < Self > ) {
log_error ! ( self . handle_oracle_attestation ( msg ) )
if let Err ( e ) = self . handle_oracle_attestation ( msg ) . await {
tracing ::warn ! ( "Failed to handle oracle attestation: {:#}" , e )
}
}
}
}
}
impl Message for TakerConnected {
impl Message for TakerConnected {
type Result = ( ) ;
type Result = Result < ( ) > ;
}
}
impl Message for TakerDisconnected {
impl Message for TakerDisconnected {
type Result = ( ) ;
type Result = Result < ( ) > ;
}
}
impl Message for Completed {
impl Message for Completed {
type Result = ( ) ;
type Result = Result < ( ) > ;
}
}
impl Message for FromTaker {
impl Message for FromTaker {
type Result = ( ) ;
type Result = Result < ( ) > ;
}
}
impl < O : 'static , M : 'static , T : 'static , W : 'static > xtra ::Actor for Actor < O , M , T , W > { }
impl < O : 'static , M : 'static , T : 'static , W : 'static > xtra ::Actor for Actor < O , M , T , W > { }