diff --git a/daemon/src/bitmex_price_feed.rs b/daemon/src/bitmex_price_feed.rs index 2141622..bc6b9b6 100644 --- a/daemon/src/bitmex_price_feed.rs +++ b/daemon/src/bitmex_price_feed.rs @@ -1,44 +1,102 @@ use crate::model::{Price, Timestamp}; -use crate::projection; +use crate::{projection, Tasks}; use anyhow::Result; use futures::{StreamExt, TryStreamExt}; use rust_decimal::Decimal; use std::convert::TryFrom; -use std::future::Future; use tokio_tungstenite::tungstenite; use xtra::prelude::MessageChannel; +use xtra_productivity::xtra_productivity; -/// Connects to the BitMex price feed, returning the polling task and a watch channel that will -/// always hold the last quote. -pub async fn new( - msg_channel: impl MessageChannel>, -) -> Result<(impl Future, Quote)> { - let (connection, _) = tokio_tungstenite::connect_async( - "wss://www.bitmex.com/realtime?subscribe=quoteBin1m:XBTUSD", - ) - .await?; - let mut quotes = connection - .inspect_err(|e| tracing::warn!("Error on websocket stream: {}", e)) - .map(|msg| Quote::from_message(msg?)) - .filter_map(|result| async move { result.transpose() }) - .boxed() - .fuse(); - - tracing::info!("Connected to BitMex realtime API"); - - let first_quote = quotes.select_next_some().await?; - - let task = async move { - while let Ok(Some(quote)) = quotes.try_next().await { - if msg_channel.send(projection::Update(quote)).await.is_err() { - break; // If the receiver dies, we can exit the loop. +const URL: &str = "wss://www.bitmex.com/realtime?subscribe=quoteBin1m:XBTUSD"; + +pub struct Actor { + tasks: Tasks, + receiver: Box>>, +} + +impl Actor { + pub fn new(receiver: impl MessageChannel> + 'static) -> Self { + Self { + tasks: Tasks::default(), + receiver: Box::new(receiver), + } + } +} + +impl xtra::Actor for Actor {} + +#[xtra_productivity] +impl Actor { + async fn handle(&mut self, msg: NotifyNoConnection, ctx: &mut xtra::Context) { + match msg { + NotifyNoConnection::Failed { error } => { + tracing::warn!("Connection to BitMex realtime API failed: {:#}", error) + } + NotifyNoConnection::StreamEnded => { + tracing::warn!("Connection to BitMex realtime API closed") } } - tracing::warn!("Failed to read quote from websocket"); - }; + let this = ctx.address().expect("we are alive"); + + self.tasks.add(connect_until_successful(this)); + } - Ok((task, first_quote)) + async fn handle(&mut self, _: Connect, ctx: &mut xtra::Context) -> Result { + tracing::debug!("Connecting to BitMex realtime API"); + + let (connection, _) = tokio_tungstenite::connect_async(URL).await?; + let mut quotes = connection + .map(|msg| Quote::from_message(msg?)) + .filter_map(|result| async move { result.transpose() }) + .boxed() + .fuse(); + + tracing::info!("Connected to BitMex realtime API"); + + let initial_quote = quotes.select_next_some().await?; + + let this = ctx.address().expect("we are alive"); + + self.tasks.add({ + let receiver = self.receiver.clone_channel(); + async move { + let no_connection = loop { + match quotes.try_next().await { + Ok(Some(quote)) => { + if receiver.send(projection::Update(quote)).await.is_err() { + return; // if the receiver dies, our job is done + } + } + Ok(None) => break NotifyNoConnection::StreamEnded, + Err(e) => break NotifyNoConnection::Failed { error: e }, + } + }; + + let _ = this.send(no_connection).await; + } + }); + + Ok(initial_quote) + } +} + +async fn connect_until_successful(this: xtra::Address) { + while let Err(e) = this + .send(Connect) + .await + .expect("always connected to ourselves") + { + tracing::warn!("Failed to connect to BitMex realtime API: {:#}", e); + } +} + +pub struct Connect; + +enum NotifyNoConnection { + Failed { error: anyhow::Error }, + StreamEnded, } #[derive(Clone, Debug)] @@ -57,7 +115,10 @@ impl Quote { let table_message = match serde_json::from_str::(&text_message) { Ok(table_message) => table_message, - Err(_) => return Ok(None), + Err(_) => { + tracing::trace!(%text_message, "Not a 'table' message, skipping..."); + return Ok(None); + } }; let [quote] = table_message.data; diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 9e39f7d..2229508 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -267,9 +267,15 @@ async fn main() -> Result<()> { ) .await?; - let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?; + let (price_feed_address, task) = bitmex_price_feed::Actor::new(projection_actor) + .create(None) + .run(); tasks.add(task); + let init_quote = price_feed_address + .send(bitmex_price_feed::Connect) + .await??; + let cfds = { let mut conn = db.acquire().await?; diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 87f4926..2323e50 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -247,9 +247,16 @@ async fn main() -> Result<()> { projection_actor.clone(), ) .await?; - let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?; + + let (price_feed_address, task) = bitmex_price_feed::Actor::new(projection_actor) + .create(None) + .run(); tasks.add(task); + let init_quote = price_feed_address + .send(bitmex_price_feed::Connect) + .await??; + let cfds = { let mut conn = db.acquire().await?;