Browse Source

Move collab. settlement protocol into dedicated actor for taker

no-buy-button-while-setting-up-cfd
Thomas Eizinger 3 years ago
parent
commit
d75e73da9f
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 65
      daemon/src/address_map.rs
  2. 194
      daemon/src/collab_settlement_taker.rs
  3. 56
      daemon/src/connection.rs
  4. 1
      daemon/src/lib.rs
  5. 197
      daemon/src/taker_cfd.rs

65
daemon/src/address_map.rs

@ -1,6 +1,7 @@
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use std::hash::Hash; use std::hash::Hash;
use xtra::{Address, Handler, Message};
pub struct AddressMap<K, A> { pub struct AddressMap<K, A> {
inner: HashMap<K, xtra::Address<A>>, inner: HashMap<K, xtra::Address<A>>,
@ -27,6 +28,46 @@ where
Ok(Disconnected { entry }) Ok(Disconnected { entry })
} }
/// Garbage-collect an address that is no longer active.
pub fn gc(&mut self, stopping: Stopping<A>) {
self.inner.retain(|_, candidate| stopping.me != *candidate);
}
pub fn insert(&mut self, key: K, address: Address<A>) {
self.inner.insert(key, address);
}
/// Sends a message to the actor stored with the given key.
pub async fn send<M>(&self, key: &K, msg: M) -> bool
where
M: Message<Result = ()>,
A: Handler<M>,
{
match self.inner.get(key) {
Some(addr) if addr.is_connected() => {
addr.send(msg)
.await
.expect("we checked that we are connected");
true
}
Some(_) => false,
None => false,
}
}
}
/// A message to notify that an actor instance is stopping.
pub struct Stopping<A> {
pub me: Address<A>,
}
impl<A> Message for Stopping<A>
where
A: 'static,
{
type Result = ();
} }
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
@ -49,3 +90,27 @@ impl<'a, K, A> Disconnected<'a, K, A> {
}; };
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use xtra::Context;
#[test]
fn gc_removes_address() {
let (addr_1, _) = Context::<Dummy>::new(None);
let (addr_2, _) = Context::<Dummy>::new(None);
let mut map = AddressMap::default();
map.insert("foo", addr_1);
map.insert("bar", addr_2.clone());
map.gc(Stopping { me: addr_2 });
assert_eq!(map.inner.len(), 1);
assert!(map.inner.get("foo").is_some());
}
struct Dummy;
impl xtra::Actor for Dummy {}
}

194
daemon/src/collab_settlement_taker.rs

