diff --git a/Cargo.lock b/Cargo.lock index 14ec962..ea55e22 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -631,6 +631,7 @@ dependencies = [ "rust-embed", "rust_decimal", "rust_decimal_macros", + "semver 1.0.4", "serde", "serde_json", "serde_plain", @@ -2702,6 +2703,9 @@ name = "semver" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012" +dependencies = [ + "serde", +] [[package]] name = "semver-parser" diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index ae9725b..6e025c3 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -29,6 +29,7 @@ rocket-basicauth = { version = "2", default-features = false } rust-embed = "6.3" rust_decimal = "1.18" rust_decimal_macros = "1.18" +semver = { version = "1.0.4", features = ["serde"] } serde = { version = "1", features = ["derive"] } serde_json = "1" serde_plain = "1" diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index 3680d95..dbb45bd 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -2,11 +2,11 @@ use crate::address_map::{AddressMap, Stopping}; use crate::model::cfd::OrderId; use crate::model::{Identity, Price, Timestamp, Usd}; use crate::tokio_ext::FutureExt; -use crate::wire::EncryptedJsonCodec; +use crate::wire::{EncryptedJsonCodec, TakerToMaker, Version}; use crate::{collab_settlement_taker, log_error, noise, send_to_socket, setup_taker, wire, Tasks}; -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use bdk::bitcoin::Amount; -use futures::StreamExt; +use futures::{SinkExt, StreamExt, TryStreamExt}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; @@ -55,7 +55,17 @@ struct MeasurePulse; #[derive(Clone, Debug, PartialEq)] pub enum ConnectionStatus { Online, - Offline, + Offline { + reason: Option, + }, +} + +#[derive(Clone, Debug, PartialEq)] +pub enum ConnectionCloseReason { + VersionMismatch { + taker_version: Version, + maker_version: Version, + }, } /// Message sent from the `setup_taker::Actor` to the @@ -197,8 +207,51 @@ impl Actor { (read, write, Arc::new(Mutex::new(noise))) }; - let read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise.clone())); - let write = FramedWrite::new(write, EncryptedJsonCodec::new(noise)); + let mut read = FramedRead::new(read, wire::EncryptedJsonCodec::new(noise.clone())); + let mut write = FramedWrite::new(write, EncryptedJsonCodec::new(noise)); + + let our_version = Version::current(); + write.send(TakerToMaker::Hello(our_version.clone())).await?; + + match read + .try_next() + .timeout(Duration::from_secs(10)) + .await + .with_context(|| { + format!( + "Maker {} did not send Hello within 10 seconds, dropping connection", + maker_identity + ) + })? { + Ok(Some(wire::MakerToTaker::Hello(maker_version))) => { + if our_version != maker_version { + self.status_sender + .send(ConnectionStatus::Offline { + reason: Some(ConnectionCloseReason::VersionMismatch { + taker_version: our_version.clone(), + maker_version: maker_version.clone(), + }), + }) + .expect("receiver to outlive the actor"); + + bail!( + "Network version mismatch, we are on version {} but taker is on version {}", + our_version, + maker_version, + ) + } + } + unexpected_message => { + bail!( + "Unexpected message {:?} from maker {}", + unexpected_message, + maker_identity + ) + } + } + + tracing::info!(address = %maker_addr, "Established connection to maker"); + let this = ctx.address().expect("self to be alive"); let send_to_socket = send_to_socket::Actor::new(write); @@ -219,8 +272,6 @@ impl Actor { .send(ConnectionStatus::Online) .expect("receiver to outlive the actor"); - tracing::info!(address = %maker_addr, "Established connection to maker"); - Ok(()) } @@ -287,6 +338,9 @@ impl Actor { tracing::warn!(%order_id, "No active collaborative settlement"); } } + wire::MakerToTaker::Hello(_) => { + tracing::warn!("Ignoring unexpected Hello message from maker. Hello is only expected when opening a new connection.") + } other => { // this one should go to the taker cfd actor log_error!(self.maker_to_taker.send(other)); @@ -307,7 +361,7 @@ impl Actor { if time_since_last_heartbeat > self.heartbeat_timeout { self.status_sender - .send(ConnectionStatus::Offline) + .send(ConnectionStatus::Offline { reason: None }) .expect("watch receiver to outlive the actor"); self.connected_state = None; } @@ -325,7 +379,8 @@ pub async fn connect( maker_addresses: Vec, ) { loop { - if maker_online_status_feed_receiver.borrow().clone() == ConnectionStatus::Offline { + let connection_status = maker_online_status_feed_receiver.borrow().clone(); + if matches!(connection_status, ConnectionStatus::Offline { .. }) { tracing::debug!("No connection to the maker"); 'connect: loop { for address in &maker_addresses { @@ -339,7 +394,7 @@ pub async fn connect( .await .expect("Taker actor to be present") { - tracing::trace!(%address, "Failed to establish connection: {:#}", e); + tracing::warn!(%address, "Failed to establish connection: {:#}", e); continue; } break 'connect; diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index c036947..566667c 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -245,7 +245,7 @@ where let cfds = load_all_cfds(&mut conn).await?; let (maker_online_status_feed_sender, maker_online_status_feed_receiver) = - watch::channel(ConnectionStatus::Offline); + watch::channel(ConnectionStatus::Offline { reason: None }); let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None); let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None); diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 55508d8..863264b 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -13,6 +13,7 @@ use crate::projection::{ }; use crate::setup_contract::RolloverParams; use crate::tokio_ext::FutureExt; +use crate::wire::TakerToMaker; use crate::{ log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, setup_maker, wallet, wire, Tasks, @@ -1109,6 +1110,9 @@ where wire::TakerToMaker::Protocol { .. } => { unreachable!("This kind of message should be sent to the `setup_maker::Actor`") } + TakerToMaker::Hello(_) => { + unreachable!("The Hello message is not sent to the cfd actor") + } } } } diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 04dd439..590bfc1 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -3,10 +3,11 @@ use crate::maker_cfd::{FromTaker, TakerConnected, TakerDisconnected}; use crate::model::cfd::{Order, OrderId}; use crate::model::Identity; use crate::noise::TransportStateExt; -use crate::wire::EncryptedJsonCodec; +use crate::tokio_ext::FutureExt; +use crate::wire::{EncryptedJsonCodec, MakerToTaker, TakerToMaker, Version}; use crate::{maker_cfd, noise, send_to_socket, setup_maker, wire, Tasks}; -use anyhow::Result; -use futures::TryStreamExt; +use anyhow::{bail, Context, Result}; +use futures::{SinkExt, TryStreamExt}; use std::collections::HashMap; use std::io; use std::net::SocketAddr; @@ -117,19 +118,54 @@ impl Actor { &mut self, mut stream: TcpStream, taker_address: SocketAddr, - ctx: &mut Context, + ctx: &mut xtra::Context, ) -> Result<()> { let transport_state = noise::responder_handshake(&mut stream, &self.noise_priv_key).await?; let taker_id = Identity::new(transport_state.get_remote_public_key()?); - tracing::info!(%taker_id, address = %taker_address, "New taker connected"); - let transport_state = Arc::new(Mutex::new(transport_state)); let (read, write) = stream.into_split(); let mut read = FramedRead::new(read, wire::EncryptedJsonCodec::new(transport_state.clone())); - let write = FramedWrite::new(write, EncryptedJsonCodec::new(transport_state)); + let mut write = FramedWrite::new(write, EncryptedJsonCodec::new(transport_state)); + + match read + .try_next() + .timeout(Duration::from_secs(10)) + .await + .with_context(|| { + format!( + "Taker {} did not send Hello within 10 seconds, dropping connection", + taker_id + ) + })? { + Ok(Some(TakerToMaker::Hello(taker_version))) => { + let our_version = Version::current(); + write.send(MakerToTaker::Hello(our_version.clone())).await?; + + if our_version != taker_version { + tracing::debug!( + "Network version mismatch, we are on version {} but taker is on version {}", + our_version, + taker_version + ); + + // A taker running a different version is not treated as error for the maker + return Ok(()); + } + } + unexpected_message => { + bail!( + "Unexpected message {:?} from taker {}", + unexpected_message, + taker_id + ); + } + } + + tracing::info!(%taker_id, address = %taker_address, "New taker connected"); + let this = ctx.address().expect("self to be alive"); let read_fut = async move { while let Ok(Some(msg)) = read.try_next().await { @@ -201,7 +237,7 @@ impl Actor { Ok(()) } - async fn handle(&mut self, msg: ListenerMessage, ctx: &mut Context) -> KeepRunning { + async fn handle(&mut self, msg: ListenerMessage, ctx: &mut xtra::Context) -> KeepRunning { match msg { ListenerMessage::NewConnection { stream, address } => { if let Err(err) = self.handle_new_connection_impl(stream, address, ctx).await { diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 84d9b2b..c78ba00 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -663,6 +663,9 @@ where wire::MakerToTaker::Settlement { .. } => { unreachable!("These messages should be sent to the `collab_settlement::Actor`") } + wire::MakerToTaker::Hello(_) => { + unreachable!("Connection related messages are handled in the connection actor") + } } } } diff --git a/daemon/src/to_sse_event.rs b/daemon/src/to_sse_event.rs index 7d6875d..add1e92 100644 --- a/daemon/src/to_sse_event.rs +++ b/daemon/src/to_sse_event.rs @@ -1,7 +1,7 @@ -use crate::connection::ConnectionStatus; -use crate::model; use crate::model::Timestamp; use crate::projection::{Cfd, CfdAction, CfdOrder, Identity, Quote}; +use crate::to_sse_event::ConnectionCloseReason::{MakerVersionOutdated, TakerVersionOutdated}; +use crate::{connection, model}; use bdk::bitcoin::Amount; use rocket::request::FromParam; use rocket::response::stream::Event; @@ -58,11 +58,40 @@ impl ToSseEvent for model::WalletInfo { } } -impl ToSseEvent for ConnectionStatus { +#[derive(Debug, Clone, Serialize)] +pub struct ConnectionStatus { + online: bool, + connection_close_reason: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub enum ConnectionCloseReason { + MakerVersionOutdated, + TakerVersionOutdated, +} + +impl ToSseEvent for connection::ConnectionStatus { fn to_sse_event(&self) -> Event { let connected = match self { - ConnectionStatus::Online => true, - ConnectionStatus::Offline => false, + connection::ConnectionStatus::Online => ConnectionStatus { + online: true, + connection_close_reason: None, + }, + connection::ConnectionStatus::Offline { reason } => ConnectionStatus { + online: false, + connection_close_reason: reason.as_ref().map(|g| match g { + connection::ConnectionCloseReason::VersionMismatch { + maker_version, + taker_version, + } => { + if *maker_version < *taker_version { + MakerVersionOutdated + } else { + TakerVersionOutdated + } + } + }), + }, }; Event::json(&connected).event("maker_status") diff --git a/daemon/src/wire.rs b/daemon/src/wire.rs index b958df0..1595f56 100644 --- a/daemon/src/wire.rs +++ b/daemon/src/wire.rs @@ -18,6 +18,21 @@ use std::ops::RangeInclusive; use std::sync::{Arc, Mutex}; use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec}; +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, PartialOrd)] +pub struct Version(semver::Version); + +impl Version { + pub fn current() -> Self { + Self(semver::Version::new(1, 0, 0)) + } +} + +impl fmt::Display for Version { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + pub mod taker_to_maker { use super::*; @@ -43,6 +58,7 @@ pub mod taker_to_maker { #[serde(tag = "type", content = "payload")] #[allow(clippy::large_enum_variant)] pub enum TakerToMaker { + Hello(Version), TakeOrder { order_id: OrderId, quantity: Usd, @@ -70,6 +86,7 @@ impl fmt::Display for TakerToMaker { TakerToMaker::ProposeRollOver { .. } => write!(f, "ProposeRollOver"), TakerToMaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"), TakerToMaker::Settlement { .. } => write!(f, "Settlement"), + TakerToMaker::Hello(_) => write!(f, "Hello"), } } } @@ -78,6 +95,7 @@ impl fmt::Display for TakerToMaker { #[serde(tag = "type", content = "payload")] #[allow(clippy::large_enum_variant)] pub enum MakerToTaker { + Hello(Version), /// Periodically broadcasted message, indicating maker's presence Heartbeat, CurrentOrder(Option), @@ -114,6 +132,7 @@ pub mod maker_to_taker { impl fmt::Display for MakerToTaker { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { + MakerToTaker::Hello(_) => write!(f, "Hello"), MakerToTaker::Heartbeat { .. } => write!(f, "Heartbeat"), MakerToTaker::CurrentOrder(_) => write!(f, "CurrentOrder"), MakerToTaker::ConfirmOrder(_) => write!(f, "ConfirmOrder"), diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index c80a29f..0b2732a 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -189,7 +189,7 @@ async fn taker_notices_lack_of_maker() { sleep(taker_config.heartbeat_timeout).await; assert_eq!( - ConnectionStatus::Offline, + ConnectionStatus::Offline { reason: None }, next(taker.maker_status_feed()).await.unwrap(), ); diff --git a/taker-frontend/src/App.tsx b/taker-frontend/src/App.tsx index afe2962..18a86b8 100644 --- a/taker-frontend/src/App.tsx +++ b/taker-frontend/src/App.tsx @@ -25,6 +25,7 @@ import { BXBTData, Cfd, CfdOrderRequestPayload, + ConnectionStatus, intoCfd, intoOrder, MarginRequestPayload, @@ -63,8 +64,8 @@ export const App = () => { const cfdsOrUndefined = useLatestEvent(source, "cfds", intoCfd); let cfds = cfdsOrUndefined ? cfdsOrUndefined! : []; cfds.sort((a, b) => a.order_id.localeCompare(b.order_id)); - const connectedToMakerOrUndefined = useLatestEvent(source, "maker_status"); - const connectedToMaker = connectedToMakerOrUndefined ? connectedToMakerOrUndefined! : false; + const connectedToMakerOrUndefined = useLatestEvent(source, "maker_status"); + const connectedToMaker = connectedToMakerOrUndefined ? connectedToMakerOrUndefined : { online: false }; let [quantity, setQuantity] = useState("0"); let [margin, setMargin] = useState(0); @@ -154,7 +155,7 @@ export const App = () => { cfds={cfds.filter((cfd) => cfd.state.getGroup() === StateGroupKey.CLOSED )} - connectedToMaker + connectedToMaker={connectedToMaker} /> diff --git a/taker-frontend/src/components/History.tsx b/taker-frontend/src/components/History.tsx index 9ee2eca..af2f2fb 100644 --- a/taker-frontend/src/components/History.tsx +++ b/taker-frontend/src/components/History.tsx @@ -18,14 +18,14 @@ import { VStack, } from "@chakra-ui/react"; import * as React from "react"; -import { Cfd, Tx, TxLabel } from "../types"; +import { Cfd, ConnectionStatus, Tx, TxLabel } from "../types"; import usePostRequest from "../usePostRequest"; import CloseButton from "./CloseButton"; interface HistoryProps { cfds: Cfd[]; title?: string; - connectedToMaker: Boolean; + connectedToMaker: ConnectionStatus; } const History = ({ cfds, title, connectedToMaker }: HistoryProps) => { @@ -51,7 +51,7 @@ export default History; interface CfdDetailsProps { cfd: Cfd; - connectedToMaker: Boolean; + connectedToMaker: ConnectionStatus; } const CfdDetails = ({ cfd, connectedToMaker }: CfdDetailsProps) => { @@ -74,7 +74,7 @@ const CfdDetails = ({ cfd, connectedToMaker }: CfdDetailsProps) => { let [settle, isSettling] = usePostRequest(`/api/cfd/${cfd.order_id}/settle`); let [commit, isCommiting] = usePostRequest(`/api/cfd/${cfd.order_id}/commit`); - const closeButton = connectedToMaker + const closeButton = connectedToMaker.online ? : ; diff --git a/taker-frontend/src/components/NavBar.tsx b/taker-frontend/src/components/NavBar.tsx index 437b44f..460db6b 100644 --- a/taker-frontend/src/components/NavBar.tsx +++ b/taker-frontend/src/components/NavBar.tsx @@ -17,11 +17,11 @@ import * as React from "react"; import { useNavigate } from "react-router-dom"; import logoBlack from "../images/logo_nav_bar_black.svg"; import logoWhite from "../images/logo_nav_bar_white.svg"; -import { WalletInfo } from "../types"; +import { ConnectionCloseReason, ConnectionStatus, WalletInfo } from "../types"; interface NavProps { walletInfo: WalletInfo | null; - connectedToMaker: boolean; + connectedToMaker: ConnectionStatus; } export default function Nav({ walletInfo, connectedToMaker }: NavProps) { @@ -38,6 +38,18 @@ export default function Nav({ walletInfo, connectedToMaker }: NavProps) { , ); + let connectionMessage = connectedToMaker.online ? "Online" : "Offline"; + if (connectedToMaker.connection_close_reason) { + switch (connectedToMaker.connection_close_reason) { + case ConnectionCloseReason.MAKER_VERSION_OUTDATED: + connectionMessage = connectionMessage + ": the maker is running an outdated version"; + break; + case ConnectionCloseReason.TAKER_VERSION_OUTDATED: + connectionMessage = connectionMessage + " - you are running an incompatible version, please upgrade!"; + break; + } + } + return ( <> @@ -56,7 +68,7 @@ export default function Nav({ walletInfo, connectedToMaker }: NavProps) { navigate("/wallet")}>Wallet - {"Maker status: " + (connectedToMaker ? "Online" : "Offline")} + {"Maker status: " + connectionMessage}