Browse Source

Merge pull request #154 from comit-network/price-feed

Display price feed in the UI and use it for profit margin calculations in the CFDs.
fix-olivia-event-id
Mariusz 3 years ago
committed by GitHub
parent
commit
5e74e9fd86
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      daemon/src/bitmex_price_feed.rs
  2. 118
      daemon/src/cfd_feed.rs
  3. 43
      daemon/src/maker.rs
  4. 48
      daemon/src/maker_cfd.rs
  5. 15
      daemon/src/routes_maker.rs
  6. 15
      daemon/src/routes_taker.rs
  7. 42
      daemon/src/taker.rs
  8. 50
      daemon/src/taker_cfd.rs
  9. 95
      daemon/src/to_sse_event.rs
  10. 9
      frontend/src/MakerApp.tsx
  11. 5
      frontend/src/TakerApp.tsx
  12. 41
      frontend/src/components/CurrentPrice.tsx
  13. 26
      frontend/src/components/Timestamp.tsx
  14. 6
      frontend/src/components/Types.tsx
  15. 14
      frontend/src/components/Wallet.tsx

23
daemon/src/bitmex_price_feed.rs

@ -1,7 +1,8 @@
use crate::model::Usd; use crate::model::Usd;
use anyhow::Result; use anyhow::{Context, Result};
use futures::{StreamExt, TryStreamExt}; use futures::{StreamExt, TryStreamExt};
use rust_decimal::Decimal; use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::future::Future; use std::future::Future;
use std::time::SystemTime; use std::time::SystemTime;
@ -40,7 +41,7 @@ pub async fn new() -> Result<(impl Future<Output = ()>, watch::Receiver<Quote>)>
Ok((task, receiver)) Ok((task, receiver))
} }
#[derive(Debug)] #[derive(Clone, Debug)]
pub struct Quote { pub struct Quote {
pub timestamp: SystemTime, pub timestamp: SystemTime,
pub bid: Usd, pub bid: Usd,
@ -67,6 +68,24 @@ impl Quote {
ask: Usd::from(Decimal::try_from(quote.ask_price)?), ask: Usd::from(Decimal::try_from(quote.ask_price)?),
})) }))
} }
#[allow(dead_code)] // Not used by all binaries.
pub fn for_maker(&self) -> Usd {
self.ask
}
#[allow(dead_code)] // Not used by all binaries.
pub fn for_taker(&self) -> Usd {
// TODO: Verify whether this is correct
self.mid_range().unwrap()
}
fn mid_range(&self) -> Result<Usd> {
Ok(Usd((self.bid.checked_add(self.ask))?
.0
.checked_div(dec!(2))
.context("division error")?))
}
} }
mod wire { mod wire {

118
daemon/src/cfd_feed.rs

@ -0,0 +1,118 @@
use crate::db::load_all_cfds;
use crate::model::cfd::OrderId;
use crate::model::{Leverage, Position, TradingPair, Usd};
use crate::{bitmex_price_feed, model};
use anyhow::{Context as _, Result};
use bdk::bitcoin::Amount;
use serde::Serialize;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use std::time::UNIX_EPOCH;
use tokio::sync::watch;
#[allow(dead_code)]
/// Role of the actor's owner in the upcoming contract
pub enum Role {
Maker,
Taker,
}
#[derive(Debug, Clone, Serialize)]
pub struct Cfd {
pub order_id: OrderId,
pub initial_price: Usd,
pub leverage: Leverage,
pub trading_pair: TradingPair,
pub position: Position,
pub liquidation_price: Usd,
pub quantity_usd: Usd,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub margin: Amount,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub profit_btc: Amount,
pub profit_usd: Usd,
pub state: String,
pub state_transition_timestamp: u64,
}
pub struct CfdFeed {
role: Role,
cfd_feed_sender: watch::Sender<Vec<Cfd>>,
current_price: Option<bitmex_price_feed::Quote>,
}
impl CfdFeed {
pub fn new(role: Role) -> (Self, watch::Receiver<Vec<Cfd>>) {
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]);
(
Self {
role,
cfd_feed_sender,
current_price: None,
},
cfd_feed_receiver,
)
}
/// Update price from BitMex. It is recommended to call `update()` afterwards.
pub fn set_current_price(&mut self, quote: bitmex_price_feed::Quote) {
self.current_price = Some(quote);
}
/// Updates the CFD feed in the frontend
pub async fn update(&self, conn: &mut PoolConnection<Sqlite>) -> Result<()> {
let cfds = load_all_cfds(conn).await?;
let cfds = cfds
.iter()
.map(|cfd| {
let (profit_btc, profit_usd) = self.calculate_profit(cfd);
Cfd {
order_id: cfd.order.id,
initial_price: cfd.order.price,
leverage: cfd.order.leverage,
trading_pair: cfd.order.trading_pair.clone(),
position: cfd.position(),
liquidation_price: cfd.order.liquidation_price,
quantity_usd: cfd.quantity_usd,
profit_btc,
profit_usd,
state: cfd.state.to_string(),
state_transition_timestamp: cfd
.state
.get_transition_timestamp()
.duration_since(UNIX_EPOCH)
.expect("timestamp to be convertable to duration since epoch")
.as_secs(),
// TODO: Depending on the state the margin might be set (i.e. in Open we save it
// in the DB internally) and does not have to be calculated
margin: cfd.margin().unwrap(),
}
})
.collect::<Vec<Cfd>>();
self.cfd_feed_sender
.send(cfds)
.context("Could not update CFD feed")
}
fn calculate_profit(&self, cfd: &model::cfd::Cfd) -> (Amount, Usd) {
if let Some(quote) = &self.current_price {
cfd.profit(match self.role {
Role::Taker => quote.for_taker(),
Role::Maker => quote.for_maker(),
})
.unwrap()
} else {
// No current price set yet, returning no profits
(Amount::ZERO, Usd::ZERO)
}
}
}

