Browse Source

Refactor bitmex price feed to Actor that reconnects on error

Fixes #758.
no-buy-button-while-setting-up-cfd
Thomas Eizinger 3 years ago
parent
commit
30e5dc8ae7
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 121
      daemon/src/bitmex_price_feed.rs
  2. 8
      daemon/src/maker.rs
  3. 9
      daemon/src/taker.rs

121
daemon/src/bitmex_price_feed.rs

@ -1,44 +1,102 @@
use crate::model::{Price, Timestamp};
use crate::projection;
use crate::{projection, Tasks};
use anyhow::Result;
use futures::{StreamExt, TryStreamExt};
use rust_decimal::Decimal;
use std::convert::TryFrom;
use std::future::Future;
use tokio_tungstenite::tungstenite;
use xtra::prelude::MessageChannel;
use xtra_productivity::xtra_productivity;
/// Connects to the BitMex price feed, returning the polling task and a watch channel that will
/// always hold the last quote.
pub async fn new(
msg_channel: impl MessageChannel<projection::Update<Quote>>,
) -> Result<(impl Future<Output = ()>, Quote)> {
let (connection, _) = tokio_tungstenite::connect_async(
"wss://www.bitmex.com/realtime?subscribe=quoteBin1m:XBTUSD",
)
.await?;
let mut quotes = connection
.inspect_err(|e| tracing::warn!("Error on websocket stream: {}", e))
.map(|msg| Quote::from_message(msg?))
.filter_map(|result| async move { result.transpose() })
.boxed()
.fuse();
tracing::info!("Connected to BitMex realtime API");
let first_quote = quotes.select_next_some().await?;
let task = async move {
while let Ok(Some(quote)) = quotes.try_next().await {
if msg_channel.send(projection::Update(quote)).await.is_err() {
break; // If the receiver dies, we can exit the loop.
const URL: &str = "wss://www.bitmex.com/realtime?subscribe=quoteBin1m:XBTUSD";
pub struct Actor {
tasks: Tasks,
receiver: Box<dyn MessageChannel<projection::Update<Quote>>>,
}
impl Actor {
pub fn new(receiver: impl MessageChannel<projection::Update<Quote>> + 'static) -> Self {
Self {
tasks: Tasks::default(),
receiver: Box::new(receiver),
}
}
}
impl xtra::Actor for Actor {}
#[xtra_productivity]
impl Actor {
async fn handle(&mut self, msg: NotifyNoConnection, ctx: &mut xtra::Context<Self>) {
match msg {
NotifyNoConnection::Failed { error } => {
tracing::warn!("Connection to BitMex realtime API failed: {:#}", error)
}
NotifyNoConnection::StreamEnded => {
tracing::warn!("Connection to BitMex realtime API closed")
}
}
tracing::warn!("Failed to read quote from websocket");
};
let this = ctx.address().expect("we are alive");
self.tasks.add(connect_until_successful(this));
}
Ok((task, first_quote))
async fn handle(&mut self, _: Connect, ctx: &mut xtra::Context<Self>) -> Result<Quote> {
tracing::debug!("Connecting to BitMex realtime API");
let (connection, _) = tokio_tungstenite::connect_async(URL).await?;
let mut quotes = connection
.map(|msg| Quote::from_message(msg?))
.filter_map(|result| async move { result.transpose() })
.boxed()
.fuse();
tracing::info!("Connected to BitMex realtime API");
let initial_quote = quotes.select_next_some().await?;
let this = ctx.address().expect("we are alive");
self.tasks.add({
let receiver = self.receiver.clone_channel();
async move {
let no_connection = loop {
match quotes.try_next().await {
Ok(Some(quote)) => {
if receiver.send(projection::Update(quote)).await.is_err() {
return; // if the receiver dies, our job is done
}
}
Ok(None) => break NotifyNoConnection::StreamEnded,
Err(e) => break NotifyNoConnection::Failed { error: e },
}
};
let _ = this.send(no_connection).await;
}
});
Ok(initial_quote)
}
}
async fn connect_until_successful(this: xtra::Address<Actor>) {
while let Err(e) = this
.send(Connect)
.await
.expect("always connected to ourselves")
{
tracing::warn!("Failed to connect to BitMex realtime API: {:#}", e);
}
}
pub struct Connect;
enum NotifyNoConnection {
Failed { error: anyhow::Error },
StreamEnded,
}
#[derive(Clone, Debug)]
@ -57,7 +115,10 @@ impl Quote {
let table_message = match serde_json::from_str::<wire::TableMessage>(&text_message) {
Ok(table_message) => table_message,
Err(_) => return Ok(None),
Err(_) => {
tracing::trace!(%text_message, "Not a 'table' message, skipping...");
return Ok(None);
}
};
let [quote] = table_message.data;

8
daemon/src/maker.rs

@ -267,9 +267,15 @@ async fn main() -> Result<()> {
)
.await?;
let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?;
let (price_feed_address, task) = bitmex_price_feed::Actor::new(projection_actor)
.create(None)
.run();
tasks.add(task);
let init_quote = price_feed_address
.send(bitmex_price_feed::Connect)
.await??;
let cfds = {
let mut conn = db.acquire().await?;

9
daemon/src/taker.rs

@ -247,9 +247,16 @@ async fn main() -> Result<()> {
projection_actor.clone(),
)
.await?;
let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?;
let (price_feed_address, task) = bitmex_price_feed::Actor::new(projection_actor)
.create(None)
.run();
tasks.add(task);
let init_quote = price_feed_address
.send(bitmex_price_feed::Connect)
.await??;
let cfds = {
let mut conn = db.acquire().await?;

Loading…
Cancel
Save