Browse Source

Use MessageChannels in maker_inc_connections::Actor

We would have liked to attach the stream of `TakerStreamMessage`s
directly to the `MessageChannel`, but we failed and instead decided to
let the `maker_inc_connections::Actor` send these messages to itself
and then forward them to the `maker_cfd::Actor`.
refactor/no-log-handler
Lucas Soriano del Pino 3 years ago
parent
commit
370d40f0c2
No known key found for this signature in database GPG Key ID: EE611E973A1530E7
  1. 3
      daemon/src/maker.rs
  2. 36
      daemon/src/maker_inc_connections.rs

3
daemon/src/maker.rs

@ -234,7 +234,8 @@ async fn main() -> Result<()> {
tokio::spawn(
maker_inc_connections_context.run(maker_inc_connections::Actor::new(
cfd_maker_actor_inbox.clone(),
&cfd_maker_actor_inbox,
&cfd_maker_actor_inbox,
)),
);
tokio::spawn(

36
daemon/src/maker_inc_connections.rs

@ -1,3 +1,4 @@
use crate::maker_cfd::{NewTakerOnline, TakerStreamMessage};
use crate::model::cfd::{Order, OrderId};
use crate::model::{BitMexPriceEventId, TakerId};
use crate::{log_error, maker_cfd, send_to_socket, wire};
@ -63,14 +64,19 @@ pub enum ListenerMessage {
pub struct Actor {
write_connections: HashMap<TakerId, Address<send_to_socket::Actor<wire::MakerToTaker>>>,
cfd_actor: Address<maker_cfd::Actor>,
new_taker_channel: Box<dyn MessageChannel<NewTakerOnline>>,
taker_msg_channel: Box<dyn MessageChannel<TakerStreamMessage>>,
}
impl Actor {
pub fn new(cfd_actor: Address<maker_cfd::Actor>) -> Self {
pub fn new(
new_taker_channel: &impl MessageChannel<NewTakerOnline>,
taker_msg_channel: &impl MessageChannel<TakerStreamMessage>,
) -> Self {
Self {
write_connections: HashMap::new(),
cfd_actor,
new_taker_channel: new_taker_channel.clone_channel(),
taker_msg_channel: taker_msg_channel.clone_channel(),
}
}
@ -155,7 +161,12 @@ impl Actor {
Ok(())
}
async fn handle_new_connection(&mut self, stream: TcpStream, address: SocketAddr) {
async fn handle_new_connection(
&mut self,
stream: TcpStream,
address: SocketAddr,
ctx: &mut Context<Self>,
) {
let taker_id = TakerId::default();
tracing::info!("New taker {} connected on {}", taker_id, address);
@ -164,7 +175,8 @@ impl Actor {
let read = FramedRead::new(read, wire::JsonCodec::default())
.map(move |item| maker_cfd::TakerStreamMessage { taker_id, item });
tokio::spawn(self.cfd_actor.clone().attach_stream(read));
let this = ctx.address().expect("self to be alive");
tokio::spawn(this.attach_stream(Box::pin(read)));
let out_msg_actor = send_to_socket::Actor::new(write)
.create(None)
@ -173,7 +185,7 @@ impl Actor {
self.write_connections.insert(taker_id, out_msg_actor);
let _ = self
.cfd_actor
.new_taker_channel
.send(maker_cfd::NewTakerOnline { id: taker_id })
.await;
}
@ -203,10 +215,10 @@ impl Handler<TakerMessage> for Actor {
#[async_trait]
impl Handler<ListenerMessage> for Actor {
async fn handle(&mut self, msg: ListenerMessage, _ctx: &mut Context<Self>) -> KeepRunning {
async fn handle(&mut self, msg: ListenerMessage, ctx: &mut Context<Self>) -> KeepRunning {
match msg {
ListenerMessage::NewConnection { stream, address } => {
self.handle_new_connection(stream, address).await;
self.handle_new_connection(stream, address, ctx).await;
KeepRunning::Yes
}
@ -221,6 +233,14 @@ impl Handler<ListenerMessage> for Actor {
}
}
#[async_trait]
impl Handler<TakerStreamMessage> for Actor {
async fn handle(&mut self, msg: TakerStreamMessage, _ctx: &mut Context<Self>) -> KeepRunning {
log_error!(self.taker_msg_channel.send(msg));
KeepRunning::Yes
}
}
impl Message for BroadcastOrder {
type Result = ();
}

Loading…
Cancel
Save