From ddb66937a00bcb6e0308d5360fd40dfca5774681 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 8 Dec 2021 12:47:05 +1100 Subject: [PATCH] Keep BitMex API connection open If we don't receive a message from BitMex within 5 seconds, we ping them to let them know that we are still using this connection. This avoids the constant dis/reconnects that we have been seeing. --- daemon/src/bitmex_price_feed.rs | 74 +++++++++++++++++++++------------ 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/daemon/src/bitmex_price_feed.rs b/daemon/src/bitmex_price_feed.rs index b2387fc..4f569a7 100644 --- a/daemon/src/bitmex_price_feed.rs +++ b/daemon/src/bitmex_price_feed.rs @@ -1,9 +1,10 @@ use crate::model::{Price, Timestamp}; use crate::{projection, Tasks}; use anyhow::Result; -use futures::{StreamExt, TryStreamExt}; +use futures::{SinkExt, TryStreamExt}; use rust_decimal::Decimal; use std::convert::TryFrom; +use std::time::Duration; use tokio_tungstenite::tungstenite; use xtra::prelude::MessageChannel; use xtra_productivity::xtra_productivity; @@ -31,7 +32,7 @@ impl Actor { async fn handle(&mut self, msg: NotifyNoConnection, ctx: &mut xtra::Context) { match msg { NotifyNoConnection::Failed { error } => { - tracing::warn!("Connection to BitMex realtime API failed: {:#}", error) + tracing::warn!("Connection to BitMex realtime API failed: {}", error) } NotifyNoConnection::StreamEnded => { tracing::warn!("Connection to BitMex realtime API closed") @@ -46,12 +47,7 @@ impl Actor { async fn handle(&mut self, _: Connect, ctx: &mut xtra::Context) -> Result<()> { tracing::debug!("Connecting to BitMex realtime API"); - let (connection, _) = tokio_tungstenite::connect_async(URL).await?; - let mut quotes = connection - .map(|msg| Quote::from_message(msg?)) - .filter_map(|result| async move { result.transpose() }) - .boxed() - .fuse(); + let (mut connection, _) = tokio_tungstenite::connect_async(URL).await?; tracing::info!("Connected to BitMex realtime API"); @@ -61,14 +57,45 @@ impl Actor { let receiver = self.receiver.clone_channel(); async move { let no_connection = loop { - match quotes.try_next().await { - Ok(Some(quote)) => { - if receiver.send(projection::Update(quote)).await.is_err() { - return; // if the receiver dies, our job is done + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(5)) => { + tracing::trace!("No message from BitMex in the last 5 seconds, pinging"); + let _ = connection.send(tungstenite::Message::Ping([0u8; 32].to_vec())).await; + }, + msg = connection.try_next() => { + match msg { + Ok(Some(tungstenite::Message::Pong(_))) => { + tracing::trace!("Received pong"); + continue; + } + Ok(Some(tungstenite::Message::Text(text))) => { + match Quote::from_str(&text) { + Ok(None) => { + continue; + } + Ok(Some(quote)) => { + if receiver.send(projection::Update(quote)).await.is_err() { + return; // if the receiver dies, our job is done + } + } + Err(e) => { + tracing::warn!("Failed to parse quote: {:#}", e); + return; + } + } + } + Ok(Some(other)) => { + tracing::trace!("Unsupported message: {:?}", other); + continue; + } + Ok(None) => { + break NotifyNoConnection::StreamEnded + } + Err(e) => { + break NotifyNoConnection::Failed { error: e } + } } - } - Ok(None) => break NotifyNoConnection::StreamEnded, - Err(e) => break NotifyNoConnection::Failed { error: e }, + }, } }; @@ -93,7 +120,7 @@ async fn connect_until_successful(this: xtra::Address) { pub struct Connect; enum NotifyNoConnection { - Failed { error: anyhow::Error }, + Failed { error: tungstenite::Error }, StreamEnded, } @@ -105,16 +132,11 @@ pub struct Quote { } impl Quote { - fn from_message(message: tungstenite::Message) -> Result> { - let text_message = match message { - tungstenite::Message::Text(text_message) => text_message, - _ => anyhow::bail!("Bad message type, only text is supported"), - }; - - let table_message = match serde_json::from_str::(&text_message) { + fn from_str(text: &str) -> Result> { + let table_message = match serde_json::from_str::(text) { Ok(table_message) => table_message, Err(_) => { - tracing::trace!(%text_message, "Not a 'table' message, skipping..."); + tracing::trace!(%text, "Not a 'table' message, skipping..."); return Ok(None); } }; @@ -171,9 +193,7 @@ mod tests { #[test] fn can_deserialize_quote_message() { - let message = tungstenite::Message::Text(r#"{"table":"quoteBin1m","action":"insert","data":[{"timestamp":"2021-09-21T02:40:00.000Z","symbol":"XBTUSD","bidSize":50200,"bidPrice":42640.5,"askPrice":42641,"askSize":363600}]}"#.to_owned()); - - let quote = Quote::from_message(message).unwrap().unwrap(); + let quote = Quote::from_str(r#"{"table":"quoteBin1m","action":"insert","data":[{"timestamp":"2021-09-21T02:40:00.000Z","symbol":"XBTUSD","bidSize":50200,"bidPrice":42640.5,"askPrice":42641,"askSize":363600}]}"#).unwrap().unwrap(); assert_eq!(quote.bid, Price::new(dec!(42640.5)).unwrap()); assert_eq!(quote.ask, Price::new(dec!(42641)).unwrap());