Browse Source

Merge #602

602: Bug hunting cleanups r=thomaseizinger a=thomaseizinger

Some changes I made whilst bug hunting.


Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
new-http-api
bors[bot] 3 years ago
committed by GitHub
parent
commit
0d6c4a4b57
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      daemon/src/connection.rs
  2. 4
      daemon/src/lib.rs
  3. 78
      daemon/src/maker_cfd.rs
  4. 21
      daemon/src/maker_inc_connections.rs
  5. 2
      daemon/src/send_to_socket.rs
  6. 26
      daemon/tests/happy_path.rs
  7. 42
      daemon/tests/harness/flow.rs
  8. 1
      daemon/tests/harness/mod.rs

3
daemon/src/connection.rs

@ -136,9 +136,10 @@ impl Actor {
} }
}; };
tracing::trace!("Received '{}'", msg);
match msg { match msg {
wire::MakerToTaker::Heartbeat => { wire::MakerToTaker::Heartbeat => {
tracing::trace!("received a heartbeat message from maker");
self.connected_state self.connected_state
.as_mut() .as_mut()
.expect("wire messages only to arrive in connected state") .expect("wire messages only to arrive in connected state")

4
daemon/src/lib.rs

@ -148,6 +148,8 @@ where
oracle_addr.do_send_async(oracle::Sync).await?; oracle_addr.do_send_async(oracle::Sync).await?;
tracing::debug!("Maker actor system ready");
Ok(Self { Ok(Self {
cfd_actor_addr, cfd_actor_addr,
cfd_feed_receiver, cfd_feed_receiver,
@ -254,6 +256,8 @@ where
tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor)))); tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
tracing::debug!("Taker actor system ready");
Ok(Self { Ok(Self {
cfd_actor_addr, cfd_actor_addr,
connection_actor_addr, connection_actor_addr,

78
daemon/src/maker_cfd.rs

@ -620,47 +620,6 @@ where
} }
} }
impl<O, M, T, W> Actor<O, M, T, W>
where
T: xtra::Handler<maker_inc_connections::BroadcastOrder>,
{
async fn handle_new_order(
&mut self,
price: Price,
min_quantity: Usd,
max_quantity: Usd,
) -> Result<()> {
let oracle_event_id = oracle::next_announcement_after(
time::OffsetDateTime::now_utc() + self.settlement_time_interval_hours,
)?;
let order = Order::new(
price,
min_quantity,
max_quantity,
Origin::Ours,
oracle_event_id,
self.settlement_time_interval_hours,
)?;
// 1. Save to DB
let mut conn = self.db.acquire().await?;
insert_order(&order, &mut conn).await?;
// 2. Update actor state to current order
self.current_order_id.replace(order.id);
// 3. Notify UI via feed
self.order_feed_sender.send(Some(order.clone()))?;
// 4. Inform connected takers
self.takers
.do_send_async(maker_inc_connections::BroadcastOrder(Some(order)))
.await?;
Ok(())
}
}
impl<O, M, T, W> Actor<O, M, T, W> impl<O, M, T, W> Actor<O, M, T, W>
where where
O: xtra::Handler<oracle::MonitorAttestation>, O: xtra::Handler<oracle::MonitorAttestation>,
@ -964,8 +923,41 @@ where
T: xtra::Handler<maker_inc_connections::BroadcastOrder>, T: xtra::Handler<maker_inc_connections::BroadcastOrder>,
{ {
async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context<Self>) -> Result<()> { async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context<Self>) -> Result<()> {
self.handle_new_order(msg.price, msg.min_quantity, msg.max_quantity) let NewOrder {
.await price,
min_quantity,
max_quantity,
} = msg;
let oracle_event_id = oracle::next_announcement_after(
time::OffsetDateTime::now_utc() + self.settlement_time_interval_hours,
)?;
let order = Order::new(
price,
min_quantity,
max_quantity,
Origin::Ours,
oracle_event_id,
self.settlement_time_interval_hours,
)?;
// 1. Save to DB
let mut conn = self.db.acquire().await?;
insert_order(&order, &mut conn).await?;
// 2. Update actor state to current order
self.current_order_id.replace(order.id);
// 3. Notify UI via feed
self.order_feed_sender.send(Some(order.clone()))?;
// 4. Inform connected takers
self.takers
.do_send_async(maker_inc_connections::BroadcastOrder(Some(order)))
.await?;
Ok(())
} }
} }

21
daemon/src/maker_inc_connections.rs

@ -2,7 +2,7 @@ use crate::maker_cfd::{FromTaker, NewTakerOnline};
use crate::model::cfd::{Order, OrderId}; use crate::model::cfd::{Order, OrderId};
use crate::model::{BitMexPriceEventId, TakerId}; use crate::model::{BitMexPriceEventId, TakerId};
use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire, HEARTBEAT_INTERVAL}; use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire, HEARTBEAT_INTERVAL};
use anyhow::{Context as AnyhowContext, Result}; use anyhow::Result;
use futures::{StreamExt, TryStreamExt}; use futures::{StreamExt, TryStreamExt};
use std::collections::HashMap; use std::collections::HashMap;
use std::io; use std::io;
@ -84,11 +84,15 @@ impl Actor {
} }
} }
async fn send_to_taker(&mut self, taker_id: &TakerId, msg: wire::MakerToTaker) -> Result<()> { async fn send_to_taker(
&mut self,
taker_id: &TakerId,
msg: wire::MakerToTaker,
) -> Result<(), NoConnection> {
let conn = self let conn = self
.write_connections .write_connections
.get(taker_id) .get(taker_id)
.context("no connection to taker_id")?; .ok_or_else(|| NoConnection(*taker_id))?;
let msg_str = msg.to_string(); let msg_str = msg.to_string();
@ -156,19 +160,21 @@ impl Actor {
} }
} }
#[derive(Debug, thiserror::Error)]
#[error("No connection to taker {0}")]
pub struct NoConnection(TakerId);
#[xtra_productivity] #[xtra_productivity]
impl Actor { impl Actor {
async fn handle_broadcast_order(&mut self, msg: BroadcastOrder) -> Result<()> { async fn handle_broadcast_order(&mut self, msg: BroadcastOrder) {
let order = msg.0; let order = msg.0;
for taker_id in self.write_connections.clone().keys() { for taker_id in self.write_connections.clone().keys() {
self.send_to_taker(taker_id, wire::MakerToTaker::CurrentOrder(order.clone())).await.expect("send_to_taker only fails on missing hashmap entry and we are iterating over those entries"); self.send_to_taker(taker_id, wire::MakerToTaker::CurrentOrder(order.clone())).await.expect("send_to_taker only fails on missing hashmap entry and we are iterating over those entries");
tracing::trace!(%taker_id, "sent new order: {:?}", order.as_ref().map(|o| o.id)); tracing::trace!(%taker_id, "sent new order: {:?}", order.as_ref().map(|o| o.id));
} }
Ok(())
} }
async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<()> { async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<(), NoConnection> {
match msg.command { match msg.command {
TakerCommand::SendOrder { order } => { TakerCommand::SendOrder { order } => {
self.send_to_taker(&msg.taker_id, wire::MakerToTaker::CurrentOrder(order)) self.send_to_taker(&msg.taker_id, wire::MakerToTaker::CurrentOrder(order))
@ -223,6 +229,7 @@ impl Actor {
.await?; .await?;
} }
} }
Ok(()) Ok(())
} }

2
daemon/src/send_to_socket.rs

@ -33,7 +33,7 @@ where
async fn handle(&mut self, message: T, ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, message: T, ctx: &mut xtra::Context<Self>) {
let message_name = message.to_string(); // send consumes the message, avoid a clone just in case it errors by getting the name here let message_name = message.to_string(); // send consumes the message, avoid a clone just in case it errors by getting the name here
tracing::trace!(%message_name, "send to socket message"); tracing::trace!("Sending '{}'", message_name);
if let Err(e) = self.write.send(message).await { if let Err(e) = self.write.send(message).await {
tracing::error!("Failed to write message {} to socket: {}", message_name, e); tracing::error!("Failed to write message {} to socket: {}", message_name, e);

26
daemon/tests/happy_path.rs

@ -11,14 +11,14 @@ async fn taker_receives_order_from_maker_on_publication() {
let _guard = init_tracing(); let _guard = init_tracing();
let (mut maker, mut taker) = start_both().await; let (mut maker, mut taker) = start_both().await;
assert!(is_next_none(taker.order_feed()).await); assert!(is_next_none(taker.order_feed()).await.unwrap());
maker.publish_order(dummy_new_order()).await; maker.publish_order(dummy_new_order()).await;
let (published, received) = let (published, received) =
tokio::join!(next_some(maker.order_feed()), next_some(taker.order_feed())); tokio::join!(next_some(maker.order_feed()), next_some(taker.order_feed()));
assert_is_same_order(&published, &received); assert_is_same_order(&published.unwrap(), &received.unwrap());
} }
#[tokio::test] #[tokio::test]
@ -27,15 +27,17 @@ async fn taker_takes_order_and_maker_rejects() {
let (mut maker, mut taker) = start_both().await; let (mut maker, mut taker) = start_both().await;
// TODO: Why is this needed? For the cfd stream it is not needed // TODO: Why is this needed? For the cfd stream it is not needed
is_next_none(taker.order_feed()).await; is_next_none(taker.order_feed()).await.unwrap();
maker.publish_order(dummy_new_order()).await; maker.publish_order(dummy_new_order()).await;
let (_, received) = next_order(maker.order_feed(), taker.order_feed()).await; let (_, received) = next_order(maker.order_feed(), taker.order_feed())
.await
.unwrap();
taker.take_order(received.clone(), Usd::new(dec!(10))).await; taker.take_order(received.clone(), Usd::new(dec!(10))).await;
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
assert_is_same_order(&taker_cfd.order, &received); assert_is_same_order(&taker_cfd.order, &received);
assert_is_same_order(&maker_cfd.order, &received); assert_is_same_order(&maker_cfd.order, &received);
assert!(matches!( assert!(matches!(
@ -49,7 +51,7 @@ async fn taker_takes_order_and_maker_rejects() {
maker.reject_take_request(received.clone()).await; maker.reject_take_request(received.clone()).await;
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
// TODO: More elaborate Cfd assertions // TODO: More elaborate Cfd assertions
assert_is_same_order(&taker_cfd.order, &received); assert_is_same_order(&taker_cfd.order, &received);
assert_is_same_order(&maker_cfd.order, &received); assert_is_same_order(&maker_cfd.order, &received);
@ -62,14 +64,16 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() {
let _guard = init_tracing(); let _guard = init_tracing();
let (mut maker, mut taker) = start_both().await; let (mut maker, mut taker) = start_both().await;
is_next_none(taker.order_feed()).await; is_next_none(taker.order_feed()).await.unwrap();
maker.publish_order(dummy_new_order()).await; maker.publish_order(dummy_new_order()).await;
let (_, received) = next_order(maker.order_feed(), taker.order_feed()).await; let (_, received) = next_order(maker.order_feed(), taker.order_feed())
.await
.unwrap();
taker.take_order(received.clone(), Usd::new(dec!(5))).await; taker.take_order(received.clone(), Usd::new(dec!(5))).await;
let (_, _) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; let (_, _) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
maker.mocks.mock_oracle_annoucement().await; maker.mocks.mock_oracle_annoucement().await;
taker.mocks.mock_oracle_annoucement().await; taker.mocks.mock_oracle_annoucement().await;
@ -79,7 +83,7 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() {
maker.accept_take_request(received.clone()).await; maker.accept_take_request(received.clone()).await;
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
// TODO: More elaborate Cfd assertions // TODO: More elaborate Cfd assertions
assert_eq!(taker_cfd.order.id, received.id); assert_eq!(taker_cfd.order.id, received.id);
assert_eq!(maker_cfd.order.id, received.id); assert_eq!(maker_cfd.order.id, received.id);
@ -89,7 +93,7 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() {
maker.mocks.mock_wallet_sign_and_broadcast().await; maker.mocks.mock_wallet_sign_and_broadcast().await;
taker.mocks.mock_wallet_sign_and_broadcast().await; taker.mocks.mock_wallet_sign_and_broadcast().await;
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await; let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
// TODO: More elaborate Cfd assertions // TODO: More elaborate Cfd assertions
assert_eq!(taker_cfd.order.id, received.id); assert_eq!(taker_cfd.order.id, received.id);
assert_eq!(maker_cfd.order.id, received.id); assert_eq!(maker_cfd.order.id, received.id);

42
daemon/tests/harness/flow.rs

@ -1,8 +1,7 @@
use std::time::Duration; use anyhow::{Context, Result};
use anyhow::Context;
use daemon::model::cfd::{Cfd, Order}; use daemon::model::cfd::{Cfd, Order};
use daemon::tokio_ext::FutureExt; use daemon::tokio_ext::FutureExt;
use std::time::Duration;
use tokio::sync::watch; use tokio::sync::watch;
/// Returns the first `Cfd` from both channels /// Returns the first `Cfd` from both channels
@ -11,59 +10,54 @@ use tokio::sync::watch;
pub async fn next_cfd( pub async fn next_cfd(
rx_a: &mut watch::Receiver<Vec<Cfd>>, rx_a: &mut watch::Receiver<Vec<Cfd>>,
rx_b: &mut watch::Receiver<Vec<Cfd>>, rx_b: &mut watch::Receiver<Vec<Cfd>>,
) -> (Cfd, Cfd) { ) -> Result<(Cfd, Cfd)> {
let (a, b) = tokio::join!(next(rx_a), next(rx_b)); let (a, b) = tokio::join!(next(rx_a), next(rx_b));
let (a, b) = (a?, b?);
assert_eq!(a.len(), 1); assert_eq!(a.len(), 1);
assert_eq!(b.len(), 1); assert_eq!(b.len(), 1);
(a.first().unwrap().clone(), b.first().unwrap().clone()) Ok((a.first().unwrap().clone(), b.first().unwrap().clone()))
} }
pub async fn next_order( pub async fn next_order(
rx_a: &mut watch::Receiver<Option<Order>>, rx_a: &mut watch::Receiver<Option<Order>>,
rx_b: &mut watch::Receiver<Option<Order>>, rx_b: &mut watch::Receiver<Option<Order>>,
) -> (Order, Order) { ) -> Result<(Order, Order)> {
let (a, b) = tokio::join!(next_some(rx_a), next_some(rx_b)); let (a, b) = tokio::join!(next_some(rx_a), next_some(rx_b));
(a, b) Ok((a?, b?))
} }
/// Returns the value if the next Option received on the stream is Some /// Returns the value if the next Option received on the stream is Some
/// pub async fn next_some<T>(rx: &mut watch::Receiver<Option<T>>) -> Result<T>
/// Panics if None is received on the stream.
pub async fn next_some<T>(rx: &mut watch::Receiver<Option<T>>) -> T
where where
T: Clone, T: Clone,
{ {
if let Some(value) = next(rx).await { next(rx)
value .await?
} else { .context("Received None when Some was expected")
panic!("Received None when Some was expected")
}
} }
/// Returns true if the next Option received on the stream is None /// Returns true if the next Option received on the stream is None
/// ///
/// Returns false if Some is received. /// Returns false if Some is received.
pub async fn is_next_none<T>(rx: &mut watch::Receiver<Option<T>>) -> bool pub async fn is_next_none<T>(rx: &mut watch::Receiver<Option<T>>) -> Result<bool>
where where
T: Clone, T: Clone,
{ {
next(rx).await.is_none() Ok(next(rx).await?.is_none())
} }
/// Returns watch channel value upon change /// Returns watch channel value upon change
pub async fn next<T>(rx: &mut watch::Receiver<T>) -> T pub async fn next<T>(rx: &mut watch::Receiver<T>) -> Result<T>
where where
T: Clone, T: Clone,
{ {
// TODO: Make timeout configurable, only contract setup can take up to 2 min on CI
rx.changed() rx.changed()
.timeout(Duration::from_secs(120)) .timeout(Duration::from_secs(10))
.await .await
.context("Waiting for next element in channel is taking too long, aborting") .context("No change in channel within 10 seconds")??;
.unwrap()
.unwrap(); Ok(rx.borrow().clone())
rx.borrow().clone()
} }

1
daemon/tests/harness/mod.rs

@ -91,7 +91,6 @@ impl Maker {
let listener_stream = futures::stream::poll_fn(move |ctx| { let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) { let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => { Ok((stream, address)) => {
dbg!("new connection");
maker_inc_connections::ListenerMessage::NewConnection { stream, address } maker_inc_connections::ListenerMessage::NewConnection { stream, address }
} }
Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, Err(e) => maker_inc_connections::ListenerMessage::Error { source: e },

Loading…
Cancel
Save