From fe4172f1c0f80c1db6b5448c69caff40d0ff89db Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Fri, 22 Oct 2021 15:41:03 +1100 Subject: [PATCH] Move tcp connection setup outside of `Maker` actor system It is an implementation detail that we use a tcp connection and bites us in the tests. Co-authored-by: Mariusz Klochowicz --- daemon/src/lib.rs | 21 +++------------------ daemon/src/maker.rs | 16 +++++++++++++++- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index c4fac5b..95c6362 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -10,9 +10,7 @@ use futures::Stream; use sqlx::SqlitePool; use std::collections::HashMap; use std::future::Future; -use std::task::Poll; use std::time::Duration; -use tokio::net::TcpListener; use tokio::sync::watch; use xtra::message_channel::{MessageChannel, StrongMessageChannel}; use xtra::spawn::TokioGlobalSpawnExt; @@ -52,6 +50,7 @@ pub struct Maker { pub cfd_feed_receiver: watch::Receiver>, pub order_feed_receiver: watch::Receiver>, pub update_cfd_feed_receiver: watch::Receiver, + pub inc_conn_addr: Address, } impl Maker @@ -65,8 +64,7 @@ where + xtra::Handler + xtra::Handler, T: xtra::Handler - + xtra::Handler - + xtra::Handler, + + xtra::Handler, { pub async fn new( db: SqlitePool, @@ -78,7 +76,6 @@ where Box>, Box>, ) -> T, - listener: TcpListener, term: time::Duration, ) -> Result where @@ -140,24 +137,12 @@ where oracle_addr.do_send_async(oracle::Sync).await?; - let listener_stream = futures::stream::poll_fn(move |ctx| { - let message = match futures::ready!(listener.poll_accept(ctx)) { - Ok((stream, address)) => { - maker_inc_connections::ListenerMessage::NewConnection { stream, address } - } - Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, - }; - - Poll::Ready(Some(message)) - }); - - tokio::spawn(inc_conn_addr.attach_stream(listener_stream)); - Ok(Self { cfd_actor_addr, cfd_feed_receiver, order_feed_receiver, update_cfd_feed_receiver, + inc_conn_addr, }) } } diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 5575419..cffc91f 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -22,6 +22,7 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; +use std::task::Poll; use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; @@ -190,6 +191,7 @@ async fn main() -> Result<()> { cfd_feed_receiver, order_feed_receiver, update_cfd_feed_receiver, + inc_conn_addr: incoming_connection_addr, } = Maker::new( db.clone(), wallet.clone(), @@ -202,11 +204,23 @@ async fn main() -> Result<()> { } }, |channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1), - listener, time::Duration::hours(opts.term as i64), ) .await?; + let listener_stream = futures::stream::poll_fn(move |ctx| { + let message = match futures::ready!(listener.poll_accept(ctx)) { + Ok((stream, address)) => { + maker_inc_connections::ListenerMessage::NewConnection { stream, address } + } + Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, + }; + + Poll::Ready(Some(message)) + }); + + tokio::spawn(incoming_connection_addr.attach_stream(listener_stream)); + tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); let cfd_action_channel = MessageChannel::::clone_channel(&cfd_actor_addr);