diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index c03ad64..a602660 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -1,7 +1,8 @@ use crate::model::cfd::OrderId; use crate::model::Usd; +use crate::tokio_ext::FutureExt; use crate::{log_error, noise, send_to_socket, setup_taker, wire, Tasks}; -use anyhow::Result; +use anyhow::{Context, Result}; use futures::StreamExt; use std::collections::HashMap; use std::net::SocketAddr; @@ -29,7 +30,8 @@ pub struct Actor { identity_sk: x25519_dalek::StaticSecret, maker_to_taker: Box>, /// Max duration since the last heartbeat until we die. - timeout: Duration, + heartbeat_timeout: Duration, + connect_timeout: Duration, connected_state: Option, setup_actors: HashMap>, } @@ -70,7 +72,8 @@ impl Actor { status_sender: watch::Sender, maker_to_taker: Box>, identity_sk: x25519_dalek::StaticSecret, - timeout: Duration, + hearthbeat_timeout: Duration, + connect_timeout: Duration, ) -> Self { let (send_to_maker_addr, send_to_maker_ctx) = xtra::Context::new(None); @@ -80,9 +83,10 @@ impl Actor { send_to_maker_ctx, identity_sk, maker_to_taker, - timeout, + heartbeat_timeout: hearthbeat_timeout, connected_state: None, setup_actors: HashMap::new(), + connect_timeout, } } } @@ -120,8 +124,20 @@ impl Actor { }: Connect, ctx: &mut xtra::Context, ) -> Result<()> { + tracing::debug!(address = %maker_addr, "Connecting to maker"); + let (read, write, noise) = { - let mut connection = TcpStream::connect(&maker_addr).await?; + let mut connection = TcpStream::connect(&maker_addr) + .timeout(self.connect_timeout) + .await + .with_context(|| { + format!( + "Connection attempt to {} timed out after {}s", + maker_addr, + self.connect_timeout.as_secs() + ) + })? + .with_context(|| format!("Failed to connect to {}", maker_addr))?; let noise = noise::initiator_handshake(&mut connection, &self.identity_sk, &maker_identity_pk) .await?; @@ -142,7 +158,7 @@ impl Actor { tasks.add(this.attach_stream(read)); tasks.add( - ctx.notify_interval(self.timeout, || MeasurePulse) + ctx.notify_interval(self.heartbeat_timeout, || MeasurePulse) .expect("we just started"), ); @@ -235,7 +251,7 @@ impl Actor { ) .expect("now is always later than heartbeat"); - if time_since_last_heartbeat > self.timeout { + if time_since_last_heartbeat > self.heartbeat_timeout { self.status_sender .send(ConnectionStatus::Offline) .expect("watch receiver to outlive the actor"); @@ -256,11 +272,9 @@ pub async fn connect( ) { loop { if maker_online_status_feed_receiver.borrow().clone() == ConnectionStatus::Offline { - tracing::info!("No connection to the maker, attempting to connect:"); + tracing::debug!("No connection to the maker"); 'connect: loop { for address in &maker_addresses { - tracing::trace!("Connecting to {}", address); - let connect_msg = Connect { maker_identity_pk, maker_addr: *address, @@ -274,11 +288,10 @@ pub async fn connect( tracing::trace!(%address, "Failed to establish connection: {:#}", e); continue; } - tracing::debug!("Connection established"); break 'connect; } - tracing::debug!( + tracing::warn!( "Tried connecting to {} addresses without success, retrying in {} seconds", maker_addresses.len(), CONNECT_TO_MAKER_INTERVAL.as_secs() diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 012cb65..6e381c3 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -234,6 +234,7 @@ where monitor_constructor: impl FnOnce(Box>, Vec) -> F, n_payouts: usize, maker_heartbeat_interval: Duration, + connect_timeout: Duration, projection_actor: Address, ) -> Result where @@ -272,6 +273,7 @@ where Box::new(cfd_actor_addr.clone()), identity_sk, maker_heartbeat_interval, + connect_timeout, ))); tasks.add( diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 87f4926..b196162 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -18,6 +18,7 @@ use sqlx::SqlitePool; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; +use std::time::Duration; use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; use xtra::Actor; @@ -244,6 +245,7 @@ async fn main() -> Result<()> { }, N_PAYOUTS, HEARTBEAT_INTERVAL * 2, + Duration::from_secs(10), projection_actor.clone(), ) .await?; diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index c990403..3435ea1 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -291,6 +291,7 @@ impl Taker { |_, _| async { Ok(monitor) }, config.n_payouts, config.heartbeat_timeout, + Duration::from_secs(10), projection_actor, ) .await