From 4b9dae8c7dc458855ffcf65cd7070374cc7f9387 Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Mon, 18 Oct 2021 12:19:37 +1100 Subject: [PATCH] Make taker_cfd::Actor generic over other actor addresses --- daemon/src/taker_cfd.rs | 407 ++++++++++++++++++++++------------------ 1 file changed, 227 insertions(+), 180 deletions(-) diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 5709169..0705bd7 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -67,7 +67,7 @@ enum RollOverState { None, } -pub struct Actor { +pub struct Actor { db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, @@ -75,14 +75,14 @@ pub struct Actor { order_feed_actor_inbox: watch::Sender>, update_cfd_feed_sender: watch::Sender, send_to_maker: Box>, - monitor_actor: Address, + monitor_actor: Address, setup_state: SetupState, roll_over_state: RollOverState, - oracle_actor: Address, + oracle_actor: Address, current_pending_proposals: UpdateCfdProposals, } -impl Actor { +impl Actor { #[allow(clippy::too_many_arguments)] pub fn new( db: sqlx::SqlitePool, @@ -92,8 +92,8 @@ impl Actor { order_feed_actor_inbox: watch::Sender>, update_cfd_feed_sender: watch::Sender, send_to_maker: Box + Send>, - monitor_actor: Address, - oracle_actor: Address, + monitor_actor: Address, + oracle_actor: Address, ) -> Self { Self { db, @@ -227,6 +227,103 @@ impl Actor { Ok(()) } + async fn handle_order_rejected(&mut self, order_id: OrderId) -> Result<()> { + tracing::debug!(%order_id, "Order rejected"); + + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::rejected(); + + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + + Ok(()) + } + + async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> { + tracing::info!(%order_id, "Settlement proposal got rejected"); + + self.remove_pending_proposal(&order_id)?; + + Ok(()) + } + + async fn handle_roll_over_rejected(&mut self, order_id: OrderId) -> Result<()> { + tracing::debug!(%order_id, "Roll over request rejected"); + // TODO: tell UI that roll over was rejected + + // this is not too bad as we are still monitoring for the CFD to expiry + // the taker can just try to ask again :) + + Ok(()) + } + + async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> { + match &mut self.setup_state { + SetupState::Active { sender } => { + sender.send(msg).await?; + } + SetupState::None => { + anyhow::bail!("Received setup message without an active contract setup") + } + } + + Ok(()) + } + + async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> { + match &mut self.roll_over_state { + RollOverState::Active { sender } => { + sender.send(msg).await?; + } + RollOverState::None => { + anyhow::bail!("Received message without an active roll_over setup") + } + } + + Ok(()) + } + + async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_monitoring_event( + event, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + Ok(()) + } + + async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_commit( + order_id, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + Ok(()) + } + + async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_oracle_attestation( + attestation, + &mut conn, + &self.wallet, + &self.cfd_feed_actor_inbox, + ) + .await?; + Ok(()) + } +} + +impl Actor +where + O: xtra::Handler, +{ async fn handle_new_order(&mut self, order: Option) -> Result<()> { match order { Some(mut order) => { @@ -246,7 +343,62 @@ impl Actor { } Ok(()) } +} + +impl Actor +where + O: xtra::Handler, + M: xtra::Handler, +{ + async fn handle_cfd_setup_completed( + &mut self, + order_id: OrderId, + dlc: Result, + ) -> Result<()> { + self.setup_state = SetupState::None; + let dlc = dlc.context("Failed to setup contract with maker")?; + + tracing::info!("Setup complete, publishing on chain now"); + + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::PendingOpen { + common: CfdStateCommon::default(), + dlc: dlc.clone(), + attestation: None, + }; + + append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + + let txid = self + .wallet + .try_broadcast_transaction(dlc.lock.0.clone()) + .await?; + + tracing::info!("Lock transaction published with txid {}", txid); + + self.monitor_actor + .do_send_async(monitor::StartMonitoring { + id: order_id, + params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()), + }) + .await?; + + self.oracle_actor + .do_send_async(oracle::MonitorAttestation { + event_id: cfd.order.oracle_event_id, + }) + .await?; + + Ok(()) + } +} +impl Actor +where + Self: xtra::Handler, + O: xtra::Handler + xtra::Handler, +{ async fn handle_order_accepted( &mut self, order_id: OrderId, @@ -305,61 +457,13 @@ impl Actor { Ok(()) } +} - async fn handle_order_rejected(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Order rejected"); - - let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - cfd.state = CfdState::rejected(); - - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; - - Ok(()) - } - - async fn handle_settlement_accepted( - &mut self, - order_id: OrderId, - _ctx: &mut Context, - ) -> Result<()> { - tracing::info!(%order_id, "Settlement proposal got accepted"); - - let mut conn = self.db.acquire().await?; - - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let dlc = cfd.open_dlc().context("CFD was in wrong state")?; - - let proposal = self.get_settlement_proposal(order_id)?; - let (tx, sig_taker) = dlc.close_transaction(proposal)?; - - self.send_to_maker - .do_send(wire::TakerToMaker::InitiateSettlement { - order_id, - sig_taker, - })?; - - cfd.handle(CfdStateChangeEvent::ProposalSigned( - CollaborativeSettlement::new( - tx.clone(), - dlc.script_pubkey_for(cfd.role()), - proposal.price, - ), - ))?; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; - - self.remove_pending_proposal(&order_id)?; - - self.monitor_actor - .do_send_async(monitor::CollaborativeSettlement { - order_id, - tx: (tx.txid(), dlc.script_pubkey_for(Role::Taker)), - }) - .await?; - - Ok(()) - } - +impl Actor +where + Self: xtra::Handler, + O: xtra::Handler, +{ async fn handle_roll_over_accepted( &mut self, order_id: OrderId, @@ -413,78 +517,31 @@ impl Actor { .context("Could not remove accepted roll over")?; Ok(()) } +} - async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> { - tracing::info!(%order_id, "Settlement proposal got rejected"); - - self.remove_pending_proposal(&order_id)?; - - Ok(()) - } - - async fn handle_roll_over_rejected(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Roll over request rejected"); - // TODO: tell UI that roll over was rejected - - // this is not too bad as we are still monitoring for the CFD to expiry - // the taker can just try to ask again :) - - Ok(()) - } - - async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> { - match &mut self.setup_state { - SetupState::Active { sender } => { - sender.send(msg).await?; - } - SetupState::None => { - anyhow::bail!("Received setup message without an active contract setup") - } - } - - Ok(()) - } - - async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> { - match &mut self.roll_over_state { - RollOverState::Active { sender } => { - sender.send(msg).await?; - } - RollOverState::None => { - anyhow::bail!("Received message without an active roll_over setup") - } - } - - Ok(()) - } - - async fn handle_cfd_setup_completed( +impl Actor +where + M: xtra::Handler, +{ + async fn handle_cfd_roll_over_completed( &mut self, order_id: OrderId, dlc: Result, ) -> Result<()> { - self.setup_state = SetupState::None; - let dlc = dlc.context("Failed to setup contract with maker")?; - - tracing::info!("Setup complete, publishing on chain now"); + let dlc = dlc.context("Failed to roll over contract with maker")?; + self.roll_over_state = RollOverState::None; let mut conn = self.db.acquire().await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - cfd.state = CfdState::PendingOpen { + cfd.state = CfdState::Open { common: CfdStateCommon::default(), dlc: dlc.clone(), attestation: None, + collaborative_close: None, }; append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; - let txid = self - .wallet - .try_broadcast_transaction(dlc.lock.0.clone()) - .await?; - - tracing::info!("Lock transaction published with txid {}", txid); - self.monitor_actor .do_send_async(monitor::StartMonitoring { id: order_id, @@ -492,90 +549,66 @@ impl Actor { }) .await?; - self.oracle_actor - .do_send_async(oracle::MonitorAttestation { - event_id: cfd.order.oracle_event_id, - }) - .await?; - Ok(()) } +} - async fn handle_cfd_roll_over_completed( +impl Actor +where + M: xtra::Handler, +{ + async fn handle_settlement_accepted( &mut self, order_id: OrderId, - dlc: Result, + _ctx: &mut Context, ) -> Result<()> { - let dlc = dlc.context("Failed to roll over contract with maker")?; - self.roll_over_state = RollOverState::None; + tracing::info!(%order_id, "Settlement proposal got accepted"); let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - cfd.state = CfdState::Open { - common: CfdStateCommon::default(), - dlc: dlc.clone(), - attestation: None, - collaborative_close: None, - }; + let dlc = cfd.open_dlc().context("CFD was in wrong state")?; + + let proposal = self.get_settlement_proposal(order_id)?; + let (tx, sig_taker) = dlc.close_transaction(proposal)?; + self.send_to_maker + .do_send(wire::TakerToMaker::InitiateSettlement { + order_id, + sig_taker, + })?; + + cfd.handle(CfdStateChangeEvent::ProposalSigned( + CollaborativeSettlement::new( + tx.clone(), + dlc.script_pubkey_for(cfd.role()), + proposal.price, + ), + ))?; append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + self.remove_pending_proposal(&order_id)?; + self.monitor_actor - .do_send_async(monitor::StartMonitoring { - id: order_id, - params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()), + .do_send_async(monitor::CollaborativeSettlement { + order_id, + tx: (tx.txid(), dlc.script_pubkey_for(Role::Taker)), }) .await?; Ok(()) } - - async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_monitoring_event( - event, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } - - async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_commit( - order_id, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } - - async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_oracle_attestation( - attestation, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; - Ok(()) - } } #[async_trait] -impl Handler for Actor { +impl Handler for Actor { async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context) { log_error!(self.handle_take_offer(msg.order_id, msg.quantity)); } } #[async_trait] -impl Handler for Actor { +impl Handler for Actor { async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context) { use CfdAction::*; @@ -596,7 +629,14 @@ impl Handler for Actor { } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + Self: xtra::Handler + xtra::Handler, + O: xtra::Handler + + xtra::Handler + + xtra::Handler, + M: xtra::Handler, +{ async fn handle( &mut self, message: MakerStreamMessage, @@ -649,28 +689,35 @@ impl Handler for Actor { } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + O: xtra::Handler, + M: xtra::Handler, +{ async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context) { log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc)); } } #[async_trait] -impl Handler for Actor { +impl Handler for Actor +where + M: xtra::Handler, +{ async fn handle(&mut self, msg: CfdRollOverCompleted, _ctx: &mut Context) { log_error!(self.handle_cfd_roll_over_completed(msg.order_id, msg.dlc)); } } #[async_trait] -impl Handler for Actor { +impl Handler for Actor { async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) { log_error!(self.handle_monitoring_event(msg)) } } #[async_trait] -impl Handler for Actor { +impl Handler for Actor { async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context) { log_error!(self.handle_oracle_attestation(msg)) } @@ -697,4 +744,4 @@ impl Message for CfdRollOverCompleted { type Result = (); } -impl xtra::Actor for Actor {} +impl xtra::Actor for Actor {}