diff --git a/daemon/src/bitmex_price_feed.rs b/daemon/src/bitmex_price_feed.rs index bc6b9b6..4f569a7 100644 --- a/daemon/src/bitmex_price_feed.rs +++ b/daemon/src/bitmex_price_feed.rs @@ -1,9 +1,10 @@ use crate::model::{Price, Timestamp}; use crate::{projection, Tasks}; use anyhow::Result; -use futures::{StreamExt, TryStreamExt}; +use futures::{SinkExt, TryStreamExt}; use rust_decimal::Decimal; use std::convert::TryFrom; +use std::time::Duration; use tokio_tungstenite::tungstenite; use xtra::prelude::MessageChannel; use xtra_productivity::xtra_productivity; @@ -31,7 +32,7 @@ impl Actor { async fn handle(&mut self, msg: NotifyNoConnection, ctx: &mut xtra::Context) { match msg { NotifyNoConnection::Failed { error } => { - tracing::warn!("Connection to BitMex realtime API failed: {:#}", error) + tracing::warn!("Connection to BitMex realtime API failed: {}", error) } NotifyNoConnection::StreamEnded => { tracing::warn!("Connection to BitMex realtime API closed") @@ -43,34 +44,58 @@ impl Actor { self.tasks.add(connect_until_successful(this)); } - async fn handle(&mut self, _: Connect, ctx: &mut xtra::Context) -> Result { + async fn handle(&mut self, _: Connect, ctx: &mut xtra::Context) -> Result<()> { tracing::debug!("Connecting to BitMex realtime API"); - let (connection, _) = tokio_tungstenite::connect_async(URL).await?; - let mut quotes = connection - .map(|msg| Quote::from_message(msg?)) - .filter_map(|result| async move { result.transpose() }) - .boxed() - .fuse(); + let (mut connection, _) = tokio_tungstenite::connect_async(URL).await?; tracing::info!("Connected to BitMex realtime API"); - let initial_quote = quotes.select_next_some().await?; - let this = ctx.address().expect("we are alive"); self.tasks.add({ let receiver = self.receiver.clone_channel(); async move { let no_connection = loop { - match quotes.try_next().await { - Ok(Some(quote)) => { - if receiver.send(projection::Update(quote)).await.is_err() { - return; // if the receiver dies, our job is done + tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(5)) => { + tracing::trace!("No message from BitMex in the last 5 seconds, pinging"); + let _ = connection.send(tungstenite::Message::Ping([0u8; 32].to_vec())).await; + }, + msg = connection.try_next() => { + match msg { + Ok(Some(tungstenite::Message::Pong(_))) => { + tracing::trace!("Received pong"); + continue; + } + Ok(Some(tungstenite::Message::Text(text))) => { + match Quote::from_str(&text) { + Ok(None) => { + continue; + } + Ok(Some(quote)) => { + if receiver.send(projection::Update(quote)).await.is_err() { + return; // if the receiver dies, our job is done + } + } + Err(e) => { + tracing::warn!("Failed to parse quote: {:#}", e); + return; + } + } + } + Ok(Some(other)) => { + tracing::trace!("Unsupported message: {:?}", other); + continue; + } + Ok(None) => { + break NotifyNoConnection::StreamEnded + } + Err(e) => { + break NotifyNoConnection::Failed { error: e } + } } - } - Ok(None) => break NotifyNoConnection::StreamEnded, - Err(e) => break NotifyNoConnection::Failed { error: e }, + }, } }; @@ -78,7 +103,7 @@ impl Actor { } }); - Ok(initial_quote) + Ok(()) } } @@ -95,7 +120,7 @@ async fn connect_until_successful(this: xtra::Address) { pub struct Connect; enum NotifyNoConnection { - Failed { error: anyhow::Error }, + Failed { error: tungstenite::Error }, StreamEnded, } @@ -107,16 +132,11 @@ pub struct Quote { } impl Quote { - fn from_message(message: tungstenite::Message) -> Result> { - let text_message = match message { - tungstenite::Message::Text(text_message) => text_message, - _ => anyhow::bail!("Bad message type, only text is supported"), - }; - - let table_message = match serde_json::from_str::(&text_message) { + fn from_str(text: &str) -> Result> { + let table_message = match serde_json::from_str::(text) { Ok(table_message) => table_message, Err(_) => { - tracing::trace!(%text_message, "Not a 'table' message, skipping..."); + tracing::trace!(%text, "Not a 'table' message, skipping..."); return Ok(None); } }; @@ -173,9 +193,7 @@ mod tests { #[test] fn can_deserialize_quote_message() { - let message = tungstenite::Message::Text(r#"{"table":"quoteBin1m","action":"insert","data":[{"timestamp":"2021-09-21T02:40:00.000Z","symbol":"XBTUSD","bidSize":50200,"bidPrice":42640.5,"askPrice":42641,"askSize":363600}]}"#.to_owned()); - - let quote = Quote::from_message(message).unwrap().unwrap(); + let quote = Quote::from_str(r#"{"table":"quoteBin1m","action":"insert","data":[{"timestamp":"2021-09-21T02:40:00.000Z","symbol":"XBTUSD","bidSize":50200,"bidPrice":42640.5,"askPrice":42641,"askSize":363600}]}"#).unwrap().unwrap(); assert_eq!(quote.bid, Price::new(dec!(42640.5)).unwrap()); assert_eq!(quote.ask, Price::new(dec!(42641)).unwrap()); diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 086b3dc..378a5d0 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -262,12 +262,12 @@ async fn main() -> Result<()> { .run(); tasks.add(task); - let init_quote = price_feed_address + price_feed_address .send(bitmex_price_feed::Connect) .await??; let (proj_actor, projection_feeds) = - projection::Actor::new(db.clone(), Role::Maker, bitcoin_network, init_quote).await?; + projection::Actor::new(db.clone(), Role::Maker, bitcoin_network).await?; tasks.add(projection_context.run(proj_actor)); let listener_stream = futures::stream::poll_fn(move |ctx| { diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs index cf8fcdb..66ce6f3 100644 --- a/daemon/src/projection.rs +++ b/daemon/src/projection.rs @@ -42,19 +42,14 @@ pub struct Actor { } pub struct Feeds { - pub quote: watch::Receiver, + pub quote: watch::Receiver>, pub order: watch::Receiver>, pub connected_takers: watch::Receiver>, pub cfds: watch::Receiver>, } impl Actor { - pub async fn new( - db: sqlx::SqlitePool, - role: Role, - network: Network, - init_quote: bitmex_price_feed::Quote, - ) -> Result<(Self, Feeds)> { + pub async fn new(db: sqlx::SqlitePool, role: Role, network: Network) -> Result<(Self, Feeds)> { let mut conn = db.acquire().await?; let init_cfds = db::load_all_cfds(&mut conn).await?; @@ -63,12 +58,12 @@ impl Actor { network, cfds: init_cfds, proposals: HashMap::new(), - quote: init_quote.clone(), + quote: None, }; let (tx_cfds, rx_cfds) = watch::channel(state.to_cfds()); let (tx_order, rx_order) = watch::channel(None); - let (tx_quote, rx_quote) = watch::channel(init_quote.into()); + let (tx_quote, rx_quote) = watch::channel(None); let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new()); Ok(( @@ -96,7 +91,7 @@ impl Actor { struct Tx { pub cfds: watch::Sender>, pub order: watch::Sender>, - pub quote: watch::Sender, + pub quote: watch::Sender>, // TODO: Use this channel to communicate maker status as well with generic // ID of connected counterparties pub connected_takers: watch::Sender>, @@ -106,7 +101,7 @@ struct Tx { struct State { role: Role, network: Network, - quote: bitmex_price_feed::Quote, + quote: Option, proposals: UpdateCfdProposals, cfds: Vec, } @@ -135,7 +130,7 @@ impl State { } pub fn update_quote(&mut self, quote: bitmex_price_feed::Quote) { - let _ = std::mem::replace(&mut self.quote, quote); + self.quote = Some(quote); } pub fn update_cfds(&mut self, cfds: Vec) { @@ -175,7 +170,7 @@ impl Actor { fn handle(&mut self, msg: Update) { let quote = msg.0; self.state.update_quote(quote.clone()); - let _ = self.tx.quote.send(quote.into()); + let _ = self.tx.quote.send(Some(quote.into())); let _ = self.tx.cfds.send(self.state.to_cfds()); } fn handle(&mut self, msg: Update>) { @@ -488,9 +483,9 @@ pub struct Cfd { #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] pub margin_counterparty: Amount, - #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] - pub profit_btc: SignedAmount, - pub profit_in_percent: String, + #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc::opt")] + pub profit_btc: Option, + pub profit_percent: Option, pub state: CfdState, pub actions: Vec, @@ -506,20 +501,28 @@ pub struct Cfd { impl From for Vec { fn from(input: CfdsWithAuxData) -> Self { - let current_price = input.current_price; let network = input.network; let cfds = input .cfds .iter() .map(|cfd| { - let (profit_btc, profit_in_percent) = - cfd.profit(current_price).unwrap_or_else(|error| { - tracing::warn!( - "Calculating profit/loss failed. Falling back to 0. {:#}", - error - ); - (SignedAmount::ZERO, Decimal::ZERO.into()) + let (profit_btc, profit_percent) = input.current_price + .map(|current_price| match cfd.profit(current_price) { + Ok((profit_btc, profit_percent)) => ( + Some(profit_btc), + Some(profit_percent.round_dp(1).to_string()), + ), + Err(e) => { + tracing::warn!("Failed to calculate profit/loss {:#}", e); + + (None, None) + } + }) + .unwrap_or_else(|| { + tracing::debug!(order_id = %cfd.order.id, "Unable to calculate profit/loss without current price"); + + (None, None) }); let pending_proposal = input.pending_proposals.get(&cfd.order.id); @@ -534,7 +537,7 @@ impl From for Vec { liquidation_price: cfd.order.liquidation_price.into(), quantity_usd: cfd.quantity_usd.into(), profit_btc, - profit_in_percent: profit_in_percent.round_dp(1).to_string(), + profit_percent, state: state.clone(), actions: available_actions(state, cfd.role()), state_transition_timestamp: cfd.state.get_transition_timestamp().seconds(), @@ -562,7 +565,7 @@ impl From for Vec { // TODO: Remove this struct out of existence pub struct CfdsWithAuxData { pub cfds: Vec, - pub current_price: model::Price, + pub current_price: Option, pub pending_proposals: UpdateCfdProposals, pub network: Network, } @@ -570,15 +573,15 @@ pub struct CfdsWithAuxData { impl CfdsWithAuxData { pub fn new( cfds: Vec, - quote: bitmex_price_feed::Quote, + quote: Option, pending_proposals: UpdateCfdProposals, role: Role, network: Network, ) -> Self { - let current_price = match role { + let current_price = quote.map(|quote| match role { Role::Maker => quote.for_maker(), Role::Taker => quote.for_taker(), - }; + }); CfdsWithAuxData { cfds, diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index 7e1e49b..7011dd6 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -124,7 +124,15 @@ pub async fn post_cfd_action( } CfdAction::Commit => cfd_actor.send(taker_cfd::Commit { order_id: id }).await, CfdAction::Settle => { - let quote: bitmex_price_feed::Quote = feeds.quote.borrow().clone().into(); + let quote: bitmex_price_feed::Quote = match feeds.quote.borrow().as_ref() { + Some(quote) => quote.clone().into(), + None => { + return Err(HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) + .title("Quote unavailable") + .detail("Cannot settle without current price information.")) + } + }; + let current_price = quote.for_taker(); cfd_actor .send(taker_cfd::ProposeSettlement { diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 39a4025..bf8d56a 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -249,12 +249,12 @@ async fn main() -> Result<()> { .run(); tasks.add(task); - let init_quote = price_feed_address + price_feed_address .send(bitmex_price_feed::Connect) .await??; let (proj_actor, projection_feeds) = - projection::Actor::new(db.clone(), Role::Taker, bitcoin_network, init_quote).await?; + projection::Actor::new(db.clone(), Role::Taker, bitcoin_network).await?; tasks.add(projection_context.run(proj_actor)); let possible_addresses = resolve_maker_addresses(&opts.maker).await?; diff --git a/daemon/src/to_sse_event.rs b/daemon/src/to_sse_event.rs index eec421e..f4d206e 100644 --- a/daemon/src/to_sse_event.rs +++ b/daemon/src/to_sse_event.rs @@ -98,7 +98,7 @@ impl ToSseEvent for connection::ConnectionStatus { } } -impl ToSseEvent for Quote { +impl ToSseEvent for Option { fn to_sse_event(&self) -> Event { Event::json(self).event("quote") } diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 0a6d299..14fb03a 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -3,10 +3,9 @@ use crate::harness::mocks::oracle::OracleActor; use crate::harness::mocks::wallet::WalletActor; use crate::schnorrsig; use ::bdk::bitcoin::Network; -use daemon::bitmex_price_feed::Quote; use daemon::connection::{connect, ConnectionStatus}; use daemon::model::cfd::{OrderId, Role}; -use daemon::model::{self, Price, Timestamp, Usd}; +use daemon::model::{self, Price, Usd}; use daemon::projection::{Cfd, CfdOrder, Feeds, Identity}; use daemon::seed::Seed; use daemon::{ @@ -168,10 +167,9 @@ impl Maker { .await .unwrap(); - let (proj_actor, feeds) = - projection::Actor::new(db, Role::Maker, Network::Testnet, dummy_quote()) - .await - .unwrap(); + let (proj_actor, feeds) = projection::Actor::new(db, Role::Maker, Network::Testnet) + .await + .unwrap(); tasks.add(projection_context.run(proj_actor)); let address = listener.local_addr().unwrap(); @@ -297,10 +295,9 @@ impl Taker { .await .unwrap(); - let (proj_actor, feeds) = - projection::Actor::new(db, Role::Taker, Network::Testnet, dummy_quote()) - .await - .unwrap(); + let (proj_actor, feeds) = projection::Actor::new(db, Role::Taker, Network::Testnet) + .await + .unwrap(); tasks.add(projection_context.run(proj_actor)); tasks.add(connect( @@ -391,14 +388,6 @@ pub fn dummy_price() -> Price { Price::new(dec!(50_000)).expect("to not fail") } -pub fn dummy_quote() -> Quote { - Quote { - timestamp: Timestamp::now(), - bid: dummy_price(), - ask: dummy_price(), - } -} - pub fn dummy_new_order() -> maker_cfd::NewOrder { maker_cfd::NewOrder { price: dummy_price(), diff --git a/taker-frontend/src/components/History.tsx b/taker-frontend/src/components/History.tsx index faf3bb8..72284de 100644 --- a/taker-frontend/src/components/History.tsx +++ b/taker-frontend/src/components/History.tsx @@ -8,6 +8,7 @@ import { HStack, Link, SimpleGrid, + Skeleton, Spinner, Table, Tbody, @@ -60,11 +61,6 @@ const CfdDetails = ({ cfd, connectedToMaker }: CfdDetailsProps) => { const margin = `₿${Math.round((cfd.margin) * 1_000_000) / 1_000_000}`; const liquidationPrice = `$${cfd.liquidation_price}`; - const pAndLNumber = Math.round((cfd.profit_btc) * 1_000_000) / 1_000_000; - const pAndL = pAndLNumber < 0 ? `-₿${Math.abs(pAndLNumber)}` : `₿${Math.abs(pAndLNumber)}`; - - const payout = `₿${Math.round((cfd.margin + cfd.profit_btc) * 1_000_000) / 1_000_000}`; - const txLock = cfd.details.tx_url_list.find((tx) => tx.label === TxLabel.Lock); const txCommit = cfd.details.tx_url_list.find((tx) => tx.label === TxLabel.Commit); const txRefund = cfd.details.tx_url_list.find((tx) => tx.label === TxLabel.Refund); @@ -108,11 +104,19 @@ const CfdDetails = ({ cfd, connectedToMaker }: CfdDetailsProps) => { Unrealized P/L - {pAndL} + + + + + Payout - {payout} + + + + + @@ -164,6 +168,33 @@ const CfdDetails = ({ cfd, connectedToMaker }: CfdDetailsProps) => { ); }; +interface ProfitAndLossProps { + profitBtc: number; +} + +function ProfitAndLoss({ profitBtc }: ProfitAndLossProps) { + const pAndLNumber = Math.round((profitBtc) * 1_000_000) / 1_000_000; + const absPAndL = Math.abs(pAndLNumber); + const negativeSign = pAndLNumber < 0 ? "-" : ""; + + return + {negativeSign}₿{absPAndL} + ; +} + +interface PayoutProps { + profitBtc: number; + margin: number; +} + +function Payout({ profitBtc, margin }: PayoutProps) { + let payoutBtc = Math.round((margin + profitBtc) * 1_000_000) / 1_000_000; + + return + ₿{payoutBtc} + ; +} + const CircleIcon = (props: any) => (