diff --git a/daemon/src/address_map.rs b/daemon/src/address_map.rs index deb7515..7723da0 100644 --- a/daemon/src/address_map.rs +++ b/daemon/src/address_map.rs @@ -1,6 +1,7 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::hash::Hash; +use xtra::{Address, Handler, Message}; pub struct AddressMap { inner: HashMap>, @@ -27,6 +28,46 @@ where Ok(Disconnected { entry }) } + + /// Garbage-collect an address that is no longer active. + pub fn gc(&mut self, stopping: Stopping) { + self.inner.retain(|_, candidate| stopping.me != *candidate); + } + + pub fn insert(&mut self, key: K, address: Address) { + self.inner.insert(key, address); + } + + /// Sends a message to the actor stored with the given key. + pub async fn send(&self, key: &K, msg: M) -> bool + where + M: Message, + A: Handler, + { + match self.inner.get(key) { + Some(addr) if addr.is_connected() => { + addr.send(msg) + .await + .expect("we checked that we are connected"); + + true + } + Some(_) => false, + None => false, + } + } +} + +/// A message to notify that an actor instance is stopping. +pub struct Stopping { + pub me: Address, +} + +impl Message for Stopping +where + A: 'static, +{ + type Result = (); } #[derive(thiserror::Error, Debug)] @@ -49,3 +90,27 @@ impl<'a, K, A> Disconnected<'a, K, A> { }; } } + +#[cfg(test)] +mod tests { + use super::*; + use xtra::Context; + + #[test] + fn gc_removes_address() { + let (addr_1, _) = Context::::new(None); + let (addr_2, _) = Context::::new(None); + let mut map = AddressMap::default(); + map.insert("foo", addr_1); + map.insert("bar", addr_2.clone()); + + map.gc(Stopping { me: addr_2 }); + + assert_eq!(map.inner.len(), 1); + assert!(map.inner.get("foo").is_some()); + } + + struct Dummy; + + impl xtra::Actor for Dummy {} +} diff --git a/daemon/src/collab_settlement_taker.rs b/daemon/src/collab_settlement_taker.rs new file mode 100644 index 0000000..b39836b --- /dev/null +++ b/daemon/src/collab_settlement_taker.rs @@ -0,0 +1,194 @@ +use crate::address_map::Stopping; +use crate::model::cfd::{ + Cfd, CollaborativeSettlement, OrderId, SettlementKind, SettlementProposal, +}; +use crate::model::Price; +use crate::{connection, projection, wire}; +use anyhow::{Context, Result}; +use async_trait::async_trait; +use xtra::prelude::MessageChannel; +use xtra_productivity::xtra_productivity; + +pub struct Actor { + cfd: Cfd, + projection: xtra::Address, + on_completed: Box>, + connection: xtra::Address, + proposal: SettlementProposal, +} + +impl Actor { + pub fn new( + cfd: Cfd, + projection: xtra::Address, + on_completed: impl MessageChannel + 'static, + current_price: Price, + connection: xtra::Address, + n_payouts: usize, + ) -> Result { + let proposal = cfd.calculate_settlement(current_price, n_payouts)?; + + Ok(Self { + cfd, + projection, + on_completed: Box::new(on_completed), + connection, + proposal, + }) + } + + async fn propose(&mut self, this: xtra::Address) -> Result<()> { + if !self.cfd.is_collaborative_settle_possible() { + anyhow::bail!( + "Settlement proposal not possible because for cfd {} is in state {} which cannot be collaboratively settled", + self.cfd.order.id, + self.cfd.state + ) + } + + self.connection + .send(connection::ProposeSettlement { + timestamp: self.proposal.timestamp, + taker: self.proposal.taker, + maker: self.proposal.maker, + price: self.proposal.price, + address: this, + order_id: self.cfd.order.id, + }) + .await??; + + self.update_proposal(Some((self.proposal.clone(), SettlementKind::Outgoing))) + .await?; + + Ok(()) + } + + async fn handle_confirmed(&mut self) -> Result { + let order_id = self.cfd.order.id; + + tracing::info!(%order_id, "Settlement proposal got accepted"); + + self.update_proposal(None).await?; + + let dlc = self.cfd.dlc().context("No DLC in CFD")?; + + let (tx, sig) = dlc.close_transaction(&self.proposal)?; + + // Need to use `do_send_async` here because this handler is called in + // context of a message arriving over the wire, and would result in a + // deadlock otherwise. + #[allow(clippy::disallowed_method)] + self.connection + .do_send_async(wire::TakerToMaker::Settlement { + order_id, + msg: wire::taker_to_maker::Settlement::Initiate { sig_taker: sig }, + }) + .await?; + + Ok(CollaborativeSettlement::new( + tx, + dlc.script_pubkey_for(self.cfd.role()), // TODO: Hardcode role to Taker? + self.proposal.price, + )?) + } + + async fn handle_rejected(&mut self) -> Result<()> { + let order_id = self.cfd.order.id; + + tracing::info!(%order_id, "Settlement proposal got rejected"); + + self.update_proposal(None).await?; + + Ok(()) + } + + async fn update_proposal( + &mut self, + proposal: Option<(SettlementProposal, SettlementKind)>, + ) -> Result<()> { + self.projection + .send(projection::UpdateSettlementProposal { + order: self.cfd.order.id, + proposal, + }) + .await?; + + Ok(()) + } + + async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context) { + let _ = self.on_completed.send(completed).await; + + ctx.stop(); + } +} + +#[async_trait] +impl xtra::Actor for Actor { + async fn started(&mut self, ctx: &mut xtra::Context) { + let this = ctx.address().expect("get address to ourselves"); + + if let Err(e) = self.propose(this).await { + self.complete( + Completed::Failed { + order_id: self.cfd.order.id, + error: e, + }, + ctx, + ) + .await; + } + } + + async fn stopping(&mut self, ctx: &mut xtra::Context) -> xtra::KeepRunning { + // inform the connection actor that we stopping so it can GC the address from the hashmap + let me = ctx.address().expect("we are still alive"); + let _ = self.connection.send(Stopping { me }).await; + + xtra::KeepRunning::StopAll + } +} + +pub enum Completed { + Confirmed { + order_id: OrderId, + settlement: CollaborativeSettlement, + }, + Rejected { + order_id: OrderId, + }, + Failed { + order_id: OrderId, + error: anyhow::Error, + }, +} + +#[xtra_productivity] +impl Actor { + async fn handle( + &mut self, + msg: wire::maker_to_taker::Settlement, + ctx: &mut xtra::Context, + ) { + let order_id = self.cfd.order.id; + + let completed = match msg { + wire::maker_to_taker::Settlement::Confirm => match self.handle_confirmed().await { + Ok(settlement) => Completed::Confirmed { + settlement, + order_id, + }, + Err(e) => Completed::Failed { error: e, order_id }, + }, + wire::maker_to_taker::Settlement::Reject => { + if let Err(e) = self.handle_rejected().await { + Completed::Failed { error: e, order_id } + } else { + Completed::Rejected { order_id } + } + } + }; + + self.complete(completed, ctx).await; + } +} diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index a602660..795d73e 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -1,8 +1,10 @@ +use crate::address_map::{AddressMap, Stopping}; use crate::model::cfd::OrderId; -use crate::model::Usd; +use crate::model::{Price, Timestamp, Usd}; use crate::tokio_ext::FutureExt; -use crate::{log_error, noise, send_to_socket, setup_taker, wire, Tasks}; +use crate::{collab_settlement_taker, log_error, noise, send_to_socket, setup_taker, wire, Tasks}; use anyhow::{Context, Result}; +use bdk::bitcoin::Amount; use futures::StreamExt; use std::collections::HashMap; use std::net::SocketAddr; @@ -34,6 +36,7 @@ pub struct Actor { connect_timeout: Duration, connected_state: Option, setup_actors: HashMap>, + collab_settlement_actors: AddressMap, } pub struct Connect { @@ -67,6 +70,15 @@ pub struct TakeOrder { pub address: xtra::Address, } +pub struct ProposeSettlement { + pub order_id: OrderId, + pub timestamp: Timestamp, + pub taker: Amount, + pub maker: Amount, + pub price: Price, + pub address: xtra::Address, +} + impl Actor { pub fn new( status_sender: watch::Sender, @@ -87,6 +99,7 @@ impl Actor { connected_state: None, setup_actors: HashMap::new(), connect_timeout, + collab_settlement_actors: AddressMap::default(), } } } @@ -96,6 +109,13 @@ impl Actor { async fn handle_taker_to_maker(&mut self, message: wire::TakerToMaker) { log_error!(self.send_to_maker.send(message)); } + + async fn handle_collab_settlement_actor_stopping( + &mut self, + message: Stopping, + ) { + self.collab_settlement_actors.gc(message); + } } #[xtra_productivity] @@ -112,6 +132,33 @@ impl Actor { Ok(()) } + + async fn handle_propose_settlement(&mut self, msg: ProposeSettlement) -> Result<()> { + let ProposeSettlement { + order_id, + timestamp, + taker, + maker, + price, + address, + } = msg; + + self.send_to_maker + .send(wire::TakerToMaker::Settlement { + order_id, + msg: wire::taker_to_maker::Settlement::Propose { + timestamp, + taker, + maker, + price, + }, + }) + .await?; + + self.collab_settlement_actors.insert(order_id, address); + + Ok(()) + } } #[xtra_productivity] @@ -233,6 +280,11 @@ impl Actor { } } } + wire::MakerToTaker::Settlement { order_id, msg } => { + if !self.collab_settlement_actors.send(&order_id, msg).await { + tracing::warn!(%order_id, "No active collaborative settlement"); + } + } other => { // this one should go to the taker cfd actor log_error!(self.maker_to_taker.send(other)); diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 6e381c3..5d8bdbd 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -24,6 +24,7 @@ pub mod address_map; pub mod auth; pub mod bitmex_price_feed; pub mod cfd_actors; +pub mod collab_settlement_taker; pub mod connection; pub mod db; pub mod fan_out; diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 5fb064b..09ef881 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -187,7 +187,7 @@ impl Actor { .insert(proposal.order_id, (new_proposal.clone(), taker_id)); self.projection_actor .send(try_into_update_rollover_proposal(new_proposal)?) - .await??; + .await?; Ok(()) } @@ -211,7 +211,7 @@ impl Actor { .insert(proposal.order_id, (new_proposal.clone(), taker_id)); self.projection_actor .send(try_into_update_settlement_proposal(new_proposal)?) - .await??; + .await?; Ok(()) } @@ -269,7 +269,7 @@ impl Actor { order: *order_id, proposal: None, }) - .await?? + .await? } UpdateCfdProposal::RollOverProposal { .. } => { self.projection_actor @@ -277,7 +277,7 @@ impl Actor { order: *order_id, proposal: None, }) - .await?? + .await? } } } else { @@ -634,7 +634,10 @@ where .takers .send(maker_inc_connections::TakerMessage { taker_id, - msg: wire::MakerToTaker::ConfirmSettlement(order_id), + msg: wire::MakerToTaker::Settlement { + order_id, + msg: wire::maker_to_taker::Settlement::Confirm, + }, }) .await? { @@ -672,7 +675,10 @@ where self.takers .send(maker_inc_connections::TakerMessage { taker_id, - msg: wire::MakerToTaker::RejectSettlement(order_id), + msg: wire::MakerToTaker::Settlement { + order_id, + msg: wire::maker_to_taker::Settlement::Reject, + }, }) .await??; @@ -1123,12 +1129,15 @@ where wire::TakerToMaker::TakeOrder { order_id, quantity } => { log_error!(self.handle_take_order(taker_id, order_id, quantity)) } - wire::TakerToMaker::ProposeSettlement { + wire::TakerToMaker::Settlement { order_id, - timestamp, - taker, - maker, - price, + msg: + wire::taker_to_maker::Settlement::Propose { + timestamp, + taker, + maker, + price, + }, } => { log_error!(self.handle_propose_settlement( taker_id, @@ -1141,9 +1150,9 @@ where } )) } - wire::TakerToMaker::InitiateSettlement { + wire::TakerToMaker::Settlement { order_id, - sig_taker, + msg: wire::taker_to_maker::Settlement::Initiate { sig_taker }, } => { log_error!(self.handle_initiate_settlement(taker_id, order_id, sig_taker)) } diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs index f5fefd2..841d9f5 100644 --- a/daemon/src/projection.rs +++ b/daemon/src/projection.rs @@ -115,12 +115,12 @@ impl State { temp.into() } - pub fn amend_settlement_proposal(&mut self, proposal: UpdateSettlementProposal) -> Result<()> { + pub fn amend_settlement_proposal(&mut self, proposal: UpdateSettlementProposal) { let order = proposal.order; self.amend_cfd_proposal(order, proposal.into()) } - pub fn amend_rollover_proposal(&mut self, proposal: UpdateRollOverProposal) -> Result<()> { + pub fn amend_rollover_proposal(&mut self, proposal: UpdateRollOverProposal) { let order = proposal.order; self.amend_cfd_proposal(order, proposal.into()) } @@ -133,21 +133,21 @@ impl State { let _ = std::mem::replace(&mut self.cfds, cfds); } - fn amend_cfd_proposal( - &mut self, - order: OrderId, - proposal: Option, - ) -> Result<()> { + fn amend_cfd_proposal(&mut self, order: OrderId, proposal: Option) { if let Some(proposal) = proposal { self.proposals.insert(order, proposal); tracing::trace!(%order, "Cfd proposal got updated"); - } else { - if self.proposals.remove(&order).is_none() { - anyhow::bail!("Could not find proposal with order id: {}", &order) - } - tracing::trace!(%order, "Removed cfd proposal"); - }; - Ok(()) + + return; + } + + if self.proposals.remove(&order).is_none() { + tracing::trace!(%order, "Cannot remove cfd proposal: unknown"); + + return; + } + + tracing::trace!(%order, "Removed cfd proposal"); } } @@ -172,15 +172,13 @@ impl Actor { .connected_takers .send(msg.0.iter().map(|x| x.into()).collect_vec()); } - fn handle(&mut self, msg: UpdateSettlementProposal) -> Result<()> { - self.state.amend_settlement_proposal(msg)?; + fn handle(&mut self, msg: UpdateSettlementProposal) { + self.state.amend_settlement_proposal(msg); let _ = self.tx.cfds.send(self.state.to_cfds()); - Ok(()) } - fn handle(&mut self, msg: UpdateRollOverProposal) -> Result<()> { - self.state.amend_rollover_proposal(msg)?; + fn handle(&mut self, msg: UpdateRollOverProposal) { + self.state.amend_rollover_proposal(msg); let _ = self.tx.cfds.send(self.state.to_cfds()); - Ok(()) } } diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index ddc6a15..7164068 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -2,21 +2,20 @@ use crate::address_map::AddressMap; use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::model::cfd::{ - Cfd, CfdState, CfdStateCommon, CollaborativeSettlement, Completed, Dlc, Order, OrderId, Origin, - Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, - UpdateCfdProposals, + Cfd, CfdState, CfdStateCommon, Completed, Dlc, Order, OrderId, Origin, Role, RollOverProposal, + SettlementKind, UpdateCfdProposal, UpdateCfdProposals, }; use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd}; use crate::monitor::{self, MonitorParams}; use crate::projection::{ - try_into_update_rollover_proposal, try_into_update_settlement_proposal, UpdateRollOverProposal, - UpdateSettlementProposal, + try_into_update_rollover_proposal, UpdateRollOverProposal, UpdateSettlementProposal, }; use crate::setup_contract::RolloverParams; use crate::tokio_ext::FutureExt; use crate::wire::RollOverMsg; use crate::{ - connection, log_error, oracle, projection, setup_contract, setup_taker, wallet, wire, Tasks, + collab_settlement_taker, connection, log_error, oracle, projection, setup_contract, + setup_taker, wallet, wire, Tasks, }; use anyhow::{bail, Context as _, Result}; use async_trait::async_trait; @@ -68,6 +67,7 @@ pub struct Actor { conn_actor: Address, monitor_actor: Address, setup_actors: AddressMap, + collab_settlement_actors: AddressMap, roll_over_state: RollOverState, oracle_actor: Address, current_pending_proposals: UpdateCfdProposals, @@ -104,6 +104,7 @@ where current_pending_proposals: HashMap::new(), n_payouts, setup_actors: AddressMap::default(), + collab_settlement_actors: AddressMap::default(), tasks: Tasks::default(), } } @@ -122,7 +123,7 @@ impl Actor { order: *order_id, proposal: None, }) - .await?? + .await? } UpdateCfdProposal::RollOverProposal { .. } => { self.projection_actor @@ -130,7 +131,7 @@ impl Actor { order: *order_id, proposal: None, }) - .await?? + .await? } } } else { @@ -138,25 +139,13 @@ impl Actor { } Ok(()) } - - fn get_settlement_proposal(&self, order_id: OrderId) -> Result<&SettlementProposal> { - match self - .current_pending_proposals - .get(&order_id) - .context("have a proposal that is about to be accepted")? - { - UpdateCfdProposal::Settlement { proposal, .. } => Ok(proposal), - UpdateCfdProposal::RollOverProposal { .. } => { - anyhow::bail!("did not expect a rollover proposal"); - } - } - } } #[xtra_productivity] impl Actor where W: xtra::Handler, + M: xtra::Handler, { async fn handle_commit(&mut self, msg: Commit) -> Result<()> { let Commit { order_id } = msg; @@ -188,7 +177,7 @@ where .insert(proposal.order_id, new_proposal.clone()); self.projection_actor .send(try_into_update_rollover_proposal(new_proposal)?) - .await??; + .await?; self.conn_actor .send(wire::TakerToMaker::ProposeRollOver { @@ -198,59 +187,78 @@ where .await?; Ok(()) } -} -#[xtra_productivity] -impl Actor { - async fn handle_propose_settlement(&mut self, msg: ProposeSettlement) -> Result<()> { + async fn handle_propose_settlement( + &mut self, + msg: ProposeSettlement, + ctx: &mut xtra::Context, + ) -> Result<()> { let ProposeSettlement { order_id, current_price, } = msg; + let disconnected = self + .collab_settlement_actors + .get_disconnected(order_id) + .with_context(|| format!("Settlement for order {} is already in progress", order_id))?; + let mut conn = self.db.acquire().await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - if !cfd.is_collaborative_settle_possible() { - anyhow::bail!( - "Settlement proposal not possible because for cfd {} is in state {} which cannot be collaboratively settled", - order_id, - cfd.state - ) - } + let this = ctx + .address() + .expect("actor to be able to give address to itself"); + let (addr, fut) = collab_settlement_taker::Actor::new( + cfd, + self.projection_actor.clone(), + this, + current_price, + self.conn_actor.clone(), + self.n_payouts, + )? + .create(None) + .run(); - let proposal = cfd.calculate_settlement(current_price, self.n_payouts)?; + disconnected.insert(addr); + self.tasks.add(fut); - if self - .current_pending_proposals - .contains_key(&proposal.order_id) - { - anyhow::bail!( - "Settlement proposal for order id {} already present", - order_id - ) - } + Ok(()) + } - let new_proposal = UpdateCfdProposal::Settlement { - proposal: proposal.clone(), - direction: SettlementKind::Outgoing, + async fn handle_settlement_completed( + &mut self, + msg: collab_settlement_taker::Completed, + ) -> Result<()> { + let (order_id, settlement) = match msg { + collab_settlement_taker::Completed::Confirmed { + order_id, + settlement, + } => (order_id, settlement), + collab_settlement_taker::Completed::Rejected { .. } => { + return Ok(()); + } + collab_settlement_taker::Completed::Failed { order_id, error } => { + tracing::warn!(%order_id, "Collaborative settlement failed: {:#}", error); + return Ok(()); + } }; + let settlement_txid = settlement.tx.txid(); - self.current_pending_proposals - .insert(proposal.order_id, new_proposal.clone()); - self.projection_actor - .send(try_into_update_settlement_proposal(new_proposal)?) - .await??; + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + let dlc = cfd.dlc().context("No DLC in CFD")?; - self.conn_actor - .send(wire::TakerToMaker::ProposeSettlement { - order_id: proposal.order_id, - timestamp: proposal.timestamp, - taker: proposal.taker, - maker: proposal.maker, - price: proposal.price, + cfd.handle_proposal_signed(settlement)?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + + self.monitor_actor + .send(monitor::CollaborativeSettlement { + order_id, + tx: (settlement_txid, dlc.script_pubkey_for(Role::Taker)), }) .await?; + Ok(()) } } @@ -261,14 +269,6 @@ where + xtra::Handler + xtra::Handler, { - async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> { - tracing::info!(%order_id, "Settlement proposal got rejected"); - - self.remove_pending_proposal(&order_id).await?; - - Ok(()) - } - async fn handle_roll_over_rejected(&mut self, order_id: OrderId) -> Result<()> { tracing::info!(%order_id, "Roll over proposal got rejected"); @@ -627,59 +627,6 @@ where } } -impl Actor -where - M: xtra::Handler, - W: xtra::Handler - + xtra::Handler - + xtra::Handler, -{ - async fn handle_settlement_accepted( - &mut self, - order_id: OrderId, - _ctx: &mut Context, - ) -> Result<()> { - tracing::info!(%order_id, "Settlement proposal got accepted"); - - let mut conn = self.db.acquire().await?; - - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let dlc = cfd.open_dlc().context("CFD was in wrong state")?; - - let proposal = self.get_settlement_proposal(order_id)?; - let (tx, sig_taker) = dlc.close_transaction(proposal)?; - - // Need to use `do_send_async` here because this handler is called in - // context of a message arriving over the wire, and would result in a - // deadlock otherwise. - #[allow(clippy::disallowed_method)] - self.conn_actor - .do_send_async(wire::TakerToMaker::InitiateSettlement { - order_id, - sig_taker, - }) - .await?; - - cfd.handle_proposal_signed(CollaborativeSettlement::new( - tx.clone(), - dlc.script_pubkey_for(cfd.role()), - proposal.price, - )?)?; - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - - self.remove_pending_proposal(&order_id).await?; - - self.monitor_actor - .send(monitor::CollaborativeSettlement { - order_id, - tx: (tx.txid(), dlc.script_pubkey_for(Role::Taker)), - }) - .await?; - - Ok(()) - } -} - #[async_trait] impl Handler for Actor where @@ -698,7 +645,6 @@ impl Handler for Actor, O: xtra::Handler + xtra::Handler, - M: xtra::Handler, W: xtra::Handler + xtra::Handler + xtra::Handler, @@ -708,12 +654,6 @@ where wire::MakerToTaker::CurrentOrder(current_order) => { log_error!(self.handle_new_order(current_order)) } - wire::MakerToTaker::ConfirmSettlement(order_id) => { - log_error!(self.handle_settlement_accepted(order_id, ctx)) - } - wire::MakerToTaker::RejectSettlement(order_id) => { - log_error!(self.handle_settlement_rejected(order_id)) - } wire::MakerToTaker::ConfirmRollOver { order_id, oracle_event_id, @@ -735,6 +675,9 @@ where | wire::MakerToTaker::InvalidOrderId(_) => { unreachable!("These messages should be sent to the `setup_taker::Actor`") } + wire::MakerToTaker::Settlement { .. } => { + unreachable!("These messages should be sent to the `collab_settlement::Actor`") + } } } } diff --git a/daemon/src/wire.rs b/daemon/src/wire.rs index 0158872..b958df0 100644 --- a/daemon/src/wire.rs +++ b/daemon/src/wire.rs @@ -18,6 +18,27 @@ use std::ops::RangeInclusive; use std::sync::{Arc, Mutex}; use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; +pub mod taker_to_maker { + use super::*; + + #[derive(Debug, Serialize, Deserialize)] + #[serde(tag = "type", content = "payload")] + #[allow(clippy::large_enum_variant)] + pub enum Settlement { + Propose { + timestamp: Timestamp, + #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] + taker: Amount, + #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] + maker: Amount, + price: Price, + }, + Initiate { + sig_taker: Signature, + }, + } +} + #[derive(Debug, Serialize, Deserialize)] #[serde(tag = "type", content = "payload")] #[allow(clippy::large_enum_variant)] @@ -26,19 +47,6 @@ pub enum TakerToMaker { order_id: OrderId, quantity: Usd, }, - ProposeSettlement { - order_id: OrderId, - timestamp: Timestamp, - #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] - taker: Amount, - #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] - maker: Amount, - price: Price, - }, - InitiateSettlement { - order_id: OrderId, - sig_taker: Signature, - }, ProposeRollOver { order_id: OrderId, timestamp: Timestamp, @@ -48,17 +56,20 @@ pub enum TakerToMaker { msg: SetupMsg, }, RollOverProtocol(RollOverMsg), + Settlement { + order_id: OrderId, + msg: taker_to_maker::Settlement, + }, } impl fmt::Display for TakerToMaker { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { TakerToMaker::TakeOrder { .. } => write!(f, "TakeOrder"), - TakerToMaker::ProposeSettlement { .. } => write!(f, "ProposeSettlement"), - TakerToMaker::InitiateSettlement { .. } => write!(f, "InitiateSettlement"), TakerToMaker::Protocol { .. } => write!(f, "Protocol"), TakerToMaker::ProposeRollOver { .. } => write!(f, "ProposeRollOver"), TakerToMaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"), + TakerToMaker::Settlement { .. } => write!(f, "Settlement"), } } } @@ -72,8 +83,6 @@ pub enum MakerToTaker { CurrentOrder(Option), ConfirmOrder(OrderId), // TODO: Include payout curve in "accept" message from maker RejectOrder(OrderId), - ConfirmSettlement(OrderId), - RejectSettlement(OrderId), InvalidOrderId(OrderId), Protocol { order_id: OrderId, @@ -85,6 +94,21 @@ pub enum MakerToTaker { oracle_event_id: BitMexPriceEventId, }, RejectRollOver(OrderId), + Settlement { + order_id: OrderId, + msg: maker_to_taker::Settlement, + }, +} + +pub mod maker_to_taker { + use super::*; + + #[derive(Debug, Serialize, Deserialize)] + #[serde(tag = "type", content = "payload")] + pub enum Settlement { + Confirm, + Reject, + } } impl fmt::Display for MakerToTaker { @@ -94,13 +118,12 @@ impl fmt::Display for MakerToTaker { MakerToTaker::CurrentOrder(_) => write!(f, "CurrentOrder"), MakerToTaker::ConfirmOrder(_) => write!(f, "ConfirmOrder"), MakerToTaker::RejectOrder(_) => write!(f, "RejectOrder"), - MakerToTaker::ConfirmSettlement(_) => write!(f, "ConfirmSettlement"), - MakerToTaker::RejectSettlement(_) => write!(f, "RejectSettlement"), MakerToTaker::InvalidOrderId(_) => write!(f, "InvalidOrderId"), MakerToTaker::Protocol { .. } => write!(f, "Protocol"), MakerToTaker::ConfirmRollOver { .. } => write!(f, "ConfirmRollOver"), MakerToTaker::RejectRollOver(_) => write!(f, "RejectRollOver"), MakerToTaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"), + MakerToTaker::Settlement { .. } => write!(f, "Settlement"), } } }