From 7bdc93c636f2fb8e224db95f900cd2b77bdd2ba4 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 16 Nov 2021 17:52:13 +1100 Subject: [PATCH 1/9] Reduce timeout to 10 seconds With the reduced number of payout curve entries, our tests are always fast. --- daemon/tests/harness/flow.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/daemon/tests/harness/flow.rs b/daemon/tests/harness/flow.rs index e7165e0..6478e41 100644 --- a/daemon/tests/harness/flow.rs +++ b/daemon/tests/harness/flow.rs @@ -58,9 +58,8 @@ pub async fn next(rx: &mut watch::Receiver) -> T where T: Clone, { - // TODO: Make timeout configurable, only contract setup can take up to 2 min on CI rx.changed() - .timeout(Duration::from_secs(120)) + .timeout(Duration::from_secs(10)) .await .context("Waiting for next element in channel is taking too long, aborting") .unwrap() From 1e38ec38ac16807de3525a7de22e477e8702af60 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 16 Nov 2021 17:53:27 +1100 Subject: [PATCH 2/9] `unwrap` only in the tests This makes the stacktrace more useful because it shows us where exactly it is failing. --- daemon/tests/happy_path.rs | 26 ++++++++++++++---------- daemon/tests/harness/flow.rs | 39 ++++++++++++++++-------------------- 2 files changed, 32 insertions(+), 33 deletions(-) diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index f630ac4..d7a70b4 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -11,14 +11,14 @@ async fn taker_receives_order_from_maker_on_publication() { let _guard = init_tracing(); let (mut maker, mut taker) = start_both().await; - assert!(is_next_none(taker.order_feed()).await); + assert!(is_next_none(taker.order_feed()).await.unwrap()); maker.publish_order(dummy_new_order()).await; let (published, received) = tokio::join!(next_some(maker.order_feed()), next_some(taker.order_feed())); - assert_is_same_order(&published, &received); + assert_is_same_order(&published.unwrap(), &received.unwrap()); } #[tokio::test] @@ -27,15 +27,17 @@ async fn taker_takes_order_and_maker_rejects() { let (mut maker, mut taker) = start_both().await; // TODO: Why is this needed? For the cfd stream it is not needed - is_next_none(taker.order_feed()).await; + is_next_none(taker.order_feed()).await.unwrap(); maker.publish_order(dummy_new_order()).await; - let (_, received) = next_order(maker.order_feed(), taker.order_feed()).await; + let (_, received) = next_order(maker.order_feed(), taker.order_feed()) + .await + .unwrap(); taker.take_order(received.clone(), Usd::new(dec!(10))).await; - let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; + let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); assert_is_same_order(&taker_cfd.order, &received); assert_is_same_order(&maker_cfd.order, &received); assert!(matches!( @@ -49,7 +51,7 @@ async fn taker_takes_order_and_maker_rejects() { maker.reject_take_request(received.clone()).await; - let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; + let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); // TODO: More elaborate Cfd assertions assert_is_same_order(&taker_cfd.order, &received); assert_is_same_order(&maker_cfd.order, &received); @@ -62,14 +64,16 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() { let _guard = init_tracing(); let (mut maker, mut taker) = start_both().await; - is_next_none(taker.order_feed()).await; + is_next_none(taker.order_feed()).await.unwrap(); maker.publish_order(dummy_new_order()).await; - let (_, received) = next_order(maker.order_feed(), taker.order_feed()).await; + let (_, received) = next_order(maker.order_feed(), taker.order_feed()) + .await + .unwrap(); taker.take_order(received.clone(), Usd::new(dec!(5))).await; - let (_, _) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; + let (_, _) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); maker.mocks.mock_oracle_annoucement().await; taker.mocks.mock_oracle_annoucement().await; @@ -79,7 +83,7 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() { maker.accept_take_request(received.clone()).await; - let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; + let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); // TODO: More elaborate Cfd assertions assert_eq!(taker_cfd.order.id, received.id); assert_eq!(maker_cfd.order.id, received.id); @@ -89,7 +93,7 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() { maker.mocks.mock_wallet_sign_and_broadcast().await; taker.mocks.mock_wallet_sign_and_broadcast().await; - let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; + let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); // TODO: More elaborate Cfd assertions assert_eq!(taker_cfd.order.id, received.id); assert_eq!(maker_cfd.order.id, received.id); diff --git a/daemon/tests/harness/flow.rs b/daemon/tests/harness/flow.rs index 6478e41..fc53d8b 100644 --- a/daemon/tests/harness/flow.rs +++ b/daemon/tests/harness/flow.rs @@ -1,8 +1,7 @@ -use std::time::Duration; - -use anyhow::Context; +use anyhow::{Context, Result}; use daemon::model::cfd::{Cfd, Order}; use daemon::tokio_ext::FutureExt; +use std::time::Duration; use tokio::sync::watch; /// Returns the first `Cfd` from both channels @@ -11,58 +10,54 @@ use tokio::sync::watch; pub async fn next_cfd( rx_a: &mut watch::Receiver>, rx_b: &mut watch::Receiver>, -) -> (Cfd, Cfd) { +) -> Result<(Cfd, Cfd)> { let (a, b) = tokio::join!(next(rx_a), next(rx_b)); + let (a, b) = (a?, b?); assert_eq!(a.len(), 1); assert_eq!(b.len(), 1); - (a.first().unwrap().clone(), b.first().unwrap().clone()) + Ok((a.first().unwrap().clone(), b.first().unwrap().clone())) } pub async fn next_order( rx_a: &mut watch::Receiver>, rx_b: &mut watch::Receiver>, -) -> (Order, Order) { +) -> Result<(Order, Order)> { let (a, b) = tokio::join!(next_some(rx_a), next_some(rx_b)); - (a, b) + Ok((a?, b?)) } /// Returns the value if the next Option received on the stream is Some -/// -/// Panics if None is received on the stream. -pub async fn next_some(rx: &mut watch::Receiver>) -> T +pub async fn next_some(rx: &mut watch::Receiver>) -> Result where T: Clone, { - if let Some(value) = next(rx).await { - value - } else { - panic!("Received None when Some was expected") - } + next(rx) + .await? + .context("Received None when Some was expected") } /// Returns true if the next Option received on the stream is None /// /// Returns false if Some is received. -pub async fn is_next_none(rx: &mut watch::Receiver>) -> bool +pub async fn is_next_none(rx: &mut watch::Receiver>) -> Result where T: Clone, { - next(rx).await.is_none() + Ok(next(rx).await?.is_none()) } /// Returns watch channel value upon change -pub async fn next(rx: &mut watch::Receiver) -> T +pub async fn next(rx: &mut watch::Receiver) -> Result where T: Clone, { rx.changed() .timeout(Duration::from_secs(10)) .await - .context("Waiting for next element in channel is taking too long, aborting") - .unwrap() - .unwrap(); - rx.borrow().clone() + .context("No change in channel within 10 seconds")??; + + Ok(rx.borrow().clone()) } From 89576974e2115c501aec171116750df6865f7b9d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 16 Nov 2021 16:33:29 +1100 Subject: [PATCH 3/9] Inline `handle_new_order` This indirection is unnecessary. --- daemon/src/maker_cfd.rs | 78 ++++++++++++++++++----------------------- 1 file changed, 35 insertions(+), 43 deletions(-) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index bace1fa..e3f7cd4 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -620,47 +620,6 @@ where } } -impl Actor -where - T: xtra::Handler, -{ - async fn handle_new_order( - &mut self, - price: Price, - min_quantity: Usd, - max_quantity: Usd, - ) -> Result<()> { - let oracle_event_id = oracle::next_announcement_after( - time::OffsetDateTime::now_utc() + self.settlement_time_interval_hours, - )?; - - let order = Order::new( - price, - min_quantity, - max_quantity, - Origin::Ours, - oracle_event_id, - self.settlement_time_interval_hours, - )?; - - // 1. Save to DB - let mut conn = self.db.acquire().await?; - insert_order(&order, &mut conn).await?; - - // 2. Update actor state to current order - self.current_order_id.replace(order.id); - - // 3. Notify UI via feed - self.order_feed_sender.send(Some(order.clone()))?; - - // 4. Inform connected takers - self.takers - .do_send_async(maker_inc_connections::BroadcastOrder(Some(order))) - .await?; - Ok(()) - } -} - impl Actor where O: xtra::Handler, @@ -964,8 +923,41 @@ where T: xtra::Handler, { async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context) -> Result<()> { - self.handle_new_order(msg.price, msg.min_quantity, msg.max_quantity) - .await + let NewOrder { + price, + min_quantity, + max_quantity, + } = msg; + + let oracle_event_id = oracle::next_announcement_after( + time::OffsetDateTime::now_utc() + self.settlement_time_interval_hours, + )?; + + let order = Order::new( + price, + min_quantity, + max_quantity, + Origin::Ours, + oracle_event_id, + self.settlement_time_interval_hours, + )?; + + // 1. Save to DB + let mut conn = self.db.acquire().await?; + insert_order(&order, &mut conn).await?; + + // 2. Update actor state to current order + self.current_order_id.replace(order.id); + + // 3. Notify UI via feed + self.order_feed_sender.send(Some(order.clone()))?; + + // 4. Inform connected takers + self.takers + .do_send_async(maker_inc_connections::BroadcastOrder(Some(order))) + .await?; + + Ok(()) } } From 561b189de13f265ebc72ce3fde62f79b41894afc Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 15 Nov 2021 18:18:29 +1100 Subject: [PATCH 4/9] Remove unnecessary `Result` return value --- daemon/src/maker_inc_connections.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 1b299f3..3dbb69b 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -158,14 +158,12 @@ impl Actor { #[xtra_productivity] impl Actor { - async fn handle_broadcast_order(&mut self, msg: BroadcastOrder) -> Result<()> { + async fn handle_broadcast_order(&mut self, msg: BroadcastOrder) { let order = msg.0; for taker_id in self.write_connections.clone().keys() { self.send_to_taker(taker_id, wire::MakerToTaker::CurrentOrder(order.clone())).await.expect("send_to_taker only fails on missing hashmap entry and we are iterating over those entries"); tracing::trace!(%taker_id, "sent new order: {:?}", order.as_ref().map(|o| o.id)); } - - Ok(()) } async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<()> { From ca2bd5e2910a99e3b7910b1ac3db4fbf6de64a97 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 15 Nov 2021 18:21:06 +1100 Subject: [PATCH 5/9] Statically ensure that our only failure mode is `NoConnection` By creating a dedicated error type, we can communicate that the only failure mode of this function is a missing connection. --- daemon/src/maker_inc_connections.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 3dbb69b..7cd5e0a 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -2,7 +2,7 @@ use crate::maker_cfd::{FromTaker, NewTakerOnline}; use crate::model::cfd::{Order, OrderId}; use crate::model::{BitMexPriceEventId, TakerId}; use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire, HEARTBEAT_INTERVAL}; -use anyhow::{Context as AnyhowContext, Result}; +use anyhow::Result; use futures::{StreamExt, TryStreamExt}; use std::collections::HashMap; use std::io; @@ -84,11 +84,15 @@ impl Actor { } } - async fn send_to_taker(&mut self, taker_id: &TakerId, msg: wire::MakerToTaker) -> Result<()> { + async fn send_to_taker( + &mut self, + taker_id: &TakerId, + msg: wire::MakerToTaker, + ) -> Result<(), NoConnection> { let conn = self .write_connections .get(taker_id) - .context("no connection to taker_id")?; + .ok_or_else(|| NoConnection(*taker_id))?; let msg_str = msg.to_string(); @@ -156,6 +160,10 @@ impl Actor { } } +#[derive(Debug, thiserror::Error)] +#[error("No connection to taker {0}")] +pub struct NoConnection(TakerId); + #[xtra_productivity] impl Actor { async fn handle_broadcast_order(&mut self, msg: BroadcastOrder) { @@ -166,7 +174,7 @@ impl Actor { } } - async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<()> { + async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<(), NoConnection> { match msg.command { TakerCommand::SendOrder { order } => { self.send_to_taker(&msg.taker_id, wire::MakerToTaker::CurrentOrder(order)) @@ -221,6 +229,7 @@ impl Actor { .await?; } } + Ok(()) } From b1f6e9e1db0239c22c057b2597b8f88b71cd896c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 16 Nov 2021 17:42:50 +1100 Subject: [PATCH 6/9] Generalize logging for received messages --- daemon/src/connection.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index 57bb3b8..5ce4448 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -136,9 +136,10 @@ impl Actor { } }; + tracing::trace!("Received '{}'", msg); + match msg { wire::MakerToTaker::Heartbeat => { - tracing::trace!("received a heartbeat message from maker"); self.connected_state .as_mut() .expect("wire messages only to arrive in connected state") From 900d9560e2d17076cfc659e9a9032dd43ef80b92 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 16 Nov 2021 17:43:03 +1100 Subject: [PATCH 7/9] Log readiness of actor systems --- daemon/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index dda322c..c8f7014 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -148,6 +148,8 @@ where oracle_addr.do_send_async(oracle::Sync).await?; + tracing::debug!("Maker actor system ready"); + Ok(Self { cfd_actor_addr, cfd_feed_receiver, @@ -254,6 +256,8 @@ where tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); + tracing::debug!("Taker actor system ready"); + Ok(Self { cfd_actor_addr, connection_actor_addr, From 05a883297f76109fad9dc6060df589ecb6c1d430 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 16 Nov 2021 17:43:30 +1100 Subject: [PATCH 8/9] Prettify log message for sending message --- daemon/src/send_to_socket.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daemon/src/send_to_socket.rs b/daemon/src/send_to_socket.rs index 765fb29..3fc899f 100644 --- a/daemon/src/send_to_socket.rs +++ b/daemon/src/send_to_socket.rs @@ -33,7 +33,7 @@ where async fn handle(&mut self, message: T, ctx: &mut xtra::Context) { let message_name = message.to_string(); // send consumes the message, avoid a clone just in case it errors by getting the name here - tracing::trace!(%message_name, "send to socket message"); + tracing::trace!("Sending '{}'", message_name); if let Err(e) = self.write.send(message).await { tracing::error!("Failed to write message {} to socket: {}", message_name, e); From 573a950527d1fbfdef618fe0bb9458dec8f62cb8 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 16 Nov 2021 17:43:49 +1100 Subject: [PATCH 9/9] Remove left-over dbg! --- daemon/tests/harness/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 176a07c..7c180d2 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -91,7 +91,6 @@ impl Maker { let listener_stream = futures::stream::poll_fn(move |ctx| { let message = match futures::ready!(listener.poll_accept(ctx)) { Ok((stream, address)) => { - dbg!("new connection"); maker_inc_connections::ListenerMessage::NewConnection { stream, address } } Err(e) => maker_inc_connections::ListenerMessage::Error { source: e },