Browse Source

Merge #601

601: Use the taker's public key for `TakerId` r=thomaseizinger a=thomaseizinger



Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
new-http-api
bors[bot] 3 years ago
committed by GitHub
parent
commit
e4036440b2
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      daemon/src/maker_inc_connections.rs
  2. 49
      daemon/src/model.rs
  3. 21
      daemon/src/noise.rs

14
daemon/src/maker_inc_connections.rs

@ -1,6 +1,7 @@
use crate::maker_cfd::{FromTaker, NewTakerOnline}; use crate::maker_cfd::{FromTaker, NewTakerOnline};
use crate::model::cfd::{Order, OrderId}; use crate::model::cfd::{Order, OrderId};
use crate::model::{BitMexPriceEventId, TakerId}; use crate::model::{BitMexPriceEventId, TakerId};
use crate::noise::TransportStateExt;
use crate::tokio_ext::FutureExt; use crate::tokio_ext::FutureExt;
use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire}; use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire};
use anyhow::Result; use anyhow::Result;
@ -117,16 +118,15 @@ impl Actor {
taker_address: SocketAddr, taker_address: SocketAddr,
_: &mut Context<Self>, _: &mut Context<Self>,
) -> Result<()> { ) -> Result<()> {
let taker_id = TakerId::default(); let transport_state = noise::responder_handshake(&mut stream, &self.noise_priv_key).await?;
let taker_id = TakerId::new(transport_state.get_remote_public_key()?);
tracing::info!("New taker {} connected on {}", taker_id, taker_address); tracing::info!(%taker_id, address = %taker_address, "New taker connected");
let noise = Arc::new(Mutex::new( let transport_state = Arc::new(Mutex::new(transport_state));
noise::responder_handshake(&mut stream, &self.noise_priv_key).await?,
));
let (read, write) = stream.into_split(); let (read, write) = stream.into_split();
let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise.clone())) let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(transport_state.clone()))
.map_ok(move |msg| FromTaker { taker_id, msg }) .map_ok(move |msg| FromTaker { taker_id, msg })
.map(forward_only_ok::Message); .map(forward_only_ok::Message);
@ -142,7 +142,7 @@ impl Actor {
let heartbeat_interval = self.heartbeat_interval; let heartbeat_interval = self.heartbeat_interval;
self.tasks.push( self.tasks.push(
async move { async move {
let mut actor = send_to_socket::Actor::new(write, noise.clone()); let mut actor = send_to_socket::Actor::new(write, transport_state.clone());
let _heartbeat_handle = out_msg_actor_context let _heartbeat_handle = out_msg_actor_context
.notify_interval(heartbeat_interval, || wire::MakerToTaker::Heartbeat) .notify_interval(heartbeat_interval, || wire::MakerToTaker::Heartbeat)

49
daemon/src/model.rs

@ -5,6 +5,7 @@ use chrono::DateTime;
use reqwest::Url; use reqwest::Url;
use rust_decimal::prelude::ToPrimitive; use rust_decimal::prelude::ToPrimitive;
use rust_decimal::Decimal; use rust_decimal::Decimal;
use serde::de::Error as _;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_with::{DeserializeFromStr, SerializeDisplay}; use serde_with::{DeserializeFromStr, SerializeDisplay};
use std::convert::TryInto; use std::convert::TryInto;
@ -13,7 +14,6 @@ use std::ops::{Add, Div, Mul, Sub};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use std::{fmt, str}; use std::{fmt, str};
use time::{OffsetDateTime, PrimitiveDateTime, Time}; use time::{OffsetDateTime, PrimitiveDateTime, Time};
use uuid::Uuid;
pub mod cfd; pub mod cfd;
@ -383,18 +383,41 @@ pub enum Position {
Short, Short,
} }
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct TakerId(Uuid); pub struct TakerId(x25519_dalek::PublicKey);
impl Default for TakerId { impl TakerId {
fn default() -> Self { pub fn new(key: x25519_dalek::PublicKey) -> Self {
Self(Uuid::new_v4()) Self(key)
}
}
impl Serialize for TakerId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_str(self)
}
}
impl<'de> Deserialize<'de> for TakerId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let hex = String::deserialize(deserializer)?;
let mut bytes = [0u8; 32];
hex::decode_to_slice(&hex, &mut bytes).map_err(D::Error::custom)?;
Ok(Self(x25519_dalek::PublicKey::from(bytes)))
} }
} }
impl fmt::Display for TakerId { impl fmt::Display for TakerId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f) write!(f, "{}", hex::encode(self.0.as_bytes()))
} }
} }
@ -632,4 +655,16 @@ mod tests {
assert_eq!(ts_b.seconds() - ts_a.seconds(), -36000); assert_eq!(ts_b.seconds() - ts_a.seconds(), -36000);
} }
#[test]
fn roundtrip_taker_id_serde() {
let id = TakerId::new(x25519_dalek::PublicKey::from([42u8; 32]));
serde_test::assert_tokens(
&id,
&[serde_test::Token::String(
"2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a",
)],
);
}
} }

21
daemon/src/noise.rs

@ -1,4 +1,4 @@
use anyhow::Result; use anyhow::{anyhow, Context, Result};
use snow::{Builder, TransportState}; use snow::{Builder, TransportState};
use std::io; use std::io;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -75,3 +75,22 @@ async fn send(stream: &mut TcpStream, buf: &[u8]) -> Result<()> {
stream.write_all(buf).await?; stream.write_all(buf).await?;
Ok(()) Ok(())
} }
pub trait TransportStateExt {
/// Extract the remote's public key from this transport state.
fn get_remote_public_key(&self) -> Result<x25519_dalek::PublicKey>;
}
impl TransportStateExt for TransportState {
fn get_remote_public_key(&self) -> Result<x25519_dalek::PublicKey> {
let public_key: [u8; 32] = self
.get_remote_static()
.context("No public key for remote connection")?
.to_vec()
.try_into()
.map_err(|_| anyhow!("Expected public key to be 32 bytes"))?;
let public_key = x25519_dalek::PublicKey::from(public_key);
Ok(public_key)
}
}

Loading…
Cancel
Save