From 287ad4788d06039604033c4ac5c13589f2d54cd6 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Thu, 2 Dec 2021 14:39:25 +1100 Subject: [PATCH 1/3] Move framed read and write into connection setup This enables us read/write messages before creating actors. --- daemon/src/connection.rs | 17 ++++++++--------- daemon/src/maker_inc_connections.rs | 7 ++++--- daemon/src/send_to_socket.rs | 8 ++------ 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index 795d73e..950fe1e 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -2,6 +2,7 @@ use crate::address_map::{AddressMap, Stopping}; use crate::model::cfd::OrderId; use crate::model::{Price, Timestamp, Usd}; use crate::tokio_ext::FutureExt; +use crate::wire::EncryptedJsonCodec; use crate::{collab_settlement_taker, log_error, noise, send_to_socket, setup_taker, wire, Tasks}; use anyhow::{Context, Result}; use bdk::bitcoin::Amount; @@ -12,7 +13,7 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; use tokio::net::TcpStream; use tokio::sync::watch; -use tokio_util::codec::FramedRead; +use tokio_util::codec::{FramedRead, FramedWrite}; use xtra::prelude::MessageChannel; use xtra::KeepRunning; use xtra_productivity::xtra_productivity; @@ -193,17 +194,15 @@ impl Actor { (read, write, Arc::new(Mutex::new(noise))) }; - let send_to_socket = send_to_socket::Actor::new(write, noise.clone()); + let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise.clone())); + let write = FramedWrite::new(write, EncryptedJsonCodec::new(noise)); + let this = ctx.address().expect("self to be alive"); + + let send_to_socket = send_to_socket::Actor::new(write); let mut tasks = Tasks::default(); tasks.add(self.send_to_maker_ctx.attach(send_to_socket)); - - let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise)) - .map(move |item| MakerStreamMessage { item }); - - let this = ctx.address().expect("self to be alive"); - tasks.add(this.attach_stream(read)); - + tasks.add(this.attach_stream(read.map(move |item| MakerStreamMessage { item }))); tasks.add( ctx.notify_interval(self.heartbeat_timeout, || MeasurePulse) .expect("we just started"), diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index c26baf3..04dd439 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -3,6 +3,7 @@ use crate::maker_cfd::{FromTaker, TakerConnected, TakerDisconnected}; use crate::model::cfd::{Order, OrderId}; use crate::model::Identity; use crate::noise::TransportStateExt; +use crate::wire::EncryptedJsonCodec; use crate::{maker_cfd, noise, send_to_socket, setup_maker, wire, Tasks}; use anyhow::Result; use futures::TryStreamExt; @@ -12,7 +13,7 @@ use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::net::TcpStream; -use tokio_util::codec::FramedRead; +use tokio_util::codec::{FramedRead, FramedWrite}; use xtra::prelude::*; use xtra::KeepRunning; use xtra_productivity::xtra_productivity; @@ -128,7 +129,7 @@ impl Actor { let (read, write) = stream.into_split(); let mut read = FramedRead::new(read, wire::EncryptedJsonCodec::new(transport_state.clone())); - + let write = FramedWrite::new(write, EncryptedJsonCodec::new(transport_state)); let this = ctx.address().expect("self to be alive"); let read_fut = async move { while let Ok(Some(msg)) = read.try_next().await { @@ -143,7 +144,7 @@ impl Actor { }; let (out_msg, mut out_msg_actor_context) = xtra::Context::new(None); - let send_to_socket_actor = send_to_socket::Actor::new(write, transport_state.clone()); + let send_to_socket_actor = send_to_socket::Actor::new(write); let heartbeat_fut = out_msg_actor_context .notify_interval(self.heartbeat_interval, || wire::MakerToTaker::Heartbeat) diff --git a/daemon/src/send_to_socket.rs b/daemon/src/send_to_socket.rs index 3fc899f..a87b005 100644 --- a/daemon/src/send_to_socket.rs +++ b/daemon/src/send_to_socket.rs @@ -1,9 +1,7 @@ use crate::wire::{self, EncryptedJsonCodec}; use futures::SinkExt; use serde::Serialize; -use snow::TransportState; use std::fmt; -use std::sync::{Arc, Mutex}; use tokio::io::AsyncWriteExt; use tokio::net::tcp::OwnedWriteHalf; use tokio_util::codec::FramedWrite; @@ -14,10 +12,8 @@ pub struct Actor { } impl Actor { - pub fn new(write: OwnedWriteHalf, transport_state: Arc>) -> Self { - Self { - write: FramedWrite::new(write, EncryptedJsonCodec::new(transport_state)), - } + pub fn new(write: FramedWrite>) -> Self { + Self { write } } pub async fn shutdown(self) { From 302c1be86b5f066e54f7366d10737afd9d785b31 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Thu, 2 Dec 2021 14:57:57 +1100 Subject: [PATCH 2/3] Use identity more consistently --- daemon/src/connection.rs | 19 +++++++++++-------- daemon/src/model.rs | 4 ++++ daemon/src/taker.rs | 4 ++-- daemon/tests/happy_path.rs | 4 ++-- daemon/tests/harness/mod.rs | 15 +++++---------- 5 files changed, 24 insertions(+), 22 deletions(-) diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index 950fe1e..3680d95 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -1,6 +1,6 @@ use crate::address_map::{AddressMap, Stopping}; use crate::model::cfd::OrderId; -use crate::model::{Price, Timestamp, Usd}; +use crate::model::{Identity, Price, Timestamp, Usd}; use crate::tokio_ext::FutureExt; use crate::wire::EncryptedJsonCodec; use crate::{collab_settlement_taker, log_error, noise, send_to_socket, setup_taker, wire, Tasks}; @@ -41,7 +41,7 @@ pub struct Actor { } pub struct Connect { - pub maker_identity_pk: x25519_dalek::PublicKey, + pub maker_identity: Identity, pub maker_addr: SocketAddr, } @@ -168,7 +168,7 @@ impl Actor { &mut self, Connect { maker_addr, - maker_identity_pk, + maker_identity, }: Connect, ctx: &mut xtra::Context, ) -> Result<()> { @@ -186,9 +186,12 @@ impl Actor { ) })? .with_context(|| format!("Failed to connect to {}", maker_addr))?; - let noise = - noise::initiator_handshake(&mut connection, &self.identity_sk, &maker_identity_pk) - .await?; + let noise = noise::initiator_handshake( + &mut connection, + &self.identity_sk, + &maker_identity.pk(), + ) + .await?; let (read, write) = connection.into_split(); (read, write, Arc::new(Mutex::new(noise))) @@ -318,7 +321,7 @@ impl xtra::Actor for Actor {} pub async fn connect( mut maker_online_status_feed_receiver: watch::Receiver, connection_actor_addr: xtra::Address, - maker_identity_pk: x25519_dalek::PublicKey, + maker_identity: Identity, maker_addresses: Vec, ) { loop { @@ -327,7 +330,7 @@ pub async fn connect( 'connect: loop { for address in &maker_addresses { let connect_msg = Connect { - maker_identity_pk, + maker_identity, maker_addr: *address, }; diff --git a/daemon/src/model.rs b/daemon/src/model.rs index 26fa746..170116d 100644 --- a/daemon/src/model.rs +++ b/daemon/src/model.rs @@ -395,6 +395,10 @@ impl Identity { pub fn new(key: x25519_dalek::PublicKey) -> Self { Self(key) } + + pub fn pk(&self) -> x25519_dalek::PublicKey { + self.0 + } } impl Serialize for Identity { diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 03b217a..9f93899 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -6,7 +6,7 @@ use clap::{Parser, Subcommand}; use daemon::connection::connect; use daemon::db::load_all_cfds; use daemon::model::cfd::Role; -use daemon::model::WalletInfo; +use daemon::model::{Identity, WalletInfo}; use daemon::seed::Seed; use daemon::tokio_ext::FutureExt; use daemon::{ @@ -274,7 +274,7 @@ async fn main() -> Result<()> { tasks.add(connect( maker_online_status_feed_receiver.clone(), connection_actor_addr, - opts.maker_id, + Identity::new(opts.maker_id), possible_addresses, )); diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index 2b01383..c80a29f 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -177,7 +177,7 @@ async fn taker_notices_lack_of_maker() { let taker_config = TakerConfig::default().with_heartbeat_timeout(short_interval * 2); - let mut taker = Taker::start(&taker_config, maker.listen_addr, maker.identity_pk).await; + let mut taker = Taker::start(&taker_config, maker.listen_addr, maker.identity).await; assert_eq!( ConnectionStatus::Online, @@ -237,7 +237,7 @@ async fn start_from_open_cfd_state() -> (Maker, Taker, OrderId) { let mut taker = Taker::start( &TakerConfig::default().with_heartbeat_timeout(heartbeat_interval * 2), maker.listen_addr, - maker.identity_pk, + maker.identity, ) .await; diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index ae8901b..d6c4eec 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -44,12 +44,7 @@ fn oracle_pk() -> schnorrsig::PublicKey { pub async fn start_both() -> (Maker, Taker) { let maker_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let maker = Maker::start(&MakerConfig::default(), maker_listener).await; - let taker = Taker::start( - &TakerConfig::default(), - maker.listen_addr, - maker.identity_pk, - ) - .await; + let taker = Taker::start(&TakerConfig::default(), maker.listen_addr, maker.identity).await; (maker, taker) } @@ -115,7 +110,7 @@ pub struct Maker { pub mocks: mocks::Mocks, pub feeds: Feeds, pub listen_addr: SocketAddr, - pub identity_pk: x25519_dalek::PublicKey, + pub identity: model::Identity, _tasks: Tasks, } @@ -195,7 +190,7 @@ impl Maker { Self { system: maker, feeds, - identity_pk, + identity: model::Identity::new(identity_pk), listen_addr: address, mocks, _tasks: tasks, @@ -266,7 +261,7 @@ impl Taker { pub async fn start( config: &TakerConfig, maker_address: SocketAddr, - maker_noise_pub_key: x25519_dalek::PublicKey, + maker_identity: model::Identity, ) -> Self { let (identity_pk, identity_sk) = config.seed.derive_identity(); @@ -306,7 +301,7 @@ impl Taker { tasks.add(connect( taker.maker_online_status_feed_receiver.clone(), taker.connection_actor_addr.clone(), - maker_noise_pub_key, + maker_identity, vec![maker_address], )); From fbdc6c5e77fef4523b00f3043c71d1d660a56f1c Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Thu, 2 Dec 2021 15:00:51 +1100 Subject: [PATCH 3/3] Hello message --- Cargo.lock | 4 ++ daemon/Cargo.toml | 1 + daemon/src/connection.rs | 77 +++++++++++++++++++---- daemon/src/lib.rs | 2 +- daemon/src/maker_cfd.rs | 4 ++ daemon/src/maker_inc_connections.rs | 52 ++++++++++++--- daemon/src/taker_cfd.rs | 3 + daemon/src/to_sse_event.rs | 39 ++++++++++-- daemon/src/wire.rs | 19 ++++++ daemon/tests/happy_path.rs | 2 +- taker-frontend/src/App.tsx | 7 ++- taker-frontend/src/components/History.tsx | 8 +-- taker-frontend/src/components/NavBar.tsx | 18 +++++- taker-frontend/src/components/Trade.tsx | 6 +- taker-frontend/src/types.ts | 10 +++ 15 files changed, 213 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14ec962..ea55e22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -631,6 +631,7 @@ dependencies = [ "rust-embed", "rust_decimal", "rust_decimal_macros", + "semver 1.0.4", "serde", "serde_json", "serde_plain", @@ -2702,6 +2703,9 @@ name = "semver" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012" +dependencies = [ + "serde", +] [[package]] name = "semver-parser" diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index ae9725b..6e025c3 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -29,6 +29,7 @@ rocket-basicauth = { version = "2", default-features = false } rust-embed = "6.3" rust_decimal = "1.18" rust_decimal_macros = "1.18" +semver = { version = "1.0.4", features = ["serde"] } serde = { version = "1", features = ["derive"] } serde_json = "1" serde_plain = "1" diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index 3680d95..dbb45bd 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -2,11 +2,11 @@ use crate::address_map::{AddressMap, Stopping}; use crate::model::cfd::OrderId; use crate::model::{Identity, Price, Timestamp, Usd}; use crate::tokio_ext::FutureExt; -use crate::wire::EncryptedJsonCodec; +use crate::wire::{EncryptedJsonCodec, TakerToMaker, Version}; use crate::{collab_settlement_taker, log_error, noise, send_to_socket, setup_taker, wire, Tasks}; -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use bdk::bitcoin::Amount; -use futures::StreamExt; +use futures::{SinkExt, StreamExt, TryStreamExt}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; @@ -55,7 +55,17 @@ struct MeasurePulse; #[derive(Clone, Debug, PartialEq)] pub enum ConnectionStatus { Online, - Offline, + Offline { + reason: Option, + }, +} + +#[derive(Clone, Debug, PartialEq)] +pub enum ConnectionCloseReason { + VersionMismatch { + taker_version: Version, + maker_version: Version, + }, } /// Message sent from the `setup_taker::Actor` to the @@ -197,8 +207,51 @@ impl Actor { (read, write, Arc::new(Mutex::new(noise))) }; - let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise.clone())); - let write = FramedWrite::new(write, EncryptedJsonCodec::new(noise)); + let mut read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise.clone())); + let mut write = FramedWrite::new(write, EncryptedJsonCodec::new(noise)); + + let our_version = Version::current(); + write.send(TakerToMaker::Hello(our_version.clone())).await?; + + match read + .try_next() + .timeout(Duration::from_secs(10)) + .await + .with_context(|| { + format!( + "Maker {} did not send Hello within 10 seconds, dropping connection", + maker_identity + ) + })? { + Ok(Some(wire::MakerToTaker::Hello(maker_version))) => { + if our_version != maker_version { + self.status_sender + .send(ConnectionStatus::Offline { + reason: Some(ConnectionCloseReason::VersionMismatch { + taker_version: our_version.clone(), + maker_version: maker_version.clone(), + }), + }) + .expect("receiver to outlive the actor"); + + bail!( + "Network version mismatch, we are on version {} but taker is on version {}", + our_version, + maker_version, + ) + } + } + unexpected_message => { + bail!( + "Unexpected message {:?} from maker {}", + unexpected_message, + maker_identity + ) + } + } + + tracing::info!(address = %maker_addr, "Established connection to maker"); + let this = ctx.address().expect("self to be alive"); let send_to_socket = send_to_socket::Actor::new(write); @@ -219,8 +272,6 @@ impl Actor { .send(ConnectionStatus::Online) .expect("receiver to outlive the actor"); - tracing::info!(address = %maker_addr, "Established connection to maker"); - Ok(()) } @@ -287,6 +338,9 @@ impl Actor { tracing::warn!(%order_id, "No active collaborative settlement"); } } + wire::MakerToTaker::Hello(_) => { + tracing::warn!("Ignoring unexpected Hello message from maker. Hello is only expected when opening a new connection.") + } other => { // this one should go to the taker cfd actor log_error!(self.maker_to_taker.send(other)); @@ -307,7 +361,7 @@ impl Actor { if time_since_last_heartbeat > self.heartbeat_timeout { self.status_sender - .send(ConnectionStatus::Offline) + .send(ConnectionStatus::Offline { reason: None }) .expect("watch receiver to outlive the actor"); self.connected_state = None; } @@ -325,7 +379,8 @@ pub async fn connect( maker_addresses: Vec, ) { loop { - if maker_online_status_feed_receiver.borrow().clone() == ConnectionStatus::Offline { + let connection_status = maker_online_status_feed_receiver.borrow().clone(); + if matches!(connection_status, ConnectionStatus::Offline { .. }) { tracing::debug!("No connection to the maker"); 'connect: loop { for address in &maker_addresses { @@ -339,7 +394,7 @@ pub async fn connect( .await .expect("Taker actor to be present") { - tracing::trace!(%address, "Failed to establish connection: {:#}", e); + tracing::warn!(%address, "Failed to establish connection: {:#}", e); continue; } break 'connect; diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index c036947..566667c 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -245,7 +245,7 @@ where let cfds = load_all_cfds(&mut conn).await?; let (maker_online_status_feed_sender, maker_online_status_feed_receiver) = - watch::channel(ConnectionStatus::Offline); + watch::channel(ConnectionStatus::Offline { reason: None }); let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None); let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None); diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 55508d8..863264b 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -13,6 +13,7 @@ use crate::projection::{ }; use crate::setup_contract::RolloverParams; use crate::tokio_ext::FutureExt; +use crate::wire::TakerToMaker; use crate::{ log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, setup_maker, wallet, wire, Tasks, @@ -1109,6 +1110,9 @@ where wire::TakerToMaker::Protocol { .. } => { unreachable!("This kind of message should be sent to the `setup_maker::Actor`") } + TakerToMaker::Hello(_) => { + unreachable!("The Hello message is not sent to the cfd actor") + } } } } diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 04dd439..590bfc1 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -3,10 +3,11 @@ use crate::maker_cfd::{FromTaker, TakerConnected, TakerDisconnected}; use crate::model::cfd::{Order, OrderId}; use crate::model::Identity; use crate::noise::TransportStateExt; -use crate::wire::EncryptedJsonCodec; +use crate::tokio_ext::FutureExt; +use crate::wire::{EncryptedJsonCodec, MakerToTaker, TakerToMaker, Version}; use crate::{maker_cfd, noise, send_to_socket, setup_maker, wire, Tasks}; -use anyhow::Result; -use futures::TryStreamExt; +use anyhow::{bail, Context, Result}; +use futures::{SinkExt, TryStreamExt}; use std::collections::HashMap; use std::io; use std::net::SocketAddr; @@ -117,19 +118,54 @@ impl Actor { &mut self, mut stream: TcpStream, taker_address: SocketAddr, - ctx: &mut Context, + ctx: &mut xtra::Context, ) -> Result<()> { let transport_state = noise::responder_handshake(&mut stream, &self.noise_priv_key).await?; let taker_id = Identity::new(transport_state.get_remote_public_key()?); - tracing::info!(%taker_id, address = %taker_address, "New taker connected"); - let transport_state = Arc::new(Mutex::new(transport_state)); let (read, write) = stream.into_split(); let mut read = FramedRead::new(read, wire::EncryptedJsonCodec::new(transport_state.clone())); - let write = FramedWrite::new(write, EncryptedJsonCodec::new(transport_state)); + let mut write = FramedWrite::new(write, EncryptedJsonCodec::new(transport_state)); + + match read + .try_next() + .timeout(Duration::from_secs(10)) + .await + .with_context(|| { + format!( + "Taker {} did not send Hello within 10 seconds, dropping connection", + taker_id + ) + })? { + Ok(Some(TakerToMaker::Hello(taker_version))) => { + let our_version = Version::current(); + write.send(MakerToTaker::Hello(our_version.clone())).await?; + + if our_version != taker_version { + tracing::debug!( + "Network version mismatch, we are on version {} but taker is on version {}", + our_version, + taker_version + ); + + // A taker running a different version is not treated as error for the maker + return Ok(()); + } + } + unexpected_message => { + bail!( + "Unexpected message {:?} from taker {}", + unexpected_message, + taker_id + ); + } + } + + tracing::info!(%taker_id, address = %taker_address, "New taker connected"); + let this = ctx.address().expect("self to be alive"); let read_fut = async move { while let Ok(Some(msg)) = read.try_next().await { @@ -201,7 +237,7 @@ impl Actor { Ok(()) } - async fn handle(&mut self, msg: ListenerMessage, ctx: &mut Context) -> KeepRunning { + async fn handle(&mut self, msg: ListenerMessage, ctx: &mut xtra::Context) -> KeepRunning { match msg { ListenerMessage::NewConnection { stream, address } => { if let Err(err) = self.handle_new_connection_impl(stream, address, ctx).await { diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 84d9b2b..c78ba00 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -663,6 +663,9 @@ where wire::MakerToTaker::Settlement { .. } => { unreachable!("These messages should be sent to the `collab_settlement::Actor`") } + wire::MakerToTaker::Hello(_) => { + unreachable!("Connection related messages are handled in the connection actor") + } } } } diff --git a/daemon/src/to_sse_event.rs b/daemon/src/to_sse_event.rs index 7d6875d..add1e92 100644 --- a/daemon/src/to_sse_event.rs +++ b/daemon/src/to_sse_event.rs @@ -1,7 +1,7 @@ -use crate::connection::ConnectionStatus; -use crate::model; use crate::model::Timestamp; use crate::projection::{Cfd, CfdAction, CfdOrder, Identity, Quote}; +use crate::to_sse_event::ConnectionCloseReason::{MakerVersionOutdated, TakerVersionOutdated}; +use crate::{connection, model}; use bdk::bitcoin::Amount; use rocket::request::FromParam; use rocket::response::stream::Event; @@ -58,11 +58,40 @@ impl ToSseEvent for model::WalletInfo { } } -impl ToSseEvent for ConnectionStatus { +#[derive(Debug, Clone, Serialize)] +pub struct ConnectionStatus { + online: bool, + connection_close_reason: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub enum ConnectionCloseReason { + MakerVersionOutdated, + TakerVersionOutdated, +} + +impl ToSseEvent for connection::ConnectionStatus { fn to_sse_event(&self) -> Event { let connected = match self { - ConnectionStatus::Online => true, - ConnectionStatus::Offline => false, + connection::ConnectionStatus::Online => ConnectionStatus { + online: true, + connection_close_reason: None, + }, + connection::ConnectionStatus::Offline { reason } => ConnectionStatus { + online: false, + connection_close_reason: reason.as_ref().map(|g| match g { + connection::ConnectionCloseReason::VersionMismatch { + maker_version, + taker_version, + } => { + if *maker_version < *taker_version { + MakerVersionOutdated + } else { + TakerVersionOutdated + } + } + }), + }, }; Event::json(&connected).event("maker_status") diff --git a/daemon/src/wire.rs b/daemon/src/wire.rs index b958df0..1595f56 100644 --- a/daemon/src/wire.rs +++ b/daemon/src/wire.rs @@ -18,6 +18,21 @@ use std::ops::RangeInclusive; use std::sync::{Arc, Mutex}; use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, PartialOrd)] +pub struct Version(semver::Version); + +impl Version { + pub fn current() -> Self { + Self(semver::Version::new(1, 0, 0)) + } +} + +impl fmt::Display for Version { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + pub mod taker_to_maker { use super::*; @@ -43,6 +58,7 @@ pub mod taker_to_maker { #[serde(tag = "type", content = "payload")] #[allow(clippy::large_enum_variant)] pub enum TakerToMaker { + Hello(Version), TakeOrder { order_id: OrderId, quantity: Usd, @@ -70,6 +86,7 @@ impl fmt::Display for TakerToMaker { TakerToMaker::ProposeRollOver { .. } => write!(f, "ProposeRollOver"), TakerToMaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"), TakerToMaker::Settlement { .. } => write!(f, "Settlement"), + TakerToMaker::Hello(_) => write!(f, "Hello"), } } } @@ -78,6 +95,7 @@ impl fmt::Display for TakerToMaker { #[serde(tag = "type", content = "payload")] #[allow(clippy::large_enum_variant)] pub enum MakerToTaker { + Hello(Version), /// Periodically broadcasted message, indicating maker's presence Heartbeat, CurrentOrder(Option), @@ -114,6 +132,7 @@ pub mod maker_to_taker { impl fmt::Display for MakerToTaker { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { + MakerToTaker::Hello(_) => write!(f, "Hello"), MakerToTaker::Heartbeat { .. } => write!(f, "Heartbeat"), MakerToTaker::CurrentOrder(_) => write!(f, "CurrentOrder"), MakerToTaker::ConfirmOrder(_) => write!(f, "ConfirmOrder"), diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index c80a29f..0b2732a 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -189,7 +189,7 @@ async fn taker_notices_lack_of_maker() { sleep(taker_config.heartbeat_timeout).await; assert_eq!( - ConnectionStatus::Offline, + ConnectionStatus::Offline { reason: None }, next(taker.maker_status_feed()).await.unwrap(), ); diff --git a/taker-frontend/src/App.tsx b/taker-frontend/src/App.tsx index afe2962..18a86b8 100644 --- a/taker-frontend/src/App.tsx +++ b/taker-frontend/src/App.tsx @@ -25,6 +25,7 @@ import { BXBTData, Cfd, CfdOrderRequestPayload, + ConnectionStatus, intoCfd, intoOrder, MarginRequestPayload, @@ -63,8 +64,8 @@ export const App = () => { const cfdsOrUndefined = useLatestEvent(source, "cfds", intoCfd); let cfds = cfdsOrUndefined ? cfdsOrUndefined! : []; cfds.sort((a, b) => a.order_id.localeCompare(b.order_id)); - const connectedToMakerOrUndefined = useLatestEvent(source, "maker_status"); - const connectedToMaker = connectedToMakerOrUndefined ? connectedToMakerOrUndefined! : false; + const connectedToMakerOrUndefined = useLatestEvent(source, "maker_status"); + const connectedToMaker = connectedToMakerOrUndefined ? connectedToMakerOrUndefined : { online: false }; let [quantity, setQuantity] = useState("0"); let [margin, setMargin] = useState(0); @@ -154,7 +155,7 @@ export const App = () => { cfds={cfds.filter((cfd) => cfd.state.getGroup() === StateGroupKey.CLOSED )} - connectedToMaker + connectedToMaker={connectedToMaker} /> diff --git a/taker-frontend/src/components/History.tsx b/taker-frontend/src/components/History.tsx index 9ee2eca..af2f2fb 100644 --- a/taker-frontend/src/components/History.tsx +++ b/taker-frontend/src/components/History.tsx @@ -18,14 +18,14 @@ import { VStack, } from "@chakra-ui/react"; import * as React from "react"; -import { Cfd, Tx, TxLabel } from "../types"; +import { Cfd, ConnectionStatus, Tx, TxLabel } from "../types"; import usePostRequest from "../usePostRequest"; import CloseButton from "./CloseButton"; interface HistoryProps { cfds: Cfd[]; title?: string; - connectedToMaker: Boolean; + connectedToMaker: ConnectionStatus; } const History = ({ cfds, title, connectedToMaker }: HistoryProps) => { @@ -51,7 +51,7 @@ export default History; interface CfdDetailsProps { cfd: Cfd; - connectedToMaker: Boolean; + connectedToMaker: ConnectionStatus; } const CfdDetails = ({ cfd, connectedToMaker }: CfdDetailsProps) => { @@ -74,7 +74,7 @@ const CfdDetails = ({ cfd, connectedToMaker }: CfdDetailsProps) => { let [settle, isSettling] = usePostRequest(`/api/cfd/${cfd.order_id}/settle`); let [commit, isCommiting] = usePostRequest(`/api/cfd/${cfd.order_id}/commit`); - const closeButton = connectedToMaker + const closeButton = connectedToMaker.online ? : ; diff --git a/taker-frontend/src/components/NavBar.tsx b/taker-frontend/src/components/NavBar.tsx index 437b44f..460db6b 100644 --- a/taker-frontend/src/components/NavBar.tsx +++ b/taker-frontend/src/components/NavBar.tsx @@ -17,11 +17,11 @@ import * as React from "react"; import { useNavigate } from "react-router-dom"; import logoBlack from "../images/logo_nav_bar_black.svg"; import logoWhite from "../images/logo_nav_bar_white.svg"; -import { WalletInfo } from "../types"; +import { ConnectionCloseReason, ConnectionStatus, WalletInfo } from "../types"; interface NavProps { walletInfo: WalletInfo | null; - connectedToMaker: boolean; + connectedToMaker: ConnectionStatus; } export default function Nav({ walletInfo, connectedToMaker }: NavProps) { @@ -38,6 +38,18 @@ export default function Nav({ walletInfo, connectedToMaker }: NavProps) { , ); + let connectionMessage = connectedToMaker.online ? "Online" : "Offline"; + if (connectedToMaker.connection_close_reason) { + switch (connectedToMaker.connection_close_reason) { + case ConnectionCloseReason.MAKER_VERSION_OUTDATED: + connectionMessage = connectionMessage + ": the maker is running an outdated version"; + break; + case ConnectionCloseReason.TAKER_VERSION_OUTDATED: + connectionMessage = connectionMessage + " - you are running an incompatible version, please upgrade!"; + break; + } + } + return ( <> @@ -56,7 +68,7 @@ export default function Nav({ walletInfo, connectedToMaker }: NavProps) { navigate("/wallet")}>Wallet - {"Maker status: " + (connectedToMaker ? "Online" : "Offline")} + {"Maker status: " + connectionMessage}