diff --git a/daemon/src/maker_cfd_actor.rs b/daemon/src/maker_cfd_actor.rs index b94c118..f36ef62 100644 --- a/daemon/src/maker_cfd_actor.rs +++ b/daemon/src/maker_cfd_actor.rs @@ -17,64 +17,32 @@ use xtra::prelude::*; pub struct Initialized(pub Address); -impl Message for Initialized { - type Result = Result<()>; -} - pub struct TakeOrder { pub taker_id: TakerId, pub order_id: OrderId, pub quantity: Usd, } -impl Message for TakeOrder { - type Result = Result<()>; -} - pub struct NewOrder(pub Order); -impl Message for NewOrder { - type Result = Result<()>; -} - pub struct StartContractSetup { pub taker_id: TakerId, pub order_id: OrderId, } -impl Message for StartContractSetup { - type Result = Result<()>; -} - pub struct NewTakerOnline { pub id: TakerId, } -impl Message for NewTakerOnline { - type Result = Result<()>; -} - pub struct IncProtocolMsg(pub SetupMsg); -impl Message for IncProtocolMsg { - type Result = Result<()>; -} - pub struct CfdSetupCompleted { pub order_id: OrderId, pub dlc: Dlc, } -impl Message for CfdSetupCompleted { - type Result = Result<()>; -} - pub struct SyncWallet; -impl Message for SyncWallet { - type Result = Result<()>; -} - pub struct MakerCfdActor { db: sqlx::SqlitePool, wallet: Wallet, @@ -90,8 +58,6 @@ pub struct MakerCfdActor { contract_setup_message_buffer: Vec, } -impl xtra::Actor for MakerCfdActor {} - impl MakerCfdActor { pub async fn new( db: sqlx::SqlitePool, @@ -195,7 +161,7 @@ impl MakerCfdActor { self.current_contract_setup.replace(inbox.clone()); for msg in self.contract_setup_message_buffer.drain(..) { - inbox.send(msg).unwrap(); + inbox.send(msg)?; } // TODO: Should we do this here or already earlier or after the spawn? @@ -266,19 +232,38 @@ impl MakerCfdActor { self.wallet_feed_sender.send(wallet_info)?; Ok(()) } -} -#[async_trait] -impl Handler for MakerCfdActor { - async fn handle(&mut self, msg: Initialized, _ctx: &mut Context) -> Result<()> { - self.takers.replace(msg.0); + async fn handle_cfd_setup_completed(&mut self, msg: CfdSetupCompleted) -> Result<()> { + let mut conn = self.db.acquire().await?; + + self.current_contract_setup = None; + self.contract_setup_message_buffer = vec![]; + + insert_new_cfd_state_by_order_id( + msg.order_id, + CfdState::PendingOpen { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc: msg.dlc.clone(), + }, + &mut conn, + ) + .await?; + + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; + + let txid = self.wallet.try_broadcast_transaction(msg.dlc.lock).await?; + + tracing::info!("Lock transaction published with txid {}", txid); + + // TODO: tx monitoring, once confirmed with x blocks transition the Cfd to + // Open Ok(()) } -} -#[async_trait] -impl Handler for MakerCfdActor { - async fn handle(&mut self, msg: TakeOrder, _ctx: &mut Context) -> Result<()> { + async fn handle_take_order(&mut self, msg: TakeOrder) -> Result<()> { tracing::debug!(%msg.taker_id, %msg.quantity, %msg.order_id, "Taker wants to take an order" ); @@ -288,7 +273,7 @@ impl Handler for MakerCfdActor { // 1. Validate if order is still valid let current_order = match self.current_order_id { Some(current_order_id) if current_order_id == msg.order_id => { - load_order_by_id(current_order_id, &mut conn).await.unwrap() + load_order_by_id(current_order_id, &mut conn).await? } _ => { self.takers()? @@ -335,6 +320,13 @@ impl Handler for MakerCfdActor { } } +#[async_trait] +impl Handler for MakerCfdActor { + async fn handle(&mut self, msg: Initialized, _ctx: &mut Context) { + self.takers.replace(msg.0); + } +} + macro_rules! log_error { ($future:expr) => { if let Err(e) = $future.await { @@ -343,79 +335,85 @@ macro_rules! log_error { }; } +#[async_trait] +impl Handler for MakerCfdActor { + async fn handle(&mut self, msg: TakeOrder, _ctx: &mut Context) { + log_error!(self.handle_take_order(msg)) + } +} + #[async_trait] impl Handler for MakerCfdActor { - async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context) -> Result<()> { + async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context) { log_error!(self.handle_new_order(msg)); - Ok(()) } } #[async_trait] impl Handler for MakerCfdActor { - async fn handle(&mut self, msg: StartContractSetup, ctx: &mut Context) -> Result<()> { + async fn handle(&mut self, msg: StartContractSetup, ctx: &mut Context) { log_error!(self.handle_start_contract_setup(msg, ctx)); - Ok(()) } } #[async_trait] impl Handler for MakerCfdActor { - async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context) -> Result<()> { + async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context) { log_error!(self.handle_new_taker_online(msg)); - Ok(()) } } #[async_trait] impl Handler for MakerCfdActor { - async fn handle(&mut self, msg: IncProtocolMsg, _ctx: &mut Context) -> Result<()> { + async fn handle(&mut self, msg: IncProtocolMsg, _ctx: &mut Context) { log_error!(self.handle_inc_protocol_msg(msg)); - Ok(()) } } #[async_trait] impl Handler for MakerCfdActor { - async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context) -> Result<()> { - let mut conn = self.db.acquire().await?; + async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context) { + log_error!(self.handle_cfd_setup_completed(msg)); + } +} - self.current_contract_setup = None; - self.contract_setup_message_buffer = vec![]; +#[async_trait] +impl Handler for MakerCfdActor { + async fn handle(&mut self, _msg: SyncWallet, _ctx: &mut Context) { + log_error!(self.handle_sync_wallet()); + } +} - insert_new_cfd_state_by_order_id( - msg.order_id, - CfdState::PendingOpen { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc: msg.dlc.clone(), - }, - &mut conn, - ) - .await?; +impl Message for Initialized { + type Result = (); +} - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; +impl Message for TakeOrder { + type Result = (); +} - let txid = self - .wallet - .try_broadcast_transaction(msg.dlc.lock) - .await - .unwrap(); +impl Message for NewOrder { + type Result = (); +} - tracing::info!("Lock transaction published with txid {}", txid); +impl Message for StartContractSetup { + type Result = (); +} - // TODO: tx monitoring, once confirmed with x blocks transition the Cfd to - // Open - Ok(()) - } +impl Message for NewTakerOnline { + type Result = (); } -#[async_trait] -impl Handler for MakerCfdActor { - async fn handle(&mut self, _msg: SyncWallet, _ctx: &mut Context) -> Result<()> { - log_error!(self.handle_sync_wallet()); - Ok(()) - } +impl Message for IncProtocolMsg { + type Result = (); } + +impl Message for CfdSetupCompleted { + type Result = (); +} + +impl Message for SyncWallet { + type Result = (); +} + +impl xtra::Actor for MakerCfdActor {}