diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 28c78f4..9ffb7eb 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -1,20 +1,18 @@ use crate::auth::MAKER_USERNAME; -use crate::model::TakerId; use crate::seed::Seed; use crate::wallet::Wallet; use anyhow::{Context, Result}; use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1}; use bdk::bitcoin::Network; use clap::Clap; -use futures::StreamExt; use model::cfd::{Cfd, Order}; use model::WalletInfo; use rocket::fairing::AdHoc; use rocket_db_pools::Database; use std::collections::HashMap; use std::path::PathBuf; +use std::task::Poll; use tokio::sync::watch; -use tokio_util::codec::FramedRead; use tracing_subscriber::filter::LevelFilter; use xtra::prelude::*; use xtra::spawn::TokioGlobalSpawnExt; @@ -190,38 +188,22 @@ async fn main() -> Result<()> { HashMap::new(), cfd_maker_actor_inbox.clone(), ))); - tokio::spawn({ - let cfd_maker_actor_inbox = cfd_maker_actor_inbox.clone(); - let maker_inc_connections_address = maker_inc_connections_address.clone(); - async move { - loop { - if let Ok((socket, remote_addr)) = listener.accept().await { - tracing::info!("Connected to {}", remote_addr); - let taker_id = TakerId::default(); - let (read, write) = socket.into_split(); - - let read = FramedRead::new(read, wire::JsonCodec::new()).map( - move |item| maker_cfd::TakerStreamMessage { taker_id, item }, - ); - - tokio::spawn(cfd_maker_actor_inbox.clone().attach_stream(read)); - - let out_msg_actor = send_to_socket::Actor::new(write) - .create(None) - .spawn_global(); - - maker_inc_connections_address - .do_send_async(maker_inc_connections::NewTakerOnline { - taker_id, - out_msg_actor, - }) - .await - .unwrap(); - }; + + let listener_stream = futures::stream::poll_fn(move |ctx| { + let message = match futures::ready!(listener.poll_accept(ctx)) { + Ok((stream, address)) => { + maker_inc_connections::ListenerMessage::NewConnection { + stream, + address, + } } - } + Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, + }; + + Poll::Ready(Some(message)) }); + tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); Ok(rocket.manage(cfd_maker_actor_inbox)) diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index bb39ba6..e840d8d 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -4,8 +4,15 @@ use crate::model::TakerId; use crate::{maker_cfd, send_to_socket, wire}; use anyhow::{Context as AnyhowContext, Result}; use async_trait::async_trait; +use futures::StreamExt; use std::collections::HashMap; +use std::io; +use std::net::SocketAddr; +use tokio::net::TcpStream; +use tokio_util::codec::FramedRead; use xtra::prelude::*; +use xtra::spawn::TokioGlobalSpawnExt; +use xtra::{Actor as _, KeepRunning}; pub struct BroadcastOrder(pub Option); @@ -23,9 +30,14 @@ pub struct TakerMessage { pub command: TakerCommand, } -pub struct NewTakerOnline { - pub taker_id: TakerId, - pub out_msg_actor: Address>, +pub enum ListenerMessage { + NewConnection { + stream: TcpStream, + address: SocketAddr, + }, + Error { + source: io::Error, + }, } pub struct Actor { @@ -90,14 +102,22 @@ impl Actor { Ok(()) } - async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> { - self.cfd_actor - .do_send_async(maker_cfd::NewTakerOnline { id: msg.taker_id }) - .await?; + async fn handle_new_connection(&mut self, stream: TcpStream, address: SocketAddr) { + let taker_id = TakerId::default(); - self.write_connections - .insert(msg.taker_id, msg.out_msg_actor); - Ok(()) + tracing::info!("New taker {} connected on {}", taker_id, address); + + let (read, write) = stream.into_split(); + let read = FramedRead::new(read, wire::JsonCodec::new()) + .map(move |item| maker_cfd::TakerStreamMessage { taker_id, item }); + + tokio::spawn(self.cfd_actor.clone().attach_stream(read)); + + let out_msg_actor = send_to_socket::Actor::new(write) + .create(None) + .spawn_global(); + + self.write_connections.insert(taker_id, out_msg_actor); } } @@ -124,9 +144,22 @@ impl Handler for Actor { } #[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context) { - log_error!(self.handle_new_taker_online(msg)); +impl Handler for Actor { + async fn handle(&mut self, msg: ListenerMessage, _ctx: &mut Context) -> KeepRunning { + match msg { + ListenerMessage::NewConnection { stream, address } => { + self.handle_new_connection(stream, address).await; + + KeepRunning::Yes + } + ListenerMessage::Error { source } => { + tracing::warn!("TCP listener produced an error: {}", source); + + // Maybe we should move the actual listening on the socket into here and restart the + // actor upon an error? + KeepRunning::Yes + } + } } } @@ -138,8 +171,8 @@ impl Message for TakerMessage { type Result = (); } -impl Message for NewTakerOnline { - type Result = (); +impl Message for ListenerMessage { + type Result = KeepRunning; } impl xtra::Actor for Actor {}