diff --git a/daemon/src/bitmex_price_feed.rs b/daemon/src/bitmex_price_feed.rs index 766f82e..aa23c03 100644 --- a/daemon/src/bitmex_price_feed.rs +++ b/daemon/src/bitmex_price_feed.rs @@ -1,8 +1,7 @@ use crate::model::Usd; -use anyhow::{Context, Result}; +use anyhow::Result; use futures::{StreamExt, TryStreamExt}; use rust_decimal::Decimal; -use rust_decimal_macros::dec; use std::convert::TryFrom; use std::future::Future; use std::time::SystemTime; @@ -41,7 +40,7 @@ pub async fn new() -> Result<(impl Future, watch::Receiver)> Ok((task, receiver)) } -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct Quote { pub timestamp: SystemTime, pub bid: Usd, @@ -68,24 +67,6 @@ impl Quote { ask: Usd::from(Decimal::try_from(quote.ask_price)?), })) } - - #[allow(dead_code)] // Not used by all binaries. - pub fn for_maker(&self) -> Usd { - self.ask - } - - #[allow(dead_code)] // Not used by all binaries. - pub fn for_taker(&self) -> Usd { - // TODO: Verify whether this is correct - self.mid_range().unwrap() - } - - fn mid_range(&self) -> Result { - Ok(Usd((self.bid.checked_add(self.ask))? - .0 - .checked_div(dec!(2)) - .context("division error")?)) - } } mod wire { diff --git a/daemon/src/cfd_feed.rs b/daemon/src/cfd_feed.rs deleted file mode 100644 index ac7cc75..0000000 --- a/daemon/src/cfd_feed.rs +++ /dev/null @@ -1,118 +0,0 @@ -use crate::db::load_all_cfds; -use crate::model::cfd::OrderId; -use crate::model::{Leverage, Position, TradingPair, Usd}; -use crate::{bitmex_price_feed, model}; -use anyhow::{Context as _, Result}; -use bdk::bitcoin::Amount; -use serde::Serialize; -use sqlx::pool::PoolConnection; -use sqlx::Sqlite; -use std::time::UNIX_EPOCH; -use tokio::sync::watch; - -#[allow(dead_code)] -/// Role of the actor's owner in the upcoming contract -pub enum Role { - Maker, - Taker, -} - -#[derive(Debug, Clone, Serialize)] -pub struct Cfd { - pub order_id: OrderId, - pub initial_price: Usd, - - pub leverage: Leverage, - pub trading_pair: TradingPair, - pub position: Position, - pub liquidation_price: Usd, - - pub quantity_usd: Usd, - - #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] - pub margin: Amount, - - #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] - pub profit_btc: Amount, - pub profit_usd: Usd, - - pub state: String, - pub state_transition_timestamp: u64, -} - -pub struct CfdFeed { - role: Role, - cfd_feed_sender: watch::Sender>, - current_price: Option, -} - -impl CfdFeed { - pub fn new(role: Role) -> (Self, watch::Receiver>) { - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::>(vec![]); - ( - Self { - role, - cfd_feed_sender, - current_price: None, - }, - cfd_feed_receiver, - ) - } - - /// Update price from BitMex. It is recommended to call `update()` afterwards. - pub fn set_current_price(&mut self, quote: bitmex_price_feed::Quote) { - self.current_price = Some(quote); - } - - /// Updates the CFD feed in the frontend - pub async fn update(&self, conn: &mut PoolConnection) -> Result<()> { - let cfds = load_all_cfds(conn).await?; - - let cfds = cfds - .iter() - .map(|cfd| { - let (profit_btc, profit_usd) = self.calculate_profit(cfd); - - Cfd { - order_id: cfd.order.id, - initial_price: cfd.order.price, - leverage: cfd.order.leverage, - trading_pair: cfd.order.trading_pair.clone(), - position: cfd.position(), - liquidation_price: cfd.order.liquidation_price, - quantity_usd: cfd.quantity_usd, - profit_btc, - profit_usd, - state: cfd.state.to_string(), - state_transition_timestamp: cfd - .state - .get_transition_timestamp() - .duration_since(UNIX_EPOCH) - .expect("timestamp to be convertable to duration since epoch") - .as_secs(), - - // TODO: Depending on the state the margin might be set (i.e. in Open we save it - // in the DB internally) and does not have to be calculated - margin: cfd.margin().unwrap(), - } - }) - .collect::>(); - - self.cfd_feed_sender - .send(cfds) - .context("Could not update CFD feed") - } - - fn calculate_profit(&self, cfd: &model::cfd::Cfd) -> (Amount, Usd) { - if let Some(quote) = &self.current_price { - cfd.profit(match self.role { - Role::Taker => quote.for_taker(), - Role::Maker => quote.for_maker(), - }) - .unwrap() - } else { - // No current price set yet, returning no profits - (Amount::ZERO, Usd::ZERO) - } - } -} diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index d98b6a9..9eb5592 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -1,5 +1,4 @@ use crate::auth::MAKER_USERNAME; -use crate::cfd_feed::CfdFeed; use crate::db::load_all_cfds; use crate::seed::Seed; use crate::wallet::Wallet; @@ -7,7 +6,7 @@ use anyhow::{Context, Result}; use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1}; use bdk::bitcoin::Network; use clap::Clap; -use model::cfd::Order; +use model::cfd::{Cfd, Order}; use model::WalletInfo; use rocket::fairing::AdHoc; use rocket_db_pools::Database; @@ -21,7 +20,6 @@ use xtra::spawn::TokioGlobalSpawnExt; mod actors; mod auth; mod bitmex_price_feed; -mod cfd_feed; mod cleanup; mod db; mod keypair; @@ -108,7 +106,7 @@ async fn main() -> Result<()> { let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle. - let (cfd_feed_updater, cfd_feed_receiver) = CfdFeed::new(cfd_feed::Role::Maker); + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::>(vec![]); let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::(wallet_info); @@ -121,7 +119,7 @@ async fn main() -> Result<()> { tracing::info!("Listening on {}", local_addr); - let (task, mut quote_updates) = bitmex_price_feed::new().await?; + let (task, quote_updates) = bitmex_price_feed::new().await?; tokio::spawn(task); rocket::custom(figment) @@ -129,7 +127,7 @@ async fn main() -> Result<()> { .manage(order_feed_receiver) .manage(wallet_feed_receiver) .manage(auth_password) - .manage(quote_updates.clone()) + .manage(quote_updates) .attach(Db::init()) .attach(AdHoc::try_on_ignite( "SQL migrations", @@ -166,7 +164,7 @@ async fn main() -> Result<()> { db, wallet.clone(), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), - cfd_feed_updater, + cfd_feed_sender, order_feed_sender, maker_inc_connections_address.clone(), monitor_actor_address, @@ -201,28 +199,6 @@ async fn main() -> Result<()> { }); tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream)); - tokio::spawn({ - let cfd_actor_inbox = cfd_maker_actor_inbox.clone(); - - async move { - loop { - let quote = quote_updates.borrow().clone(); - - if cfd_actor_inbox.do_send_async(maker_cfd::PriceUpdate(quote)).await.is_err() { - tracing::warn!( - "Could not communicate with the message handler, stopping price feed sync" - ); - return; - } - - if quote_updates.changed().await.is_err() { - tracing::warn!("BitMex price feed receiver not available, stopping price feed sync"); - return; - } - } - } - }); - tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); Ok(rocket.manage(cfd_maker_actor_inbox)) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 562345b..27bd369 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -1,15 +1,14 @@ use crate::actors::log_error; -use crate::cfd_feed::CfdFeed; use crate::db::{ - insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_cfd_by_order_id, - load_order_by_id, + insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, + load_cfd_by_order_id, load_order_by_id, }; use crate::maker_inc_connections::TakerCommand; use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::{TakerId, Usd}; use crate::monitor::MonitorParams; use crate::wallet::Wallet; -use crate::{bitmex_price_feed, maker_inc_connections, monitor, setup_contract, wire}; +use crate::{maker_inc_connections, monitor, setup_contract, wire}; use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -44,13 +43,11 @@ pub struct TakerStreamMessage { pub item: Result, } -pub struct PriceUpdate(pub bitmex_price_feed::Quote); - pub struct Actor { db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, - cfd_feed: CfdFeed, + cfd_feed_actor_inbox: watch::Sender>, order_feed_sender: watch::Sender>, takers: Address, current_order_id: Option, @@ -72,7 +69,7 @@ impl Actor { db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, - cfd_feed: CfdFeed, + cfd_feed_actor_inbox: watch::Sender>, order_feed_sender: watch::Sender>, takers: Address, monitor_actor: Address>, @@ -81,7 +78,7 @@ impl Actor { let mut conn = db.acquire().await?; // populate the CFD feed with existing CFDs - cfd_feed.update(&mut conn).await?; + cfd_feed_actor_inbox.send(load_all_cfds(&mut conn).await?)?; for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; @@ -100,7 +97,7 @@ impl Actor { db, wallet, oracle_pk, - cfd_feed, + cfd_feed_actor_inbox, order_feed_sender, takers, current_order_id: None, @@ -188,7 +185,8 @@ impl Actor { ) .await?; - self.cfd_feed.update(&mut conn).await?; + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; let txid = self .wallet @@ -251,7 +249,8 @@ impl Actor { ); insert_cfd(cfd, &mut conn).await?; - self.cfd_feed.update(&mut conn).await?; + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; // 3. Remove current order self.current_order_id = None; @@ -311,7 +310,8 @@ impl Actor { }) .await?; - self.cfd_feed.update(&mut conn).await?; + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; let contract_future = setup_contract::new( self.takers.clone().into_sink().with(move |msg| { @@ -381,7 +381,8 @@ impl Actor { command: TakerCommand::NotifyOrderRejected { id: msg.order_id }, }) .await?; - self.cfd_feed.update(&mut conn).await?; + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; // Remove order for all self.current_order_id = None; @@ -418,14 +419,6 @@ impl Actor { Ok(()) } - - async fn handle_price_update(&mut self, quote: bitmex_price_feed::Quote) -> Result<()> { - self.cfd_feed.set_current_price(quote); - - let mut conn = self.db.acquire().await?; - self.cfd_feed.update(&mut conn).await?; - Ok(()) - } } #[async_trait] @@ -502,13 +495,6 @@ impl Handler for Actor { } } -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: PriceUpdate, _ctx: &mut Context) { - log_error!(self.handle_price_update(msg.0)) - } -} - impl Message for NewOrder { type Result = (); } @@ -534,8 +520,4 @@ impl Message for TakerStreamMessage { type Result = KeepRunning; } -impl Message for PriceUpdate { - type Result = (); -} - impl xtra::Actor for Actor {} diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 6529b20..aa5470e 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -1,9 +1,10 @@ use crate::auth::Authenticated; -use crate::model::cfd::{Order, OrderId, Origin}; +use crate::bitmex_price_feed; +use crate::maker_cfd; +use crate::model::cfd::{Cfd, Order, OrderId, Origin}; use crate::model::{Usd, WalletInfo}; use crate::routes::EmbeddedFileExt; use crate::to_sse_event::ToSseEvent; -use crate::{bitmex_price_feed, cfd_feed, maker_cfd}; use anyhow::Result; use rocket::http::{ContentType, Header, Status}; use rocket::response::stream::EventStream; @@ -20,7 +21,7 @@ use xtra::Address; #[rocket::get("/feed")] pub async fn maker_feed( - rx_cfds: &State>>, + rx_cfds: &State>>, rx_order: &State>>, rx_wallet: &State>, rx_quote: &State>, @@ -38,11 +39,11 @@ pub async fn maker_feed( let order = rx_order.borrow().clone(); yield order.to_sse_event(); + let cfds = rx_cfds.borrow().clone(); + yield cfds.to_sse_event(); let quote = rx_quote.borrow().clone(); yield quote.to_sse_event(); - let cfds = rx_cfds.borrow().clone(); - yield cfds.to_sse_event(); loop{ select! { @@ -57,7 +58,6 @@ pub async fn maker_feed( Ok(()) = rx_cfds.changed() => { let cfds = rx_cfds.borrow().clone(); yield cfds.to_sse_event(); - } Ok(()) = rx_quote.changed() => { let quote = rx_quote.borrow().clone(); yield quote.to_sse_event(); diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index 34ee977..07f2c32 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -1,8 +1,9 @@ -use crate::model::cfd::{calculate_buy_margin, Order, OrderId}; +use crate::bitmex_price_feed; +use crate::model::cfd::{calculate_buy_margin, Cfd, Order, OrderId}; use crate::model::{Leverage, Usd, WalletInfo}; use crate::routes::EmbeddedFileExt; +use crate::taker_cfd; use crate::to_sse_event::ToSseEvent; -use crate::{bitmex_price_feed, cfd_feed, taker_cfd}; use bdk::bitcoin::Amount; use rocket::http::{ContentType, Status}; use rocket::response::stream::EventStream; @@ -19,7 +20,7 @@ use xtra::Address; #[rocket::get("/feed")] pub async fn feed( - rx_cfds: &State>>, + rx_cfds: &State>>, rx_order: &State>>, rx_wallet: &State>, rx_quote: &State>, @@ -36,11 +37,11 @@ pub async fn feed( let order = rx_order.borrow().clone(); yield order.to_sse_event(); + let cfds = rx_cfds.borrow().clone(); + yield cfds.to_sse_event(); let quote = rx_quote.borrow().clone(); yield quote.to_sse_event(); - let cfds = rx_cfds.borrow().clone(); - yield cfds.to_sse_event(); loop{ select! { @@ -55,7 +56,6 @@ pub async fn feed( Ok(()) = rx_cfds.changed() => { let cfds = rx_cfds.borrow().clone(); yield cfds.to_sse_event(); - } Ok(()) = rx_quote.changed() => { let quote = rx_quote.borrow().clone(); yield quote.to_sse_event(); diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 4a775e1..1e29145 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -1,4 +1,3 @@ -use crate::cfd_feed::CfdFeed; use crate::db::load_all_cfds; use crate::model::WalletInfo; use crate::wallet::Wallet; @@ -7,7 +6,7 @@ use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1}; use bdk::bitcoin::Network; use clap::Clap; use futures::StreamExt; -use model::cfd::Order; +use model::cfd::{Cfd, Order}; use rocket::fairing::AdHoc; use rocket_db_pools::Database; use seed::Seed; @@ -24,7 +23,6 @@ use xtra::Actor; mod actors; mod bitmex_price_feed; -mod cfd_feed; mod cleanup; mod db; mod keypair; @@ -104,7 +102,7 @@ async fn main() -> Result<()> { let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle. - let (cfd_feed_updater, cfd_feed_receiver) = CfdFeed::new(cfd_feed::Role::Taker); + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::>(vec![]); let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::(wallet_info); @@ -121,7 +119,7 @@ async fn main() -> Result<()> { } }; - let (task, mut quote_updates) = bitmex_price_feed::new().await?; + let (task, quote_updates) = bitmex_price_feed::new().await?; tokio::spawn(task); let figment = rocket::Config::figment() @@ -132,7 +130,7 @@ async fn main() -> Result<()> { .manage(cfd_feed_receiver) .manage(order_feed_receiver) .manage(wallet_feed_receiver) - .manage(quote_updates.clone()) + .manage(quote_updates) .attach(Db::init()) .attach(AdHoc::try_on_ignite( "SQL migrations", @@ -170,7 +168,7 @@ async fn main() -> Result<()> { db.clone(), wallet.clone(), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), - cfd_feed_updater, + cfd_feed_sender, order_feed_sender, send_to_maker, monitor_actor_address, @@ -191,27 +189,6 @@ async fn main() -> Result<()> { ), ); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); - tokio::spawn({ - let cfd_actor_inbox = cfd_actor_inbox.clone(); - - async move { - loop { - let quote = quote_updates.borrow().clone(); - - if cfd_actor_inbox.do_send_async(taker_cfd::PriceUpdate(quote)).await.is_err() { - tracing::warn!( - "Could not communicate with the message handler, stopping price feed sync" - ); - return; - } - - if quote_updates.changed().await.is_err() { - tracing::warn!("BitMex price feed receiver not available, stopping price feed sync"); - return; - } - } - } - }); Ok(rocket.manage(cfd_actor_inbox)) }, diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index bf36b54..e9dda36 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -1,8 +1,7 @@ use crate::actors::log_error; -use crate::cfd_feed::CfdFeed; use crate::db::{ - insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_cfd_by_order_id, - load_order_by_id, + insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, + load_cfd_by_order_id, load_order_by_id, }; use crate::model::cfd::{ Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin, @@ -11,7 +10,7 @@ use crate::model::Usd; use crate::monitor::{self, MonitorParams}; use crate::wallet::Wallet; use crate::wire::SetupMsg; -use crate::{bitmex_price_feed, send_to_socket, setup_contract, wire}; +use crate::{send_to_socket, setup_contract, wire}; use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -36,8 +35,6 @@ pub struct CfdSetupCompleted { pub dlc: Result, } -pub struct PriceUpdate(pub bitmex_price_feed::Quote); - enum SetupState { Active { sender: mpsc::UnboundedSender, @@ -49,7 +46,7 @@ pub struct Actor { db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, - cfd_feed: CfdFeed, + cfd_feed_actor_inbox: watch::Sender>, order_feed_actor_inbox: watch::Sender>, send_to_maker: Address>, monitor_actor: Address>, @@ -62,16 +59,14 @@ impl Actor { db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, - cfd_feed: CfdFeed, + cfd_feed_actor_inbox: watch::Sender>, order_feed_actor_inbox: watch::Sender>, send_to_maker: Address>, monitor_actor: Address>, cfds: Vec, ) -> Result { let mut conn = db.acquire().await?; - - // populate the CFD feed with existing CFDs - cfd_feed.update(&mut conn).await?; + cfd_feed_actor_inbox.send(load_all_cfds(&mut conn).await?)?; for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; @@ -90,7 +85,7 @@ impl Actor { db, wallet, oracle_pk, - cfd_feed, + cfd_feed_actor_inbox, order_feed_actor_inbox, send_to_maker, monitor_actor, @@ -117,7 +112,8 @@ impl Actor { insert_cfd(cfd, &mut conn).await?; - self.cfd_feed.update(&mut conn).await?; + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; self.send_to_maker .do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity }) .await?; @@ -166,7 +162,8 @@ impl Actor { ) .await?; - self.cfd_feed.update(&mut conn).await?; + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let contract_future = setup_contract::new( @@ -212,7 +209,8 @@ impl Actor { ) .await?; - self.cfd_feed.update(&mut conn).await?; + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; Ok(()) } @@ -252,7 +250,8 @@ impl Actor { ) .await?; - self.cfd_feed.update(&mut conn).await?; + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; let txid = self .wallet @@ -300,14 +299,6 @@ impl Actor { Ok(()) } - - async fn handle_price_update(&mut self, quote: bitmex_price_feed::Quote) -> Result<()> { - self.cfd_feed.set_current_price(quote); - - let mut conn = self.db.acquire().await?; - self.cfd_feed.update(&mut conn).await?; - Ok(()) - } } #[async_trait] @@ -366,13 +357,6 @@ impl Handler for Actor { } } -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: PriceUpdate, _ctx: &mut Context) { - log_error!(self.handle_price_update(msg.0)) - } -} - impl Message for TakeOffer { type Result = (); } @@ -386,8 +370,4 @@ impl Message for CfdSetupCompleted { type Result = (); } -impl Message for PriceUpdate { - type Result = (); -} - impl xtra::Actor for Actor {} diff --git a/daemon/src/to_sse_event.rs b/daemon/src/to_sse_event.rs index 8d340cd..5716d8a 100644 --- a/daemon/src/to_sse_event.rs +++ b/daemon/src/to_sse_event.rs @@ -1,11 +1,34 @@ use crate::model::cfd::OrderId; use crate::model::{Leverage, Position, TradingPair, Usd}; -use crate::{bitmex_price_feed, cfd_feed, model}; +use crate::{bitmex_price_feed, model}; use bdk::bitcoin::Amount; use rocket::response::stream::Event; use serde::Serialize; use std::time::{SystemTime, UNIX_EPOCH}; +#[derive(Debug, Clone, Serialize)] +pub struct Cfd { + pub order_id: OrderId, + pub initial_price: Usd, + + pub leverage: Leverage, + pub trading_pair: TradingPair, + pub position: Position, + pub liquidation_price: Usd, + + pub quantity_usd: Usd, + + #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] + pub margin: Amount, + + #[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")] + pub profit_btc: Amount, + pub profit_usd: Usd, + + pub state: String, + pub state_transition_timestamp: u64, +} + #[derive(Debug, Clone, Serialize)] pub struct CfdOrder { pub id: OrderId, @@ -29,9 +52,42 @@ pub trait ToSseEvent { fn to_sse_event(&self) -> Event; } -impl ToSseEvent for Vec { +impl ToSseEvent for Vec { + // TODO: This conversion can fail, we might want to change the API fn to_sse_event(&self) -> Event { - Event::json(&self).event("cfds") + let cfds = self + .iter() + .map(|cfd| { + // TODO: Get the actual current price here + let current_price = Usd::ZERO; + let (profit_btc, profit_usd) = cfd.profit(current_price).unwrap(); + + Cfd { + order_id: cfd.order.id, + initial_price: cfd.order.price, + leverage: cfd.order.leverage, + trading_pair: cfd.order.trading_pair.clone(), + position: cfd.position(), + liquidation_price: cfd.order.liquidation_price, + quantity_usd: cfd.quantity_usd, + profit_btc, + profit_usd, + state: cfd.state.to_string(), + state_transition_timestamp: cfd + .state + .get_transition_timestamp() + .duration_since(UNIX_EPOCH) + .expect("timestamp to be convertable to duration since epoch") + .as_secs(), + + // TODO: Depending on the state the margin might be set (i.e. in Open we save it + // in the DB internally) and does not have to be calculated + margin: cfd.margin().unwrap(), + } + }) + .collect::>(); + + Event::json(&cfds).event("cfds") } }