@ -1,9 +1,10 @@
use crate ::model ::{ Price , Timestamp } ;
use crate ::{ projection , Tasks } ;
use anyhow ::Result ;
use futures ::{ Stream Ext , TryStreamExt } ;
use futures ::{ Sink Ext , TryStreamExt } ;
use rust_decimal ::Decimal ;
use std ::convert ::TryFrom ;
use std ::time ::Duration ;
use tokio_tungstenite ::tungstenite ;
use xtra ::prelude ::MessageChannel ;
use xtra_productivity ::xtra_productivity ;
@ -31,7 +32,7 @@ impl Actor {
async fn handle ( & mut self , msg : NotifyNoConnection , ctx : & mut xtra ::Context < Self > ) {
match msg {
NotifyNoConnection ::Failed { error } = > {
tracing ::warn ! ( "Connection to BitMex realtime API failed: {:# }" , error )
tracing ::warn ! ( "Connection to BitMex realtime API failed: {}" , error )
}
NotifyNoConnection ::StreamEnded = > {
tracing ::warn ! ( "Connection to BitMex realtime API closed" )
@ -46,12 +47,7 @@ impl Actor {
async fn handle ( & mut self , _ : Connect , ctx : & mut xtra ::Context < Self > ) -> Result < ( ) > {
tracing ::debug ! ( "Connecting to BitMex realtime API" ) ;
let ( connection , _ ) = tokio_tungstenite ::connect_async ( URL ) . await ? ;
let mut quotes = connection
. map ( | msg | Quote ::from_message ( msg ? ) )
. filter_map ( | result | async move { result . transpose ( ) } )
. boxed ( )
. fuse ( ) ;
let ( mut connection , _ ) = tokio_tungstenite ::connect_async ( URL ) . await ? ;
tracing ::info ! ( "Connected to BitMex realtime API" ) ;
@ -61,14 +57,45 @@ impl Actor {
let receiver = self . receiver . clone_channel ( ) ;
async move {
let no_connection = loop {
match quotes . try_next ( ) . await {
Ok ( Some ( quote ) ) = > {
if receiver . send ( projection ::Update ( quote ) ) . await . is_err ( ) {
return ; // if the receiver dies, our job is done
tokio ::select ! {
_ = tokio ::time ::sleep ( Duration ::from_secs ( 5 ) ) = > {
tracing ::trace ! ( "No message from BitMex in the last 5 seconds, pinging" ) ;
let _ = connection . send ( tungstenite ::Message ::Ping ( [ 0 u8 ; 32 ] . to_vec ( ) ) ) . await ;
} ,
msg = connection . try_next ( ) = > {
match msg {
Ok ( Some ( tungstenite ::Message ::Pong ( _ ) ) ) = > {
tracing ::trace ! ( "Received pong" ) ;
continue ;
}
Ok ( Some ( tungstenite ::Message ::Text ( text ) ) ) = > {
match Quote ::from_str ( & text ) {
Ok ( None ) = > {
continue ;
}
Ok ( Some ( quote ) ) = > {
if receiver . send ( projection ::Update ( quote ) ) . await . is_err ( ) {
return ; // if the receiver dies, our job is done
}
}
Err ( e ) = > {
tracing ::warn ! ( "Failed to parse quote: {:#}" , e ) ;
return ;
}
}
}
Ok ( Some ( other ) ) = > {
tracing ::trace ! ( "Unsupported message: {:?}" , other ) ;
continue ;
}
Ok ( None ) = > {
break NotifyNoConnection ::StreamEnded
}
Err ( e ) = > {
break NotifyNoConnection ::Failed { error : e }
}
}
}
Ok ( None ) = > break NotifyNoConnection ::StreamEnded ,
Err ( e ) = > break NotifyNoConnection ::Failed { error : e } ,
} ,
}
} ;
@ -93,7 +120,7 @@ async fn connect_until_successful(this: xtra::Address<Actor>) {
pub struct Connect ;
enum NotifyNoConnection {
Failed { error : anyhow ::Error } ,
Failed { error : tungstenite ::Error } ,
StreamEnded ,
}
@ -105,16 +132,11 @@ pub struct Quote {
}
impl Quote {
fn from_message ( message : tungstenite ::Message ) -> Result < Option < Self > > {
let text_message = match message {
tungstenite ::Message ::Text ( text_message ) = > text_message ,
_ = > anyhow ::bail ! ( "Bad message type, only text is supported" ) ,
} ;
let table_message = match serde_json ::from_str ::< wire ::TableMessage > ( & text_message ) {
fn from_str ( text : & str ) -> Result < Option < Self > > {
let table_message = match serde_json ::from_str ::< wire ::TableMessage > ( text ) {
Ok ( table_message ) = > table_message ,
Err ( _ ) = > {
tracing ::trace ! ( % text_message , "Not a 'table' message, skipping..." ) ;
tracing ::trace ! ( % text , "Not a 'table' message, skipping..." ) ;
return Ok ( None ) ;
}
} ;
@ -171,9 +193,7 @@ mod tests {
#[ test ]
fn can_deserialize_quote_message ( ) {
let message = tungstenite ::Message ::Text ( r#"{"table":"quoteBin1m","action":"insert","data":[{"timestamp":"2021-09-21T02:40:00.000Z","symbol":"XBTUSD","bidSize":50200,"bidPrice":42640.5,"askPrice":42641,"askSize":363600}]}"# . to_owned ( ) ) ;
let quote = Quote ::from_message ( message ) . unwrap ( ) . unwrap ( ) ;
let quote = Quote ::from_str ( r#"{"table":"quoteBin1m","action":"insert","data":[{"timestamp":"2021-09-21T02:40:00.000Z","symbol":"XBTUSD","bidSize":50200,"bidPrice":42640.5,"askPrice":42641,"askSize":363600}]}"# ) . unwrap ( ) . unwrap ( ) ;
assert_eq ! ( quote . bid , Price ::new ( dec ! ( 42640.5 ) ) . unwrap ( ) ) ;
assert_eq ! ( quote . ask , Price ::new ( dec ! ( 42641 ) ) . unwrap ( ) ) ;