@ -1,24 +1,53 @@
use crate ::model ::{ Price , Timestamp } ;
use crate ::model ::{ Price , Timestamp } ;
use crate ::projection ;
use crate ::{ projection , Tasks } ;
use anyhow ::Result ;
use anyhow ::Result ;
use futures ::{ StreamExt , TryStreamExt } ;
use futures ::{ StreamExt , TryStreamExt } ;
use rust_decimal ::Decimal ;
use rust_decimal ::Decimal ;
use std ::convert ::TryFrom ;
use std ::convert ::TryFrom ;
use std ::future ::Future ;
use tokio_tungstenite ::tungstenite ;
use tokio_tungstenite ::tungstenite ;
use xtra ::prelude ::MessageChannel ;
use xtra ::prelude ::MessageChannel ;
use xtra_productivity ::xtra_productivity ;
/// Connects to the BitMex price feed, returning the polling task and a watch channel that will
const URL : & str = "wss://www.bitmex.com/realtime?subscribe=quoteBin1m:XBTUSD" ;
/// always hold the last quote.
pub async fn new (
pub struct Actor {
msg_channel : impl MessageChannel < projection ::Update < Quote > > ,
tasks : Tasks ,
) -> Result < ( impl Future < Output = ( ) > , Quote ) > {
receiver : Box < dyn MessageChannel < projection ::Update < Quote > > > ,
let ( connection , _ ) = tokio_tungstenite ::connect_async (
}
"wss://www.bitmex.com/realtime?subscribe=quoteBin1m:XBTUSD" ,
)
impl Actor {
. await ? ;
pub fn new ( receiver : impl MessageChannel < projection ::Update < Quote > > + 'static ) -> Self {
Self {
tasks : Tasks ::default ( ) ,
receiver : Box ::new ( receiver ) ,
}
}
}
impl xtra ::Actor for Actor { }
#[ xtra_productivity ]
impl Actor {
async fn handle ( & mut self , msg : NotifyNoConnection , ctx : & mut xtra ::Context < Self > ) {
match msg {
NotifyNoConnection ::Failed { error } = > {
tracing ::warn ! ( "Connection to BitMex realtime API failed: {:#}" , error )
}
NotifyNoConnection ::StreamEnded = > {
tracing ::warn ! ( "Connection to BitMex realtime API closed" )
}
}
let this = ctx . address ( ) . expect ( "we are alive" ) ;
self . tasks . add ( connect_until_successful ( this ) ) ;
}
async fn handle ( & mut self , _ : Connect , ctx : & mut xtra ::Context < Self > ) -> Result < Quote > {
tracing ::debug ! ( "Connecting to BitMex realtime API" ) ;
let ( connection , _ ) = tokio_tungstenite ::connect_async ( URL ) . await ? ;
let mut quotes = connection
let mut quotes = connection
. inspect_err ( | e | tracing ::warn ! ( "Error on websocket stream: {}" , e ) )
. map ( | msg | Quote ::from_message ( msg ? ) )
. map ( | msg | Quote ::from_message ( msg ? ) )
. filter_map ( | result | async move { result . transpose ( ) } )
. filter_map ( | result | async move { result . transpose ( ) } )
. boxed ( )
. boxed ( )
@ -26,19 +55,48 @@ pub async fn new(
tracing ::info ! ( "Connected to BitMex realtime API" ) ;
tracing ::info ! ( "Connected to BitMex realtime API" ) ;
let first_quote = quotes . select_next_some ( ) . await ? ;
let initial_quote = quotes . select_next_some ( ) . await ? ;
let this = ctx . address ( ) . expect ( "we are alive" ) ;
let task = async move {
self . tasks . add ( {
while let Ok ( Some ( quote ) ) = quotes . try_next ( ) . await {
let receiver = self . receiver . clone_channel ( ) ;
if msg_channel . send ( projection ::Update ( quote ) ) . await . is_err ( ) {
async move {
break ; // If the receiver dies, we can exit the loop.
let no_connection = loop {
match quotes . try_next ( ) . await {
Ok ( Some ( quote ) ) = > {
if receiver . send ( projection ::Update ( quote ) ) . await . is_err ( ) {
return ; // if the receiver dies, our job is done
}
}
}
}
Ok ( None ) = > break NotifyNoConnection ::StreamEnded ,
tracing ::warn ! ( "Failed to read quote from websocket" ) ;
Err ( e ) = > break NotifyNoConnection ::Failed { error : e } ,
}
} ;
} ;
Ok ( ( task , first_quote ) )
let _ = this . send ( no_connection ) . await ;
}
} ) ;
Ok ( initial_quote )
}
}
async fn connect_until_successful ( this : xtra ::Address < Actor > ) {
while let Err ( e ) = this
. send ( Connect )
. await
. expect ( "always connected to ourselves" )
{
tracing ::warn ! ( "Failed to connect to BitMex realtime API: {:#}" , e ) ;
}
}
pub struct Connect ;
enum NotifyNoConnection {
Failed { error : anyhow ::Error } ,
StreamEnded ,
}
}
#[ derive(Clone, Debug) ]
#[ derive(Clone, Debug) ]
@ -57,7 +115,10 @@ impl Quote {
let table_message = match serde_json ::from_str ::< wire ::TableMessage > ( & text_message ) {
let table_message = match serde_json ::from_str ::< wire ::TableMessage > ( & text_message ) {
Ok ( table_message ) = > table_message ,
Ok ( table_message ) = > table_message ,
Err ( _ ) = > return Ok ( None ) ,
Err ( _ ) = > {
tracing ::trace ! ( % text_message , "Not a 'table' message, skipping..." ) ;
return Ok ( None ) ;
}
} ;
} ;
let [ quote ] = table_message . data ;
let [ quote ] = table_message . data ;