From 370d40f0c2565d1d9afad3b4ee05d70735b45dd4 Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Thu, 14 Oct 2021 18:23:02 +1100 Subject: [PATCH] Use MessageChannels in maker_inc_connections::Actor We would have liked to attach the stream of `TakerStreamMessage`s directly to the `MessageChannel`, but we failed and instead decided to let the `maker_inc_connections::Actor` send these messages to itself and then forward them to the `maker_cfd::Actor`. --- daemon/src/maker.rs | 3 ++- daemon/src/maker_inc_connections.rs | 36 ++++++++++++++++++++++------- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 97edb62..75a2194 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -234,7 +234,8 @@ async fn main() -> Result<()> { tokio::spawn( maker_inc_connections_context.run(maker_inc_connections::Actor::new( - cfd_maker_actor_inbox.clone(), + &cfd_maker_actor_inbox, + &cfd_maker_actor_inbox, )), ); tokio::spawn( diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 70be8ae..bc71321 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -1,3 +1,4 @@ +use crate::maker_cfd::{NewTakerOnline, TakerStreamMessage}; use crate::model::cfd::{Order, OrderId}; use crate::model::{BitMexPriceEventId, TakerId}; use crate::{log_error, maker_cfd, send_to_socket, wire}; @@ -63,14 +64,19 @@ pub enum ListenerMessage { pub struct Actor { write_connections: HashMap>>, - cfd_actor: Address, + new_taker_channel: Box>, + taker_msg_channel: Box>, } impl Actor { - pub fn new(cfd_actor: Address) -> Self { + pub fn new( + new_taker_channel: &impl MessageChannel, + taker_msg_channel: &impl MessageChannel, + ) -> Self { Self { write_connections: HashMap::new(), - cfd_actor, + new_taker_channel: new_taker_channel.clone_channel(), + taker_msg_channel: taker_msg_channel.clone_channel(), } } @@ -155,7 +161,12 @@ impl Actor { Ok(()) } - async fn handle_new_connection(&mut self, stream: TcpStream, address: SocketAddr) { + async fn handle_new_connection( + &mut self, + stream: TcpStream, + address: SocketAddr, + ctx: &mut Context, + ) { let taker_id = TakerId::default(); tracing::info!("New taker {} connected on {}", taker_id, address); @@ -164,7 +175,8 @@ impl Actor { let read = FramedRead::new(read, wire::JsonCodec::default()) .map(move |item| maker_cfd::TakerStreamMessage { taker_id, item }); - tokio::spawn(self.cfd_actor.clone().attach_stream(read)); + let this = ctx.address().expect("self to be alive"); + tokio::spawn(this.attach_stream(Box::pin(read))); let out_msg_actor = send_to_socket::Actor::new(write) .create(None) @@ -173,7 +185,7 @@ impl Actor { self.write_connections.insert(taker_id, out_msg_actor); let _ = self - .cfd_actor + .new_taker_channel .send(maker_cfd::NewTakerOnline { id: taker_id }) .await; } @@ -203,10 +215,10 @@ impl Handler for Actor { #[async_trait] impl Handler for Actor { - async fn handle(&mut self, msg: ListenerMessage, _ctx: &mut Context) -> KeepRunning { + async fn handle(&mut self, msg: ListenerMessage, ctx: &mut Context) -> KeepRunning { match msg { ListenerMessage::NewConnection { stream, address } => { - self.handle_new_connection(stream, address).await; + self.handle_new_connection(stream, address, ctx).await; KeepRunning::Yes } @@ -221,6 +233,14 @@ impl Handler for Actor { } } +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: TakerStreamMessage, _ctx: &mut Context) -> KeepRunning { + log_error!(self.taker_msg_channel.send(msg)); + KeepRunning::Yes + } +} + impl Message for BroadcastOrder { type Result = (); }