@ -0,0 +1,194 @@
use crate::address_map::Stopping;
use crate::model::cfd::{
Cfd, CollaborativeSettlement, OrderId, SettlementKind, SettlementProposal,
};
use crate::model::Price;
use crate::{connection, projection, wire};
use anyhow::{Context, Result};
use async_trait::async_trait;
use xtra::prelude::MessageChannel;
use xtra_productivity::xtra_productivity;
pub struct Actor {
cfd: Cfd,
projection: xtra::Address<projection::Actor>,
on_completed: Box<dyn MessageChannel<Completed>>,
connection: xtra::Address<connection::Actor>,
proposal: SettlementProposal,
}
impl Actor {
pub fn new(
cfd: Cfd,
projection: xtra::Address<projection::Actor>,
on_completed: impl MessageChannel<Completed> + 'static,
current_price: Price,
connection: xtra::Address<connection::Actor>,
n_payouts: usize,
) -> Result<Self> {
let proposal = cfd.calculate_settlement(current_price, n_payouts)?;
Ok(Self {
cfd,
projection,
on_completed: Box::new(on_completed),
connection,
proposal,
})
}
async fn propose(&mut self, this: xtra::Address<Self>) -> Result<()> {
if !self.cfd.is_collaborative_settle_possible() {
anyhow::bail!(
"Settlement proposal not possible because for cfd {} is in state {} which cannot be collaboratively settled",
self.cfd.order.id,
self.cfd.state
)
}
self.connection
.send(connection::ProposeSettlement {
timestamp: self.proposal.timestamp,
taker: self.proposal.taker,
maker: self.proposal.maker,
price: self.proposal.price,
address: this,
order_id: self.cfd.order.id,
})
.await??;
self.update_proposal(Some((self.proposal.clone(), SettlementKind::Outgoing)))
.await?;
Ok(())
}
async fn handle_confirmed(&mut self) -> Result<CollaborativeSettlement> {
let order_id = self.cfd.order.id;
tracing::info!(%order_id, "Settlement proposal got accepted");
self.update_proposal(None).await?;
let dlc = self.cfd.dlc().context("No DLC in CFD")?;
let (tx, sig) = dlc.close_transaction(&self.proposal)?;
// Need to use `do_send_async` here because this handler is called in
// context of a message arriving over the wire, and would result in a
// deadlock otherwise.
#[allow(clippy::disallowed_method)]
self.connection
.do_send_async(wire::TakerToMaker::Settlement {
order_id,
msg: wire::taker_to_maker::Settlement::Initiate { sig_taker: sig },
})
.await?;
Ok(CollaborativeSettlement::new(
tx,
dlc.script_pubkey_for(self.cfd.role()), // TODO: Hardcode role to Taker?
self.proposal.price,
)?)
}
async fn handle_rejected(&mut self) -> Result<()> {
let order_id = self.cfd.order.id;
tracing::info!(%order_id, "Settlement proposal got rejected");
self.update_proposal(None).await?;
Ok(())
}
async fn update_proposal(
&mut self,
proposal: Option<(SettlementProposal, SettlementKind)>,
) -> Result<()> {
self.projection
.send(projection::UpdateSettlementProposal {
order: self.cfd.order.id,
proposal,
})
.await?;
Ok(())
}
async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context<Self>) {
let _ = self.on_completed.send(completed).await;
ctx.stop();
}
}
#[async_trait]
impl xtra::Actor for Actor {
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
let this = ctx.address().expect("get address to ourselves");
if let Err(e) = self.propose(this).await {
self.complete(
Completed::Failed {
order_id: self.cfd.order.id,
error: e,
},
ctx,
)
.await;
}
}
async fn stopping(&mut self, ctx: &mut xtra::Context<Self>) -> xtra::KeepRunning {
// inform the connection actor that we stopping so it can GC the address from the hashmap
let me = ctx.address().expect("we are still alive");
let _ = self.connection.send(Stopping { me }).await;
xtra::KeepRunning::StopAll
}
}
pub enum Completed {
Confirmed {
order_id: OrderId,
settlement: CollaborativeSettlement,
},
Rejected {
order_id: OrderId,
},
Failed {
order_id: OrderId,
error: anyhow::Error,
},
}
#[xtra_productivity]
impl Actor {
async fn handle(
&mut self,
msg: wire::maker_to_taker::Settlement,
ctx: &mut xtra::Context<Self>,
) {
let order_id = self.cfd.order.id;
let completed = match msg {
wire::maker_to_taker::Settlement::Confirm => match self.handle_confirmed().await {
Ok(settlement) => Completed::Confirmed {
settlement,
order_id,
},
Err(e) => Completed::Failed { error: e, order_id },
},
wire::maker_to_taker::Settlement::Reject => {
if let Err(e) = self.handle_rejected().await {
Completed::Failed { error: e, order_id }
} else {
Completed::Rejected { order_id }
}
}
};
self.complete(completed, ctx).await;
}
}

56
daemon/src/connection.rs

