diff --git a/Cargo.lock b/Cargo.lock index 660276e..bcf4778 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -391,6 +391,22 @@ dependencies = [ "version_check", ] +[[package]] +name = "core-foundation" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a89e2ae426ea83155dccf10c0fa6b1463ef6d5fcb44cee0b224a408fa640a62" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea221b5284a47e40033bf9b66f35f984ec0ea2931eb03505246cd27a963f981b" + [[package]] name = "cpufeatures" version = "0.2.1" @@ -505,6 +521,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-tungstenite", "tokio-util", "tracing", "tracing-subscriber", @@ -1311,6 +1328,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl-probe" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" + [[package]] name = "os_str_bytes" version = "3.1.0" @@ -1985,6 +2008,18 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustls-native-certs" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092" +dependencies = [ + "openssl-probe", + "rustls 0.19.1", + "schannel", + "security-framework", +] + [[package]] name = "rustversion" version = "1.0.5" @@ -2006,6 +2041,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +dependencies = [ + "lazy_static", + "winapi 0.3.9", +] + [[package]] name = "scoped-tls" version = "1.0.0" @@ -2069,6 +2114,29 @@ dependencies = [ "secp256k1-sys", ] +[[package]] +name = "security-framework" +version = "2.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525bc1abfda2e1998d152c45cf13e696f76d0a4972310b22fac1658b05df7c87" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9dd14d83160b528b7bfd66439110573efcfbe281b17fc2ca9f39f550d619c7e" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.130" @@ -2124,6 +2192,19 @@ dependencies = [ "syn", ] +[[package]] +name = "sha-1" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" +dependencies = [ + "block-buffer", + "cfg-if 1.0.0", + "cpufeatures", + "digest", + "opaque-debug", +] + [[package]] name = "sha2" version = "0.9.8" @@ -2535,6 +2616,23 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "511de3f85caf1c98983545490c3d09685fa8eb634e57eec22bb4db271f46cbd8" +dependencies = [ + "futures-util", + "log", + "pin-project", + "rustls 0.19.1", + "tokio", + "tokio-rustls", + "tungstenite", + "webpki", + "webpki-roots 0.21.1", +] + [[package]] name = "tokio-util" version = "0.6.8" @@ -2644,6 +2742,28 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "tungstenite" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0b2d8558abd2e276b0a8df5c05a2ec762609344191e5fd23e292c910e9165b5" +dependencies = [ + "base64 0.13.0", + "byteorder", + "bytes", + "http", + "httparse", + "log", + "rand 0.8.4", + "rustls 0.19.1", + "rustls-native-certs", + "sha-1", + "thiserror", + "url", + "utf-8", + "webpki", +] + [[package]] name = "twoway" version = "0.2.2" @@ -2742,6 +2862,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "uuid" version = "0.8.2" diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index 6797aaf..bee7d6a 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -27,6 +27,7 @@ sha2 = "0.9" sqlx = { version = "0.5", features = ["offline"] } thiserror = "1" tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"] } +tokio-tungstenite = { version = "0.15", features = ["rustls-tls"] } tokio-util = { version = "0.6", features = ["codec"] } tracing = { version = "0.1" } tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log", "json"] } diff --git a/daemon/src/bitmex_price_feed.rs b/daemon/src/bitmex_price_feed.rs new file mode 100644 index 0000000..aa23c03 --- /dev/null +++ b/daemon/src/bitmex_price_feed.rs @@ -0,0 +1,107 @@ +use crate::model::Usd; +use anyhow::Result; +use futures::{StreamExt, TryStreamExt}; +use rust_decimal::Decimal; +use std::convert::TryFrom; +use std::future::Future; +use std::time::SystemTime; +use tokio::sync::watch; +use tokio_tungstenite::tungstenite; + +/// Connects to the BitMex price feed, returning the polling task and a watch channel that will +/// always hold the last quote. +pub async fn new() -> Result<(impl Future, watch::Receiver)> { + let (connection, _) = tokio_tungstenite::connect_async( + "wss://www.bitmex.com/realtime?subscribe=quoteBin1m:XBTUSD", + ) + .await?; + let mut quotes = connection + .inspect_err(|e| tracing::warn!("Error on websocket stream: {}", e)) + .map(|msg| Quote::from_message(msg?)) + .filter_map(|result| async move { result.transpose() }) + .boxed() + .fuse(); + + tracing::info!("Connected to BitMex realtime API"); + + let first_quote = quotes.select_next_some().await?; + let (sender, receiver) = watch::channel(first_quote); + + let task = async move { + while let Ok(Some(quote)) = quotes.try_next().await { + if sender.send(quote).is_err() { + break; // If the receiver dies, we can exit the loop. + } + } + + tracing::warn!("Failed to read quote from websocket"); + }; + + Ok((task, receiver)) +} + +#[derive(Debug)] +pub struct Quote { + pub timestamp: SystemTime, + pub bid: Usd, + pub ask: Usd, +} + +impl Quote { + fn from_message(message: tungstenite::Message) -> Result> { + let text_message = match message { + tungstenite::Message::Text(text_message) => text_message, + _ => anyhow::bail!("Bad message type, only text is supported"), + }; + + let table_message = match serde_json::from_str::(&text_message) { + Ok(table_message) => table_message, + Err(_) => return Ok(None), + }; + + let [quote] = table_message.data; + + Ok(Some(Self { + timestamp: SystemTime::now(), + bid: Usd::from(Decimal::try_from(quote.bid_price)?), + ask: Usd::from(Decimal::try_from(quote.ask_price)?), + })) + } +} + +mod wire { + use serde::Deserialize; + + #[derive(Debug, Clone, Deserialize, PartialEq)] + pub struct TableMessage { + pub table: String, + // we always just expect a single quote, hence the use of an array instead of a vec + pub data: [QuoteData; 1], + } + + #[derive(Debug, Clone, Deserialize, PartialEq)] + #[serde(rename_all = "camelCase")] + pub struct QuoteData { + pub bid_size: u64, + pub ask_size: u64, + pub bid_price: f64, + pub ask_price: f64, + pub symbol: String, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rust_decimal_macros::dec; + + #[test] + fn can_deserialize_quote_message() { + let message = tungstenite::Message::Text(r#"{"table":"quoteBin1m","action":"insert","data":[{"timestamp":"2021-09-21T02:40:00.000Z","symbol":"XBTUSD","bidSize":50200,"bidPrice":42640.5,"askPrice":42641,"askSize":363600}]}"#.to_owned()); + + let quote = Quote::from_message(message).unwrap().unwrap(); + + assert_eq!(quote.bid, Usd(dec!(42640.5))); + assert_eq!(quote.ask, Usd(dec!(42641))); + } +} diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 791b9cb..a1e1871 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -19,6 +19,7 @@ use xtra::prelude::*; use xtra::spawn::TokioGlobalSpawnExt; mod auth; +mod bitmex_price_feed; mod db; mod keypair; mod logger; @@ -114,6 +115,21 @@ async fn main() -> Result<()> { tracing::info!("Listening on {}", local_addr); + let (task, mut quote_updates) = bitmex_price_feed::new().await?; + tokio::spawn(task); + + // dummy usage of quote receiver + tokio::spawn(async move { + loop { + let bitmex_price_feed::Quote { bid, ask, .. } = *quote_updates.borrow(); + tracing::info!(%bid, %ask, "BitMex quote updated"); + + if quote_updates.changed().await.is_err() { + return; + } + } + }); + rocket::custom(figment) .manage(cfd_feed_receiver) .manage(order_feed_receiver) diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index c4d1791..6a982df 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -16,6 +16,7 @@ use std::time::Duration; use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; +mod bitmex_price_feed; mod db; mod keypair; mod logger; @@ -109,6 +110,21 @@ async fn main() -> Result<()> { } }; + let (task, mut quote_updates) = bitmex_price_feed::new().await?; + tokio::spawn(task); + + // dummy usage of quote receiver + tokio::spawn(async move { + loop { + let bitmex_price_feed::Quote { bid, ask, .. } = *quote_updates.borrow(); + tracing::info!(%bid, %ask, "BitMex quote updated"); + + if quote_updates.changed().await.is_err() { + return; + } + } + }); + let figment = rocket::Config::figment() .merge(("databases.taker.url", data_dir.join("taker.sqlite"))) .merge(("port", opts.http_port));