From 641905e6bec0f280b1b38504bdc824b188a562f9 Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Mon, 27 Sep 2021 15:02:13 +0930 Subject: [PATCH 1/2] Display price feed in the UI Display both bid and ask price feed from BitMex. --- daemon/src/maker.rs | 15 ++------- daemon/src/routes_maker.rs | 9 ++++++ daemon/src/routes_taker.rs | 9 ++++++ daemon/src/taker.rs | 15 ++------- daemon/src/to_sse_event.rs | 35 ++++++++++++++++---- frontend/src/MakerApp.tsx | 9 +++--- frontend/src/TakerApp.tsx | 5 ++- frontend/src/components/CurrentPrice.tsx | 41 ++++++++++++++++++++++++ frontend/src/components/Timestamp.tsx | 26 +++++++++++++++ frontend/src/components/Types.tsx | 6 ++++ frontend/src/components/Wallet.tsx | 14 ++------ 11 files changed, 134 insertions(+), 50 deletions(-) create mode 100644 frontend/src/components/CurrentPrice.tsx create mode 100644 frontend/src/components/Timestamp.tsx diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 9ffb7eb..0987808 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -118,26 +118,15 @@ async fn main() -> Result<()> { tracing::info!("Listening on {}", local_addr); - let (task, mut quote_updates) = bitmex_price_feed::new().await?; + let (task, quote_updates) = bitmex_price_feed::new().await?; tokio::spawn(task); - // dummy usage of quote receiver - tokio::spawn(async move { - loop { - let bitmex_price_feed::Quote { bid, ask, .. } = *quote_updates.borrow(); - tracing::info!(%bid, %ask, "BitMex quote updated"); - - if quote_updates.changed().await.is_err() { - return; - } - } - }); - rocket::custom(figment) .manage(cfd_feed_receiver) .manage(order_feed_receiver) .manage(wallet_feed_receiver) .manage(auth_password) + .manage(quote_updates) .attach(Db::init()) .attach(AdHoc::try_on_ignite( "SQL migrations", diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 559ebc0..aa5470e 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -1,4 +1,5 @@ use crate::auth::Authenticated; +use crate::bitmex_price_feed; use crate::maker_cfd; use crate::model::cfd::{Cfd, Order, OrderId, Origin}; use crate::model::{Usd, WalletInfo}; @@ -23,11 +24,13 @@ pub async fn maker_feed( rx_cfds: &State>>, rx_order: &State>>, rx_wallet: &State>, + rx_quote: &State>, _auth: Authenticated, ) -> EventStream![] { let mut rx_cfds = rx_cfds.inner().clone(); let mut rx_order = rx_order.inner().clone(); let mut rx_wallet = rx_wallet.inner().clone(); + let mut rx_quote = rx_quote.inner().clone(); EventStream! { let wallet_info = rx_wallet.borrow().clone(); @@ -38,6 +41,9 @@ pub async fn maker_feed( let cfds = rx_cfds.borrow().clone(); yield cfds.to_sse_event(); + let quote = rx_quote.borrow().clone(); + yield quote.to_sse_event(); + loop{ select! { @@ -52,6 +58,9 @@ pub async fn maker_feed( Ok(()) = rx_cfds.changed() => { let cfds = rx_cfds.borrow().clone(); yield cfds.to_sse_event(); + Ok(()) = rx_quote.changed() => { + let quote = rx_quote.borrow().clone(); + yield quote.to_sse_event(); } } } diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index 6c5db68..07f2c32 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -1,3 +1,4 @@ +use crate::bitmex_price_feed; use crate::model::cfd::{calculate_buy_margin, Cfd, Order, OrderId}; use crate::model::{Leverage, Usd, WalletInfo}; use crate::routes::EmbeddedFileExt; @@ -22,10 +23,12 @@ pub async fn feed( rx_cfds: &State>>, rx_order: &State>>, rx_wallet: &State>, + rx_quote: &State>, ) -> EventStream![] { let mut rx_cfds = rx_cfds.inner().clone(); let mut rx_order = rx_order.inner().clone(); let mut rx_wallet = rx_wallet.inner().clone(); + let mut rx_quote = rx_quote.inner().clone(); EventStream! { let wallet_info = rx_wallet.borrow().clone(); @@ -36,6 +39,9 @@ pub async fn feed( let cfds = rx_cfds.borrow().clone(); yield cfds.to_sse_event(); + let quote = rx_quote.borrow().clone(); + yield quote.to_sse_event(); + loop{ select! { @@ -50,6 +56,9 @@ pub async fn feed( Ok(()) = rx_cfds.changed() => { let cfds = rx_cfds.borrow().clone(); yield cfds.to_sse_event(); + Ok(()) = rx_quote.changed() => { + let quote = rx_quote.borrow().clone(); + yield quote.to_sse_event(); } } } diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 29c9b31..99f511a 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -118,21 +118,9 @@ async fn main() -> Result<()> { } }; - let (task, mut quote_updates) = bitmex_price_feed::new().await?; + let (task, quote_updates) = bitmex_price_feed::new().await?; tokio::spawn(task); - // dummy usage of quote receiver - tokio::spawn(async move { - loop { - let bitmex_price_feed::Quote { bid, ask, .. } = *quote_updates.borrow(); - tracing::info!(%bid, %ask, "BitMex quote updated"); - - if quote_updates.changed().await.is_err() { - return; - } - } - }); - let figment = rocket::Config::figment() .merge(("databases.taker.url", data_dir.join("taker.sqlite"))) .merge(("port", opts.http_port)); @@ -141,6 +129,7 @@ async fn main() -> Result<()> { .manage(cfd_feed_receiver) .manage(order_feed_receiver) .manage(wallet_feed_receiver) + .manage(quote_updates) .attach(Db::init()) .attach(AdHoc::try_on_ignite( "SQL migrations", diff --git a/daemon/src/to_sse_event.rs b/daemon/src/to_sse_event.rs index faee19d..5716d8a 100644 --- a/daemon/src/to_sse_event.rs +++ b/daemon/src/to_sse_event.rs @@ -1,10 +1,10 @@ -use crate::model; use crate::model::cfd::OrderId; use crate::model::{Leverage, Position, TradingPair, Usd}; +use crate::{bitmex_price_feed, model}; use bdk::bitcoin::Amount; use rocket::response::stream::Event; use serde::Serialize; -use std::time::UNIX_EPOCH; +use std::time::{SystemTime, UNIX_EPOCH}; #[derive(Debug, Clone, Serialize)] pub struct Cfd { @@ -127,13 +127,34 @@ impl ToSseEvent for model::WalletInfo { let wallet_info = WalletInfo { balance: self.balance, address: self.address.to_string(), - last_updated_at: self - .last_updated_at - .duration_since(UNIX_EPOCH) - .expect("timestamp to be convertible to duration since epoch") - .as_secs(), + last_updated_at: into_unix_secs(self.last_updated_at), }; Event::json(&wallet_info).event("wallet") } } + +#[derive(Debug, Clone, Serialize)] +pub struct Quote { + bid: Usd, + ask: Usd, + last_updated_at: u64, +} + +impl ToSseEvent for bitmex_price_feed::Quote { + fn to_sse_event(&self) -> Event { + let quote = Quote { + bid: self.bid, + ask: self.ask, + last_updated_at: into_unix_secs(self.timestamp), + }; + Event::json("e).event("quote") + } +} + +/// Convert to the format expected by the frontend +fn into_unix_secs(time: SystemTime) -> u64 { + time.duration_since(UNIX_EPOCH) + .expect("timestamp to be convertible to duration since epoch") + .as_secs() +} diff --git a/frontend/src/MakerApp.tsx b/frontend/src/MakerApp.tsx index cc83b94..08132fe 100644 --- a/frontend/src/MakerApp.tsx +++ b/frontend/src/MakerApp.tsx @@ -19,9 +19,10 @@ import { useEventSource } from "react-sse-hooks"; import { CfdTable } from "./components/cfdtables/CfdTable"; import { CfdTableMaker } from "./components/cfdtables/CfdTableMaker"; import CurrencyInputField from "./components/CurrencyInputField"; +import CurrentPrice from "./components/CurrentPrice"; import useLatestEvent from "./components/Hooks"; import OrderTile from "./components/OrderTile"; -import { Cfd, Order, WalletInfo } from "./components/Types"; +import { Cfd, Order, PriceInfo, WalletInfo } from "./components/Types"; import Wallet from "./components/Wallet"; import { CfdSellOrderPayload, postCfdSellOrderRequest } from "./MakerClient"; @@ -35,6 +36,7 @@ export default function App() { console.log(cfds); const walletInfo = useLatestEvent(source, "wallet"); + const priceInfo = useLatestEvent(source, "quote"); const toast = useToast(); let [minQuantity, setMinQuantity] = useState("100"); @@ -80,11 +82,8 @@ export default function App() { + - - Current Price: - {49000} - Min Quantity: (source, "order"); const walletInfo = useLatestEvent(source, "wallet"); + const priceInfo = useLatestEvent(source, "quote"); const toast = useToast(); let [quantity, setQuantity] = useState("0"); @@ -123,6 +125,7 @@ export default function App() { + Order Price: diff --git a/frontend/src/components/CurrentPrice.tsx b/frontend/src/components/CurrentPrice.tsx new file mode 100644 index 0000000..fa94833 --- /dev/null +++ b/frontend/src/components/CurrentPrice.tsx @@ -0,0 +1,41 @@ +import { Box, Center, Divider, HStack, Skeleton, Text } from "@chakra-ui/react"; +import React from "react"; +import Timestamp from "./Timestamp"; +import { PriceInfo } from "./Types"; + +interface Props { + priceInfo: PriceInfo | null; +} + +export default function CurrentPrice( + { + priceInfo, + }: Props, +) { + let bid = ; + let ask = ; + let timestamp = ; + + if (priceInfo) { + bid = {priceInfo.bid} USD; + ask = {priceInfo.ask} USD; + timestamp = ; + } + + return ( + +
Current Price
+ + Bid: + {bid} + + + + Ask: + {ask} + + + {timestamp} +
+ ); +} diff --git a/frontend/src/components/Timestamp.tsx b/frontend/src/components/Timestamp.tsx new file mode 100644 index 0000000..a3709aa --- /dev/null +++ b/frontend/src/components/Timestamp.tsx @@ -0,0 +1,26 @@ +import { Text } from "@chakra-ui/react"; +import React from "react"; +import { unixTimestampToDate } from "./Types"; + +interface Props { + timestamp: number; +} + +export default function Timestamp( + { + timestamp, + }: Props, +) { + return ( + + Updated: {unixTimestampToDate(timestamp).toLocaleDateString("en-US", { + year: "numeric", + month: "numeric", + day: "numeric", + hour: "2-digit", + minute: "2-digit", + second: "2-digit", + })} + + ); +} diff --git a/frontend/src/components/Types.tsx b/frontend/src/components/Types.tsx index fb5cea9..59b2399 100644 --- a/frontend/src/components/Types.tsx +++ b/frontend/src/components/Types.tsx @@ -37,6 +37,12 @@ export interface WalletInfo { last_updated_at: number; } +export interface PriceInfo { + bid: number; + ask: number; + last_updated_at: number; +} + export function unixTimestampToDate(unixTimestamp: number): Date { return new Date(unixTimestamp * 1000); } diff --git a/frontend/src/components/Wallet.tsx b/frontend/src/components/Wallet.tsx index 3b68332..209beba 100644 --- a/frontend/src/components/Wallet.tsx +++ b/frontend/src/components/Wallet.tsx @@ -1,7 +1,8 @@ import { CheckIcon, CopyIcon } from "@chakra-ui/icons"; import { Box, Center, Divider, HStack, IconButton, Skeleton, Text, useClipboard } from "@chakra-ui/react"; import React from "react"; -import { unixTimestampToDate, WalletInfo } from "./Types"; +import Timestamp from "./Timestamp"; +import { WalletInfo } from "./Types"; interface WalletProps { walletInfo: WalletInfo | null; @@ -30,16 +31,7 @@ export default function Wallet( />
); - timestamp = - Updated: {unixTimestampToDate(walletInfo.last_updated_at).toLocaleDateString("en-US", { - year: "numeric", - month: "numeric", - day: "numeric", - hour: "2-digit", - minute: "2-digit", - second: "2-digit", - })} - ; + timestamp = ; } return ( From 4b8237c9d6a71fda6f74b5198ba9571671326b6b Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Mon, 27 Sep 2021 15:10:24 +0930 Subject: [PATCH 2/2] Use current price in profit margin calculations As the profit margin is connected to the CFD feed, refresh the feed whenever current price or the CFDs change. Fixes #75. --- daemon/src/bitmex_price_feed.rs | 23 ++++++- daemon/src/cfd_feed.rs | 118 ++++++++++++++++++++++++++++++++ daemon/src/maker.rs | 34 +++++++-- daemon/src/maker_cfd.rs | 48 +++++++++---- daemon/src/routes_maker.rs | 12 ++-- daemon/src/routes_taker.rs | 12 ++-- daemon/src/taker.rs | 33 +++++++-- daemon/src/taker_cfd.rs | 50 ++++++++++---- daemon/src/to_sse_event.rs | 62 +---------------- 9 files changed, 279 insertions(+), 113 deletions(-) create mode 100644 daemon/src/cfd_feed.rs diff --git a/daemon/src/bitmex_price_feed.rs b/daemon/src/bitmex_price_feed.rs index aa23c03..766f82e 100644 --- a/daemon/src/bitmex_price_feed.rs +++ b/daemon/src/bitmex_price_feed.rs @@ -1,7 +1,8 @@ use crate::model::Usd; -use anyhow::Result; +use anyhow::{Context, Result}; use futures::{StreamExt, TryStreamExt}; use rust_decimal::Decimal; +use rust_decimal_macros::dec; use std::convert::TryFrom; use std::future::Future; use std::time::SystemTime; @@ -40,7 +41,7 @@ pub async fn new() -> Result<(impl Future, watch::Receiver)> Ok((task, receiver)) } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Quote { pub timestamp: SystemTime, pub bid: Usd, @@ -67,6 +68,24 @@ impl Quote { ask: Usd::from(Decimal::try_from(quote.ask_price)?), })) } + + #[allow(dead_code)] // Not used by all binaries. + pub fn for_maker(&self) -> Usd { + self.ask + } + + #[allow(dead_code)] // Not used by all binaries. + pub fn for_taker(&self) -> Usd { + // TODO: Verify whether this is correct + self.mid_range().unwrap() + } + + fn mid_range(&self) -> Result { + Ok(Usd((self.bid.checked_add(self.ask))? + .0 + .checked_div(dec!(2)) + .context("division error")?)) + } } mod wire { diff --git a/daemon/src/cfd_feed.rs b/daemon/src/cfd_feed.rs new file mode 100644 index 0000000..ac7cc75 --- /dev/null +++ b/daemon/src/cfd_feed.rs @@ -0,0 +1,118 @@ +use crate::db::load_all_cfds; +use crate::model::cfd::OrderId; +use crate::model::{Leverage, Position, TradingPair, Usd}; +use crate::{bitmex_price_feed, model}; +use anyhow::{Context as _, Result}; +use bdk::bitcoin::Amount; +use serde::Serialize; +use sqlx::pool::PoolConnection; +use sqlx::Sqlite; +use std::time::UNIX_EPOCH; +use tokio::sync::watch; + +#[allow(dead_code)] +/// Role of the actor's owner in the upcoming contract +pub enum Role { + Maker, + Taker, +} + +#[derive(Debug, Clone, Serialize)] +pub struct Cfd { + pub order_id: OrderId, + pub initial_price: Usd, + + pub leverage: Leverage, + pub trading_pair: TradingPair, + pub position: Position, + pub liquidation_price: Usd, + + pub quantity_usd: Usd, + + #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] + pub margin: Amount, + + #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] + pub profit_btc: Amount, + pub profit_usd: Usd, + + pub state: String, + pub state_transition_timestamp: u64, +} + +pub struct CfdFeed { + role: Role, + cfd_feed_sender: watch::Sender>, + current_price: Option, +} + +impl CfdFeed { + pub fn new(role: Role) -> (Self, watch::Receiver>) { + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::>(vec![]); + ( + Self { + role, + cfd_feed_sender, + current_price: None, + }, + cfd_feed_receiver, + ) + } + + /// Update price from BitMex. It is recommended to call `update()` afterwards. + pub fn set_current_price(&mut self, quote: bitmex_price_feed::Quote) { + self.current_price = Some(quote); + } + + /// Updates the CFD feed in the frontend + pub async fn update(&self, conn: &mut PoolConnection) -> Result<()> { + let cfds = load_all_cfds(conn).await?; + + let cfds = cfds + .iter() + .map(|cfd| { + let (profit_btc, profit_usd) = self.calculate_profit(cfd); + + Cfd { + order_id: cfd.order.id, + initial_price: cfd.order.price, + leverage: cfd.order.leverage, + trading_pair: cfd.order.trading_pair.clone(), + position: cfd.position(), + liquidation_price: cfd.order.liquidation_price, + quantity_usd: cfd.quantity_usd, + profit_btc, + profit_usd, + state: cfd.state.to_string(), + state_transition_timestamp: cfd + .state + .get_transition_timestamp() + .duration_since(UNIX_EPOCH) + .expect("timestamp to be convertable to duration since epoch") + .as_secs(), + + // TODO: Depending on the state the margin might be set (i.e. in Open we save it + // in the DB internally) and does not have to be calculated + margin: cfd.margin().unwrap(), + } + }) + .collect::>(); + + self.cfd_feed_sender + .send(cfds) + .context("Could not update CFD feed") + } + + fn calculate_profit(&self, cfd: &model::cfd::Cfd) -> (Amount, Usd) { + if let Some(quote) = &self.current_price { + cfd.profit(match self.role { + Role::Taker => quote.for_taker(), + Role::Maker => quote.for_maker(), + }) + .unwrap() + } else { + // No current price set yet, returning no profits + (Amount::ZERO, Usd::ZERO) + } + } +} diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 0987808..3444be9 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -1,11 +1,12 @@ use crate::auth::MAKER_USERNAME; +use crate::cfd_feed::CfdFeed; use crate::seed::Seed; use crate::wallet::Wallet; use anyhow::{Context, Result}; use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1}; use bdk::bitcoin::Network; use clap::Clap; -use model::cfd::{Cfd, Order}; +use model::cfd::Order; use model::WalletInfo; use rocket::fairing::AdHoc; use rocket_db_pools::Database; @@ -20,6 +21,7 @@ use xtra::spawn::TokioGlobalSpawnExt; mod actors; mod auth; mod bitmex_price_feed; +mod cfd_feed; mod db; mod keypair; mod logger; @@ -105,7 +107,7 @@ async fn main() -> Result<()> { let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle. - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::>(vec![]); + let (cfd_feed_updater, cfd_feed_receiver) = CfdFeed::new(cfd_feed::Role::Maker); let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::(wallet_info); @@ -118,7 +120,7 @@ async fn main() -> Result<()> { tracing::info!("Listening on {}", local_addr); - let (task, quote_updates) = bitmex_price_feed::new().await?; + let (task, mut quote_updates) = bitmex_price_feed::new().await?; tokio::spawn(task); rocket::custom(figment) @@ -126,7 +128,7 @@ async fn main() -> Result<()> { .manage(order_feed_receiver) .manage(wallet_feed_receiver) .manage(auth_password) - .manage(quote_updates) + .manage(quote_updates.clone()) .attach(Db::init()) .attach(AdHoc::try_on_ignite( "SQL migrations", @@ -157,7 +159,7 @@ async fn main() -> Result<()> { db, wallet.clone(), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), - cfd_feed_sender, + cfd_feed_updater, order_feed_sender, maker_inc_connections_address.clone(), monitor_actor_address, @@ -193,6 +195,28 @@ async fn main() -> Result<()> { }); tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream)); + tokio::spawn({ + let cfd_actor_inbox = cfd_maker_actor_inbox.clone(); + + async move { + loop { + let quote = quote_updates.borrow().clone(); + + if cfd_actor_inbox.do_send_async(maker_cfd::PriceUpdate(quote)).await.is_err() { + tracing::warn!( + "Could not communicate with the message handler, stopping price feed sync" + ); + return; + } + + if quote_updates.changed().await.is_err() { + tracing::warn!("BitMex price feed receiver not available, stopping price feed sync"); + return; + } + } + } + }); + tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); Ok(rocket.manage(cfd_maker_actor_inbox)) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 6d6764f..bacf569 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -1,14 +1,15 @@ use crate::actors::log_error; +use crate::cfd_feed::CfdFeed; use crate::db::{ - insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, - load_cfd_by_order_id, load_order_by_id, + insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_cfd_by_order_id, + load_order_by_id, }; use crate::maker_inc_connections::TakerCommand; use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::{TakerId, Usd}; use crate::monitor::MonitorParams; use crate::wallet::Wallet; -use crate::{maker_inc_connections, monitor, setup_contract, wire}; +use crate::{bitmex_price_feed, maker_inc_connections, monitor, setup_contract, wire}; use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -43,11 +44,13 @@ pub struct TakerStreamMessage { pub item: Result, } +pub struct PriceUpdate(pub bitmex_price_feed::Quote); + pub struct Actor { db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, - cfd_feed_actor_inbox: watch::Sender>, + cfd_feed: CfdFeed, order_feed_sender: watch::Sender>, takers: Address, current_order_id: Option, @@ -68,7 +71,7 @@ impl Actor { db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, - cfd_feed_actor_inbox: watch::Sender>, + cfd_feed: CfdFeed, order_feed_sender: watch::Sender>, takers: Address, monitor_actor: Address>, @@ -76,13 +79,13 @@ impl Actor { let mut conn = db.acquire().await?; // populate the CFD feed with existing CFDs - cfd_feed_actor_inbox.send(load_all_cfds(&mut conn).await?)?; + cfd_feed.update(&mut conn).await?; Ok(Self { db, wallet, oracle_pk, - cfd_feed_actor_inbox, + cfd_feed, order_feed_sender, takers, current_order_id: None, @@ -170,8 +173,7 @@ impl Actor { ) .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + self.cfd_feed.update(&mut conn).await?; let txid = self .wallet @@ -248,8 +250,7 @@ impl Actor { ); insert_cfd(cfd, &mut conn).await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + self.cfd_feed.update(&mut conn).await?; // 3. Remove current order self.current_order_id = None; @@ -309,8 +310,7 @@ impl Actor { }) .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + self.cfd_feed.update(&mut conn).await?; let contract_future = setup_contract::new( self.takers.clone().into_sink().with(move |msg| { @@ -380,8 +380,7 @@ impl Actor { command: TakerCommand::NotifyOrderRejected { id: msg.order_id }, }) .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + self.cfd_feed.update(&mut conn).await?; // Remove order for all self.current_order_id = None; @@ -418,6 +417,14 @@ impl Actor { Ok(()) } + + async fn handle_price_update(&mut self, quote: bitmex_price_feed::Quote) -> Result<()> { + self.cfd_feed.set_current_price(quote); + + let mut conn = self.db.acquire().await?; + self.cfd_feed.update(&mut conn).await?; + Ok(()) + } } #[async_trait] @@ -494,6 +501,13 @@ impl Handler for Actor { } } +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: PriceUpdate, _ctx: &mut Context) { + log_error!(self.handle_price_update(msg.0)) + } +} + impl Message for NewOrder { type Result = (); } @@ -519,4 +533,8 @@ impl Message for TakerStreamMessage { type Result = KeepRunning; } +impl Message for PriceUpdate { + type Result = (); +} + impl xtra::Actor for Actor {} diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index aa5470e..6529b20 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -1,10 +1,9 @@ use crate::auth::Authenticated; -use crate::bitmex_price_feed; -use crate::maker_cfd; -use crate::model::cfd::{Cfd, Order, OrderId, Origin}; +use crate::model::cfd::{Order, OrderId, Origin}; use crate::model::{Usd, WalletInfo}; use crate::routes::EmbeddedFileExt; use crate::to_sse_event::ToSseEvent; +use crate::{bitmex_price_feed, cfd_feed, maker_cfd}; use anyhow::Result; use rocket::http::{ContentType, Header, Status}; use rocket::response::stream::EventStream; @@ -21,7 +20,7 @@ use xtra::Address; #[rocket::get("/feed")] pub async fn maker_feed( - rx_cfds: &State>>, + rx_cfds: &State>>, rx_order: &State>>, rx_wallet: &State>, rx_quote: &State>, @@ -39,11 +38,11 @@ pub async fn maker_feed( let order = rx_order.borrow().clone(); yield order.to_sse_event(); - let cfds = rx_cfds.borrow().clone(); - yield cfds.to_sse_event(); let quote = rx_quote.borrow().clone(); yield quote.to_sse_event(); + let cfds = rx_cfds.borrow().clone(); + yield cfds.to_sse_event(); loop{ select! { @@ -58,6 +57,7 @@ pub async fn maker_feed( Ok(()) = rx_cfds.changed() => { let cfds = rx_cfds.borrow().clone(); yield cfds.to_sse_event(); + } Ok(()) = rx_quote.changed() => { let quote = rx_quote.borrow().clone(); yield quote.to_sse_event(); diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index 07f2c32..34ee977 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -1,9 +1,8 @@ -use crate::bitmex_price_feed; -use crate::model::cfd::{calculate_buy_margin, Cfd, Order, OrderId}; +use crate::model::cfd::{calculate_buy_margin, Order, OrderId}; use crate::model::{Leverage, Usd, WalletInfo}; use crate::routes::EmbeddedFileExt; -use crate::taker_cfd; use crate::to_sse_event::ToSseEvent; +use crate::{bitmex_price_feed, cfd_feed, taker_cfd}; use bdk::bitcoin::Amount; use rocket::http::{ContentType, Status}; use rocket::response::stream::EventStream; @@ -20,7 +19,7 @@ use xtra::Address; #[rocket::get("/feed")] pub async fn feed( - rx_cfds: &State>>, + rx_cfds: &State>>, rx_order: &State>>, rx_wallet: &State>, rx_quote: &State>, @@ -37,11 +36,11 @@ pub async fn feed( let order = rx_order.borrow().clone(); yield order.to_sse_event(); - let cfds = rx_cfds.borrow().clone(); - yield cfds.to_sse_event(); let quote = rx_quote.borrow().clone(); yield quote.to_sse_event(); + let cfds = rx_cfds.borrow().clone(); + yield cfds.to_sse_event(); loop{ select! { @@ -56,6 +55,7 @@ pub async fn feed( Ok(()) = rx_cfds.changed() => { let cfds = rx_cfds.borrow().clone(); yield cfds.to_sse_event(); + } Ok(()) = rx_quote.changed() => { let quote = rx_quote.borrow().clone(); yield quote.to_sse_event(); diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 99f511a..f011920 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -1,3 +1,4 @@ +use crate::cfd_feed::CfdFeed; use crate::model::WalletInfo; use crate::wallet::Wallet; use anyhow::{Context, Result}; @@ -5,7 +6,7 @@ use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1}; use bdk::bitcoin::Network; use clap::Clap; use futures::StreamExt; -use model::cfd::{Cfd, Order}; +use model::cfd::Order; use rocket::fairing::AdHoc; use rocket_db_pools::Database; use seed::Seed; @@ -23,6 +24,7 @@ use xtra::Actor; mod actors; mod bitmex_price_feed; +mod cfd_feed; mod db; mod keypair; mod logger; @@ -101,7 +103,7 @@ async fn main() -> Result<()> { let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle. - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::>(vec![]); + let (cfd_feed_updater, cfd_feed_receiver) = CfdFeed::new(cfd_feed::Role::Taker); let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::(wallet_info); @@ -118,7 +120,7 @@ async fn main() -> Result<()> { } }; - let (task, quote_updates) = bitmex_price_feed::new().await?; + let (task, mut quote_updates) = bitmex_price_feed::new().await?; tokio::spawn(task); let figment = rocket::Config::figment() @@ -129,7 +131,7 @@ async fn main() -> Result<()> { .manage(cfd_feed_receiver) .manage(order_feed_receiver) .manage(wallet_feed_receiver) - .manage(quote_updates) + .manage(quote_updates.clone()) .attach(Db::init()) .attach(AdHoc::try_on_ignite( "SQL migrations", @@ -161,7 +163,7 @@ async fn main() -> Result<()> { db, wallet.clone(), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), - cfd_feed_sender, + cfd_feed_updater, order_feed_sender, send_to_maker, monitor_actor_address, @@ -181,6 +183,27 @@ async fn main() -> Result<()> { cfd_actor_inbox.clone(), ))); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); + tokio::spawn({ + let cfd_actor_inbox = cfd_actor_inbox.clone(); + + async move { + loop { + let quote = quote_updates.borrow().clone(); + + if cfd_actor_inbox.do_send_async(taker_cfd::PriceUpdate(quote)).await.is_err() { + tracing::warn!( + "Could not communicate with the message handler, stopping price feed sync" + ); + return; + } + + if quote_updates.changed().await.is_err() { + tracing::warn!("BitMex price feed receiver not available, stopping price feed sync"); + return; + } + } + } + }); Ok(rocket.manage(cfd_actor_inbox)) }, diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 2ecbbff..dcac221 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -1,7 +1,8 @@ use crate::actors::log_error; +use crate::cfd_feed::CfdFeed; use crate::db::{ - insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, - load_cfd_by_order_id, load_order_by_id, + insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_cfd_by_order_id, + load_order_by_id, }; use crate::model::cfd::{ Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin, @@ -10,7 +11,7 @@ use crate::model::Usd; use crate::monitor::{self, MonitorParams}; use crate::wallet::Wallet; use crate::wire::SetupMsg; -use crate::{send_to_socket, setup_contract, wire}; +use crate::{bitmex_price_feed, send_to_socket, setup_contract, wire}; use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -35,6 +36,8 @@ pub struct CfdSetupCompleted { pub dlc: Result, } +pub struct PriceUpdate(pub bitmex_price_feed::Quote); + enum SetupState { Active { sender: mpsc::UnboundedSender, @@ -46,7 +49,7 @@ pub struct Actor { db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, - cfd_feed_actor_inbox: watch::Sender>, + cfd_feed: CfdFeed, order_feed_actor_inbox: watch::Sender>, send_to_maker: Address>, monitor_actor: Address>, @@ -58,19 +61,21 @@ impl Actor { db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, - cfd_feed_actor_inbox: watch::Sender>, + cfd_feed: CfdFeed, order_feed_actor_inbox: watch::Sender>, send_to_maker: Address>, monitor_actor: Address>, ) -> Result { let mut conn = db.acquire().await?; - cfd_feed_actor_inbox.send(load_all_cfds(&mut conn).await?)?; + + // populate the CFD feed with existing CFDs + cfd_feed.update(&mut conn).await?; Ok(Self { db, wallet, oracle_pk, - cfd_feed_actor_inbox, + cfd_feed, order_feed_actor_inbox, send_to_maker, monitor_actor, @@ -97,8 +102,7 @@ impl Actor { insert_cfd(cfd, &mut conn).await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + self.cfd_feed.update(&mut conn).await?; self.send_to_maker .do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity }) .await?; @@ -147,8 +151,7 @@ impl Actor { ) .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + self.cfd_feed.update(&mut conn).await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let contract_future = setup_contract::new( @@ -194,8 +197,7 @@ impl Actor { ) .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + self.cfd_feed.update(&mut conn).await?; Ok(()) } @@ -235,8 +237,7 @@ impl Actor { ) .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; + self.cfd_feed.update(&mut conn).await?; let txid = self .wallet @@ -298,6 +299,14 @@ impl Actor { Ok(()) } + + async fn handle_price_update(&mut self, quote: bitmex_price_feed::Quote) -> Result<()> { + self.cfd_feed.set_current_price(quote); + + let mut conn = self.db.acquire().await?; + self.cfd_feed.update(&mut conn).await?; + Ok(()) + } } #[async_trait] @@ -356,6 +365,13 @@ impl Handler for Actor { } } +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: PriceUpdate, _ctx: &mut Context) { + log_error!(self.handle_price_update(msg.0)) + } +} + impl Message for TakeOffer { type Result = (); } @@ -369,4 +385,8 @@ impl Message for CfdSetupCompleted { type Result = (); } +impl Message for PriceUpdate { + type Result = (); +} + impl xtra::Actor for Actor {} diff --git a/daemon/src/to_sse_event.rs b/daemon/src/to_sse_event.rs index 5716d8a..8d340cd 100644 --- a/daemon/src/to_sse_event.rs +++ b/daemon/src/to_sse_event.rs @@ -1,34 +1,11 @@ use crate::model::cfd::OrderId; use crate::model::{Leverage, Position, TradingPair, Usd}; -use crate::{bitmex_price_feed, model}; +use crate::{bitmex_price_feed, cfd_feed, model}; use bdk::bitcoin::Amount; use rocket::response::stream::Event; use serde::Serialize; use std::time::{SystemTime, UNIX_EPOCH}; -#[derive(Debug, Clone, Serialize)] -pub struct Cfd { - pub order_id: OrderId, - pub initial_price: Usd, - - pub leverage: Leverage, - pub trading_pair: TradingPair, - pub position: Position, - pub liquidation_price: Usd, - - pub quantity_usd: Usd, - - #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] - pub margin: Amount, - - #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] - pub profit_btc: Amount, - pub profit_usd: Usd, - - pub state: String, - pub state_transition_timestamp: u64, -} - #[derive(Debug, Clone, Serialize)] pub struct CfdOrder { pub id: OrderId, @@ -52,42 +29,9 @@ pub trait ToSseEvent { fn to_sse_event(&self) -> Event; } -impl ToSseEvent for Vec { - // TODO: This conversion can fail, we might want to change the API +impl ToSseEvent for Vec { fn to_sse_event(&self) -> Event { - let cfds = self - .iter() - .map(|cfd| { - // TODO: Get the actual current price here - let current_price = Usd::ZERO; - let (profit_btc, profit_usd) = cfd.profit(current_price).unwrap(); - - Cfd { - order_id: cfd.order.id, - initial_price: cfd.order.price, - leverage: cfd.order.leverage, - trading_pair: cfd.order.trading_pair.clone(), - position: cfd.position(), - liquidation_price: cfd.order.liquidation_price, - quantity_usd: cfd.quantity_usd, - profit_btc, - profit_usd, - state: cfd.state.to_string(), - state_transition_timestamp: cfd - .state - .get_transition_timestamp() - .duration_since(UNIX_EPOCH) - .expect("timestamp to be convertable to duration since epoch") - .as_secs(), - - // TODO: Depending on the state the margin might be set (i.e. in Open we save it - // in the DB internally) and does not have to be calculated - margin: cfd.margin().unwrap(), - } - }) - .collect::>(); - - Event::json(&cfds).event("cfds") + Event::json(&self).event("cfds") } }