From ce2fd805be1a8dcc35d19b2a22ca42db1fc42f7b Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 8 Dec 2021 12:12:26 +1100 Subject: [PATCH] Initialize projection::Actor without initial quote This will allow us to simplify the bitmex API actor. --- daemon/src/bitmex_price_feed.rs | 6 ++---- daemon/src/maker.rs | 4 ++-- daemon/src/projection.rs | 17 ++++++----------- daemon/src/routes_taker.rs | 10 +++++++++- daemon/src/taker.rs | 4 ++-- daemon/src/to_sse_event.rs | 2 +- daemon/tests/harness/mod.rs | 25 +++++++------------------ 7 files changed, 29 insertions(+), 39 deletions(-) diff --git a/daemon/src/bitmex_price_feed.rs b/daemon/src/bitmex_price_feed.rs index bc6b9b6..b2387fc 100644 --- a/daemon/src/bitmex_price_feed.rs +++ b/daemon/src/bitmex_price_feed.rs @@ -43,7 +43,7 @@ impl Actor { self.tasks.add(connect_until_successful(this)); } - async fn handle(&mut self, _: Connect, ctx: &mut xtra::Context) -> Result { + async fn handle(&mut self, _: Connect, ctx: &mut xtra::Context) -> Result<()> { tracing::debug!("Connecting to BitMex realtime API"); let (connection, _) = tokio_tungstenite::connect_async(URL).await?; @@ -55,8 +55,6 @@ impl Actor { tracing::info!("Connected to BitMex realtime API"); - let initial_quote = quotes.select_next_some().await?; - let this = ctx.address().expect("we are alive"); self.tasks.add({ @@ -78,7 +76,7 @@ impl Actor { } }); - Ok(initial_quote) + Ok(()) } } diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 086b3dc..378a5d0 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -262,12 +262,12 @@ async fn main() -> Result<()> { .run(); tasks.add(task); - let init_quote = price_feed_address + price_feed_address .send(bitmex_price_feed::Connect) .await??; let (proj_actor, projection_feeds) = - projection::Actor::new(db.clone(), Role::Maker, bitcoin_network, init_quote).await?; + projection::Actor::new(db.clone(), Role::Maker, bitcoin_network).await?; tasks.add(projection_context.run(proj_actor)); let listener_stream = futures::stream::poll_fn(move |ctx| { diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs index 5b60ecc..7970f31 100644 --- a/daemon/src/projection.rs +++ b/daemon/src/projection.rs @@ -42,19 +42,14 @@ pub struct Actor { } pub struct Feeds { - pub quote: watch::Receiver, + pub quote: watch::Receiver>, pub order: watch::Receiver>, pub connected_takers: watch::Receiver>, pub cfds: watch::Receiver>, } impl Actor { - pub async fn new( - db: sqlx::SqlitePool, - role: Role, - network: Network, - init_quote: bitmex_price_feed::Quote, - ) -> Result<(Self, Feeds)> { + pub async fn new(db: sqlx::SqlitePool, role: Role, network: Network) -> Result<(Self, Feeds)> { let mut conn = db.acquire().await?; let init_cfds = db::load_all_cfds(&mut conn).await?; @@ -63,12 +58,12 @@ impl Actor { network, cfds: init_cfds, proposals: HashMap::new(), - quote: Some(init_quote.clone()), + quote: None, }; let (tx_cfds, rx_cfds) = watch::channel(state.to_cfds()); let (tx_order, rx_order) = watch::channel(None); - let (tx_quote, rx_quote) = watch::channel(init_quote.into()); + let (tx_quote, rx_quote) = watch::channel(None); let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new()); Ok(( @@ -96,7 +91,7 @@ impl Actor { struct Tx { pub cfds: watch::Sender>, pub order: watch::Sender>, - pub quote: watch::Sender, + pub quote: watch::Sender>, // TODO: Use this channel to communicate maker status as well with generic // ID of connected counterparties pub connected_takers: watch::Sender>, @@ -175,7 +170,7 @@ impl Actor { fn handle(&mut self, msg: Update) { let quote = msg.0; self.state.update_quote(quote.clone()); - let _ = self.tx.quote.send(quote.into()); + let _ = self.tx.quote.send(Some(quote.into())); let _ = self.tx.cfds.send(self.state.to_cfds()); } fn handle(&mut self, msg: Update>) { diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index 7e1e49b..7011dd6 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -124,7 +124,15 @@ pub async fn post_cfd_action( } CfdAction::Commit => cfd_actor.send(taker_cfd::Commit { order_id: id }).await, CfdAction::Settle => { - let quote: bitmex_price_feed::Quote = feeds.quote.borrow().clone().into(); + let quote: bitmex_price_feed::Quote = match feeds.quote.borrow().as_ref() { + Some(quote) => quote.clone().into(), + None => { + return Err(HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) + .title("Quote unavailable") + .detail("Cannot settle without current price information.")) + } + }; + let current_price = quote.for_taker(); cfd_actor .send(taker_cfd::ProposeSettlement { diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 467d376..cba7380 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -246,12 +246,12 @@ async fn main() -> Result<()> { .run(); tasks.add(task); - let init_quote = price_feed_address + price_feed_address .send(bitmex_price_feed::Connect) .await??; let (proj_actor, projection_feeds) = - projection::Actor::new(db.clone(), Role::Taker, bitcoin_network, init_quote).await?; + projection::Actor::new(db.clone(), Role::Taker, bitcoin_network).await?; tasks.add(projection_context.run(proj_actor)); let possible_addresses = resolve_maker_addresses(&opts.maker).await?; diff --git a/daemon/src/to_sse_event.rs b/daemon/src/to_sse_event.rs index eec421e..f4d206e 100644 --- a/daemon/src/to_sse_event.rs +++ b/daemon/src/to_sse_event.rs @@ -98,7 +98,7 @@ impl ToSseEvent for connection::ConnectionStatus { } } -impl ToSseEvent for Quote { +impl ToSseEvent for Option { fn to_sse_event(&self) -> Event { Event::json(self).event("quote") } diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 69d9334..5db23bc 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -3,10 +3,9 @@ use crate::harness::mocks::oracle::OracleActor; use crate::harness::mocks::wallet::WalletActor; use crate::schnorrsig; use ::bdk::bitcoin::Network; -use daemon::bitmex_price_feed::Quote; use daemon::connection::{connect, ConnectionStatus}; use daemon::model::cfd::{OrderId, Role}; -use daemon::model::{self, Price, Timestamp, Usd}; +use daemon::model::{self, Price, Usd}; use daemon::projection::{Cfd, CfdOrder, Feeds, Identity}; use daemon::seed::Seed; use daemon::{ @@ -168,10 +167,9 @@ impl Maker { .await .unwrap(); - let (proj_actor, feeds) = - projection::Actor::new(db, Role::Maker, Network::Testnet, dummy_quote()) - .await - .unwrap(); + let (proj_actor, feeds) = projection::Actor::new(db, Role::Maker, Network::Testnet) + .await + .unwrap(); tasks.add(projection_context.run(proj_actor)); let address = listener.local_addr().unwrap(); @@ -296,10 +294,9 @@ impl Taker { .await .unwrap(); - let (proj_actor, feeds) = - projection::Actor::new(db, Role::Taker, Network::Testnet, dummy_quote()) - .await - .unwrap(); + let (proj_actor, feeds) = projection::Actor::new(db, Role::Taker, Network::Testnet) + .await + .unwrap(); tasks.add(projection_context.run(proj_actor)); tasks.add(connect( @@ -390,14 +387,6 @@ pub fn dummy_price() -> Price { Price::new(dec!(50_000)).expect("to not fail") } -pub fn dummy_quote() -> Quote { - Quote { - timestamp: Timestamp::now(), - bid: dummy_price(), - ask: dummy_price(), - } -} - pub fn dummy_new_order() -> maker_cfd::NewOrder { maker_cfd::NewOrder { price: dummy_price(),