Browse Source

Merge #452

452: Encrypt network communication using the noise protocol r=rishflab a=rishflab

The messages which contain CET's do not fit into the
65535 byte message size limit of the noise protocol.
These messages are chunked to allow compatibility
with noise.

More research is required into the NOISE_PARAMS and
whether they can be used to configure snow elegantly
handle authentication.

Co-authored-by: rishflab <rishflab@hotmail.com>
burn-down-handle
bors[bot] 3 years ago
committed by GitHub
parent
commit
da0f605d6c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 265
      Cargo.lock
  2. 1
      daemon/Cargo.toml
  3. 23
      daemon/src/connection.rs
  4. 1
      daemon/src/lib.rs
  5. 20
      daemon/src/maker_inc_connections.rs
  6. 74
      daemon/src/noise.rs
  7. 10
      daemon/src/send_to_socket.rs
  8. 2
      daemon/src/taker.rs
  9. 53
      daemon/src/wire.rs
  10. 4
      daemon/tests/harness/mod.rs

265
Cargo.lock

@ -2,6 +2,41 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "aead"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b613b8e1e3cf911a086f53f03bf286f52fd7a7258e4fa606f0ef220d39d8877"
dependencies = [
"generic-array",
]
[[package]]
name = "aes"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8"
dependencies = [
"cfg-if 1.0.0",
"cipher",
"cpufeatures",
"opaque-debug",
]
[[package]]
name = "aes-gcm"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df5f85a83a7d8b0442b6aa7b504b8212c1733da07b98aae43d4bc21b2cb3cdf6"
dependencies = [
"aead",
"aes",
"cipher",
"ctr",
"ghash",
"subtle",
]
[[package]] [[package]]
name = "ahash" name = "ahash"
version = "0.7.4" version = "0.7.4"
@ -264,6 +299,17 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "blake2"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a4e37d16930f5459780f5621038b6382b9bb37c19016f39fb6b5808d831f174"
dependencies = [
"crypto-mac 0.8.0",
"digest",
"opaque-debug",
]
[[package]] [[package]]
name = "block-buffer" name = "block-buffer"
version = "0.9.0" version = "0.9.0"
@ -348,6 +394,31 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chacha20"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f08493fa7707effc63254c66c6ea908675912493cd67952eda23c09fae2610b1"
dependencies = [
"cfg-if 1.0.0",
"cipher",
"cpufeatures",
"zeroize",
]
[[package]]
name = "chacha20poly1305"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6547abe025f4027edacd9edaa357aded014eecec42a5070d9b885c3c334aba2"
dependencies = [
"aead",
"chacha20",
"cipher",
"poly1305",
"zeroize",
]
[[package]] [[package]]
name = "chrono" name = "chrono"
version = "0.4.19" version = "0.4.19"
@ -362,6 +433,15 @@ dependencies = [
"winapi 0.3.9", "winapi 0.3.9",
] ]
[[package]]
name = "cipher"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ee52072ec15386f770805afd189a01c8841be8696bed250fa2f13c4c0d6dfb7"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "clap" name = "clap"
version = "3.0.0-beta.4" version = "3.0.0-beta.4"
@ -492,6 +572,16 @@ dependencies = [
"lazy_static", "lazy_static",
] ]
[[package]]
name = "crypto-mac"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b584a330336237c1eecd3e94266efb216c56ed91225d634cb2991c5f3fd1aeab"
dependencies = [
"generic-array",
"subtle",
]
[[package]] [[package]]
name = "crypto-mac" name = "crypto-mac"
version = "0.11.1" version = "0.11.1"
@ -512,6 +602,28 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "ctr"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "049bb91fb4aaf0e3c7efa6cd5ef877dbbbd15b39dad06d9948de4ec8a75761ea"
dependencies = [
"cipher",
]
[[package]]
name = "curve25519-dalek"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b9fdf9972b2bd6af2d913799d9ebc165ea4d2e65878e329d9c6b372c4491b61"
dependencies = [
"byteorder",
"digest",
"rand_core 0.5.1",
"subtle",
"zeroize",
]
[[package]] [[package]]
name = "daemon" name = "daemon"
version = "0.1.0" version = "0.1.0"
@ -548,6 +660,7 @@ dependencies = [
"serde_test", "serde_test",
"serde_with", "serde_with",
"sha2", "sha2",
"snow",
"sqlx", "sqlx",
"thiserror", "thiserror",
"time 0.3.4", "time 0.3.4",
@ -975,6 +1088,16 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "ghash"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1583cc1656d7839fd3732b80cf4f38850336cdb9b8ded1cd399ca62958de3c99"
dependencies = [
"opaque-debug",
"polyval",
]
[[package]] [[package]]
name = "git2" name = "git2"
version = "0.13.23" version = "0.13.23"
@ -1071,7 +1194,7 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b" checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b"
dependencies = [ dependencies = [
"crypto-mac", "crypto-mac 0.11.1",
"digest", "digest",
] ]
@ -1687,6 +1810,15 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pest"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10f4872ae94d7b90ae48754df22fd42ad52ce740b8f370b03da4835417403e53"
dependencies = [
"ucd-trie",
]
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "1.0.8" version = "1.0.8"
@ -1731,6 +1863,29 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb20dcc30536a1508e75d47dd0e399bb2fe7354dcf35cda9127f2bf1ed92e30e" checksum = "bb20dcc30536a1508e75d47dd0e399bb2fe7354dcf35cda9127f2bf1ed92e30e"
[[package]]
name = "poly1305"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "048aeb476be11a4b6ca432ca569e375810de9294ae78f4774e78ea98a9246ede"
dependencies = [
"cpufeatures",
"opaque-debug",
"universal-hash",
]
[[package]]
name = "polyval"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8419d2b623c7c0896ff2d5d96e2cb4ede590fed28fcc34934f4c33c036e620a1"
dependencies = [
"cfg-if 1.0.0",
"cpufeatures",
"opaque-debug",
"universal-hash",
]
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.10" version = "0.2.10"
@ -2352,13 +2507,22 @@ dependencies = [
"rust_decimal", "rust_decimal",
] ]
[[package]]
name = "rustc_version"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee"
dependencies = [
"semver 0.11.0",
]
[[package]] [[package]]
name = "rustc_version" name = "rustc_version"
version = "0.4.0" version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [ dependencies = [
"semver", "semver 1.0.4",
] ]
[[package]] [[package]]
@ -2525,12 +2689,30 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "semver"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6"
dependencies = [
"semver-parser",
]
[[package]] [[package]]
name = "semver" name = "semver"
version = "1.0.4" version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012" checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012"
[[package]]
name = "semver-parser"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7"
dependencies = [
"pest",
]
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.130" version = "1.0.130"
@ -2685,6 +2867,23 @@ version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
[[package]]
name = "snow"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6142f7c25e94f6fd25a32c3348ec230df9109b463f59c8c7acc4bd34936babb7"
dependencies = [
"aes-gcm",
"blake2",
"chacha20poly1305",
"rand 0.8.4",
"rand_core 0.6.3",
"rustc_version 0.3.3",
"sha2",
"subtle",
"x25519-dalek",
]
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.4.2" version = "0.4.2"
@ -2891,6 +3090,18 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "synstructure"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f"
dependencies = [
"proc-macro2",
"quote",
"syn",
"unicode-xid",
]
[[package]] [[package]]
name = "sysinfo" name = "sysinfo"
version = "0.19.2" version = "0.19.2"
@ -3255,6 +3466,12 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "ucd-trie"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c"
[[package]] [[package]]
name = "uncased" name = "uncased"
version = "0.9.6" version = "0.9.6"
@ -3310,6 +3527,16 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]]
name = "universal-hash"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f214e8f697e925001e66ec2c6e37a4ef93f0f78c2eed7814394e10c62025b05"
dependencies = [
"generic-array",
"subtle",
]
[[package]] [[package]]
name = "untrusted" name = "untrusted"
version = "0.7.1" version = "0.7.1"
@ -3368,7 +3595,7 @@ dependencies = [
"enum-iterator", "enum-iterator",
"getset", "getset",
"git2", "git2",
"rustc_version", "rustc_version 0.4.0",
"rustversion", "rustversion",
"sysinfo", "sysinfo",
"thiserror", "thiserror",
@ -3599,6 +3826,17 @@ dependencies = [
"winapi-build", "winapi-build",
] ]
[[package]]
name = "x25519-dalek"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2392b6b94a576b4e2bf3c5b2757d63f10ada8020a2e4d08ac849ebcf6ea8e077"
dependencies = [
"curve25519-dalek",
"rand_core 0.5.1",
"zeroize",
]
[[package]] [[package]]
name = "xtra" name = "xtra"
version = "0.6.0" version = "0.6.0"
@ -3633,3 +3871,24 @@ name = "yansi"
version = "0.5.0" version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fc79f4a1e39857fc00c3f662cbf2651c771f00e9c15fe2abc341806bd46bd71" checksum = "9fc79f4a1e39857fc00c3f662cbf2651c771f00e9c15fe2abc341806bd46bd71"
[[package]]
name = "zeroize"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4756f7db3f7b5574938c3eb1c117038b8e07f95ee6718c0efad4ac21508f1efd"
dependencies = [
"zeroize_derive",
]
[[package]]
name = "zeroize_derive"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdff2024a851a322b08f179173ae2ba620445aef1e838f0c196820eade4ae0c7"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]

