Browse Source

Merge #356

356: Add timeout to contract-setup r=thomaseizinger a=thomaseizinger

- Remove stale TODO
- Add log statements to sections of contract-setup
- Add timeouts to awaiting messages during contract setup
- Fix typo
- Add timeouts to rollover messages


Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
debug-statements
bors[bot] 3 years ago
committed by GitHub
parent
commit
52530a9f4b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 32
      daemon/src/setup_contract.rs
  2. 15
      daemon/src/tokio_ext.rs

32
daemon/src/setup_contract.rs

@ -1,4 +1,5 @@
use crate::model::cfd::{Cet, Cfd, Dlc, RevokedCommit, Role}; use crate::model::cfd::{Cet, Cfd, Dlc, RevokedCommit, Role};
use crate::tokio_ext::FutureExt;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::wire::{ use crate::wire::{
Msg0, Msg1, Msg2, RollOverMsg, RollOverMsg0, RollOverMsg1, RollOverMsg2, SetupMsg, Msg0, Msg1, Msg2, RollOverMsg, RollOverMsg0, RollOverMsg1, RollOverMsg2, SetupMsg,
@ -21,13 +22,10 @@ use futures::{Sink, SinkExt, StreamExt};
use std::collections::HashMap; use std::collections::HashMap;
use std::iter::FromIterator; use std::iter::FromIterator;
use std::ops::RangeInclusive; use std::ops::RangeInclusive;
use std::time::Duration;
/// Given an initial set of parameters, sets up the CFD contract with /// Given an initial set of parameters, sets up the CFD contract with
/// the other party. /// the other party.
///
/// TODO: Replace `nonce_pks` argument with set of
/// `daemon::oracle::Announcement`, which can be mapped into
/// `cfd_protocol::Announcement`.
pub async fn new( pub async fn new(
mut sink: impl Sink<SetupMsg, Error = anyhow::Error> + Unpin, mut sink: impl Sink<SetupMsg, Error = anyhow::Error> + Unpin,
mut stream: impl FusedStream<Item = SetupMsg> + Unpin, mut stream: impl FusedStream<Item = SetupMsg> + Unpin,
@ -56,10 +54,14 @@ pub async fn new(
.context("Failed to send Msg0")?; .context("Failed to send Msg0")?;
let msg0 = stream let msg0 = stream
.select_next_some() .select_next_some()
.timeout(Duration::from_secs(60))
.await .await
.context("Expected Msg0 within 60 seconds")?
.try_into_msg0() .try_into_msg0()
.context("Failed to read Msg0")?; .context("Failed to read Msg0")?;
tracing::info!("Exchanged setup parameters");
let (other, other_punish) = msg0.into(); let (other, other_punish) = msg0.into();
let params = AllParams::new(own_params, own_punish, other, other_punish, role); let params = AllParams::new(own_params, own_punish, other, other_punish, role);
@ -90,16 +92,22 @@ pub async fn new(
) )
.context("Failed to create CFD transactions")?; .context("Failed to create CFD transactions")?;
tracing::info!("Created CFD transactions");
sink.send(SetupMsg::Msg1(Msg1::from(own_cfd_txs.clone()))) sink.send(SetupMsg::Msg1(Msg1::from(own_cfd_txs.clone())))
.await .await
.context("Failed to send Msg1")?; .context("Failed to send Msg1")?;
let msg1 = stream let msg1 = stream
.select_next_some() .select_next_some()
.timeout(Duration::from_secs(60))
.await .await
.context("Expected Msg1 within 60 seconds")?
.try_into_msg1() .try_into_msg1()
.context("Failed to read Msg1")?; .context("Failed to read Msg1")?;
tracing::info!("Exchanged CFD transactions");
let lock_desc = lock_descriptor(params.maker().identity_pk, params.taker().identity_pk); let lock_desc = lock_descriptor(params.maker().identity_pk, params.taker().identity_pk);
let lock_amount = params.maker().lock_amount + params.taker().lock_amount; let lock_amount = params.maker().lock_amount + params.taker().lock_amount;
@ -161,6 +169,8 @@ pub async fn new(
) )
.context("Refund signature does not verify")?; .context("Refund signature does not verify")?;
tracing::info!("Verified all signatures");
let mut signed_lock_tx = wallet.sign(lock_tx).await?; let mut signed_lock_tx = wallet.sign(lock_tx).await?;
sink.send(SetupMsg::Msg2(Msg2 { sink.send(SetupMsg::Msg2(Msg2 {
signed_lock: signed_lock_tx.clone(), signed_lock: signed_lock_tx.clone(),
@ -169,7 +179,9 @@ pub async fn new(
.context("Failed to send Msg2")?; .context("Failed to send Msg2")?;
let msg2 = stream let msg2 = stream
.select_next_some() .select_next_some()
.timeout(Duration::from_secs(60))
.await .await
.context("Expected Msg2 within 60 seconds")?
.try_into_msg2() .try_into_msg2()
.context("Failed to read Msg2")?; .context("Failed to read Msg2")?;
signed_lock_tx signed_lock_tx
@ -216,6 +228,8 @@ pub async fn new(
}) })
.collect::<Result<HashMap<_, _>>>()?; .collect::<Result<HashMap<_, _>>>()?;
tracing::info!("Exchanged signed lock transaction");
Ok(Dlc { Ok(Dlc {
identity: sk, identity: sk,
identity_counterparty: params.other.identity_pk, identity_counterparty: params.other.identity_pk,
@ -262,7 +276,9 @@ pub async fn roll_over(
.context("Failed to send Msg0")?; .context("Failed to send Msg0")?;
let msg0 = stream let msg0 = stream
.select_next_some() .select_next_some()
.timeout(Duration::from_secs(60))
.await .await
.context("Expected Msg0 within 60 seconds")?
.try_into_msg0() .try_into_msg0()
.context("Failed to read Msg0")?; .context("Failed to read Msg0")?;
@ -330,7 +346,9 @@ pub async fn roll_over(
let msg1 = stream let msg1 = stream
.select_next_some() .select_next_some()
.timeout(Duration::from_secs(60))
.await .await
.context("Expected Msg1 within 60 seconds")?
.try_into_msg1() .try_into_msg1()
.context("Failed to read Msg1")?; .context("Failed to read Msg1")?;
@ -443,13 +461,15 @@ pub async fn roll_over(
revocation_sk: dlc.revocation, revocation_sk: dlc.revocation,
})) }))
.await .await
.context("Failed to send Msg1")?; .context("Failed to send Msg2")?;
let msg2 = stream let msg2 = stream
.select_next_some() .select_next_some()
.timeout(Duration::from_secs(60))
.await .await
.context("Expected Msg2 within 60 seconds")?
.try_into_msg2() .try_into_msg2()
.context("Failed to read Msg1")?; .context("Failed to read Msg2")?;
let revocation_sk_theirs = msg2.revocation_sk; let revocation_sk_theirs = msg2.revocation_sk;
{ {

15
daemon/src/tokio_ext.rs

@ -1,5 +1,7 @@
use std::fmt; use std::fmt;
use std::future::Future; use std::future::Future;
use std::time::Duration;
use tokio::time::{timeout, Timeout};
pub fn spawn_fallible<F, E>(future: F) pub fn spawn_fallible<F, E>(future: F)
where where
@ -12,3 +14,16 @@ where
} }
}); });
} }
pub trait FutureExt: Future + Sized {
fn timeout(self, duration: Duration) -> Timeout<Self>;
}
impl<F> FutureExt for F
where
F: Future,
{
fn timeout(self, duration: Duration) -> Timeout<F> {
timeout(duration, self)
}
}

Loading…
Cancel
Save