Browse Source

Merge #831

831: Keep BitMex API connection API open r=thomaseizinger a=thomaseizinger

To keep the BitMex API connection open, we need to send `Ping` messages to it.
To send `Ping` messages, we need access to the `Sink` API of the websocket connection.
By get access to the `Sink` API, we must not use any of the `Stream` combinators on the connection.
Not using any of the stream combinators makes it clunky to retrieve an initial quote before we start the loop.

Relying on an initial quote being available has been annoying in the past, and also doesn't help with ideas like https://github.com/itchysats/itchysats/discussions/753.

We also already had some hacky solution in place that defaulted to 0 for errors in calculating the profit.

Fix both problems by rendering only a part of the CFD blank:

![Screenshot from 2021-12-08 11-47-35](https://user-images.githubusercontent.com/5486389/145134380-c85e3cb7-31d6-482f-9ad0-2cdc8745d986.png)

Fixes #736.

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
resilient-broadcast
bors[bot] 3 years ago
committed by GitHub
parent
commit
c37f91f6c6
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 72
      daemon/src/bitmex_price_feed.rs
  2. 4
      daemon/src/maker.rs
  3. 61
      daemon/src/projection.rs
  4. 10
      daemon/src/routes_taker.rs
  5. 4
      daemon/src/taker.rs
  6. 2
      daemon/src/to_sse_event.rs
  7. 17
      daemon/tests/harness/mod.rs
  8. 45
      taker-frontend/src/components/History.tsx
  9. 4
      taker-frontend/src/types.ts

72
daemon/src/bitmex_price_feed.rs

@ -1,9 +1,10 @@
use crate::model::{Price, Timestamp};
use crate::{projection, Tasks};
use anyhow::Result;
use futures::{StreamExt, TryStreamExt};
use futures::{SinkExt, TryStreamExt};
use rust_decimal::Decimal;
use std::convert::TryFrom;
use std::time::Duration;
use tokio_tungstenite::tungstenite;
use xtra::prelude::MessageChannel;
use xtra_productivity::xtra_productivity;
@ -31,7 +32,7 @@ impl Actor {
async fn handle(&mut self, msg: NotifyNoConnection, ctx: &mut xtra::Context<Self>) {
match msg {
NotifyNoConnection::Failed { error } => {
tracing::warn!("Connection to BitMex realtime API failed: {:#}", error)
tracing::warn!("Connection to BitMex realtime API failed: {}", error)
}
NotifyNoConnection::StreamEnded => {
tracing::warn!("Connection to BitMex realtime API closed")
@ -43,34 +44,58 @@ impl Actor {
self.tasks.add(connect_until_successful(this));
}
async fn handle(&mut self, _: Connect, ctx: &mut xtra::Context<Self>) -> Result<Quote> {
async fn handle(&mut self, _: Connect, ctx: &mut xtra::Context<Self>) -> Result<()> {
tracing::debug!("Connecting to BitMex realtime API");
let (connection, _) = tokio_tungstenite::connect_async(URL).await?;
let mut quotes = connection
.map(|msg| Quote::from_message(msg?))
.filter_map(|result| async move { result.transpose() })
.boxed()
.fuse();
let (mut connection, _) = tokio_tungstenite::connect_async(URL).await?;
tracing::info!("Connected to BitMex realtime API");
let initial_quote = quotes.select_next_some().await?;
let this = ctx.address().expect("we are alive");
self.tasks.add({
let receiver = self.receiver.clone_channel();
async move {
let no_connection = loop {
match quotes.try_next().await {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(5)) => {
tracing::trace!("No message from BitMex in the last 5 seconds, pinging");
let _ = connection.send(tungstenite::Message::Ping([0u8; 32].to_vec())).await;
},
msg = connection.try_next() => {
match msg {
Ok(Some(tungstenite::Message::Pong(_))) => {
tracing::trace!("Received pong");
continue;
}
Ok(Some(tungstenite::Message::Text(text))) => {
match Quote::from_str(&text) {
Ok(None) => {
continue;
}
Ok(Some(quote)) => {
if receiver.send(projection::Update(quote)).await.is_err() {
return; // if the receiver dies, our job is done
}
}
Ok(None) => break NotifyNoConnection::StreamEnded,
Err(e) => break NotifyNoConnection::Failed { error: e },
Err(e) => {
tracing::warn!("Failed to parse quote: {:#}", e);
return;
}
}
}
Ok(Some(other)) => {
tracing::trace!("Unsupported message: {:?}", other);
continue;
}
Ok(None) => {
break NotifyNoConnection::StreamEnded
}
Err(e) => {
break NotifyNoConnection::Failed { error: e }
}
}
},
}
};
@ -78,7 +103,7 @@ impl Actor {
}
});
Ok(initial_quote)
Ok(())
}
}
@ -95,7 +120,7 @@ async fn connect_until_successful(this: xtra::Address<Actor>) {
pub struct Connect;
enum NotifyNoConnection {
Failed { error: anyhow::Error },
Failed { error: tungstenite::Error },
StreamEnded,
}
@ -107,16 +132,11 @@ pub struct Quote {
}
impl Quote {
fn from_message(message: tungstenite::Message) -> Result<Option<Self>> {
let text_message = match message {
tungstenite::Message::Text(text_message) => text_message,
_ => anyhow::bail!("Bad message type, only text is supported"),
};
let table_message = match serde_json::from_str::<wire::TableMessage>(&text_message) {
fn from_str(text: &str) -> Result<Option<Self>> {
let table_message = match serde_json::from_str::<wire::TableMessage>(text) {
Ok(table_message) => table_message,
Err(_) => {
tracing::trace!(%text_message, "Not a 'table' message, skipping...");
tracing::trace!(%text, "Not a 'table' message, skipping...");
return Ok(None);
}
};
@ -173,9 +193,7 @@ mod tests {
#[test]
fn can_deserialize_quote_message() {
let message = tungstenite::Message::Text(r#"{"table":"quoteBin1m","action":"insert","data":[{"timestamp":"2021-09-21T02:40:00.000Z","symbol":"XBTUSD","bidSize":50200,"bidPrice":42640.5,"askPrice":42641,"askSize":363600}]}"#.to_owned());
let quote = Quote::from_message(message).unwrap().unwrap();
let quote = Quote::from_str(r#"{"table":"quoteBin1m","action":"insert","data":[{"timestamp":"2021-09-21T02:40:00.000Z","symbol":"XBTUSD","bidSize":50200,"bidPrice":42640.5,"askPrice":42641,"askSize":363600}]}"#).unwrap().unwrap();
assert_eq!(quote.bid, Price::new(dec!(42640.5)).unwrap());
assert_eq!(quote.ask, Price::new(dec!(42641)).unwrap());

4
daemon/src/maker.rs

@ -262,12 +262,12 @@ async fn main() -> Result<()> {
.run();
tasks.add(task);
let init_quote = price_feed_address
price_feed_address
.send(bitmex_price_feed::Connect)
.await??;
let (proj_actor, projection_feeds) =
projection::Actor::new(db.clone(), Role::Maker, bitcoin_network, init_quote).await?;
projection::Actor::new(db.clone(), Role::Maker, bitcoin_network).await?;
tasks.add(projection_context.run(proj_actor));
let listener_stream = futures::stream::poll_fn(move |ctx| {

61
daemon/src/projection.rs

@ -42,19 +42,14 @@ pub struct Actor {
}
pub struct Feeds {
pub quote: watch::Receiver<Quote>,
pub quote: watch::Receiver<Option<Quote>>,
pub order: watch::Receiver<Option<CfdOrder>>,
pub connected_takers: watch::Receiver<Vec<Identity>>,
pub cfds: watch::Receiver<Vec<Cfd>>,
}
impl Actor {
pub async fn new(
db: sqlx::SqlitePool,
role: Role,
network: Network,
init_quote: bitmex_price_feed::Quote,
) -> Result<(Self, Feeds)> {
pub async fn new(db: sqlx::SqlitePool, role: Role, network: Network) -> Result<(Self, Feeds)> {
let mut conn = db.acquire().await?;
let init_cfds = db::load_all_cfds(&mut conn).await?;
@ -63,12 +58,12 @@ impl Actor {
network,
cfds: init_cfds,
proposals: HashMap::new(),
quote: init_quote.clone(),
quote: None,
};
let (tx_cfds, rx_cfds) = watch::channel(state.to_cfds());
let (tx_order, rx_order) = watch::channel(None);
let (tx_quote, rx_quote) = watch::channel(init_quote.into());
let (tx_quote, rx_quote) = watch::channel(None);
let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new());
Ok((
@ -96,7 +91,7 @@ impl Actor {
struct Tx {
pub cfds: watch::Sender<Vec<Cfd>>,
pub order: watch::Sender<Option<CfdOrder>>,
pub quote: watch::Sender<Quote>,
pub quote: watch::Sender<Option<Quote>>,
// TODO: Use this channel to communicate maker status as well with generic
// ID of connected counterparties
pub connected_takers: watch::Sender<Vec<Identity>>,
@ -106,7 +101,7 @@ struct Tx {
struct State {
role: Role,
network: Network,
quote: bitmex_price_feed::Quote,
quote: Option<bitmex_price_feed::Quote>,
proposals: UpdateCfdProposals,
cfds: Vec<ModelCfd>,
}
@ -135,7 +130,7 @@ impl State {
}
pub fn update_quote(&mut self, quote: bitmex_price_feed::Quote) {
let _ = std::mem::replace(&mut self.quote, quote);
self.quote = Some(quote);
}
pub fn update_cfds(&mut self, cfds: Vec<ModelCfd>) {
@ -175,7 +170,7 @@ impl Actor {
fn handle(&mut self, msg: Update<bitmex_price_feed::Quote>) {
let quote = msg.0;
self.state.update_quote(quote.clone());
let _ = self.tx.quote.send(quote.into());
let _ = self.tx.quote.send(Some(quote.into()));
let _ = self.tx.cfds.send(self.state.to_cfds());
}
fn handle(&mut self, msg: Update<Vec<model::Identity>>) {
@ -488,9 +483,9 @@ pub struct Cfd {
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub margin_counterparty: Amount,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub profit_btc: SignedAmount,
pub profit_in_percent: String,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc::opt")]
pub profit_btc: Option<SignedAmount>,
pub profit_percent: Option<String>,
pub state: CfdState,
pub actions: Vec<CfdAction>,
@ -506,20 +501,28 @@ pub struct Cfd {
impl From<CfdsWithAuxData> for Vec<Cfd> {
fn from(input: CfdsWithAuxData) -> Self {
let current_price = input.current_price;
let network = input.network;
let cfds = input
.cfds
.iter()
.map(|cfd| {
let (profit_btc, profit_in_percent) =
cfd.profit(current_price).unwrap_or_else(|error| {
tracing::warn!(
"Calculating profit/loss failed. Falling back to 0. {:#}",
error
);
(SignedAmount::ZERO, Decimal::ZERO.into())
let (profit_btc, profit_percent) = input.current_price
.map(|current_price| match cfd.profit(current_price) {
Ok((profit_btc, profit_percent)) => (
Some(profit_btc),
Some(profit_percent.round_dp(1).to_string()),
),
Err(e) => {
tracing::warn!("Failed to calculate profit/loss {:#}", e);
(None, None)
}
})
.unwrap_or_else(|| {
tracing::debug!(order_id = %cfd.order.id, "Unable to calculate profit/loss without current price");
(None, None)
});
let pending_proposal = input.pending_proposals.get(&cfd.order.id);
@ -534,7 +537,7 @@ impl From<CfdsWithAuxData> for Vec<Cfd> {
liquidation_price: cfd.order.liquidation_price.into(),
quantity_usd: cfd.quantity_usd.into(),
profit_btc,
profit_in_percent: profit_in_percent.round_dp(1).to_string(),
profit_percent,
state: state.clone(),
actions: available_actions(state, cfd.role()),
state_transition_timestamp: cfd.state.get_transition_timestamp().seconds(),
@ -562,7 +565,7 @@ impl From<CfdsWithAuxData> for Vec<Cfd> {
// TODO: Remove this struct out of existence
pub struct CfdsWithAuxData {
pub cfds: Vec<model::cfd::Cfd>,
pub current_price: model::Price,
pub current_price: Option<model::Price>,
pub pending_proposals: UpdateCfdProposals,
pub network: Network,
}
@ -570,15 +573,15 @@ pub struct CfdsWithAuxData {
impl CfdsWithAuxData {
pub fn new(
cfds: Vec<model::cfd::Cfd>,
quote: bitmex_price_feed::Quote,
quote: Option<bitmex_price_feed::Quote>,
pending_proposals: UpdateCfdProposals,
role: Role,
network: Network,
) -> Self {
let current_price = match role {
let current_price = quote.map(|quote| match role {
Role::Maker => quote.for_maker(),
Role::Taker => quote.for_taker(),
};
});
CfdsWithAuxData {
cfds,

10
daemon/src/routes_taker.rs

@ -124,7 +124,15 @@ pub async fn post_cfd_action(
}
CfdAction::Commit => cfd_actor.send(taker_cfd::Commit { order_id: id }).await,
CfdAction::Settle => {
let quote: bitmex_price_feed::Quote = feeds.quote.borrow().clone().into();
let quote: bitmex_price_feed::Quote = match feeds.quote.borrow().as_ref() {
Some(quote) => quote.clone().into(),
None => {
return Err(HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR)
.title("Quote unavailable")
.detail("Cannot settle without current price information."))
}
};
let current_price = quote.for_taker();
cfd_actor
.send(taker_cfd::ProposeSettlement {

4
daemon/src/taker.rs

@ -249,12 +249,12 @@ async fn main() -> Result<()> {
.run();
tasks.add(task);
let init_quote = price_feed_address
price_feed_address
.send(bitmex_price_feed::Connect)
.await??;
let (proj_actor, projection_feeds) =
projection::Actor::new(db.clone(), Role::Taker, bitcoin_network, init_quote).await?;
projection::Actor::new(db.clone(), Role::Taker, bitcoin_network).await?;
tasks.add(projection_context.run(proj_actor));
let possible_addresses = resolve_maker_addresses(&opts.maker).await?;

2
daemon/src/to_sse_event.rs

@ -98,7 +98,7 @@ impl ToSseEvent for connection::ConnectionStatus {
}
}
impl ToSseEvent for Quote {
impl ToSseEvent for Option<Quote> {
fn to_sse_event(&self) -> Event {
Event::json(self).event("quote")
}

17
daemon/tests/harness/mod.rs

@ -3,10 +3,9 @@ use crate::harness::mocks::oracle::OracleActor;
use crate::harness::mocks::wallet::WalletActor;
use crate::schnorrsig;
use ::bdk::bitcoin::Network;
use daemon::bitmex_price_feed::Quote;
use daemon::connection::{connect, ConnectionStatus};
use daemon::model::cfd::{OrderId, Role};
use daemon::model::{self, Price, Timestamp, Usd};
use daemon::model::{self, Price, Usd};
use daemon::projection::{Cfd, CfdOrder, Feeds, Identity};
use daemon::seed::Seed;
use daemon::{
@ -168,8 +167,7 @@ impl Maker {
.await
.unwrap();
let (proj_actor, feeds) =
projection::Actor::new(db, Role::Maker, Network::Testnet, dummy_quote())
let (proj_actor, feeds) = projection::Actor::new(db, Role::Maker, Network::Testnet)
.await
.unwrap();
tasks.add(projection_context.run(proj_actor));
@ -297,8 +295,7 @@ impl Taker {
.await
.unwrap();
let (proj_actor, feeds) =
projection::Actor::new(db, Role::Taker, Network::Testnet, dummy_quote())
let (proj_actor, feeds) = projection::Actor::new(db, Role::Taker, Network::Testnet)
.await
.unwrap();
tasks.add(projection_context.run(proj_actor));
@ -391,14 +388,6 @@ pub fn dummy_price() -> Price {
Price::new(dec!(50_000)).expect("to not fail")
}
pub fn dummy_quote() -> Quote {
Quote {
timestamp: Timestamp::now(),
bid: dummy_price(),
ask: dummy_price(),
}
}
pub fn dummy_new_order() -> maker_cfd::NewOrder {
maker_cfd::NewOrder {
price: dummy_price(),

45
taker-frontend/src/components/History.tsx

@ -8,6 +8,7 @@ import {
HStack,
Link,
SimpleGrid,
Skeleton,
Spinner,
Table,
Tbody,
@ -60,11 +61,6 @@ const CfdDetails = ({ cfd, connectedToMaker }: CfdDetailsProps) => {
const margin = `${Math.round((cfd.margin) * 1_000_000) / 1_000_000}`;
const liquidationPrice = `$${cfd.liquidation_price}`;
const pAndLNumber = Math.round((cfd.profit_btc) * 1_000_000) / 1_000_000;
const pAndL = pAndLNumber < 0 ? `-₿${Math.abs(pAndLNumber)}` : `${Math.abs(pAndLNumber)}`;
const payout = `${Math.round((cfd.margin + cfd.profit_btc) * 1_000_000) / 1_000_000}`;
const txLock = cfd.details.tx_url_list.find((tx) => tx.label === TxLabel.Lock);
const txCommit = cfd.details.tx_url_list.find((tx) => tx.label === TxLabel.Commit);
const txRefund = cfd.details.tx_url_list.find((tx) => tx.label === TxLabel.Refund);
@ -108,11 +104,19 @@ const CfdDetails = ({ cfd, connectedToMaker }: CfdDetailsProps) => {
</Tr>
<Tr>
<Td><Text as={"b"}>Unrealized P/L</Text></Td>
<Td textAlign="right">{pAndL}</Td>
<Td textAlign="right">
<Skeleton isLoaded={cfd.profit_btc != null}>
<ProfitAndLoss profitBtc={cfd.profit_btc!} />
</Skeleton>
</Td>
</Tr>
<Tr>
<Td><Text as={"b"}>Payout</Text></Td>
<Td textAlign="right">{payout}</Td>
<Td textAlign="right">
<Skeleton isLoaded={cfd.profit_btc != null}>
<Payout profitBtc={cfd.profit_btc!} margin={cfd.margin} />
</Skeleton>
</Td>
</Tr>
</Tbody>
</Table>
@ -164,6 +168,33 @@ const CfdDetails = ({ cfd, connectedToMaker }: CfdDetailsProps) => {
);
};
interface ProfitAndLossProps {
profitBtc: number;
}
function ProfitAndLoss({ profitBtc }: ProfitAndLossProps) {
const pAndLNumber = Math.round((profitBtc) * 1_000_000) / 1_000_000;
const absPAndL = Math.abs(pAndLNumber);
const negativeSign = pAndLNumber < 0 ? "-" : "";
return <Text>
{negativeSign}{absPAndL}
</Text>;
}
interface PayoutProps {
profitBtc: number;
margin: number;
}
function Payout({ profitBtc, margin }: PayoutProps) {
let payoutBtc = Math.round((margin + profitBtc) * 1_000_000) / 1_000_000;
return <Text>
{payoutBtc}
</Text>;
}
const CircleIcon = (props: any) => (
<Icon viewBox="0 0 200 200" {...props}>
<path

4
taker-frontend/src/types.ts

@ -52,8 +52,8 @@ export interface Cfd {
margin: number;
profit_btc: number;
profit_in_percent: number;
profit_btc?: number;
profit_percent?: number;
state: State;
state_transition_timestamp: number;

Loading…
Cancel
Save