diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index 914f0fe..5f34413 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -1,16 +1,13 @@ +use crate::harness::flow::{is_next_none, next_cfd, next_order, next_some}; use crate::harness::mocks::{ mock_oracle_annoucement, mock_party_params, mock_wallet_sign_and_broadcast, }; use crate::harness::start_both; -use anyhow::Context; use daemon::maker_cfd; -use daemon::model::cfd::{Cfd, CfdState, Order, Origin}; +use daemon::model::cfd::{CfdState, Order, Origin}; use daemon::model::{Price, Usd}; -use daemon::tokio_ext::FutureExt; use maia::secp256k1_zkp::schnorrsig; use rust_decimal_macros::dec; -use std::time::Duration; -use tokio::sync::watch; use tracing::subscriber::DefaultGuard; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::util::SubscriberInitExt; @@ -130,69 +127,6 @@ fn dummy_new_order() -> maker_cfd::NewOrder { } } -/// Returns the first `Cfd` from both channels -/// -/// Ensures that there is only one `Cfd` present in both channels. -async fn next_cfd( - rx_a: &mut watch::Receiver>, - rx_b: &mut watch::Receiver>, -) -> (Cfd, Cfd) { - let (a, b) = tokio::join!(next(rx_a), next(rx_b)); - - assert_eq!(a.len(), 1); - assert_eq!(b.len(), 1); - - (a.first().unwrap().clone(), b.first().unwrap().clone()) -} - -async fn next_order( - rx_a: &mut watch::Receiver>, - rx_b: &mut watch::Receiver>, -) -> (Order, Order) { - let (a, b) = tokio::join!(next_some(rx_a), next_some(rx_b)); - - (a, b) -} - -/// Returns the value if the next Option received on the stream is Some -/// -/// Panics if None is received on the stream. -async fn next_some(rx: &mut watch::Receiver>) -> T -where - T: Clone, -{ - if let Some(value) = next(rx).await { - value - } else { - panic!("Received None when Some was expected") - } -} - -/// Returns true if the next Option received on the stream is None -/// -/// Returns false if Some is received. -async fn is_next_none(rx: &mut watch::Receiver>) -> bool -where - T: Clone, -{ - next(rx).await.is_none() -} - -/// Returns watch channel value upon change -async fn next(rx: &mut watch::Receiver) -> T -where - T: Clone, -{ - // TODO: Make timeout configurable, only contract setup can take up to 2 min on CI - rx.changed() - .timeout(Duration::from_secs(120)) - .await - .context("Waiting for next element in channel is taking too long, aborting") - .unwrap() - .unwrap(); - rx.borrow().clone() -} - fn init_tracing() -> DefaultGuard { let filter = EnvFilter::from_default_env() // apply warning level globally diff --git a/daemon/tests/harness/flow.rs b/daemon/tests/harness/flow.rs new file mode 100644 index 0000000..e7165e0 --- /dev/null +++ b/daemon/tests/harness/flow.rs @@ -0,0 +1,69 @@ +use std::time::Duration; + +use anyhow::Context; +use daemon::model::cfd::{Cfd, Order}; +use daemon::tokio_ext::FutureExt; +use tokio::sync::watch; + +/// Returns the first `Cfd` from both channels +/// +/// Ensures that there is only one `Cfd` present in both channels. +pub async fn next_cfd( + rx_a: &mut watch::Receiver>, + rx_b: &mut watch::Receiver>, +) -> (Cfd, Cfd) { + let (a, b) = tokio::join!(next(rx_a), next(rx_b)); + + assert_eq!(a.len(), 1); + assert_eq!(b.len(), 1); + + (a.first().unwrap().clone(), b.first().unwrap().clone()) +} + +pub async fn next_order( + rx_a: &mut watch::Receiver>, + rx_b: &mut watch::Receiver>, +) -> (Order, Order) { + let (a, b) = tokio::join!(next_some(rx_a), next_some(rx_b)); + + (a, b) +} + +/// Returns the value if the next Option received on the stream is Some +/// +/// Panics if None is received on the stream. +pub async fn next_some(rx: &mut watch::Receiver>) -> T +where + T: Clone, +{ + if let Some(value) = next(rx).await { + value + } else { + panic!("Received None when Some was expected") + } +} + +/// Returns true if the next Option received on the stream is None +/// +/// Returns false if Some is received. +pub async fn is_next_none(rx: &mut watch::Receiver>) -> bool +where + T: Clone, +{ + next(rx).await.is_none() +} + +/// Returns watch channel value upon change +pub async fn next(rx: &mut watch::Receiver) -> T +where + T: Clone, +{ + // TODO: Make timeout configurable, only contract setup can take up to 2 min on CI + rx.changed() + .timeout(Duration::from_secs(120)) + .await + .context("Waiting for next element in channel is taking too long, aborting") + .unwrap() + .unwrap(); + rx.borrow().clone() +} diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 41d79f2..0f2c095 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -16,6 +16,7 @@ use xtra::spawn::TokioGlobalSpawnExt; use xtra::Actor; pub mod bdk; +pub mod flow; pub mod maia; pub mod mocks;