@ -11,7 +11,6 @@ use crate::model::cfd::{
use crate ::model ::{ OracleEventId , TakerId , Usd } ;
use crate ::model ::{ OracleEventId , TakerId , Usd } ;
use crate ::monitor ::MonitorParams ;
use crate ::monitor ::MonitorParams ;
use crate ::wallet ::Wallet ;
use crate ::wallet ::Wallet ;
use crate ::wire ::TakerToMaker ;
use crate ::{ maker_inc_connections , monitor , oracle , setup_contract , wire } ;
use crate ::{ maker_inc_connections , monitor , oracle , setup_contract , wire } ;
use anyhow ::{ Context as _ , Result } ;
use anyhow ::{ Context as _ , Result } ;
use async_trait ::async_trait ;
use async_trait ::async_trait ;
@ -67,6 +66,11 @@ pub struct CfdSetupCompleted {
pub dlc : Result < Dlc > ,
pub dlc : Result < Dlc > ,
}
}
pub struct CfdRollOverCompleted {
pub order_id : OrderId ,
pub dlc : Result < Dlc > ,
}
pub struct TakerStreamMessage {
pub struct TakerStreamMessage {
pub taker_id : TakerId ,
pub taker_id : TakerId ,
pub item : Result < wire ::TakerToMaker > ,
pub item : Result < wire ::TakerToMaker > ,
@ -83,6 +87,7 @@ pub struct Actor {
current_order_id : Option < OrderId > ,
current_order_id : Option < OrderId > ,
monitor_actor : Address < monitor ::Actor < Actor > > ,
monitor_actor : Address < monitor ::Actor < Actor > > ,
setup_state : SetupState ,
setup_state : SetupState ,
roll_over_state : RollOverState ,
latest_announcements : Option < BTreeMap < OracleEventId , oracle ::Announcement > > ,
latest_announcements : Option < BTreeMap < OracleEventId , oracle ::Announcement > > ,
oracle_actor : Address < oracle ::Actor < Actor , monitor ::Actor < Actor > > > ,
oracle_actor : Address < oracle ::Actor < Actor , monitor ::Actor < Actor > > > ,
// Maker needs to also store TakerId to be able to send a reply back
// Maker needs to also store TakerId to be able to send a reply back
@ -97,6 +102,14 @@ enum SetupState {
None ,
None ,
}
}
enum RollOverState {
Active {
taker : TakerId ,
sender : mpsc ::UnboundedSender < wire ::RollOverMsg > ,
} ,
None ,
}
impl Actor {
impl Actor {
#[ allow(clippy::too_many_arguments) ]
#[ allow(clippy::too_many_arguments) ]
pub fn new (
pub fn new (
@ -121,6 +134,7 @@ impl Actor {
current_order_id : None ,
current_order_id : None ,
monitor_actor ,
monitor_actor ,
setup_state : SetupState ::None ,
setup_state : SetupState ::None ,
roll_over_state : RollOverState ::None ,
latest_announcements : None ,
latest_announcements : None ,
oracle_actor ,
oracle_actor ,
current_pending_proposals : HashMap ::new ( ) ,
current_pending_proposals : HashMap ::new ( ) ,
@ -243,8 +257,8 @@ impl Actor {
async fn handle_propose_roll_over (
async fn handle_propose_roll_over (
& mut self ,
& mut self ,
taker_id : TakerId ,
proposal : RollOverProposal ,
proposal : RollOverProposal ,
taker_id : TakerId ,
) -> Result < ( ) > {
) -> Result < ( ) > {
tracing ::info ! (
tracing ::info ! (
"Received proposal from the taker {}: {:?} to roll over order {}" ,
"Received proposal from the taker {}: {:?} to roll over order {}" ,
@ -252,6 +266,20 @@ impl Actor {
proposal ,
proposal ,
proposal . order_id
proposal . order_id
) ;
) ;
// check if CFD is in open state, otherwise we should not proceed
let mut conn = self . db . acquire ( ) . await ? ;
let cfd = load_cfd_by_order_id ( proposal . order_id , & mut conn ) . await ? ;
match cfd {
Cfd {
state : CfdState ::Open { . . } ,
. .
} = > ( ) ,
_ = > {
anyhow ::bail ! ( "Order is in invalid state. Cannot propose roll over." )
}
} ;
self . current_pending_proposals . insert (
self . current_pending_proposals . insert (
proposal . order_id ,
proposal . order_id ,
(
(
@ -287,6 +315,24 @@ impl Actor {
Ok ( ( ) )
Ok ( ( ) )
}
}
async fn handle_inc_roll_over_protocol_msg (
& mut self ,
taker_id : TakerId ,
msg : wire ::RollOverMsg ,
) -> Result < ( ) > {
match & mut self . roll_over_state {
RollOverState ::Active { taker , sender } if taker_id = = * taker = > {
sender . send ( msg ) . await ? ;
}
RollOverState ::Active { taker , . . } = > {
anyhow ::bail ! ( "Currently rolling over with different taker {}" , taker )
}
RollOverState ::None = > { }
}
Ok ( ( ) )
}
async fn handle_cfd_setup_completed (
async fn handle_cfd_setup_completed (
& mut self ,
& mut self ,
order_id : OrderId ,
order_id : OrderId ,
@ -335,6 +381,43 @@ impl Actor {
Ok ( ( ) )
Ok ( ( ) )
}
}
async fn handle_cfd_roll_over_completed (
& mut self ,
order_id : OrderId ,
dlc : Result < Dlc > ,
) -> Result < ( ) > {
let dlc = dlc . context ( "Failed to roll over contract with taker" ) ? ;
self . roll_over_state = RollOverState ::None ;
let mut conn = self . db . acquire ( ) . await ? ;
insert_new_cfd_state_by_order_id (
order_id ,
CfdState ::Open {
common : CfdStateCommon {
transition_timestamp : SystemTime ::now ( ) ,
} ,
dlc : dlc . clone ( ) ,
attestation : None ,
} ,
& mut conn ,
)
. await ? ;
self . cfd_feed_actor_inbox
. send ( load_all_cfds ( & mut conn ) . await ? ) ? ;
let cfd = load_cfd_by_order_id ( order_id , & mut conn ) . await ? ;
self . monitor_actor
. do_send_async ( monitor ::StartMonitoring {
id : order_id ,
params : MonitorParams ::from_dlc_and_timelocks ( dlc , cfd . refund_timelock_in_blocks ( ) ) ,
} )
. await ? ;
Ok ( ( ) )
}
async fn handle_take_order (
async fn handle_take_order (
& mut self ,
& mut self ,
taker_id : TakerId ,
taker_id : TakerId ,
@ -590,22 +673,120 @@ impl Actor {
Ok ( ( ) )
Ok ( ( ) )
}
}
async fn handle_accept_roll_over ( & mut self , order_id : OrderId ) -> Result < ( ) > {
async fn handle_accept_roll_over (
tracing ::debug ! ( % order_id , "Maker accepts a rollover proposal" ) ;
& mut self ,
order_id : OrderId ,
ctx : & mut Context < Self > ,
) -> Result < ( ) > {
tracing ::debug ! ( % order_id , "Maker accepts a roll_over proposal" ) ;
// TODO: Initiate the roll over logic
let mut conn = self . db . acquire ( ) . await ? ;
let cfd = load_cfd_by_order_id ( order_id , & mut conn ) . await ? ;
// Validate if order is actually being requested to be extended
let ( proposal , taker_id ) = match self . current_pending_proposals . get ( & order_id ) {
Some ( (
UpdateCfdProposal ::RollOverProposal {
proposal ,
direction : SettlementKind ::Incoming ,
} ,
taker_id ,
) ) = > ( proposal , * taker_id ) ,
_ = > {
anyhow ::bail ! ( "Order is in invalid state. Ignoring trying to accept the roll over request it." )
}
} ;
let dlc = cfd . open_dlc ( ) . context ( "CFD was in wrong state" ) ? ;
// TODO: do we want to store in the db that we rolled over?
let ( oracle_event_id , announcement ) = self
. latest_announcements
. clone ( )
. context ( "Cannot roll over because no announcement from oracle was found" ) ?
. into_iter ( )
. next_back ( )
. context ( "Empty list of announcements" ) ? ;
self . takers
. send ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifyRollOverAccepted {
id : proposal . order_id ,
oracle_event_id ,
} ,
} )
. await ? ;
self . oracle_actor
. do_send_async ( oracle ::MonitorEvent {
event_id : announcement . id . clone ( ) ,
} )
. await ? ;
let ( sender , receiver ) = mpsc ::unbounded ( ) ;
let contract_future = setup_contract ::roll_over (
self . takers . clone ( ) . into_sink ( ) . with ( move | msg | {
future ::ok ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::RollOverProtocol ( msg ) ,
} )
} ) ,
receiver ,
( self . oracle_pk , announcement ) ,
cfd ,
Role ::Maker ,
dlc ,
) ;
let this = ctx
. address ( )
. expect ( "actor to be able to give address to itself" ) ;
self . roll_over_state = RollOverState ::Active {
sender ,
taker : taker_id ,
} ;
tokio ::spawn ( async move {
let dlc = contract_future . await ;
this . do_send_async ( CfdRollOverCompleted { order_id , dlc } )
. await
} ) ;
self . remove_pending_proposal ( & order_id )
self . remove_pending_proposal ( & order_id )
. context ( "accepted rollover" ) ? ;
. context ( "accepted roll_ over" ) ? ;
Ok ( ( ) )
Ok ( ( ) )
}
}
async fn handle_reject_roll_over ( & mut self , order_id : OrderId ) -> Result < ( ) > {
async fn handle_reject_roll_over ( & mut self , order_id : OrderId ) -> Result < ( ) > {
tracing ::debug ! ( % order_id , "Maker rejects a rollover proposal" ) ;
tracing ::debug ! ( % order_id , "Maker rejects a roll_over proposal" ) ;
// TODO: Handle rejection and notify the taker that the rollover was rejected
// Validate if order is actually being requested to be extended
let ( _ , taker_id ) = match self . current_pending_proposals . get ( & order_id ) {
Some ( (
UpdateCfdProposal ::RollOverProposal {
proposal ,
direction : SettlementKind ::Incoming ,
} ,
taker_id ,
) ) = > ( proposal , * taker_id ) ,
_ = > {
anyhow ::bail ! ( "Order is in invalid state. Ignoring reject roll over request." )
}
} ;
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifyRollOverRejected { id : order_id } ,
} )
. await ? ;
self . remove_pending_proposal ( & order_id )
self . remove_pending_proposal ( & order_id )
. context ( "rejected rollover" ) ? ;
. context ( "rejected roll_ over" ) ? ;
Ok ( ( ) )
Ok ( ( ) )
}
}
@ -725,8 +906,8 @@ impl Handler<RejectSettlement> for Actor {
#[ async_trait ]
#[ async_trait ]
impl Handler < AcceptRollOver > for Actor {
impl Handler < AcceptRollOver > for Actor {
async fn handle ( & mut self , msg : AcceptRollOver , _ ctx : & mut Context < Self > ) {
async fn handle ( & mut self , msg : AcceptRollOver , ctx : & mut Context < Self > ) {
log_error ! ( self . handle_accept_roll_over ( msg . order_id ) )
log_error ! ( self . handle_accept_roll_over ( msg . order_id , ctx ) )
}
}
}
}
@ -765,6 +946,13 @@ impl Handler<CfdSetupCompleted> for Actor {
}
}
}
}
#[ async_trait ]
impl Handler < CfdRollOverCompleted > for Actor {
async fn handle ( & mut self , msg : CfdRollOverCompleted , _ctx : & mut Context < Self > ) {
log_error ! ( self . handle_cfd_roll_over_completed ( msg . order_id , msg . dlc ) ) ;
}
}
#[ async_trait ]
#[ async_trait ]
impl Handler < monitor ::Event > for Actor {
impl Handler < monitor ::Event > for Actor {
async fn handle ( & mut self , msg : monitor ::Event , _ctx : & mut Context < Self > ) {
async fn handle ( & mut self , msg : monitor ::Event , _ctx : & mut Context < Self > ) {
@ -811,18 +999,22 @@ impl Handler<TakerStreamMessage> for Actor {
wire ::TakerToMaker ::Protocol ( msg ) = > {
wire ::TakerToMaker ::Protocol ( msg ) = > {
log_error ! ( self . handle_inc_protocol_msg ( taker_id , msg ) )
log_error ! ( self . handle_inc_protocol_msg ( taker_id , msg ) )
}
}
TakerToMaker ::ProposeRollOver {
wire ::TakerToMaker ::ProposeRollOver {
order_id ,
order_id ,
timestamp ,
timestamp ,
} = > {
} = > {
log_error ! ( self . handle_propose_roll_over (
log_error ! ( self . handle_propose_roll_over (
taker_id ,
RollOverProposal {
RollOverProposal {
order_id ,
order_id ,
timestamp ,
timestamp ,
}
} ,
taker_id ,
) )
) )
}
}
wire ::TakerToMaker ::RollOverProtocol ( msg ) = > {
log_error ! ( self . handle_inc_roll_over_protocol_msg ( taker_id , msg ) )
}
}
}
KeepRunning ::Yes
KeepRunning ::Yes
@ -855,6 +1047,10 @@ impl Message for CfdSetupCompleted {
type Result = ( ) ;
type Result = ( ) ;
}
}
impl Message for CfdRollOverCompleted {
type Result = ( ) ;
}
impl Message for AcceptOrder {
impl Message for AcceptOrder {
type Result = ( ) ;
type Result = ( ) ;
}
}