Browse Source

Initialize projection::Actor without initial quote

This will allow us to simplify the bitmex API actor.
feature/supervised-connection
Thomas Eizinger 3 years ago
parent
commit
ce2fd805be
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 6
      daemon/src/bitmex_price_feed.rs
  2. 4
      daemon/src/maker.rs
  3. 17
      daemon/src/projection.rs
  4. 10
      daemon/src/routes_taker.rs
  5. 4
      daemon/src/taker.rs
  6. 2
      daemon/src/to_sse_event.rs
  7. 17
      daemon/tests/harness/mod.rs

6
daemon/src/bitmex_price_feed.rs

@ -43,7 +43,7 @@ impl Actor {
self.tasks.add(connect_until_successful(this)); self.tasks.add(connect_until_successful(this));
} }
async fn handle(&mut self, _: Connect, ctx: &mut xtra::Context<Self>) -> Result<Quote> { async fn handle(&mut self, _: Connect, ctx: &mut xtra::Context<Self>) -> Result<()> {
tracing::debug!("Connecting to BitMex realtime API"); tracing::debug!("Connecting to BitMex realtime API");
let (connection, _) = tokio_tungstenite::connect_async(URL).await?; let (connection, _) = tokio_tungstenite::connect_async(URL).await?;
@ -55,8 +55,6 @@ impl Actor {
tracing::info!("Connected to BitMex realtime API"); tracing::info!("Connected to BitMex realtime API");
let initial_quote = quotes.select_next_some().await?;
let this = ctx.address().expect("we are alive"); let this = ctx.address().expect("we are alive");
self.tasks.add({ self.tasks.add({
@ -78,7 +76,7 @@ impl Actor {
} }
}); });
Ok(initial_quote) Ok(())
} }
} }

4
daemon/src/maker.rs

