diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 144f926..40fa778 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -11,7 +11,6 @@ use crate::model::cfd::{ use crate::model::{OracleEventId, TakerId, Usd}; use crate::monitor::MonitorParams; use crate::wallet::Wallet; -use crate::wire::TakerToMaker; use crate::{maker_inc_connections, monitor, oracle, setup_contract, wire}; use anyhow::{Context as _, Result}; use async_trait::async_trait; @@ -67,6 +66,11 @@ pub struct CfdSetupCompleted { pub dlc: Result, } +pub struct CfdRollOverCompleted { + pub order_id: OrderId, + pub dlc: Result, +} + pub struct TakerStreamMessage { pub taker_id: TakerId, pub item: Result, @@ -83,6 +87,7 @@ pub struct Actor { current_order_id: Option, monitor_actor: Address>, setup_state: SetupState, + roll_over_state: RollOverState, latest_announcements: Option>, oracle_actor: Address>>, // Maker needs to also store TakerId to be able to send a reply back @@ -97,6 +102,14 @@ enum SetupState { None, } +enum RollOverState { + Active { + taker: TakerId, + sender: mpsc::UnboundedSender, + }, + None, +} + impl Actor { #[allow(clippy::too_many_arguments)] pub fn new( @@ -121,6 +134,7 @@ impl Actor { current_order_id: None, monitor_actor, setup_state: SetupState::None, + roll_over_state: RollOverState::None, latest_announcements: None, oracle_actor, current_pending_proposals: HashMap::new(), @@ -243,8 +257,8 @@ impl Actor { async fn handle_propose_roll_over( &mut self, - taker_id: TakerId, proposal: RollOverProposal, + taker_id: TakerId, ) -> Result<()> { tracing::info!( "Received proposal from the taker {}: {:?} to roll over order {}", @@ -252,6 +266,20 @@ impl Actor { proposal, proposal.order_id ); + + // check if CFD is in open state, otherwise we should not proceed + let mut conn = self.db.acquire().await?; + let cfd = load_cfd_by_order_id(proposal.order_id, &mut conn).await?; + match cfd { + Cfd { + state: CfdState::Open { .. }, + .. + } => (), + _ => { + anyhow::bail!("Order is in invalid state. Cannot propose roll over.") + } + }; + self.current_pending_proposals.insert( proposal.order_id, ( @@ -287,6 +315,24 @@ impl Actor { Ok(()) } + async fn handle_inc_roll_over_protocol_msg( + &mut self, + taker_id: TakerId, + msg: wire::RollOverMsg, + ) -> Result<()> { + match &mut self.roll_over_state { + RollOverState::Active { taker, sender } if taker_id == *taker => { + sender.send(msg).await?; + } + RollOverState::Active { taker, .. } => { + anyhow::bail!("Currently rolling over with different taker {}", taker) + } + RollOverState::None => {} + } + + Ok(()) + } + async fn handle_cfd_setup_completed( &mut self, order_id: OrderId, @@ -335,6 +381,43 @@ impl Actor { Ok(()) } + async fn handle_cfd_roll_over_completed( + &mut self, + order_id: OrderId, + dlc: Result, + ) -> Result<()> { + let dlc = dlc.context("Failed to roll over contract with taker")?; + self.roll_over_state = RollOverState::None; + + let mut conn = self.db.acquire().await?; + insert_new_cfd_state_by_order_id( + order_id, + CfdState::Open { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc: dlc.clone(), + attestation: None, + }, + &mut conn, + ) + .await?; + + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; + + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + self.monitor_actor + .do_send_async(monitor::StartMonitoring { + id: order_id, + params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()), + }) + .await?; + + Ok(()) + } + async fn handle_take_order( &mut self, taker_id: TakerId, @@ -590,22 +673,120 @@ impl Actor { Ok(()) } - async fn handle_accept_roll_over(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Maker accepts a rollover proposal" ); + async fn handle_accept_roll_over( + &mut self, + order_id: OrderId, + ctx: &mut Context, + ) -> Result<()> { + tracing::debug!(%order_id, "Maker accepts a roll_over proposal" ); - // TODO: Initiate the roll over logic + let mut conn = self.db.acquire().await?; + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + // Validate if order is actually being requested to be extended + let (proposal, taker_id) = match self.current_pending_proposals.get(&order_id) { + Some(( + UpdateCfdProposal::RollOverProposal { + proposal, + direction: SettlementKind::Incoming, + }, + taker_id, + )) => (proposal, *taker_id), + _ => { + anyhow::bail!("Order is in invalid state. Ignoring trying to accept the roll over request it.") + } + }; + + let dlc = cfd.open_dlc().context("CFD was in wrong state")?; + + // TODO: do we want to store in the db that we rolled over? + + let (oracle_event_id, announcement) = self + .latest_announcements + .clone() + .context("Cannot roll over because no announcement from oracle was found")? + .into_iter() + .next_back() + .context("Empty list of announcements")?; + + self.takers + .send(maker_inc_connections::TakerMessage { + taker_id, + command: TakerCommand::NotifyRollOverAccepted { + id: proposal.order_id, + oracle_event_id, + }, + }) + .await?; + + self.oracle_actor + .do_send_async(oracle::MonitorEvent { + event_id: announcement.id.clone(), + }) + .await?; + + let (sender, receiver) = mpsc::unbounded(); + let contract_future = setup_contract::roll_over( + self.takers.clone().into_sink().with(move |msg| { + future::ok(maker_inc_connections::TakerMessage { + taker_id, + command: TakerCommand::RollOverProtocol(msg), + }) + }), + receiver, + (self.oracle_pk, announcement), + cfd, + Role::Maker, + dlc, + ); + + let this = ctx + .address() + .expect("actor to be able to give address to itself"); + + self.roll_over_state = RollOverState::Active { + sender, + taker: taker_id, + }; + + tokio::spawn(async move { + let dlc = contract_future.await; + + this.do_send_async(CfdRollOverCompleted { order_id, dlc }) + .await + }); self.remove_pending_proposal(&order_id) - .context("accepted rollover")?; + .context("accepted roll_over")?; Ok(()) } async fn handle_reject_roll_over(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Maker rejects a rollover proposal" ); - // TODO: Handle rejection and notify the taker that the rollover was rejected + tracing::debug!(%order_id, "Maker rejects a roll_over proposal" ); + + // Validate if order is actually being requested to be extended + let (_, taker_id) = match self.current_pending_proposals.get(&order_id) { + Some(( + UpdateCfdProposal::RollOverProposal { + proposal, + direction: SettlementKind::Incoming, + }, + taker_id, + )) => (proposal, *taker_id), + _ => { + anyhow::bail!("Order is in invalid state. Ignoring reject roll over request.") + } + }; + + self.takers + .do_send_async(maker_inc_connections::TakerMessage { + taker_id, + command: TakerCommand::NotifyRollOverRejected { id: order_id }, + }) + .await?; self.remove_pending_proposal(&order_id) - .context("rejected rollover")?; + .context("rejected roll_over")?; Ok(()) } @@ -725,8 +906,8 @@ impl Handler for Actor { #[async_trait] impl Handler for Actor { - async fn handle(&mut self, msg: AcceptRollOver, _ctx: &mut Context) { - log_error!(self.handle_accept_roll_over(msg.order_id)) + async fn handle(&mut self, msg: AcceptRollOver, ctx: &mut Context) { + log_error!(self.handle_accept_roll_over(msg.order_id, ctx)) } } @@ -765,6 +946,13 @@ impl Handler for Actor { } } +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: CfdRollOverCompleted, _ctx: &mut Context) { + log_error!(self.handle_cfd_roll_over_completed(msg.order_id, msg.dlc)); + } +} + #[async_trait] impl Handler for Actor { async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) { @@ -811,18 +999,22 @@ impl Handler for Actor { wire::TakerToMaker::Protocol(msg) => { log_error!(self.handle_inc_protocol_msg(taker_id, msg)) } - TakerToMaker::ProposeRollOver { + wire::TakerToMaker::ProposeRollOver { order_id, timestamp, } => { log_error!(self.handle_propose_roll_over( - taker_id, RollOverProposal { order_id, timestamp, - } + }, + taker_id, )) } + + wire::TakerToMaker::RollOverProtocol(msg) => { + log_error!(self.handle_inc_roll_over_protocol_msg(taker_id, msg)) + } } KeepRunning::Yes @@ -855,6 +1047,10 @@ impl Message for CfdSetupCompleted { type Result = (); } +impl Message for CfdRollOverCompleted { + type Result = (); +} + impl Message for AcceptOrder { type Result = (); } diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 3a1e8c3..83b575b 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -1,6 +1,6 @@ use crate::actors::log_error; use crate::model::cfd::{Order, OrderId}; -use crate::model::TakerId; +use crate::model::{OracleEventId, TakerId}; use crate::{maker_cfd, send_to_socket, wire}; use anyhow::{Context as AnyhowContext, Result}; use async_trait::async_trait; @@ -18,13 +18,33 @@ pub struct BroadcastOrder(pub Option); #[allow(clippy::large_enum_variant)] pub enum TakerCommand { - SendOrder { order: Option }, - NotifyInvalidOrderId { id: OrderId }, - NotifyOrderAccepted { id: OrderId }, - NotifyOrderRejected { id: OrderId }, - NotifySettlementAccepted { id: OrderId }, - NotifySettlementRejected { id: OrderId }, + SendOrder { + order: Option, + }, + NotifyInvalidOrderId { + id: OrderId, + }, + NotifyOrderAccepted { + id: OrderId, + }, + NotifyOrderRejected { + id: OrderId, + }, + NotifySettlementAccepted { + id: OrderId, + }, + NotifySettlementRejected { + id: OrderId, + }, + NotifyRollOverAccepted { + id: OrderId, + oracle_event_id: OracleEventId, + }, + NotifyRollOverRejected { + id: OrderId, + }, Protocol(wire::SetupMsg), + RollOverProtocol(wire::RollOverMsg), } pub struct TakerMessage { @@ -108,6 +128,30 @@ impl Actor { self.send_to_taker(msg.taker_id, wire::MakerToTaker::Protocol(setup_msg)) .await?; } + TakerCommand::NotifyRollOverAccepted { + id, + oracle_event_id, + } => { + self.send_to_taker( + msg.taker_id, + wire::MakerToTaker::ConfirmRollOver { + order_id: id, + oracle_event_id, + }, + ) + .await?; + } + TakerCommand::NotifyRollOverRejected { id } => { + self.send_to_taker(msg.taker_id, wire::MakerToTaker::RejectRollOver(id)) + .await?; + } + TakerCommand::RollOverProtocol(roll_over_msg) => { + self.send_to_taker( + msg.taker_id, + wire::MakerToTaker::RollOverProtocol(roll_over_msg), + ) + .await?; + } } Ok(()) } diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index 570a52e..66ba9da 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -3,7 +3,7 @@ use crate::monitor; use crate::oracle::Attestation; use anyhow::{bail, Context, Result}; use bdk::bitcoin::secp256k1::{SecretKey, Signature}; -use bdk::bitcoin::{Address, Amount, PublicKey, SignedAmount, Transaction}; +use bdk::bitcoin::{Address, Amount, PublicKey, Script, SignedAmount, Transaction, Txid}; use bdk::descriptor::Descriptor; use cfd_protocol::secp256k1_zkp::{EcdsaAdaptorSignature, SECP256K1}; use cfd_protocol::{finalize_spend_transaction, spending_tx_sighash}; @@ -385,8 +385,7 @@ pub struct SettlementProposal { pub maker: Amount, } -/// Proposal to roll over over a fixed length. -/// The length of the roll over is defined by the maker. +/// Proposed collaborative settlement #[derive(Debug, Clone)] pub struct RollOverProposal { pub order_id: OrderId, @@ -644,6 +643,9 @@ impl Cfd { transition_timestamp: SystemTime::now(), }, }, + monitor::Event::RevokedTransactionFound(_) => { + todo!("Punish bad counterparty") + } }, CfdStateChangeEvent::CommitTxSent => { let (dlc, attestation) = if let PendingOpen { @@ -873,6 +875,14 @@ impl Cfd { } } + pub fn open_dlc(&self) -> Option { + if let CfdState::Open { dlc, .. } = self.state.clone() { + Some(dlc) + } else { + None + } + } + pub fn is_must_refund(&self) -> bool { matches!(self.state.clone(), CfdState::MustRefund { .. }) } @@ -1270,12 +1280,39 @@ pub struct Dlc { pub identity: SecretKey, pub identity_counterparty: PublicKey, pub revocation: SecretKey, + pub revocation_pk_counterparty: PublicKey, pub publish: SecretKey, - pub address: Address, + pub publish_pk_counterparty: PublicKey, + pub maker_address: Address, + pub taker_address: Address, /// The fully signed lock transaction ready to be published on chain pub lock: (Transaction, Descriptor), pub commit: (Transaction, EcdsaAdaptorSignature, Descriptor), pub cets: HashMap>, pub refund: (Transaction, Signature), + + #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] + pub maker_lock_amount: Amount, + #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] + pub taker_lock_amount: Amount, + + pub revoked_commit: Vec, +} + +/// Information which we need to remember in order to construct a +/// punishment transaction in case the counterparty publishes a +/// revoked commit transaction. +/// +/// It also includes the information needed to monitor for the +/// publication of the revoked commit transaction. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct RevokedCommit { + // To build punish transaction + pub encsig_ours: EcdsaAdaptorSignature, + pub revocation_sk_theirs: SecretKey, + pub publication_pk_theirs: PublicKey, + // To monitor revoked commit transaction + pub txid: Txid, + pub script_pubkey: Script, } diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index 40cf801..f6e36ae 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -37,6 +37,7 @@ pub struct MonitorParams { commit: (Txid, Descriptor), cets: HashMap>, refund: (Txid, Script, u32), + revoked_commits: Vec<(Txid, Script)>, } pub struct Sync; @@ -110,7 +111,7 @@ where actor.monitor_refund_finality(¶ms,cfd.order.id); } CetStatus::OracleSigned(attestation) => { - actor.monitor_cet_finality(map_cets(dlc.cets, dlc.address.script_pubkey()), attestation, cfd.order.id)?; + actor.monitor_cet_finality(map_cets(dlc.cets, dlc.maker_address.script_pubkey()), attestation, cfd.order.id)?; actor.monitor_commit_cet_timelock(¶ms, cfd.order.id); actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); actor.monitor_refund_finality(¶ms,cfd.order.id); @@ -120,7 +121,7 @@ where actor.monitor_refund_finality(¶ms,cfd.order.id); } CetStatus::Ready(attestation) => { - actor.monitor_cet_finality(map_cets(dlc.cets, dlc.address.script_pubkey()), attestation, cfd.order.id)?; + actor.monitor_cet_finality(map_cets(dlc.cets, dlc.maker_address.script_pubkey()), attestation, cfd.order.id)?; actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); actor.monitor_refund_finality(¶ms,cfd.order.id); } @@ -163,6 +164,7 @@ where self.monitor_commit_cet_timelock(params, order_id); self.monitor_commit_refund_timelock(params, order_id); self.monitor_refund_finality(params, order_id); + self.monitor_revoked_commit_transactions(params, order_id); } fn monitor_lock_finality(&mut self, params: &MonitorParams, order_id: OrderId) { @@ -236,6 +238,18 @@ where Ok(()) } + fn monitor_revoked_commit_transactions(&mut self, params: &MonitorParams, order_id: OrderId) { + for revoked_commit_tx in params.revoked_commits.iter() { + self.awaiting_status + .entry((revoked_commit_tx.0, revoked_commit_tx.1.clone())) + .or_default() + .push(( + ScriptStatus::with_confirmations(0), + Event::RevokedTransactionFound(order_id), + )); + } + } + async fn sync(&mut self) -> Result<()> { // Fetch the latest block for storing the height. // We do not act on this subscription after this call, as we cannot rely on @@ -474,6 +488,7 @@ pub enum Event { CetFinality(OrderId), RefundTimelockExpired(OrderId), RefundFinality(OrderId), + RevokedTransactionFound(OrderId), } impl Event { @@ -485,6 +500,7 @@ impl Event { Event::RefundTimelockExpired(order_id) => order_id, Event::RefundFinality(order_id) => order_id, Event::CetFinality(order_id) => order_id, + Event::RevokedTransactionFound(order_id) => order_id, }; *order_id @@ -493,7 +509,7 @@ impl Event { impl MonitorParams { pub fn from_dlc_and_timelocks(dlc: Dlc, refund_timelock_in_blocks: u32) -> Self { - let script_pubkey = dlc.address.script_pubkey(); + let script_pubkey = dlc.maker_address.script_pubkey(); MonitorParams { lock: (dlc.lock.0.txid(), dlc.lock.1), commit: (dlc.commit.0.txid(), dlc.commit.2), @@ -503,6 +519,11 @@ impl MonitorParams { script_pubkey, refund_timelock_in_blocks, ), + revoked_commits: dlc + .revoked_commit + .iter() + .map(|rev_commit| (rev_commit.txid, rev_commit.script_pubkey.clone())) + .collect(), } } } diff --git a/daemon/src/setup_contract.rs b/daemon/src/setup_contract.rs index 6a4253e..e20c063 100644 --- a/daemon/src/setup_contract.rs +++ b/daemon/src/setup_contract.rs @@ -1,16 +1,21 @@ -use crate::model::cfd::{Cet, Cfd, Dlc, Role}; +use crate::model::cfd::{Cet, Cfd, Dlc, RevokedCommit, Role}; use crate::model::OracleEventId; use crate::wallet::Wallet; -use crate::wire::{Msg0, Msg1, Msg2, SetupMsg}; -use crate::{model, payout_curve}; +use crate::wire::{ + Msg0, Msg1, Msg2, RollOverMsg, RollOverMsg0, RollOverMsg1, RollOverMsg2, SetupMsg, +}; +use crate::{model, oracle, payout_curve}; use anyhow::{Context, Result}; use bdk::bitcoin::secp256k1::{schnorrsig, Signature, SECP256K1}; +use bdk::bitcoin::util::psbt::PartiallySignedTransaction; use bdk::bitcoin::{Amount, PublicKey, Transaction}; use bdk::descriptor::Descriptor; +use bdk::miniscript::DescriptorTrait; use cfd_protocol::secp256k1_zkp::EcdsaAdaptorSignature; use cfd_protocol::{ commit_descriptor, compute_adaptor_pk, create_cfd_transactions, interval, lock_descriptor, - spending_tx_sighash, Announcement, PartyParams, PunishParams, + renew_cfd_transactions, secp256k1_zkp, spending_tx_sighash, Announcement, PartyParams, + PunishParams, }; use futures::stream::FusedStream; use futures::{Sink, SinkExt, StreamExt}; @@ -131,7 +136,7 @@ pub async fn new( ¶ms.own_punish.publish_pk, ¶ms.other.identity_pk, ) - .context("Punish adaptor signature does not verify")?; + .context("Commit adaptor signature does not verify")?; for own_grouped_cets in &own_cets { let other_cets = msg1 @@ -221,12 +226,279 @@ pub async fn new( identity: sk, identity_counterparty: params.other.identity_pk, revocation: rev_sk, + revocation_pk_counterparty: other_punish.revocation_pk, publish: publish_sk, - address: params.own.address, + publish_pk_counterparty: other_punish.publish_pk, + maker_address: params.maker().address.clone(), + taker_address: params.taker().address.clone(), lock: (signed_lock_tx.extract_tx(), lock_desc), commit: (commit_tx, msg1.commit, commit_desc), cets, refund: (refund_tx, msg1.refund), + maker_lock_amount: params.maker().lock_amount, + taker_lock_amount: params.taker().lock_amount, + revoked_commit: Vec::new(), + }) +} + +pub async fn roll_over( + mut sink: impl Sink + Unpin, + mut stream: impl FusedStream + Unpin, + (oracle_pk, announcement): (schnorrsig::PublicKey, oracle::Announcement), + cfd: Cfd, + our_role: Role, + dlc: Dlc, +) -> Result { + let sk = dlc.identity; + let pk = PublicKey::new(secp256k1_zkp::PublicKey::from_secret_key(SECP256K1, &sk)); + + let (rev_sk, rev_pk) = crate::keypair::new(&mut rand::thread_rng()); + let (publish_sk, publish_pk) = crate::keypair::new(&mut rand::thread_rng()); + + let own_punish = PunishParams { + revocation_pk: rev_pk, + publish_pk, + }; + + sink.send(RollOverMsg::Msg0(RollOverMsg0 { + revocation_pk: rev_pk, + publish_pk, + })) + .await + .context("Failed to send Msg0")?; + let msg0 = stream + .select_next_some() + .await + .try_into_msg0() + .context("Failed to read Msg0")?; + + let maker_lock_amount = dlc.maker_lock_amount; + let taker_lock_amount = dlc.taker_lock_amount; + let payouts = HashMap::from_iter([( + // TODO : we want to support multiple announcements + Announcement { + id: announcement.id.0, + nonce_pks: announcement.nonce_pks.clone(), + }, + payout_curve::calculate( + cfd.order.price, + cfd.quantity_usd, + maker_lock_amount, + (taker_lock_amount, cfd.order.leverage), + )?, + )]); + + // unsign lock tx because PartiallySignedTransaction needs an unsigned tx + let mut unsigned_lock_tx = dlc.lock.0.clone(); + unsigned_lock_tx + .input + .iter_mut() + .for_each(|input| input.witness.clear()); + + let lock_tx = PartiallySignedTransaction::from_unsigned_tx(unsigned_lock_tx)?; + let other_punish_params = PunishParams { + revocation_pk: msg0.revocation_pk, + publish_pk: msg0.publish_pk, + }; + let ((maker_identity, maker_punish_params), (taker_identity, taker_punish_params)) = + match our_role { + Role::Maker => ( + (pk, own_punish), + (dlc.identity_counterparty, other_punish_params), + ), + Role::Taker => ( + (dlc.identity_counterparty, other_punish_params), + (pk, own_punish), + ), + }; + let own_cfd_txs = renew_cfd_transactions( + lock_tx.clone(), + ( + pk, + maker_lock_amount, + dlc.maker_address.clone(), + maker_punish_params, + ), + ( + dlc.identity_counterparty, + taker_lock_amount, + dlc.taker_address.clone(), + taker_punish_params, + ), + oracle_pk, + ( + model::cfd::Cfd::CET_TIMELOCK, + cfd.refund_timelock_in_blocks(), + ), + payouts, + sk, + ) + .context("Failed to create new CFD transactions")?; + + sink.send(RollOverMsg::Msg1(RollOverMsg1::from(own_cfd_txs.clone()))) + .await + .context("Failed to send Msg1")?; + + let msg1 = stream + .select_next_some() + .await + .try_into_msg1() + .context("Failed to read Msg1")?; + + let lock_amount = taker_lock_amount + maker_lock_amount; + + let commit_desc = commit_descriptor( + ( + maker_identity, + maker_punish_params.revocation_pk, + maker_punish_params.publish_pk, + ), + ( + taker_identity, + taker_punish_params.revocation_pk, + taker_punish_params.publish_pk, + ), + ); + + let own_cets = own_cfd_txs.cets; + let commit_tx = own_cfd_txs.commit.0.clone(); + + let commit_amount = Amount::from_sat(commit_tx.output[0].value); + + verify_adaptor_signature( + &commit_tx, + &dlc.lock.1, + lock_amount, + &msg1.commit, + &publish_pk, + &dlc.identity_counterparty, + ) + .context("Commit adaptor signature does not verify")?; + + let other_address = match our_role { + Role::Maker => dlc.taker_address.clone(), + Role::Taker => dlc.maker_address.clone(), + }; + + for own_grouped_cets in &own_cets { + let other_cets = msg1 + .cets + .get(&own_grouped_cets.event.id) + .context("Expect event to exist in msg")?; + + verify_cets( + (&oracle_pk, &announcement.nonce_pks), + &PartyParams { + lock_psbt: lock_tx.clone(), + identity_pk: dlc.identity_counterparty, + lock_amount, + address: other_address.clone(), + }, + own_grouped_cets.cets.as_slice(), + other_cets.as_slice(), + &commit_desc, + commit_amount, + ) + .context("CET signatures don't verify")?; + } + + let refund_tx = own_cfd_txs.refund.0; + + verify_signature( + &refund_tx, + &commit_desc, + commit_amount, + &msg1.refund, + &dlc.identity_counterparty, + ) + .context("Refund signature does not verify")?; + + let cets = own_cets + .into_iter() + .map(|grouped_cets| { + let event_id = grouped_cets.event.id; + let other_cets = msg1 + .cets + .get(&event_id) + .with_context(|| format!("Counterparty CETs for event {} missing", event_id))?; + let cets = grouped_cets + .cets + .into_iter() + .map(|(tx, _, digits)| { + let other_encsig = other_cets + .iter() + .find_map(|(other_range, other_encsig)| { + (other_range == &digits.range()).then(|| other_encsig) + }) + .with_context(|| { + format!( + "Missing counterparty adaptor signature for CET corresponding to + price range {:?}", + digits.range() + ) + })?; + Ok(Cet { + tx, + adaptor_sig: *other_encsig, + range: digits.range(), + n_bits: digits.len(), + }) + }) + .collect::>>()?; + Ok((OracleEventId(event_id), cets)) + }) + .collect::>>()?; + + // reveal revocation secrets to the other party + sink.send(RollOverMsg::Msg2(RollOverMsg2 { + revocation_sk: dlc.revocation, + })) + .await + .context("Failed to send Msg1")?; + + let msg2 = stream + .select_next_some() + .await + .try_into_msg2() + .context("Failed to read Msg1")?; + let revocation_sk_theirs = msg2.revocation_sk; + + { + let derived_rev_pk = PublicKey::new(secp256k1_zkp::PublicKey::from_secret_key( + SECP256K1, + &revocation_sk_theirs, + )); + + if derived_rev_pk != dlc.revocation_pk_counterparty { + anyhow::bail!("Counterparty sent invalid revocation sk"); + } + } + + let mut revoked_commit = dlc.revoked_commit; + revoked_commit.push(RevokedCommit { + encsig_ours: own_cfd_txs.commit.1, + revocation_sk_theirs, + publication_pk_theirs: dlc.publish_pk_counterparty, + txid: dlc.commit.0.txid(), + script_pubkey: dlc.commit.2.script_pubkey(), + }); + + Ok(Dlc { + identity: sk, + identity_counterparty: dlc.identity_counterparty, + revocation: rev_sk, + revocation_pk_counterparty: other_punish_params.revocation_pk, + publish: publish_sk, + publish_pk_counterparty: other_punish_params.publish_pk, + maker_address: dlc.maker_address, + taker_address: dlc.taker_address, + lock: dlc.lock.clone(), + commit: (commit_tx, msg1.commit, commit_desc), + cets, + refund: (refund_tx, msg1.refund), + maker_lock_amount, + taker_lock_amount, + revoked_commit, }) } diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index c14d198..fe40288 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -10,7 +10,7 @@ use crate::model::cfd::{ use crate::model::{OracleEventId, Usd}; use crate::monitor::{self, MonitorParams}; use crate::wallet::Wallet; -use crate::wire::SetupMsg; +use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; use crate::{oracle, send_to_socket, setup_contract, wire}; use anyhow::{Context as _, Result}; use async_trait::async_trait; @@ -46,13 +46,25 @@ pub struct CfdSetupCompleted { pub dlc: Result, } +pub struct CfdRollOverCompleted { + pub order_id: OrderId, + pub dlc: Result, +} + pub struct Commit { pub order_id: OrderId, } enum SetupState { Active { - sender: mpsc::UnboundedSender, + sender: mpsc::UnboundedSender, + }, + None, +} + +enum RollOverState { + Active { + sender: mpsc::UnboundedSender, }, None, } @@ -67,6 +79,7 @@ pub struct Actor { send_to_maker: Address>, monitor_actor: Address>, setup_state: SetupState, + roll_over_state: RollOverState, latest_announcements: Option>, oracle_actor: Address>>, current_pending_proposals: UpdateCfdProposals, @@ -95,6 +108,7 @@ impl Actor { send_to_maker, monitor_actor, setup_state: SetupState::None, + roll_over_state: RollOverState::None, oracle_actor, latest_announcements: None, current_pending_proposals: HashMap::new(), @@ -334,6 +348,68 @@ impl Actor { Ok(()) } + async fn handle_roll_over_accepted( + &mut self, + order_id: OrderId, + oracle_event_id: OracleEventId, + ctx: &mut Context, + ) -> Result<()> { + tracing::info!(%order_id, "Roll over request got accepted"); + + let (sender, receiver) = mpsc::unbounded(); + + if let RollOverState::Active { .. } = self.roll_over_state { + anyhow::bail!("Already rolling over a contract!") + } + + let mut conn = self.db.acquire().await?; + + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + let dlc = cfd.open_dlc().context("CFD was in wrong state")?; + + // TODO: we need to get multiple announcements for the next 24h + let announcement = self + .latest_announcements + .clone() + .context("Cannot roll over because no announcement from oracle was found")? + .get(&oracle_event_id) + .context("Empty list of announcements")? + .clone(); + + self.oracle_actor + .do_send_async(oracle::MonitorEvent { + event_id: announcement.id.clone(), + }) + .await?; + + let contract_future = setup_contract::roll_over( + self.send_to_maker + .clone() + .into_sink() + .with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))), + receiver, + (self.oracle_pk, announcement), + cfd, + Role::Taker, + dlc, + ); + + let this = ctx + .address() + .expect("actor to be able to give address to itself"); + + self.roll_over_state = RollOverState::Active { sender }; + + tokio::spawn(async move { + let dlc = contract_future.await; + + this.do_send_async(CfdRollOverCompleted { order_id, dlc }) + .await + }); + + Ok(()) + } + async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> { tracing::info!(%order_id, "Settlement proposal got rejected"); @@ -342,6 +418,16 @@ impl Actor { Ok(()) } + async fn handle_roll_over_rejected(&mut self, order_id: OrderId) -> Result<()> { + tracing::debug!(%order_id, "Roll over request rejected"); + // TODO: tell UI that roll over was rejected + + // this is not too bad as we are still monitoring for the CFD to expiry + // the taker can just try to ask again :) + + Ok(()) + } + async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> { match &mut self.setup_state { SetupState::Active { sender } => { @@ -355,6 +441,19 @@ impl Actor { Ok(()) } + async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> { + match &mut self.roll_over_state { + RollOverState::Active { sender } => { + sender.send(msg).await?; + } + RollOverState::None => { + anyhow::bail!("Received message without an active roll_over setup") + } + } + + Ok(()) + } + async fn handle_cfd_setup_completed( &mut self, order_id: OrderId, @@ -404,6 +503,43 @@ impl Actor { Ok(()) } + async fn handle_cfd_roll_over_completed( + &mut self, + order_id: OrderId, + dlc: Result, + ) -> Result<()> { + let dlc = dlc.context("Failed to roll over contract with maker")?; + self.roll_over_state = RollOverState::None; + + let mut conn = self.db.acquire().await?; + insert_new_cfd_state_by_order_id( + order_id, + CfdState::Open { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc: dlc.clone(), + attestation: None, + }, + &mut conn, + ) + .await?; + + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; + + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + self.monitor_actor + .do_send_async(monitor::StartMonitoring { + id: order_id, + params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()), + }) + .await?; + + Ok(()) + } + async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { let order_id = event.order_id(); @@ -571,6 +707,18 @@ impl Handler for Actor { wire::MakerToTaker::Protocol(setup_msg) => { log_error!(self.handle_inc_protocol_msg(setup_msg)) } + wire::MakerToTaker::ConfirmRollOver { + order_id, + oracle_event_id, + } => { + log_error!(self.handle_roll_over_accepted(order_id, oracle_event_id, ctx)) + } + wire::MakerToTaker::RejectRollOver(order_id) => { + log_error!(self.handle_roll_over_rejected(order_id)) + } + MakerToTaker::RollOverProtocol(roll_over_msg) => { + log_error!(self.handle_inc_roll_over_msg(roll_over_msg)) + } } KeepRunning::Yes @@ -584,6 +732,13 @@ impl Handler for Actor { } } +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: CfdRollOverCompleted, _ctx: &mut Context) { + log_error!(self.handle_cfd_roll_over_completed(msg.order_id, msg.dlc)); + } +} + #[async_trait] impl Handler for Actor { async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context) { @@ -633,6 +788,10 @@ impl Message for CfdSetupCompleted { type Result = (); } +impl Message for CfdRollOverCompleted { + type Result = (); +} + impl Message for Commit { type Result = (); } diff --git a/daemon/src/wire.rs b/daemon/src/wire.rs index 070a601..5db553f 100644 --- a/daemon/src/wire.rs +++ b/daemon/src/wire.rs @@ -1,12 +1,12 @@ use crate::model::cfd::OrderId; -use crate::model::Usd; +use crate::model::{OracleEventId, Usd}; use crate::Order; use anyhow::{bail, Result}; use bdk::bitcoin::secp256k1::Signature; use bdk::bitcoin::util::psbt::PartiallySignedTransaction; use bdk::bitcoin::{Address, Amount, PublicKey}; use bytes::BytesMut; -use cfd_protocol::secp256k1_zkp::EcdsaAdaptorSignature; +use cfd_protocol::secp256k1_zkp::{EcdsaAdaptorSignature, SecretKey}; use cfd_protocol::{CfdTransactions, PartyParams, PunishParams}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; @@ -38,6 +38,7 @@ pub enum TakerToMaker { timestamp: SystemTime, }, Protocol(SetupMsg), + RollOverProtocol(RollOverMsg), } impl fmt::Display for TakerToMaker { @@ -47,6 +48,7 @@ impl fmt::Display for TakerToMaker { TakerToMaker::ProposeSettlement { .. } => write!(f, "ProposeSettlement"), TakerToMaker::Protocol(_) => write!(f, "Protocol"), TakerToMaker::ProposeRollOver { .. } => write!(f, "ProposeRollOver"), + TakerToMaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"), } } } @@ -62,6 +64,12 @@ pub enum MakerToTaker { RejectSettlement(OrderId), InvalidOrderId(OrderId), Protocol(SetupMsg), + RollOverProtocol(RollOverMsg), + ConfirmRollOver { + order_id: OrderId, + oracle_event_id: OracleEventId, + }, + RejectRollOver(OrderId), } impl fmt::Display for MakerToTaker { @@ -74,6 +82,9 @@ impl fmt::Display for MakerToTaker { MakerToTaker::RejectSettlement(_) => write!(f, "RejectSettlement"), MakerToTaker::InvalidOrderId(_) => write!(f, "InvalidOrderId"), MakerToTaker::Protocol(_) => write!(f, "Protocol"), + MakerToTaker::ConfirmRollOver { .. } => write!(f, "ConfirmRollOver"), + MakerToTaker::RejectRollOver(_) => write!(f, "RejectRollOver"), + MakerToTaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"), } } } @@ -270,3 +281,79 @@ impl From for Msg1 { pub struct Msg2 { pub signed_lock: PartiallySignedTransaction, // TODO: Use binary representation } + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "type", content = "payload")] +pub enum RollOverMsg { + Msg0(RollOverMsg0), + Msg1(RollOverMsg1), + Msg2(RollOverMsg2), +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct RollOverMsg0 { + pub revocation_pk: PublicKey, + pub publish_pk: PublicKey, +} + +impl RollOverMsg { + pub fn try_into_msg0(self) -> Result { + if let Self::Msg0(v) = self { + Ok(v) + } else { + bail!("Not Msg0") + } + } + + pub fn try_into_msg1(self) -> Result { + if let Self::Msg1(v) = self { + Ok(v) + } else { + bail!("Not Msg1") + } + } + + pub fn try_into_msg2(self) -> Result { + if let Self::Msg2(v) = self { + Ok(v) + } else { + bail!("Not Msg2") + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct RollOverMsg1 { + pub commit: EcdsaAdaptorSignature, + pub cets: HashMap, EcdsaAdaptorSignature)>>, + pub refund: Signature, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct RollOverMsg2 { + pub revocation_sk: SecretKey, +} + +impl From for RollOverMsg1 { + fn from(txs: CfdTransactions) -> Self { + let cets = txs + .cets + .into_iter() + .map(|grouped_cets| { + ( + grouped_cets.event.id, + grouped_cets + .cets + .into_iter() + .map(|(_, encsig, digits)| (digits.range(), encsig)) + .collect::>(), + ) + }) + .collect::>(); + Self { + commit: txs.commit.1, + cets, + refund: txs.refund.1, + } + } +}