43
daemon/src/maker.rs

@ -1,11 +1,12 @@
use crate::auth::MAKER_USERNAME; use crate::auth::MAKER_USERNAME;
use crate::cfd_feed::CfdFeed;
use crate::seed::Seed; use crate::seed::Seed;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1}; use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1};
use bdk::bitcoin::Network; use bdk::bitcoin::Network;
use clap::Clap; use clap::Clap;
use model::cfd::{Cfd, Order}; use model::cfd::Order;
use model::WalletInfo; use model::WalletInfo;
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
@ -20,6 +21,7 @@ use xtra::spawn::TokioGlobalSpawnExt;
mod actors; mod actors;
mod auth; mod auth;
mod bitmex_price_feed; mod bitmex_price_feed;
mod cfd_feed;
mod db; mod db;
mod keypair; mod keypair;
mod logger; mod logger;
@ -105,7 +107,7 @@ async fn main() -> Result<()> {
let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle. let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle.
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]); let (cfd_feed_updater, cfd_feed_receiver) = CfdFeed::new(cfd_feed::Role::Maker);
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None); let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
@ -121,23 +123,12 @@ async fn main() -> Result<()> {
let (task, mut quote_updates) = bitmex_price_feed::new().await?; let (task, mut quote_updates) = bitmex_price_feed::new().await?;
tokio::spawn(task); tokio::spawn(task);
// dummy usage of quote receiver
tokio::spawn(async move {
loop {
let bitmex_price_feed::Quote { bid, ask, .. } = *quote_updates.borrow();
tracing::info!(%bid, %ask, "BitMex quote updated");
if quote_updates.changed().await.is_err() {
return;
}
}
});
rocket::custom(figment) rocket::custom(figment)
.manage(cfd_feed_receiver) .manage(cfd_feed_receiver)
.manage(order_feed_receiver) .manage(order_feed_receiver)
.manage(wallet_feed_receiver) .manage(wallet_feed_receiver)
.manage(auth_password) .manage(auth_password)
.manage(quote_updates.clone())
.attach(Db::init()) .attach(Db::init())
.attach(AdHoc::try_on_ignite( .attach(AdHoc::try_on_ignite(
"SQL migrations", "SQL migrations",
@ -168,7 +159,7 @@ async fn main() -> Result<()> {
db, db,
wallet.clone(), wallet.clone(),
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
cfd_feed_sender, cfd_feed_updater,
order_feed_sender, order_feed_sender,
maker_inc_connections_address.clone(), maker_inc_connections_address.clone(),
monitor_actor_address, monitor_actor_address,
@ -204,6 +195,28 @@ async fn main() -> Result<()> {
}); });
tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream)); tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream));
tokio::spawn({
let cfd_actor_inbox = cfd_maker_actor_inbox.clone();
async move {
loop {
let quote = quote_updates.borrow().clone();
if cfd_actor_inbox.do_send_async(maker_cfd::PriceUpdate(quote)).await.is_err() {
tracing::warn!(
"Could not communicate with the message handler, stopping price feed sync"
);
return;
}
if quote_updates.changed().await.is_err() {
tracing::warn!("BitMex price feed receiver not available, stopping price feed sync");
return;
}
}
}
});
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
Ok(rocket.manage(cfd_maker_actor_inbox)) Ok(rocket.manage(cfd_maker_actor_inbox))

48
daemon/src/maker_cfd.rs

