@ -1,22 +1,31 @@
#![ cfg_attr(not(test), warn(clippy::unwrap_used)) ]
#![ warn(clippy::disallowed_method) ]
use crate ::bitcoin ::Txid ;
use crate ::maker_cfd ::FromTaker ;
use crate ::maker_cfd ::TakerConnected ;
use crate ::model ::cfd ::Cfd ;
use crate ::model ::cfd ::Order ;
use crate ::model ::cfd ::OrderId ;
use crate ::model ::cfd ::UpdateCfdProposals ;
use crate ::model ::Identity ;
use crate ::model ::Price ;
use crate ::model ::Usd ;
use crate ::oracle ::Attestation ;
use crate ::tokio_ext ::FutureExt ;
use address_map ::Stopping ;
use anyhow ::Result ;
use bdk ::bitcoin ;
use bdk ::bitcoin ::Amount ;
use bdk ::FeeRate ;
use connection ::ConnectionStatus ;
use futures ::future ::RemoteHandle ;
use maia ::secp256k1_zkp ::schnorrsig ;
use maker_cfd ::TakerDisconnected ;
use sqlx ::SqlitePool ;
use std ::future ::Future ;
use std ::task ::Poll ;
use std ::time ::Duration ;
use tokio ::net ::TcpListener ;
use tokio ::sync ::watch ;
use xtra ::message_channel ::MessageChannel ;
use xtra ::message_channel ::StrongMessageChannel ;
@ -106,8 +115,9 @@ impl Tasks {
pub struct MakerActorSystem < O , M , T , W > {
pub cfd_actor_addr : Address < maker_cfd ::Actor < O , M , T , W > > ,
pub inc_conn_addr : Address < T > ,
pub tasks : Tasks ,
wallet_actor_addr : Address < W > ,
inc_conn_addr : Address < T > ,
_tasks : Tasks ,
}
impl < O , M , T , W > MakerActorSystem < O , M , T , W >
@ -126,10 +136,12 @@ where
+ xtra ::Handler < maker_inc_connections ::settlement ::Response >
+ xtra ::Handler < Stopping < collab_settlement_maker ::Actor > >
+ xtra ::Handler < Stopping < rollover_maker ::Actor > >
+ xtra ::Handler < maker_cfd ::RollOverProposed > ,
+ xtra ::Handler < maker_cfd ::RollOverProposed >
+ xtra ::Handler < maker_inc_connections ::ListenerMessage > ,
W : xtra ::Handler < wallet ::BuildPartyParams >
+ xtra ::Handler < wallet ::Sign >
+ xtra ::Handler < wallet ::TryBroadcastTransaction > ,
+ xtra ::Handler < wallet ::TryBroadcastTransaction >
+ xtra ::Handler < wallet ::Withdraw > ,
{
#[ allow(clippy::too_many_arguments) ]
pub async fn new < FO , FM > (
@ -159,7 +171,7 @@ where
let ( cfd_actor_addr , cfd_actor_fut ) = maker_cfd ::Actor ::new (
db ,
wallet_addr ,
wallet_addr . clone ( ) ,
settlement_interval ,
oracle_pk ,
projection_actor ,
@ -195,17 +207,117 @@ where
Ok ( Self {
cfd_actor_addr ,
wallet_actor_addr : wallet_addr ,
inc_conn_addr ,
tasks ,
_tasks : tasks ,
} )
}
pub fn listen_on ( & mut self , listener : TcpListener ) {
let listener_stream = futures ::stream ::poll_fn ( move | ctx | {
let message = match futures ::ready ! ( listener . poll_accept ( ctx ) ) {
Ok ( ( stream , address ) ) = > {
maker_inc_connections ::ListenerMessage ::NewConnection { stream , address }
}
Err ( e ) = > maker_inc_connections ::ListenerMessage ::Error { source : e } ,
} ;
Poll ::Ready ( Some ( message ) )
} ) ;
self . _tasks
. add ( self . inc_conn_addr . clone ( ) . attach_stream ( listener_stream ) ) ;
}
pub async fn new_order (
& self ,
price : Price ,
min_quantity : Usd ,
max_quantity : Usd ,
fee_rate : Option < u32 > ,
) -> Result < ( ) > {
self . cfd_actor_addr
. send ( maker_cfd ::NewOrder {
price ,
min_quantity ,
max_quantity ,
fee_rate : fee_rate . unwrap_or ( 1 ) ,
} )
. await ? ? ;
Ok ( ( ) )
}
pub async fn accept_order ( & self , order_id : OrderId ) -> Result < ( ) > {
self . cfd_actor_addr
. send ( maker_cfd ::AcceptOrder { order_id } )
. await ? ? ;
Ok ( ( ) )
}
pub async fn reject_order ( & self , order_id : OrderId ) -> Result < ( ) > {
self . cfd_actor_addr
. send ( maker_cfd ::RejectOrder { order_id } )
. await ? ? ;
Ok ( ( ) )
}
pub async fn accept_settlement ( & self , order_id : OrderId ) -> Result < ( ) > {
self . cfd_actor_addr
. send ( maker_cfd ::AcceptSettlement { order_id } )
. await ? ? ;
Ok ( ( ) )
}
pub async fn reject_settlement ( & self , order_id : OrderId ) -> Result < ( ) > {
self . cfd_actor_addr
. send ( maker_cfd ::RejectSettlement { order_id } )
. await ? ? ;
Ok ( ( ) )
}
pub async fn accept_rollover ( & self , order_id : OrderId ) -> Result < ( ) > {
self . cfd_actor_addr
. send ( maker_cfd ::AcceptRollOver { order_id } )
. await ? ? ;
Ok ( ( ) )
}
pub async fn reject_rollover ( & self , order_id : OrderId ) -> Result < ( ) > {
self . cfd_actor_addr
. send ( maker_cfd ::RejectRollOver { order_id } )
. await ? ? ;
Ok ( ( ) )
}
pub async fn commit ( & self , order_id : OrderId ) -> Result < ( ) > {
self . cfd_actor_addr
. send ( maker_cfd ::Commit { order_id } )
. await ? ? ;
Ok ( ( ) )
}
pub async fn withdraw (
& self ,
amount : Option < Amount > ,
address : bitcoin ::Address ,
fee : f32 ,
) -> Result < Txid > {
self . wallet_actor_addr
. send ( wallet ::Withdraw {
amount ,
address ,
fee : Some ( bdk ::FeeRate ::from_sat_per_vb ( fee ) ) ,
} )
. await ?
}
}
pub struct TakerActorSystem < O , M , W > {
pub cfd_actor_addr : Address < taker_cfd ::Actor < O , M , W > > ,
pub connection_actor_addr : Address < connection ::Actor > ,
pub maker_online_status_feed_receiver : watch ::Receiver < ConnectionStatus > ,
pub tasks : Tasks ,
wallet_actor_addr : Address < W > ,
_tasks : Tasks ,
}
impl < O , M , W > TakerActorSystem < O , M , W >
@ -219,12 +331,13 @@ where
+ xtra ::Handler < oracle ::Attestation > ,
W : xtra ::Handler < wallet ::BuildPartyParams >
+ xtra ::Handler < wallet ::Sign >
+ xtra ::Handler < wallet ::TryBroadcastTransaction > ,
+ xtra ::Handler < wallet ::TryBroadcastTransaction >
+ xtra ::Handler < wallet ::Withdraw > ,
{
#[ allow(clippy::too_many_arguments) ]
pub async fn new < FM , FO > (
db : SqlitePool ,
wallet_addr : Address < W > ,
wallet_actor_a ddr : Address < W > ,
oracle_pk : schnorrsig ::PublicKey ,
identity_sk : x25519_dalek ::StaticSecret ,
oracle_constructor : impl FnOnce ( Box < dyn StrongMessageChannel < Attestation > > ) -> FO ,
@ -250,7 +363,7 @@ where
let ( connection_actor_addr , connection_actor_ctx ) = xtra ::Context ::new ( None ) ;
let ( cfd_actor_addr , cfd_actor_fut ) = taker_cfd ::Actor ::new (
db . clone ( ) ,
wallet_addr ,
wallet_actor_a ddr . clone ( ) ,
oracle_pk ,
projection_actor . clone ( ) ,
connection_actor_addr . clone ( ) ,
@ -302,7 +415,45 @@ where
cfd_actor_addr ,
connection_actor_addr ,
maker_online_status_feed_receiver ,
tasks ,
wallet_actor_addr ,
_tasks : tasks ,
} )
}
pub async fn take_offer ( & self , order_id : OrderId , quantity : Usd ) -> Result < ( ) > {
self . cfd_actor_addr
. send ( taker_cfd ::TakeOffer { order_id , quantity } )
. await ? ? ;
Ok ( ( ) )
}
pub async fn commit ( & self , order_id : OrderId ) -> Result < ( ) > {
self . cfd_actor_addr
. send ( taker_cfd ::Commit { order_id } )
. await ?
}
pub async fn propose_settlement ( & self , order_id : OrderId , current_price : Price ) -> Result < ( ) > {
self . cfd_actor_addr
. send ( taker_cfd ::ProposeSettlement {
order_id ,
current_price ,
} )
. await ?
}
pub async fn withdraw (
& self ,
amount : Option < Amount > ,
address : bitcoin ::Address ,
fee_rate : FeeRate ,
) -> Result < Txid > {
self . wallet_actor_addr
. send ( wallet ::Withdraw {
amount ,
address ,
fee : Some ( fee_rate ) ,
} )
. await ?
}
}