From 0b2d2ec3c2634a3e5fc1c5d17e1aea3b3d802a91 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 17 Nov 2021 09:34:07 +1100 Subject: [PATCH 1/3] Prefer tracing fields of stringified log message --- daemon/src/maker_inc_connections.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 9ab2500..f710f68 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -119,7 +119,7 @@ impl Actor { ) -> Result<()> { let taker_id = TakerId::default(); - tracing::info!("New taker {} connected on {}", taker_id, taker_address); + tracing::info!(%taker_id, address = %taker_address, "New taker connected"); let noise = Arc::new(Mutex::new( noise::responder_handshake(&mut stream, &self.noise_priv_key).await?, From 8bda269e7a4f9ae2b0443d2f28a1b70070b2e6de Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 17 Nov 2021 09:35:37 +1100 Subject: [PATCH 2/3] Improve variable naming --- daemon/src/maker_inc_connections.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index f710f68..f291497 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -121,12 +121,11 @@ impl Actor { tracing::info!(%taker_id, address = %taker_address, "New taker connected"); - let noise = Arc::new(Mutex::new( - noise::responder_handshake(&mut stream, &self.noise_priv_key).await?, - )); + let transport_state = noise::responder_handshake(&mut stream, &self.noise_priv_key).await?; + let transport_state = Arc::new(Mutex::new(transport_state)); let (read, write) = stream.into_split(); - let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise.clone())) + let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(transport_state.clone())) .map_ok(move |msg| FromTaker { taker_id, msg }) .map(forward_only_ok::Message); @@ -142,7 +141,7 @@ impl Actor { let heartbeat_interval = self.heartbeat_interval; self.tasks.push( async move { - let mut actor = send_to_socket::Actor::new(write, noise.clone()); + let mut actor = send_to_socket::Actor::new(write, transport_state.clone()); let _heartbeat_handle = out_msg_actor_context .notify_interval(heartbeat_interval, || wire::MakerToTaker::Heartbeat) From 47bfc3a2a955efdf14bbe4644b4000b42b11c792 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 17 Nov 2021 09:46:56 +1100 Subject: [PATCH 3/3] Replace implementation of TakerId with public key This is a backwards-incompatible change in regards to the database because we previously serialized UUIDs and now expect to deserialize public keys. It is a first step towards #576. --- daemon/src/maker_inc_connections.rs | 5 +-- daemon/src/model.rs | 49 ++++++++++++++++++++++++----- daemon/src/noise.rs | 21 ++++++++++++- 3 files changed, 65 insertions(+), 10 deletions(-) diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index f291497..dec196f 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -1,6 +1,7 @@ use crate::maker_cfd::{FromTaker, NewTakerOnline}; use crate::model::cfd::{Order, OrderId}; use crate::model::{BitMexPriceEventId, TakerId}; +use crate::noise::TransportStateExt; use crate::tokio_ext::FutureExt; use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire}; use anyhow::Result; @@ -117,11 +118,11 @@ impl Actor { taker_address: SocketAddr, _: &mut Context, ) -> Result<()> { - let taker_id = TakerId::default(); + let transport_state = noise::responder_handshake(&mut stream, &self.noise_priv_key).await?; + let taker_id = TakerId::new(transport_state.get_remote_public_key()?); tracing::info!(%taker_id, address = %taker_address, "New taker connected"); - let transport_state = noise::responder_handshake(&mut stream, &self.noise_priv_key).await?; let transport_state = Arc::new(Mutex::new(transport_state)); let (read, write) = stream.into_split(); diff --git a/daemon/src/model.rs b/daemon/src/model.rs index 6a6adf0..c224e69 100644 --- a/daemon/src/model.rs +++ b/daemon/src/model.rs @@ -5,6 +5,7 @@ use chrono::DateTime; use reqwest::Url; use rust_decimal::prelude::ToPrimitive; use rust_decimal::Decimal; +use serde::de::Error as _; use serde::{Deserialize, Serialize}; use serde_with::{DeserializeFromStr, SerializeDisplay}; use std::convert::TryInto; @@ -13,7 +14,6 @@ use std::ops::{Add, Div, Mul, Sub}; use std::time::{SystemTime, UNIX_EPOCH}; use std::{fmt, str}; use time::{OffsetDateTime, PrimitiveDateTime, Time}; -use uuid::Uuid; pub mod cfd; @@ -383,18 +383,41 @@ pub enum Position { Short, } -#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] -pub struct TakerId(Uuid); +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct TakerId(x25519_dalek::PublicKey); -impl Default for TakerId { - fn default() -> Self { - Self(Uuid::new_v4()) +impl TakerId { + pub fn new(key: x25519_dalek::PublicKey) -> Self { + Self(key) + } +} + +impl Serialize for TakerId { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.collect_str(self) + } +} + +impl<'de> Deserialize<'de> for TakerId { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let hex = String::deserialize(deserializer)?; + + let mut bytes = [0u8; 32]; + hex::decode_to_slice(&hex, &mut bytes).map_err(D::Error::custom)?; + + Ok(Self(x25519_dalek::PublicKey::from(bytes))) } } impl fmt::Display for TakerId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) + write!(f, "{}", hex::encode(self.0.as_bytes())) } } @@ -632,4 +655,16 @@ mod tests { assert_eq!(ts_b.seconds() - ts_a.seconds(), -36000); } + + #[test] + fn roundtrip_taker_id_serde() { + let id = TakerId::new(x25519_dalek::PublicKey::from([42u8; 32])); + + serde_test::assert_tokens( + &id, + &[serde_test::Token::String( + "2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a", + )], + ); + } } diff --git a/daemon/src/noise.rs b/daemon/src/noise.rs index f64661c..1021774 100644 --- a/daemon/src/noise.rs +++ b/daemon/src/noise.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{anyhow, Context, Result}; use snow::{Builder, TransportState}; use std::io; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -75,3 +75,22 @@ async fn send(stream: &mut TcpStream, buf: &[u8]) -> Result<()> { stream.write_all(buf).await?; Ok(()) } + +pub trait TransportStateExt { + /// Extract the remote's public key from this transport state. + fn get_remote_public_key(&self) -> Result; +} + +impl TransportStateExt for TransportState { + fn get_remote_public_key(&self) -> Result { + let public_key: [u8; 32] = self + .get_remote_static() + .context("No public key for remote connection")? + .to_vec() + .try_into() + .map_err(|_| anyhow!("Expected public key to be 32 bytes"))?; + let public_key = x25519_dalek::PublicKey::from(public_key); + + Ok(public_key) + } +}