@ -1,14 +1,15 @@
use crate::actors::log_error; use crate::actors::log_error;
use crate::cfd_feed::CfdFeed;
use crate::db::{ use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_cfd_by_order_id,
load_cfd_by_order_id, load_order_by_id, load_order_by_id,
}; };
use crate::maker_inc_connections::TakerCommand; use crate::maker_inc_connections::TakerCommand;
use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId};
use crate::model::{TakerId, Usd}; use crate::model::{TakerId, Usd};
use crate::monitor::MonitorParams; use crate::monitor::MonitorParams;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::{maker_inc_connections, monitor, setup_contract, wire}; use crate::{bitmex_price_feed, maker_inc_connections, monitor, setup_contract, wire};
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
@ -43,11 +44,13 @@ pub struct TakerStreamMessage {
pub item: Result<wire::TakerToMaker>, pub item: Result<wire::TakerToMaker>,
} }
pub struct PriceUpdate(pub bitmex_price_feed::Quote);
pub struct Actor { pub struct Actor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed: CfdFeed,
order_feed_sender: watch::Sender<Option<Order>>, order_feed_sender: watch::Sender<Option<Order>>,
takers: Address<maker_inc_connections::Actor>, takers: Address<maker_inc_connections::Actor>,
current_order_id: Option<OrderId>, current_order_id: Option<OrderId>,
@ -68,7 +71,7 @@ impl Actor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed: CfdFeed,
order_feed_sender: watch::Sender<Option<Order>>, order_feed_sender: watch::Sender<Option<Order>>,
takers: Address<maker_inc_connections::Actor>, takers: Address<maker_inc_connections::Actor>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
@ -76,13 +79,13 @@ impl Actor {
let mut conn = db.acquire().await?; let mut conn = db.acquire().await?;
// populate the CFD feed with existing CFDs // populate the CFD feed with existing CFDs
cfd_feed_actor_inbox.send(load_all_cfds(&mut conn).await?)?; cfd_feed.update(&mut conn).await?;
Ok(Self { Ok(Self {
db, db,
wallet, wallet,
oracle_pk, oracle_pk,
cfd_feed_actor_inbox, cfd_feed,
order_feed_sender, order_feed_sender,
takers, takers,
current_order_id: None, current_order_id: None,
@ -170,8 +173,7 @@ impl Actor {
) )
.await?; .await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
let txid = self let txid = self
.wallet .wallet
@ -248,8 +250,7 @@ impl Actor {
); );
insert_cfd(cfd, &mut conn).await?; insert_cfd(cfd, &mut conn).await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
// 3. Remove current order // 3. Remove current order
self.current_order_id = None; self.current_order_id = None;
@ -309,8 +310,7 @@ impl Actor {
}) })
.await?; .await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
let contract_future = setup_contract::new( let contract_future = setup_contract::new(
self.takers.clone().into_sink().with(move |msg| { self.takers.clone().into_sink().with(move |msg| {
@ -380,8 +380,7 @@ impl Actor {
command: TakerCommand::NotifyOrderRejected { id: msg.order_id }, command: TakerCommand::NotifyOrderRejected { id: msg.order_id },
}) })
.await?; .await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
// Remove order for all // Remove order for all
self.current_order_id = None; self.current_order_id = None;
@ -418,6 +417,14 @@ impl Actor {
Ok(()) Ok(())
} }
async fn handle_price_update(&mut self, quote: bitmex_price_feed::Quote) -> Result<()> {
self.cfd_feed.set_current_price(quote);
let mut conn = self.db.acquire().await?;
self.cfd_feed.update(&mut conn).await?;
Ok(())
}
} }
#[async_trait] #[async_trait]
@ -494,6 +501,13 @@ impl Handler<TakerStreamMessage> for Actor {
} }
} }
#[async_trait]
impl Handler<PriceUpdate> for Actor {
async fn handle(&mut self, msg: PriceUpdate, _ctx: &mut Context<Self>) {
log_error!(self.handle_price_update(msg.0))
}
}
impl Message for NewOrder { impl Message for NewOrder {
type Result = (); type Result = ();
} }
@ -519,4 +533,8 @@ impl Message for TakerStreamMessage {
type Result = KeepRunning; type Result = KeepRunning;
} }
impl Message for PriceUpdate {
type Result = ();
}
impl xtra::Actor for Actor {} impl xtra::Actor for Actor {}

15
daemon/src/routes_maker.rs