@ -1,8 +1,10 @@
use crate::address_map::{AddressMap, Stopping};
use crate::model::cfd::OrderId; use crate::model::cfd::OrderId;
use crate::model::Usd; use crate::model::{Price, Timestamp, Usd};
use crate::tokio_ext::FutureExt; use crate::tokio_ext::FutureExt;
use crate::{log_error, noise, send_to_socket, setup_taker, wire, Tasks}; use crate::{collab_settlement_taker, log_error, noise, send_to_socket, setup_taker, wire, Tasks};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use bdk::bitcoin::Amount;
use futures::StreamExt; use futures::StreamExt;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -34,6 +36,7 @@ pub struct Actor {
connect_timeout: Duration, connect_timeout: Duration,
connected_state: Option<ConnectedState>, connected_state: Option<ConnectedState>,
setup_actors: HashMap<OrderId, xtra::Address<setup_taker::Actor>>, setup_actors: HashMap<OrderId, xtra::Address<setup_taker::Actor>>,
collab_settlement_actors: AddressMap<OrderId, collab_settlement_taker::Actor>,
} }
pub struct Connect { pub struct Connect {
@ -67,6 +70,15 @@ pub struct TakeOrder {
pub address: xtra::Address<setup_taker::Actor>, pub address: xtra::Address<setup_taker::Actor>,
} }
pub struct ProposeSettlement {
pub order_id: OrderId,
pub timestamp: Timestamp,
pub taker: Amount,
pub maker: Amount,
pub price: Price,
pub address: xtra::Address<collab_settlement_taker::Actor>,
}
impl Actor { impl Actor {
pub fn new( pub fn new(
status_sender: watch::Sender<ConnectionStatus>, status_sender: watch::Sender<ConnectionStatus>,
@ -87,6 +99,7 @@ impl Actor {
connected_state: None, connected_state: None,
setup_actors: HashMap::new(), setup_actors: HashMap::new(),
connect_timeout, connect_timeout,
collab_settlement_actors: AddressMap::default(),
} }
} }
} }
@ -96,6 +109,13 @@ impl Actor {
async fn handle_taker_to_maker(&mut self, message: wire::TakerToMaker) { async fn handle_taker_to_maker(&mut self, message: wire::TakerToMaker) {
log_error!(self.send_to_maker.send(message)); log_error!(self.send_to_maker.send(message));
} }
async fn handle_collab_settlement_actor_stopping(
&mut self,
message: Stopping<collab_settlement_taker::Actor>,
) {
self.collab_settlement_actors.gc(message);
}
} }
#[xtra_productivity] #[xtra_productivity]
@ -112,6 +132,33 @@ impl Actor {
Ok(()) Ok(())
} }
async fn handle_propose_settlement(&mut self, msg: ProposeSettlement) -> Result<()> {
let ProposeSettlement {
order_id,
timestamp,
taker,
maker,
price,
address,
} = msg;
self.send_to_maker
.send(wire::TakerToMaker::Settlement {
order_id,
msg: wire::taker_to_maker::Settlement::Propose {
timestamp,
taker,
maker,
price,
},
})
.await?;
self.collab_settlement_actors.insert(order_id, address);
Ok(())
}
} }
#[xtra_productivity] #[xtra_productivity]
@ -233,6 +280,11 @@ impl Actor {
} }
} }
} }
wire::MakerToTaker::Settlement { order_id, msg } => {
if !self.collab_settlement_actors.send(&order_id, msg).await {
tracing::warn!(%order_id, "No active collaborative settlement");
}
}
other => { other => {
// this one should go to the taker cfd actor // this one should go to the taker cfd actor
log_error!(self.maker_to_taker.send(other)); log_error!(self.maker_to_taker.send(other));

1
daemon/src/lib.rs

@ -24,6 +24,7 @@ pub mod address_map;
pub mod auth; pub mod auth;
pub mod bitmex_price_feed; pub mod bitmex_price_feed;
pub mod cfd_actors; pub mod cfd_actors;
pub mod collab_settlement_taker;
pub mod connection; pub mod connection;
pub mod db; pub mod db;
pub mod fan_out; pub mod fan_out;

197
daemon/src/taker_cfd.rs

@ -2,21 +2,20 @@ use crate::address_map::AddressMap;
use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed}; use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::model::cfd::{ use crate::model::cfd::{
Cfd, CfdState, CfdStateCommon, CollaborativeSettlement, Completed, Dlc, Order, OrderId, Origin, Cfd, CfdState, CfdStateCommon, Completed, Dlc, Order, OrderId, Origin, Role, RollOverProposal,
Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, SettlementKind, UpdateCfdProposal, UpdateCfdProposals,
UpdateCfdProposals,
}; };
use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd}; use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd};
use crate::monitor::{self, MonitorParams}; use crate::monitor::{self, MonitorParams};
use crate::projection::{ use crate::projection::{
try_into_update_rollover_proposal, try_into_update_settlement_proposal, UpdateRollOverProposal, try_into_update_rollover_proposal, UpdateRollOverProposal, UpdateSettlementProposal,
UpdateSettlementProposal,
}; };
use crate::setup_contract::RolloverParams; use crate::setup_contract::RolloverParams;
use crate::tokio_ext::FutureExt; use crate::tokio_ext::FutureExt;
use crate::wire::RollOverMsg; use crate::wire::RollOverMsg;
use crate::{ use crate::{
connection, log_error, oracle, projection, setup_contract, setup_taker, wallet, wire, Tasks, collab_settlement_taker, connection, log_error, oracle, projection, setup_contract,
setup_taker, wallet, wire, Tasks,
}; };
use anyhow::{bail, Context as _, Result}; use anyhow::{bail, Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
@ -68,6 +67,7 @@ pub struct Actor<O, M, W> {
conn_actor: Address<connection::Actor>, conn_actor: Address<connection::Actor>,
monitor_actor: Address<M>, monitor_actor: Address<M>,
setup_actors: AddressMap<OrderId, setup_taker::Actor>, setup_actors: AddressMap<OrderId, setup_taker::Actor>,
collab_settlement_actors: AddressMap<OrderId, collab_settlement_taker::Actor>,
roll_over_state: RollOverState, roll_over_state: RollOverState,
oracle_actor: Address<O>, oracle_actor: Address<O>,
current_pending_proposals: UpdateCfdProposals, current_pending_proposals: UpdateCfdProposals,
@ -104,6 +104,7 @@ where
current_pending_proposals: HashMap::new(), current_pending_proposals: HashMap::new(),
n_payouts, n_payouts,
setup_actors: AddressMap::default(), setup_actors: AddressMap::default(),
collab_settlement_actors: AddressMap::default(),
tasks: Tasks::default(), tasks: Tasks::default(),
} }
} }
@ -138,25 +139,13 @@ impl<O, M, W> Actor<O, M, W> {
} }
Ok(()) Ok(())
} }
fn get_settlement_proposal(&self, order_id: OrderId) -> Result<&SettlementProposal> {
match self
.current_pending_proposals
.get(&order_id)
.context("have a proposal that is about to be accepted")?
{
UpdateCfdProposal::Settlement { proposal, .. } => Ok(proposal),
UpdateCfdProposal::RollOverProposal { .. } => {
anyhow::bail!("did not expect a rollover proposal");
}
}
}
} }
#[xtra_productivity] #[xtra_productivity]
impl<O, M, W> Actor<O, M, W> impl<O, M, W> Actor<O, M, W>
where where
W: xtra::Handler<wallet::TryBroadcastTransaction>, W: xtra::Handler<wallet::TryBroadcastTransaction>,
M: xtra::Handler<monitor::CollaborativeSettlement>,
{ {
async fn handle_commit(&mut self, msg: Commit) -> Result<()> { async fn handle_commit(&mut self, msg: Commit) -> Result<()> {
let Commit { order_id } = msg; let Commit { order_id } = msg;
@ -198,61 +187,78 @@ where
.await?; .await?;
Ok(()) Ok(())
} }
}
#[xtra_productivity] async fn handle_propose_settlement(
impl<O, M, W> Actor<O, M, W> { &mut self,
async fn handle_propose_settlement(&mut self, msg: ProposeSettlement) -> Result<()> { msg: ProposeSettlement,
ctx: &mut xtra::Context<Self>,
) -> Result<()> {
let ProposeSettlement { let ProposeSettlement {
order_id, order_id,
current_price, current_price,
} = msg; } = msg;
let disconnected = self
.collab_settlement_actors
.get_disconnected(order_id)
.with_context(|| format!("Settlement for order {} is already in progress", order_id))?;
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
if !cfd.is_collaborative_settle_possible() { let this = ctx
anyhow::bail!( .address()
"Settlement proposal not possible because for cfd {} is in state {} which cannot be collaboratively settled", .expect("actor to be able to give address to itself");
order_id, let (addr, fut) = collab_settlement_taker::Actor::new(
cfd.state cfd,
) self.projection_actor.clone(),
} this,
current_price,
self.conn_actor.clone(),
self.n_payouts,
)?
.create(None)
.run();
let proposal = cfd.calculate_settlement(current_price, self.n_payouts)?; disconnected.insert(addr);
self.tasks.add(fut);
if self Ok(())
.current_pending_proposals }
.contains_key(&proposal.order_id)
{
anyhow::bail!(
"Settlement proposal for order id {} already present",
order_id
)
}
let new_proposal = UpdateCfdProposal::Settlement { async fn handle_settlement_completed(
proposal: proposal.clone(), &mut self,
direction: SettlementKind::Outgoing, msg: collab_settlement_taker::Completed,
) -> Result<()> {
let (order_id, settlement) = match msg {
collab_settlement_taker::Completed::Confirmed {
order_id,
settlement,
} => (order_id, settlement),
collab_settlement_taker::Completed::Rejected { .. } => {
return Ok(());
}
collab_settlement_taker::Completed::Failed { order_id, error } => {
tracing::warn!(%order_id, "Collaborative settlement failed: {:#}", error);
return Ok(());
}
}; };
let settlement_txid = settlement.tx.txid();
self.current_pending_proposals let mut conn = self.db.acquire().await?;
.insert(proposal.order_id, new_proposal.clone()); let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
self.projection_actor let dlc = cfd.dlc().context("No DLC in CFD")?;
.send(try_into_update_settlement_proposal(new_proposal)?)
.await?;
self.conn_actor cfd.handle_proposal_signed(settlement)?;
.send(wire::TakerToMaker::Settlement { append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
order_id: proposal.order_id,
msg: wire::taker_to_maker::Settlement::Propose { self.monitor_actor
timestamp: proposal.timestamp, .send(monitor::CollaborativeSettlement {
taker: proposal.taker, order_id,
maker: proposal.maker, tx: (settlement_txid, dlc.script_pubkey_for(Role::Taker)),
price: proposal.price,
},
}) })
.await?; .await?;
Ok(()) Ok(())
} }
} }
@ -263,14 +269,6 @@ where
+ xtra::Handler<wallet::Sign> + xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>, + xtra::Handler<wallet::BuildPartyParams>,
{ {
async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::info!(%order_id, "Settlement proposal got rejected");
self.remove_pending_proposal(&order_id).await?;
Ok(())
}
async fn handle_roll_over_rejected(&mut self, order_id: OrderId) -> Result<()> { async fn handle_roll_over_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::info!(%order_id, "Roll over proposal got rejected"); tracing::info!(%order_id, "Roll over proposal got rejected");
@ -629,59 +627,6 @@ where
} }
} }
impl<O: 'static, M: 'static, W: 'static> Actor<O, M, W>
where
M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle_settlement_accepted(
&mut self,
order_id: OrderId,
_ctx: &mut Context<Self>,
) -> Result<()> {
tracing::info!(%order_id, "Settlement proposal got accepted");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
let proposal = self.get_settlement_proposal(order_id)?;
let (tx, sig_taker) = dlc.close_transaction(proposal)?;
// Need to use `do_send_async` here because this handler is called in
// context of a message arriving over the wire, and would result in a
// deadlock otherwise.
#[allow(clippy::disallowed_method)]
self.conn_actor
.do_send_async(wire::TakerToMaker::Settlement {
order_id,
msg: wire::taker_to_maker::Settlement::Initiate { sig_taker },
})
.await?;
cfd.handle_proposal_signed(CollaborativeSettlement::new(
tx.clone(),
dlc.script_pubkey_for(cfd.role()),
proposal.price,
)?)?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.remove_pending_proposal(&order_id).await?;
self.monitor_actor
.send(monitor::CollaborativeSettlement {
order_id,
tx: (tx.txid(), dlc.script_pubkey_for(Role::Taker)),
})
.await?;
Ok(())
}
}
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<TakeOffer> for Actor<O, M, W> impl<O: 'static, M: 'static, W: 'static> Handler<TakeOffer> for Actor<O, M, W>
where where
@ -700,7 +645,6 @@ impl<O: 'static, M: 'static, W: 'static> Handler<wire::MakerToTaker> for Actor<O
where where
Self: xtra::Handler<CfdRollOverCompleted>, Self: xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>, O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction> W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign> + xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>, + xtra::Handler<wallet::BuildPartyParams>,
@ -710,18 +654,6 @@ where
wire::MakerToTaker::CurrentOrder(current_order) => { wire::MakerToTaker::CurrentOrder(current_order) => {
log_error!(self.handle_new_order(current_order)) log_error!(self.handle_new_order(current_order))
} }
wire::MakerToTaker::Settlement {
order_id,
msg: wire::maker_to_taker::Settlement::Confirm,
} => {
log_error!(self.handle_settlement_accepted(order_id, ctx))
}
wire::MakerToTaker::Settlement {
order_id,
msg: wire::maker_to_taker::Settlement::Reject,
} => {
log_error!(self.handle_settlement_rejected(order_id))
}
wire::MakerToTaker::ConfirmRollOver { wire::MakerToTaker::ConfirmRollOver {
order_id, order_id,
oracle_event_id, oracle_event_id,
@ -743,6 +675,9 @@ where
| wire::MakerToTaker::InvalidOrderId(_) => { | wire::MakerToTaker::InvalidOrderId(_) => {
unreachable!("These messages should be sent to the `setup_taker::Actor`") unreachable!("These messages should be sent to the `setup_taker::Actor`")
} }
wire::MakerToTaker::Settlement { .. } => {
unreachable!("These messages should be sent to the `collab_settlement::Actor`")
}
} }
} }
} }

Loading…
Cancel
Save