@ -262,12 +262,12 @@ async fn main() -> Result<()> {
.run(); .run();
tasks.add(task); tasks.add(task);
let init_quote = price_feed_address price_feed_address
.send(bitmex_price_feed::Connect) .send(bitmex_price_feed::Connect)
.await??; .await??;
let (proj_actor, projection_feeds) = let (proj_actor, projection_feeds) =
projection::Actor::new(db.clone(), Role::Maker, bitcoin_network, init_quote).await?; projection::Actor::new(db.clone(), Role::Maker, bitcoin_network).await?;
tasks.add(projection_context.run(proj_actor)); tasks.add(projection_context.run(proj_actor));
let listener_stream = futures::stream::poll_fn(move |ctx| { let listener_stream = futures::stream::poll_fn(move |ctx| {

17
daemon/src/projection.rs

@ -42,19 +42,14 @@ pub struct Actor {
} }
pub struct Feeds { pub struct Feeds {
pub quote: watch::Receiver<Quote>, pub quote: watch::Receiver<Option<Quote>>,
pub order: watch::Receiver<Option<CfdOrder>>, pub order: watch::Receiver<Option<CfdOrder>>,
pub connected_takers: watch::Receiver<Vec<Identity>>, pub connected_takers: watch::Receiver<Vec<Identity>>,
pub cfds: watch::Receiver<Vec<Cfd>>, pub cfds: watch::Receiver<Vec<Cfd>>,
} }
impl Actor { impl Actor {
pub async fn new( pub async fn new(db: sqlx::SqlitePool, role: Role, network: Network) -> Result<(Self, Feeds)> {
db: sqlx::SqlitePool,
role: Role,
network: Network,
init_quote: bitmex_price_feed::Quote,
) -> Result<(Self, Feeds)> {
let mut conn = db.acquire().await?; let mut conn = db.acquire().await?;
let init_cfds = db::load_all_cfds(&mut conn).await?; let init_cfds = db::load_all_cfds(&mut conn).await?;
@ -63,12 +58,12 @@ impl Actor {
network, network,
cfds: init_cfds, cfds: init_cfds,
proposals: HashMap::new(), proposals: HashMap::new(),
quote: Some(init_quote.clone()), quote: None,
}; };
let (tx_cfds, rx_cfds) = watch::channel(state.to_cfds()); let (tx_cfds, rx_cfds) = watch::channel(state.to_cfds());
let (tx_order, rx_order) = watch::channel(None); let (tx_order, rx_order) = watch::channel(None);
let (tx_quote, rx_quote) = watch::channel(init_quote.into()); let (tx_quote, rx_quote) = watch::channel(None);
let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new()); let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new());
Ok(( Ok((
@ -96,7 +91,7 @@ impl Actor {
struct Tx { struct Tx {
pub cfds: watch::Sender<Vec<Cfd>>, pub cfds: watch::Sender<Vec<Cfd>>,
pub order: watch::Sender<Option<CfdOrder>>, pub order: watch::Sender<Option<CfdOrder>>,
pub quote: watch::Sender<Quote>, pub quote: watch::Sender<Option<Quote>>,
// TODO: Use this channel to communicate maker status as well with generic // TODO: Use this channel to communicate maker status as well with generic
// ID of connected counterparties // ID of connected counterparties
pub connected_takers: watch::Sender<Vec<Identity>>, pub connected_takers: watch::Sender<Vec<Identity>>,
@ -175,7 +170,7 @@ impl Actor {
fn handle(&mut self, msg: Update<bitmex_price_feed::Quote>) { fn handle(&mut self, msg: Update<bitmex_price_feed::Quote>) {
let quote = msg.0; let quote = msg.0;
self.state.update_quote(quote.clone()); self.state.update_quote(quote.clone());
let _ = self.tx.quote.send(quote.into()); let _ = self.tx.quote.send(Some(quote.into()));
let _ = self.tx.cfds.send(self.state.to_cfds()); let _ = self.tx.cfds.send(self.state.to_cfds());
} }
fn handle(&mut self, msg: Update<Vec<model::Identity>>) { fn handle(&mut self, msg: Update<Vec<model::Identity>>) {

10
daemon/src/routes_taker.rs

@ -124,7 +124,15 @@ pub async fn post_cfd_action(
} }
CfdAction::Commit => cfd_actor.send(taker_cfd::Commit { order_id: id }).await, CfdAction::Commit => cfd_actor.send(taker_cfd::Commit { order_id: id }).await,
CfdAction::Settle => { CfdAction::Settle => {
let quote: bitmex_price_feed::Quote = feeds.quote.borrow().clone().into(); let quote: bitmex_price_feed::Quote = match feeds.quote.borrow().as_ref() {
Some(quote) => quote.clone().into(),
None => {
return Err(HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR)
.title("Quote unavailable")
.detail("Cannot settle without current price information."))
}
};
let current_price = quote.for_taker(); let current_price = quote.for_taker();
cfd_actor cfd_actor
.send(taker_cfd::ProposeSettlement { .send(taker_cfd::ProposeSettlement {

4
daemon/src/taker.rs

@ -246,12 +246,12 @@ async fn main() -> Result<()> {
.run(); .run();
tasks.add(task); tasks.add(task);
let init_quote = price_feed_address price_feed_address
.send(bitmex_price_feed::Connect) .send(bitmex_price_feed::Connect)
.await??; .await??;
let (proj_actor, projection_feeds) = let (proj_actor, projection_feeds) =
projection::Actor::new(db.clone(), Role::Taker, bitcoin_network, init_quote).await?; projection::Actor::new(db.clone(), Role::Taker, bitcoin_network).await?;
tasks.add(projection_context.run(proj_actor)); tasks.add(projection_context.run(proj_actor));
let possible_addresses = resolve_maker_addresses(&opts.maker).await?; let possible_addresses = resolve_maker_addresses(&opts.maker).await?;

2
daemon/src/to_sse_event.rs

@ -98,7 +98,7 @@ impl ToSseEvent for connection::ConnectionStatus {
} }
} }
impl ToSseEvent for Quote { impl ToSseEvent for Option<Quote> {
fn to_sse_event(&self) -> Event { fn to_sse_event(&self) -> Event {
Event::json(self).event("quote") Event::json(self).event("quote")
} }

17
daemon/tests/harness/mod.rs

@ -3,10 +3,9 @@ use crate::harness::mocks::oracle::OracleActor;
use crate::harness::mocks::wallet::WalletActor; use crate::harness::mocks::wallet::WalletActor;
use crate::schnorrsig; use crate::schnorrsig;
use ::bdk::bitcoin::Network; use ::bdk::bitcoin::Network;
use daemon::bitmex_price_feed::Quote;
use daemon::connection::{connect, ConnectionStatus}; use daemon::connection::{connect, ConnectionStatus};
use daemon::model::cfd::{OrderId, Role}; use daemon::model::cfd::{OrderId, Role};
use daemon::model::{self, Price, Timestamp, Usd}; use daemon::model::{self, Price, Usd};
use daemon::projection::{Cfd, CfdOrder, Feeds, Identity}; use daemon::projection::{Cfd, CfdOrder, Feeds, Identity};
use daemon::seed::Seed; use daemon::seed::Seed;
use daemon::{ use daemon::{
@ -168,8 +167,7 @@ impl Maker {
.await .await
.unwrap(); .unwrap();
let (proj_actor, feeds) = let (proj_actor, feeds) = projection::Actor::new(db, Role::Maker, Network::Testnet)
projection::Actor::new(db, Role::Maker, Network::Testnet, dummy_quote())
.await .await
.unwrap(); .unwrap();
tasks.add(projection_context.run(proj_actor)); tasks.add(projection_context.run(proj_actor));
@ -296,8 +294,7 @@ impl Taker {
.await .await
.unwrap(); .unwrap();
let (proj_actor, feeds) = let (proj_actor, feeds) = projection::Actor::new(db, Role::Taker, Network::Testnet)
projection::Actor::new(db, Role::Taker, Network::Testnet, dummy_quote())
.await .await
.unwrap(); .unwrap();
tasks.add(projection_context.run(proj_actor)); tasks.add(projection_context.run(proj_actor));
@ -390,14 +387,6 @@ pub fn dummy_price() -> Price {
Price::new(dec!(50_000)).expect("to not fail") Price::new(dec!(50_000)).expect("to not fail")
} }
pub fn dummy_quote() -> Quote {
Quote {
timestamp: Timestamp::now(),
bid: dummy_price(),
ask: dummy_price(),
}
}
pub fn dummy_new_order() -> maker_cfd::NewOrder { pub fn dummy_new_order() -> maker_cfd::NewOrder {
maker_cfd::NewOrder { maker_cfd::NewOrder {
price: dummy_price(), price: dummy_price(),

Loading…
Cancel
Save