use anyhow::{Context, Result}; use daemon::model::cfd::Cfd; use daemon::projection::CfdOrder; use daemon::tokio_ext::FutureExt; use std::time::Duration; use tokio::sync::watch; /// Waiting time for the time on the watch channel before returning error const NEXT_WAIT_TIME: Duration = Duration::from_secs(if cfg!(debug_assertions) { 180 } else { 30 }); /// Returns the first `Cfd` from both channels /// /// Ensures that there is only one `Cfd` present in both channels. pub async fn next_cfd( rx_a: &mut watch::Receiver>, rx_b: &mut watch::Receiver>, ) -> Result<(Cfd, Cfd)> { let (a, b) = tokio::join!(next(rx_a), next(rx_b)); let (a, b) = (a?, b?); assert_eq!(a.len(), 1); assert_eq!(b.len(), 1); Ok((a.first().unwrap().clone(), b.first().unwrap().clone())) } pub async fn next_order( rx_a: &mut watch::Receiver>, rx_b: &mut watch::Receiver>, ) -> Result<(CfdOrder, CfdOrder)> { let (a, b) = tokio::join!(next_some(rx_a), next_some(rx_b)); Ok((a?, b?)) } /// Returns the value if the next Option received on the stream is Some pub async fn next_some(rx: &mut watch::Receiver>) -> Result where T: Clone, { next(rx) .await? .context("Received None when Some was expected") } /// Returns true if the next Option received on the stream is None /// /// Returns false if Some is received. pub async fn is_next_none(rx: &mut watch::Receiver>) -> Result where T: Clone, { Ok(next(rx).await?.is_none()) } /// Returns watch channel value upon change pub async fn next(rx: &mut watch::Receiver) -> Result where T: Clone, { rx.changed() .timeout(NEXT_WAIT_TIME) .await .context(format!( "No change in channel within {} seconds", NEXT_WAIT_TIME.as_secs() ))??; Ok(rx.borrow().clone()) }