@ -1,9 +1,9 @@
use crate::auth::Authenticated; use crate::auth::Authenticated;
use crate::maker_cfd; use crate::model::cfd::{Order, OrderId, Origin};
use crate::model::cfd::{Cfd, Order, OrderId, Origin};
use crate::model::{Usd, WalletInfo}; use crate::model::{Usd, WalletInfo};
use crate::routes::EmbeddedFileExt; use crate::routes::EmbeddedFileExt;
use crate::to_sse_event::ToSseEvent; use crate::to_sse_event::ToSseEvent;
use crate::{bitmex_price_feed, cfd_feed, maker_cfd};
use anyhow::Result; use anyhow::Result;
use rocket::http::{ContentType, Header, Status}; use rocket::http::{ContentType, Header, Status};
use rocket::response::stream::EventStream; use rocket::response::stream::EventStream;
@ -20,14 +20,16 @@ use xtra::Address;
#[rocket::get("/feed")] #[rocket::get("/feed")]
pub async fn maker_feed( pub async fn maker_feed(
rx_cfds: &State<watch::Receiver<Vec<Cfd>>>, rx_cfds: &State<watch::Receiver<Vec<cfd_feed::Cfd>>>,
rx_order: &State<watch::Receiver<Option<Order>>>, rx_order: &State<watch::Receiver<Option<Order>>>,
rx_wallet: &State<watch::Receiver<WalletInfo>>, rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>,
_auth: Authenticated, _auth: Authenticated,
) -> EventStream![] { ) -> EventStream![] {
let mut rx_cfds = rx_cfds.inner().clone(); let mut rx_cfds = rx_cfds.inner().clone();
let mut rx_order = rx_order.inner().clone(); let mut rx_order = rx_order.inner().clone();
let mut rx_wallet = rx_wallet.inner().clone(); let mut rx_wallet = rx_wallet.inner().clone();
let mut rx_quote = rx_quote.inner().clone();
EventStream! { EventStream! {
let wallet_info = rx_wallet.borrow().clone(); let wallet_info = rx_wallet.borrow().clone();
@ -36,6 +38,9 @@ pub async fn maker_feed(
let order = rx_order.borrow().clone(); let order = rx_order.borrow().clone();
yield order.to_sse_event(); yield order.to_sse_event();
let quote = rx_quote.borrow().clone();
yield quote.to_sse_event();
let cfds = rx_cfds.borrow().clone(); let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event(); yield cfds.to_sse_event();
@ -53,6 +58,10 @@ pub async fn maker_feed(
let cfds = rx_cfds.borrow().clone(); let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event(); yield cfds.to_sse_event();
} }
Ok(()) = rx_quote.changed() => {
let quote = rx_quote.borrow().clone();
yield quote.to_sse_event();
}
} }
} }
} }

15
daemon/src/routes_taker.rs

@ -1,8 +1,8 @@
use crate::model::cfd::{calculate_buy_margin, Cfd, Order, OrderId}; use crate::model::cfd::{calculate_buy_margin, Order, OrderId};
use crate::model::{Leverage, Usd, WalletInfo}; use crate::model::{Leverage, Usd, WalletInfo};
use crate::routes::EmbeddedFileExt; use crate::routes::EmbeddedFileExt;
use crate::taker_cfd;
use crate::to_sse_event::ToSseEvent; use crate::to_sse_event::ToSseEvent;
use crate::{bitmex_price_feed, cfd_feed, taker_cfd};
use bdk::bitcoin::Amount; use bdk::bitcoin::Amount;
use rocket::http::{ContentType, Status}; use rocket::http::{ContentType, Status};
use rocket::response::stream::EventStream; use rocket::response::stream::EventStream;
@ -19,13 +19,15 @@ use xtra::Address;
#[rocket::get("/feed")] #[rocket::get("/feed")]
pub async fn feed( pub async fn feed(
rx_cfds: &State<watch::Receiver<Vec<Cfd>>>, rx_cfds: &State<watch::Receiver<Vec<cfd_feed::Cfd>>>,
rx_order: &State<watch::Receiver<Option<Order>>>, rx_order: &State<watch::Receiver<Option<Order>>>,
rx_wallet: &State<watch::Receiver<WalletInfo>>, rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>,
) -> EventStream![] { ) -> EventStream![] {
let mut rx_cfds = rx_cfds.inner().clone(); let mut rx_cfds = rx_cfds.inner().clone();
let mut rx_order = rx_order.inner().clone(); let mut rx_order = rx_order.inner().clone();
let mut rx_wallet = rx_wallet.inner().clone(); let mut rx_wallet = rx_wallet.inner().clone();
let mut rx_quote = rx_quote.inner().clone();
EventStream! { EventStream! {
let wallet_info = rx_wallet.borrow().clone(); let wallet_info = rx_wallet.borrow().clone();
@ -34,6 +36,9 @@ pub async fn feed(
let order = rx_order.borrow().clone(); let order = rx_order.borrow().clone();
yield order.to_sse_event(); yield order.to_sse_event();
let quote = rx_quote.borrow().clone();
yield quote.to_sse_event();
let cfds = rx_cfds.borrow().clone(); let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event(); yield cfds.to_sse_event();
@ -51,6 +56,10 @@ pub async fn feed(
let cfds = rx_cfds.borrow().clone(); let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event(); yield cfds.to_sse_event();
} }
Ok(()) = rx_quote.changed() => {
let quote = rx_quote.borrow().clone();
yield quote.to_sse_event();
}
} }
} }
} }

42
daemon/src/taker.rs

