Browse Source

Use current price in profit margin calculations

As the profit margin is connected to the CFD feed, refresh the feed whenever
current price or the CFDs change.

Fixes #75.
fix-olivia-event-id
Mariusz Klochowicz 3 years ago
parent
commit
4b8237c9d6
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 23
      daemon/src/bitmex_price_feed.rs
  2. 118
      daemon/src/cfd_feed.rs
  3. 34
      daemon/src/maker.rs
  4. 48
      daemon/src/maker_cfd.rs
  5. 12
      daemon/src/routes_maker.rs
  6. 12
      daemon/src/routes_taker.rs
  7. 33
      daemon/src/taker.rs
  8. 50
      daemon/src/taker_cfd.rs
  9. 62
      daemon/src/to_sse_event.rs

23
daemon/src/bitmex_price_feed.rs

@ -1,7 +1,8 @@
use crate::model::Usd; use crate::model::Usd;
use anyhow::Result; use anyhow::{Context, Result};
use futures::{StreamExt, TryStreamExt}; use futures::{StreamExt, TryStreamExt};
use rust_decimal::Decimal; use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::future::Future; use std::future::Future;
use std::time::SystemTime; use std::time::SystemTime;
@ -40,7 +41,7 @@ pub async fn new() -> Result<(impl Future<Output = ()>, watch::Receiver<Quote>)>
Ok((task, receiver)) Ok((task, receiver))
} }
#[derive(Debug)] #[derive(Clone, Debug)]
pub struct Quote { pub struct Quote {
pub timestamp: SystemTime, pub timestamp: SystemTime,
pub bid: Usd, pub bid: Usd,
@ -67,6 +68,24 @@ impl Quote {
ask: Usd::from(Decimal::try_from(quote.ask_price)?), ask: Usd::from(Decimal::try_from(quote.ask_price)?),
})) }))
} }
#[allow(dead_code)] // Not used by all binaries.
pub fn for_maker(&self) -> Usd {
self.ask
}
#[allow(dead_code)] // Not used by all binaries.
pub fn for_taker(&self) -> Usd {
// TODO: Verify whether this is correct
self.mid_range().unwrap()
}
fn mid_range(&self) -> Result<Usd> {
Ok(Usd((self.bid.checked_add(self.ask))?
.0
.checked_div(dec!(2))
.context("division error")?))
}
} }
mod wire { mod wire {

118
daemon/src/cfd_feed.rs

@ -0,0 +1,118 @@
use crate::db::load_all_cfds;
use crate::model::cfd::OrderId;
use crate::model::{Leverage, Position, TradingPair, Usd};
use crate::{bitmex_price_feed, model};
use anyhow::{Context as _, Result};
use bdk::bitcoin::Amount;
use serde::Serialize;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use std::time::UNIX_EPOCH;
use tokio::sync::watch;
#[allow(dead_code)]
/// Role of the actor's owner in the upcoming contract
pub enum Role {
Maker,
Taker,
}
#[derive(Debug, Clone, Serialize)]
pub struct Cfd {
pub order_id: OrderId,
pub initial_price: Usd,
pub leverage: Leverage,
pub trading_pair: TradingPair,
pub position: Position,
pub liquidation_price: Usd,
pub quantity_usd: Usd,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub margin: Amount,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub profit_btc: Amount,
pub profit_usd: Usd,
pub state: String,
pub state_transition_timestamp: u64,
}
pub struct CfdFeed {
role: Role,
cfd_feed_sender: watch::Sender<Vec<Cfd>>,
current_price: Option<bitmex_price_feed::Quote>,
}
impl CfdFeed {
pub fn new(role: Role) -> (Self, watch::Receiver<Vec<Cfd>>) {
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]);
(
Self {
role,
cfd_feed_sender,
current_price: None,
},
cfd_feed_receiver,
)
}
/// Update price from BitMex. It is recommended to call `update()` afterwards.
pub fn set_current_price(&mut self, quote: bitmex_price_feed::Quote) {
self.current_price = Some(quote);
}
/// Updates the CFD feed in the frontend
pub async fn update(&self, conn: &mut PoolConnection<Sqlite>) -> Result<()> {
let cfds = load_all_cfds(conn).await?;
let cfds = cfds
.iter()
.map(|cfd| {
let (profit_btc, profit_usd) = self.calculate_profit(cfd);
Cfd {
order_id: cfd.order.id,
initial_price: cfd.order.price,
leverage: cfd.order.leverage,
trading_pair: cfd.order.trading_pair.clone(),
position: cfd.position(),
liquidation_price: cfd.order.liquidation_price,
quantity_usd: cfd.quantity_usd,
profit_btc,
profit_usd,
state: cfd.state.to_string(),
state_transition_timestamp: cfd
.state
.get_transition_timestamp()
.duration_since(UNIX_EPOCH)
.expect("timestamp to be convertable to duration since epoch")
.as_secs(),
// TODO: Depending on the state the margin might be set (i.e. in Open we save it
// in the DB internally) and does not have to be calculated
margin: cfd.margin().unwrap(),
}
})
.collect::<Vec<Cfd>>();
self.cfd_feed_sender
.send(cfds)
.context("Could not update CFD feed")
}
fn calculate_profit(&self, cfd: &model::cfd::Cfd) -> (Amount, Usd) {
if let Some(quote) = &self.current_price {
cfd.profit(match self.role {
Role::Taker => quote.for_taker(),
Role::Maker => quote.for_maker(),
})
.unwrap()
} else {
// No current price set yet, returning no profits
(Amount::ZERO, Usd::ZERO)
}
}
}

