diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index 795d73e..950fe1e 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -2,6 +2,7 @@ use crate::address_map::{AddressMap, Stopping}; use crate::model::cfd::OrderId; use crate::model::{Price, Timestamp, Usd}; use crate::tokio_ext::FutureExt; +use crate::wire::EncryptedJsonCodec; use crate::{collab_settlement_taker, log_error, noise, send_to_socket, setup_taker, wire, Tasks}; use anyhow::{Context, Result}; use bdk::bitcoin::Amount; @@ -12,7 +13,7 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; use tokio::net::TcpStream; use tokio::sync::watch; -use tokio_util::codec::FramedRead; +use tokio_util::codec::{FramedRead, FramedWrite}; use xtra::prelude::MessageChannel; use xtra::KeepRunning; use xtra_productivity::xtra_productivity; @@ -193,17 +194,15 @@ impl Actor { (read, write, Arc::new(Mutex::new(noise))) }; - let send_to_socket = send_to_socket::Actor::new(write, noise.clone()); + let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise.clone())); + let write = FramedWrite::new(write, EncryptedJsonCodec::new(noise)); + let this = ctx.address().expect("self to be alive"); + + let send_to_socket = send_to_socket::Actor::new(write); let mut tasks = Tasks::default(); tasks.add(self.send_to_maker_ctx.attach(send_to_socket)); - - let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise)) - .map(move |item| MakerStreamMessage { item }); - - let this = ctx.address().expect("self to be alive"); - tasks.add(this.attach_stream(read)); - + tasks.add(this.attach_stream(read.map(move |item| MakerStreamMessage { item }))); tasks.add( ctx.notify_interval(self.heartbeat_timeout, || MeasurePulse) .expect("we just started"), diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index c26baf3..04dd439 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -3,6 +3,7 @@ use crate::maker_cfd::{FromTaker, TakerConnected, TakerDisconnected}; use crate::model::cfd::{Order, OrderId}; use crate::model::Identity; use crate::noise::TransportStateExt; +use crate::wire::EncryptedJsonCodec; use crate::{maker_cfd, noise, send_to_socket, setup_maker, wire, Tasks}; use anyhow::Result; use futures::TryStreamExt; @@ -12,7 +13,7 @@ use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::net::TcpStream; -use tokio_util::codec::FramedRead; +use tokio_util::codec::{FramedRead, FramedWrite}; use xtra::prelude::*; use xtra::KeepRunning; use xtra_productivity::xtra_productivity; @@ -128,7 +129,7 @@ impl Actor { let (read, write) = stream.into_split(); let mut read = FramedRead::new(read, wire::EncryptedJsonCodec::new(transport_state.clone())); - + let write = FramedWrite::new(write, EncryptedJsonCodec::new(transport_state)); let this = ctx.address().expect("self to be alive"); let read_fut = async move { while let Ok(Some(msg)) = read.try_next().await { @@ -143,7 +144,7 @@ impl Actor { }; let (out_msg, mut out_msg_actor_context) = xtra::Context::new(None); - let send_to_socket_actor = send_to_socket::Actor::new(write, transport_state.clone()); + let send_to_socket_actor = send_to_socket::Actor::new(write); let heartbeat_fut = out_msg_actor_context .notify_interval(self.heartbeat_interval, || wire::MakerToTaker::Heartbeat) diff --git a/daemon/src/send_to_socket.rs b/daemon/src/send_to_socket.rs index 3fc899f..a87b005 100644 --- a/daemon/src/send_to_socket.rs +++ b/daemon/src/send_to_socket.rs @@ -1,9 +1,7 @@ use crate::wire::{self, EncryptedJsonCodec}; use futures::SinkExt; use serde::Serialize; -use snow::TransportState; use std::fmt; -use std::sync::{Arc, Mutex}; use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedWriteHalf; use tokio_util::codec::FramedWrite; @@ -14,10 +12,8 @@ pub struct Actor { } impl Actor { - pub fn new(write: OwnedWriteHalf, transport_state: Arc>) -> Self { - Self { - write: FramedWrite::new(write, EncryptedJsonCodec::new(transport_state)), - } + pub fn new(write: FramedWrite>) -> Self { + Self { write } } pub async fn shutdown(self) {