diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index 57bb3b8..5ce4448 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -136,9 +136,10 @@ impl Actor { } }; + tracing::trace!("Received '{}'", msg); + match msg { wire::MakerToTaker::Heartbeat => { - tracing::trace!("received a heartbeat message from maker"); self.connected_state .as_mut() .expect("wire messages only to arrive in connected state") diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index dda322c..c8f7014 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -148,6 +148,8 @@ where oracle_addr.do_send_async(oracle::Sync).await?; + tracing::debug!("Maker actor system ready"); + Ok(Self { cfd_actor_addr, cfd_feed_receiver, @@ -254,6 +256,8 @@ where tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); + tracing::debug!("Taker actor system ready"); + Ok(Self { cfd_actor_addr, connection_actor_addr, diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index bace1fa..e3f7cd4 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -620,47 +620,6 @@ where } } -impl Actor -where - T: xtra::Handler, -{ - async fn handle_new_order( - &mut self, - price: Price, - min_quantity: Usd, - max_quantity: Usd, - ) -> Result<()> { - let oracle_event_id = oracle::next_announcement_after( - time::OffsetDateTime::now_utc() + self.settlement_time_interval_hours, - )?; - - let order = Order::new( - price, - min_quantity, - max_quantity, - Origin::Ours, - oracle_event_id, - self.settlement_time_interval_hours, - )?; - - // 1. Save to DB - let mut conn = self.db.acquire().await?; - insert_order(&order, &mut conn).await?; - - // 2. Update actor state to current order - self.current_order_id.replace(order.id); - - // 3. Notify UI via feed - self.order_feed_sender.send(Some(order.clone()))?; - - // 4. Inform connected takers - self.takers - .do_send_async(maker_inc_connections::BroadcastOrder(Some(order))) - .await?; - Ok(()) - } -} - impl Actor where O: xtra::Handler, @@ -964,8 +923,41 @@ where T: xtra::Handler, { async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context) -> Result<()> { - self.handle_new_order(msg.price, msg.min_quantity, msg.max_quantity) - .await + let NewOrder { + price, + min_quantity, + max_quantity, + } = msg; + + let oracle_event_id = oracle::next_announcement_after( + time::OffsetDateTime::now_utc() + self.settlement_time_interval_hours, + )?; + + let order = Order::new( + price, + min_quantity, + max_quantity, + Origin::Ours, + oracle_event_id, + self.settlement_time_interval_hours, + )?; + + // 1. Save to DB + let mut conn = self.db.acquire().await?; + insert_order(&order, &mut conn).await?; + + // 2. Update actor state to current order + self.current_order_id.replace(order.id); + + // 3. Notify UI via feed + self.order_feed_sender.send(Some(order.clone()))?; + + // 4. Inform connected takers + self.takers + .do_send_async(maker_inc_connections::BroadcastOrder(Some(order))) + .await?; + + Ok(()) } } diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 1b299f3..7cd5e0a 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -2,7 +2,7 @@ use crate::maker_cfd::{FromTaker, NewTakerOnline}; use crate::model::cfd::{Order, OrderId}; use crate::model::{BitMexPriceEventId, TakerId}; use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire, HEARTBEAT_INTERVAL}; -use anyhow::{Context as AnyhowContext, Result}; +use anyhow::Result; use futures::{StreamExt, TryStreamExt}; use std::collections::HashMap; use std::io; @@ -84,11 +84,15 @@ impl Actor { } } - async fn send_to_taker(&mut self, taker_id: &TakerId, msg: wire::MakerToTaker) -> Result<()> { + async fn send_to_taker( + &mut self, + taker_id: &TakerId, + msg: wire::MakerToTaker, + ) -> Result<(), NoConnection> { let conn = self .write_connections .get(taker_id) - .context("no connection to taker_id")?; + .ok_or_else(|| NoConnection(*taker_id))?; let msg_str = msg.to_string(); @@ -156,19 +160,21 @@ impl Actor { } } +#[derive(Debug, thiserror::Error)] +#[error("No connection to taker {0}")] +pub struct NoConnection(TakerId); + #[xtra_productivity] impl Actor { - async fn handle_broadcast_order(&mut self, msg: BroadcastOrder) -> Result<()> { + async fn handle_broadcast_order(&mut self, msg: BroadcastOrder) { let order = msg.0; for taker_id in self.write_connections.clone().keys() { self.send_to_taker(taker_id, wire::MakerToTaker::CurrentOrder(order.clone())).await.expect("send_to_taker only fails on missing hashmap entry and we are iterating over those entries"); tracing::trace!(%taker_id, "sent new order: {:?}", order.as_ref().map(|o| o.id)); } - - Ok(()) } - async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<()> { + async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<(), NoConnection> { match msg.command { TakerCommand::SendOrder { order } => { self.send_to_taker(&msg.taker_id, wire::MakerToTaker::CurrentOrder(order)) @@ -223,6 +229,7 @@ impl Actor { .await?; } } + Ok(()) } diff --git a/daemon/src/send_to_socket.rs b/daemon/src/send_to_socket.rs index 765fb29..3fc899f 100644 --- a/daemon/src/send_to_socket.rs +++ b/daemon/src/send_to_socket.rs @@ -33,7 +33,7 @@ where async fn handle(&mut self, message: T, ctx: &mut xtra::Context) { let message_name = message.to_string(); // send consumes the message, avoid a clone just in case it errors by getting the name here - tracing::trace!(%message_name, "send to socket message"); + tracing::trace!("Sending '{}'", message_name); if let Err(e) = self.write.send(message).await { tracing::error!("Failed to write message {} to socket: {}", message_name, e); diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index f630ac4..d7a70b4 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -11,14 +11,14 @@ async fn taker_receives_order_from_maker_on_publication() { let _guard = init_tracing(); let (mut maker, mut taker) = start_both().await; - assert!(is_next_none(taker.order_feed()).await); + assert!(is_next_none(taker.order_feed()).await.unwrap()); maker.publish_order(dummy_new_order()).await; let (published, received) = tokio::join!(next_some(maker.order_feed()), next_some(taker.order_feed())); - assert_is_same_order(&published, &received); + assert_is_same_order(&published.unwrap(), &received.unwrap()); } #[tokio::test] @@ -27,15 +27,17 @@ async fn taker_takes_order_and_maker_rejects() { let (mut maker, mut taker) = start_both().await; // TODO: Why is this needed? For the cfd stream it is not needed - is_next_none(taker.order_feed()).await; + is_next_none(taker.order_feed()).await.unwrap(); maker.publish_order(dummy_new_order()).await; - let (_, received) = next_order(maker.order_feed(), taker.order_feed()).await; + let (_, received) = next_order(maker.order_feed(), taker.order_feed()) + .await + .unwrap(); taker.take_order(received.clone(), Usd::new(dec!(10))).await; - let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; + let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); assert_is_same_order(&taker_cfd.order, &received); assert_is_same_order(&maker_cfd.order, &received); assert!(matches!( @@ -49,7 +51,7 @@ async fn taker_takes_order_and_maker_rejects() { maker.reject_take_request(received.clone()).await; - let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; + let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); // TODO: More elaborate Cfd assertions assert_is_same_order(&taker_cfd.order, &received); assert_is_same_order(&maker_cfd.order, &received); @@ -62,14 +64,16 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() { let _guard = init_tracing(); let (mut maker, mut taker) = start_both().await; - is_next_none(taker.order_feed()).await; + is_next_none(taker.order_feed()).await.unwrap(); maker.publish_order(dummy_new_order()).await; - let (_, received) = next_order(maker.order_feed(), taker.order_feed()).await; + let (_, received) = next_order(maker.order_feed(), taker.order_feed()) + .await + .unwrap(); taker.take_order(received.clone(), Usd::new(dec!(5))).await; - let (_, _) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; + let (_, _) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); maker.mocks.mock_oracle_annoucement().await; taker.mocks.mock_oracle_annoucement().await; @@ -79,7 +83,7 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() { maker.accept_take_request(received.clone()).await; - let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; + let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); // TODO: More elaborate Cfd assertions assert_eq!(taker_cfd.order.id, received.id); assert_eq!(maker_cfd.order.id, received.id); @@ -89,7 +93,7 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() { maker.mocks.mock_wallet_sign_and_broadcast().await; taker.mocks.mock_wallet_sign_and_broadcast().await; - let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; + let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); // TODO: More elaborate Cfd assertions assert_eq!(taker_cfd.order.id, received.id); assert_eq!(maker_cfd.order.id, received.id); diff --git a/daemon/tests/harness/flow.rs b/daemon/tests/harness/flow.rs index e7165e0..fc53d8b 100644 --- a/daemon/tests/harness/flow.rs +++ b/daemon/tests/harness/flow.rs @@ -1,8 +1,7 @@ -use std::time::Duration; - -use anyhow::Context; +use anyhow::{Context, Result}; use daemon::model::cfd::{Cfd, Order}; use daemon::tokio_ext::FutureExt; +use std::time::Duration; use tokio::sync::watch; /// Returns the first `Cfd` from both channels @@ -11,59 +10,54 @@ use tokio::sync::watch; pub async fn next_cfd( rx_a: &mut watch::Receiver>, rx_b: &mut watch::Receiver>, -) -> (Cfd, Cfd) { +) -> Result<(Cfd, Cfd)> { let (a, b) = tokio::join!(next(rx_a), next(rx_b)); + let (a, b) = (a?, b?); assert_eq!(a.len(), 1); assert_eq!(b.len(), 1); - (a.first().unwrap().clone(), b.first().unwrap().clone()) + Ok((a.first().unwrap().clone(), b.first().unwrap().clone())) } pub async fn next_order( rx_a: &mut watch::Receiver>, rx_b: &mut watch::Receiver>, -) -> (Order, Order) { +) -> Result<(Order, Order)> { let (a, b) = tokio::join!(next_some(rx_a), next_some(rx_b)); - (a, b) + Ok((a?, b?)) } /// Returns the value if the next Option received on the stream is Some -/// -/// Panics if None is received on the stream. -pub async fn next_some(rx: &mut watch::Receiver>) -> T +pub async fn next_some(rx: &mut watch::Receiver>) -> Result where T: Clone, { - if let Some(value) = next(rx).await { - value - } else { - panic!("Received None when Some was expected") - } + next(rx) + .await? + .context("Received None when Some was expected") } /// Returns true if the next Option received on the stream is None /// /// Returns false if Some is received. -pub async fn is_next_none(rx: &mut watch::Receiver>) -> bool +pub async fn is_next_none(rx: &mut watch::Receiver>) -> Result where T: Clone, { - next(rx).await.is_none() + Ok(next(rx).await?.is_none()) } /// Returns watch channel value upon change -pub async fn next(rx: &mut watch::Receiver) -> T +pub async fn next(rx: &mut watch::Receiver) -> Result where T: Clone, { - // TODO: Make timeout configurable, only contract setup can take up to 2 min on CI rx.changed() - .timeout(Duration::from_secs(120)) + .timeout(Duration::from_secs(10)) .await - .context("Waiting for next element in channel is taking too long, aborting") - .unwrap() - .unwrap(); - rx.borrow().clone() + .context("No change in channel within 10 seconds")??; + + Ok(rx.borrow().clone()) } diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 176a07c..7c180d2 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -91,7 +91,6 @@ impl Maker { let listener_stream = futures::stream::poll_fn(move |ctx| { let message = match futures::ready!(listener.poll_accept(ctx)) { Ok((stream, address)) => { - dbg!("new connection"); maker_inc_connections::ListenerMessage::NewConnection { stream, address } } Err(e) => maker_inc_connections::ListenerMessage::Error { source: e },