Browse Source

Encrypt network communication using the noise protocol

More research is required into the NOISE_PARAMS and
whether they can be used to configure snow elegantly
handle authentication.
burn-down-handle
rishflab 3 years ago
parent
commit
b9db55fce3
  1. 265
      Cargo.lock
  2. 1
      daemon/Cargo.toml
  3. 23
      daemon/src/connection.rs
  4. 1
      daemon/src/lib.rs
  5. 20
      daemon/src/maker_inc_connections.rs
  6. 72
      daemon/src/noise.rs
  7. 6
      daemon/src/send_to_socket.rs
  8. 2
      daemon/src/taker.rs
  9. 31
      daemon/src/wire.rs
  10. 4
      daemon/tests/happy_path.rs

265
Cargo.lock

@ -2,6 +2,41 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "aead"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877"
dependencies = [
"generic-array",
]
[[package]]
name = "aes"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8"
dependencies = [
"cfg-if 1.0.0",
"cipher",
"cpufeatures",
"opaque-debug",
]
[[package]]
name = "aes-gcm"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df5f85a83a7d8b0442b6aa7b504b8212c1733da07b98aae43d4bc21b2cb3cdf6"
dependencies = [
"aead",
"aes",
"cipher",
"ctr",
"ghash",
"subtle",
]
[[package]] [[package]]
name = "ahash" name = "ahash"
version = "0.7.4" version = "0.7.4"
@ -264,6 +299,17 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "blake2"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a4e37d16930f5459780f5621038b6382b9bb37c19016f39fb6b5808d831f174"
dependencies = [
"crypto-mac 0.8.0",
"digest",
"opaque-debug",
]
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
version = "0.9.0" version = "0.9.0"
@ -348,6 +394,31 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chacha20"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f08493fa7707effc63254c66c6ea908675912493cd67952eda23c09fae2610b1"
dependencies = [
"cfg-if 1.0.0",
"cipher",
"cpufeatures",
"zeroize",
]
[[package]]
name = "chacha20poly1305"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6547abe025f4027edacd9edaa357aded014eecec42a5070d9b885c3c334aba2"
dependencies = [
"aead",
"chacha20",
"cipher",
"poly1305",
"zeroize",
]
[[package]] [[package]]
name = "chrono" name = "chrono"
version = "0.4.19" version = "0.4.19"
@ -362,6 +433,15 @@ dependencies = [
"winapi 0.3.9", "winapi 0.3.9",
] ]
[[package]]
name = "cipher"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "clap" name = "clap"
version = "3.0.0-beta.4" version = "3.0.0-beta.4"
@ -492,6 +572,16 @@ dependencies = [
"lazy_static", "lazy_static",
] ]
[[package]]
name = "crypto-mac"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b584a330336237c1eecd3e94266efb216c56ed91225d634cb2991c5f3fd1aeab"
dependencies = [
"generic-array",
"subtle",
]
[[package]] [[package]]
name = "crypto-mac" name = "crypto-mac"
version = "0.11.1" version = "0.11.1"
@ -512,6 +602,28 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "ctr"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "049bb91fb4aaf0e3c7efa6cd5ef877dbbbd15b39dad06d9948de4ec8a75761ea"
dependencies = [
"cipher",
]
[[package]]
name = "curve25519-dalek"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b9fdf9972b2bd6af2d913799d9ebc165ea4d2e65878e329d9c6b372c4491b61"
dependencies = [
"byteorder",
"digest",
"rand_core 0.5.1",
"subtle",
"zeroize",
]
[[package]] [[package]]
name = "daemon" name = "daemon"
version = "0.1.0" version = "0.1.0"
@ -546,6 +658,7 @@ dependencies = [
"serde_test", "serde_test",
"serde_with", "serde_with",
"sha2", "sha2",
"snow",
"sqlx", "sqlx",
"thiserror", "thiserror",
"time 0.3.3", "time 0.3.3",
@ -946,6 +1059,16 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "ghash"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1583cc1656d7839fd3732b80cf4f38850336cdb9b8ded1cd399ca62958de3c99"
dependencies = [
"opaque-debug",
"polyval",
]
[[package]] [[package]]
name = "git2" name = "git2"
version = "0.13.23" version = "0.13.23"
@ -1042,7 +1165,7 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b" checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b"
dependencies = [ dependencies = [
"crypto-mac", "crypto-mac 0.11.1",
"digest", "digest",
] ]
@ -1625,6 +1748,15 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pest"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10f4872ae94d7b90ae48754df22fd42ad52ce740b8f370b03da4835417403e53"
dependencies = [
"ucd-trie",
]
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "1.0.8" version = "1.0.8"
@ -1669,6 +1801,29 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb20dcc30536a1508e75d47dd0e399bb2fe7354dcf35cda9127f2bf1ed92e30e" checksum = "bb20dcc30536a1508e75d47dd0e399bb2fe7354dcf35cda9127f2bf1ed92e30e"
[[package]]
name = "poly1305"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "048aeb476be11a4b6ca432ca569e375810de9294ae78f4774e78ea98a9246ede"
dependencies = [
"cpufeatures",
"opaque-debug",
"universal-hash",
]
[[package]]
name = "polyval"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8419d2b623c7c0896ff2d5d96e2cb4ede590fed28fcc34934f4c33c036e620a1"
dependencies = [
"cfg-if 1.0.0",
"cpufeatures",
"opaque-debug",
"universal-hash",
]
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.10" version = "0.2.10"
@ -2261,13 +2416,22 @@ dependencies = [
"rust_decimal", "rust_decimal",
] ]
[[package]]
name = "rustc_version"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee"
dependencies = [
"semver 0.11.0",
]
[[package]] [[package]]
name = "rustc_version" name = "rustc_version"
version = "0.4.0" version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [ dependencies = [
"semver", "semver 1.0.4",
] ]
[[package]] [[package]]
@ -2434,12 +2598,30 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "semver"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6"
dependencies = [
"semver-parser",
]
[[package]] [[package]]
name = "semver" name = "semver"
version = "1.0.4" version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012" checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012"
[[package]]
name = "semver-parser"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7"
dependencies = [
"pest",
]
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.130" version = "1.0.130"
@ -2594,6 +2776,23 @@ version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
[[package]]
name = "snow"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6142f7c25e94f6fd25a32c3348ec230df9109b463f59c8c7acc4bd34936babb7"
dependencies = [
"aes-gcm",
"blake2",
"chacha20poly1305",
"rand 0.8.4",
"rand_core 0.6.3",
"rustc_version 0.3.3",
"sha2",
"subtle",
"x25519-dalek",
]
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.4.2" version = "0.4.2"
@ -2800,6 +2999,18 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "synstructure"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f"
dependencies = [
"proc-macro2",
"quote",
"syn",
"unicode-xid",
]
[[package]] [[package]]
name = "sysinfo" name = "sysinfo"
version = "0.19.2" version = "0.19.2"
@ -3158,6 +3369,12 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "ucd-trie"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c"
[[package]] [[package]]
name = "uncased" name = "uncased"
version = "0.9.6" version = "0.9.6"
@ -3213,6 +3430,16 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]]
name = "universal-hash"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05"
dependencies = [
"generic-array",
"subtle",
]
[[package]] [[package]]
name = "untrusted" name = "untrusted"
version = "0.7.1" version = "0.7.1"
@ -3271,7 +3498,7 @@ dependencies = [
"enum-iterator", "enum-iterator",
"getset", "getset",
"git2", "git2",
"rustc_version", "rustc_version 0.4.0",
"rustversion", "rustversion",
"sysinfo", "sysinfo",
"thiserror", "thiserror",
@ -3502,6 +3729,17 @@ dependencies = [
"winapi-build", "winapi-build",
] ]
[[package]]
name = "x25519-dalek"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2392b6b94a576b4e2bf3c5b2757d63f10ada8020a2e4d08ac849ebcf6ea8e077"
dependencies = [
"curve25519-dalek",
"rand_core 0.5.1",
"zeroize",
]
[[package]] [[package]]
name = "xtra" name = "xtra"
version = "0.6.0" version = "0.6.0"
@ -3536,3 +3774,24 @@ name = "yansi"
version = "0.5.0" version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fc79f4a1e39857fc00c3f662cbf2651c771f00e9c15fe2abc341806bd46bd71" checksum = "9fc79f4a1e39857fc00c3f662cbf2651c771f00e9c15fe2abc341806bd46bd71"
[[package]]
name = "zeroize"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4756f7db3f7b5574938c3eb1c117038b8e07f95ee6718c0efad4ac21508f1efd"
dependencies = [
"zeroize_derive",
]
[[package]]
name = "zeroize_derive"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdff2024a851a322b08f179173ae2ba620445aef1e838f0c196820eade4ae0c7"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]

