@ -85,7 +85,8 @@ pub struct Actor {
setup_state : SetupState ,
setup_state : SetupState ,
latest_announcements : Option < BTreeMap < OracleEventId , oracle ::Announcement > > ,
latest_announcements : Option < BTreeMap < OracleEventId , oracle ::Announcement > > ,
oracle_actor : Address < oracle ::Actor < Actor , monitor ::Actor < Actor > > > ,
oracle_actor : Address < oracle ::Actor < Actor , monitor ::Actor < Actor > > > ,
current_pending_proposals : UpdateCfdProposals ,
// Maker needs to also store TakerId to be able to send a reply back
current_pending_proposals : HashMap < OrderId , ( UpdateCfdProposal , TakerId ) > ,
}
}
enum SetupState {
enum SetupState {
@ -126,10 +127,33 @@ impl Actor {
}
}
}
}
/// Send pending proposals for the purposes of UI updates.
/// Filters out the TakerIds, as they are an implementation detail inside of
/// the actor
fn send_pending_proposals ( & self ) -> Result < ( ) > {
fn send_pending_proposals ( & self ) -> Result < ( ) > {
Ok ( self
Ok ( self . update_cfd_feed_sender . send (
. update_cfd_feed_sender
self . current_pending_proposals
. send ( self . current_pending_proposals . clone ( ) ) ? )
. iter ( )
. map ( | ( order_id , ( update_cfd , _ ) ) | ( * order_id , ( update_cfd . clone ( ) ) ) )
. collect ( ) ,
) ? )
}
fn get_taker_id_of_proposal ( & self , order_id : & OrderId ) -> Result < TakerId > {
let ( _ , taker_id ) = self
. current_pending_proposals
. get ( order_id )
. context ( "Could not find proposal for given order id" ) ? ;
Ok ( * taker_id )
}
/// Removes a proposal and updates the update cfd proposals' feed
fn remove_pending_proposal ( & mut self , order_id : & OrderId ) -> Result < ( ) > {
if self . current_pending_proposals . remove ( order_id ) . is_none ( ) {
anyhow ::bail ! ( "Could not find proposal with order id: {}" , & order_id )
}
self . send_pending_proposals ( ) ? ;
Ok ( ( ) )
}
}
async fn handle_new_order (
async fn handle_new_order (
@ -193,35 +217,50 @@ impl Actor {
Ok ( ( ) )
Ok ( ( ) )
}
}
async fn handle_propose_settlement ( & mut self , proposal : SettlementProposal ) -> Result < ( ) > {
async fn handle_propose_settlement (
& mut self ,
taker_id : TakerId ,
proposal : SettlementProposal ,
) -> Result < ( ) > {
tracing ::info ! (
tracing ::info ! (
"Received settlement proposal from the taker: {:?}" ,
"Received settlement proposal from the taker: {:?}" ,
proposal
proposal
) ;
) ;
self . current_pending_proposals . insert (
self . current_pending_proposals . insert (
proposal . order_id ,
proposal . order_id ,
(
UpdateCfdProposal ::Settlement {
UpdateCfdProposal ::Settlement {
proposal ,
proposal ,
direction : SettlementKind ::Incoming ,
direction : SettlementKind ::Incoming ,
} ,
} ,
taker_id ,
) ,
) ;
) ;
self . send_pending_proposals ( ) ? ;
self . send_pending_proposals ( ) ? ;
Ok ( ( ) )
Ok ( ( ) )
}
}
async fn handle_propose_roll_over ( & mut self , proposal : RollOverProposal ) -> Result < ( ) > {
async fn handle_propose_roll_over (
& mut self ,
taker_id : TakerId ,
proposal : RollOverProposal ,
) -> Result < ( ) > {
tracing ::info ! (
tracing ::info ! (
"Received proposal from the taker: {:?} to roll over order {}" ,
"Received proposal from the taker {}: {:?} to roll over order {}" ,
taker_id ,
proposal ,
proposal ,
proposal . order_id
proposal . order_id
) ;
) ;
self . current_pending_proposals . insert (
self . current_pending_proposals . insert (
proposal . order_id ,
proposal . order_id ,
(
UpdateCfdProposal ::RollOverProposal {
UpdateCfdProposal ::RollOverProposal {
proposal ,
proposal ,
direction : SettlementKind ::Incoming ,
direction : SettlementKind ::Incoming ,
} ,
} ,
taker_id ,
) ,
) ;
) ;
self . send_pending_proposals ( ) ? ;
self . send_pending_proposals ( ) ? ;
@ -516,47 +555,57 @@ impl Actor {
async fn handle_accept_settlement ( & mut self , order_id : OrderId ) -> Result < ( ) > {
async fn handle_accept_settlement ( & mut self , order_id : OrderId ) -> Result < ( ) > {
tracing ::debug ! ( % order_id , "Maker accepts a settlement proposal" ) ;
tracing ::debug ! ( % order_id , "Maker accepts a settlement proposal" ) ;
// TODO: Initiate the settlement
self . current_pending_proposals
let taker_id = self . get_taker_id_of_proposal ( & order_id ) ? ;
. remove ( & order_id )
. context ( "Could not find proposal for given order id" ) ? ;
// TODO: Initiate the settlement - should we start calculating the
self . send_pending_proposals ( ) ? ;
// signature here?
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifySettlementAccepted { id : order_id } ,
} )
. await ? ;
self . remove_pending_proposal ( & order_id )
. context ( "accepted settlement" ) ? ;
Ok ( ( ) )
Ok ( ( ) )
}
}
async fn handle_reject_settlement ( & mut self , order_id : OrderId ) -> Result < ( ) > {
async fn handle_reject_settlement ( & mut self , order_id : OrderId ) -> Result < ( ) > {
tracing ::debug ! ( % order_id , "Maker rejects a settlement proposal" ) ;
tracing ::debug ! ( % order_id , "Maker rejects a settlement proposal" ) ;
// TODO: Handle rejection offer:
// - notify the taker that the settlement was rejected
self . current_pending_proposals
let taker_id = self . get_taker_id_of_proposal ( & order_id ) ? ;
. remove ( & order_id )
. context ( "Could not find proposal for given order id" ) ? ;
self . takers
self . send_pending_proposals ( ) ? ;
. do_send_async ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifySettlementRejected { id : order_id } ,
} )
. await ? ;
self . remove_pending_proposal ( & order_id )
. context ( "rejected settlement" ) ? ;
Ok ( ( ) )
Ok ( ( ) )
}
}
async fn handle_accept_roll_over ( & mut self , order_id : OrderId ) -> Result < ( ) > {
async fn handle_accept_roll_over ( & mut self , order_id : OrderId ) -> Result < ( ) > {
tracing ::debug ! ( % order_id , "Maker accepts a roll over proposal" ) ;
tracing ::debug ! ( % order_id , "Maker accepts a rollover proposal" ) ;
// TODO: Initiate the roll over logic
// TODO: Initiate the roll over logic
self . current_pending_proposals
self . remove_pending_proposal ( & order_id )
. remove ( & order_id )
. context ( "accepted rollover" ) ? ;
. context ( "Could not find roll over proposal for given order id" ) ? ;
self . send_pending_proposals ( ) ? ;
Ok ( ( ) )
Ok ( ( ) )
}
}
async fn handle_reject_roll_over ( & mut self , order_id : OrderId ) -> Result < ( ) > {
async fn handle_reject_roll_over ( & mut self , order_id : OrderId ) -> Result < ( ) > {
tracing ::debug ! ( % order_id , "Maker rejects a roll over proposal" ) ;
tracing ::debug ! ( % order_id , "Maker rejects a rollover proposal" ) ;
// TODO: Handle rejection and notify the taker that the roll over was rejected
// TODO: Handle rejection and notify the taker that the rollover was rejected
self . current_pending_proposals
self . remove_pending_proposal ( & order_id )
. remove ( & order_id )
. context ( "rejected rollover" ) ? ;
. context ( "Could not find roll over proposal for given order id" ) ? ;
self . send_pending_proposals ( ) ? ;
Ok ( ( ) )
Ok ( ( ) )
}
}
@ -726,16 +775,13 @@ impl Handler<monitor::Event> for Actor {
#[ async_trait ]
#[ async_trait ]
impl Handler < TakerStreamMessage > for Actor {
impl Handler < TakerStreamMessage > for Actor {
async fn handle ( & mut self , msg : TakerStreamMessage , _ctx : & mut Context < Self > ) -> KeepRunning {
async fn handle ( & mut self , msg : TakerStreamMessage , _ctx : & mut Context < Self > ) -> KeepRunning {
let TakerStreamMessage {
let TakerStreamMessage { taker_id , item } = msg ;
taker_id : taker ,
item ,
} = msg ;
let msg = match item {
let msg = match item {
Ok ( msg ) = > msg ,
Ok ( msg ) = > msg ,
Err ( e ) = > {
Err ( e ) = > {
tracing ::warn ! (
tracing ::warn ! (
"Error while receiving message from taker {}: {:#}" ,
"Error while receiving message from taker {}: {:#}" ,
taker ,
taker_id ,
e
e
) ;
) ;
return KeepRunning ::Yes ;
return KeepRunning ::Yes ;
@ -744,7 +790,7 @@ impl Handler<TakerStreamMessage> for Actor {
match msg {
match msg {
wire ::TakerToMaker ::TakeOrder { order_id , quantity } = > {
wire ::TakerToMaker ::TakeOrder { order_id , quantity } = > {
log_error ! ( self . handle_take_order ( taker , order_id , quantity ) )
log_error ! ( self . handle_take_order ( taker_id , order_id , quantity ) )
}
}
wire ::TakerToMaker ::ProposeSettlement {
wire ::TakerToMaker ::ProposeSettlement {
order_id ,
order_id ,
@ -752,24 +798,30 @@ impl Handler<TakerStreamMessage> for Actor {
taker ,
taker ,
maker ,
maker ,
} = > {
} = > {
log_error ! ( self . handle_propose_settlement ( SettlementProposal {
log_error ! ( self . handle_propose_settlement (
taker_id ,
SettlementProposal {
order_id ,
order_id ,
timestamp ,
timestamp ,
taker ,
taker ,
maker
maker
} ) )
}
) )
}
}
wire ::TakerToMaker ::Protocol ( msg ) = > {
wire ::TakerToMaker ::Protocol ( msg ) = > {
log_error ! ( self . handle_inc_protocol_msg ( taker , msg ) )
log_error ! ( self . handle_inc_protocol_msg ( taker_id , msg ) )
}
}
TakerToMaker ::ProposeRollOver {
TakerToMaker ::ProposeRollOver {
order_id ,
order_id ,
timestamp ,
timestamp ,
} = > {
} = > {
log_error ! ( self . handle_propose_roll_over ( RollOverProposal {
log_error ! ( self . handle_propose_roll_over (
taker_id ,
RollOverProposal {
order_id ,
order_id ,
timestamp ,
timestamp ,
} ) )
}
) )
}
}
}
}