diff --git a/daemon/src/actor_name.rs b/daemon/src/actor_name.rs deleted file mode 100644 index 961b08d..0000000 --- a/daemon/src/actor_name.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub trait ActorName { - fn actor_name() -> String; -} diff --git a/daemon/src/address_map.rs b/daemon/src/address_map.rs index bf43bcc..0ea15a5 100644 --- a/daemon/src/address_map.rs +++ b/daemon/src/address_map.rs @@ -3,8 +3,6 @@ use std::collections::HashMap; use std::hash::Hash; use xtra::{Address, Handler, Message}; -use crate::actor_name::ActorName; - pub struct AddressMap { inner: HashMap>, } @@ -111,6 +109,10 @@ impl<'a, K, A> Disconnected<'a, K, A> { } } +pub trait ActorName { + fn actor_name() -> String; +} + #[cfg(test)] mod tests { use super::*; diff --git a/daemon/src/collab_settlement_maker.rs b/daemon/src/collab_settlement_maker.rs new file mode 100644 index 0000000..01477b2 --- /dev/null +++ b/daemon/src/collab_settlement_maker.rs @@ -0,0 +1,216 @@ +use crate::address_map::{ActorName, Stopping}; +use crate::model::cfd::{ + Cfd, CollaborativeSettlement, OrderId, Role, SettlementKind, SettlementProposal, +}; +use crate::model::Identity; +use crate::{maker_inc_connections, projection}; +use anyhow::Context; +use async_trait::async_trait; +use bdk::bitcoin::Script; +use maia::secp256k1_zkp::Signature; +use xtra::prelude::MessageChannel; +use xtra_productivity::xtra_productivity; + +pub struct Actor { + cfd: Cfd, + projection: xtra::Address, + on_completed: Box>, + proposal: SettlementProposal, + taker_id: Identity, + connections: Box>, + on_stopping: Vec>>>, +} + +pub enum Completed { + Confirmed { + order_id: OrderId, + settlement: CollaborativeSettlement, + script_pubkey: Script, + }, + Rejected { + order_id: OrderId, + }, + Failed { + order_id: OrderId, + error: anyhow::Error, + }, +} + +pub struct Accepted; +pub struct Rejected; +pub struct Initiated { + pub sig_taker: Signature, +} + +#[xtra_productivity] +impl Actor { + async fn handle(&mut self, _: Accepted, ctx: &mut xtra::Context) { + let order_id = self.cfd.order.id; + + tracing::info!(%order_id, "Settlement proposal accepted"); + + self.accept(ctx).await; + self.update_proposal(None).await; + } + + async fn handle(&mut self, _: Rejected, ctx: &mut xtra::Context) { + let order_id = self.cfd.order.id; + + tracing::info!(%order_id, "Settlement proposal rejected"); + + self.reject(ctx).await; + self.update_proposal(None).await; + } + + async fn handle(&mut self, msg: Initiated, ctx: &mut xtra::Context) { + let completed = async { + tracing::info!( + order_id = %self.cfd.order.id, + taker_id = %self.taker_id, + "Received signature for collaborative settlement" + ); + + let Initiated { sig_taker } = msg; + + let dlc = self.cfd.open_dlc().context("CFD was in wrong state")?; + let (tx, sig_maker) = dlc.close_transaction(&self.proposal)?; + let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?; + + let settlement = CollaborativeSettlement::new( + spend_tx.clone(), + dlc.script_pubkey_for(Role::Maker), + self.proposal.price, + )?; + + self.update_proposal(None).await; + + anyhow::Ok(Completed::Confirmed { + order_id: self.cfd.order.id, + settlement, + script_pubkey: dlc.script_pubkey_for(Role::Maker), + }) + } + .await + .unwrap_or_else(|e| Completed::Failed { + order_id: self.cfd.order.id, + error: e, + }); + + self.complete(completed, ctx).await; + } +} + +#[async_trait] +impl xtra::Actor for Actor { + async fn started(&mut self, _ctx: &mut xtra::Context) { + tracing::info!( + order_id = %self.proposal.order_id, + price = %self.proposal.price, + "Received settlement proposal" + ); + + self.update_proposal(Some((self.proposal.clone(), SettlementKind::Incoming))) + .await; + } + + async fn stopping(&mut self, ctx: &mut xtra::Context) -> xtra::KeepRunning { + // inform other actors that we are stopping so that our + // address can be GCd from their AddressMaps + let me = ctx.address().expect("we are still alive"); + + for channel in self.on_stopping.iter() { + let _ = channel.send(Stopping { me: me.clone() }).await; + } + + xtra::KeepRunning::StopAll + } +} + +impl Actor { + pub fn new( + cfd: Cfd, + proposal: SettlementProposal, + projection: xtra::Address, + on_completed: &(impl MessageChannel + 'static), + taker_id: Identity, + connections: &(impl MessageChannel + 'static), + (on_stopping0, on_stopping1): ( + &(impl MessageChannel> + 'static), + &(impl MessageChannel> + 'static), + ), + ) -> Self { + Self { + cfd, + projection, + on_completed: on_completed.clone_channel(), + proposal, + taker_id, + connections: connections.clone_channel(), + on_stopping: vec![on_stopping0.clone_channel(), on_stopping1.clone_channel()], + } + } + + async fn update_proposal(&mut self, proposal: Option<(SettlementProposal, SettlementKind)>) { + if let Err(e) = self + .projection + .send(projection::UpdateSettlementProposal { + order: self.cfd.order.id, + proposal, + }) + .await + { + tracing::warn!( + "Failed to deliver settlement proposal update to projection actor: {:#}", + e + ); + }; + } + + async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context) { + let _ = self.on_completed.send(completed).await; + + ctx.stop(); + } + + async fn accept(&mut self, ctx: &mut xtra::Context) { + let this = ctx.address().expect("self to be alive"); + self.inform_taker( + maker_inc_connections::settlement::Decision::Accept { address: this }, + ctx, + ) + .await + } + + async fn reject(&mut self, ctx: &mut xtra::Context) { + self.inform_taker(maker_inc_connections::settlement::Decision::Reject, ctx) + .await + } + + async fn inform_taker( + &mut self, + decision: maker_inc_connections::settlement::Decision, + ctx: &mut xtra::Context, + ) { + let order_id = self.cfd.order.id; + + if let Err(e) = self + .connections + .send(maker_inc_connections::settlement::Response { + taker_id: self.taker_id, + order_id, + decision, + }) + .await + .context("Failed inform taker about settlement decision") + { + self.complete(Completed::Failed { order_id, error: e }, ctx) + .await; + } + } +} + +impl ActorName for Actor { + fn actor_name() -> String { + "Maker collab settlement".to_string() + } +} diff --git a/daemon/src/collab_settlement_taker.rs b/daemon/src/collab_settlement_taker.rs index 7e61af8..6b00b6b 100644 --- a/daemon/src/collab_settlement_taker.rs +++ b/daemon/src/collab_settlement_taker.rs @@ -1,5 +1,4 @@ -use crate::actor_name::ActorName; -use crate::address_map::Stopping; +use crate::address_map::{ActorName, Stopping}; use crate::model::cfd::{ Cfd, CollaborativeSettlement, OrderId, SettlementKind, SettlementProposal, }; diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index a66a524..8493e65 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -21,13 +21,13 @@ use xtra::{Actor, Address}; pub mod sqlx_ext; // Must come first because it is a macro. -pub mod actor_name; pub mod actors; pub mod address_map; pub mod auth; pub mod bdk_ext; pub mod bitmex_price_feed; pub mod cfd_actors; +pub mod collab_settlement_maker; pub mod collab_settlement_taker; pub mod connection; pub mod db; @@ -114,7 +114,9 @@ where T: xtra::Handler + xtra::Handler + xtra::Handler - + xtra::Handler>, + + xtra::Handler> + + xtra::Handler + + xtra::Handler>, W: xtra::Handler + xtra::Handler + xtra::Handler, diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index fa3e026..466f996 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -2,21 +2,20 @@ use crate::address_map::{AddressMap, Stopping}; use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::model::cfd::{ - Cfd, CfdState, CfdStateCommon, CollaborativeSettlement, Dlc, Order, OrderId, Origin, Role, - RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, + Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId, Origin, Role, RollOverProposal, + SettlementKind, SettlementProposal, UpdateCfdProposal, }; use crate::model::{Identity, Price, Timestamp, Usd}; use crate::monitor::MonitorParams; use crate::projection::{ - try_into_update_rollover_proposal, try_into_update_settlement_proposal, Update, - UpdateRollOverProposal, UpdateSettlementProposal, + try_into_update_rollover_proposal, Update, UpdateRollOverProposal, UpdateSettlementProposal, }; use crate::setup_contract::RolloverParams; use crate::tokio_ext::FutureExt; use crate::wire::TakerToMaker; use crate::{ - log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, setup_maker, - wallet, wire, Tasks, + collab_settlement_maker, log_error, maker_inc_connections, monitor, oracle, projection, + setup_contract, setup_maker, wallet, wire, Tasks, }; use anyhow::{Context as _, Result}; use async_trait::async_trait; @@ -24,7 +23,6 @@ use bdk::bitcoin::secp256k1::schnorrsig; use futures::channel::mpsc; use futures::future::RemoteHandle; use futures::{future, SinkExt}; -use maia::secp256k1_zkp::Signature; use sqlx::pool::PoolConnection; use sqlx::Sqlite; use std::collections::{HashMap, HashSet}; @@ -89,11 +87,11 @@ pub struct Actor { current_order_id: Option, monitor_actor: Address, setup_actors: AddressMap, + settlement_actors: AddressMap, roll_over_state: RollOverState, oracle_actor: Address, // Maker needs to also store Identity to be able to send a reply back current_pending_proposals: HashMap, - current_agreed_proposals: HashMap, connected_takers: HashSet, n_payouts: usize, tasks: Tasks, @@ -134,9 +132,9 @@ impl Actor { roll_over_state: RollOverState::None, oracle_actor, current_pending_proposals: HashMap::new(), - current_agreed_proposals: HashMap::new(), n_payouts, connected_takers: HashSet::new(), + settlement_actors: AddressMap::default(), tasks: Tasks::default(), } } @@ -180,30 +178,6 @@ impl Actor { Ok(()) } - async fn handle_propose_settlement( - &mut self, - taker_id: Identity, - proposal: SettlementProposal, - ) -> Result<()> { - tracing::info!( - "Received settlement proposal from the taker: {:?}", - proposal - ); - - let new_proposal = UpdateCfdProposal::Settlement { - proposal: proposal.clone(), - direction: SettlementKind::Incoming, - }; - - self.current_pending_proposals - .insert(proposal.order_id, (new_proposal.clone(), taker_id)); - self.projection_actor - .send(try_into_update_settlement_proposal(new_proposal)?) - .await?; - - Ok(()) - } - async fn handle_inc_roll_over_protocol_msg( &mut self, taker_id: Identity, @@ -254,29 +228,6 @@ impl Actor { Ok(()) } - fn get_taker_id_of_proposal(&self, order_id: &OrderId) -> Result { - let (_, taker_id) = self - .current_pending_proposals - .get(order_id) - .context("Could not find proposal for given order id")?; - Ok(*taker_id) - } - - fn get_settlement_proposal(&self, order_id: OrderId) -> Result<(SettlementProposal, Identity)> { - let (update_proposal, taker_id) = self - .current_pending_proposals - .get(&order_id) - .context("have a proposal that is about to be accepted")?; - - let proposal = match update_proposal { - UpdateCfdProposal::Settlement { proposal, .. } => proposal, - UpdateCfdProposal::RollOverProposal { .. } => { - anyhow::bail!("did not expect a rollover proposal"); - } - }; - Ok((proposal.clone(), *taker_id)) - } - async fn update_connected_takers(&mut self) -> Result<()> { self.projection_actor .send(Update( @@ -542,6 +493,16 @@ impl Actor { } } +#[xtra_productivity(message_impl = false)] +impl Actor { + async fn handle_settlement_actor_stopping( + &mut self, + message: Stopping, + ) { + self.settlement_actors.gc(message); + } +} + #[xtra_productivity] impl Actor where @@ -573,35 +534,10 @@ where async fn handle_accept_settlement(&mut self, msg: AcceptSettlement) -> Result<()> { let AcceptSettlement { order_id } = msg; - tracing::debug!(%order_id, "Maker accepts a settlement proposal" ); - - let taker_id = self.get_taker_id_of_proposal(&order_id)?; - - match self - .takers - .send(maker_inc_connections::TakerMessage { - taker_id, - msg: wire::MakerToTaker::Settlement { - order_id, - msg: wire::maker_to_taker::Settlement::Confirm, - }, - }) - .await? - { - Ok(_) => { - self.current_agreed_proposals - .insert(order_id, self.get_settlement_proposal(order_id)?); - self.remove_pending_proposal(&order_id) - .await - .context("accepted settlement")?; - } - Err(e) => { - tracing::warn!("Failed to notify taker of accepted settlement: {}", e); - self.remove_pending_proposal(&order_id) - .await - .context("accepted settlement")?; - } - } + self.settlement_actors + .send(&order_id, collab_settlement_maker::Accepted) + .await + .with_context(|| format!("No settlement in progress for order {}", order_id))?; Ok(()) } @@ -609,25 +545,10 @@ where async fn handle_reject_settlement(&mut self, msg: RejectSettlement) -> Result<()> { let RejectSettlement { order_id } = msg; - tracing::debug!(%order_id, "Maker rejects a settlement proposal" ); - - let taker_id = self.get_taker_id_of_proposal(&order_id)?; - - // clean-up state ahead of sending to ensure consistency in case we fail to deliver the - // message - self.remove_pending_proposal(&order_id) + self.settlement_actors + .send(&order_id, collab_settlement_maker::Rejected) .await - .context("rejected settlement")?; - - self.takers - .send(maker_inc_connections::TakerMessage { - taker_id, - msg: wire::MakerToTaker::Settlement { - order_id, - msg: wire::maker_to_taker::Settlement::Reject, - }, - }) - .await??; + .with_context(|| format!("No settlement in progress for order {}", order_id))?; Ok(()) } @@ -668,6 +589,58 @@ where } } +#[xtra_productivity] +impl Actor +where + M: xtra::Handler, + W: xtra::Handler, +{ + async fn handle_settlement_completed(&mut self, msg: collab_settlement_maker::Completed) { + log_error!(async { + use collab_settlement_maker::Completed::*; + let (order_id, settlement, script_pubkey) = match msg { + Confirmed { + order_id, + settlement, + script_pubkey, + } => (order_id, settlement, script_pubkey), + Rejected { .. } => { + return Ok(()); + } + Failed { order_id, error } => { + tracing::warn!(%order_id, "Collaborative settlement failed: {:#}", error); + return Ok(()); + } + }; + + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + let tx = settlement.tx.clone(); + cfd.handle_proposal_signed(settlement) + .context("Failed to update state with collaborative settlement")?; + + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + + let txid = self + .wallet + .send(wallet::TryBroadcastTransaction { tx }) + .await? + .context("Broadcasting close transaction")?; + tracing::info!(%order_id, "Close transaction published with txid {}", txid); + + self.monitor_actor + .send(monitor::CollaborativeSettlement { + order_id, + tx: (txid, script_pubkey), + }) + .await?; + + anyhow::Ok(()) + }); + } +} + #[xtra_productivity] impl Actor where @@ -829,70 +802,46 @@ where impl Actor where - M: xtra::Handler, + O: xtra::Handler, + M: xtra::Handler + xtra::Handler, + T: xtra::Handler + + xtra::Handler>, W: xtra::Handler, { - async fn handle_initiate_settlement( + async fn handle_propose_settlement( &mut self, taker_id: Identity, - order_id: OrderId, - sig_taker: Signature, + proposal: SettlementProposal, + ctx: &mut xtra::Context, ) -> Result<()> { - tracing::info!( - "Taker {} initiated collab settlement for order { } by sending their signature", - taker_id, - order_id, - ); - - let (proposal, agreed_taker_id) = self - .current_agreed_proposals - .get(&order_id) - .context("maker should have data matching the agreed settlement")?; - - if taker_id != *agreed_taker_id { - anyhow::bail!( - "taker Id mismatch. Expected: {}, received: {}", - agreed_taker_id, - taker_id - ); - } + let disconnected = self + .settlement_actors + .get_disconnected(proposal.order_id) + .with_context(|| { + format!( + "Settlement for order {} is already in progress", + proposal.order_id + ) + })?; let mut conn = self.db.acquire().await?; + let cfd = load_cfd_by_order_id(proposal.order_id, &mut conn).await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let dlc = cfd.open_dlc().context("CFD was in wrong state")?; - - let (tx, sig_maker) = dlc.close_transaction(proposal)?; - - let own_script_pubkey = dlc.script_pubkey_for(cfd.role()); - cfd.handle_proposal_signed(CollaborativeSettlement::new( - tx.clone(), - own_script_pubkey.clone(), - proposal.price, - )?)?; - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - - let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?; - - let txid = self - .wallet - .send(wallet::TryBroadcastTransaction { - tx: spend_tx.clone(), - }) - .await? - .context("Broadcasting spend transaction")?; - tracing::info!("Close transaction published with txid {}", txid); - - self.monitor_actor - .send(monitor::CollaborativeSettlement { - order_id, - tx: (txid, own_script_pubkey), - }) - .await?; + let this = ctx.address().expect("self to be alive"); + let (addr, task) = collab_settlement_maker::Actor::new( + cfd, + proposal, + self.projection_actor.clone(), + &ctx.address().expect("we are alive"), + taker_id, + &self.takers, + (&self.takers, &this), + ) + .create(None) + .run(); - self.current_agreed_proposals - .remove(&order_id) - .context("remove accepted proposal after signing")?; + self.tasks.add(task); + disconnected.insert(addr); Ok(()) } @@ -1057,7 +1006,9 @@ where T: xtra::Handler + xtra::Handler + xtra::Handler - + xtra::Handler>, + + xtra::Handler> + + xtra::Handler + + xtra::Handler>, W: xtra::Handler + xtra::Handler + xtra::Handler, @@ -1085,14 +1036,15 @@ where taker, maker, price - } + }, + ctx )) } wire::TakerToMaker::Settlement { - order_id, - msg: wire::taker_to_maker::Settlement::Initiate { sig_taker }, + msg: wire::taker_to_maker::Settlement::Initiate { .. }, + .. } => { - log_error!(self.handle_initiate_settlement(taker_id, order_id, sig_taker)) + unreachable!("Handled within `collab_settlement_maker::Actor"); } wire::TakerToMaker::ProposeRollOver { order_id, diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 590bfc1..5d72295 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -4,8 +4,8 @@ use crate::model::cfd::{Order, OrderId}; use crate::model::Identity; use crate::noise::TransportStateExt; use crate::tokio_ext::FutureExt; -use crate::wire::{EncryptedJsonCodec, MakerToTaker, TakerToMaker, Version}; -use crate::{maker_cfd, noise, send_to_socket, setup_maker, wire, Tasks}; +use crate::wire::{taker_to_maker, EncryptedJsonCodec, MakerToTaker, TakerToMaker, Version}; +use crate::{collab_settlement_maker, maker_cfd, noise, send_to_socket, setup_maker, wire, Tasks}; use anyhow::{bail, Context, Result}; use futures::{SinkExt, TryStreamExt}; use std::collections::HashMap; @@ -35,6 +35,32 @@ pub struct ConfirmOrder { pub address: xtra::Address, } +pub mod settlement { + use super::*; + + /// Message sent from the `collab_settlement_maker::Actor` to the + /// `maker_inc_connections::Actor` so that it can forward it to the + /// taker. + /// + /// Additionally, the address of this instance of the + /// `collab_settlement_maker::Actor` is included so that the + /// `maker_inc_connections::Actor` knows where to forward the + /// collaborative settlement messages from the taker about this + /// particular order. + pub struct Response { + pub taker_id: Identity, + pub order_id: OrderId, + pub decision: Decision, + } + + pub enum Decision { + Accept { + address: xtra::Address, + }, + Reject, + } +} + #[derive(Debug)] pub struct TakerMessage { pub taker_id: Identity, @@ -59,6 +85,7 @@ pub struct Actor { noise_priv_key: x25519_dalek::StaticSecret, heartbeat_interval: Duration, setup_actors: AddressMap, + settlement_actors: AddressMap, connection_tasks: HashMap, } @@ -78,6 +105,7 @@ impl Actor { noise_priv_key, heartbeat_interval, setup_actors: AddressMap::default(), + settlement_actors: AddressMap::default(), connection_tasks: HashMap::new(), } } @@ -231,6 +259,28 @@ impl Actor { Ok(()) } + async fn handle_settlement_response(&mut self, msg: settlement::Response) -> Result<()> { + let decision = match msg.decision { + settlement::Decision::Accept { address } => { + self.settlement_actors.insert(msg.order_id, address); + + wire::maker_to_taker::Settlement::Confirm + } + settlement::Decision::Reject => wire::maker_to_taker::Settlement::Reject, + }; + + self.send_to_taker( + &msg.taker_id, + wire::MakerToTaker::Settlement { + order_id: msg.order_id, + msg: decision, + }, + ) + .await?; + + Ok(()) + } + async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<(), NoConnection> { self.send_to_taker(&msg.taker_id, msg.msg).await?; @@ -276,6 +326,19 @@ impl Actor { tracing::error!(%order_id, "No active contract setup"); } }, + Settlement { + order_id, + msg: taker_to_maker::Settlement::Initiate { sig_taker }, + } => { + if self + .settlement_actors + .send(&order_id, collab_settlement_maker::Initiated { sig_taker }) + .await + .is_err() + { + tracing::warn!(%order_id, "No active settlement"); + } + } _ => { let _ = self.taker_msg_channel.send(msg); } @@ -285,6 +348,13 @@ impl Actor { async fn handle_setup_actor_stopping(&mut self, message: Stopping) { self.setup_actors.gc(message); } + + async fn handle_settlement_actor_stopping( + &mut self, + message: Stopping, + ) { + self.settlement_actors.gc(message); + } } struct ReadFail(Identity); diff --git a/daemon/src/rollover_taker.rs b/daemon/src/rollover_taker.rs index ee4f52c..2045b67 100644 --- a/daemon/src/rollover_taker.rs +++ b/daemon/src/rollover_taker.rs @@ -1,5 +1,4 @@ -use crate::actor_name::ActorName; -use crate::address_map::Stopping; +use crate::address_map::{ActorName, Stopping}; use crate::connection; use crate::model::cfd::{Cfd, Dlc, OrderId, Role, RollOverProposal, SettlementKind}; use crate::model::{BitMexPriceEventId, Timestamp}; diff --git a/daemon/src/setup_maker.rs b/daemon/src/setup_maker.rs index 0f084f8..a34958d 100644 --- a/daemon/src/setup_maker.rs +++ b/daemon/src/setup_maker.rs @@ -1,5 +1,4 @@ -use crate::actor_name::ActorName; -use crate::address_map::Stopping; +use crate::address_map::{ActorName, Stopping}; use crate::model::cfd::{Cfd, CfdState, Dlc, Order, OrderId, Role}; use crate::model::{Identity, Usd}; use crate::oracle::Announcement;