Browse Source

Revert "Use current price in profit margin calculations"

This reverts commit 4b8237c9d6.

After last night's conversation, we decided to return to the first version of
the PR, which will come in a subsequent commit.
fix-olivia-event-id
Mariusz Klochowicz 3 years ago
parent
commit
3badba7067
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 23
      daemon/src/bitmex_price_feed.rs
  2. 118
      daemon/src/cfd_feed.rs
  3. 34
      daemon/src/maker.rs
  4. 48
      daemon/src/maker_cfd.rs
  5. 12
      daemon/src/routes_maker.rs
  6. 12
      daemon/src/routes_taker.rs
  7. 33
      daemon/src/taker.rs
  8. 50
      daemon/src/taker_cfd.rs
  9. 62
      daemon/src/to_sse_event.rs

23
daemon/src/bitmex_price_feed.rs

@ -1,8 +1,7 @@
use crate::model::Usd; use crate::model::Usd;
use anyhow::{Context, Result}; use anyhow::Result;
use futures::{StreamExt, TryStreamExt}; use futures::{StreamExt, TryStreamExt};
use rust_decimal::Decimal; use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::future::Future; use std::future::Future;
use std::time::SystemTime; use std::time::SystemTime;
@ -41,7 +40,7 @@ pub async fn new() -> Result<(impl Future<Output = ()>, watch::Receiver<Quote>)>
Ok((task, receiver)) Ok((task, receiver))
} }
#[derive(Clone, Debug)] #[derive(Debug)]
pub struct Quote { pub struct Quote {
pub timestamp: SystemTime, pub timestamp: SystemTime,
pub bid: Usd, pub bid: Usd,
@ -68,24 +67,6 @@ impl Quote {
ask: Usd::from(Decimal::try_from(quote.ask_price)?), ask: Usd::from(Decimal::try_from(quote.ask_price)?),
})) }))
} }
#[allow(dead_code)] // Not used by all binaries.
pub fn for_maker(&self) -> Usd {
self.ask
}
#[allow(dead_code)] // Not used by all binaries.
pub fn for_taker(&self) -> Usd {
// TODO: Verify whether this is correct
self.mid_range().unwrap()
}
fn mid_range(&self) -> Result<Usd> {
Ok(Usd((self.bid.checked_add(self.ask))?
.0
.checked_div(dec!(2))
.context("division error")?))
}
} }
mod wire { mod wire {

118
daemon/src/cfd_feed.rs

@ -1,118 +0,0 @@
use crate::db::load_all_cfds;
use crate::model::cfd::OrderId;
use crate::model::{Leverage, Position, TradingPair, Usd};
use crate::{bitmex_price_feed, model};
use anyhow::{Context as _, Result};
use bdk::bitcoin::Amount;
use serde::Serialize;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use std::time::UNIX_EPOCH;
use tokio::sync::watch;
#[allow(dead_code)]
/// Role of the actor's owner in the upcoming contract
pub enum Role {
Maker,
Taker,
}
#[derive(Debug, Clone, Serialize)]
pub struct Cfd {
pub order_id: OrderId,
pub initial_price: Usd,
pub leverage: Leverage,
pub trading_pair: TradingPair,
pub position: Position,
pub liquidation_price: Usd,
pub quantity_usd: Usd,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub margin: Amount,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub profit_btc: Amount,
pub profit_usd: Usd,
pub state: String,
pub state_transition_timestamp: u64,
}
pub struct CfdFeed {
role: Role,
cfd_feed_sender: watch::Sender<Vec<Cfd>>,
current_price: Option<bitmex_price_feed::Quote>,
}
impl CfdFeed {
pub fn new(role: Role) -> (Self, watch::Receiver<Vec<Cfd>>) {
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]);
(
Self {
role,
cfd_feed_sender,
current_price: None,
},
cfd_feed_receiver,
)
}
/// Update price from BitMex. It is recommended to call `update()` afterwards.
pub fn set_current_price(&mut self, quote: bitmex_price_feed::Quote) {
self.current_price = Some(quote);
}
/// Updates the CFD feed in the frontend
pub async fn update(&self, conn: &mut PoolConnection<Sqlite>) -> Result<()> {
let cfds = load_all_cfds(conn).await?;
let cfds = cfds
.iter()
.map(|cfd| {
let (profit_btc, profit_usd) = self.calculate_profit(cfd);
Cfd {
order_id: cfd.order.id,
initial_price: cfd.order.price,
leverage: cfd.order.leverage,
trading_pair: cfd.order.trading_pair.clone(),
position: cfd.position(),
liquidation_price: cfd.order.liquidation_price,
quantity_usd: cfd.quantity_usd,
profit_btc,
profit_usd,
state: cfd.state.to_string(),
state_transition_timestamp: cfd
.state
.get_transition_timestamp()
.duration_since(UNIX_EPOCH)
.expect("timestamp to be convertable to duration since epoch")
.as_secs(),
// TODO: Depending on the state the margin might be set (i.e. in Open we save it
// in the DB internally) and does not have to be calculated
margin: cfd.margin().unwrap(),
}
})
.collect::<Vec<Cfd>>();
self.cfd_feed_sender
.send(cfds)
.context("Could not update CFD feed")
}
fn calculate_profit(&self, cfd: &model::cfd::Cfd) -> (Amount, Usd) {
if let Some(quote) = &self.current_price {
cfd.profit(match self.role {
Role::Taker => quote.for_taker(),
Role::Maker => quote.for_maker(),
})
.unwrap()
} else {
// No current price set yet, returning no profits
(Amount::ZERO, Usd::ZERO)
}
}
}

