Browse Source

Merge #606

606: Reduce heartbeat interval in test harness r=klochowicz a=klochowicz

As a rule of thumb, taker waits twice the amount of HEARTBEAT_INTERVAL until it
assumes that maker disappeared.

Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
new-http-api
bors[bot] 3 years ago
committed by GitHub
parent
commit
70b2af4067
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      daemon/src/lib.rs
  2. 6
      daemon/src/maker.rs
  3. 9
      daemon/src/maker_inc_connections.rs
  4. 3
      daemon/src/taker.rs
  5. 8
      daemon/tests/happy_path.rs
  6. 12
      daemon/tests/harness/mod.rs

5
daemon/src/lib.rs

@ -47,7 +47,7 @@ pub mod wallet;
pub mod wallet_sync;
pub mod wire;
const HEARTBEAT_INTERVAL: std::time::Duration = Duration::from_secs(5);
pub const HEARTBEAT_INTERVAL: std::time::Duration = Duration::from_secs(5);
pub const N_PAYOUTS: usize = 200;
@ -218,6 +218,7 @@ where
oracle_constructor: impl FnOnce(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
n_payouts: usize,
maker_heartbeat_interval: Duration,
) -> Result<Self>
where
F: Future<Output = Result<M>>,
@ -263,7 +264,7 @@ where
maker_online_status_feed_sender,
Box::new(cfd_actor_addr.clone()),
identity_sk,
HEARTBEAT_INTERVAL * 2,
maker_heartbeat_interval,
))
.spawn_with_handle(),
);

6
daemon/src/maker.rs

@ -9,7 +9,7 @@ use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
bitmex_price_feed, db, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle,
wallet, wallet_sync, MakerActorSystem, N_PAYOUTS,
wallet, wallet_sync, MakerActorSystem, HEARTBEAT_INTERVAL, N_PAYOUTS,
};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
@ -263,7 +263,9 @@ async fn main() -> Result<()> {
monitor::Actor::new(electrum, channel, cfds)
}
},
|channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1, identity_sk),
|channel0, channel1| {
maker_inc_connections::Actor::new(channel0, channel1, identity_sk, HEARTBEAT_INTERVAL)
},
time::Duration::hours(opts.settlement_time_interval_hours as i64),
N_PAYOUTS,
)

9
daemon/src/maker_inc_connections.rs

@ -2,7 +2,7 @@ use crate::maker_cfd::{FromTaker, NewTakerOnline};
use crate::model::cfd::{Order, OrderId};
use crate::model::{BitMexPriceEventId, TakerId};
use crate::tokio_ext::FutureExt;
use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire, HEARTBEAT_INTERVAL};
use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire};
use anyhow::Result;
use futures::future::RemoteHandle;
use futures::{StreamExt, TryStreamExt};
@ -10,6 +10,7 @@ use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;
use xtra::prelude::*;
@ -69,6 +70,7 @@ pub struct Actor {
new_taker_channel: Box<dyn MessageChannel<NewTakerOnline>>,
taker_msg_channel: Box<dyn MessageChannel<FromTaker>>,
noise_priv_key: x25519_dalek::StaticSecret,
heartbeat_interval: Duration,
tasks: Vec<RemoteHandle<()>>,
}
@ -77,12 +79,14 @@ impl Actor {
new_taker_channel: Box<dyn MessageChannel<NewTakerOnline>>,
taker_msg_channel: Box<dyn MessageChannel<FromTaker>>,
noise_priv_key: x25519_dalek::StaticSecret,
heartbeat_interval: Duration,
) -> Self {
Self {
write_connections: HashMap::new(),
new_taker_channel: new_taker_channel.clone_channel(),
taker_msg_channel: taker_msg_channel.clone_channel(),
noise_priv_key,
heartbeat_interval,
tasks: Vec::new(),
}
}
@ -135,12 +139,13 @@ impl Actor {
self.tasks.push(forward_to_cfd_fut.spawn_with_handle());
// only allow outgoing messages while we are successfully reading incoming ones
let heartbeat_interval = self.heartbeat_interval;
self.tasks.push(
async move {
let mut actor = send_to_socket::Actor::new(write, noise.clone());
let _heartbeat_handle = out_msg_actor_context
.notify_interval(HEARTBEAT_INTERVAL, || wire::MakerToTaker::Heartbeat)
.notify_interval(heartbeat_interval, || wire::MakerToTaker::Heartbeat)
.expect("actor not to shutdown")
.spawn_with_handle();

3
daemon/src/taker.rs

@ -9,7 +9,7 @@ use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
bitmex_price_feed, connection, db, housekeeping, logger, monitor, oracle, taker_cfd, wallet,
wallet_sync, TakerActorSystem, N_PAYOUTS,
wallet_sync, TakerActorSystem, HEARTBEAT_INTERVAL, N_PAYOUTS,
};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
@ -252,6 +252,7 @@ async fn main() -> Result<()> {
}
},
N_PAYOUTS,
HEARTBEAT_INTERVAL * 2,
)
.await?;

8
daemon/tests/happy_path.rs

@ -1,11 +1,12 @@
use crate::harness::flow::{is_next_none, next, next_cfd, next_order, next_some};
use crate::harness::{assert_is_same_order, dummy_new_order, init_tracing, start_both};
use crate::harness::{
assert_is_same_order, dummy_new_order, init_tracing, start_both, HEARTBEAT_INTERVAL_FOR_TEST,
};
use daemon::connection::ConnectionStatus;
use daemon::model::cfd::CfdState;
use daemon::model::Usd;
use maia::secp256k1_zkp::schnorrsig;
use rust_decimal_macros::dec;
use std::time::Duration;
use tokio::time::sleep;
mod harness;
@ -116,8 +117,7 @@ async fn taker_notices_lack_of_maker() {
std::mem::drop(maker);
// TODO: shorten this sleep by specifying different heartbeat interval for tests
sleep(Duration::from_secs(12)).await;
sleep(HEARTBEAT_INTERVAL_FOR_TEST * 3).await;
assert_eq!(
ConnectionStatus::Offline,

12
daemon/tests/harness/mod.rs

@ -13,6 +13,7 @@ use sqlx::SqlitePool;
use std::net::SocketAddr;
use std::str::FromStr;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::watch;
use tracing::subscriber::DefaultGuard;
use tracing_subscriber::filter::LevelFilter;
@ -25,6 +26,7 @@ pub mod flow;
pub mod maia;
pub mod mocks;
pub const HEARTBEAT_INTERVAL_FOR_TEST: Duration = Duration::from_secs(2);
const N_PAYOUTS_FOR_TEST: usize = 5;
pub async fn start_both() -> (Maker, Taker) {
@ -77,7 +79,14 @@ impl Maker {
oracle_pk,
|_, _| oracle,
|_, _| async { Ok(monitor) },
|channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1, identity_sk),
|channel0, channel1| {
maker_inc_connections::Actor::new(
channel0,
channel1,
identity_sk,
HEARTBEAT_INTERVAL_FOR_TEST,
)
},
settlement_time_interval_hours,
N_PAYOUTS_FOR_TEST,
)
@ -182,6 +191,7 @@ impl Taker {
|_, _| oracle,
|_, _| async { Ok(monitor) },
N_PAYOUTS_FOR_TEST,
HEARTBEAT_INTERVAL_FOR_TEST * 2,
)
.await
.unwrap();

Loading…
Cancel
Save