@ -1,3 +1,4 @@
use crate::cfd_feed::CfdFeed;
use crate::model::WalletInfo; use crate::model::WalletInfo;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
@ -5,7 +6,7 @@ use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1};
use bdk::bitcoin::Network; use bdk::bitcoin::Network;
use clap::Clap; use clap::Clap;
use futures::StreamExt; use futures::StreamExt;
use model::cfd::{Cfd, Order}; use model::cfd::Order;
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
use seed::Seed; use seed::Seed;
@ -23,6 +24,7 @@ use xtra::Actor;
mod actors; mod actors;
mod bitmex_price_feed; mod bitmex_price_feed;
mod cfd_feed;
mod db; mod db;
mod keypair; mod keypair;
mod logger; mod logger;
@ -101,7 +103,7 @@ async fn main() -> Result<()> {
let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle. let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle.
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]); let (cfd_feed_updater, cfd_feed_receiver) = CfdFeed::new(cfd_feed::Role::Taker);
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None); let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
@ -121,18 +123,6 @@ async fn main() -> Result<()> {
let (task, mut quote_updates) = bitmex_price_feed::new().await?; let (task, mut quote_updates) = bitmex_price_feed::new().await?;
tokio::spawn(task); tokio::spawn(task);
// dummy usage of quote receiver
tokio::spawn(async move {
loop {
let bitmex_price_feed::Quote { bid, ask, .. } = *quote_updates.borrow();
tracing::info!(%bid, %ask, "BitMex quote updated");
if quote_updates.changed().await.is_err() {
return;
}
}
});
let figment = rocket::Config::figment() let figment = rocket::Config::figment()
.merge(("databases.taker.url", data_dir.join("taker.sqlite"))) .merge(("databases.taker.url", data_dir.join("taker.sqlite")))
.merge(("port", opts.http_port)); .merge(("port", opts.http_port));
@ -141,6 +131,7 @@ async fn main() -> Result<()> {
.manage(cfd_feed_receiver) .manage(cfd_feed_receiver)
.manage(order_feed_receiver) .manage(order_feed_receiver)
.manage(wallet_feed_receiver) .manage(wallet_feed_receiver)
.manage(quote_updates.clone())
.attach(Db::init()) .attach(Db::init())
.attach(AdHoc::try_on_ignite( .attach(AdHoc::try_on_ignite(
"SQL migrations", "SQL migrations",
@ -172,7 +163,7 @@ async fn main() -> Result<()> {
db, db,
wallet.clone(), wallet.clone(),
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
cfd_feed_sender, cfd_feed_updater,
order_feed_sender, order_feed_sender,
send_to_maker, send_to_maker,
monitor_actor_address, monitor_actor_address,
@ -192,6 +183,27 @@ async fn main() -> Result<()> {
cfd_actor_inbox.clone(), cfd_actor_inbox.clone(),
))); )));
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
tokio::spawn({
let cfd_actor_inbox = cfd_actor_inbox.clone();
async move {
loop {
let quote = quote_updates.borrow().clone();
if cfd_actor_inbox.do_send_async(taker_cfd::PriceUpdate(quote)).await.is_err() {
tracing::warn!(
"Could not communicate with the message handler, stopping price feed sync"
);
return;
}
if quote_updates.changed().await.is_err() {
tracing::warn!("BitMex price feed receiver not available, stopping price feed sync");
return;
}
}
}
});
Ok(rocket.manage(cfd_actor_inbox)) Ok(rocket.manage(cfd_actor_inbox))
}, },

50
daemon/src/taker_cfd.rs

