Browse Source

Implement rollover protocol in daemons

compile-for-aarch64
Lucas Soriano del Pino 3 years ago
parent
commit
5cd723441d
No known key found for this signature in database GPG Key ID: EE611E973A1530E7
  1. 224
      daemon/src/maker_cfd.rs
  2. 58
      daemon/src/maker_inc_connections.rs
  3. 45
      daemon/src/model/cfd.rs
  4. 27
      daemon/src/monitor.rs
  5. 298
      daemon/src/setup_contract.rs
  6. 163
      daemon/src/taker_cfd.rs
  7. 90
      daemon/src/wire.rs

224
daemon/src/maker_cfd.rs

@ -11,7 +11,6 @@ use crate::model::cfd::{
use crate::model::{OracleEventId, TakerId, Usd};
use crate::monitor::MonitorParams;
use crate::wallet::Wallet;
use crate::wire::TakerToMaker;
use crate::{maker_inc_connections, monitor, oracle, setup_contract, wire};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
@ -67,6 +66,11 @@ pub struct CfdSetupCompleted {
pub dlc: Result<Dlc>,
}
pub struct CfdRollOverCompleted {
pub order_id: OrderId,
pub dlc: Result<Dlc>,
}
pub struct TakerStreamMessage {
pub taker_id: TakerId,
pub item: Result<wire::TakerToMaker>,
@ -83,6 +87,7 @@ pub struct Actor {
current_order_id: Option<OrderId>,
monitor_actor: Address<monitor::Actor<Actor>>,
setup_state: SetupState,
roll_over_state: RollOverState,
latest_announcements: Option<BTreeMap<OracleEventId, oracle::Announcement>>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
// Maker needs to also store TakerId to be able to send a reply back
@ -97,6 +102,14 @@ enum SetupState {
None,
}
enum RollOverState {
Active {
taker: TakerId,
sender: mpsc::UnboundedSender<wire::RollOverMsg>,
},
None,
}
impl Actor {
#[allow(clippy::too_many_arguments)]
pub fn new(
@ -121,6 +134,7 @@ impl Actor {
current_order_id: None,
monitor_actor,
setup_state: SetupState::None,
roll_over_state: RollOverState::None,
latest_announcements: None,
oracle_actor,
current_pending_proposals: HashMap::new(),
@ -243,8 +257,8 @@ impl Actor {
async fn handle_propose_roll_over(
&mut self,
taker_id: TakerId,
proposal: RollOverProposal,
taker_id: TakerId,
) -> Result<()> {
tracing::info!(
"Received proposal from the taker {}: {:?} to roll over order {}",
@ -252,6 +266,20 @@ impl Actor {
proposal,
proposal.order_id
);
// check if CFD is in open state, otherwise we should not proceed
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(proposal.order_id, &mut conn).await?;
match cfd {
Cfd {
state: CfdState::Open { .. },
..
} => (),
_ => {
anyhow::bail!("Order is in invalid state. Cannot propose roll over.")
}
};
self.current_pending_proposals.insert(
proposal.order_id,
(
@ -287,6 +315,24 @@ impl Actor {
Ok(())
}
async fn handle_inc_roll_over_protocol_msg(
&mut self,
taker_id: TakerId,
msg: wire::RollOverMsg,
) -> Result<()> {
match &mut self.roll_over_state {
RollOverState::Active { taker, sender } if taker_id == *taker => {
sender.send(msg).await?;
}
RollOverState::Active { taker, .. } => {
anyhow::bail!("Currently rolling over with different taker {}", taker)
}
RollOverState::None => {}
}
Ok(())
}
async fn handle_cfd_setup_completed(
&mut self,
order_id: OrderId,
@ -335,6 +381,43 @@ impl Actor {
Ok(())
}
async fn handle_cfd_roll_over_completed(
&mut self,
order_id: OrderId,
dlc: Result<Dlc>,
) -> Result<()> {
let dlc = dlc.context("Failed to roll over contract with taker")?;
self.roll_over_state = RollOverState::None;
let mut conn = self.db.acquire().await?;
insert_new_cfd_state_by_order_id(
order_id,
CfdState::Open {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc: dlc.clone(),
attestation: None,
},
&mut conn,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
self.monitor_actor
.do_send_async(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()),
})
.await?;
Ok(())
}
async fn handle_take_order(
&mut self,
taker_id: TakerId,
@ -590,22 +673,120 @@ impl Actor {
Ok(())
}
async fn handle_accept_roll_over(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker accepts a rollover proposal" );
async fn handle_accept_roll_over(
&mut self,
order_id: OrderId,
ctx: &mut Context<Self>,
) -> Result<()> {
tracing::debug!(%order_id, "Maker accepts a roll_over proposal" );
// TODO: Initiate the roll over logic
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
// Validate if order is actually being requested to be extended
let (proposal, taker_id) = match self.current_pending_proposals.get(&order_id) {
Some((
UpdateCfdProposal::RollOverProposal {
proposal,
direction: SettlementKind::Incoming,
},
taker_id,
)) => (proposal, *taker_id),
_ => {
anyhow::bail!("Order is in invalid state. Ignoring trying to accept the roll over request it.")
}
};
let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
// TODO: do we want to store in the db that we rolled over?
let (oracle_event_id, announcement) = self
.latest_announcements
.clone()
.context("Cannot roll over because no announcement from oracle was found")?
.into_iter()
.next_back()
.context("Empty list of announcements")?;
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyRollOverAccepted {
id: proposal.order_id,
oracle_event_id,
},
})
.await?;
self.oracle_actor
.do_send_async(oracle::MonitorEvent {
event_id: announcement.id.clone(),
})
.await?;
let (sender, receiver) = mpsc::unbounded();
let contract_future = setup_contract::roll_over(
self.takers.clone().into_sink().with(move |msg| {
future::ok(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::RollOverProtocol(msg),
})
}),
receiver,
(self.oracle_pk, announcement),
cfd,
Role::Maker,
dlc,
);
let this = ctx
.address()
.expect("actor to be able to give address to itself");
self.roll_over_state = RollOverState::Active {
sender,
taker: taker_id,
};
tokio::spawn(async move {
let dlc = contract_future.await;
this.do_send_async(CfdRollOverCompleted { order_id, dlc })
.await
});
self.remove_pending_proposal(&order_id)
.context("accepted rollover")?;
.context("accepted roll_over")?;
Ok(())
}
async fn handle_reject_roll_over(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker rejects a rollover proposal" );
// TODO: Handle rejection and notify the taker that the rollover was rejected
tracing::debug!(%order_id, "Maker rejects a roll_over proposal" );
// Validate if order is actually being requested to be extended
let (_, taker_id) = match self.current_pending_proposals.get(&order_id) {
Some((
UpdateCfdProposal::RollOverProposal {
proposal,
direction: SettlementKind::Incoming,
},
taker_id,
)) => (proposal, *taker_id),
_ => {
anyhow::bail!("Order is in invalid state. Ignoring reject roll over request.")
}
};
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyRollOverRejected { id: order_id },
})
.await?;
self.remove_pending_proposal(&order_id)
.context("rejected rollover")?;
.context("rejected roll_over")?;
Ok(())
}
@ -725,8 +906,8 @@ impl Handler<RejectSettlement> for Actor {
#[async_trait]
impl Handler<AcceptRollOver> for Actor {
async fn handle(&mut self, msg: AcceptRollOver, _ctx: &mut Context<Self>) {
log_error!(self.handle_accept_roll_over(msg.order_id))
async fn handle(&mut self, msg: AcceptRollOver, ctx: &mut Context<Self>) {
log_error!(self.handle_accept_roll_over(msg.order_id, ctx))
}
}
@ -765,6 +946,13 @@ impl Handler<CfdSetupCompleted> for Actor {
}
}
#[async_trait]
impl Handler<CfdRollOverCompleted> for Actor {
async fn handle(&mut self, msg: CfdRollOverCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_roll_over_completed(msg.order_id, msg.dlc));
}
}
#[async_trait]
impl Handler<monitor::Event> for Actor {
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) {
@ -811,18 +999,22 @@ impl Handler<TakerStreamMessage> for Actor {
wire::TakerToMaker::Protocol(msg) => {
log_error!(self.handle_inc_protocol_msg(taker_id, msg))
}
TakerToMaker::ProposeRollOver {
wire::TakerToMaker::ProposeRollOver {
order_id,
timestamp,
} => {
log_error!(self.handle_propose_roll_over(
taker_id,
RollOverProposal {
order_id,
timestamp,
}
},
taker_id,
))
}
wire::TakerToMaker::RollOverProtocol(msg) => {
log_error!(self.handle_inc_roll_over_protocol_msg(taker_id, msg))
}
}
KeepRunning::Yes
@ -855,6 +1047,10 @@ impl Message for CfdSetupCompleted {
type Result = ();
}
impl Message for CfdRollOverCompleted {
type Result = ();
}
impl Message for AcceptOrder {
type Result = ();
}

58
daemon/src/maker_inc_connections.rs

@ -1,6 +1,6 @@
use crate::actors::log_error;
use crate::model::cfd::{Order, OrderId};
use crate::model::TakerId;
use crate::model::{OracleEventId, TakerId};
use crate::{maker_cfd, send_to_socket, wire};
use anyhow::{Context as AnyhowContext, Result};
use async_trait::async_trait;
@ -18,13 +18,33 @@ pub struct BroadcastOrder(pub Option<Order>);
#[allow(clippy::large_enum_variant)]
pub enum TakerCommand {
SendOrder { order: Option<Order> },
NotifyInvalidOrderId { id: OrderId },
NotifyOrderAccepted { id: OrderId },
NotifyOrderRejected { id: OrderId },
NotifySettlementAccepted { id: OrderId },
NotifySettlementRejected { id: OrderId },
SendOrder {
order: Option<Order>,
},
NotifyInvalidOrderId {
id: OrderId,
},
NotifyOrderAccepted {
id: OrderId,
},
NotifyOrderRejected {
id: OrderId,
},
NotifySettlementAccepted {
id: OrderId,
},
NotifySettlementRejected {
id: OrderId,
},
NotifyRollOverAccepted {
id: OrderId,
oracle_event_id: OracleEventId,
},
NotifyRollOverRejected {
id: OrderId,
},
Protocol(wire::SetupMsg),
RollOverProtocol(wire::RollOverMsg),
}
pub struct TakerMessage {
@ -108,6 +128,30 @@ impl Actor {
self.send_to_taker(msg.taker_id, wire::MakerToTaker::Protocol(setup_msg))
.await?;
}
TakerCommand::NotifyRollOverAccepted {
id,
oracle_event_id,
} => {
self.send_to_taker(
msg.taker_id,
wire::MakerToTaker::ConfirmRollOver {
order_id: id,
oracle_event_id,
},
)
.await?;
}
TakerCommand::NotifyRollOverRejected { id } => {
self.send_to_taker(msg.taker_id, wire::MakerToTaker::RejectRollOver(id))
.await?;
}
TakerCommand::RollOverProtocol(roll_over_msg) => {
self.send_to_taker(
msg.taker_id,
wire::MakerToTaker::RollOverProtocol(roll_over_msg),
)
.await?;
}
}
Ok(())
}

45
daemon/src/model/cfd.rs

@ -3,7 +3,7 @@ use crate::monitor;
use crate::oracle::Attestation;
use anyhow::{bail, Context, Result};
use bdk::bitcoin::secp256k1::{SecretKey, Signature};
use bdk::bitcoin::{Address, Amount, PublicKey, SignedAmount, Transaction};
use bdk::bitcoin::{Address, Amount, PublicKey, Script, SignedAmount, Transaction, Txid};
use bdk::descriptor::Descriptor;
use cfd_protocol::secp256k1_zkp::{EcdsaAdaptorSignature, SECP256K1};
use cfd_protocol::{finalize_spend_transaction, spending_tx_sighash};
@ -385,8 +385,7 @@ pub struct SettlementProposal {
pub maker: Amount,
}
/// Proposal to roll over over a fixed length.
/// The length of the roll over is defined by the maker.
/// Proposed collaborative settlement
#[derive(Debug, Clone)]
pub struct RollOverProposal {
pub order_id: OrderId,
@ -644,6 +643,9 @@ impl Cfd {
transition_timestamp: SystemTime::now(),
},
},
monitor::Event::RevokedTransactionFound(_) => {
todo!("Punish bad counterparty")
}
},
CfdStateChangeEvent::CommitTxSent => {
let (dlc, attestation) = if let PendingOpen {
@ -873,6 +875,14 @@ impl Cfd {
}
}
pub fn open_dlc(&self) -> Option<Dlc> {
if let CfdState::Open { dlc, .. } = self.state.clone() {
Some(dlc)
} else {
None
}
}
pub fn is_must_refund(&self) -> bool {
matches!(self.state.clone(), CfdState::MustRefund { .. })
}
@ -1270,12 +1280,39 @@ pub struct Dlc {
pub identity: SecretKey,
pub identity_counterparty: PublicKey,
pub revocation: SecretKey,
pub revocation_pk_counterparty: PublicKey,
pub publish: SecretKey,
pub address: Address,
pub publish_pk_counterparty: PublicKey,
pub maker_address: Address,
pub taker_address: Address,
/// The fully signed lock transaction ready to be published on chain
pub lock: (Transaction, Descriptor<PublicKey>),
pub commit: (Transaction, EcdsaAdaptorSignature, Descriptor<PublicKey>),
pub cets: HashMap<OracleEventId, Vec<Cet>>,
pub refund: (Transaction, Signature),
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub maker_lock_amount: Amount,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub taker_lock_amount: Amount,
pub revoked_commit: Vec<RevokedCommit>,
}
/// Information which we need to remember in order to construct a
/// punishment transaction in case the counterparty publishes a
/// revoked commit transaction.
///
/// It also includes the information needed to monitor for the
/// publication of the revoked commit transaction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RevokedCommit {
// To build punish transaction
pub encsig_ours: EcdsaAdaptorSignature,
pub revocation_sk_theirs: SecretKey,
pub publication_pk_theirs: PublicKey,
// To monitor revoked commit transaction
pub txid: Txid,
pub script_pubkey: Script,
}

27
daemon/src/monitor.rs

@ -37,6 +37,7 @@ pub struct MonitorParams {
commit: (Txid, Descriptor<PublicKey>),
cets: HashMap<OracleEventId, Vec<Cet>>,
refund: (Txid, Script, u32),
revoked_commits: Vec<(Txid, Script)>,
}
pub struct Sync;
@ -110,7 +111,7 @@ where
actor.monitor_refund_finality(&params,cfd.order.id);
}
CetStatus::OracleSigned(attestation) => {
actor.monitor_cet_finality(map_cets(dlc.cets, dlc.address.script_pubkey()), attestation, cfd.order.id)?;
actor.monitor_cet_finality(map_cets(dlc.cets, dlc.maker_address.script_pubkey()), attestation, cfd.order.id)?;
actor.monitor_commit_cet_timelock(&params, cfd.order.id);
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
@ -120,7 +121,7 @@ where
actor.monitor_refund_finality(&params,cfd.order.id);
}
CetStatus::Ready(attestation) => {
actor.monitor_cet_finality(map_cets(dlc.cets, dlc.address.script_pubkey()), attestation, cfd.order.id)?;
actor.monitor_cet_finality(map_cets(dlc.cets, dlc.maker_address.script_pubkey()), attestation, cfd.order.id)?;
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
}
@ -163,6 +164,7 @@ where
self.monitor_commit_cet_timelock(params, order_id);
self.monitor_commit_refund_timelock(params, order_id);
self.monitor_refund_finality(params, order_id);
self.monitor_revoked_commit_transactions(params, order_id);
}
fn monitor_lock_finality(&mut self, params: &MonitorParams, order_id: OrderId) {
@ -236,6 +238,18 @@ where
Ok(())
}
fn monitor_revoked_commit_transactions(&mut self, params: &MonitorParams, order_id: OrderId) {
for revoked_commit_tx in params.revoked_commits.iter() {
self.awaiting_status
.entry((revoked_commit_tx.0, revoked_commit_tx.1.clone()))
.or_default()
.push((
ScriptStatus::with_confirmations(0),
Event::RevokedTransactionFound(order_id),
));
}
}
async fn sync(&mut self) -> Result<()> {
// Fetch the latest block for storing the height.
// We do not act on this subscription after this call, as we cannot rely on
@ -474,6 +488,7 @@ pub enum Event {
CetFinality(OrderId),
RefundTimelockExpired(OrderId),
RefundFinality(OrderId),
RevokedTransactionFound(OrderId),
}
impl Event {
@ -485,6 +500,7 @@ impl Event {
Event::RefundTimelockExpired(order_id) => order_id,
Event::RefundFinality(order_id) => order_id,
Event::CetFinality(order_id) => order_id,
Event::RevokedTransactionFound(order_id) => order_id,
};
*order_id
@ -493,7 +509,7 @@ impl Event {
impl MonitorParams {
pub fn from_dlc_and_timelocks(dlc: Dlc, refund_timelock_in_blocks: u32) -> Self {
let script_pubkey = dlc.address.script_pubkey();
let script_pubkey = dlc.maker_address.script_pubkey();
MonitorParams {
lock: (dlc.lock.0.txid(), dlc.lock.1),
commit: (dlc.commit.0.txid(), dlc.commit.2),
@ -503,6 +519,11 @@ impl MonitorParams {
script_pubkey,
refund_timelock_in_blocks,
),
revoked_commits: dlc
.revoked_commit
.iter()
.map(|rev_commit| (rev_commit.txid, rev_commit.script_pubkey.clone()))
.collect(),
}
}
}

298
daemon/src/setup_contract.rs

@ -1,16 +1,21 @@
use crate::model::cfd::{Cet, Cfd, Dlc, Role};
use crate::model::cfd::{Cet, Cfd, Dlc, RevokedCommit, Role};
use crate::model::OracleEventId;
use crate::wallet::Wallet;
use crate::wire::{Msg0, Msg1, Msg2, SetupMsg};
use crate::{model, payout_curve};
use crate::wire::{
Msg0, Msg1, Msg2, RollOverMsg, RollOverMsg0, RollOverMsg1, RollOverMsg2, SetupMsg,
};
use crate::{model, oracle, payout_curve};
use anyhow::{Context, Result};
use bdk::bitcoin::secp256k1::{schnorrsig, Signature, SECP256K1};
use bdk::bitcoin::{Amount, PublicKey, Transaction};
use bdk::bitcoin::util::psbt::PartiallySignedTransaction;
use bdk::bitcoin::{Amount, PublicKey, Transaction, TxIn};
use bdk::descriptor::Descriptor;
use bdk::miniscript::DescriptorTrait;
use cfd_protocol::secp256k1_zkp::EcdsaAdaptorSignature;
use cfd_protocol::{
commit_descriptor, compute_adaptor_pk, create_cfd_transactions, interval, lock_descriptor,
spending_tx_sighash, Announcement, PartyParams, PunishParams,
renew_cfd_transactions, secp256k1_zkp, spending_tx_sighash, Announcement, PartyParams,
PunishParams,
};
use futures::stream::FusedStream;
use futures::{Sink, SinkExt, StreamExt};
@ -131,7 +136,7 @@ pub async fn new(
&params.own_punish.publish_pk,
&params.other.identity_pk,
)
.context("Punish adaptor signature does not verify")?;
.context("Commit adaptor signature does not verify")?;
for own_grouped_cets in &own_cets {
let other_cets = msg1
@ -221,12 +226,291 @@ pub async fn new(
identity: sk,
identity_counterparty: params.other.identity_pk,
revocation: rev_sk,
revocation_pk_counterparty: other_punish.revocation_pk,
publish: publish_sk,
address: params.own.address,
publish_pk_counterparty: other_punish.publish_pk,
maker_address: params.maker().address.clone(),
taker_address: params.taker().address.clone(),
lock: (signed_lock_tx.extract_tx(), lock_desc),
commit: (commit_tx, msg1.commit, commit_desc),
cets,
refund: (refund_tx, msg1.refund),
maker_lock_amount: params.maker().lock_amount,
taker_lock_amount: params.taker().lock_amount,
revoked_commit: Vec::new(),
})
}
pub async fn roll_over(
mut sink: impl Sink<RollOverMsg, Error = anyhow::Error> + Unpin,
mut stream: impl FusedStream<Item = RollOverMsg> + Unpin,
(oracle_pk, announcement): (schnorrsig::PublicKey, oracle::Announcement),
cfd: Cfd,
our_role: Role,
dlc: Dlc,
) -> Result<Dlc> {
let sk = dlc.identity;
let pk = PublicKey::new(secp256k1_zkp::PublicKey::from_secret_key(SECP256K1, &sk));
let (rev_sk, rev_pk) = crate::keypair::new(&mut rand::thread_rng());
let (publish_sk, publish_pk) = crate::keypair::new(&mut rand::thread_rng());
let own_punish = PunishParams {
revocation_pk: rev_pk,
publish_pk,
};
sink.send(RollOverMsg::Msg0(RollOverMsg0 {
revocation_pk: rev_pk,
publish_pk,
}))
.await
.context("Failed to send Msg0")?;
let msg0 = stream
.select_next_some()
.await
.try_into_msg0()
.context("Failed to read Msg0")?;
let maker_lock_amount = dlc.maker_lock_amount;
let taker_lock_amount = dlc.taker_lock_amount;
let payouts = HashMap::from_iter([(
// TODO : we want to support multiple announcements
Announcement {
id: announcement.id.0,
nonce_pks: announcement.nonce_pks.clone(),
},
payout_curve::calculate(
cfd.order.price,
cfd.quantity_usd,
maker_lock_amount,
(taker_lock_amount, cfd.order.leverage),
)?,
)]);
// unsign lock tx because PartiallySignedTransaction needs an unsigned tx
let unsigned_lock_tx = Transaction {
version: 0,
lock_time: dlc.lock.0.lock_time,
input: dlc
.lock
.0
.input
.iter()
.map(|txin| TxIn {
previous_output: txin.previous_output,
script_sig: txin.script_sig.clone(),
sequence: txin.sequence,
witness: vec![],
})
.collect(),
output: dlc.lock.0.output.clone(),
};
let lock_tx = PartiallySignedTransaction::from_unsigned_tx(unsigned_lock_tx)?;
let other_punish_params = PunishParams {
revocation_pk: msg0.revocation_pk,
publish_pk: msg0.publish_pk,
};
let ((maker_identity, maker_punish_params), (taker_identity, taker_punish_params)) =
match our_role {
Role::Maker => (
(pk, own_punish),
(dlc.identity_counterparty, other_punish_params),
),
Role::Taker => (
(dlc.identity_counterparty, other_punish_params),
(pk, own_punish),
),
};
let own_cfd_txs = renew_cfd_transactions(
lock_tx.clone(),
(
pk,
maker_lock_amount,
dlc.maker_address.clone(),
maker_punish_params,
),
(
dlc.identity_counterparty,
taker_lock_amount,
dlc.taker_address.clone(),
taker_punish_params,
),
oracle_pk,
(
model::cfd::Cfd::CET_TIMELOCK,
cfd.refund_timelock_in_blocks(),
),
payouts,
sk,
)
.context("Failed to create new CFD transactions")?;
sink.send(RollOverMsg::Msg1(RollOverMsg1::from(own_cfd_txs.clone())))
.await
.context("Failed to send Msg1")?;
let msg1 = stream
.select_next_some()
.await
.try_into_msg1()
.context("Failed to read Msg1")?;
let lock_amount = taker_lock_amount + maker_lock_amount;
let commit_desc = commit_descriptor(
(
maker_identity,
maker_punish_params.revocation_pk,
maker_punish_params.publish_pk,
),
(
taker_identity,
taker_punish_params.revocation_pk,
taker_punish_params.publish_pk,
),
);
let own_cets = own_cfd_txs.cets;
let commit_tx = own_cfd_txs.commit.0.clone();
let commit_amount = Amount::from_sat(commit_tx.output[0].value);
verify_adaptor_signature(
&commit_tx,
&dlc.lock.1,
lock_amount,
&msg1.commit,
&publish_pk,
&dlc.identity_counterparty,
)
.context("Commit adaptor signature does not verify")?;
let other_address = match our_role {
Role::Maker => dlc.taker_address.clone(),
Role::Taker => dlc.maker_address.clone(),
};
for own_grouped_cets in &own_cets {
let other_cets = msg1
.cets
.get(&own_grouped_cets.event.id)
.context("Expect event to exist in msg")?;
verify_cets(
(&oracle_pk, &announcement.nonce_pks),
&PartyParams {
lock_psbt: lock_tx.clone(),
identity_pk: dlc.identity_counterparty,
lock_amount,
address: other_address.clone(),
},
own_grouped_cets.cets.as_slice(),
other_cets.as_slice(),
&commit_desc,
commit_amount,
)
.context("CET signatures don't verify")?;
}
let refund_tx = own_cfd_txs.refund.0;
verify_signature(
&refund_tx,
&commit_desc,
commit_amount,
&msg1.refund,
&dlc.identity_counterparty,
)
.context("Refund signature does not verify")?;
let cets = own_cets
.into_iter()
.map(|grouped_cets| {
let event_id = grouped_cets.event.id;
let other_cets = msg1
.cets
.get(&event_id)
.with_context(|| format!("Counterparty CETs for event {} missing", event_id))?;
let cets = grouped_cets
.cets
.into_iter()
.map(|(tx, _, digits)| {
let other_encsig = other_cets
.iter()
.find_map(|(other_range, other_encsig)| {
(other_range == &digits.range()).then(|| other_encsig)
})
.with_context(|| {
format!(
"Missing counterparty adaptor signature for CET corresponding to
price range {:?}",
digits.range()
)
})?;
Ok(Cet {
tx,
adaptor_sig: *other_encsig,
range: digits.range(),
n_bits: digits.len(),
})
})
.collect::<Result<Vec<_>>>()?;
Ok((OracleEventId(event_id), cets))
})
.collect::<Result<HashMap<_, _>>>()?;
// reveal revocation secrets to the other party
sink.send(RollOverMsg::Msg2(RollOverMsg2 {
revocation_sk: dlc.revocation,
}))
.await
.context("Failed to send Msg1")?;
let msg2 = stream
.select_next_some()
.await
.try_into_msg2()
.context("Failed to read Msg1")?;
let revocation_sk_theirs = msg2.revocation_sk;
{
let derived_rev_pk = PublicKey::new(secp256k1_zkp::PublicKey::from_secret_key(
SECP256K1,
&revocation_sk_theirs,
));
if derived_rev_pk != dlc.revocation_pk_counterparty {
anyhow::bail!("Counterparty sent invalid revocation sk");
}
}
let mut revoked_commit = dlc.revoked_commit;
revoked_commit.push(RevokedCommit {
encsig_ours: own_cfd_txs.commit.1,
revocation_sk_theirs,
publication_pk_theirs: dlc.publish_pk_counterparty,
txid: dlc.commit.0.txid(),
script_pubkey: dlc.commit.2.script_pubkey(),
});
Ok(Dlc {
identity: sk,
identity_counterparty: dlc.identity_counterparty,
revocation: rev_sk,
revocation_pk_counterparty: other_punish_params.revocation_pk,
publish: publish_sk,
publish_pk_counterparty: other_punish_params.publish_pk,
maker_address: dlc.maker_address,
taker_address: dlc.taker_address,
lock: dlc.lock.clone(),
commit: (commit_tx, msg1.commit, commit_desc),
cets,
refund: (refund_tx, msg1.refund),
maker_lock_amount,
taker_lock_amount,
revoked_commit,
})
}

163
daemon/src/taker_cfd.rs

@ -10,7 +10,7 @@ use crate::model::cfd::{
use crate::model::{OracleEventId, Usd};
use crate::monitor::{self, MonitorParams};
use crate::wallet::Wallet;
use crate::wire::SetupMsg;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg};
use crate::{oracle, send_to_socket, setup_contract, wire};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
@ -46,13 +46,25 @@ pub struct CfdSetupCompleted {
pub dlc: Result<Dlc>,
}
pub struct CfdRollOverCompleted {
pub order_id: OrderId,
pub dlc: Result<Dlc>,
}
pub struct Commit {
pub order_id: OrderId,
}
enum SetupState {
Active {
sender: mpsc::UnboundedSender<wire::SetupMsg>,
sender: mpsc::UnboundedSender<SetupMsg>,
},
None,
}
enum RollOverState {
Active {
sender: mpsc::UnboundedSender<RollOverMsg>,
},
None,
}
@ -67,6 +79,7 @@ pub struct Actor {
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>,
setup_state: SetupState,
roll_over_state: RollOverState,
latest_announcements: Option<BTreeMap<OracleEventId, oracle::Announcement>>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
current_pending_proposals: UpdateCfdProposals,
@ -95,6 +108,7 @@ impl Actor {
send_to_maker,
monitor_actor,
setup_state: SetupState::None,
roll_over_state: RollOverState::None,
oracle_actor,
latest_announcements: None,
current_pending_proposals: HashMap::new(),
@ -334,6 +348,68 @@ impl Actor {
Ok(())
}
async fn handle_roll_over_accepted(
&mut self,
order_id: OrderId,
oracle_event_id: OracleEventId,
ctx: &mut Context<Self>,
) -> Result<()> {
tracing::info!(%order_id, "Roll over request got accepted");
let (sender, receiver) = mpsc::unbounded();
if let RollOverState::Active { .. } = self.roll_over_state {
anyhow::bail!("Already rolling over a contract!")
}
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
// TODO: we need to get multiple announcements for the next 24h
let announcement = self
.latest_announcements
.clone()
.context("Cannot roll over because no announcement from oracle was found")?
.get(&oracle_event_id)
.context("Empty list of announcements")?
.clone();
self.oracle_actor
.do_send_async(oracle::MonitorEvent {
event_id: announcement.id.clone(),
})
.await?;
let contract_future = setup_contract::roll_over(
self.send_to_maker
.clone()
.into_sink()
.with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))),
receiver,
(self.oracle_pk, announcement),
cfd,
Role::Taker,
dlc,
);
let this = ctx
.address()
.expect("actor to be able to give address to itself");
self.roll_over_state = RollOverState::Active { sender };
tokio::spawn(async move {
let dlc = contract_future.await;
this.do_send_async(CfdRollOverCompleted { order_id, dlc })
.await
});
Ok(())
}
async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::info!(%order_id, "Settlement proposal got rejected");
@ -342,6 +418,16 @@ impl Actor {
Ok(())
}
async fn handle_roll_over_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Roll over request rejected");
// TODO: tell UI that roll over was rejected
// this is not too bad as we are still monitoring for the CFD to expiry
// the taker can just try to ask again :)
Ok(())
}
async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> {
match &mut self.setup_state {
SetupState::Active { sender } => {
@ -355,6 +441,19 @@ impl Actor {
Ok(())
}
async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> {
match &mut self.roll_over_state {
RollOverState::Active { sender } => {
sender.send(msg).await?;
}
RollOverState::None => {
anyhow::bail!("Received message without an active roll_over setup")
}
}
Ok(())
}
async fn handle_cfd_setup_completed(
&mut self,
order_id: OrderId,
@ -404,6 +503,43 @@ impl Actor {
Ok(())
}
async fn handle_cfd_roll_over_completed(
&mut self,
order_id: OrderId,
dlc: Result<Dlc>,
) -> Result<()> {
let dlc = dlc.context("Failed to roll over contract with maker")?;
self.roll_over_state = RollOverState::None;
let mut conn = self.db.acquire().await?;
insert_new_cfd_state_by_order_id(
order_id,
CfdState::Open {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc: dlc.clone(),
attestation: None,
},
&mut conn,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
self.monitor_actor
.do_send_async(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::from_dlc_and_timelocks(dlc, cfd.refund_timelock_in_blocks()),
})
.await?;
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let order_id = event.order_id();
@ -571,6 +707,18 @@ impl Handler<MakerStreamMessage> for Actor {
wire::MakerToTaker::Protocol(setup_msg) => {
log_error!(self.handle_inc_protocol_msg(setup_msg))
}
wire::MakerToTaker::ConfirmRollOver {
order_id,
oracle_event_id,
} => {
log_error!(self.handle_roll_over_accepted(order_id, oracle_event_id, ctx))
}
wire::MakerToTaker::RejectRollOver(order_id) => {
log_error!(self.handle_roll_over_rejected(order_id))
}
MakerToTaker::RollOverProtocol(roll_over_msg) => {
log_error!(self.handle_inc_roll_over_msg(roll_over_msg))
}
}
KeepRunning::Yes
@ -584,6 +732,13 @@ impl Handler<CfdSetupCompleted> for Actor {
}
}
#[async_trait]
impl Handler<CfdRollOverCompleted> for Actor {
async fn handle(&mut self, msg: CfdRollOverCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_roll_over_completed(msg.order_id, msg.dlc));
}
}
#[async_trait]
impl Handler<monitor::Event> for Actor {
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) {
@ -633,6 +788,10 @@ impl Message for CfdSetupCompleted {
type Result = ();
}
impl Message for CfdRollOverCompleted {
type Result = ();
}
impl Message for Commit {
type Result = ();
}

90
daemon/src/wire.rs

@ -1,12 +1,12 @@
use crate::model::cfd::OrderId;
use crate::model::Usd;
use crate::model::{OracleEventId, Usd};
use crate::Order;
use anyhow::{bail, Result};
use bdk::bitcoin::secp256k1::Signature;
use bdk::bitcoin::util::psbt::PartiallySignedTransaction;
use bdk::bitcoin::{Address, Amount, PublicKey};
use bytes::BytesMut;
use cfd_protocol::secp256k1_zkp::EcdsaAdaptorSignature;
use cfd_protocol::secp256k1_zkp::{EcdsaAdaptorSignature, SecretKey};
use cfd_protocol::{CfdTransactions, PartyParams, PunishParams};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
@ -38,6 +38,7 @@ pub enum TakerToMaker {
timestamp: SystemTime,
},
Protocol(SetupMsg),
RollOverProtocol(RollOverMsg),
}
impl fmt::Display for TakerToMaker {
@ -47,6 +48,7 @@ impl fmt::Display for TakerToMaker {
TakerToMaker::ProposeSettlement { .. } => write!(f, "ProposeSettlement"),
TakerToMaker::Protocol(_) => write!(f, "Protocol"),
TakerToMaker::ProposeRollOver { .. } => write!(f, "ProposeRollOver"),
TakerToMaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"),
}
}
}
@ -62,6 +64,12 @@ pub enum MakerToTaker {
RejectSettlement(OrderId),
InvalidOrderId(OrderId),
Protocol(SetupMsg),
RollOverProtocol(RollOverMsg),
ConfirmRollOver {
order_id: OrderId,
oracle_event_id: OracleEventId,
},
RejectRollOver(OrderId),
}
impl fmt::Display for MakerToTaker {
@ -74,6 +82,9 @@ impl fmt::Display for MakerToTaker {
MakerToTaker::RejectSettlement(_) => write!(f, "RejectSettlement"),
MakerToTaker::InvalidOrderId(_) => write!(f, "InvalidOrderId"),
MakerToTaker::Protocol(_) => write!(f, "Protocol"),
MakerToTaker::ConfirmRollOver { .. } => write!(f, "ConfirmRollOver"),
MakerToTaker::RejectRollOver(_) => write!(f, "RejectRollOver"),
MakerToTaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"),
}
}
}
@ -270,3 +281,78 @@ impl From<CfdTransactions> for Msg1 {
pub struct Msg2 {
pub signed_lock: PartiallySignedTransaction, // TODO: Use binary representation
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")]
pub enum RollOverMsg {
Msg0(RollOverMsg0),
Msg1(RollOverMsg1),
Msg2(RollOverMsg2),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RollOverMsg0 {
pub revocation_pk: PublicKey,
pub publish_pk: PublicKey,
}
impl RollOverMsg {
pub fn try_into_msg0(self) -> Result<RollOverMsg0> {
if let Self::Msg0(v) = self {
Ok(v)
} else {
bail!("Not Msg0")
}
}
pub fn try_into_msg1(self) -> Result<RollOverMsg1> {
if let Self::Msg1(v) = self {
Ok(v)
} else {
bail!("Not Msg1")
}
}
pub fn try_into_msg2(self) -> Result<RollOverMsg2> {
if let Self::Msg2(v) = self {
Ok(v)
} else {
bail!("Not Msg2")
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RollOverMsg1 {
pub commit: EcdsaAdaptorSignature,
pub cets: HashMap<String, Vec<(RangeInclusive<u64>, EcdsaAdaptorSignature)>>,
pub refund: Signature,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RollOverMsg2 {
pub revocation_sk: SecretKey,
}
impl From<CfdTransactions> for RollOverMsg1 {
fn from(txs: CfdTransactions) -> Self {
let cets = txs
.cets
.into_iter()
.map(|grouped_cets| {
(
grouped_cets.event.id,
grouped_cets
.cets
.into_iter()
.map(|(_, encsig, digits)| (digits.range(), encsig))
.collect::<Vec<_>>(),
)
})
.collect::<HashMap<_, _>>();
Self {
commit: txs.commit.1,
cets,
refund: txs.refund.1,
}
}
}

Loading…
Cancel
Save