1
daemon/Cargo.toml

@ -32,6 +32,7 @@ serde_json = "1"
serde_plain = "1" serde_plain = "1"
serde_with = { version = "1", features = ["macros"] } serde_with = { version = "1", features = ["macros"] }
sha2 = "0.9" sha2 = "0.9"
snow = "0.8"
sqlx = { version = "0.5", features = ["offline", "sqlite", "uuid", "runtime-tokio-rustls"] } sqlx = { version = "0.5", features = ["offline", "sqlite", "uuid", "runtime-tokio-rustls"] }
thiserror = "1" thiserror = "1"
time = { version = "0.3", features = ["serde"] } time = { version = "0.3", features = ["serde"] }

23
daemon/src/connection.rs

@ -1,6 +1,8 @@
use crate::{send_to_socket, taker_cfd, wire}; use crate::{noise, send_to_socket, taker_cfd, wire};
use anyhow::Result;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use tokio_util::codec::FramedRead; use tokio_util::codec::FramedRead;
use xtra::prelude::MessageChannel; use xtra::prelude::MessageChannel;
@ -15,11 +17,14 @@ pub struct Actor {
} }
impl Actor { impl Actor {
pub async fn new(maker: SocketAddr) -> Self { pub async fn new(maker: SocketAddr) -> Result<Self> {
let (read, write) = loop { let (read, write, noise) = loop {
let socket = tokio::net::TcpSocket::new_v4().expect("Be able ta create a socket"); let socket = tokio::net::TcpSocket::new_v4().expect("Be able ta create a socket");
if let Ok(connection) = socket.connect(maker).await { if let Ok(mut connection) = socket.connect(maker).await {
break connection.into_split(); let noise = noise::initiator_handshake(&mut connection).await?;
let (read, write) = connection.into_split();
break (read, write, Arc::new(Mutex::new(noise)));
} else { } else {
tracing::warn!( tracing::warn!(
"Could not connect to the maker, retrying in {}s ...", "Could not connect to the maker, retrying in {}s ...",
@ -29,16 +34,16 @@ impl Actor {
} }
}; };
let send_to_maker = send_to_socket::Actor::new(write) let send_to_maker = send_to_socket::Actor::new(write, noise.clone())
.create(None) .create(None)
.spawn_global(); .spawn_global();
let read = FramedRead::new(read, wire::JsonCodec::default()) let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise))
.map(move |item| taker_cfd::MakerStreamMessage { item }); .map(move |item| taker_cfd::MakerStreamMessage { item });
Self { Ok(Self {
send_to_maker: Box::new(send_to_maker), send_to_maker: Box::new(send_to_maker),
read_from_maker: Box::new(read), read_from_maker: Box::new(read),
} })
} }
} }

1
daemon/src/lib.rs

@ -30,6 +30,7 @@ pub mod maker_cfd;
pub mod maker_inc_connections; pub mod maker_inc_connections;
pub mod model; pub mod model;
pub mod monitor; pub mod monitor;
mod noise;
pub mod olivia; pub mod olivia;
pub mod oracle; pub mod oracle;
pub mod payout_curve; pub mod payout_curve;

20
daemon/src/maker_inc_connections.rs

@ -1,12 +1,13 @@
use crate::maker_cfd::{FromTaker, NewTakerOnline}; use crate::maker_cfd::{FromTaker, NewTakerOnline};
use crate::model::cfd::{Order, OrderId}; use crate::model::cfd::{Order, OrderId};
use crate::model::{BitMexPriceEventId, TakerId}; use crate::model::{BitMexPriceEventId, TakerId};
use crate::{forward_only_ok, maker_cfd, send_to_socket, wire}; use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire};
use anyhow::{Context as AnyhowContext, Result}; use anyhow::{Context as AnyhowContext, Result};
use futures::{StreamExt, TryStreamExt}; use futures::{StreamExt, TryStreamExt};
use std::collections::HashMap; use std::collections::HashMap;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_util::codec::FramedRead; use tokio_util::codec::FramedRead;
use xtra::prelude::*; use xtra::prelude::*;
@ -94,16 +95,18 @@ impl Actor {
async fn handle_new_connection_impl( async fn handle_new_connection_impl(
&mut self, &mut self,
stream: TcpStream, mut stream: TcpStream,
address: SocketAddr, address: SocketAddr,
_: &mut Context<Self>, _: &mut Context<Self>,
) { ) -> Result<()> {
let taker_id = TakerId::default(); let taker_id = TakerId::default();
tracing::info!("New taker {} connected on {}", taker_id, address); tracing::info!("New taker {} connected on {}", taker_id, address);
let noise = Arc::new(Mutex::new(noise::responder_handshake(&mut stream).await?));
let (read, write) = stream.into_split(); let (read, write) = stream.into_split();
let read = FramedRead::new(read, wire::JsonCodec::default()) let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise.clone()))
.map_ok(move |msg| FromTaker { taker_id, msg }) .map_ok(move |msg| FromTaker { taker_id, msg })
.map(forward_only_ok::Message); .map(forward_only_ok::Message);
@ -115,7 +118,7 @@ impl Actor {
// only allow outgoing messages while we are successfully reading incoming ones // only allow outgoing messages while we are successfully reading incoming ones
tokio::spawn(async move { tokio::spawn(async move {
let mut actor = send_to_socket::Actor::new(write); let mut actor = send_to_socket::Actor::new(write, noise.clone());
out_msg_actor_context out_msg_actor_context
.handle_while(&mut actor, forward_to_cfd.attach_stream(read)) .handle_while(&mut actor, forward_to_cfd.attach_stream(read))
@ -133,6 +136,8 @@ impl Actor {
.new_taker_channel .new_taker_channel
.send(maker_cfd::NewTakerOnline { id: taker_id }) .send(maker_cfd::NewTakerOnline { id: taker_id })
.await; .await;
Ok(())
} }
} }
@ -210,8 +215,9 @@ impl Actor {
async fn handle(&mut self, msg: ListenerMessage, ctx: &mut Context<Self>) -> KeepRunning { async fn handle(&mut self, msg: ListenerMessage, ctx: &mut Context<Self>) -> KeepRunning {
match msg { match msg {
ListenerMessage::NewConnection { stream, address } => { ListenerMessage::NewConnection { stream, address } => {
self.handle_new_connection_impl(stream, address, ctx).await; if let Err(err) = self.handle_new_connection_impl(stream, address, ctx).await {
tracing::warn!("Maker was unable to negotiate a new connection: {}", err);
}
KeepRunning::Yes KeepRunning::Yes
} }
ListenerMessage::Error { source } => { ListenerMessage::Error { source } => {

74
daemon/src/noise.rs

@ -0,0 +1,74 @@
use anyhow::Result;
use snow::{Builder, TransportState};
use std::io;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
pub static NOISE_MAX_MSG_LEN: u32 = 65535;
pub static NOISE_TAG_LEN: u32 = 16;
// TODO: investigate what these params do and whether we can use get authentication for free using
// them
static NOISE_PARAMS: &str = "Noise_XX_25519_ChaChaPoly_BLAKE2s";
pub async fn initiator_handshake(connection: &mut TcpStream) -> Result<TransportState> {
let builder: Builder<'_> = Builder::new(NOISE_PARAMS.parse()?);
let static_key = builder.generate_keypair()?.private;
let mut noise = builder.local_private_key(&static_key).build_initiator()?;
let mut buf = vec![0u8; NOISE_MAX_MSG_LEN as usize];
let len = noise.write_message(&[], &mut buf)?;
send(connection, &buf[..len]).await?;
noise.read_message(&recv(connection).await?, &mut buf)?;
let len = noise.write_message(&[], &mut buf)?;
send(connection, &buf[..len]).await?;
let noise = noise.into_transport_mode()?;
tracing::debug!("Noise protocol initiator handshake is complete");
Ok(noise)
}
pub async fn responder_handshake(connection: &mut TcpStream) -> Result<TransportState> {
let builder: Builder<'_> = Builder::new(NOISE_PARAMS.parse()?);
let static_key = builder.generate_keypair()?.private;
let mut noise = builder.local_private_key(&static_key).build_responder()?;
let mut buf = vec![0u8; NOISE_MAX_MSG_LEN as usize];
noise.read_message(&recv(connection).await?, &mut buf)?;
let len = noise.write_message(&[0u8; 0], &mut buf)?;
send(connection, &buf[..len]).await?;
noise.read_message(&recv(connection).await?, &mut buf)?;
let noise = noise.into_transport_mode()?;
tracing::debug!("Noise protocol responder handshake is complete");
Ok(noise)
}
/// Hyper-basic stream transport receiver. 16-bit BE size followed by payload.
async fn recv(stream: &mut TcpStream) -> io::Result<Vec<u8>> {
let mut msg_len_buf = [0u8; 2];
stream.read_exact(&mut msg_len_buf).await?;
let msg_len = ((msg_len_buf[0] as usize) << 8) + (msg_len_buf[1] as usize);
let mut msg = vec![0u8; msg_len];
stream.read_exact(&mut msg[..]).await?;
Ok(msg)
}
/// Hyper-basic stream transport sender. 16-bit BE size followed by payload.
async fn send(stream: &mut TcpStream, buf: &[u8]) -> Result<()> {
let msg_len_buf = [(buf.len() >> 8) as u8, (buf.len() & 0xff) as u8];
stream.write_all(&msg_len_buf).await?;
stream.write_all(buf).await?;
Ok(())
}

10
daemon/src/send_to_socket.rs

@ -1,20 +1,22 @@
use crate::wire::{self, JsonCodec}; use crate::wire::{self, EncryptedJsonCodec};
use futures::SinkExt; use futures::SinkExt;
use serde::Serialize; use serde::Serialize;
use snow::TransportState;
use std::fmt; use std::fmt;
use std::sync::{Arc, Mutex};
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf; use tokio::net::tcp::OwnedWriteHalf;
use tokio_util::codec::FramedWrite; use tokio_util::codec::FramedWrite;
use xtra::{Handler, Message}; use xtra::{Handler, Message};
pub struct Actor<T> { pub struct Actor<T> {
write: FramedWrite<OwnedWriteHalf, JsonCodec<T>>, write: FramedWrite<OwnedWriteHalf, EncryptedJsonCodec<T>>,
} }
impl<T> Actor<T> { impl<T> Actor<T> {
pub fn new(write: OwnedWriteHalf) -> Self { pub fn new(write: OwnedWriteHalf, transport_state: Arc<Mutex<TransportState>>) -> Self {
Self { Self {
write: FramedWrite::new(write, JsonCodec::default()), write: FramedWrite::new(write, EncryptedJsonCodec::new(transport_state)),
} }
} }

2
daemon/src/taker.rs

@ -168,7 +168,7 @@ async fn main() -> Result<()> {
let connection::Actor { let connection::Actor {
send_to_maker, send_to_maker,
read_from_maker, read_from_maker,
} = connection::Actor::new(opts.maker).await; } = connection::Actor::new(opts.maker).await?;
let TakerActorSystem { let TakerActorSystem {
cfd_actor_addr, cfd_actor_addr,

53
daemon/src/wire.rs

@ -1,5 +1,6 @@
use crate::model::cfd::{Order, OrderId}; use crate::model::cfd::{Order, OrderId};
use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd}; use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd};
use crate::noise::{NOISE_MAX_MSG_LEN, NOISE_TAG_LEN};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use bdk::bitcoin::secp256k1::Signature; use bdk::bitcoin::secp256k1::Signature;
use bdk::bitcoin::util::psbt::PartiallySignedTransaction; use bdk::bitcoin::util::psbt::PartiallySignedTransaction;
@ -9,10 +10,12 @@ use cfd_protocol::secp256k1_zkp::{EcdsaAdaptorSignature, SecretKey};
use cfd_protocol::{CfdTransactions, PartyParams, PunishParams}; use cfd_protocol::{CfdTransactions, PartyParams, PunishParams};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use snow::TransportState;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::ops::RangeInclusive; use std::ops::RangeInclusive;
use std::sync::{Arc, Mutex};
use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@ -93,21 +96,23 @@ impl fmt::Display for MakerToTaker {
} }
} }
pub struct JsonCodec<T> { pub struct EncryptedJsonCodec<T> {
_type: PhantomData<T>, _type: PhantomData<T>,
inner: LengthDelimitedCodec, inner: LengthDelimitedCodec,
transport_state: Arc<Mutex<TransportState>>,
} }
impl<T> Default for JsonCodec<T> { impl<T> EncryptedJsonCodec<T> {
fn default() -> Self { pub fn new(transport_state: Arc<Mutex<TransportState>>) -> Self {
Self { Self {
_type: PhantomData, _type: PhantomData,
inner: LengthDelimitedCodec::new(), inner: LengthDelimitedCodec::new(),
transport_state,
} }
} }
} }
impl<T> Decoder for JsonCodec<T> impl<T> Decoder for EncryptedJsonCodec<T>
where where
T: DeserializeOwned, T: DeserializeOwned,
{ {
@ -120,13 +125,30 @@ where
Some(bytes) => bytes, Some(bytes) => bytes,
}; };
let item = serde_json::from_slice(&bytes)?; let mut transport = self
.transport_state
.lock()
.expect("acquired mutex lock on Noise object to encrypt message");
let decrypted = bytes
.chunks(NOISE_MAX_MSG_LEN as usize)
.map(|chunk| {
let mut buf = vec![0u8; chunk.len() - NOISE_TAG_LEN as usize];
transport.read_message(chunk, &mut *buf)?;
Ok(buf)
})
.collect::<Result<Vec<Vec<u8>>>>()?
.into_iter()
.flatten()
.collect::<Vec<u8>>();
let item = serde_json::from_slice(&decrypted)?;
Ok(Some(item)) Ok(Some(item))
} }
} }
impl<T> Encoder<T> for JsonCodec<T> impl<T> Encoder<T> for EncryptedJsonCodec<T>
where where
T: Serialize, T: Serialize,
{ {
@ -135,7 +157,24 @@ where
fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> { fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = serde_json::to_vec(&item)?; let bytes = serde_json::to_vec(&item)?;
self.inner.encode(bytes.into(), dst)?; let mut transport = self
.transport_state
.lock()
.expect("acquired mutex lock on Noise object to encrypt message");
let encrypted = bytes
.chunks((NOISE_MAX_MSG_LEN - NOISE_TAG_LEN) as usize)
.map(|chunk| {
let mut buf = vec![0u8; chunk.len() + NOISE_TAG_LEN as usize];
transport.write_message(chunk, &mut *buf)?;
Ok(buf)
})
.collect::<Result<Vec<Vec<u8>>>>()?
.into_iter()
.flatten()
.collect::<Vec<u8>>();
self.inner.encode(encrypted.into(), dst)?;
Ok(()) Ok(())
} }

4
daemon/tests/harness/mod.rs

@ -130,7 +130,9 @@ impl Taker {
let connection::Actor { let connection::Actor {
send_to_maker, send_to_maker,
read_from_maker, read_from_maker,
} = connection::Actor::new(maker_address).await; } = connection::Actor::new(maker_address)
.await
.expect("Connected to maker");
let db = in_memory_db().await; let db = in_memory_db().await;

Loading…
Cancel
Save