34
daemon/src/maker.rs

@ -1,11 +1,12 @@
use crate::auth::MAKER_USERNAME; use crate::auth::MAKER_USERNAME;
use crate::cfd_feed::CfdFeed;
use crate::seed::Seed; use crate::seed::Seed;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1}; use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1};
use bdk::bitcoin::Network; use bdk::bitcoin::Network;
use clap::Clap; use clap::Clap;
use model::cfd::{Cfd, Order}; use model::cfd::Order;
use model::WalletInfo; use model::WalletInfo;
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
@ -20,6 +21,7 @@ use xtra::spawn::TokioGlobalSpawnExt;
mod actors; mod actors;
mod auth; mod auth;
mod bitmex_price_feed; mod bitmex_price_feed;
mod cfd_feed;
mod db; mod db;
mod keypair; mod keypair;
mod logger; mod logger;
@ -105,7 +107,7 @@ async fn main() -> Result<()> {
let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle. let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle.
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]); let (cfd_feed_updater, cfd_feed_receiver) = CfdFeed::new(cfd_feed::Role::Maker);
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None); let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
@ -118,7 +120,7 @@ async fn main() -> Result<()> {
tracing::info!("Listening on {}", local_addr); tracing::info!("Listening on {}", local_addr);
let (task, quote_updates) = bitmex_price_feed::new().await?; let (task, mut quote_updates) = bitmex_price_feed::new().await?;
tokio::spawn(task); tokio::spawn(task);
rocket::custom(figment) rocket::custom(figment)
@ -126,7 +128,7 @@ async fn main() -> Result<()> {
.manage(order_feed_receiver) .manage(order_feed_receiver)
.manage(wallet_feed_receiver) .manage(wallet_feed_receiver)
.manage(auth_password) .manage(auth_password)
.manage(quote_updates) .manage(quote_updates.clone())
.attach(Db::init()) .attach(Db::init())
.attach(AdHoc::try_on_ignite( .attach(AdHoc::try_on_ignite(
"SQL migrations", "SQL migrations",
@ -157,7 +159,7 @@ async fn main() -> Result<()> {
db, db,
wallet.clone(), wallet.clone(),
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
cfd_feed_sender, cfd_feed_updater,
order_feed_sender, order_feed_sender,
maker_inc_connections_address.clone(), maker_inc_connections_address.clone(),
monitor_actor_address, monitor_actor_address,
@ -193,6 +195,28 @@ async fn main() -> Result<()> {
}); });
tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream)); tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream));
tokio::spawn({
let cfd_actor_inbox = cfd_maker_actor_inbox.clone();
async move {
loop {
let quote = quote_updates.borrow().clone();
if cfd_actor_inbox.do_send_async(maker_cfd::PriceUpdate(quote)).await.is_err() {
tracing::warn!(
"Could not communicate with the message handler, stopping price feed sync"
);
return;
}
if quote_updates.changed().await.is_err() {
tracing::warn!("BitMex price feed receiver not available, stopping price feed sync");
return;
}
}
}
});
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
Ok(rocket.manage(cfd_maker_actor_inbox)) Ok(rocket.manage(cfd_maker_actor_inbox))