1
daemon/Cargo.toml

@ -32,6 +32,7 @@ serde_json = "1"
serde_plain = "1" serde_plain = "1"
serde_with = { version = "1", features = ["macros"] } serde_with = { version = "1", features = ["macros"] }
sha2 = "0.9" sha2 = "0.9"
snow = "0.8"
sqlx = { version = "0.5", features = ["offline", "sqlite", "uuid", "runtime-tokio-rustls"] } sqlx = { version = "0.5", features = ["offline", "sqlite", "uuid", "runtime-tokio-rustls"] }
thiserror = "1" thiserror = "1"
time = { version = "0.3", features = ["serde"] } time = { version = "0.3", features = ["serde"] }

23
daemon/src/connection.rs

@ -1,6 +1,8 @@
use crate::{send_to_socket, taker_cfd, wire}; use crate::{noise, send_to_socket, taker_cfd, wire};
use anyhow::Result;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use tokio_util::codec::FramedRead; use tokio_util::codec::FramedRead;
use xtra::prelude::MessageChannel; use xtra::prelude::MessageChannel;
@ -15,11 +17,14 @@ pub struct Actor {
} }
impl Actor { impl Actor {
pub async fn new(maker: SocketAddr) -> Self { pub async fn new(maker: SocketAddr) -> Result<Self> {
let (read, write) = loop { let (read, write, noise) = loop {
let socket = tokio::net::TcpSocket::new_v4().expect("Be able ta create a socket"); let socket = tokio::net::TcpSocket::new_v4().expect("Be able ta create a socket");
if let Ok(connection) = socket.connect(maker).await { if let Ok(mut connection) = socket.connect(maker).await {
break connection.into_split(); let noise = noise::initiator_handshake(&mut connection).await?;
let (read, write) = connection.into_split();
break (read, write, Arc::new(Mutex::new(noise)));
} else { } else {
tracing::warn!( tracing::warn!(
"Could not connect to the maker, retrying in {}s ...", "Could not connect to the maker, retrying in {}s ...",
@ -29,16 +34,16 @@ impl Actor {
} }
}; };
let send_to_maker = send_to_socket::Actor::new(write) let send_to_maker = send_to_socket::Actor::new(write, noise.clone())
.create(None) .create(None)
.spawn_global(); .spawn_global();
let read = FramedRead::new(read, wire::JsonCodec::default()) let read = FramedRead::new(read, wire::JsonCodec::new(noise))
.map(move |item| taker_cfd::MakerStreamMessage { item }); .map(move |item| taker_cfd::MakerStreamMessage { item });
Self { Ok(Self {
send_to_maker: Box::new(send_to_maker), send_to_maker: Box::new(send_to_maker),
read_from_maker: Box::new(read), read_from_maker: Box::new(read),
} })
} }
} }

1
daemon/src/lib.rs

@ -30,6 +30,7 @@ pub mod maker_cfd;
pub mod maker_inc_connections; pub mod maker_inc_connections;
pub mod model; pub mod model;
pub mod monitor; pub mod monitor;
pub mod noise;
pub mod olivia; pub mod olivia;
pub mod oracle; pub mod oracle;
pub mod payout_curve; pub mod payout_curve;

20
daemon/src/maker_inc_connections.rs

@ -1,12 +1,13 @@
use crate::maker_cfd::{FromTaker, NewTakerOnline}; use crate::maker_cfd::{FromTaker, NewTakerOnline};
use crate::model::cfd::{Order, OrderId}; use crate::model::cfd::{Order, OrderId};
use crate::model::{BitMexPriceEventId, TakerId}; use crate::model::{BitMexPriceEventId, TakerId};
use crate::{forward_only_ok, maker_cfd, send_to_socket, wire}; use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire};
use anyhow::{Context as AnyhowContext, Result}; use anyhow::{Context as AnyhowContext, Result};
use futures::{StreamExt, TryStreamExt}; use futures::{StreamExt, TryStreamExt};
use std::collections::HashMap; use std::collections::HashMap;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_util::codec::FramedRead; use tokio_util::codec::FramedRead;
use xtra::prelude::*; use xtra::prelude::*;
@ -94,16 +95,18 @@ impl Actor {
async fn handle_new_connection_impl( async fn handle_new_connection_impl(
&mut self, &mut self,
stream: TcpStream, mut stream: TcpStream,
address: SocketAddr, address: SocketAddr,
_: &mut Context<Self>, _: &mut Context<Self>,
) { ) -> Result<()> {
let taker_id = TakerId::default(); let taker_id = TakerId::default();
tracing::info!("New taker {} connected on {}", taker_id, address); tracing::info!("New taker {} connected on {}", taker_id, address);
let noise = Arc::new(Mutex::new(noise::responder_handshake(&mut stream).await?));
let (read, write) = stream.into_split(); let (read, write) = stream.into_split();
let read = FramedRead::new(read, wire::JsonCodec::default()) let read = FramedRead::new(read, wire::JsonCodec::new(noise.clone()))
.map_ok(move |msg| FromTaker { taker_id, msg }) .map_ok(move |msg| FromTaker { taker_id, msg })
.map(forward_only_ok::Message); .map(forward_only_ok::Message);
@ -115,7 +118,7 @@ impl Actor {
// only allow outgoing messages while we are successfully reading incoming ones // only allow outgoing messages while we are successfully reading incoming ones
tokio::spawn(async move { tokio::spawn(async move {
let mut actor = send_to_socket::Actor::new(write); let mut actor = send_to_socket::Actor::new(write, noise.clone());
out_msg_actor_context out_msg_actor_context
.handle_while(&mut actor, forward_to_cfd.attach_stream(read)) .handle_while(&mut actor, forward_to_cfd.attach_stream(read))
@ -133,6 +136,8 @@ impl Actor {
.new_taker_channel .new_taker_channel
.send(maker_cfd::NewTakerOnline { id: taker_id }) .send(maker_cfd::NewTakerOnline { id: taker_id })
.await; .await;
Ok(())
} }
} }
@ -210,8 +215,9 @@ impl Actor {
async fn handle(&mut self, msg: ListenerMessage, ctx: &mut Context<Self>) -> KeepRunning { async fn handle(&mut self, msg: ListenerMessage, ctx: &mut Context<Self>) -> KeepRunning {
match msg { match msg {
ListenerMessage::NewConnection { stream, address } => { ListenerMessage::NewConnection { stream, address } => {
self.handle_new_connection_impl(stream, address, ctx).await; if let Err(err) = self.handle_new_connection_impl(stream, address, ctx).await {
tracing::warn!("Maker was unable to negotiate a new connection: {}", err);
}
KeepRunning::Yes KeepRunning::Yes
} }
ListenerMessage::Error { source } => { ListenerMessage::Error { source } => {

72
daemon/src/noise.rs

@ -0,0 +1,72 @@
use anyhow::Result;
use snow::{Builder, TransportState};
use std::io;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
// TODO: investigate what these params do and whether we can use get authentication for free using
// them
static NOISE_PARAMS: &str = "Noise_XX_25519_ChaChaPoly_BLAKE2s";
pub async fn initiator_handshake(connection: &mut TcpStream) -> Result<TransportState> {
let builder: Builder<'_> = Builder::new(NOISE_PARAMS.parse()?);
let static_key = builder.generate_keypair()?.private;
let mut noise = builder.local_private_key(&static_key).build_initiator()?;
let mut buf = vec![0u8; 65535];
let len = noise.write_message(&[], &mut buf)?;
send(connection, &buf[..len]).await?;
noise.read_message(&recv(connection).await?, &mut buf)?;
let len = noise.write_message(&[], &mut buf)?;
send(connection, &buf[..len]).await?;
let noise = noise.into_transport_mode()?;
tracing::debug!("Noise protocol initiator handshake is complete");
Ok(noise)
}
pub async fn responder_handshake(connection: &mut TcpStream) -> Result<TransportState> {
let builder: Builder<'_> = Builder::new(NOISE_PARAMS.parse()?);
let static_key = builder.generate_keypair()?.private;
let mut noise = builder.local_private_key(&static_key).build_responder()?;
let mut buf = vec![0u8; 65535];
noise.read_message(&recv(connection).await?, &mut buf)?;
let len = noise.write_message(&[0u8; 0], &mut buf)?;
send(connection, &buf[..len]).await?;
noise.read_message(&recv(connection).await?, &mut buf)?;
let noise = noise.into_transport_mode()?;
tracing::debug!("Noise protocol responder handshake is complete");
Ok(noise)
}
/// Hyper-basic stream transport receiver. 16-bit BE size followed by payload.
async fn recv(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
let mut msg_len_buf = [0u8; 2];
stream.read_exact(&mut msg_len_buf).await?;
let msg_len = ((msg_len_buf[0] as usize) << 8) + (msg_len_buf[1] as usize);
let mut msg = vec![0u8; msg_len];
stream.read_exact(&mut msg[..]).await?;
Ok(msg)
}
/// Hyper-basic stream transport sender. 16-bit BE size followed by payload.
async fn send(stream: &mut TcpStream, buf: &[u8]) -> Result<()> {
let msg_len_buf = [(buf.len() >> 8) as u8, (buf.len() & 0xff) as u8];
stream.write_all(&msg_len_buf).await?;
stream.write_all(buf).await?;
Ok(())
}

6
daemon/src/send_to_socket.rs

@ -1,7 +1,9 @@
use crate::wire::{self, JsonCodec}; use crate::wire::{self, JsonCodec};
use futures::SinkExt; use futures::SinkExt;
use serde::Serialize; use serde::Serialize;
use snow::TransportState;
use std::fmt; use std::fmt;
use std::sync::{Arc, Mutex};
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf; use tokio::net::tcp::OwnedWriteHalf;
use tokio_util::codec::FramedWrite; use tokio_util::codec::FramedWrite;
@ -12,9 +14,9 @@ pub struct Actor<T> {
} }
impl<T> Actor<T> { impl<T> Actor<T> {
pub fn new(write: OwnedWriteHalf) -> Self { pub fn new(write: OwnedWriteHalf, transport_state: Arc<Mutex<TransportState>>) -> Self {
Self { Self {
write: FramedWrite::new(write, JsonCodec::default()), write: FramedWrite::new(write, JsonCodec::new(transport_state)),
} }
} }

2
daemon/src/taker.rs

@ -168,7 +168,7 @@ async fn main() -> Result<()> {
let connection::Actor { let connection::Actor {
send_to_maker, send_to_maker,
read_from_maker, read_from_maker,
} = connection::Actor::new(opts.maker).await; } = connection::Actor::new(opts.maker).await?;
let TakerActorSystem { let TakerActorSystem {
cfd_actor_addr, cfd_actor_addr,

31
daemon/src/wire.rs

@ -9,10 +9,12 @@ use cfd_protocol::secp256k1_zkp::{EcdsaAdaptorSignature, SecretKey};
use cfd_protocol::{CfdTransactions, PartyParams, PunishParams}; use cfd_protocol::{CfdTransactions, PartyParams, PunishParams};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use snow::TransportState;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::ops::RangeInclusive; use std::ops::RangeInclusive;
use std::sync::{Arc, Mutex};
use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@ -96,20 +98,22 @@ impl fmt::Display for MakerToTaker {
pub struct JsonCodec<T> { pub struct JsonCodec<T> {
_type: PhantomData<T>, _type: PhantomData<T>,
inner: LengthDelimitedCodec, inner: LengthDelimitedCodec,
transport_state: Arc<Mutex<TransportState>>,
} }
impl<T> Default for JsonCodec<T> { impl<T> JsonCodec<T> {
fn default() -> Self { pub fn new(transport_state: Arc<Mutex<TransportState>>) -> Self {
Self { Self {
_type: PhantomData, _type: PhantomData,
inner: LengthDelimitedCodec::new(), inner: LengthDelimitedCodec::new(),
transport_state,
} }
} }
} }
impl<T> Decoder for JsonCodec<T> impl<T> Decoder for JsonCodec<T>
where where
T: DeserializeOwned, T: DeserializeOwned + Sync,
{ {
type Item = T; type Item = T;
type Error = anyhow::Error; type Error = anyhow::Error;
@ -120,7 +124,15 @@ where
Some(bytes) => bytes, Some(bytes) => bytes,
}; };
let item = serde_json::from_slice(&bytes)?; let mut buf = vec![0u8; 65535];
let mut transport = self
.transport_state
.lock()
.expect("acquired mutex lock on Noise object to encrypt message");
let len = transport.read_message(&bytes, &mut buf)?;
let item = serde_json::from_slice(&buf[..len])?;
Ok(Some(item)) Ok(Some(item))
} }
@ -135,7 +147,16 @@ where
fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = serde_json::to_vec(&item)?; let bytes = serde_json::to_vec(&item)?;
self.inner.encode(bytes.into(), dst)?; let mut buf = vec![0u8; 65535];
let mut transport = self
.transport_state
.lock()
.expect("acquired mutex lock on Noise object to encrypt message");
let len = transport.write_message(&bytes, &mut *buf)?;
let part: BytesMut = buf[..len].into();
self.inner.encode(part.into(), dst)?;
Ok(()) Ok(())
} }

4
daemon/tests/happy_path.rs

@ -327,7 +327,9 @@ impl Taker {
let connection::Actor { let connection::Actor {
send_to_maker, send_to_maker,
read_from_maker, read_from_maker,
} = connection::Actor::new(maker_address).await; } = connection::Actor::new(maker_address)
.await
.expect("Taker successfully initialised");
let db = in_memory_db().await; let db = in_memory_db().await;

Loading…
Cancel
Save