34
daemon/src/maker.rs

@ -1,5 +1,4 @@
use crate::auth::MAKER_USERNAME; use crate::auth::MAKER_USERNAME;
use crate::cfd_feed::CfdFeed;
use crate::db::load_all_cfds; use crate::db::load_all_cfds;
use crate::seed::Seed; use crate::seed::Seed;
use crate::wallet::Wallet; use crate::wallet::Wallet;
@ -7,7 +6,7 @@ use anyhow::{Context, Result};
use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1}; use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1};
use bdk::bitcoin::Network; use bdk::bitcoin::Network;
use clap::Clap; use clap::Clap;
use model::cfd::Order; use model::cfd::{Cfd, Order};
use model::WalletInfo; use model::WalletInfo;
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
@ -21,7 +20,6 @@ use xtra::spawn::TokioGlobalSpawnExt;
mod actors; mod actors;
mod auth; mod auth;
mod bitmex_price_feed; mod bitmex_price_feed;
mod cfd_feed;
mod cleanup; mod cleanup;
mod db; mod db;
mod keypair; mod keypair;
@ -108,7 +106,7 @@ async fn main() -> Result<()> {
let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle. let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle.
let (cfd_feed_updater, cfd_feed_receiver) = CfdFeed::new(cfd_feed::Role::Maker); let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]);
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None); let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
@ -121,7 +119,7 @@ async fn main() -> Result<()> {
tracing::info!("Listening on {}", local_addr); tracing::info!("Listening on {}", local_addr);
let (task, mut quote_updates) = bitmex_price_feed::new().await?; let (task, quote_updates) = bitmex_price_feed::new().await?;
tokio::spawn(task); tokio::spawn(task);
rocket::custom(figment) rocket::custom(figment)
@ -129,7 +127,7 @@ async fn main() -> Result<()> {
.manage(order_feed_receiver) .manage(order_feed_receiver)
.manage(wallet_feed_receiver) .manage(wallet_feed_receiver)
.manage(auth_password) .manage(auth_password)
.manage(quote_updates.clone()) .manage(quote_updates)
.attach(Db::init()) .attach(Db::init())
.attach(AdHoc::try_on_ignite( .attach(AdHoc::try_on_ignite(
"SQL migrations", "SQL migrations",
@ -166,7 +164,7 @@ async fn main() -> Result<()> {
db, db,
wallet.clone(), wallet.clone(),
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
cfd_feed_updater, cfd_feed_sender,
order_feed_sender, order_feed_sender,
maker_inc_connections_address.clone(), maker_inc_connections_address.clone(),
monitor_actor_address, monitor_actor_address,
@ -201,28 +199,6 @@ async fn main() -> Result<()> {
}); });
tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream)); tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream));
tokio::spawn({
let cfd_actor_inbox = cfd_maker_actor_inbox.clone();
async move {
loop {
let quote = quote_updates.borrow().clone();
if cfd_actor_inbox.do_send_async(maker_cfd::PriceUpdate(quote)).await.is_err() {
tracing::warn!(
"Could not communicate with the message handler, stopping price feed sync"
);
return;
}
if quote_updates.changed().await.is_err() {
tracing::warn!("BitMex price feed receiver not available, stopping price feed sync");
return;
}
}
}
});
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
Ok(rocket.manage(cfd_maker_actor_inbox)) Ok(rocket.manage(cfd_maker_actor_inbox))

