Browse Source

Move tcp connection setup outside of `Maker` actor system

It is an implementation detail that we use a tcp connection and bites us in the tests.

Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
contact-taker-before-changing-cfd-state
Daniel Karzel 3 years ago
committed by Mariusz Klochowicz
parent
commit
fe4172f1c0
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 21
      daemon/src/lib.rs
  2. 16
      daemon/src/maker.rs

21
daemon/src/lib.rs

@ -10,9 +10,7 @@ use futures::Stream;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::future::Future;
use std::task::Poll;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::watch;
use xtra::message_channel::{MessageChannel, StrongMessageChannel};
use xtra::spawn::TokioGlobalSpawnExt;
@ -52,6 +50,7 @@ pub struct Maker<O, M, T> {
pub cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
pub order_feed_receiver: watch::Receiver<Option<Order>>,
pub update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
pub inc_conn_addr: Address<T>,
}
impl<O, M, T> Maker<O, M, T>
@ -65,8 +64,7 @@ where
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<maker_inc_connections::ListenerMessage>,
+ xtra::Handler<maker_inc_connections::BroadcastOrder>,
{
pub async fn new<F>(
db: SqlitePool,
@ -78,7 +76,6 @@ where
Box<dyn MessageChannel<NewTakerOnline>>,
Box<dyn MessageChannel<FromTaker>>,
) -> T,
listener: TcpListener,
term: time::Duration,
) -> Result<Self>
where
@ -140,24 +137,12 @@ where
oracle_addr.do_send_async(oracle::Sync).await?;
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => {
maker_inc_connections::ListenerMessage::NewConnection { stream, address }
}
Err(e) => maker_inc_connections::ListenerMessage::Error { source: e },
};
Poll::Ready(Some(message))
});
tokio::spawn(inc_conn_addr.attach_stream(listener_stream));
Ok(Self {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
inc_conn_addr,
})
}
}

16
daemon/src/maker.rs

@ -22,6 +22,7 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::task::Poll;
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter;
@ -190,6 +191,7 @@ async fn main() -> Result<()> {
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
inc_conn_addr: incoming_connection_addr,
} = Maker::new(
db.clone(),
wallet.clone(),
@ -202,11 +204,23 @@ async fn main() -> Result<()> {
}
},
|channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1),
listener,
time::Duration::hours(opts.term as i64),
)
.await?;
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => {
maker_inc_connections::ListenerMessage::NewConnection { stream, address }
}
Err(e) => maker_inc_connections::ListenerMessage::Error { source: e },
};
Poll::Ready(Some(message))
});
tokio::spawn(incoming_connection_addr.attach_stream(listener_stream));
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
let cfd_action_channel = MessageChannel::<maker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);

Loading…
Cancel
Save