Browse Source

Move collab. settlement protocol into dedicated actor for maker

resilient-broadcast
luckysori 3 years ago
committed by Lucas Soriano del Pino
parent
commit
57db5c2eba
No known key found for this signature in database GPG Key ID: 89CE0DB40A19D524
  1. 3
      daemon/src/actor_name.rs
  2. 6
      daemon/src/address_map.rs
  3. 216
      daemon/src/collab_settlement_maker.rs
  4. 3
      daemon/src/collab_settlement_taker.rs
  5. 6
      daemon/src/lib.rs
  6. 278
      daemon/src/maker_cfd.rs
  7. 74
      daemon/src/maker_inc_connections.rs
  8. 3
      daemon/src/rollover_taker.rs
  9. 3
      daemon/src/setup_maker.rs

3
daemon/src/actor_name.rs

@ -1,3 +0,0 @@
pub trait ActorName {
fn actor_name() -> String;
}

6
daemon/src/address_map.rs

@ -3,8 +3,6 @@ use std::collections::HashMap;
use std::hash::Hash;
use xtra::{Address, Handler, Message};
use crate::actor_name::ActorName;
pub struct AddressMap<K, A> {
inner: HashMap<K, xtra::Address<A>>,
}
@ -111,6 +109,10 @@ impl<'a, K, A> Disconnected<'a, K, A> {
}
}
pub trait ActorName {
fn actor_name() -> String;
}
#[cfg(test)]
mod tests {
use super::*;

216
daemon/src/collab_settlement_maker.rs

@ -0,0 +1,216 @@
use crate::address_map::{ActorName, Stopping};
use crate::model::cfd::{
Cfd, CollaborativeSettlement, OrderId, Role, SettlementKind, SettlementProposal,
};
use crate::model::Identity;
use crate::{maker_inc_connections, projection};
use anyhow::Context;
use async_trait::async_trait;
use bdk::bitcoin::Script;
use maia::secp256k1_zkp::Signature;
use xtra::prelude::MessageChannel;
use xtra_productivity::xtra_productivity;
pub struct Actor {
cfd: Cfd,
projection: xtra::Address<projection::Actor>,
on_completed: Box<dyn MessageChannel<Completed>>,
proposal: SettlementProposal,
taker_id: Identity,
connections: Box<dyn MessageChannel<maker_inc_connections::settlement::Response>>,
on_stopping: Vec<Box<dyn MessageChannel<Stopping<Self>>>>,
}
pub enum Completed {
Confirmed {
order_id: OrderId,
settlement: CollaborativeSettlement,
script_pubkey: Script,
},
Rejected {
order_id: OrderId,
},
Failed {
order_id: OrderId,
error: anyhow::Error,
},
}
pub struct Accepted;
pub struct Rejected;
pub struct Initiated {
pub sig_taker: Signature,
}
#[xtra_productivity]
impl Actor {
async fn handle(&mut self, _: Accepted, ctx: &mut xtra::Context<Self>) {
let order_id = self.cfd.order.id;
tracing::info!(%order_id, "Settlement proposal accepted");
self.accept(ctx).await;
self.update_proposal(None).await;
}
async fn handle(&mut self, _: Rejected, ctx: &mut xtra::Context<Self>) {
let order_id = self.cfd.order.id;
tracing::info!(%order_id, "Settlement proposal rejected");
self.reject(ctx).await;
self.update_proposal(None).await;
}
async fn handle(&mut self, msg: Initiated, ctx: &mut xtra::Context<Self>) {
let completed = async {
tracing::info!(
order_id = %self.cfd.order.id,
taker_id = %self.taker_id,
"Received signature for collaborative settlement"
);
let Initiated { sig_taker } = msg;
let dlc = self.cfd.open_dlc().context("CFD was in wrong state")?;
let (tx, sig_maker) = dlc.close_transaction(&self.proposal)?;
let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?;
let settlement = CollaborativeSettlement::new(
spend_tx.clone(),
dlc.script_pubkey_for(Role::Maker),
self.proposal.price,
)?;
self.update_proposal(None).await;
anyhow::Ok(Completed::Confirmed {
order_id: self.cfd.order.id,
settlement,
script_pubkey: dlc.script_pubkey_for(Role::Maker),
})
}
.await
.unwrap_or_else(|e| Completed::Failed {
order_id: self.cfd.order.id,
error: e,
});
self.complete(completed, ctx).await;
}
}
#[async_trait]
impl xtra::Actor for Actor {
async fn started(&mut self, _ctx: &mut xtra::Context<Self>) {
tracing::info!(
order_id = %self.proposal.order_id,
price = %self.proposal.price,
"Received settlement proposal"
);
self.update_proposal(Some((self.proposal.clone(), SettlementKind::Incoming)))
.await;
}
async fn stopping(&mut self, ctx: &mut xtra::Context<Self>) -> xtra::KeepRunning {
// inform other actors that we are stopping so that our
// address can be GCd from their AddressMaps
let me = ctx.address().expect("we are still alive");
for channel in self.on_stopping.iter() {
let _ = channel.send(Stopping { me: me.clone() }).await;
}
xtra::KeepRunning::StopAll
}
}
impl Actor {
pub fn new(
cfd: Cfd,
proposal: SettlementProposal,
projection: xtra::Address<projection::Actor>,
on_completed: &(impl MessageChannel<Completed> + 'static),
taker_id: Identity,
connections: &(impl MessageChannel<maker_inc_connections::settlement::Response> + 'static),
(on_stopping0, on_stopping1): (
&(impl MessageChannel<Stopping<Self>> + 'static),
&(impl MessageChannel<Stopping<Self>> + 'static),
),
) -> Self {
Self {
cfd,
projection,
on_completed: on_completed.clone_channel(),
proposal,
taker_id,
connections: connections.clone_channel(),
on_stopping: vec![on_stopping0.clone_channel(), on_stopping1.clone_channel()],
}
}
async fn update_proposal(&mut self, proposal: Option<(SettlementProposal, SettlementKind)>) {
if let Err(e) = self
.projection
.send(projection::UpdateSettlementProposal {
order: self.cfd.order.id,
proposal,
})
.await
{
tracing::warn!(
"Failed to deliver settlement proposal update to projection actor: {:#}",
e
);
};
}
async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context<Self>) {
let _ = self.on_completed.send(completed).await;
ctx.stop();
}
async fn accept(&mut self, ctx: &mut xtra::Context<Self>) {
let this = ctx.address().expect("self to be alive");
self.inform_taker(
maker_inc_connections::settlement::Decision::Accept { address: this },
ctx,
)
.await
}
async fn reject(&mut self, ctx: &mut xtra::Context<Self>) {
self.inform_taker(maker_inc_connections::settlement::Decision::Reject, ctx)
.await
}
async fn inform_taker(
&mut self,
decision: maker_inc_connections::settlement::Decision,
ctx: &mut xtra::Context<Self>,
) {
let order_id = self.cfd.order.id;
if let Err(e) = self
.connections
.send(maker_inc_connections::settlement::Response {
taker_id: self.taker_id,
order_id,
decision,
})
.await
.context("Failed inform taker about settlement decision")
{
self.complete(Completed::Failed { order_id, error: e }, ctx)
.await;
}
}
}
impl ActorName for Actor {
fn actor_name() -> String {
"Maker collab settlement".to_string()
}
}

3
daemon/src/collab_settlement_taker.rs

@ -1,5 +1,4 @@
use crate::actor_name::ActorName;
use crate::address_map::Stopping;
use crate::address_map::{ActorName, Stopping};
use crate::model::cfd::{
Cfd, CollaborativeSettlement, OrderId, SettlementKind, SettlementProposal,
};

6
daemon/src/lib.rs

@ -21,13 +21,13 @@ use xtra::{Actor, Address};
pub mod sqlx_ext; // Must come first because it is a macro.
pub mod actor_name;
pub mod actors;
pub mod address_map;
pub mod auth;
pub mod bdk_ext;
pub mod bitmex_price_feed;
pub mod cfd_actors;
pub mod collab_settlement_maker;
pub mod collab_settlement_taker;
pub mod connection;
pub mod db;
@ -114,7 +114,9 @@ where
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<maker_inc_connections::ConfirmOrder>
+ xtra::Handler<Stopping<setup_maker::Actor>>,
+ xtra::Handler<Stopping<setup_maker::Actor>>
+ xtra::Handler<maker_inc_connections::settlement::Response>
+ xtra::Handler<Stopping<collab_settlement_maker::Actor>>,
W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::TryBroadcastTransaction>,

278
daemon/src/maker_cfd.rs

@ -2,21 +2,20 @@ use crate::address_map::{AddressMap, Stopping};
use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::model::cfd::{
Cfd, CfdState, CfdStateCommon, CollaborativeSettlement, Dlc, Order, OrderId, Origin, Role,
RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal,
Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId, Origin, Role, RollOverProposal,
SettlementKind, SettlementProposal, UpdateCfdProposal,
};
use crate::model::{Identity, Price, Timestamp, Usd};
use crate::monitor::MonitorParams;
use crate::projection::{
try_into_update_rollover_proposal, try_into_update_settlement_proposal, Update,
UpdateRollOverProposal, UpdateSettlementProposal,
try_into_update_rollover_proposal, Update, UpdateRollOverProposal, UpdateSettlementProposal,
};
use crate::setup_contract::RolloverParams;
use crate::tokio_ext::FutureExt;
use crate::wire::TakerToMaker;
use crate::{
log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, setup_maker,
wallet, wire, Tasks,
collab_settlement_maker, log_error, maker_inc_connections, monitor, oracle, projection,
setup_contract, setup_maker, wallet, wire, Tasks,
};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
@ -24,7 +23,6 @@ use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc;
use futures::future::RemoteHandle;
use futures::{future, SinkExt};
use maia::secp256k1_zkp::Signature;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use std::collections::{HashMap, HashSet};
@ -89,11 +87,11 @@ pub struct Actor<O, M, T, W> {
current_order_id: Option<OrderId>,
monitor_actor: Address<M>,
setup_actors: AddressMap<OrderId, setup_maker::Actor>,
settlement_actors: AddressMap<OrderId, collab_settlement_maker::Actor>,
roll_over_state: RollOverState,
oracle_actor: Address<O>,
// Maker needs to also store Identity to be able to send a reply back
current_pending_proposals: HashMap<OrderId, (UpdateCfdProposal, Identity)>,
current_agreed_proposals: HashMap<OrderId, (SettlementProposal, Identity)>,
connected_takers: HashSet<Identity>,
n_payouts: usize,
tasks: Tasks,
@ -134,9 +132,9 @@ impl<O, M, T, W> Actor<O, M, T, W> {
roll_over_state: RollOverState::None,
oracle_actor,
current_pending_proposals: HashMap::new(),
current_agreed_proposals: HashMap::new(),
n_payouts,
connected_takers: HashSet::new(),
settlement_actors: AddressMap::default(),
tasks: Tasks::default(),
}
}
@ -180,30 +178,6 @@ impl<O, M, T, W> Actor<O, M, T, W> {
Ok(())
}
async fn handle_propose_settlement(
&mut self,
taker_id: Identity,
proposal: SettlementProposal,
) -> Result<()> {
tracing::info!(
"Received settlement proposal from the taker: {:?}",
proposal
);
let new_proposal = UpdateCfdProposal::Settlement {
proposal: proposal.clone(),
direction: SettlementKind::Incoming,
};
self.current_pending_proposals
.insert(proposal.order_id, (new_proposal.clone(), taker_id));
self.projection_actor
.send(try_into_update_settlement_proposal(new_proposal)?)
.await?;
Ok(())
}
async fn handle_inc_roll_over_protocol_msg(
&mut self,
taker_id: Identity,
@ -254,29 +228,6 @@ impl<O, M, T, W> Actor<O, M, T, W> {
Ok(())
}
fn get_taker_id_of_proposal(&self, order_id: &OrderId) -> Result<Identity> {
let (_, taker_id) = self
.current_pending_proposals
.get(order_id)
.context("Could not find proposal for given order id")?;
Ok(*taker_id)
}
fn get_settlement_proposal(&self, order_id: OrderId) -> Result<(SettlementProposal, Identity)> {
let (update_proposal, taker_id) = self
.current_pending_proposals
.get(&order_id)
.context("have a proposal that is about to be accepted")?;
let proposal = match update_proposal {
UpdateCfdProposal::Settlement { proposal, .. } => proposal,
UpdateCfdProposal::RollOverProposal { .. } => {
anyhow::bail!("did not expect a rollover proposal");
}
};
Ok((proposal.clone(), *taker_id))
}
async fn update_connected_takers(&mut self) -> Result<()> {
self.projection_actor
.send(Update(
@ -542,6 +493,16 @@ impl<O, M, T, W> Actor<O, M, T, W> {
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W> {
async fn handle_settlement_actor_stopping(
&mut self,
message: Stopping<collab_settlement_maker::Actor>,
) {
self.settlement_actors.gc(message);
}
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
where
@ -573,35 +534,10 @@ where
async fn handle_accept_settlement(&mut self, msg: AcceptSettlement) -> Result<()> {
let AcceptSettlement { order_id } = msg;
tracing::debug!(%order_id, "Maker accepts a settlement proposal" );
let taker_id = self.get_taker_id_of_proposal(&order_id)?;
match self
.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::Settlement {
order_id,
msg: wire::maker_to_taker::Settlement::Confirm,
},
})
.await?
{
Ok(_) => {
self.current_agreed_proposals
.insert(order_id, self.get_settlement_proposal(order_id)?);
self.remove_pending_proposal(&order_id)
.await
.context("accepted settlement")?;
}
Err(e) => {
tracing::warn!("Failed to notify taker of accepted settlement: {}", e);
self.remove_pending_proposal(&order_id)
.await
.context("accepted settlement")?;
}
}
self.settlement_actors
.send(&order_id, collab_settlement_maker::Accepted)
.await
.with_context(|| format!("No settlement in progress for order {}", order_id))?;
Ok(())
}
@ -609,25 +545,10 @@ where
async fn handle_reject_settlement(&mut self, msg: RejectSettlement) -> Result<()> {
let RejectSettlement { order_id } = msg;
tracing::debug!(%order_id, "Maker rejects a settlement proposal" );
let taker_id = self.get_taker_id_of_proposal(&order_id)?;
// clean-up state ahead of sending to ensure consistency in case we fail to deliver the
// message
self.remove_pending_proposal(&order_id)
self.settlement_actors
.send(&order_id, collab_settlement_maker::Rejected)
.await
.context("rejected settlement")?;
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::Settlement {
order_id,
msg: wire::maker_to_taker::Settlement::Reject,
},
})
.await??;
.with_context(|| format!("No settlement in progress for order {}", order_id))?;
Ok(())
}
@ -668,6 +589,58 @@ where
}
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
where
M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_settlement_completed(&mut self, msg: collab_settlement_maker::Completed) {
log_error!(async {
use collab_settlement_maker::Completed::*;
let (order_id, settlement, script_pubkey) = match msg {
Confirmed {
order_id,
settlement,
script_pubkey,
} => (order_id, settlement, script_pubkey),
Rejected { .. } => {
return Ok(());
}
Failed { order_id, error } => {
tracing::warn!(%order_id, "Collaborative settlement failed: {:#}", error);
return Ok(());
}
};
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let tx = settlement.tx.clone();
cfd.handle_proposal_signed(settlement)
.context("Failed to update state with collaborative settlement")?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx })
.await?
.context("Broadcasting close transaction")?;
tracing::info!(%order_id, "Close transaction published with txid {}", txid);
self.monitor_actor
.send(monitor::CollaborativeSettlement {
order_id,
tx: (txid, script_pubkey),
})
.await?;
anyhow::Ok(())
});
}
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
where
@ -829,70 +802,46 @@ where
impl<O, M, T, W> Actor<O, M, T, W>
where
M: xtra::Handler<monitor::CollaborativeSettlement>,
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring> + xtra::Handler<monitor::CollaborativeSettlement>,
T: xtra::Handler<maker_inc_connections::settlement::Response>
+ xtra::Handler<Stopping<collab_settlement_maker::Actor>>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_initiate_settlement(
async fn handle_propose_settlement(
&mut self,
taker_id: Identity,
order_id: OrderId,
sig_taker: Signature,
proposal: SettlementProposal,
ctx: &mut xtra::Context<Self>,
) -> Result<()> {
tracing::info!(
"Taker {} initiated collab settlement for order { } by sending their signature",
taker_id,
order_id,
);
let (proposal, agreed_taker_id) = self
.current_agreed_proposals
.get(&order_id)
.context("maker should have data matching the agreed settlement")?;
if taker_id != *agreed_taker_id {
anyhow::bail!(
"taker Id mismatch. Expected: {}, received: {}",
agreed_taker_id,
taker_id
);
}
let disconnected = self
.settlement_actors
.get_disconnected(proposal.order_id)
.with_context(|| {
format!(
"Settlement for order {} is already in progress",
proposal.order_id
)
})?;
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(proposal.order_id, &mut conn).await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
let (tx, sig_maker) = dlc.close_transaction(proposal)?;
let own_script_pubkey = dlc.script_pubkey_for(cfd.role());
cfd.handle_proposal_signed(CollaborativeSettlement::new(
tx.clone(),
own_script_pubkey.clone(),
proposal.price,
)?)?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?;
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction {
tx: spend_tx.clone(),
})
.await?
.context("Broadcasting spend transaction")?;
tracing::info!("Close transaction published with txid {}", txid);
self.monitor_actor
.send(monitor::CollaborativeSettlement {
order_id,
tx: (txid, own_script_pubkey),
})
.await?;
let this = ctx.address().expect("self to be alive");
let (addr, task) = collab_settlement_maker::Actor::new(
cfd,
proposal,
self.projection_actor.clone(),
&ctx.address().expect("we are alive"),
taker_id,
&self.takers,
(&self.takers, &this),
)
.create(None)
.run();
self.current_agreed_proposals
.remove(&order_id)
.context("remove accepted proposal after signing")?;
self.tasks.add(task);
disconnected.insert(addr);
Ok(())
}
@ -1057,7 +1006,9 @@ where
T: xtra::Handler<maker_inc_connections::ConfirmOrder>
+ xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<Stopping<setup_maker::Actor>>,
+ xtra::Handler<Stopping<setup_maker::Actor>>
+ xtra::Handler<maker_inc_connections::settlement::Response>
+ xtra::Handler<Stopping<collab_settlement_maker::Actor>>,
W: xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
@ -1085,14 +1036,15 @@ where
taker,
maker,
price
}
},
ctx
))
}
wire::TakerToMaker::Settlement {
order_id,
msg: wire::taker_to_maker::Settlement::Initiate { sig_taker },
msg: wire::taker_to_maker::Settlement::Initiate { .. },
..
} => {
log_error!(self.handle_initiate_settlement(taker_id, order_id, sig_taker))
unreachable!("Handled within `collab_settlement_maker::Actor");
}
wire::TakerToMaker::ProposeRollOver {
order_id,

74
daemon/src/maker_inc_connections.rs

@ -4,8 +4,8 @@ use crate::model::cfd::{Order, OrderId};
use crate::model::Identity;
use crate::noise::TransportStateExt;
use crate::tokio_ext::FutureExt;
use crate::wire::{EncryptedJsonCodec, MakerToTaker, TakerToMaker, Version};
use crate::{maker_cfd, noise, send_to_socket, setup_maker, wire, Tasks};
use crate::wire::{taker_to_maker, EncryptedJsonCodec, MakerToTaker, TakerToMaker, Version};
use crate::{collab_settlement_maker, maker_cfd, noise, send_to_socket, setup_maker, wire, Tasks};
use anyhow::{bail, Context, Result};
use futures::{SinkExt, TryStreamExt};
use std::collections::HashMap;
@ -35,6 +35,32 @@ pub struct ConfirmOrder {
pub address: xtra::Address<setup_maker::Actor>,
}
pub mod settlement {
use super::*;
/// Message sent from the `collab_settlement_maker::Actor` to the
/// `maker_inc_connections::Actor` so that it can forward it to the
/// taker.
///
/// Additionally, the address of this instance of the
/// `collab_settlement_maker::Actor` is included so that the
/// `maker_inc_connections::Actor` knows where to forward the
/// collaborative settlement messages from the taker about this
/// particular order.
pub struct Response {
pub taker_id: Identity,
pub order_id: OrderId,
pub decision: Decision,
}
pub enum Decision {
Accept {
address: xtra::Address<collab_settlement_maker::Actor>,
},
Reject,
}
}
#[derive(Debug)]
pub struct TakerMessage {
pub taker_id: Identity,
@ -59,6 +85,7 @@ pub struct Actor {
noise_priv_key: x25519_dalek::StaticSecret,
heartbeat_interval: Duration,
setup_actors: AddressMap<OrderId, setup_maker::Actor>,
settlement_actors: AddressMap<OrderId, collab_settlement_maker::Actor>,
connection_tasks: HashMap<Identity, Tasks>,
}
@ -78,6 +105,7 @@ impl Actor {
noise_priv_key,
heartbeat_interval,
setup_actors: AddressMap::default(),
settlement_actors: AddressMap::default(),
connection_tasks: HashMap::new(),
}
}
@ -231,6 +259,28 @@ impl Actor {
Ok(())
}
async fn handle_settlement_response(&mut self, msg: settlement::Response) -> Result<()> {
let decision = match msg.decision {
settlement::Decision::Accept { address } => {
self.settlement_actors.insert(msg.order_id, address);
wire::maker_to_taker::Settlement::Confirm
}
settlement::Decision::Reject => wire::maker_to_taker::Settlement::Reject,
};
self.send_to_taker(
&msg.taker_id,
wire::MakerToTaker::Settlement {
order_id: msg.order_id,
msg: decision,
},
)
.await?;
Ok(())
}
async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<(), NoConnection> {
self.send_to_taker(&msg.taker_id, msg.msg).await?;
@ -276,6 +326,19 @@ impl Actor {
tracing::error!(%order_id, "No active contract setup");
}
},
Settlement {
order_id,
msg: taker_to_maker::Settlement::Initiate { sig_taker },
} => {
if self
.settlement_actors
.send(&order_id, collab_settlement_maker::Initiated { sig_taker })
.await
.is_err()
{
tracing::warn!(%order_id, "No active settlement");
}
}
_ => {
let _ = self.taker_msg_channel.send(msg);
}
@ -285,6 +348,13 @@ impl Actor {
async fn handle_setup_actor_stopping(&mut self, message: Stopping<setup_maker::Actor>) {
self.setup_actors.gc(message);
}
async fn handle_settlement_actor_stopping(
&mut self,
message: Stopping<collab_settlement_maker::Actor>,
) {
self.settlement_actors.gc(message);
}
}
struct ReadFail(Identity);

3
daemon/src/rollover_taker.rs

@ -1,5 +1,4 @@
use crate::actor_name::ActorName;
use crate::address_map::Stopping;
use crate::address_map::{ActorName, Stopping};
use crate::connection;
use crate::model::cfd::{Cfd, Dlc, OrderId, Role, RollOverProposal, SettlementKind};
use crate::model::{BitMexPriceEventId, Timestamp};

3
daemon/src/setup_maker.rs

@ -1,5 +1,4 @@
use crate::actor_name::ActorName;
use crate::address_map::Stopping;
use crate::address_map::{ActorName, Stopping};
use crate::model::cfd::{Cfd, CfdState, Dlc, Order, OrderId, Role};
use crate::model::{Identity, Usd};
use crate::oracle::Announcement;

Loading…
Cancel
Save