48
daemon/src/maker_cfd.rs

@ -1,14 +1,15 @@
use crate::actors::log_error; use crate::actors::log_error;
use crate::cfd_feed::CfdFeed;
use crate::db::{ use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_cfd_by_order_id,
load_cfd_by_order_id, load_order_by_id, load_order_by_id,
}; };
use crate::maker_inc_connections::TakerCommand; use crate::maker_inc_connections::TakerCommand;
use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId};
use crate::model::{TakerId, Usd}; use crate::model::{TakerId, Usd};
use crate::monitor::MonitorParams; use crate::monitor::MonitorParams;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::{maker_inc_connections, monitor, setup_contract, wire}; use crate::{bitmex_price_feed, maker_inc_connections, monitor, setup_contract, wire};
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
@ -43,11 +44,13 @@ pub struct TakerStreamMessage {
pub item: Result<wire::TakerToMaker>, pub item: Result<wire::TakerToMaker>,
} }
pub struct PriceUpdate(pub bitmex_price_feed::Quote);
pub struct Actor { pub struct Actor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed: CfdFeed,
order_feed_sender: watch::Sender<Option<Order>>, order_feed_sender: watch::Sender<Option<Order>>,
takers: Address<maker_inc_connections::Actor>, takers: Address<maker_inc_connections::Actor>,
current_order_id: Option<OrderId>, current_order_id: Option<OrderId>,
@ -68,7 +71,7 @@ impl Actor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed: CfdFeed,
order_feed_sender: watch::Sender<Option<Order>>, order_feed_sender: watch::Sender<Option<Order>>,
takers: Address<maker_inc_connections::Actor>, takers: Address<maker_inc_connections::Actor>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
@ -76,13 +79,13 @@ impl Actor {
let mut conn = db.acquire().await?; let mut conn = db.acquire().await?;
// populate the CFD feed with existing CFDs // populate the CFD feed with existing CFDs
cfd_feed_actor_inbox.send(load_all_cfds(&mut conn).await?)?; cfd_feed.update(&mut conn).await?;
Ok(Self { Ok(Self {
db, db,
wallet, wallet,
oracle_pk, oracle_pk,
cfd_feed_actor_inbox, cfd_feed,
order_feed_sender, order_feed_sender,
takers, takers,
current_order_id: None, current_order_id: None,
@ -170,8 +173,7 @@ impl Actor {
) )
.await?; .await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
let txid = self let txid = self
.wallet .wallet
@ -248,8 +250,7 @@ impl Actor {
); );
insert_cfd(cfd, &mut conn).await?; insert_cfd(cfd, &mut conn).await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
// 3. Remove current order // 3. Remove current order
self.current_order_id = None; self.current_order_id = None;
@ -309,8 +310,7 @@ impl Actor {
}) })
.await?; .await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
let contract_future = setup_contract::new( let contract_future = setup_contract::new(
self.takers.clone().into_sink().with(move |msg| { self.takers.clone().into_sink().with(move |msg| {
@ -380,8 +380,7 @@ impl Actor {
command: TakerCommand::NotifyOrderRejected { id: msg.order_id }, command: TakerCommand::NotifyOrderRejected { id: msg.order_id },
}) })
.await?; .await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
// Remove order for all // Remove order for all
self.current_order_id = None; self.current_order_id = None;
@ -418,6 +417,14 @@ impl Actor {
Ok(()) Ok(())
} }
async fn handle_price_update(&mut self, quote: bitmex_price_feed::Quote) -> Result<()> {
self.cfd_feed.set_current_price(quote);
let mut conn = self.db.acquire().await?;
self.cfd_feed.update(&mut conn).await?;
Ok(())
}
} }
#[async_trait] #[async_trait]
@ -494,6 +501,13 @@ impl Handler<TakerStreamMessage> for Actor {
} }
} }
#[async_trait]
impl Handler<PriceUpdate> for Actor {
async fn handle(&mut self, msg: PriceUpdate, _ctx: &mut Context<Self>) {
log_error!(self.handle_price_update(msg.0))
}
}
impl Message for NewOrder { impl Message for NewOrder {
type Result = (); type Result = ();
} }
@ -519,4 +533,8 @@ impl Message for TakerStreamMessage {
type Result = KeepRunning; type Result = KeepRunning;
} }
impl Message for PriceUpdate {
type Result = ();
}
impl xtra::Actor for Actor {} impl xtra::Actor for Actor {}

