Browse Source

Move framed read and write into connection setup

This enables us read/write messages before creating actors.
feature/reconnect-button
Daniel Karzel 3 years ago
parent
commit
287ad4788d
No known key found for this signature in database GPG Key ID: 30C3FC2E438ADB6E
  1. 17
      daemon/src/connection.rs
  2. 7
      daemon/src/maker_inc_connections.rs
  3. 8
      daemon/src/send_to_socket.rs

17
daemon/src/connection.rs

@ -2,6 +2,7 @@ use crate::address_map::{AddressMap, Stopping};
use crate::model::cfd::OrderId;
use crate::model::{Price, Timestamp, Usd};
use crate::tokio_ext::FutureExt;
use crate::wire::EncryptedJsonCodec;
use crate::{collab_settlement_taker, log_error, noise, send_to_socket, setup_taker, wire, Tasks};
use anyhow::{Context, Result};
use bdk::bitcoin::Amount;
@ -12,7 +13,7 @@ use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use tokio::net::TcpStream;
use tokio::sync::watch;
use tokio_util::codec::FramedRead;
use tokio_util::codec::{FramedRead, FramedWrite};
use xtra::prelude::MessageChannel;
use xtra::KeepRunning;
use xtra_productivity::xtra_productivity;
@ -193,17 +194,15 @@ impl Actor {
(read, write, Arc::new(Mutex::new(noise)))
};
let send_to_socket = send_to_socket::Actor::new(write, noise.clone());
let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise.clone()));
let write = FramedWrite::new(write, EncryptedJsonCodec::new(noise));
let this = ctx.address().expect("self to be alive");
let send_to_socket = send_to_socket::Actor::new(write);
let mut tasks = Tasks::default();
tasks.add(self.send_to_maker_ctx.attach(send_to_socket));
let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise))
.map(move |item| MakerStreamMessage { item });
let this = ctx.address().expect("self to be alive");
tasks.add(this.attach_stream(read));
tasks.add(this.attach_stream(read.map(move |item| MakerStreamMessage { item })));
tasks.add(
ctx.notify_interval(self.heartbeat_timeout, || MeasurePulse)
.expect("we just started"),

7
daemon/src/maker_inc_connections.rs

@ -3,6 +3,7 @@ use crate::maker_cfd::{FromTaker, TakerConnected, TakerDisconnected};
use crate::model::cfd::{Order, OrderId};
use crate::model::Identity;
use crate::noise::TransportStateExt;
use crate::wire::EncryptedJsonCodec;
use crate::{maker_cfd, noise, send_to_socket, setup_maker, wire, Tasks};
use anyhow::Result;
use futures::TryStreamExt;
@ -12,7 +13,7 @@ use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;
use tokio_util::codec::{FramedRead, FramedWrite};
use xtra::prelude::*;
use xtra::KeepRunning;
use xtra_productivity::xtra_productivity;
@ -128,7 +129,7 @@ impl Actor {
let (read, write) = stream.into_split();
let mut read =
FramedRead::new(read, wire::EncryptedJsonCodec::new(transport_state.clone()));
let write = FramedWrite::new(write, EncryptedJsonCodec::new(transport_state));
let this = ctx.address().expect("self to be alive");
let read_fut = async move {
while let Ok(Some(msg)) = read.try_next().await {
@ -143,7 +144,7 @@ impl Actor {
};
let (out_msg, mut out_msg_actor_context) = xtra::Context::new(None);
let send_to_socket_actor = send_to_socket::Actor::new(write, transport_state.clone());
let send_to_socket_actor = send_to_socket::Actor::new(write);
let heartbeat_fut = out_msg_actor_context
.notify_interval(self.heartbeat_interval, || wire::MakerToTaker::Heartbeat)

8
daemon/src/send_to_socket.rs

@ -1,9 +1,7 @@
use crate::wire::{self, EncryptedJsonCodec};
use futures::SinkExt;
use serde::Serialize;
use snow::TransportState;
use std::fmt;
use std::sync::{Arc, Mutex};
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio_util::codec::FramedWrite;
@ -14,10 +12,8 @@ pub struct Actor<T> {
}
impl<T> Actor<T> {
pub fn new(write: OwnedWriteHalf, transport_state: Arc<Mutex<TransportState>>) -> Self {
Self {
write: FramedWrite::new(write, EncryptedJsonCodec::new(transport_state)),
}
pub fn new(write: FramedWrite<OwnedWriteHalf, EncryptedJsonCodec<T>>) -> Self {
Self { write }
}
pub async fn shutdown(self) {

Loading…
Cancel
Save