|
|
@ -2,7 +2,7 @@ use crate::maker_cfd::{FromTaker, NewTakerOnline}; |
|
|
|
use crate::model::cfd::{Order, OrderId}; |
|
|
|
use crate::model::{BitMexPriceEventId, TakerId}; |
|
|
|
use crate::tokio_ext::FutureExt; |
|
|
|
use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire, HEARTBEAT_INTERVAL}; |
|
|
|
use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire}; |
|
|
|
use anyhow::Result; |
|
|
|
use futures::future::RemoteHandle; |
|
|
|
use futures::{StreamExt, TryStreamExt}; |
|
|
@ -10,6 +10,7 @@ use std::collections::HashMap; |
|
|
|
use std::io; |
|
|
|
use std::net::SocketAddr; |
|
|
|
use std::sync::{Arc, Mutex}; |
|
|
|
use std::time::Duration; |
|
|
|
use tokio::net::TcpStream; |
|
|
|
use tokio_util::codec::FramedRead; |
|
|
|
use xtra::prelude::*; |
|
|
@ -69,6 +70,7 @@ pub struct Actor { |
|
|
|
new_taker_channel: Box<dyn MessageChannel<NewTakerOnline>>, |
|
|
|
taker_msg_channel: Box<dyn MessageChannel<FromTaker>>, |
|
|
|
noise_priv_key: x25519_dalek::StaticSecret, |
|
|
|
heartbeat_interval: Duration, |
|
|
|
tasks: Vec<RemoteHandle<()>>, |
|
|
|
} |
|
|
|
|
|
|
@ -77,12 +79,14 @@ impl Actor { |
|
|
|
new_taker_channel: Box<dyn MessageChannel<NewTakerOnline>>, |
|
|
|
taker_msg_channel: Box<dyn MessageChannel<FromTaker>>, |
|
|
|
noise_priv_key: x25519_dalek::StaticSecret, |
|
|
|
heartbeat_interval: Duration, |
|
|
|
) -> Self { |
|
|
|
Self { |
|
|
|
write_connections: HashMap::new(), |
|
|
|
new_taker_channel: new_taker_channel.clone_channel(), |
|
|
|
taker_msg_channel: taker_msg_channel.clone_channel(), |
|
|
|
noise_priv_key, |
|
|
|
heartbeat_interval, |
|
|
|
tasks: Vec::new(), |
|
|
|
} |
|
|
|
} |
|
|
@ -135,12 +139,13 @@ impl Actor { |
|
|
|
self.tasks.push(forward_to_cfd_fut.spawn_with_handle()); |
|
|
|
|
|
|
|
// only allow outgoing messages while we are successfully reading incoming ones
|
|
|
|
let heartbeat_interval = self.heartbeat_interval; |
|
|
|
self.tasks.push( |
|
|
|
async move { |
|
|
|
let mut actor = send_to_socket::Actor::new(write, noise.clone()); |
|
|
|
|
|
|
|
let _heartbeat_handle = out_msg_actor_context |
|
|
|
.notify_interval(HEARTBEAT_INTERVAL, || wire::MakerToTaker::Heartbeat) |
|
|
|
.notify_interval(heartbeat_interval, || wire::MakerToTaker::Heartbeat) |
|
|
|
.expect("actor not to shutdown") |
|
|
|
.spawn_with_handle(); |
|
|
|
|
|
|
|