diff --git a/Cargo.lock b/Cargo.lock index d67725b..27525af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,41 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "aead" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877" +dependencies = [ + "generic-array", +] + +[[package]] +name = "aes" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8" +dependencies = [ + "cfg-if 1.0.0", + "cipher", + "cpufeatures", + "opaque-debug", +] + +[[package]] +name = "aes-gcm" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df5f85a83a7d8b0442b6aa7b504b8212c1733da07b98aae43d4bc21b2cb3cdf6" +dependencies = [ + "aead", + "aes", + "cipher", + "ctr", + "ghash", + "subtle", +] + [[package]] name = "ahash" version = "0.7.4" @@ -264,6 +299,17 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "blake2" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a4e37d16930f5459780f5621038b6382b9bb37c19016f39fb6b5808d831f174" +dependencies = [ + "crypto-mac 0.8.0", + "digest", + "opaque-debug", +] + [[package]] name = "block-buffer" version = "0.9.0" @@ -348,6 +394,31 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chacha20" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f08493fa7707effc63254c66c6ea908675912493cd67952eda23c09fae2610b1" +dependencies = [ + "cfg-if 1.0.0", + "cipher", + "cpufeatures", + "zeroize", +] + +[[package]] +name = "chacha20poly1305" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6547abe025f4027edacd9edaa357aded014eecec42a5070d9b885c3c334aba2" +dependencies = [ + "aead", + "chacha20", + "cipher", + "poly1305", + "zeroize", +] + [[package]] name = "chrono" version = "0.4.19" @@ -362,6 +433,15 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "cipher" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7" +dependencies = [ + "generic-array", +] + [[package]] name = "clap" version = "3.0.0-beta.4" @@ -492,6 +572,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "crypto-mac" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b584a330336237c1eecd3e94266efb216c56ed91225d634cb2991c5f3fd1aeab" +dependencies = [ + "generic-array", + "subtle", +] + [[package]] name = "crypto-mac" version = "0.11.1" @@ -512,6 +602,28 @@ dependencies = [ "syn", ] +[[package]] +name = "ctr" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "049bb91fb4aaf0e3c7efa6cd5ef877dbbbd15b39dad06d9948de4ec8a75761ea" +dependencies = [ + "cipher", +] + +[[package]] +name = "curve25519-dalek" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b9fdf9972b2bd6af2d913799d9ebc165ea4d2e65878e329d9c6b372c4491b61" +dependencies = [ + "byteorder", + "digest", + "rand_core 0.5.1", + "subtle", + "zeroize", +] + [[package]] name = "daemon" version = "0.1.0" @@ -548,6 +660,7 @@ dependencies = [ "serde_test", "serde_with", "sha2", + "snow", "sqlx", "thiserror", "time 0.3.4", @@ -975,6 +1088,16 @@ dependencies = [ "syn", ] +[[package]] +name = "ghash" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1583cc1656d7839fd3732b80cf4f38850336cdb9b8ded1cd399ca62958de3c99" +dependencies = [ + "opaque-debug", + "polyval", +] + [[package]] name = "git2" version = "0.13.23" @@ -1071,7 +1194,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b" dependencies = [ - "crypto-mac", + "crypto-mac 0.11.1", "digest", ] @@ -1687,6 +1810,15 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pest" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10f4872ae94d7b90ae48754df22fd42ad52ce740b8f370b03da4835417403e53" +dependencies = [ + "ucd-trie", +] + [[package]] name = "pin-project" version = "1.0.8" @@ -1731,6 +1863,29 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb20dcc30536a1508e75d47dd0e399bb2fe7354dcf35cda9127f2bf1ed92e30e" +[[package]] +name = "poly1305" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "048aeb476be11a4b6ca432ca569e375810de9294ae78f4774e78ea98a9246ede" +dependencies = [ + "cpufeatures", + "opaque-debug", + "universal-hash", +] + +[[package]] +name = "polyval" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8419d2b623c7c0896ff2d5d96e2cb4ede590fed28fcc34934f4c33c036e620a1" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "opaque-debug", + "universal-hash", +] + [[package]] name = "ppv-lite86" version = "0.2.10" @@ -2352,13 +2507,22 @@ dependencies = [ "rust_decimal", ] +[[package]] +name = "rustc_version" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee" +dependencies = [ + "semver 0.11.0", +] + [[package]] name = "rustc_version" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" dependencies = [ - "semver", + "semver 1.0.4", ] [[package]] @@ -2525,12 +2689,30 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6" +dependencies = [ + "semver-parser", +] + [[package]] name = "semver" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012" +[[package]] +name = "semver-parser" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7" +dependencies = [ + "pest", +] + [[package]] name = "serde" version = "1.0.130" @@ -2685,6 +2867,23 @@ version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" +[[package]] +name = "snow" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6142f7c25e94f6fd25a32c3348ec230df9109b463f59c8c7acc4bd34936babb7" +dependencies = [ + "aes-gcm", + "blake2", + "chacha20poly1305", + "rand 0.8.4", + "rand_core 0.6.3", + "rustc_version 0.3.3", + "sha2", + "subtle", + "x25519-dalek", +] + [[package]] name = "socket2" version = "0.4.2" @@ -2891,6 +3090,18 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "synstructure" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "unicode-xid", +] + [[package]] name = "sysinfo" version = "0.19.2" @@ -3255,6 +3466,12 @@ dependencies = [ "serde", ] +[[package]] +name = "ucd-trie" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c" + [[package]] name = "uncased" version = "0.9.6" @@ -3310,6 +3527,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = "universal-hash" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05" +dependencies = [ + "generic-array", + "subtle", +] + [[package]] name = "untrusted" version = "0.7.1" @@ -3368,7 +3595,7 @@ dependencies = [ "enum-iterator", "getset", "git2", - "rustc_version", + "rustc_version 0.4.0", "rustversion", "sysinfo", "thiserror", @@ -3599,6 +3826,17 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "x25519-dalek" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2392b6b94a576b4e2bf3c5b2757d63f10ada8020a2e4d08ac849ebcf6ea8e077" +dependencies = [ + "curve25519-dalek", + "rand_core 0.5.1", + "zeroize", +] + [[package]] name = "xtra" version = "0.6.0" @@ -3633,3 +3871,24 @@ name = "yansi" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fc79f4a1e39857fc00c3f662cbf2651c771f00e9c15fe2abc341806bd46bd71" + +[[package]] +name = "zeroize" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4756f7db3f7b5574938c3eb1c117038b8e07f95ee6718c0efad4ac21508f1efd" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdff2024a851a322b08f179173ae2ba620445aef1e838f0c196820eade4ae0c7" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index e72fd65..38bd8bb 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -32,6 +32,7 @@ serde_json = "1" serde_plain = "1" serde_with = { version = "1", features = ["macros"] } sha2 = "0.9" +snow = "0.8" sqlx = { version = "0.5", features = ["offline", "sqlite", "uuid", "runtime-tokio-rustls"] } thiserror = "1" time = { version = "0.3", features = ["serde"] } diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index 05dbe02..e75dd5e 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -1,6 +1,8 @@ -use crate::{send_to_socket, taker_cfd, wire}; +use crate::{noise, send_to_socket, taker_cfd, wire}; +use anyhow::Result; use futures::{Stream, StreamExt}; use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio_util::codec::FramedRead; use xtra::prelude::MessageChannel; @@ -15,11 +17,14 @@ pub struct Actor { } impl Actor { - pub async fn new(maker: SocketAddr) -> Self { - let (read, write) = loop { + pub async fn new(maker: SocketAddr) -> Result { + let (read, write, noise) = loop { let socket = tokio::net::TcpSocket::new_v4().expect("Be able ta create a socket"); - if let Ok(connection) = socket.connect(maker).await { - break connection.into_split(); + if let Ok(mut connection) = socket.connect(maker).await { + let noise = noise::initiator_handshake(&mut connection).await?; + + let (read, write) = connection.into_split(); + break (read, write, Arc::new(Mutex::new(noise))); } else { tracing::warn!( "Could not connect to the maker, retrying in {}s ...", @@ -29,16 +34,16 @@ impl Actor { } }; - let send_to_maker = send_to_socket::Actor::new(write) + let send_to_maker = send_to_socket::Actor::new(write, noise.clone()) .create(None) .spawn_global(); - let read = FramedRead::new(read, wire::JsonCodec::default()) + let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise)) .map(move |item| taker_cfd::MakerStreamMessage { item }); - Self { + Ok(Self { send_to_maker: Box::new(send_to_maker), read_from_maker: Box::new(read), - } + }) } } diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 403f0e6..154dc3e 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -30,6 +30,7 @@ pub mod maker_cfd; pub mod maker_inc_connections; pub mod model; pub mod monitor; +mod noise; pub mod olivia; pub mod oracle; pub mod payout_curve; diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 8da7f96..2ca7743 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -1,12 +1,13 @@ use crate::maker_cfd::{FromTaker, NewTakerOnline}; use crate::model::cfd::{Order, OrderId}; use crate::model::{BitMexPriceEventId, TakerId}; -use crate::{forward_only_ok, maker_cfd, send_to_socket, wire}; +use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire}; use anyhow::{Context as AnyhowContext, Result}; use futures::{StreamExt, TryStreamExt}; use std::collections::HashMap; use std::io; use std::net::SocketAddr; +use std::sync::{Arc, Mutex}; use tokio::net::TcpStream; use tokio_util::codec::FramedRead; use xtra::prelude::*; @@ -94,16 +95,18 @@ impl Actor { async fn handle_new_connection_impl( &mut self, - stream: TcpStream, + mut stream: TcpStream, address: SocketAddr, _: &mut Context, - ) { + ) -> Result<()> { let taker_id = TakerId::default(); tracing::info!("New taker {} connected on {}", taker_id, address); + let noise = Arc::new(Mutex::new(noise::responder_handshake(&mut stream).await?)); + let (read, write) = stream.into_split(); - let read = FramedRead::new(read, wire::JsonCodec::default()) + let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise.clone())) .map_ok(move |msg| FromTaker { taker_id, msg }) .map(forward_only_ok::Message); @@ -115,7 +118,7 @@ impl Actor { // only allow outgoing messages while we are successfully reading incoming ones tokio::spawn(async move { - let mut actor = send_to_socket::Actor::new(write); + let mut actor = send_to_socket::Actor::new(write, noise.clone()); out_msg_actor_context .handle_while(&mut actor, forward_to_cfd.attach_stream(read)) @@ -133,6 +136,8 @@ impl Actor { .new_taker_channel .send(maker_cfd::NewTakerOnline { id: taker_id }) .await; + + Ok(()) } } @@ -210,8 +215,9 @@ impl Actor { async fn handle(&mut self, msg: ListenerMessage, ctx: &mut Context) -> KeepRunning { match msg { ListenerMessage::NewConnection { stream, address } => { - self.handle_new_connection_impl(stream, address, ctx).await; - + if let Err(err) = self.handle_new_connection_impl(stream, address, ctx).await { + tracing::warn!("Maker was unable to negotiate a new connection: {}", err); + } KeepRunning::Yes } ListenerMessage::Error { source } => { diff --git a/daemon/src/noise.rs b/daemon/src/noise.rs new file mode 100644 index 0000000..62a8950 --- /dev/null +++ b/daemon/src/noise.rs @@ -0,0 +1,74 @@ +use anyhow::Result; +use snow::{Builder, TransportState}; +use std::io; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; + +pub static NOISE_MAX_MSG_LEN: u32 = 65535; +pub static NOISE_TAG_LEN: u32 = 16; +// TODO: investigate what these params do and whether we can use get authentication for free using +// them +static NOISE_PARAMS: &str = "Noise_XX_25519_ChaChaPoly_BLAKE2s"; + +pub async fn initiator_handshake(connection: &mut TcpStream) -> Result { + let builder: Builder<'_> = Builder::new(NOISE_PARAMS.parse()?); + let static_key = builder.generate_keypair()?.private; + + let mut noise = builder.local_private_key(&static_key).build_initiator()?; + + let mut buf = vec![0u8; NOISE_MAX_MSG_LEN as usize]; + + let len = noise.write_message(&[], &mut buf)?; + send(connection, &buf[..len]).await?; + + noise.read_message(&recv(connection).await?, &mut buf)?; + + let len = noise.write_message(&[], &mut buf)?; + send(connection, &buf[..len]).await?; + + let noise = noise.into_transport_mode()?; + + tracing::debug!("Noise protocol initiator handshake is complete"); + + Ok(noise) +} + +pub async fn responder_handshake(connection: &mut TcpStream) -> Result { + let builder: Builder<'_> = Builder::new(NOISE_PARAMS.parse()?); + let static_key = builder.generate_keypair()?.private; + + let mut noise = builder.local_private_key(&static_key).build_responder()?; + + let mut buf = vec![0u8; NOISE_MAX_MSG_LEN as usize]; + + noise.read_message(&recv(connection).await?, &mut buf)?; + + let len = noise.write_message(&[0u8; 0], &mut buf)?; + send(connection, &buf[..len]).await?; + + noise.read_message(&recv(connection).await?, &mut buf)?; + + let noise = noise.into_transport_mode()?; + + tracing::debug!("Noise protocol responder handshake is complete"); + + Ok(noise) +} + +/// Hyper-basic stream transport receiver. 16-bit BE size followed by payload. +async fn recv(stream: &mut TcpStream) -> io::Result> { + let mut msg_len_buf = [0u8; 2]; + stream.read_exact(&mut msg_len_buf).await?; + let msg_len = ((msg_len_buf[0] as usize) << 8) + (msg_len_buf[1] as usize); + let mut msg = vec![0u8; msg_len]; + stream.read_exact(&mut msg[..]).await?; + Ok(msg) +} + +/// Hyper-basic stream transport sender. 16-bit BE size followed by payload. +async fn send(stream: &mut TcpStream, buf: &[u8]) -> Result<()> { + let msg_len_buf = [(buf.len() >> 8) as u8, (buf.len() & 0xff) as u8]; + stream.write_all(&msg_len_buf).await?; + stream.write_all(buf).await?; + Ok(()) +} diff --git a/daemon/src/send_to_socket.rs b/daemon/src/send_to_socket.rs index 21e6c29..84f5cc5 100644 --- a/daemon/src/send_to_socket.rs +++ b/daemon/src/send_to_socket.rs @@ -1,20 +1,22 @@ -use crate::wire::{self, JsonCodec}; +use crate::wire::{self, EncryptedJsonCodec}; use futures::SinkExt; use serde::Serialize; +use snow::TransportState; use std::fmt; +use std::sync::{Arc, Mutex}; use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedWriteHalf; use tokio_util::codec::FramedWrite; use xtra::{Handler, Message}; pub struct Actor { - write: FramedWrite>, + write: FramedWrite>, } impl Actor { - pub fn new(write: OwnedWriteHalf) -> Self { + pub fn new(write: OwnedWriteHalf, transport_state: Arc>) -> Self { Self { - write: FramedWrite::new(write, JsonCodec::default()), + write: FramedWrite::new(write, EncryptedJsonCodec::new(transport_state)), } } diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 3258c4d..8b63f06 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -168,7 +168,7 @@ async fn main() -> Result<()> { let connection::Actor { send_to_maker, read_from_maker, - } = connection::Actor::new(opts.maker).await; + } = connection::Actor::new(opts.maker).await?; let TakerActorSystem { cfd_actor_addr, diff --git a/daemon/src/wire.rs b/daemon/src/wire.rs index a833cee..fecb998 100644 --- a/daemon/src/wire.rs +++ b/daemon/src/wire.rs @@ -1,5 +1,6 @@ use crate::model::cfd::{Order, OrderId}; use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd}; +use crate::noise::{NOISE_MAX_MSG_LEN, NOISE_TAG_LEN}; use anyhow::{bail, Result}; use bdk::bitcoin::secp256k1::Signature; use bdk::bitcoin::util::psbt::PartiallySignedTransaction; @@ -9,10 +10,12 @@ use cfd_protocol::secp256k1_zkp::{EcdsaAdaptorSignature, SecretKey}; use cfd_protocol::{CfdTransactions, PartyParams, PunishParams}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use snow::TransportState; use std::collections::HashMap; use std::fmt; use std::marker::PhantomData; use std::ops::RangeInclusive; +use std::sync::{Arc, Mutex}; use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; #[derive(Debug, Serialize, Deserialize)] @@ -93,21 +96,23 @@ impl fmt::Display for MakerToTaker { } } -pub struct JsonCodec { +pub struct EncryptedJsonCodec { _type: PhantomData, inner: LengthDelimitedCodec, + transport_state: Arc>, } -impl Default for JsonCodec { - fn default() -> Self { +impl EncryptedJsonCodec { + pub fn new(transport_state: Arc>) -> Self { Self { _type: PhantomData, inner: LengthDelimitedCodec::new(), + transport_state, } } } -impl Decoder for JsonCodec +impl Decoder for EncryptedJsonCodec where T: DeserializeOwned, { @@ -120,13 +125,30 @@ where Some(bytes) => bytes, }; - let item = serde_json::from_slice(&bytes)?; + let mut transport = self + .transport_state + .lock() + .expect("acquired mutex lock on Noise object to encrypt message"); + + let decrypted = bytes + .chunks(NOISE_MAX_MSG_LEN as usize) + .map(|chunk| { + let mut buf = vec![0u8; chunk.len() - NOISE_TAG_LEN as usize]; + transport.read_message(chunk, &mut *buf)?; + Ok(buf) + }) + .collect::>>>()? + .into_iter() + .flatten() + .collect::>(); + + let item = serde_json::from_slice(&decrypted)?; Ok(Some(item)) } } -impl Encoder for JsonCodec +impl Encoder for EncryptedJsonCodec where T: Serialize, { @@ -135,7 +157,24 @@ where fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> { let bytes = serde_json::to_vec(&item)?; - self.inner.encode(bytes.into(), dst)?; + let mut transport = self + .transport_state + .lock() + .expect("acquired mutex lock on Noise object to encrypt message"); + + let encrypted = bytes + .chunks((NOISE_MAX_MSG_LEN - NOISE_TAG_LEN) as usize) + .map(|chunk| { + let mut buf = vec![0u8; chunk.len() + NOISE_TAG_LEN as usize]; + transport.write_message(chunk, &mut *buf)?; + Ok(buf) + }) + .collect::>>>()? + .into_iter() + .flatten() + .collect::>(); + + self.inner.encode(encrypted.into(), dst)?; Ok(()) } diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 4631d71..f98c436 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -130,7 +130,9 @@ impl Taker { let connection::Actor { send_to_maker, read_from_maker, - } = connection::Actor::new(maker_address).await; + } = connection::Actor::new(maker_address) + .await + .expect("Connected to maker"); let db = in_memory_db().await;