48
daemon/src/maker_cfd.rs

@ -1,15 +1,14 @@
use crate::actors::log_error; use crate::actors::log_error;
use crate::cfd_feed::CfdFeed;
use crate::db::{ use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_cfd_by_order_id, insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds,
load_order_by_id, load_cfd_by_order_id, load_order_by_id,
}; };
use crate::maker_inc_connections::TakerCommand; use crate::maker_inc_connections::TakerCommand;
use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId};
use crate::model::{TakerId, Usd}; use crate::model::{TakerId, Usd};
use crate::monitor::MonitorParams; use crate::monitor::MonitorParams;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::{bitmex_price_feed, maker_inc_connections, monitor, setup_contract, wire}; use crate::{maker_inc_connections, monitor, setup_contract, wire};
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
@ -44,13 +43,11 @@ pub struct TakerStreamMessage {
pub item: Result<wire::TakerToMaker>, pub item: Result<wire::TakerToMaker>,
} }
pub struct PriceUpdate(pub bitmex_price_feed::Quote);
pub struct Actor { pub struct Actor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed: CfdFeed, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>, order_feed_sender: watch::Sender<Option<Order>>,
takers: Address<maker_inc_connections::Actor>, takers: Address<maker_inc_connections::Actor>,
current_order_id: Option<OrderId>, current_order_id: Option<OrderId>,
@ -72,7 +69,7 @@ impl Actor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed: CfdFeed, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>, order_feed_sender: watch::Sender<Option<Order>>,
takers: Address<maker_inc_connections::Actor>, takers: Address<maker_inc_connections::Actor>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
@ -81,7 +78,7 @@ impl Actor {
let mut conn = db.acquire().await?; let mut conn = db.acquire().await?;
// populate the CFD feed with existing CFDs // populate the CFD feed with existing CFDs
cfd_feed.update(&mut conn).await?; cfd_feed_actor_inbox.send(load_all_cfds(&mut conn).await?)?;
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) {
let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?;
@ -100,7 +97,7 @@ impl Actor {
db, db,
wallet, wallet,
oracle_pk, oracle_pk,
cfd_feed, cfd_feed_actor_inbox,
order_feed_sender, order_feed_sender,
takers, takers,
current_order_id: None, current_order_id: None,
@ -188,7 +185,8 @@ impl Actor {
) )
.await?; .await?;
self.cfd_feed.update(&mut conn).await?; self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let txid = self let txid = self
.wallet .wallet
@ -251,7 +249,8 @@ impl Actor {
); );
insert_cfd(cfd, &mut conn).await?; insert_cfd(cfd, &mut conn).await?;
self.cfd_feed.update(&mut conn).await?; self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// 3. Remove current order // 3. Remove current order
self.current_order_id = None; self.current_order_id = None;
@ -311,7 +310,8 @@ impl Actor {
}) })
.await?; .await?;
self.cfd_feed.update(&mut conn).await?; self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let contract_future = setup_contract::new( let contract_future = setup_contract::new(
self.takers.clone().into_sink().with(move |msg| { self.takers.clone().into_sink().with(move |msg| {
@ -381,7 +381,8 @@ impl Actor {
command: TakerCommand::NotifyOrderRejected { id: msg.order_id }, command: TakerCommand::NotifyOrderRejected { id: msg.order_id },
}) })
.await?; .await?;
self.cfd_feed.update(&mut conn).await?; self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// Remove order for all // Remove order for all
self.current_order_id = None; self.current_order_id = None;
@ -418,14 +419,6 @@ impl Actor {
Ok(()) Ok(())
} }
async fn handle_price_update(&mut self, quote: bitmex_price_feed::Quote) -> Result<()> {
self.cfd_feed.set_current_price(quote);
let mut conn = self.db.acquire().await?;
self.cfd_feed.update(&mut conn).await?;
Ok(())
}
} }
#[async_trait] #[async_trait]
@ -502,13 +495,6 @@ impl Handler<TakerStreamMessage> for Actor {
} }
} }
#[async_trait]
impl Handler<PriceUpdate> for Actor {
async fn handle(&mut self, msg: PriceUpdate, _ctx: &mut Context<Self>) {
log_error!(self.handle_price_update(msg.0))
}
}
impl Message for NewOrder { impl Message for NewOrder {
type Result = (); type Result = ();
} }
@ -534,8 +520,4 @@ impl Message for TakerStreamMessage {
type Result = KeepRunning; type Result = KeepRunning;
} }
impl Message for PriceUpdate {
type Result = ();
}
impl xtra::Actor for Actor {} impl xtra::Actor for Actor {}