12
daemon/src/routes_maker.rs

@ -1,10 +1,9 @@
use crate::auth::Authenticated; use crate::auth::Authenticated;
use crate::bitmex_price_feed; use crate::model::cfd::{Order, OrderId, Origin};
use crate::maker_cfd;
use crate::model::cfd::{Cfd, Order, OrderId, Origin};
use crate::model::{Usd, WalletInfo}; use crate::model::{Usd, WalletInfo};
use crate::routes::EmbeddedFileExt; use crate::routes::EmbeddedFileExt;
use crate::to_sse_event::ToSseEvent; use crate::to_sse_event::ToSseEvent;
use crate::{bitmex_price_feed, cfd_feed, maker_cfd};
use anyhow::Result; use anyhow::Result;
use rocket::http::{ContentType, Header, Status}; use rocket::http::{ContentType, Header, Status};
use rocket::response::stream::EventStream; use rocket::response::stream::EventStream;
@ -21,7 +20,7 @@ use xtra::Address;
#[rocket::get("/feed")] #[rocket::get("/feed")]
pub async fn maker_feed( pub async fn maker_feed(
rx_cfds: &State<watch::Receiver<Vec<Cfd>>>, rx_cfds: &State<watch::Receiver<Vec<cfd_feed::Cfd>>>,
rx_order: &State<watch::Receiver<Option<Order>>>, rx_order: &State<watch::Receiver<Option<Order>>>,
rx_wallet: &State<watch::Receiver<WalletInfo>>, rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>, rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>,
@ -39,11 +38,11 @@ pub async fn maker_feed(
let order = rx_order.borrow().clone(); let order = rx_order.borrow().clone();
yield order.to_sse_event(); yield order.to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
let quote = rx_quote.borrow().clone(); let quote = rx_quote.borrow().clone();
yield quote.to_sse_event(); yield quote.to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
loop{ loop{
select! { select! {
@ -58,6 +57,7 @@ pub async fn maker_feed(
Ok(()) = rx_cfds.changed() => { Ok(()) = rx_cfds.changed() => {
let cfds = rx_cfds.borrow().clone(); let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event(); yield cfds.to_sse_event();
}
Ok(()) = rx_quote.changed() => { Ok(()) = rx_quote.changed() => {
let quote = rx_quote.borrow().clone(); let quote = rx_quote.borrow().clone();
yield quote.to_sse_event(); yield quote.to_sse_event();

12
daemon/src/routes_taker.rs

@ -1,9 +1,8 @@
use crate::bitmex_price_feed; use crate::model::cfd::{calculate_buy_margin, Order, OrderId};
use crate::model::cfd::{calculate_buy_margin, Cfd, Order, OrderId};
use crate::model::{Leverage, Usd, WalletInfo}; use crate::model::{Leverage, Usd, WalletInfo};
use crate::routes::EmbeddedFileExt; use crate::routes::EmbeddedFileExt;
use crate::taker_cfd;
use crate::to_sse_event::ToSseEvent; use crate::to_sse_event::ToSseEvent;
use crate::{bitmex_price_feed, cfd_feed, taker_cfd};
use bdk::bitcoin::Amount; use bdk::bitcoin::Amount;
use rocket::http::{ContentType, Status}; use rocket::http::{ContentType, Status};
use rocket::response::stream::EventStream; use rocket::response::stream::EventStream;
@ -20,7 +19,7 @@ use xtra::Address;
#[rocket::get("/feed")] #[rocket::get("/feed")]
pub async fn feed( pub async fn feed(
rx_cfds: &State<watch::Receiver<Vec<Cfd>>>, rx_cfds: &State<watch::Receiver<Vec<cfd_feed::Cfd>>>,
rx_order: &State<watch::Receiver<Option<Order>>>, rx_order: &State<watch::Receiver<Option<Order>>>,
rx_wallet: &State<watch::Receiver<WalletInfo>>, rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>, rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>,
@ -37,11 +36,11 @@ pub async fn feed(
let order = rx_order.borrow().clone(); let order = rx_order.borrow().clone();
yield order.to_sse_event(); yield order.to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
let quote = rx_quote.borrow().clone(); let quote = rx_quote.borrow().clone();
yield quote.to_sse_event(); yield quote.to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
loop{ loop{
select! { select! {
@ -56,6 +55,7 @@ pub async fn feed(
Ok(()) = rx_cfds.changed() => { Ok(()) = rx_cfds.changed() => {
let cfds = rx_cfds.borrow().clone(); let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event(); yield cfds.to_sse_event();
}
Ok(()) = rx_quote.changed() => { Ok(()) = rx_quote.changed() => {
let quote = rx_quote.borrow().clone(); let quote = rx_quote.borrow().clone();
yield quote.to_sse_event(); yield quote.to_sse_event();

33
daemon/src/taker.rs

@ -1,3 +1,4 @@
use crate::cfd_feed::CfdFeed;
use crate::model::WalletInfo; use crate::model::WalletInfo;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
@ -5,7 +6,7 @@ use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1};
use bdk::bitcoin::Network; use bdk::bitcoin::Network;
use clap::Clap; use clap::Clap;
use futures::StreamExt; use futures::StreamExt;
use model::cfd::{Cfd, Order}; use model::cfd::Order;
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
use seed::Seed; use seed::Seed;
@ -23,6 +24,7 @@ use xtra::Actor;
mod actors; mod actors;
mod bitmex_price_feed; mod bitmex_price_feed;
mod cfd_feed;
mod db; mod db;
mod keypair; mod keypair;
mod logger; mod logger;
@ -101,7 +103,7 @@ async fn main() -> Result<()> {
let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle. let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle.
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]); let (cfd_feed_updater, cfd_feed_receiver) = CfdFeed::new(cfd_feed::Role::Taker);
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None); let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
@ -118,7 +120,7 @@ async fn main() -> Result<()> {
} }
}; };
let (task, quote_updates) = bitmex_price_feed::new().await?; let (task, mut quote_updates) = bitmex_price_feed::new().await?;
tokio::spawn(task); tokio::spawn(task);
let figment = rocket::Config::figment() let figment = rocket::Config::figment()
@ -129,7 +131,7 @@ async fn main() -> Result<()> {
.manage(cfd_feed_receiver) .manage(cfd_feed_receiver)
.manage(order_feed_receiver) .manage(order_feed_receiver)
.manage(wallet_feed_receiver) .manage(wallet_feed_receiver)
.manage(quote_updates) .manage(quote_updates.clone())
.attach(Db::init()) .attach(Db::init())
.attach(AdHoc::try_on_ignite( .attach(AdHoc::try_on_ignite(
"SQL migrations", "SQL migrations",
@ -161,7 +163,7 @@ async fn main() -> Result<()> {
db, db,
wallet.clone(), wallet.clone(),
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
cfd_feed_sender, cfd_feed_updater,
order_feed_sender, order_feed_sender,
send_to_maker, send_to_maker,
monitor_actor_address, monitor_actor_address,
@ -181,6 +183,27 @@ async fn main() -> Result<()> {
cfd_actor_inbox.clone(), cfd_actor_inbox.clone(),
))); )));
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
tokio::spawn({
let cfd_actor_inbox = cfd_actor_inbox.clone();
async move {
loop {
let quote = quote_updates.borrow().clone();
if cfd_actor_inbox.do_send_async(taker_cfd::PriceUpdate(quote)).await.is_err() {
tracing::warn!(
"Could not communicate with the message handler, stopping price feed sync"
);
return;
}
if quote_updates.changed().await.is_err() {
tracing::warn!("BitMex price feed receiver not available, stopping price feed sync");
return;
}
}
}
});
Ok(rocket.manage(cfd_actor_inbox)) Ok(rocket.manage(cfd_actor_inbox))
}, },

50
daemon/src/taker_cfd.rs

@ -1,7 +1,8 @@
use crate::actors::log_error; use crate::actors::log_error;
use crate::cfd_feed::CfdFeed;
use crate::db::{ use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_cfd_by_order_id,
load_cfd_by_order_id, load_order_by_id, load_order_by_id,
}; };
use crate::model::cfd::{ use crate::model::cfd::{
Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin,
@ -10,7 +11,7 @@ use crate::model::Usd;
use crate::monitor::{self, MonitorParams}; use crate::monitor::{self, MonitorParams};
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::wire::SetupMsg; use crate::wire::SetupMsg;
use crate::{send_to_socket, setup_contract, wire}; use crate::{bitmex_price_feed, send_to_socket, setup_contract, wire};
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
@ -35,6 +36,8 @@ pub struct CfdSetupCompleted {
pub dlc: Result<Dlc>, pub dlc: Result<Dlc>,
} }
pub struct PriceUpdate(pub bitmex_price_feed::Quote);
enum SetupState { enum SetupState {
Active { Active {
sender: mpsc::UnboundedSender<wire::SetupMsg>, sender: mpsc::UnboundedSender<wire::SetupMsg>,
@ -46,7 +49,7 @@ pub struct Actor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed: CfdFeed,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
@ -58,19 +61,21 @@ impl Actor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed: CfdFeed,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
) -> Result<Self> { ) -> Result<Self> {
let mut conn = db.acquire().await?; let mut conn = db.acquire().await?;
cfd_feed_actor_inbox.send(load_all_cfds(&mut conn).await?)?;
// populate the CFD feed with existing CFDs
cfd_feed.update(&mut conn).await?;
Ok(Self { Ok(Self {
db, db,
wallet, wallet,
oracle_pk, oracle_pk,
cfd_feed_actor_inbox, cfd_feed,
order_feed_actor_inbox, order_feed_actor_inbox,
send_to_maker, send_to_maker,
monitor_actor, monitor_actor,
@ -97,8 +102,7 @@ impl Actor {
insert_cfd(cfd, &mut conn).await?; insert_cfd(cfd, &mut conn).await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
self.send_to_maker self.send_to_maker
.do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity }) .do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity })
.await?; .await?;
@ -147,8 +151,7 @@ impl Actor {
) )
.await?; .await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let contract_future = setup_contract::new( let contract_future = setup_contract::new(
@ -194,8 +197,7 @@ impl Actor {
) )
.await?; .await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
Ok(()) Ok(())
} }
@ -235,8 +237,7 @@ impl Actor {
) )
.await?; .await?;
self.cfd_feed_actor_inbox self.cfd_feed.update(&mut conn).await?;
.send(load_all_cfds(&mut conn).await?)?;
let txid = self let txid = self
.wallet .wallet
@ -298,6 +299,14 @@ impl Actor {
Ok(()) Ok(())
} }
async fn handle_price_update(&mut self, quote: bitmex_price_feed::Quote) -> Result<()> {
self.cfd_feed.set_current_price(quote);
let mut conn = self.db.acquire().await?;
self.cfd_feed.update(&mut conn).await?;
Ok(())
}
} }
#[async_trait] #[async_trait]
@ -356,6 +365,13 @@ impl Handler<monitor::Event> for Actor {
} }
} }
#[async_trait]
impl Handler<PriceUpdate> for Actor {
async fn handle(&mut self, msg: PriceUpdate, _ctx: &mut Context<Self>) {
log_error!(self.handle_price_update(msg.0))
}
}
impl Message for TakeOffer { impl Message for TakeOffer {
type Result = (); type Result = ();
} }
@ -369,4 +385,8 @@ impl Message for CfdSetupCompleted {
type Result = (); type Result = ();
} }
impl Message for PriceUpdate {
type Result = ();
}
impl xtra::Actor for Actor {} impl xtra::Actor for Actor {}

62
daemon/src/to_sse_event.rs

@ -1,34 +1,11 @@
use crate::model::cfd::OrderId; use crate::model::cfd::OrderId;
use crate::model::{Leverage, Position, TradingPair, Usd}; use crate::model::{Leverage, Position, TradingPair, Usd};
use crate::{bitmex_price_feed, model}; use crate::{bitmex_price_feed, cfd_feed, model};
use bdk::bitcoin::Amount; use bdk::bitcoin::Amount;
use rocket::response::stream::Event; use rocket::response::stream::Event;
use serde::Serialize; use serde::Serialize;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, Serialize)]
pub struct Cfd {
pub order_id: OrderId,
pub initial_price: Usd,
pub leverage: Leverage,
pub trading_pair: TradingPair,
pub position: Position,
pub liquidation_price: Usd,
pub quantity_usd: Usd,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub margin: Amount,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub profit_btc: Amount,
pub profit_usd: Usd,
pub state: String,
pub state_transition_timestamp: u64,
}
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct CfdOrder { pub struct CfdOrder {
pub id: OrderId, pub id: OrderId,
@ -52,42 +29,9 @@ pub trait ToSseEvent {
fn to_sse_event(&self) -> Event; fn to_sse_event(&self) -> Event;
} }
impl ToSseEvent for Vec<model::cfd::Cfd> { impl ToSseEvent for Vec<cfd_feed::Cfd> {
// TODO: This conversion can fail, we might want to change the API
fn to_sse_event(&self) -> Event { fn to_sse_event(&self) -> Event {
let cfds = self Event::json(&self).event("cfds")
.iter()
.map(|cfd| {
// TODO: Get the actual current price here
let current_price = Usd::ZERO;
let (profit_btc, profit_usd) = cfd.profit(current_price).unwrap();
Cfd {
order_id: cfd.order.id,
initial_price: cfd.order.price,
leverage: cfd.order.leverage,
trading_pair: cfd.order.trading_pair.clone(),
position: cfd.position(),
liquidation_price: cfd.order.liquidation_price,
quantity_usd: cfd.quantity_usd,
profit_btc,
profit_usd,
state: cfd.state.to_string(),
state_transition_timestamp: cfd
.state
.get_transition_timestamp()
.duration_since(UNIX_EPOCH)
.expect("timestamp to be convertable to duration since epoch")
.as_secs(),
// TODO: Depending on the state the margin might be set (i.e. in Open we save it
// in the DB internally) and does not have to be calculated
margin: cfd.margin().unwrap(),
}
})
.collect::<Vec<Cfd>>();
Event::json(&cfds).event("cfds")
} }
} }

Loading…
Cancel
Save