@ -16,8 +16,8 @@ use bdk::bitcoin::secp256k1::schnorrsig;
use cfd_protocol ::secp256k1_zkp ::Signature ;
use futures ::channel ::mpsc ;
use futures ::{ future , SinkExt } ;
use rocket_db_pools ::sqlx ::Sqlite ;
use sqlx ::pool ::PoolConnection ;
use sqlx ::Sqlite ;
use std ::collections ::HashMap ;
use std ::time ::SystemTime ;
use tokio ::sync ::watch ;
@ -58,19 +58,19 @@ pub struct FromTaker {
pub msg : wire ::TakerToMaker ,
}
pub struct Actor {
pub struct Actor < O , M , T > {
db : sqlx ::SqlitePool ,
wallet : Wallet ,
oracle_pk : schnorrsig ::PublicKey ,
cfd_feed_actor_inbox : watch ::Sender < Vec < Cfd > > ,
order_feed_sender : watch ::Sender < Option < Order > > ,
update_cfd_feed_sender : watch ::Sender < UpdateCfdProposals > ,
takers : Address < maker_inc_connections ::Actor > ,
takers : Address < T > ,
current_order_id : Option < OrderId > ,
monitor_actor : Address < monitor ::Actor > ,
monitor_actor : Address < M > ,
setup_state : SetupState ,
roll_over_state : RollOverState ,
oracle_actor : Address < oracle ::Actor > ,
oracle_actor : Address < O > ,
// Maker needs to also store TakerId to be able to send a reply back
current_pending_proposals : HashMap < OrderId , ( UpdateCfdProposal , TakerId ) > ,
// TODO: Persist instead of using an in-memory hashmap for resiliency?
@ -95,7 +95,7 @@ enum RollOverState {
None ,
}
impl Actor {
impl < O , M , T > Actor < O , M , T > {
#[ allow(clippy::too_many_arguments) ]
pub fn new (
db : sqlx ::SqlitePool ,
@ -104,9 +104,9 @@ impl Actor {
cfd_feed_actor_inbox : watch ::Sender < Vec < Cfd > > ,
order_feed_sender : watch ::Sender < Option < Order > > ,
update_cfd_feed_sender : watch ::Sender < UpdateCfdProposals > ,
takers : Address < maker_inc_connections ::Actor > ,
monitor_actor : Address < monitor ::Actor > ,
oracle_actor : Address < oracle ::Actor > ,
takers : Address < T > ,
monitor_actor : Address < M > ,
oracle_actor : Address < O > ,
) -> Self {
Self {
db ,
@ -126,104 +126,55 @@ impl Actor {
}
}
/// Send pending proposals for the purposes of UI updates.
/// Filters out the TakerIds, as they are an implementation detail inside of
/// the actor
fn send_pending_proposals ( & self ) -> Result < ( ) > {
Ok ( self . update_cfd_feed_sender . send (
self . current_pending_proposals
. iter ( )
. map ( | ( order_id , ( update_cfd , _ ) ) | ( * order_id , ( update_cfd . clone ( ) ) ) )
. collect ( ) ,
) ? )
}
fn get_taker_id_of_proposal ( & self , order_id : & OrderId ) -> Result < TakerId > {
let ( _ , taker_id ) = self
. current_pending_proposals
. get ( order_id )
. context ( "Could not find proposal for given order id" ) ? ;
Ok ( * taker_id )
}
async fn handle_commit ( & mut self , order_id : OrderId ) -> Result < ( ) > {
let mut conn = self . db . acquire ( ) . await ? ;
cfd_actors ::handle_commit (
order_id ,
& mut conn ,
& self . wallet ,
& self . cfd_feed_actor_inbox ,
)
. await ? ;
/// Removes a proposal and updates the update cfd proposals' feed
fn remove_pending_proposal ( & mut self , order_id : & OrderId ) -> Result < ( ) > {
if self . current_pending_proposals . remove ( order_id ) . is_none ( ) {
anyhow ::bail ! ( "Could not find proposal with order id: {}" , & order_id )
}
self . send_pending_proposals ( ) ? ;
Ok ( ( ) )
}
fn get_settlement_proposal ( & self , order_id : OrderId ) -> Result < ( SettlementProposal , TakerId ) > {
let ( update_proposal , taker_id ) = self
. current_pending_proposals
. get ( & order_id )
. context ( "have a proposal that is about to be accepted" ) ? ;
let proposal = match update_proposal {
UpdateCfdProposal ::Settlement { proposal , . . } = > proposal ,
UpdateCfdProposal ::RollOverProposal { . . } = > {
anyhow ::bail ! ( "did not expect a rollover proposal" ) ;
}
} ;
Ok ( ( proposal . clone ( ) , * taker_id ) )
}
async fn handle_new_order (
async fn handle_propose_roll_over (
& mut self ,
price : Usd ,
min_quantity : Usd ,
max_quantity : Usd ,
proposal : RollOverProposal ,
taker_id : TakerId ,
) -> Result < ( ) > {
let oracle_event_id =
oracle ::next_announcement_after ( time ::OffsetDateTime ::now_utc ( ) + Order ::TERM ) ? ;
self . oracle_actor
. do_send_async ( oracle ::FetchAnnouncement ( oracle_event_id ) )
. await ? ;
let order = Order ::new (
price ,
min_quantity ,
max_quantity ,
Origin ::Ours ,
oracle_event_id ,
) ? ;
// 1. Save to DB
let mut conn = self . db . acquire ( ) . await ? ;
insert_order ( & order , & mut conn ) . await ? ;
// 2. Update actor state to current order
self . current_order_id . replace ( order . id ) ;
// 3. Notify UI via feed
self . order_feed_sender . send ( Some ( order . clone ( ) ) ) ? ;
// 4. Inform connected takers
self . takers
. do_send_async ( maker_inc_connections ::BroadcastOrder ( Some ( order ) ) )
. await ? ;
Ok ( ( ) )
}
tracing ::info ! (
"Received proposal from the taker {}: {:?} to roll over order {}" ,
taker_id ,
proposal ,
proposal . order_id
) ;
async fn handle_new_taker_online ( & mut self , taker_id : TakerId ) -> Result < ( ) > {
// check if CFD is in open state, otherwise we should not proceed
let mut conn = self . db . acquire ( ) . await ? ;
let current_order = match self . current_order_id {
Some ( current_order_id ) = > Some ( load_order_by_id ( current_order_id , & mut conn ) . await ? ) ,
None = > None ,
let cfd = load_cfd_by_order_id ( proposal . order_id , & mut conn ) . await ? ;
match cfd {
Cfd {
state : CfdState ::Open { . . } ,
. .
} = > ( ) ,
_ = > {
anyhow ::bail ! ( "Order is in invalid state. Cannot propose roll over." )
}
} ;
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::SendOrder {
order : current_order ,
self . current_pending_proposals . insert (
proposal . order_id ,
(
UpdateCfdProposal ::RollOverProposal {
proposal ,
direction : SettlementKind ::Incoming ,
} ,
} )
. await ? ;
taker_id ,
) ,
) ;
self . send_pending_proposals ( ) ? ;
Ok ( ( ) )
}
@ -309,43 +260,27 @@ impl Actor {
Ok ( ( ) )
}
async fn handle_propose_roll_over (
& mut self ,
proposal : RollOverProposal ,
taker_id : TakerId ,
) -> Result < ( ) > {
tracing ::info ! (
"Received proposal from the taker {}: {:?} to roll over order {}" ,
taker_id ,
proposal ,
proposal . order_id
) ;
// check if CFD is in open state, otherwise we should not proceed
async fn handle_monitoring_event ( & mut self , event : monitor ::Event ) -> Result < ( ) > {
let mut conn = self . db . acquire ( ) . await ? ;
let cfd = load_cfd_by_order_id ( proposal . order_id , & mut conn ) . await ? ;
match cfd {
Cfd {
state : CfdState ::Open { . . } ,
. .
} = > ( ) ,
_ = > {
anyhow ::bail ! ( "Order is in invalid state. Cannot propose roll over." )
}
} ;
self . current_pending_proposals . insert (
proposal . order_id ,
(
UpdateCfdProposal ::RollOverProposal {
proposal ,
direction : SettlementKind ::Incoming ,
} ,
taker_id ,
) ,
) ;
self . send_pending_proposals ( ) ? ;
cfd_actors ::handle_monitoring_event (
event ,
& mut conn ,
& self . wallet ,
& self . cfd_feed_actor_inbox ,
)
. await ? ;
Ok ( ( ) )
}
async fn handle_oracle_attestation ( & mut self , attestation : oracle ::Attestation ) -> Result < ( ) > {
let mut conn = self . db . acquire ( ) . await ? ;
cfd_actors ::handle_oracle_attestation (
attestation ,
& mut conn ,
& self . wallet ,
& self . cfd_feed_actor_inbox ,
)
. await ? ;
Ok ( ( ) )
}
@ -387,78 +322,146 @@ impl Actor {
Ok ( ( ) )
}
async fn handle_cfd_setup_completed (
& mut self ,
order_id : OrderId ,
dlc : Result < Dlc > ,
) -> Result < ( ) > {
self . setup_state = SetupState ::None ;
/// Send pending proposals for the purposes of UI updates.
/// Filters out the TakerIds, as they are an implementation detail inside of
/// the actor
fn send_pending_proposals ( & self ) -> Result < ( ) > {
Ok ( self . update_cfd_feed_sender . send (
self . current_pending_proposals
. iter ( )
. map ( | ( order_id , ( update_cfd , _ ) ) | ( * order_id , ( update_cfd . clone ( ) ) ) )
. collect ( ) ,
) ? )
}
let dlc = dlc . context ( "Failed to setup contract with taker" ) ? ;
/// Removes a proposal and updates the update cfd proposals' feed
fn remove_pending_proposal ( & mut self , order_id : & OrderId ) -> Result < ( ) > {
if self . current_pending_proposals . remove ( order_id ) . is_none ( ) {
anyhow ::bail ! ( "Could not find proposal with order id: {}" , & order_id )
}
self . send_pending_proposals ( ) ? ;
Ok ( ( ) )
}
let mut conn = self . db . acquire ( ) . await ? ;
fn get_taker_id_of_proposal ( & self , order_id : & OrderId ) -> Result < TakerId > {
let ( _ , taker_id ) = self
. current_pending_proposals
. get ( order_id )
. context ( "Could not find proposal for given order id" ) ? ;
Ok ( * taker_id )
}
let mut cfd = load_cfd_by_order_id ( order_id , & mut conn ) . await ? ;
cfd . state = CfdState ::PendingOpen {
common : CfdStateCommon ::default ( ) ,
dlc : dlc . clone ( ) ,
attestation : None ,
fn get_settlement_proposal ( & self , order_id : OrderId ) -> Result < ( SettlementProposal , TakerId ) > {
let ( update_proposal , taker_id ) = self
. current_pending_proposals
. get ( & order_id )
. context ( "have a proposal that is about to be accepted" ) ? ;
let proposal = match update_proposal {
UpdateCfdProposal ::Settlement { proposal , . . } = > proposal ,
UpdateCfdProposal ::RollOverProposal { . . } = > {
anyhow ::bail ! ( "did not expect a rollover proposal" ) ;
}
} ;
Ok ( ( proposal . clone ( ) , * taker_id ) )
}
}
append_cfd_state ( & cfd , & mut conn , & self . cfd_feed_actor_inbox ) . await ? ;
impl < O , M , T > Actor < O , M , T >
where
T : xtra ::Handler < maker_inc_connections ::TakerMessage > ,
{
async fn handle_new_taker_online ( & mut self , taker_id : TakerId ) -> Result < ( ) > {
let mut conn = self . db . acquire ( ) . await ? ;
let txid = self
. wallet
. try_broadcast_transaction ( dlc . lock . 0. clone ( ) )
let current_order = match self . current_order_id {
Some ( current_order_id ) = > Some ( load_order_by_id ( current_order_id , & mut conn ) . await ? ) ,
None = > None ,
} ;
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::SendOrder {
order : current_order ,
} ,
} )
. await ? ;
tracing ::info ! ( "Lock transaction published with txid {}" , txid ) ;
Ok ( ( ) )
}
self . monitor_actor
. do_send_async ( monitor ::StartMonitoring {
id : order_id ,
params : MonitorParams ::from_dlc_and_timelocks ( dlc , cfd . refund_timelock_in_blocks ( ) ) ,
async fn handle_accept_settlement ( & mut self , order_id : OrderId ) -> Result < ( ) > {
tracing ::debug ! ( % order_id , "Maker accepts a settlement proposal" ) ;
let taker_id = self . get_taker_id_of_proposal ( & order_id ) ? ;
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifySettlementAccepted { id : order_id } ,
} )
. await ? ;
self . oracle_actor
. do_send_async ( oracle ::MonitorAttestation {
event_id : cfd . order . oracle_event_id ,
self . current_agreed_proposals
. insert ( order_id , self . get_settlement_proposal ( order_id ) ? ) ;
self . remove_pending_proposal ( & order_id )
. context ( "accepted settlement" ) ? ;
Ok ( ( ) )
}
async fn handle_reject_settlement ( & mut self , order_id : OrderId ) -> Result < ( ) > {
tracing ::debug ! ( % order_id , "Maker rejects a settlement proposal" ) ;
let taker_id = self . get_taker_id_of_proposal ( & order_id ) ? ;
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifySettlementRejected { id : order_id } ,
} )
. await ? ;
self . remove_pending_proposal ( & order_id )
. context ( "rejected settlement" ) ? ;
Ok ( ( ) )
}
async fn handle_cfd_roll_over_completed (
& mut self ,
order_id : OrderId ,
dlc : Result < Dlc > ,
) -> Result < ( ) > {
let dlc = dlc . context ( "Failed to roll over contract with taker" ) ? ;
self . roll_over_state = RollOverState ::None ;
async fn handle_reject_roll_over ( & mut self , order_id : OrderId ) -> Result < ( ) > {
tracing ::debug ! ( % order_id , "Maker rejects a roll_over proposal" ) ;
let mut conn = self . db . acquire ( ) . await ? ;
let mut cfd = load_cfd_by_order_id ( order_id , & mut conn ) . await ? ;
cfd . state = CfdState ::Open {
common : CfdStateCommon ::default ( ) ,
dlc : dlc . clone ( ) ,
attestation : None ,
collaborative_close : None ,
// Validate if order is actually being requested to be extended
let ( _ , taker_id ) = match self . current_pending_proposals . get ( & order_id ) {
Some ( (
UpdateCfdProposal ::RollOverProposal {
proposal ,
direction : SettlementKind ::Incoming ,
} ,
taker_id ,
) ) = > ( proposal , * taker_id ) ,
_ = > {
anyhow ::bail ! ( "Order is in invalid state. Ignoring reject roll over request." )
}
} ;
append_cfd_state ( & cfd , & mut conn , & self . cfd_feed_actor_inbox ) . await ? ;
self . monitor_actor
. do_send_async ( monitor ::StartMonitoring {
id : order_id ,
params : MonitorParams ::from_dlc_and_timelocks ( dlc , cfd . refund_timelock_in_blocks ( ) ) ,
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifyRollOverRejected { id : order_id } ,
} )
. await ? ;
self . remove_pending_proposal ( & order_id )
. context ( "rejected roll_over" ) ? ;
Ok ( ( ) )
}
}
impl < O , M , T > Actor < O , M , T >
where
T : xtra ::Handler < maker_inc_connections ::TakerMessage >
+ xtra ::Handler < maker_inc_connections ::BroadcastOrder > ,
{
async fn handle_take_order (
& mut self ,
taker_id : TakerId ,
@ -492,7 +495,7 @@ impl Actor {
}
} ;
// 2. Create a new CFD
// 2. Insert CFD in DB
let cfd = Cfd ::new (
current_order . clone ( ) ,
quantity ,
@ -515,6 +518,7 @@ impl Actor {
) ;
self . reject_order ( taker_id , cfd , conn ) . await ? ;
return Ok ( ( ) ) ;
}
@ -528,6 +532,66 @@ impl Actor {
Ok ( ( ) )
}
async fn handle_reject_order ( & mut self , order_id : OrderId ) -> Result < ( ) > {
tracing ::debug ! ( % order_id , "Maker rejects an order" ) ;
let mut conn = self . db . acquire ( ) . await ? ;
let cfd = load_cfd_by_order_id ( order_id , & mut conn ) . await ? ;
let taker_id = match cfd {
Cfd {
state : CfdState ::IncomingOrderRequest { taker_id , . . } ,
. .
} = > taker_id ,
_ = > {
anyhow ::bail ! ( "Order is in invalid state. Ignoring trying to accept it." )
}
} ;
self . reject_order ( taker_id , cfd , conn ) . await ? ;
Ok ( ( ) )
}
/// Reject an order
///
/// Rejection includes removing the order and saving in the db that it was rejected.
/// In the current model it is essential to remove the order because a taker
/// that received a rejection cannot communicate with the maker until a new order is published.
async fn reject_order (
& mut self ,
taker_id : TakerId ,
mut cfd : Cfd ,
mut conn : PoolConnection < Sqlite > ,
) -> Result < ( ) > {
cfd . state = CfdState ::rejected ( ) ;
append_cfd_state ( & cfd , & mut conn , & self . cfd_feed_actor_inbox ) . await ? ;
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifyOrderRejected { id : cfd . order . id } ,
} )
. await ? ;
// Remove order for all
self . current_order_id = None ;
self . takers
. do_send_async ( maker_inc_connections ::BroadcastOrder ( None ) )
. await ? ;
self . order_feed_sender . send ( None ) ? ;
Ok ( ( ) )
}
}
impl < O , M , T > Actor < O , M , T >
where
Self : xtra ::Handler < CfdSetupCompleted > ,
O : xtra ::Handler < oracle ::GetAnnouncement > ,
T : xtra ::Handler < maker_inc_connections ::TakerMessage > ,
{
async fn handle_accept_order (
& mut self ,
order_id : OrderId ,
@ -612,108 +676,107 @@ impl Actor {
Ok ( ( ) )
}
}
async fn handle_reject_order ( & mut self , order_id : OrderId ) -> Result < ( ) > {
tracing ::debug ! ( % order_id , "Maker rejects an order" ) ;
impl < O , M , T > Actor < O , M , T >
where
O : xtra ::Handler < oracle ::FetchAnnouncement > ,
T : xtra ::Handler < maker_inc_connections ::BroadcastOrder > ,
{
async fn handle_new_order (
& mut self ,
price : Usd ,
min_quantity : Usd ,
max_quantity : Usd ,
) -> Result < ( ) > {
let oracle_event_id =
oracle ::next_announcement_after ( time ::OffsetDateTime ::now_utc ( ) + Order ::TERM ) ? ;
self . oracle_actor
. do_send_async ( oracle ::FetchAnnouncement ( oracle_event_id ) )
. await ? ;
let order = Order ::new (
price ,
min_quantity ,
max_quantity ,
Origin ::Ours ,
oracle_event_id ,
) ? ;
// 1. Save to DB
let mut conn = self . db . acquire ( ) . await ? ;
let cfd = load_cfd_by_order_id ( order_id , & mut conn ) . await ? ;
insert_order ( & order , & mut conn ) . await ? ;
let taker_id = match cfd {
Cfd {
state : CfdState ::IncomingOrderRequest { taker_id , . . } ,
. .
} = > taker_id ,
_ = > {
anyhow ::bail ! ( "Order is in invalid state. Ignoring trying to accept it." )
}
} ;
// 2. Update actor state to current order
self . current_order_id . replace ( order . id ) ;
self . reject_order ( taker_id , cfd , conn ) . await ? ;
// 3. Notify UI via feed
self . order_feed_sender . send ( Some ( order . clone ( ) ) ) ? ;
// 4. Inform connected takers
self . takers
. do_send_async ( maker_inc_connections ::BroadcastOrder ( Some ( order ) ) )
. await ? ;
Ok ( ( ) )
}
}
/// Reject an order
///
/// Rejection includes removing the order and saving in the db that it was rejected.
/// In the current model it is essential to remove the order because a taker
/// that received a rejection cannot communicate with the maker until a new order is published.
async fn reject_order (
impl < O , M , T > Actor < O , M , T >
where
O : xtra ::Handler < oracle ::MonitorAttestation > ,
M : xtra ::Handler < monitor ::StartMonitoring > ,
{
async fn handle_cfd_setup_completed (
& mut self ,
taker_id : TakerId ,
mut cfd : Cfd ,
mut conn : PoolConnection < Sqlite > ,
order_id : OrderId ,
dlc : Result < Dlc > ,
) -> Result < ( ) > {
// Update order in db
cfd . state = CfdState ::rejected ( ) ;
append_cfd_state ( & cfd , & mut conn , & self . cfd_feed_actor_inbox ) . await ? ;
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifyOrderRejected { id : cfd . order . id } ,
} )
. await ? ;
// Remove order for all
self . current_order_id = None ;
self . takers
. do_send_async ( maker_inc_connections ::BroadcastOrder ( None ) )
. await ? ;
self . order_feed_sender . send ( None ) ? ;
self . setup_state = SetupState ::None ;
Ok ( ( ) )
}
let dlc = dlc . context ( "Failed to setup contract with taker" ) ? ;
async fn handle_commit ( & mut self , order_id : OrderId ) -> Result < ( ) > {
let mut conn = self . db . acquire ( ) . await ? ;
cfd_actors ::handle_commit (
order_id ,
& mut conn ,
& self . wallet ,
& self . cfd_feed_actor_inbox ,
)
. await ? ;
Ok ( ( ) )
}
async fn handle_accept_settlement ( & mut self , order_id : OrderId ) -> Result < ( ) > {
tracing ::debug ! ( % order_id , "Maker accepts a settlement proposal" ) ;
let mut cfd = load_cfd_by_order_id ( order_id , & mut conn ) . await ? ;
cfd . state = CfdState ::PendingOpen {
common : CfdStateCommon ::default ( ) ,
dlc : dlc . clone ( ) ,
attestation : None ,
} ;
let taker_id = self . get_taker_id_of_proposal ( & order_id ) ? ;
append_cfd_state ( & cfd , & mut conn , & self . cfd_feed_actor_inbox ) . await ? ;
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifySettlementAccepted { id : order_id } ,
} )
let txid = self
. wallet
. try_broadcast_transaction ( dlc . lock . 0. clone ( ) )
. await ? ;
self . current_agreed_proposals
. insert ( order_id , self . get_settlement_proposal ( order_id ) ? ) ;
self . remove_pending_proposal ( & order_id )
. context ( "accepted settlement" ) ? ;
Ok ( ( ) )
}
async fn handle_reject_settlement ( & mut self , order_id : OrderId ) -> Result < ( ) > {
tracing ::debug ! ( % order_id , "Maker rejects a settlement proposal" ) ;
tracing ::info ! ( "Lock transaction published with txid {}" , txid ) ;
let taker_id = self . get_taker_id_of_proposal ( & order_id ) ? ;
self . monitor_actor
. do_send_async ( monitor ::StartMonitoring {
id : order_id ,
params : MonitorParams ::from_dlc_and_timelocks ( dlc , cfd . refund_timelock_in_blocks ( ) ) ,
} )
. await ? ;
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
taker_id ,
command : TakerCommand ::NotifySettlementRejected { id : order_id } ,
self . oracle_actor
. do_send_async ( oracle ::MonitorAttestation {
event_id : cfd . order . oracle_event_id ,
} )
. await ? ;
self . remove_pending_proposal ( & order_id )
. context ( "rejected settlement" ) ? ;
Ok ( ( ) )
}
}
impl < O , M , T > Actor < O , M , T >
where
Self : xtra ::Handler < CfdRollOverCompleted > ,
O : xtra ::Handler < oracle ::MonitorAttestation > + xtra ::Handler < oracle ::GetAnnouncement > ,
T : xtra ::Handler < maker_inc_connections ::TakerMessage > ,
{
async fn handle_accept_roll_over (
& mut self ,
order_id : OrderId ,
@ -801,63 +864,49 @@ impl Actor {
. context ( "accepted roll_over" ) ? ;
Ok ( ( ) )
}
}
async fn handle_reject_roll_over ( & mut self , order_id : OrderId ) -> Result < ( ) > {
tracing ::debug ! ( % order_id , "Maker rejects a roll_over proposal" ) ;
impl < O , M , T > Actor < O , M , T >
where
M : xtra ::Handler < monitor ::StartMonitoring > ,
{
async fn handle_cfd_roll_over_completed (
& mut self ,
order_id : OrderId ,
dlc : Result < Dlc > ,
) -> Result < ( ) > {
let dlc = dlc . context ( "Failed to roll over contract with taker" ) ? ;
self . roll_over_state = RollOverState ::None ;
// Validate if order is actually being requested to be extended
let ( _ , taker_id ) = match self . current_pending_proposals . get ( & order_id ) {
Some ( (
UpdateCfdProposal ::RollOverProposal {
proposal ,
direction : SettlementKind ::Incoming ,
} ,
taker_id ,
) ) = > ( proposal , * taker_id ) ,
_ = > {
anyhow ::bail ! ( "Order is in invalid state. Ignoring reject roll over request." )
}
let mut conn = self . db . acquire ( ) . await ? ;
let mut cfd = load_cfd_by_order_id ( order_id , & mut conn ) . await ? ;
cfd . state = CfdState ::Open {
common : CfdStateCommon ::default ( ) ,
dlc : dlc . clone ( ) ,
attestation : None ,
collaborative_close : None ,
} ;
append_cfd_state ( & cfd , & mut conn , & self . cfd_feed_actor_inbox ) . await ? ;
self . takers
. do_send_async ( maker_inc_connections ::TakerMessage {
tak er_id,
command : TakerCommand ::NotifyRollOverRejected { id : order_id } ,
self . monitor_actor
. do_send_async ( monitor ::StartMonitoring {
id : ord er_id,
params : MonitorParams ::from_dlc_and_timelocks ( dlc , cfd . refund_timelock_in_blocks ( ) ) ,
} )
. await ? ;
self . remove_pending_proposal ( & order_id )
. context ( "rejected roll_over" ) ? ;
Ok ( ( ) )
}
async fn handle_monitoring_event ( & mut self , event : monitor ::Event ) -> Result < ( ) > {
let mut conn = self . db . acquire ( ) . await ? ;
cfd_actors ::handle_monitoring_event (
event ,
& mut conn ,
& self . wallet ,
& self . cfd_feed_actor_inbox ,
)
. await ? ;
Ok ( ( ) )
}
async fn handle_oracle_attestation ( & mut self , attestation : oracle ::Attestation ) -> Result < ( ) > {
let mut conn = self . db . acquire ( ) . await ? ;
cfd_actors ::handle_oracle_attestation (
attestation ,
& mut conn ,
& self . wallet ,
& self . cfd_feed_actor_inbox ,
)
. await ? ;
Ok ( ( ) )
}
}
#[ async_trait ]
impl Handler < CfdAction > for Actor {
impl < O : 'static , M : 'static , T : 'static > Handler < CfdAction > for Actor < O , M , T >
where
Self : xtra ::Handler < CfdSetupCompleted > + xtra ::Handler < CfdRollOverCompleted > ,
O : xtra ::Handler < oracle ::MonitorAttestation > + xtra ::Handler < oracle ::GetAnnouncement > ,
T : xtra ::Handler < maker_inc_connections ::TakerMessage >
+ xtra ::Handler < maker_inc_connections ::BroadcastOrder > ,
{
async fn handle ( & mut self , msg : CfdAction , ctx : & mut Context < Self > ) {
use CfdAction ::* ;
if let Err ( e ) = match msg {
@ -875,42 +924,60 @@ impl Handler<CfdAction> for Actor {
}
#[ async_trait ]
impl Handler < NewOrder > for Actor {
impl < O : 'static , M : 'static , T : 'static > Handler < NewOrder > for Actor < O , M , T >
where
O : xtra ::Handler < oracle ::FetchAnnouncement > ,
T : xtra ::Handler < maker_inc_connections ::BroadcastOrder > ,
{
async fn handle ( & mut self , msg : NewOrder , _ctx : & mut Context < Self > ) {
log_error ! ( self . handle_new_order ( msg . price , msg . min_quantity , msg . max_quantity ) ) ;
}
}
#[ async_trait ]
impl Handler < NewTakerOnline > for Actor {
impl < O : 'static , M : 'static , T : 'static > Handler < NewTakerOnline > for Actor < O , M , T >
where
T : xtra ::Handler < maker_inc_connections ::TakerMessage > ,
{
async fn handle ( & mut self , msg : NewTakerOnline , _ctx : & mut Context < Self > ) {
log_error ! ( self . handle_new_taker_online ( msg . id ) ) ;
}
}
#[ async_trait ]
impl Handler < CfdSetupCompleted > for Actor {
impl < O : 'static , M : 'static , T : 'static > Handler < CfdSetupCompleted > for Actor < O , M , T >
where
O : xtra ::Handler < oracle ::MonitorAttestation > ,
M : xtra ::Handler < monitor ::StartMonitoring > ,
{
async fn handle ( & mut self , msg : CfdSetupCompleted , _ctx : & mut Context < Self > ) {
log_error ! ( self . handle_cfd_setup_completed ( msg . order_id , msg . dlc ) ) ;
}
}
#[ async_trait ]
impl Handler < CfdRollOverCompleted > for Actor {
impl < O : 'static , M : 'static , T : 'static > Handler < CfdRollOverCompleted > for Actor < O , M , T >
where
M : xtra ::Handler < monitor ::StartMonitoring > ,
{
async fn handle ( & mut self , msg : CfdRollOverCompleted , _ctx : & mut Context < Self > ) {
log_error ! ( self . handle_cfd_roll_over_completed ( msg . order_id , msg . dlc ) ) ;
}
}
#[ async_trait ]
impl Handler < monitor ::Event > for Actor {
impl < O : 'static , M : 'static , T : 'static > Handler < monitor ::Event > for Actor < O , M , T > {
async fn handle ( & mut self , msg : monitor ::Event , _ctx : & mut Context < Self > ) {
log_error ! ( self . handle_monitoring_event ( msg ) )
}
}
#[ async_trait ]
impl Handler < FromTaker > for Actor {
impl < O : 'static , M : 'static , T : 'static > Handler < FromTaker > for Actor < O , M , T >
where
T : xtra ::Handler < maker_inc_connections ::BroadcastOrder >
+ xtra ::Handler < maker_inc_connections ::TakerMessage > ,
{
async fn handle ( & mut self , FromTaker { taker_id , msg } : FromTaker , _ctx : & mut Context < Self > ) {
match msg {
wire ::TakerToMaker ::TakeOrder { order_id , quantity } = > {
@ -964,7 +1031,7 @@ impl Handler<FromTaker> for Actor {
}
#[ async_trait ]
impl Handler < oracle ::Attestation > for Actor {
impl < O : 'static , M : 'static , T : 'static > Handler < oracle ::Attestation > for Actor < O , M , T > {
async fn handle ( & mut self , msg : oracle ::Attestation , _ctx : & mut Context < Self > ) {
log_error ! ( self . handle_oracle_attestation ( msg ) )
}
@ -994,4 +1061,4 @@ impl Message for FromTaker {
type Result = ( ) ;
}
impl xtra ::Actor for Actor { }
impl < O : 'static , M : 'static , T : 'static > xtra ::Actor for Actor < O , M , T > { }