diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 19eb9bd..bdacf4a 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -11,7 +11,7 @@ use daemon::model::{TakerId, WalletInfo}; use daemon::seed::Seed; use daemon::tokio_ext::FutureExt; use daemon::{ - bitmex_price_feed, db, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle, + bitmex_price_feed, db, housekeeping, logger, maker_inc_connections, monitor, oracle, projection, wallet, wallet_sync, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, SETTLEMENT_INTERVAL, }; @@ -24,7 +24,6 @@ use std::str::FromStr; use std::task::Poll; use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; -use xtra::prelude::*; use xtra::Actor; mod routes_maker; @@ -305,17 +304,12 @@ async fn main() -> Result<()> { }); tasks.add(incoming_connection_addr.attach_stream(listener_stream)); - tasks.add(wallet_sync::new(wallet.clone(), wallet_feed_sender)); - let cfd_action_channel = MessageChannel::::clone_channel(&cfd_actor_addr); - let new_order_channel = MessageChannel::::clone_channel(&cfd_actor_addr); - rocket::custom(figment) .manage(order_feed_receiver) .manage(update_cfd_feed_receiver) - .manage(cfd_action_channel) - .manage(new_order_channel) + .manage(cfd_actor_addr) .manage(cfd_feed_receiver) .manage(connected_takers_feed_receiver) .manage(wallet_feed_receiver) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 0e3a77f..0271584 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -25,17 +25,29 @@ use sqlx::Sqlite; use std::collections::{HashMap, HashSet}; use time::Duration; use xtra::prelude::*; +use xtra_productivity::xtra_productivity; -pub enum CfdAction { - AcceptOrder { order_id: OrderId }, - RejectOrder { order_id: OrderId }, - AcceptSettlement { order_id: OrderId }, - RejectSettlement { order_id: OrderId }, - AcceptRollOver { order_id: OrderId }, - RejectRollOver { order_id: OrderId }, - Commit { order_id: OrderId }, +pub struct AcceptOrder { + pub order_id: OrderId, +} +pub struct RejectOrder { + pub order_id: OrderId, +} +pub struct AcceptSettlement { + pub order_id: OrderId, +} +pub struct RejectSettlement { + pub order_id: OrderId, +} +pub struct AcceptRollOver { + pub order_id: OrderId, +} +pub struct RejectRollOver { + pub order_id: OrderId, +} +pub struct Commit { + pub order_id: OrderId, } - pub struct NewOrder { pub price: Price, pub min_quantity: Usd, @@ -65,7 +77,12 @@ pub struct FromTaker { pub msg: wire::TakerToMaker, } -pub struct Actor { +pub struct Actor< + O = oracle::Actor, + M = monitor::Actor, + T = maker_inc_connections::Actor, + W = wallet::Actor, +> { db: sqlx::SqlitePool, wallet: Address, settlement_interval: Duration, @@ -301,14 +318,6 @@ impl Actor where W: xtra::Handler, { - async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor) - .await?; - - Ok(()) - } - async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { let mut conn = self.db.acquire().await?; cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor) @@ -366,85 +375,20 @@ where Ok(()) } - async fn handle_accept_settlement(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Maker accepts a settlement proposal" ); - - let taker_id = self.get_taker_id_of_proposal(&order_id)?; - - match self - .takers - .send(maker_inc_connections::TakerMessage { - taker_id, - msg: wire::MakerToTaker::ConfirmSettlement(order_id), - }) - .await? - { - Ok(_) => { - self.current_agreed_proposals - .insert(order_id, self.get_settlement_proposal(order_id)?); - self.remove_pending_proposal(&order_id) - .await - .context("accepted settlement")?; - } - Err(e) => { - tracing::warn!("Failed to notify taker of accepted settlement: {}", e); - self.remove_pending_proposal(&order_id) - .await - .context("accepted settlement")?; - } - } - - Ok(()) - } - - async fn handle_reject_settlement(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Maker rejects a settlement proposal" ); - - let taker_id = self.get_taker_id_of_proposal(&order_id)?; - - // clean-up state ahead of sending to ensure consistency in case we fail to deliver the - // message - self.remove_pending_proposal(&order_id) - .await - .context("rejected settlement")?; - - self.takers - .send(maker_inc_connections::TakerMessage { - taker_id, - msg: wire::MakerToTaker::RejectSettlement(order_id), - }) - .await??; - - Ok(()) - } - - async fn handle_reject_roll_over(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Maker rejects a roll_over proposal" ); - - // Validate if order is actually being requested to be extended - let (_, taker_id) = match self.current_pending_proposals.get(&order_id) { - Some(( - UpdateCfdProposal::RollOverProposal { - proposal, - direction: SettlementKind::Incoming, - }, - taker_id, - )) => (proposal, *taker_id), - _ => { - anyhow::bail!("Order is in invalid state. Ignoring reject roll over request.") - } - }; + async fn reject_order( + &mut self, + taker_id: TakerId, + mut cfd: Cfd, + mut conn: PoolConnection, + ) -> Result<()> { + cfd.state = CfdState::rejected(); - // clean-up state ahead of sending to ensure consistency in case we fail to deliver the - // message - self.remove_pending_proposal(&order_id) - .await - .context("rejected roll_over")?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; self.takers .send(maker_inc_connections::TakerMessage { taker_id, - msg: wire::MakerToTaker::RejectRollOver(order_id), + msg: wire::MakerToTaker::RejectOrder(cfd.order.id), }) .await??; @@ -529,49 +473,9 @@ where Ok(()) } - - async fn handle_reject_order(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Maker rejects an order" ); - - let mut conn = self.db.acquire().await?; - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - let taker_id = match cfd { - Cfd { - state: CfdState::IncomingOrderRequest { taker_id, .. }, - .. - } => taker_id, - _ => { - anyhow::bail!("Order is in invalid state. Ignoring trying to accept it.") - } - }; - - self.reject_order(taker_id, cfd, conn).await?; - - Ok(()) - } - - async fn reject_order( - &mut self, - taker_id: TakerId, - mut cfd: Cfd, - mut conn: PoolConnection, - ) -> Result<()> { - cfd.state = CfdState::rejected(); - - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - - self.takers - .send(maker_inc_connections::TakerMessage { - taker_id, - msg: wire::MakerToTaker::RejectOrder(cfd.order.id), - }) - .await??; - - Ok(()) - } } +#[xtra_productivity] impl Actor where Self: xtra::Handler, @@ -581,9 +485,11 @@ where { async fn handle_accept_order( &mut self, - order_id: OrderId, + msg: AcceptOrder, ctx: &mut Context, ) -> Result<()> { + let AcceptOrder { order_id } = msg; + if let SetupState::Active { .. } = self.setup_state { anyhow::bail!("Already setting up a contract!") } @@ -674,81 +580,141 @@ where } } +#[xtra_productivity] impl Actor where - O: xtra::Handler, - M: xtra::Handler, - W: xtra::Handler, + T: xtra::Handler, { - async fn handle_cfd_setup_completed( - &mut self, - order_id: OrderId, - dlc: Result, - ) -> Result<()> { - self.setup_state = SetupState::None; + async fn handle_reject_order(&mut self, msg: RejectOrder) -> Result<()> { + let RejectOrder { order_id } = msg; + + tracing::debug!(%order_id, "Maker rejects an order" ); let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let dlc = match dlc { - Ok(dlc) => dlc, - Err(e) => { - cfd.state = CfdState::SetupFailed { - common: CfdStateCommon::default(), - info: e.to_string(), - }; + let taker_id = match cfd { + Cfd { + state: CfdState::IncomingOrderRequest { taker_id, .. }, + .. + } => taker_id, + _ => { + anyhow::bail!("Order is in invalid state. Ignoring trying to accept it.") + } + }; - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + self.reject_order(taker_id, cfd, conn).await?; - return Err(e); + Ok(()) + } + + async fn handle_accept_settlement(&mut self, msg: AcceptSettlement) -> Result<()> { + let AcceptSettlement { order_id } = msg; + + tracing::debug!(%order_id, "Maker accepts a settlement proposal" ); + + let taker_id = self.get_taker_id_of_proposal(&order_id)?; + + match self + .takers + .send(maker_inc_connections::TakerMessage { + taker_id, + msg: wire::MakerToTaker::ConfirmSettlement(order_id), + }) + .await? + { + Ok(_) => { + self.current_agreed_proposals + .insert(order_id, self.get_settlement_proposal(order_id)?); + self.remove_pending_proposal(&order_id) + .await + .context("accepted settlement")?; } - }; + Err(e) => { + tracing::warn!("Failed to notify taker of accepted settlement: {}", e); + self.remove_pending_proposal(&order_id) + .await + .context("accepted settlement")?; + } + } - cfd.state = CfdState::PendingOpen { - common: CfdStateCommon::default(), - dlc: dlc.clone(), - attestation: None, - }; + Ok(()) + } - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + async fn handle_reject_settlement(&mut self, msg: RejectSettlement) -> Result<()> { + let RejectSettlement { order_id } = msg; - let txid = self - .wallet - .send(wallet::TryBroadcastTransaction { - tx: dlc.lock.0.clone(), + tracing::debug!(%order_id, "Maker rejects a settlement proposal" ); + + let taker_id = self.get_taker_id_of_proposal(&order_id)?; + + // clean-up state ahead of sending to ensure consistency in case we fail to deliver the + // message + self.remove_pending_proposal(&order_id) + .await + .context("rejected settlement")?; + + self.takers + .send(maker_inc_connections::TakerMessage { + taker_id, + msg: wire::MakerToTaker::RejectSettlement(order_id), }) .await??; - tracing::info!("Lock transaction published with txid {}", txid); + Ok(()) + } - self.monitor_actor - .send(monitor::StartMonitoring { - id: order_id, - params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()), - }) - .await?; + async fn handle_reject_roll_over(&mut self, msg: RejectRollOver) -> Result<()> { + let RejectRollOver { order_id } = msg; - self.oracle_actor - .send(oracle::MonitorAttestation { - event_id: dlc.settlement_event_id, + tracing::debug!(%order_id, "Maker rejects a roll_over proposal" ); + + // Validate if order is actually being requested to be extended + let (_, taker_id) = match self.current_pending_proposals.get(&order_id) { + Some(( + UpdateCfdProposal::RollOverProposal { + proposal, + direction: SettlementKind::Incoming, + }, + taker_id, + )) => (proposal, *taker_id), + _ => { + anyhow::bail!("Order is in invalid state. Ignoring reject roll over request.") + } + }; + + // clean-up state ahead of sending to ensure consistency in case we fail to deliver the + // message + self.remove_pending_proposal(&order_id) + .await + .context("rejected roll_over")?; + + self.takers + .send(maker_inc_connections::TakerMessage { + taker_id, + msg: wire::MakerToTaker::RejectRollOver(order_id), }) - .await?; + .await??; Ok(()) } } +#[xtra_productivity] impl Actor where Self: xtra::Handler, O: xtra::Handler + xtra::Handler, T: xtra::Handler, + W: xtra::Handler + xtra::Handler, { async fn handle_accept_roll_over( &mut self, - order_id: OrderId, + msg: AcceptRollOver, ctx: &mut Context, ) -> Result<()> { + let AcceptRollOver { order_id } = msg; + tracing::debug!(%order_id, "Maker accepts a roll_over proposal" ); let mut conn = self.db.acquire().await?; @@ -836,6 +802,86 @@ where } } +#[xtra_productivity] +impl Actor +where + W: xtra::Handler, +{ + async fn handle_commit(&mut self, msg: Commit) -> Result<()> { + let Commit { order_id } = msg; + + let mut conn = self.db.acquire().await?; + cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor) + .await?; + + Ok(()) + } +} + +impl Actor +where + O: xtra::Handler, + M: xtra::Handler, + W: xtra::Handler, +{ + async fn handle_cfd_setup_completed( + &mut self, + order_id: OrderId, + dlc: Result, + ) -> Result<()> { + self.setup_state = SetupState::None; + + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + let dlc = match dlc { + Ok(dlc) => dlc, + Err(e) => { + cfd.state = CfdState::SetupFailed { + common: CfdStateCommon::default(), + info: e.to_string(), + }; + + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + + return Err(e); + } + }; + + cfd.state = CfdState::PendingOpen { + common: CfdStateCommon::default(), + dlc: dlc.clone(), + attestation: None, + }; + + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + + let txid = self + .wallet + .send(wallet::TryBroadcastTransaction { + tx: dlc.lock.0.clone(), + }) + .await??; + + tracing::info!("Lock transaction published with txid {}", txid); + + self.monitor_actor + .send(monitor::StartMonitoring { + id: order_id, + params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()), + }) + .await?; + + self.oracle_actor + .send(oracle::MonitorAttestation { + event_id: dlc.settlement_event_id, + }) + .await?; + + Ok(()) + } +} + impl Actor where M: xtra::Handler, @@ -947,34 +993,6 @@ where } } -#[async_trait] -impl Handler for Actor -where - Self: xtra::Handler + xtra::Handler, - O: xtra::Handler + xtra::Handler, - T: xtra::Handler - + xtra::Handler, - W: xtra::Handler - + xtra::Handler - + xtra::Handler, -{ - async fn handle(&mut self, msg: CfdAction, ctx: &mut Context) -> Result<()> { - use CfdAction::*; - if let Err(e) = match msg { - AcceptOrder { order_id } => self.handle_accept_order(order_id, ctx).await, - RejectOrder { order_id } => self.handle_reject_order(order_id).await, - AcceptSettlement { order_id } => self.handle_accept_settlement(order_id).await, - RejectSettlement { order_id } => self.handle_reject_settlement(order_id).await, - AcceptRollOver { order_id } => self.handle_accept_roll_over(order_id, ctx).await, - RejectRollOver { order_id } => self.handle_reject_roll_over(order_id).await, - Commit { order_id } => self.handle_commit(order_id).await, - } { - anyhow::bail!("Message handler failed: {:#}", e); - } - Ok(()) - } -} - #[async_trait] impl Handler for Actor where @@ -1168,10 +1186,6 @@ impl Message for CfdRollOverCompleted { type Result = (); } -impl Message for CfdAction { - type Result = Result<()>; -} - impl Message for FromTaker { type Result = (); } diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 4a38318..a209280 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -5,7 +5,7 @@ use daemon::model::cfd::{Cfd, Order, OrderId, Role, UpdateCfdProposals}; use daemon::model::{Price, TakerId, Usd, WalletInfo}; use daemon::routes::EmbeddedFileExt; use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent}; -use daemon::{bitmex_price_feed, maker_cfd, wallet}; +use daemon::{bitmex_price_feed, maker_cfd, maker_inc_connections, monitor, oracle, wallet}; use http_api_problem::{HttpApiProblem, StatusCode}; use rocket::http::{ContentType, Header, Status}; use rocket::response::stream::EventStream; @@ -20,6 +20,10 @@ use tokio::select; use tokio::sync::watch; use xtra::prelude::*; +pub type Maker = xtra::Address< + maker_cfd::Actor, +>; + #[allow(clippy::too_many_arguments)] #[rocket::get("/feed")] pub async fn maker_feed( @@ -122,10 +126,10 @@ pub struct CfdNewOrderRequest { #[rocket::post("/order/sell", data = "")] pub async fn post_sell_order( order: Json, - new_order_channel: &State>>, + cfd_actor: &State, _auth: Authenticated, ) -> Result, HttpApiProblem> { - new_order_channel + cfd_actor .send(maker_cfd::NewOrder { price: order.price, min_quantity: order.min_quantity, @@ -163,18 +167,19 @@ pub struct PromptAuthentication { pub async fn post_cfd_action( id: OrderId, action: CfdAction, - cfd_action_channel: &State>>, + cfd_actor: &State, _auth: Authenticated, ) -> Result, HttpApiProblem> { - use maker_cfd::CfdAction::*; + use maker_cfd::*; + let result = match action { - CfdAction::AcceptOrder => cfd_action_channel.send(AcceptOrder { order_id: id }), - CfdAction::RejectOrder => cfd_action_channel.send(RejectOrder { order_id: id }), - CfdAction::AcceptSettlement => cfd_action_channel.send(AcceptSettlement { order_id: id }), - CfdAction::RejectSettlement => cfd_action_channel.send(RejectSettlement { order_id: id }), - CfdAction::AcceptRollOver => cfd_action_channel.send(AcceptRollOver { order_id: id }), - CfdAction::RejectRollOver => cfd_action_channel.send(RejectRollOver { order_id: id }), - CfdAction::Commit => cfd_action_channel.send(Commit { order_id: id }), + CfdAction::AcceptOrder => cfd_actor.send(AcceptOrder { order_id: id }).await, + CfdAction::RejectOrder => cfd_actor.send(RejectOrder { order_id: id }).await, + CfdAction::AcceptSettlement => cfd_actor.send(AcceptSettlement { order_id: id }).await, + CfdAction::RejectSettlement => cfd_actor.send(RejectSettlement { order_id: id }).await, + CfdAction::AcceptRollOver => cfd_actor.send(AcceptRollOver { order_id: id }).await, + CfdAction::RejectRollOver => cfd_actor.send(RejectRollOver { order_id: id }).await, + CfdAction::Commit => cfd_actor.send(Commit { order_id: id }).await, CfdAction::Settle => { let msg = "Collaborative settlement can only be triggered by taker"; tracing::error!(msg); @@ -187,14 +192,11 @@ pub async fn post_cfd_action( } }; - result - .await - .unwrap_or_else(|e| anyhow::bail!(e)) - .map_err(|e| { - HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) - .title(action.to_string() + " failed") - .detail(e.to_string()) - })?; + result.unwrap_or_else(|e| anyhow::bail!(e)).map_err(|e| { + HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) + .title(action.to_string() + " failed") + .detail(e.to_string()) + })?; Ok(status::Accepted(None)) } diff --git a/daemon/tests/harness/mocks/wallet.rs b/daemon/tests/harness/mocks/wallet.rs index a8ff0ed..91115f7 100644 --- a/daemon/tests/harness/mocks/wallet.rs +++ b/daemon/tests/harness/mocks/wallet.rs @@ -3,7 +3,7 @@ use anyhow::Result; use bdk::bitcoin::util::psbt::PartiallySignedTransaction; use bdk::bitcoin::{ecdsa, Amount, Txid}; use daemon::model::{Timestamp, WalletInfo}; -use daemon::wallet::{self}; +use daemon::wallet; use maia::secp256k1_zkp::Secp256k1; use maia::{PartyParams, WalletExt}; use mockall::*; diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 857bc82..ee8cb2a 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -4,7 +4,6 @@ use crate::harness::mocks::wallet::WalletActor; use crate::schnorrsig; use daemon::bitmex_price_feed::Quote; use daemon::connection::{connect, ConnectionStatus}; -use daemon::maker_cfd::CfdAction; use daemon::model::cfd::{Cfd, Order, Origin, UpdateCfdProposals}; use daemon::model::{Price, TakerId, Timestamp, Usd}; use daemon::seed::Seed; @@ -183,7 +182,7 @@ impl Maker { pub async fn reject_take_request(&self, order: Order) { self.system .cfd_actor_addr - .send(CfdAction::RejectOrder { order_id: order.id }) + .send(maker_cfd::RejectOrder { order_id: order.id }) .await .unwrap() .unwrap(); @@ -192,7 +191,7 @@ impl Maker { pub async fn accept_take_request(&self, order: Order) { self.system .cfd_actor_addr - .send(CfdAction::AcceptOrder { order_id: order.id }) + .send(maker_cfd::AcceptOrder { order_id: order.id }) .await .unwrap() .unwrap();