@ -1,7 +1,8 @@
use crate::actors::log_error; use crate::actors::log_error;
use crate::cfd_feed::CfdFeed;
use crate::db::{ use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_cfd_by_order_id,
load_cfd_by_order_id, load_order_by_id, load_order_by_id,
}; };
use crate::model::cfd::{ use crate::model::cfd::{
Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin,
@ -10,7 +11,7 @@ use crate::model::Usd;
use crate::monitor::{self, MonitorParams}; use crate::monitor::{self, MonitorParams};
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::wire::SetupMsg; use crate::wire::SetupMsg;
use crate::{send_to_socket, setup_contract, wire}; use crate::{bitmex_price_feed, send_to_socket, setup_contract, wire};
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
@ -35,6 +36,8 @@ pub struct CfdSetupCompleted {
pub dlc: Result<Dlc>, pub dlc: Result<Dlc>,
} }
pub struct PriceUpdate(pub bitmex_price_feed::Quote);
enum SetupState { enum SetupState {
Active { Active {
sender: mpsc::UnboundedSender<wire::SetupMsg>, sender: mpsc::UnboundedSender<wire::SetupMsg>,
@ -46,7 +49,7 @@ pub struct Actor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed: CfdFeed,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
@ -58,19 +61,21 @@ impl Actor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed: CfdFeed,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
) -> Result<Self> { ) -> Result<Self> {
let mut conn = db.acquire().await?; let mut conn = db.acquire().await?;
cfd_feed_actor_inbox.send(load_all_cfds(&mut conn).await?)?;
// populate the CFD feed with existing CFDs
cfd_feed.update(&mut conn).await?;
Ok(Self { Ok(Self {
db, db,
wallet, wallet,
oracle_pk, oracle_pk,
cfd_feed_actor_inbox, cfd_feed,
order_feed_actor_inbox, order_feed_actor_inbox,
send_to_maker, send_to_maker,
monitor_actor, monitor_actor,
@ -97,8 +102,7 @@ impl Actor {
insert_cfd(cfd, &mut conn).await?; insert_cfd(cfd, &mut conn).await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
self.send_to_maker self.send_to_maker
.do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity }) .do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity })
.await?; .await?;
@ -147,8 +151,7 @@ impl Actor {
) )
.await?; .await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let contract_future = setup_contract::new( let contract_future = setup_contract::new(
@ -194,8 +197,7 @@ impl Actor {
) )
.await?; .await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
Ok(()) Ok(())
} }
@ -235,8 +237,7 @@ impl Actor {
) )
.await?; .await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
let txid = self let txid = self
.wallet .wallet
@ -298,6 +299,14 @@ impl Actor {
Ok(()) Ok(())
} }
async fn handle_price_update(&mut self, quote: bitmex_price_feed::Quote) -> Result<()> {
self.cfd_feed.set_current_price(quote);
let mut conn = self.db.acquire().await?;
self.cfd_feed.update(&mut conn).await?;
Ok(())
}
} }
#[async_trait] #[async_trait]
@ -356,6 +365,13 @@ impl Handler<monitor::Event> for Actor {
} }
} }
#[async_trait]
impl Handler<PriceUpdate> for Actor {
async fn handle(&mut self, msg: PriceUpdate, _ctx: &mut Context<Self>) {
log_error!(self.handle_price_update(msg.0))
}
}
impl Message for TakeOffer { impl Message for TakeOffer {
type Result = (); type Result = ();
} }
@ -369,4 +385,8 @@ impl Message for CfdSetupCompleted {
type Result = (); type Result = ();
} }
impl Message for PriceUpdate {
type Result = ();
}
impl xtra::Actor for Actor {} impl xtra::Actor for Actor {}

95
daemon/src/to_sse_event.rs

@ -1,33 +1,10 @@
use crate::model;
use crate::model::cfd::OrderId; use crate::model::cfd::OrderId;
use crate::model::{Leverage, Position, TradingPair, Usd}; use crate::model::{Leverage, Position, TradingPair, Usd};
use crate::{bitmex_price_feed, cfd_feed, model};
use bdk::bitcoin::Amount; use bdk::bitcoin::Amount;
use rocket::response::stream::Event; use rocket::response::stream::Event;
use serde::Serialize; use serde::Serialize;
use std::time::UNIX_EPOCH; use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, Serialize)]
pub struct Cfd {
pub order_id: OrderId,
pub initial_price: Usd,
pub leverage: Leverage,
pub trading_pair: TradingPair,
pub position: Position,
pub liquidation_price: Usd,
pub quantity_usd: Usd,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub margin: Amount,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub profit_btc: Amount,
pub profit_usd: Usd,
pub state: String,
pub state_transition_timestamp: u64,
}
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct CfdOrder { pub struct CfdOrder {
@ -52,42 +29,9 @@ pub trait ToSseEvent {
fn to_sse_event(&self) -> Event; fn to_sse_event(&self) -> Event;
} }
impl ToSseEvent for Vec<model::cfd::Cfd> { impl ToSseEvent for Vec<cfd_feed::Cfd> {
// TODO: This conversion can fail, we might want to change the API
fn to_sse_event(&self) -> Event { fn to_sse_event(&self) -> Event {
let cfds = self Event::json(&self).event("cfds")
.iter()
.map(|cfd| {
// TODO: Get the actual current price here
let current_price = Usd::ZERO;
let (profit_btc, profit_usd) = cfd.profit(current_price).unwrap();
Cfd {
order_id: cfd.order.id,
initial_price: cfd.order.price,
leverage: cfd.order.leverage,
trading_pair: cfd.order.trading_pair.clone(),
position: cfd.position(),
liquidation_price: cfd.order.liquidation_price,
quantity_usd: cfd.quantity_usd,
profit_btc,
profit_usd,
state: cfd.state.to_string(),
state_transition_timestamp: cfd
.state
.get_transition_timestamp()
.duration_since(UNIX_EPOCH)
.expect("timestamp to be convertable to duration since epoch")
.as_secs(),
// TODO: Depending on the state the margin might be set (i.e. in Open we save it
// in the DB internally) and does not have to be calculated
margin: cfd.margin().unwrap(),
}
})
.collect::<Vec<Cfd>>();
Event::json(&cfds).event("cfds")
} }
} }
@ -127,13 +71,34 @@ impl ToSseEvent for model::WalletInfo {
let wallet_info = WalletInfo { let wallet_info = WalletInfo {
balance: self.balance, balance: self.balance,
address: self.address.to_string(), address: self.address.to_string(),
last_updated_at: self last_updated_at: into_unix_secs(self.last_updated_at),
.last_updated_at
.duration_since(UNIX_EPOCH)
.expect("timestamp to be convertible to duration since epoch")
.as_secs(),
}; };
Event::json(&wallet_info).event("wallet") Event::json(&wallet_info).event("wallet")
} }
} }
#[derive(Debug, Clone, Serialize)]
pub struct Quote {
bid: Usd,
ask: Usd,
last_updated_at: u64,
}
impl ToSseEvent for bitmex_price_feed::Quote {
fn to_sse_event(&self) -> Event {
let quote = Quote {
bid: self.bid,
ask: self.ask,
last_updated_at: into_unix_secs(self.timestamp),
};
Event::json(&quote).event("quote")
}
}
/// Convert to the format expected by the frontend
fn into_unix_secs(time: SystemTime) -> u64 {
time.duration_since(UNIX_EPOCH)
.expect("timestamp to be convertible to duration since epoch")
.as_secs()
}

