Browse Source

Roll out xtra_productivity inside maker_inc_connections actor

feature/actor-custom-derive
Mariusz Klochowicz 3 years ago
parent
commit
5650ab1ec5
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 1
      Cargo.lock
  2. 1
      daemon/Cargo.toml
  3. 4
      daemon/src/maker_cfd.rs
  4. 131
      daemon/src/maker_inc_connections.rs

1
Cargo.lock

@ -555,6 +555,7 @@ dependencies = [
"uuid",
"vergen",
"xtra",
"xtra_productivity",
]
[[package]]

1
daemon/Cargo.toml

@ -41,6 +41,7 @@ tracing = { version = "0.1" }
tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log", "json"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
xtra = { version = "0.6", features = ["with-tokio-1"] }
xtra_productivity = { path = "../xtra_productivity" }
[[bin]]
name = "taker"

4
daemon/src/maker_cfd.rs

@ -583,7 +583,7 @@ where
taker_id,
command: TakerCommand::NotifyOrderAccepted { id: order_id },
})
.await?;
.await??;
Ok(())
}
});
@ -770,7 +770,7 @@ where
oracle_event_id,
},
})
.await?;
.await??;
Ok(())
}
});

131
daemon/src/maker_inc_connections.rs

@ -1,7 +1,7 @@
use crate::maker_cfd::{FromTaker, NewTakerOnline};
use crate::model::cfd::{Order, OrderId};
use crate::model::{BitMexPriceEventId, TakerId};
use crate::{forward_only_ok, log_error, maker_cfd, send_to_socket, wire};
use crate::{forward_only_ok, maker_cfd, send_to_socket, wire};
use anyhow::{Context as AnyhowContext, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
@ -13,6 +13,7 @@ use tokio_util::codec::FramedRead;
use xtra::prelude::*;
use xtra::spawn::TokioGlobalSpawnExt;
use xtra::{Actor as _, KeepRunning};
use xtra_productivity::xtra_productivity;
pub struct BroadcastOrder(pub Option<Order>);
@ -92,6 +93,52 @@ impl Actor {
Ok(())
}
async fn handle_new_connection_impl(
&mut self,
stream: TcpStream,
address: SocketAddr,
_: &mut Context<Self>,
) {
let taker_id = TakerId::default();
tracing::info!("New taker {} connected on {}", taker_id, address);
let (read, write) = stream.into_split();
let read = FramedRead::new(read, wire::JsonCodec::default())
.map_ok(move |msg| FromTaker { taker_id, msg })
.map(forward_only_ok::Message);
let (out_msg_actor_address, mut out_msg_actor_context) = xtra::Context::new(None);
let forward_to_cfd = forward_only_ok::Actor::new(self.taker_msg_channel.clone_channel())
.create(None)
.spawn_global();
// only allow outgoing messages while we are successfully reading incoming ones
tokio::spawn(async move {
let mut actor = send_to_socket::Actor::new(write);
out_msg_actor_context
.handle_while(&mut actor, forward_to_cfd.attach_stream(read))
.await;
tracing::error!("Closing connection to taker {}", taker_id);
actor.shutdown().await;
});
self.write_connections
.insert(taker_id, out_msg_actor_address);
let _ = self
.new_taker_channel
.send(maker_cfd::NewTakerOnline { id: taker_id })
.await;
}
}
#[xtra_productivity]
impl Actor {
async fn handle_broadcast_order(&mut self, msg: BroadcastOrder) -> Result<()> {
let order = msg.0;
@ -161,78 +208,10 @@ impl Actor {
Ok(())
}
async fn handle_new_connection(
&mut self,
stream: TcpStream,
address: SocketAddr,
_: &mut Context<Self>,
) {
let taker_id = TakerId::default();
tracing::info!("New taker {} connected on {}", taker_id, address);
let (read, write) = stream.into_split();
let read = FramedRead::new(read, wire::JsonCodec::default())
.map_ok(move |msg| FromTaker { taker_id, msg })
.map(forward_only_ok::Message);
let (out_msg_actor_address, mut out_msg_actor_context) = xtra::Context::new(None);
let forward_to_cfd = forward_only_ok::Actor::new(self.taker_msg_channel.clone_channel())
.create(None)
.spawn_global();
// only allow outgoing messages while we are successfully reading incoming ones
tokio::spawn(async move {
let mut actor = send_to_socket::Actor::new(write);
out_msg_actor_context
.handle_while(&mut actor, forward_to_cfd.attach_stream(read))
.await;
tracing::error!("Closing connection to taker {}", taker_id);
actor.shutdown().await;
});
self.write_connections
.insert(taker_id, out_msg_actor_address);
let _ = self
.new_taker_channel
.send(maker_cfd::NewTakerOnline { id: taker_id })
.await;
}
}
macro_rules! log_error {
($future:expr) => {
if let Err(e) = $future.await {
tracing::error!(%e);
}
};
}
#[async_trait]
impl Handler<BroadcastOrder> for Actor {
async fn handle(&mut self, msg: BroadcastOrder, _ctx: &mut Context<Self>) {
log_error!(self.handle_broadcast_order(msg));
}
}
#[async_trait]
impl Handler<TakerMessage> for Actor {
async fn handle(&mut self, msg: TakerMessage, _ctx: &mut Context<Self>) {
log_error!(self.handle_taker_message(msg));
}
}
#[async_trait]
impl Handler<ListenerMessage> for Actor {
async fn handle(&mut self, msg: ListenerMessage, ctx: &mut Context<Self>) -> KeepRunning {
match msg {
ListenerMessage::NewConnection { stream, address } => {
self.handle_new_connection(stream, address, ctx).await;
self.handle_new_connection_impl(stream, address, ctx).await;
KeepRunning::Yes
}
@ -247,16 +226,4 @@ impl Handler<ListenerMessage> for Actor {
}
}
impl Message for BroadcastOrder {
type Result = ();
}
impl Message for TakerMessage {
type Result = ();
}
impl Message for ListenerMessage {
type Result = KeepRunning;
}
impl xtra::Actor for Actor {}

Loading…
Cancel
Save