12
daemon/src/routes_maker.rs

@ -1,9 +1,10 @@
use crate::auth::Authenticated; use crate::auth::Authenticated;
use crate::model::cfd::{Order, OrderId, Origin}; use crate::bitmex_price_feed;
use crate::maker_cfd;
use crate::model::cfd::{Cfd, Order, OrderId, Origin};
use crate::model::{Usd, WalletInfo}; use crate::model::{Usd, WalletInfo};
use crate::routes::EmbeddedFileExt; use crate::routes::EmbeddedFileExt;
use crate::to_sse_event::ToSseEvent; use crate::to_sse_event::ToSseEvent;
use crate::{bitmex_price_feed, cfd_feed, maker_cfd};
use anyhow::Result; use anyhow::Result;
use rocket::http::{ContentType, Header, Status}; use rocket::http::{ContentType, Header, Status};
use rocket::response::stream::EventStream; use rocket::response::stream::EventStream;
@ -20,7 +21,7 @@ use xtra::Address;
#[rocket::get("/feed")] #[rocket::get("/feed")]
pub async fn maker_feed( pub async fn maker_feed(
rx_cfds: &State<watch::Receiver<Vec<cfd_feed::Cfd>>>, rx_cfds: &State<watch::Receiver<Vec<Cfd>>>,
rx_order: &State<watch::Receiver<Option<Order>>>, rx_order: &State<watch::Receiver<Option<Order>>>,
rx_wallet: &State<watch::Receiver<WalletInfo>>, rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>, rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>,
@ -38,11 +39,11 @@ pub async fn maker_feed(
let order = rx_order.borrow().clone(); let order = rx_order.borrow().clone();
yield order.to_sse_event(); yield order.to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
let quote = rx_quote.borrow().clone(); let quote = rx_quote.borrow().clone();
yield quote.to_sse_event(); yield quote.to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
loop{ loop{
select! { select! {
@ -57,7 +58,6 @@ pub async fn maker_feed(
Ok(()) = rx_cfds.changed() => { Ok(()) = rx_cfds.changed() => {
let cfds = rx_cfds.borrow().clone(); let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event(); yield cfds.to_sse_event();
}
Ok(()) = rx_quote.changed() => { Ok(()) = rx_quote.changed() => {
let quote = rx_quote.borrow().clone(); let quote = rx_quote.borrow().clone();
yield quote.to_sse_event(); yield quote.to_sse_event();

12
daemon/src/routes_taker.rs

@ -1,8 +1,9 @@
use crate::model::cfd::{calculate_buy_margin, Order, OrderId}; use crate::bitmex_price_feed;
use crate::model::cfd::{calculate_buy_margin, Cfd, Order, OrderId};
use crate::model::{Leverage, Usd, WalletInfo}; use crate::model::{Leverage, Usd, WalletInfo};
use crate::routes::EmbeddedFileExt; use crate::routes::EmbeddedFileExt;
use crate::taker_cfd;
use crate::to_sse_event::ToSseEvent; use crate::to_sse_event::ToSseEvent;
use crate::{bitmex_price_feed, cfd_feed, taker_cfd};
use bdk::bitcoin::Amount; use bdk::bitcoin::Amount;
use rocket::http::{ContentType, Status}; use rocket::http::{ContentType, Status};
use rocket::response::stream::EventStream; use rocket::response::stream::EventStream;
@ -19,7 +20,7 @@ use xtra::Address;
#[rocket::get("/feed")] #[rocket::get("/feed")]
pub async fn feed( pub async fn feed(
rx_cfds: &State<watch::Receiver<Vec<cfd_feed::Cfd>>>, rx_cfds: &State<watch::Receiver<Vec<Cfd>>>,
rx_order: &State<watch::Receiver<Option<Order>>>, rx_order: &State<watch::Receiver<Option<Order>>>,
rx_wallet: &State<watch::Receiver<WalletInfo>>, rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>, rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>,
@ -36,11 +37,11 @@ pub async fn feed(
let order = rx_order.borrow().clone(); let order = rx_order.borrow().clone();
yield order.to_sse_event(); yield order.to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
let quote = rx_quote.borrow().clone(); let quote = rx_quote.borrow().clone();
yield quote.to_sse_event(); yield quote.to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
loop{ loop{
select! { select! {
@ -55,7 +56,6 @@ pub async fn feed(
Ok(()) = rx_cfds.changed() => { Ok(()) = rx_cfds.changed() => {
let cfds = rx_cfds.borrow().clone(); let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event(); yield cfds.to_sse_event();
}
Ok(()) = rx_quote.changed() => { Ok(()) = rx_quote.changed() => {
let quote = rx_quote.borrow().clone(); let quote = rx_quote.borrow().clone();
yield quote.to_sse_event(); yield quote.to_sse_event();

33
daemon/src/taker.rs

@ -1,4 +1,3 @@
use crate::cfd_feed::CfdFeed;
use crate::db::load_all_cfds; use crate::db::load_all_cfds;
use crate::model::WalletInfo; use crate::model::WalletInfo;
use crate::wallet::Wallet; use crate::wallet::Wallet;
@ -7,7 +6,7 @@ use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1};
use bdk::bitcoin::Network; use bdk::bitcoin::Network;
use clap::Clap; use clap::Clap;
use futures::StreamExt; use futures::StreamExt;
use model::cfd::Order; use model::cfd::{Cfd, Order};
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
use seed::Seed; use seed::Seed;
@ -24,7 +23,6 @@ use xtra::Actor;
mod actors; mod actors;
mod bitmex_price_feed; mod bitmex_price_feed;
mod cfd_feed;
mod cleanup; mod cleanup;
mod db; mod db;
mod keypair; mod keypair;
@ -104,7 +102,7 @@ async fn main() -> Result<()> {
let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle. let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle.
let (cfd_feed_updater, cfd_feed_receiver) = CfdFeed::new(cfd_feed::Role::Taker); let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]);
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None); let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
@ -121,7 +119,7 @@ async fn main() -> Result<()> {
} }
}; };
let (task, mut quote_updates) = bitmex_price_feed::new().await?; let (task, quote_updates) = bitmex_price_feed::new().await?;
tokio::spawn(task); tokio::spawn(task);
let figment = rocket::Config::figment() let figment = rocket::Config::figment()
@ -132,7 +130,7 @@ async fn main() -> Result<()> {
.manage(cfd_feed_receiver) .manage(cfd_feed_receiver)
.manage(order_feed_receiver) .manage(order_feed_receiver)
.manage(wallet_feed_receiver) .manage(wallet_feed_receiver)
.manage(quote_updates.clone()) .manage(quote_updates)
.attach(Db::init()) .attach(Db::init())
.attach(AdHoc::try_on_ignite( .attach(AdHoc::try_on_ignite(
"SQL migrations", "SQL migrations",
@ -170,7 +168,7 @@ async fn main() -> Result<()> {
db.clone(), db.clone(),
wallet.clone(), wallet.clone(),
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
cfd_feed_updater, cfd_feed_sender,
order_feed_sender, order_feed_sender,
send_to_maker, send_to_maker,
monitor_actor_address, monitor_actor_address,
@ -191,27 +189,6 @@ async fn main() -> Result<()> {
), ),
); );
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
tokio::spawn({
let cfd_actor_inbox = cfd_actor_inbox.clone();
async move {
loop {
let quote = quote_updates.borrow().clone();
if cfd_actor_inbox.do_send_async(taker_cfd::PriceUpdate(quote)).await.is_err() {
tracing::warn!(
"Could not communicate with the message handler, stopping price feed sync"
);
return;
}
if quote_updates.changed().await.is_err() {
tracing::warn!("BitMex price feed receiver not available, stopping price feed sync");
return;
}
}
}
});
Ok(rocket.manage(cfd_actor_inbox)) Ok(rocket.manage(cfd_actor_inbox))
}, },

50
daemon/src/taker_cfd.rs

@ -1,8 +1,7 @@
use crate::actors::log_error; use crate::actors::log_error;
use crate::cfd_feed::CfdFeed;
use crate::db::{ use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_cfd_by_order_id, insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds,
load_order_by_id, load_cfd_by_order_id, load_order_by_id,
}; };
use crate::model::cfd::{ use crate::model::cfd::{
Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin,
@ -11,7 +10,7 @@ use crate::model::Usd;
use crate::monitor::{self, MonitorParams}; use crate::monitor::{self, MonitorParams};
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::wire::SetupMsg; use crate::wire::SetupMsg;
use crate::{bitmex_price_feed, send_to_socket, setup_contract, wire}; use crate::{send_to_socket, setup_contract, wire};
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
@ -36,8 +35,6 @@ pub struct CfdSetupCompleted {
pub dlc: Result<Dlc>, pub dlc: Result<Dlc>,
} }
pub struct PriceUpdate(pub bitmex_price_feed::Quote);
enum SetupState { enum SetupState {
Active { Active {
sender: mpsc::UnboundedSender<wire::SetupMsg>, sender: mpsc::UnboundedSender<wire::SetupMsg>,
@ -49,7 +46,7 @@ pub struct Actor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed: CfdFeed, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
@ -62,16 +59,14 @@ impl Actor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed: CfdFeed, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
cfds: Vec<Cfd>, cfds: Vec<Cfd>,
) -> Result<Self> { ) -> Result<Self> {
let mut conn = db.acquire().await?; let mut conn = db.acquire().await?;
cfd_feed_actor_inbox.send(load_all_cfds(&mut conn).await?)?;
// populate the CFD feed with existing CFDs
cfd_feed.update(&mut conn).await?;
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) {
let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?;
@ -90,7 +85,7 @@ impl Actor {
db, db,
wallet, wallet,
oracle_pk, oracle_pk,
cfd_feed, cfd_feed_actor_inbox,
order_feed_actor_inbox, order_feed_actor_inbox,
send_to_maker, send_to_maker,
monitor_actor, monitor_actor,
@ -117,7 +112,8 @@ impl Actor {
insert_cfd(cfd, &mut conn).await?; insert_cfd(cfd, &mut conn).await?;
self.cfd_feed.update(&mut conn).await?; self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
self.send_to_maker self.send_to_maker
.do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity }) .do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity })
.await?; .await?;
@ -166,7 +162,8 @@ impl Actor {
) )
.await?; .await?;
self.cfd_feed.update(&mut conn).await?; self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let contract_future = setup_contract::new( let contract_future = setup_contract::new(
@ -212,7 +209,8 @@ impl Actor {
) )
.await?; .await?;
self.cfd_feed.update(&mut conn).await?; self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
Ok(()) Ok(())
} }
@ -252,7 +250,8 @@ impl Actor {
) )
.await?; .await?;
self.cfd_feed.update(&mut conn).await?; self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let txid = self let txid = self
.wallet .wallet
@ -300,14 +299,6 @@ impl Actor {
Ok(()) Ok(())
} }
async fn handle_price_update(&mut self, quote: bitmex_price_feed::Quote) -> Result<()> {
self.cfd_feed.set_current_price(quote);
let mut conn = self.db.acquire().await?;
self.cfd_feed.update(&mut conn).await?;
Ok(())
}
} }
#[async_trait] #[async_trait]
@ -366,13 +357,6 @@ impl Handler<monitor::Event> for Actor {
} }
} }
#[async_trait]
impl Handler<PriceUpdate> for Actor {
async fn handle(&mut self, msg: PriceUpdate, _ctx: &mut Context<Self>) {
log_error!(self.handle_price_update(msg.0))
}
}
impl Message for TakeOffer { impl Message for TakeOffer {
type Result = (); type Result = ();
} }
@ -386,8 +370,4 @@ impl Message for CfdSetupCompleted {
type Result = (); type Result = ();
} }
impl Message for PriceUpdate {
type Result = ();
}
impl xtra::Actor for Actor {} impl xtra::Actor for Actor {}

62
daemon/src/to_sse_event.rs

@ -1,11 +1,34 @@
use crate::model::cfd::OrderId; use crate::model::cfd::OrderId;
use crate::model::{Leverage, Position, TradingPair, Usd}; use crate::model::{Leverage, Position, TradingPair, Usd};
use crate::{bitmex_price_feed, cfd_feed, model}; use crate::{bitmex_price_feed, model};
use bdk::bitcoin::Amount; use bdk::bitcoin::Amount;
use rocket::response::stream::Event; use rocket::response::stream::Event;
use serde::Serialize; use serde::Serialize;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, Serialize)]
pub struct Cfd {
pub order_id: OrderId,
pub initial_price: Usd,
pub leverage: Leverage,
pub trading_pair: TradingPair,
pub position: Position,
pub liquidation_price: Usd,
pub quantity_usd: Usd,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub margin: Amount,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
pub profit_btc: Amount,
pub profit_usd: Usd,
pub state: String,
pub state_transition_timestamp: u64,
}
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct CfdOrder { pub struct CfdOrder {
pub id: OrderId, pub id: OrderId,
@ -29,9 +52,42 @@ pub trait ToSseEvent {
fn to_sse_event(&self) -> Event; fn to_sse_event(&self) -> Event;
} }
impl ToSseEvent for Vec<cfd_feed::Cfd> { impl ToSseEvent for Vec<model::cfd::Cfd> {
// TODO: This conversion can fail, we might want to change the API
fn to_sse_event(&self) -> Event { fn to_sse_event(&self) -> Event {
Event::json(&self).event("cfds") let cfds = self
.iter()
.map(|cfd| {
// TODO: Get the actual current price here
let current_price = Usd::ZERO;
let (profit_btc, profit_usd) = cfd.profit(current_price).unwrap();
Cfd {
order_id: cfd.order.id,
initial_price: cfd.order.price,
leverage: cfd.order.leverage,
trading_pair: cfd.order.trading_pair.clone(),
position: cfd.position(),
liquidation_price: cfd.order.liquidation_price,
quantity_usd: cfd.quantity_usd,
profit_btc,
profit_usd,
state: cfd.state.to_string(),
state_transition_timestamp: cfd
.state
.get_transition_timestamp()
.duration_since(UNIX_EPOCH)
.expect("timestamp to be convertable to duration since epoch")
.as_secs(),
// TODO: Depending on the state the margin might be set (i.e. in Open we save it
// in the DB internally) and does not have to be calculated
margin: cfd.margin().unwrap(),
}
})
.collect::<Vec<Cfd>>();
Event::json(&cfds).event("cfds")
} }
} }

Loading…
Cancel
Save