9
frontend/src/MakerApp.tsx

@ -19,9 +19,10 @@ import { useEventSource } from "react-sse-hooks";
import { CfdTable } from "./components/cfdtables/CfdTable"; import { CfdTable } from "./components/cfdtables/CfdTable";
import { CfdTableMaker } from "./components/cfdtables/CfdTableMaker"; import { CfdTableMaker } from "./components/cfdtables/CfdTableMaker";
import CurrencyInputField from "./components/CurrencyInputField"; import CurrencyInputField from "./components/CurrencyInputField";
import CurrentPrice from "./components/CurrentPrice";
import useLatestEvent from "./components/Hooks"; import useLatestEvent from "./components/Hooks";
import OrderTile from "./components/OrderTile"; import OrderTile from "./components/OrderTile";
import { Cfd, Order, WalletInfo } from "./components/Types"; import { Cfd, Order, PriceInfo, WalletInfo } from "./components/Types";
import Wallet from "./components/Wallet"; import Wallet from "./components/Wallet";
import { CfdSellOrderPayload, postCfdSellOrderRequest } from "./MakerClient"; import { CfdSellOrderPayload, postCfdSellOrderRequest } from "./MakerClient";
@ -35,6 +36,7 @@ export default function App() {
console.log(cfds); console.log(cfds);
const walletInfo = useLatestEvent<WalletInfo>(source, "wallet"); const walletInfo = useLatestEvent<WalletInfo>(source, "wallet");
const priceInfo = useLatestEvent<PriceInfo>(source, "quote");
const toast = useToast(); const toast = useToast();
let [minQuantity, setMinQuantity] = useState<string>("100"); let [minQuantity, setMinQuantity] = useState<string>("100");
@ -80,11 +82,8 @@ export default function App() {
<HStack spacing={5}> <HStack spacing={5}>
<VStack> <VStack>
<Wallet walletInfo={walletInfo} /> <Wallet walletInfo={walletInfo} />
<CurrentPrice priceInfo={priceInfo} />
<VStack spacing={5} shadow={"md"} padding={5} width="100%" align={"stretch"}> <VStack spacing={5} shadow={"md"} padding={5} width="100%" align={"stretch"}>
<HStack>
<Text width={labelWidth} align={"left"}>Current Price:</Text>
<Text>{49000}</Text>
</HStack>
<HStack> <HStack>
<Text width={labelWidth}>Min Quantity:</Text> <Text width={labelWidth}>Min Quantity:</Text>
<CurrencyInputField <CurrencyInputField

5
frontend/src/TakerApp.tsx

@ -18,8 +18,9 @@ import { useAsync } from "react-async";
import { useEventSource } from "react-sse-hooks"; import { useEventSource } from "react-sse-hooks";
import { CfdTable } from "./components/cfdtables/CfdTable"; import { CfdTable } from "./components/cfdtables/CfdTable";
import CurrencyInputField from "./components/CurrencyInputField"; import CurrencyInputField from "./components/CurrencyInputField";
import CurrentPrice from "./components/CurrentPrice";
import useLatestEvent from "./components/Hooks"; import useLatestEvent from "./components/Hooks";
import { Cfd, Order, WalletInfo } from "./components/Types"; import { Cfd, Order, PriceInfo, WalletInfo } from "./components/Types";
import Wallet from "./components/Wallet"; import Wallet from "./components/Wallet";
interface CfdOrderRequestPayload { interface CfdOrderRequestPayload {
@ -62,6 +63,7 @@ export default function App() {
let cfds = cfdsOrUndefined ? cfdsOrUndefined! : []; let cfds = cfdsOrUndefined ? cfdsOrUndefined! : [];
const order = useLatestEvent<Order>(source, "order"); const order = useLatestEvent<Order>(source, "order");
const walletInfo = useLatestEvent<WalletInfo>(source, "wallet"); const walletInfo = useLatestEvent<WalletInfo>(source, "wallet");
const priceInfo = useLatestEvent<PriceInfo>(source, "quote");
const toast = useToast(); const toast = useToast();
let [quantity, setQuantity] = useState("0"); let [quantity, setQuantity] = useState("0");
@ -123,6 +125,7 @@ export default function App() {
<HStack spacing={5}> <HStack spacing={5}>
<VStack> <VStack>
<Wallet walletInfo={walletInfo} /> <Wallet walletInfo={walletInfo} />
<CurrentPrice priceInfo={priceInfo} />
<VStack shadow={"md"} padding={5} align="stretch" spacing={5} width="100%"> <VStack shadow={"md"} padding={5} align="stretch" spacing={5} width="100%">
<HStack> <HStack>
<Text align={"left"} width={labelWidth}>Order Price:</Text> <Text align={"left"} width={labelWidth}>Order Price:</Text>

41
frontend/src/components/CurrentPrice.tsx

@ -0,0 +1,41 @@
import { Box, Center, Divider, HStack, Skeleton, Text } from "@chakra-ui/react";
import React from "react";
import Timestamp from "./Timestamp";
import { PriceInfo } from "./Types";
interface Props {
priceInfo: PriceInfo | null;
}
export default function CurrentPrice(
{
priceInfo,
}: Props,
) {
let bid = <Skeleton height="20px" />;
let ask = <Skeleton height="20px" />;
let timestamp = <Skeleton height="20px" />;
if (priceInfo) {
bid = <Text>{priceInfo.bid} USD</Text>;
ask = <Text>{priceInfo.ask} USD</Text>;
timestamp = <Timestamp timestamp={priceInfo.last_updated_at} />;
}
return (
<Box shadow={"md"} marginBottom={5} padding={5}>
<Center><Text fontWeight={"bold"}>Current Price</Text></Center>
<HStack>
<Text align={"left"}>Bid:</Text>
{bid}
</HStack>
<Divider marginTop={2} marginBottom={2} />
<HStack>
<Text align={"left"}>Ask:</Text>
{ask}
</HStack>
<Divider marginTop={2} marginBottom={2} />
{timestamp}
</Box>
);
}

26
frontend/src/components/Timestamp.tsx

@ -0,0 +1,26 @@
import { Text } from "@chakra-ui/react";
import React from "react";
import { unixTimestampToDate } from "./Types";
interface Props {
timestamp: number;
}
export default function Timestamp(
{
timestamp,
}: Props,
) {
return (
<Text>
Updated: {unixTimestampToDate(timestamp).toLocaleDateString("en-US", {
year: "numeric",
month: "numeric",
day: "numeric",
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
})}
</Text>
);
}

6
frontend/src/components/Types.tsx

@ -37,6 +37,12 @@ export interface WalletInfo {
last_updated_at: number; last_updated_at: number;
} }
export interface PriceInfo {
bid: number;
ask: number;
last_updated_at: number;
}
export function unixTimestampToDate(unixTimestamp: number): Date { export function unixTimestampToDate(unixTimestamp: number): Date {
return new Date(unixTimestamp * 1000); return new Date(unixTimestamp * 1000);
} }

14
frontend/src/components/Wallet.tsx

@ -1,7 +1,8 @@
import { CheckIcon, CopyIcon } from "@chakra-ui/icons"; import { CheckIcon, CopyIcon } from "@chakra-ui/icons";
import { Box, Center, Divider, HStack, IconButton, Skeleton, Text, useClipboard } from "@chakra-ui/react"; import { Box, Center, Divider, HStack, IconButton, Skeleton, Text, useClipboard } from "@chakra-ui/react";
import React from "react"; import React from "react";
import { unixTimestampToDate, WalletInfo } from "./Types"; import Timestamp from "./Timestamp";
import { WalletInfo } from "./Types";
interface WalletProps { interface WalletProps {
walletInfo: WalletInfo | null; walletInfo: WalletInfo | null;
@ -30,16 +31,7 @@ export default function Wallet(
/> />
</HStack> </HStack>
); );
timestamp = <Text> timestamp = <Timestamp timestamp={walletInfo.last_updated_at} />;
Updated: {unixTimestampToDate(walletInfo.last_updated_at).toLocaleDateString("en-US", {
year: "numeric",
month: "numeric",
day: "numeric",
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
})}
</Text>;
} }
return ( return (

Loading…
Cancel
Save