From d75e73da9fc0331a1bc0c66b16212b100847ce2b Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 30 Nov 2021 09:45:36 +1100 Subject: [PATCH] Move collab. settlement protocol into dedicated actor for taker --- daemon/src/address_map.rs | 65 +++++++++ daemon/src/collab_settlement_taker.rs | 194 +++++++++++++++++++++++++ daemon/src/connection.rs | 56 +++++++- daemon/src/lib.rs | 1 + daemon/src/taker_cfd.rs | 197 +++++++++----------------- 5 files changed, 380 insertions(+), 133 deletions(-) create mode 100644 daemon/src/collab_settlement_taker.rs diff --git a/daemon/src/address_map.rs b/daemon/src/address_map.rs index deb7515..7723da0 100644 --- a/daemon/src/address_map.rs +++ b/daemon/src/address_map.rs @@ -1,6 +1,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::hash::Hash; +use xtra::{Address, Handler, Message}; pub struct AddressMap { inner: HashMap>, @@ -27,6 +28,46 @@ where Ok(Disconnected { entry }) } + + /// Garbage-collect an address that is no longer active. + pub fn gc(&mut self, stopping: Stopping) { + self.inner.retain(|_, candidate| stopping.me != *candidate); + } + + pub fn insert(&mut self, key: K, address: Address) { + self.inner.insert(key, address); + } + + /// Sends a message to the actor stored with the given key. + pub async fn send(&self, key: &K, msg: M) -> bool + where + M: Message, + A: Handler, + { + match self.inner.get(key) { + Some(addr) if addr.is_connected() => { + addr.send(msg) + .await + .expect("we checked that we are connected"); + + true + } + Some(_) => false, + None => false, + } + } +} + +/// A message to notify that an actor instance is stopping. +pub struct Stopping { + pub me: Address, +} + +impl Message for Stopping +where + A: 'static, +{ + type Result = (); } #[derive(thiserror::Error, Debug)] @@ -49,3 +90,27 @@ impl<'a, K, A> Disconnected<'a, K, A> { }; } } + +#[cfg(test)] +mod tests { + use super::*; + use xtra::Context; + + #[test] + fn gc_removes_address() { + let (addr_1, _) = Context::::new(None); + let (addr_2, _) = Context::::new(None); + let mut map = AddressMap::default(); + map.insert("foo", addr_1); + map.insert("bar", addr_2.clone()); + + map.gc(Stopping { me: addr_2 }); + + assert_eq!(map.inner.len(), 1); + assert!(map.inner.get("foo").is_some()); + } + + struct Dummy; + + impl xtra::Actor for Dummy {} +} diff --git a/daemon/src/collab_settlement_taker.rs b/daemon/src/collab_settlement_taker.rs new file mode 100644 index 0000000..b39836b --- /dev/null +++ b/daemon/src/collab_settlement_taker.rs @@ -0,0 +1,194 @@ +use crate::address_map::Stopping; +use crate::model::cfd::{ + Cfd, CollaborativeSettlement, OrderId, SettlementKind, SettlementProposal, +}; +use crate::model::Price; +use crate::{connection, projection, wire}; +use anyhow::{Context, Result}; +use async_trait::async_trait; +use xtra::prelude::MessageChannel; +use xtra_productivity::xtra_productivity; + +pub struct Actor { + cfd: Cfd, + projection: xtra::Address, + on_completed: Box>, + connection: xtra::Address, + proposal: SettlementProposal, +} + +impl Actor { + pub fn new( + cfd: Cfd, + projection: xtra::Address, + on_completed: impl MessageChannel + 'static, + current_price: Price, + connection: xtra::Address, + n_payouts: usize, + ) -> Result { + let proposal = cfd.calculate_settlement(current_price, n_payouts)?; + + Ok(Self { + cfd, + projection, + on_completed: Box::new(on_completed), + connection, + proposal, + }) + } + + async fn propose(&mut self, this: xtra::Address) -> Result<()> { + if !self.cfd.is_collaborative_settle_possible() { + anyhow::bail!( + "Settlement proposal not possible because for cfd {} is in state {} which cannot be collaboratively settled", + self.cfd.order.id, + self.cfd.state + ) + } + + self.connection + .send(connection::ProposeSettlement { + timestamp: self.proposal.timestamp, + taker: self.proposal.taker, + maker: self.proposal.maker, + price: self.proposal.price, + address: this, + order_id: self.cfd.order.id, + }) + .await??; + + self.update_proposal(Some((self.proposal.clone(), SettlementKind::Outgoing))) + .await?; + + Ok(()) + } + + async fn handle_confirmed(&mut self) -> Result { + let order_id = self.cfd.order.id; + + tracing::info!(%order_id, "Settlement proposal got accepted"); + + self.update_proposal(None).await?; + + let dlc = self.cfd.dlc().context("No DLC in CFD")?; + + let (tx, sig) = dlc.close_transaction(&self.proposal)?; + + // Need to use `do_send_async` here because this handler is called in + // context of a message arriving over the wire, and would result in a + // deadlock otherwise. + #[allow(clippy::disallowed_method)] + self.connection + .do_send_async(wire::TakerToMaker::Settlement { + order_id, + msg: wire::taker_to_maker::Settlement::Initiate { sig_taker: sig }, + }) + .await?; + + Ok(CollaborativeSettlement::new( + tx, + dlc.script_pubkey_for(self.cfd.role()), // TODO: Hardcode role to Taker? + self.proposal.price, + )?) + } + + async fn handle_rejected(&mut self) -> Result<()> { + let order_id = self.cfd.order.id; + + tracing::info!(%order_id, "Settlement proposal got rejected"); + + self.update_proposal(None).await?; + + Ok(()) + } + + async fn update_proposal( + &mut self, + proposal: Option<(SettlementProposal, SettlementKind)>, + ) -> Result<()> { + self.projection + .send(projection::UpdateSettlementProposal { + order: self.cfd.order.id, + proposal, + }) + .await?; + + Ok(()) + } + + async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context) { + let _ = self.on_completed.send(completed).await; + + ctx.stop(); + } +} + +#[async_trait] +impl xtra::Actor for Actor { + async fn started(&mut self, ctx: &mut xtra::Context) { + let this = ctx.address().expect("get address to ourselves"); + + if let Err(e) = self.propose(this).await { + self.complete( + Completed::Failed { + order_id: self.cfd.order.id, + error: e, + }, + ctx, + ) + .await; + } + } + + async fn stopping(&mut self, ctx: &mut xtra::Context) -> xtra::KeepRunning { + // inform the connection actor that we stopping so it can GC the address from the hashmap + let me = ctx.address().expect("we are still alive"); + let _ = self.connection.send(Stopping { me }).await; + + xtra::KeepRunning::StopAll + } +} + +pub enum Completed { + Confirmed { + order_id: OrderId, + settlement: CollaborativeSettlement, + }, + Rejected { + order_id: OrderId, + }, + Failed { + order_id: OrderId, + error: anyhow::Error, + }, +} + +#[xtra_productivity] +impl Actor { + async fn handle( + &mut self, + msg: wire::maker_to_taker::Settlement, + ctx: &mut xtra::Context, + ) { + let order_id = self.cfd.order.id; + + let completed = match msg { + wire::maker_to_taker::Settlement::Confirm => match self.handle_confirmed().await { + Ok(settlement) => Completed::Confirmed { + settlement, + order_id, + }, + Err(e) => Completed::Failed { error: e, order_id }, + }, + wire::maker_to_taker::Settlement::Reject => { + if let Err(e) = self.handle_rejected().await { + Completed::Failed { error: e, order_id } + } else { + Completed::Rejected { order_id } + } + } + }; + + self.complete(completed, ctx).await; + } +} diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index a602660..795d73e 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -1,8 +1,10 @@ +use crate::address_map::{AddressMap, Stopping}; use crate::model::cfd::OrderId; -use crate::model::Usd; +use crate::model::{Price, Timestamp, Usd}; use crate::tokio_ext::FutureExt; -use crate::{log_error, noise, send_to_socket, setup_taker, wire, Tasks}; +use crate::{collab_settlement_taker, log_error, noise, send_to_socket, setup_taker, wire, Tasks}; use anyhow::{Context, Result}; +use bdk::bitcoin::Amount; use futures::StreamExt; use std::collections::HashMap; use std::net::SocketAddr; @@ -34,6 +36,7 @@ pub struct Actor { connect_timeout: Duration, connected_state: Option, setup_actors: HashMap>, + collab_settlement_actors: AddressMap, } pub struct Connect { @@ -67,6 +70,15 @@ pub struct TakeOrder { pub address: xtra::Address, } +pub struct ProposeSettlement { + pub order_id: OrderId, + pub timestamp: Timestamp, + pub taker: Amount, + pub maker: Amount, + pub price: Price, + pub address: xtra::Address, +} + impl Actor { pub fn new( status_sender: watch::Sender, @@ -87,6 +99,7 @@ impl Actor { connected_state: None, setup_actors: HashMap::new(), connect_timeout, + collab_settlement_actors: AddressMap::default(), } } } @@ -96,6 +109,13 @@ impl Actor { async fn handle_taker_to_maker(&mut self, message: wire::TakerToMaker) { log_error!(self.send_to_maker.send(message)); } + + async fn handle_collab_settlement_actor_stopping( + &mut self, + message: Stopping, + ) { + self.collab_settlement_actors.gc(message); + } } #[xtra_productivity] @@ -112,6 +132,33 @@ impl Actor { Ok(()) } + + async fn handle_propose_settlement(&mut self, msg: ProposeSettlement) -> Result<()> { + let ProposeSettlement { + order_id, + timestamp, + taker, + maker, + price, + address, + } = msg; + + self.send_to_maker + .send(wire::TakerToMaker::Settlement { + order_id, + msg: wire::taker_to_maker::Settlement::Propose { + timestamp, + taker, + maker, + price, + }, + }) + .await?; + + self.collab_settlement_actors.insert(order_id, address); + + Ok(()) + } } #[xtra_productivity] @@ -233,6 +280,11 @@ impl Actor { } } } + wire::MakerToTaker::Settlement { order_id, msg } => { + if !self.collab_settlement_actors.send(&order_id, msg).await { + tracing::warn!(%order_id, "No active collaborative settlement"); + } + } other => { // this one should go to the taker cfd actor log_error!(self.maker_to_taker.send(other)); diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 6e381c3..5d8bdbd 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -24,6 +24,7 @@ pub mod address_map; pub mod auth; pub mod bitmex_price_feed; pub mod cfd_actors; +pub mod collab_settlement_taker; pub mod connection; pub mod db; pub mod fan_out; diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index f11ca9f..7164068 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -2,21 +2,20 @@ use crate::address_map::AddressMap; use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::model::cfd::{ - Cfd, CfdState, CfdStateCommon, CollaborativeSettlement, Completed, Dlc, Order, OrderId, Origin, - Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, - UpdateCfdProposals, + Cfd, CfdState, CfdStateCommon, Completed, Dlc, Order, OrderId, Origin, Role, RollOverProposal, + SettlementKind, UpdateCfdProposal, UpdateCfdProposals, }; use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd}; use crate::monitor::{self, MonitorParams}; use crate::projection::{ - try_into_update_rollover_proposal, try_into_update_settlement_proposal, UpdateRollOverProposal, - UpdateSettlementProposal, + try_into_update_rollover_proposal, UpdateRollOverProposal, UpdateSettlementProposal, }; use crate::setup_contract::RolloverParams; use crate::tokio_ext::FutureExt; use crate::wire::RollOverMsg; use crate::{ - connection, log_error, oracle, projection, setup_contract, setup_taker, wallet, wire, Tasks, + collab_settlement_taker, connection, log_error, oracle, projection, setup_contract, + setup_taker, wallet, wire, Tasks, }; use anyhow::{bail, Context as _, Result}; use async_trait::async_trait; @@ -68,6 +67,7 @@ pub struct Actor { conn_actor: Address, monitor_actor: Address, setup_actors: AddressMap, + collab_settlement_actors: AddressMap, roll_over_state: RollOverState, oracle_actor: Address, current_pending_proposals: UpdateCfdProposals, @@ -104,6 +104,7 @@ where current_pending_proposals: HashMap::new(), n_payouts, setup_actors: AddressMap::default(), + collab_settlement_actors: AddressMap::default(), tasks: Tasks::default(), } } @@ -138,25 +139,13 @@ impl Actor { } Ok(()) } - - fn get_settlement_proposal(&self, order_id: OrderId) -> Result<&SettlementProposal> { - match self - .current_pending_proposals - .get(&order_id) - .context("have a proposal that is about to be accepted")? - { - UpdateCfdProposal::Settlement { proposal, .. } => Ok(proposal), - UpdateCfdProposal::RollOverProposal { .. } => { - anyhow::bail!("did not expect a rollover proposal"); - } - } - } } #[xtra_productivity] impl Actor where W: xtra::Handler, + M: xtra::Handler, { async fn handle_commit(&mut self, msg: Commit) -> Result<()> { let Commit { order_id } = msg; @@ -198,61 +187,78 @@ where .await?; Ok(()) } -} -#[xtra_productivity] -impl Actor { - async fn handle_propose_settlement(&mut self, msg: ProposeSettlement) -> Result<()> { + async fn handle_propose_settlement( + &mut self, + msg: ProposeSettlement, + ctx: &mut xtra::Context, + ) -> Result<()> { let ProposeSettlement { order_id, current_price, } = msg; + let disconnected = self + .collab_settlement_actors + .get_disconnected(order_id) + .with_context(|| format!("Settlement for order {} is already in progress", order_id))?; + let mut conn = self.db.acquire().await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - if !cfd.is_collaborative_settle_possible() { - anyhow::bail!( - "Settlement proposal not possible because for cfd {} is in state {} which cannot be collaboratively settled", - order_id, - cfd.state - ) - } + let this = ctx + .address() + .expect("actor to be able to give address to itself"); + let (addr, fut) = collab_settlement_taker::Actor::new( + cfd, + self.projection_actor.clone(), + this, + current_price, + self.conn_actor.clone(), + self.n_payouts, + )? + .create(None) + .run(); - let proposal = cfd.calculate_settlement(current_price, self.n_payouts)?; + disconnected.insert(addr); + self.tasks.add(fut); - if self - .current_pending_proposals - .contains_key(&proposal.order_id) - { - anyhow::bail!( - "Settlement proposal for order id {} already present", - order_id - ) - } + Ok(()) + } - let new_proposal = UpdateCfdProposal::Settlement { - proposal: proposal.clone(), - direction: SettlementKind::Outgoing, + async fn handle_settlement_completed( + &mut self, + msg: collab_settlement_taker::Completed, + ) -> Result<()> { + let (order_id, settlement) = match msg { + collab_settlement_taker::Completed::Confirmed { + order_id, + settlement, + } => (order_id, settlement), + collab_settlement_taker::Completed::Rejected { .. } => { + return Ok(()); + } + collab_settlement_taker::Completed::Failed { order_id, error } => { + tracing::warn!(%order_id, "Collaborative settlement failed: {:#}", error); + return Ok(()); + } }; + let settlement_txid = settlement.tx.txid(); - self.current_pending_proposals - .insert(proposal.order_id, new_proposal.clone()); - self.projection_actor - .send(try_into_update_settlement_proposal(new_proposal)?) - .await?; + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + let dlc = cfd.dlc().context("No DLC in CFD")?; - self.conn_actor - .send(wire::TakerToMaker::Settlement { - order_id: proposal.order_id, - msg: wire::taker_to_maker::Settlement::Propose { - timestamp: proposal.timestamp, - taker: proposal.taker, - maker: proposal.maker, - price: proposal.price, - }, + cfd.handle_proposal_signed(settlement)?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + + self.monitor_actor + .send(monitor::CollaborativeSettlement { + order_id, + tx: (settlement_txid, dlc.script_pubkey_for(Role::Taker)), }) .await?; + Ok(()) } } @@ -263,14 +269,6 @@ where + xtra::Handler + xtra::Handler, { - async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> { - tracing::info!(%order_id, "Settlement proposal got rejected"); - - self.remove_pending_proposal(&order_id).await?; - - Ok(()) - } - async fn handle_roll_over_rejected(&mut self, order_id: OrderId) -> Result<()> { tracing::info!(%order_id, "Roll over proposal got rejected"); @@ -629,59 +627,6 @@ where } } -impl Actor -where - M: xtra::Handler, - W: xtra::Handler - + xtra::Handler - + xtra::Handler, -{ - async fn handle_settlement_accepted( - &mut self, - order_id: OrderId, - _ctx: &mut Context, - ) -> Result<()> { - tracing::info!(%order_id, "Settlement proposal got accepted"); - - let mut conn = self.db.acquire().await?; - - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let dlc = cfd.open_dlc().context("CFD was in wrong state")?; - - let proposal = self.get_settlement_proposal(order_id)?; - let (tx, sig_taker) = dlc.close_transaction(proposal)?; - - // Need to use `do_send_async` here because this handler is called in - // context of a message arriving over the wire, and would result in a - // deadlock otherwise. - #[allow(clippy::disallowed_method)] - self.conn_actor - .do_send_async(wire::TakerToMaker::Settlement { - order_id, - msg: wire::taker_to_maker::Settlement::Initiate { sig_taker }, - }) - .await?; - - cfd.handle_proposal_signed(CollaborativeSettlement::new( - tx.clone(), - dlc.script_pubkey_for(cfd.role()), - proposal.price, - )?)?; - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - - self.remove_pending_proposal(&order_id).await?; - - self.monitor_actor - .send(monitor::CollaborativeSettlement { - order_id, - tx: (tx.txid(), dlc.script_pubkey_for(Role::Taker)), - }) - .await?; - - Ok(()) - } -} - #[async_trait] impl Handler for Actor where @@ -700,7 +645,6 @@ impl Handler for Actor, O: xtra::Handler + xtra::Handler, - M: xtra::Handler, W: xtra::Handler + xtra::Handler + xtra::Handler, @@ -710,18 +654,6 @@ where wire::MakerToTaker::CurrentOrder(current_order) => { log_error!(self.handle_new_order(current_order)) } - wire::MakerToTaker::Settlement { - order_id, - msg: wire::maker_to_taker::Settlement::Confirm, - } => { - log_error!(self.handle_settlement_accepted(order_id, ctx)) - } - wire::MakerToTaker::Settlement { - order_id, - msg: wire::maker_to_taker::Settlement::Reject, - } => { - log_error!(self.handle_settlement_rejected(order_id)) - } wire::MakerToTaker::ConfirmRollOver { order_id, oracle_event_id, @@ -743,6 +675,9 @@ where | wire::MakerToTaker::InvalidOrderId(_) => { unreachable!("These messages should be sent to the `setup_taker::Actor`") } + wire::MakerToTaker::Settlement { .. } => { + unreachable!("These messages should be sent to the `collab_settlement::Actor`") + } } } }