Thomas Eizinger
3 years ago
5 changed files with 266 additions and 0 deletions
@ -0,0 +1,107 @@ |
|||
use crate::model::Usd; |
|||
use anyhow::Result; |
|||
use futures::{StreamExt, TryStreamExt}; |
|||
use rust_decimal::Decimal; |
|||
use std::convert::TryFrom; |
|||
use std::future::Future; |
|||
use std::time::SystemTime; |
|||
use tokio::sync::watch; |
|||
use tokio_tungstenite::tungstenite; |
|||
|
|||
/// Connects to the BitMex price feed, returning the polling task and a watch channel that will
|
|||
/// always hold the last quote.
|
|||
pub async fn new() -> Result<(impl Future<Output = ()>, watch::Receiver<Quote>)> { |
|||
let (connection, _) = tokio_tungstenite::connect_async( |
|||
"wss://www.bitmex.com/realtime?subscribe=quoteBin1m:XBTUSD", |
|||
) |
|||
.await?; |
|||
let mut quotes = connection |
|||
.inspect_err(|e| tracing::warn!("Error on websocket stream: {}", e)) |
|||
.map(|msg| Quote::from_message(msg?)) |
|||
.filter_map(|result| async move { result.transpose() }) |
|||
.boxed() |
|||
.fuse(); |
|||
|
|||
tracing::info!("Connected to BitMex realtime API"); |
|||
|
|||
let first_quote = quotes.select_next_some().await?; |
|||
let (sender, receiver) = watch::channel(first_quote); |
|||
|
|||
let task = async move { |
|||
while let Ok(Some(quote)) = quotes.try_next().await { |
|||
if sender.send(quote).is_err() { |
|||
break; // If the receiver dies, we can exit the loop.
|
|||
} |
|||
} |
|||
|
|||
tracing::warn!("Failed to read quote from websocket"); |
|||
}; |
|||
|
|||
Ok((task, receiver)) |
|||
} |
|||
|
|||
#[derive(Debug)] |
|||
pub struct Quote { |
|||
pub timestamp: SystemTime, |
|||
pub bid: Usd, |
|||
pub ask: Usd, |
|||
} |
|||
|
|||
impl Quote { |
|||
fn from_message(message: tungstenite::Message) -> Result<Option<Self>> { |
|||
let text_message = match message { |
|||
tungstenite::Message::Text(text_message) => text_message, |
|||
_ => anyhow::bail!("Bad message type, only text is supported"), |
|||
}; |
|||
|
|||
let table_message = match serde_json::from_str::<wire::TableMessage>(&text_message) { |
|||
Ok(table_message) => table_message, |
|||
Err(_) => return Ok(None), |
|||
}; |
|||
|
|||
let [quote] = table_message.data; |
|||
|
|||
Ok(Some(Self { |
|||
timestamp: SystemTime::now(), |
|||
bid: Usd::from(Decimal::try_from(quote.bid_price)?), |
|||
ask: Usd::from(Decimal::try_from(quote.ask_price)?), |
|||
})) |
|||
} |
|||
} |
|||
|
|||
mod wire { |
|||
use serde::Deserialize; |
|||
|
|||
#[derive(Debug, Clone, Deserialize, PartialEq)] |
|||
pub struct TableMessage { |
|||
pub table: String, |
|||
// we always just expect a single quote, hence the use of an array instead of a vec
|
|||
pub data: [QuoteData; 1], |
|||
} |
|||
|
|||
#[derive(Debug, Clone, Deserialize, PartialEq)] |
|||
#[serde(rename_all = "camelCase")] |
|||
pub struct QuoteData { |
|||
pub bid_size: u64, |
|||
pub ask_size: u64, |
|||
pub bid_price: f64, |
|||
pub ask_price: f64, |
|||
pub symbol: String, |
|||
} |
|||
} |
|||
|
|||
#[cfg(test)] |
|||
mod tests { |
|||
use super::*; |
|||
use rust_decimal_macros::dec; |
|||
|
|||
#[test] |
|||
fn can_deserialize_quote_message() { |
|||
let message = tungstenite::Message::Text(r#"{"table":"quoteBin1m","action":"insert","data":[{"timestamp":"2021-09-21T02:40:00.000Z","symbol":"XBTUSD","bidSize":50200,"bidPrice":42640.5,"askPrice":42641,"askSize":363600}]}"#.to_owned()); |
|||
|
|||
let quote = Quote::from_message(message).unwrap().unwrap(); |
|||
|
|||
assert_eq!(quote.bid, Usd(dec!(42640.5))); |
|||
assert_eq!(quote.ask, Usd(dec!(42641))); |
|||
} |
|||
} |
Loading…
Reference in new issue