Browse Source

Merge #355

355: Notify monitor actor to watch for collaborative close tx r=da-kami a=da-kami

So far we only did this on startup, but not during the actual execution of the collaborative close.

Co-authored-by: Daniel Karzel <daniel@comit.network>
debug-statements
bors[bot] 3 years ago
committed by GitHub
parent
commit
80948a563f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 124
      daemon/src/maker_cfd.rs
  2. 33
      daemon/src/monitor.rs
  3. 13
      daemon/src/taker_cfd.rs

124
daemon/src/maker_cfd.rs

@ -203,63 +203,6 @@ impl<O, M, T> Actor<O, M, T> {
Ok(()) Ok(())
} }
async fn handle_initiate_settlement(
&mut self,
taker_id: TakerId,
order_id: OrderId,
sig_taker: Signature,
) -> Result<()> {
tracing::info!(
"Taker {} initiated collab settlement for order { } by sending their signature",
taker_id,
order_id,
);
let (proposal, agreed_taker_id) = self
.current_agreed_proposals
.get(&order_id)
.context("maker should have data matching the agreed settlement")?;
if taker_id != *agreed_taker_id {
anyhow::bail!(
"taker Id mismatch. Expected: {}, received: {}",
agreed_taker_id,
taker_id
);
}
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
let (tx, sig_maker) = dlc.close_transaction(proposal)?;
cfd.handle(CfdStateChangeEvent::ProposalSigned(
CollaborativeSettlement::new(
tx.clone(),
dlc.script_pubkey_for(cfd.role()),
proposal.price,
),
))?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?;
let txid = self
.wallet
.try_broadcast_transaction(spend_tx.clone())
.await
.context("Broadcasting spend transaction")?;
tracing::info!("Close transaction published with txid {}", txid);
self.current_agreed_proposals
.remove(&order_id)
.context("remove accepted proposal after signing")?;
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event( cfd_actors::handle_monitoring_event(
@ -899,6 +842,72 @@ where
} }
} }
impl<O, M, T> Actor<O, M, T>
where
M: xtra::Handler<monitor::CollaborativeSettlement>,
{
async fn handle_initiate_settlement(
&mut self,
taker_id: TakerId,
order_id: OrderId,
sig_taker: Signature,
) -> Result<()> {
tracing::info!(
"Taker {} initiated collab settlement for order { } by sending their signature",
taker_id,
order_id,
);
let (proposal, agreed_taker_id) = self
.current_agreed_proposals
.get(&order_id)
.context("maker should have data matching the agreed settlement")?;
if taker_id != *agreed_taker_id {
anyhow::bail!(
"taker Id mismatch. Expected: {}, received: {}",
agreed_taker_id,
taker_id
);
}
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
let (tx, sig_maker) = dlc.close_transaction(proposal)?;
let own_script_pubkey = dlc.script_pubkey_for(cfd.role());
cfd.handle(CfdStateChangeEvent::ProposalSigned(
CollaborativeSettlement::new(tx.clone(), own_script_pubkey.clone(), proposal.price),
))?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?;
let txid = self
.wallet
.try_broadcast_transaction(spend_tx.clone())
.await
.context("Broadcasting spend transaction")?;
tracing::info!("Close transaction published with txid {}", txid);
self.monitor_actor
.do_send_async(monitor::CollaborativeSettlement {
order_id,
tx: (txid, own_script_pubkey),
})
.await?;
self.current_agreed_proposals
.remove(&order_id)
.context("remove accepted proposal after signing")?;
Ok(())
}
}
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<CfdAction> for Actor<O, M, T> impl<O: 'static, M: 'static, T: 'static> Handler<CfdAction> for Actor<O, M, T>
where where
@ -977,6 +986,7 @@ impl<O: 'static, M: 'static, T: 'static> Handler<FromTaker> for Actor<O, M, T>
where where
T: xtra::Handler<maker_inc_connections::BroadcastOrder> T: xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<maker_inc_connections::TakerMessage>, + xtra::Handler<maker_inc_connections::TakerMessage>,
M: xtra::Handler<monitor::CollaborativeSettlement>,
{ {
async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, _ctx: &mut Context<Self>) { async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, _ctx: &mut Context<Self>) {
match msg { match msg {

33
daemon/src/monitor.rs

@ -1,4 +1,4 @@
use crate::model::cfd::{CetStatus, Cfd, CfdState, CollaborativeSettlement, Dlc, OrderId}; use crate::model::cfd::{CetStatus, Cfd, CfdState, Dlc, OrderId};
use crate::model::BitMexPriceEventId; use crate::model::BitMexPriceEventId;
use crate::oracle::Attestation; use crate::oracle::Attestation;
use crate::{log_error, model, oracle}; use crate::{log_error, model, oracle};
@ -23,6 +23,11 @@ pub struct StartMonitoring {
pub params: MonitorParams, pub params: MonitorParams,
} }
pub struct CollaborativeSettlement {
pub order_id: OrderId,
pub tx: (Txid, Script),
}
#[derive(Clone)] #[derive(Clone)]
pub struct MonitorParams { pub struct MonitorParams {
lock: (Txid, Descriptor<PublicKey>), lock: (Txid, Descriptor<PublicKey>),
@ -84,7 +89,7 @@ impl Actor<bdk::electrum_client::Client> {
actor.monitor_commit_refund_timelock(&params, cfd.order.id); actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id); actor.monitor_refund_finality(&params,cfd.order.id);
if let Some(CollaborativeSettlement { tx, ..} if let Some(model::cfd::CollaborativeSettlement { tx, ..}
) = cfd.state.get_collaborative_close() { ) = cfd.state.get_collaborative_close() {
let close_params = (tx.txid(), let close_params = (tx.txid(),
tx.output.first().expect("have output").script_pubkey.clone()); tx.output.first().expect("have output").script_pubkey.clone());
@ -315,6 +320,16 @@ where
Ok(()) Ok(())
} }
fn handle_collaborative_settlement(
&mut self,
collaborative_settlement: CollaborativeSettlement,
) {
self.monitor_close_finality(
collaborative_settlement.tx,
collaborative_settlement.order_id,
);
}
async fn update_state( async fn update_state(
&mut self, &mut self,
latest_block_height: BlockHeight, latest_block_height: BlockHeight,
@ -519,6 +534,10 @@ impl xtra::Message for StartMonitoring {
type Result = (); type Result = ();
} }
impl xtra::Message for CollaborativeSettlement {
type Result = ();
}
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
pub enum Event { pub enum Event {
LockFinality(OrderId), LockFinality(OrderId),
@ -640,6 +659,16 @@ impl xtra::Handler<oracle::Attestation> for Actor {
} }
} }
#[async_trait]
impl<C> xtra::Handler<CollaborativeSettlement> for Actor<C>
where
C: bdk::electrum_client::ElectrumApi + Send + 'static,
{
async fn handle(&mut self, msg: CollaborativeSettlement, _ctx: &mut xtra::Context<Self>) {
self.handle_collaborative_settlement(msg);
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

13
daemon/src/taker_cfd.rs

@ -344,12 +344,23 @@ impl Actor {
.await?; .await?;
cfd.handle(CfdStateChangeEvent::ProposalSigned( cfd.handle(CfdStateChangeEvent::ProposalSigned(
CollaborativeSettlement::new(tx, dlc.script_pubkey_for(cfd.role()), proposal.price), CollaborativeSettlement::new(
tx.clone(),
dlc.script_pubkey_for(cfd.role()),
proposal.price,
),
))?; ))?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.remove_pending_proposal(&order_id)?; self.remove_pending_proposal(&order_id)?;
self.monitor_actor
.do_send_async(monitor::CollaborativeSettlement {
order_id,
tx: (tx.txid(), dlc.script_pubkey_for(Role::Taker)),
})
.await?;
Ok(()) Ok(())
} }

Loading…
Cancel
Save