@ -23,23 +23,24 @@ use futures::{future, SinkExt};
use std ::collections ::HashMap ;
use xtra ::prelude ::* ;
use xtra ::Actor as _ ;
use xtra_productivity ::xtra_productivity ;
pub struct TakeOffer {
pub order_id : OrderId ,
pub quantity : Usd ,
}
pub enum CfdAction {
ProposeSettlement {
order_id : OrderId ,
current_price : Price ,
} ,
ProposeRollOver {
order_id : OrderId ,
} ,
Commit {
order_id : OrderId ,
} ,
pub struct ProposeSettlement {
pub order_id : OrderId ,
pub current_price : Price ,
}
pub struct ProposeRollOver {
pub order_id : OrderId ,
}
pub struct Commit {
pub order_id : OrderId ,
}
pub struct CfdRollOverCompleted {
@ -135,24 +136,59 @@ impl<O, M, W> Actor<O, M, W> {
}
}
#[ xtra_productivity ]
impl < O , M , W > Actor < O , M , W >
where
W : xtra ::Handler < wallet ::TryBroadcastTransaction >
+ xtra ::Handler < wallet ::Sign >
+ xtra ::Handler < wallet ::BuildPartyParams > ,
W : xtra ::Handler < wallet ::TryBroadcastTransaction > ,
{
async fn handle_commit ( & mut self , order_id : OrderId ) -> Result < ( ) > {
async fn handle_commit ( & mut self , msg : Commit ) -> Result < ( ) > {
let Commit { order_id } = msg ;
let mut conn = self . db . acquire ( ) . await ? ;
cfd_actors ::handle_commit ( order_id , & mut conn , & self . wallet , & self . projection_actor )
. await ? ;
Ok ( ( ) )
}
async fn handle_propose_settlement (
& mut self ,
order_id : OrderId ,
current_price : Price ,
) -> Result < ( ) > {
async fn handle_propose_roll_over ( & mut self , msg : ProposeRollOver ) -> Result < ( ) > {
let ProposeRollOver { order_id } = msg ;
if self . current_pending_proposals . contains_key ( & order_id ) {
anyhow ::bail ! ( "An update for order id {} is already in progress" , order_id )
}
let proposal = RollOverProposal {
order_id ,
timestamp : Timestamp ::now ( ) ,
} ;
self . current_pending_proposals . insert (
proposal . order_id ,
UpdateCfdProposal ::RollOverProposal {
proposal : proposal . clone ( ) ,
direction : SettlementKind ::Outgoing ,
} ,
) ;
self . send_pending_update_proposals ( ) . await ? ;
self . conn_actor
. send ( wire ::TakerToMaker ::ProposeRollOver {
order_id : proposal . order_id ,
timestamp : proposal . timestamp ,
} )
. await ? ;
Ok ( ( ) )
}
}
#[ xtra_productivity ]
impl < O , M , W > Actor < O , M , W > {
async fn handle_propose_settlement ( & mut self , msg : ProposeSettlement ) -> Result < ( ) > {
let ProposeSettlement {
order_id ,
current_price ,
} = msg ;
let mut conn = self . db . acquire ( ) . await ? ;
let cfd = load_cfd_by_order_id ( order_id , & mut conn ) . await ? ;
@ -196,7 +232,14 @@ where
. await ? ;
Ok ( ( ) )
}
}
impl < O , M , W > Actor < O , M , W >
where
W : xtra ::Handler < wallet ::TryBroadcastTransaction >
+ xtra ::Handler < wallet ::Sign >
+ xtra ::Handler < wallet ::BuildPartyParams > ,
{
async fn handle_settlement_rejected ( & mut self , order_id : OrderId ) -> Result < ( ) > {
tracing ::info ! ( % order_id , "Settlement proposal got rejected" ) ;
@ -318,34 +361,6 @@ where
. await ? ;
Ok ( ( ) )
}
async fn handle_propose_roll_over ( & mut self , order_id : OrderId ) -> Result < ( ) > {
if self . current_pending_proposals . contains_key ( & order_id ) {
anyhow ::bail ! ( "An update for order id {} is already in progress" , order_id )
}
let proposal = RollOverProposal {
order_id ,
timestamp : Timestamp ::now ( ) ,
} ;
self . current_pending_proposals . insert (
proposal . order_id ,
UpdateCfdProposal ::RollOverProposal {
proposal : proposal . clone ( ) ,
direction : SettlementKind ::Outgoing ,
} ,
) ;
self . send_pending_update_proposals ( ) . await ? ;
self . conn_actor
. send ( wire ::TakerToMaker ::ProposeRollOver {
order_id : proposal . order_id ,
timestamp : proposal . timestamp ,
} )
. await ? ;
Ok ( ( ) )
}
}
impl < O , M , W > Actor < O , M , W >
@ -657,34 +672,6 @@ where
}
}
#[ async_trait ]
impl < O : 'static , M : 'static , W : 'static > Handler < CfdAction > for Actor < O , M , W >
where
W : xtra ::Handler < wallet ::TryBroadcastTransaction >
+ xtra ::Handler < wallet ::Sign >
+ xtra ::Handler < wallet ::BuildPartyParams > ,
{
async fn handle ( & mut self , msg : CfdAction , _ctx : & mut Context < Self > ) -> Result < ( ) > {
use CfdAction ::* ;
if let Err ( e ) = match msg {
Commit { order_id } = > self . handle_commit ( order_id ) . await ,
ProposeSettlement {
order_id ,
current_price ,
} = > {
self . handle_propose_settlement ( order_id , current_price )
. await
}
ProposeRollOver { order_id } = > self . handle_propose_roll_over ( order_id ) . await ,
} {
tracing ::error ! ( "Message handler failed: {:#}" , e ) ;
anyhow ::bail ! ( e )
}
Ok ( ( ) )
}
}
#[ async_trait ]
impl < O : 'static , M : 'static , W : 'static > Handler < wire ::MakerToTaker > for Actor < O , M , W >
where
@ -785,10 +772,6 @@ impl Message for TakeOffer {
type Result = Result < ( ) > ;
}
impl Message for CfdAction {
type Result = Result < ( ) > ;
}
impl Message for CfdRollOverCompleted {
type Result = ( ) ;
}