Browse Source

Use projection::Cfd in the feeds

Roll-out its usage in the actor tests
chore/leaner-release-process
Mariusz Klochowicz 3 years ago
parent
commit
01a6dba79b
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 43
      daemon/src/projection.rs
  2. 40
      daemon/src/routes_maker.rs
  3. 41
      daemon/src/routes_taker.rs
  4. 22
      daemon/tests/happy_path.rs
  5. 3
      daemon/tests/harness/flow.rs
  6. 8
      daemon/tests/harness/mod.rs

43
daemon/src/projection.rs

@ -20,9 +20,7 @@ pub struct Feeds {
pub quote: watch::Receiver<Quote>,
pub order: watch::Receiver<Option<CfdOrder>>,
pub connected_takers: watch::Receiver<Vec<Identity>>,
// TODO: Convert items below here into projections
pub cfds: watch::Receiver<Vec<ModelCfd>>,
pub settlements: watch::Receiver<UpdateCfdProposals>,
pub cfds: watch::Receiver<Vec<Cfd>>,
}
impl Actor {
@ -32,10 +30,17 @@ impl Actor {
init_cfds: Vec<ModelCfd>,
init_quote: bitmex_price_feed::Quote,
) -> (Self, Feeds) {
let (tx_cfds, rx_cfds) = watch::channel(init_cfds);
let state = State {
role,
network,
cfds: init_cfds,
proposals: HashMap::new(),
quote: init_quote.clone(),
};
let (tx_cfds, rx_cfds) = watch::channel(state.to_cfds());
let (tx_order, rx_order) = watch::channel(None);
let (tx_update_cfd_feed, rx_update_cfd_feed) = watch::channel(HashMap::new());
let (tx_quote, rx_quote) = watch::channel(init_quote.clone().into());
let (tx_quote, rx_quote) = watch::channel(init_quote.into());
let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new());
(
@ -44,22 +49,14 @@ impl Actor {
cfds: tx_cfds,
order: tx_order,
quote: tx_quote,
settlements: tx_update_cfd_feed,
connected_takers: tx_connected_takers,
},
state: State {
role,
network,
cfds: vec![],
proposals: HashMap::new(),
quote: init_quote,
},
state,
},
Feeds {
cfds: rx_cfds,
order: rx_order,
quote: rx_quote,
settlements: rx_update_cfd_feed,
connected_takers: rx_connected_takers,
},
)
@ -68,10 +65,9 @@ impl Actor {
/// Internal struct to keep all the senders around in one place
struct Tx {
pub cfds: watch::Sender<Vec<ModelCfd>>,
pub cfds: watch::Sender<Vec<Cfd>>,
pub order: watch::Sender<Option<CfdOrder>>,
pub quote: watch::Sender<Quote>,
pub settlements: watch::Sender<UpdateCfdProposals>,
// TODO: Use this channel to communicate maker status as well with generic
// ID of connected counterparties
pub connected_takers: watch::Sender<Vec<Identity>>,
@ -87,7 +83,7 @@ struct State {
}
impl State {
pub fn to_cfd(&self) -> Vec<Cfd> {
pub fn to_cfds(&self) -> Vec<Cfd> {
// FIXME: starting with the intermediate struct, only temporarily
let temp = CfdsWithAuxData::new(
self.cfds.clone(),
@ -123,16 +119,15 @@ impl Actor {
let quote = msg.0;
self.state.update_quote(quote.clone());
let _ = self.tx.quote.send(quote.into());
let _ = self.tx.cfds.send(self.state.to_cfds());
}
fn handle(&mut self, msg: Update<Vec<ModelCfd>>) {
let cfds = msg.0;
self.state.update_cfds(cfds.clone());
let _ = self.tx.cfds.send(cfds);
self.state.update_cfds(msg.0);
let _ = self.tx.cfds.send(self.state.to_cfds());
}
fn handle(&mut self, msg: Update<UpdateCfdProposals>) {
let proposals = msg.0;
self.state.update_proposals(proposals.clone());
let _ = self.tx.settlements.send(proposals);
self.state.update_proposals(msg.0);
let _ = self.tx.cfds.send(self.state.to_cfds());
}
fn handle(&mut self, msg: Update<Vec<model::Identity>>) {
let _ = self

40
daemon/src/routes_maker.rs

@ -1,9 +1,9 @@
use anyhow::Result;
use bdk::bitcoin::Network;
use daemon::auth::Authenticated;
use daemon::model::cfd::{OrderId, Role};
use daemon::model::cfd::OrderId;
use daemon::model::{Price, Usd, WalletInfo};
use daemon::projection::{CfdAction, CfdsWithAuxData, Feeds};
use daemon::projection::{CfdAction, Feeds};
use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::ToSseEvent;
use daemon::{maker_cfd, maker_inc_connections, monitor, oracle, wallet};
@ -30,7 +30,6 @@ pub type Maker = xtra::Address<
pub async fn maker_feed(
rx: &State<Feeds>,
rx_wallet: &State<watch::Receiver<WalletInfo>>,
network: &State<Network>,
_auth: Authenticated,
) -> EventStream![] {
let rx = rx.inner();
@ -38,9 +37,7 @@ pub async fn maker_feed(
let mut rx_order = rx.order.clone();
let mut rx_wallet = rx_wallet.inner().clone();
let mut rx_quote = rx.quote.clone();
let mut rx_settlements = rx.settlements.clone();
let mut rx_connected_takers = rx.connected_takers.clone();
let network = *network.inner();
EventStream! {
let wallet_info = rx_wallet.borrow().clone();
@ -52,12 +49,8 @@ pub async fn maker_feed(
let quote = rx_quote.borrow().clone();
yield quote.to_sse_event();
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Maker, network
).to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
let takers = rx_connected_takers.borrow().clone();
yield takers.to_sse_event();
@ -77,33 +70,12 @@ pub async fn maker_feed(
yield takers.to_sse_event();
}
Ok(()) = rx_cfds.changed() => {
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Maker,
network
).to_sse_event();
}
Ok(()) = rx_settlements.changed() => {
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Maker,
network
).to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
}
Ok(()) = rx_quote.changed() => {
let quote = rx_quote.borrow().clone();
yield quote.to_sse_event();
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Maker,
network
).to_sse_event();
}
}
}

41
daemon/src/routes_taker.rs

@ -1,8 +1,8 @@
use bdk::bitcoin::{Amount, Network};
use daemon::connection::ConnectionStatus;
use daemon::model::cfd::{calculate_long_margin, OrderId, Role};
use daemon::model::cfd::{calculate_long_margin, OrderId};
use daemon::model::{Leverage, Price, Usd, WalletInfo};
use daemon::projection::{CfdAction, CfdsWithAuxData, Feeds};
use daemon::projection::{CfdAction, Feeds};
use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::ToSseEvent;
use daemon::{bitmex_price_feed, monitor, oracle, taker_cfd, tx, wallet};
@ -27,16 +27,13 @@ pub async fn feed(
rx: &State<Feeds>,
rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_maker_status: &State<watch::Receiver<ConnectionStatus>>,
network: &State<Network>,
) -> EventStream![] {
let rx = rx.inner();
let mut rx_cfds = rx.cfds.clone();
let mut rx_order = rx.order.clone();
let mut rx_quote = rx.quote.clone();
let mut rx_settlements = rx.settlements.clone();
let mut rx_wallet = rx_wallet.inner().clone();
let mut rx_maker_status = rx_maker_status.inner().clone();
let network = *network.inner();
EventStream! {
let wallet_info = rx_wallet.borrow().clone();
@ -51,13 +48,8 @@ pub async fn feed(
let quote = rx_quote.borrow().clone();
yield quote.to_sse_event();
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Taker,
network
).to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
loop{
select! {
@ -74,33 +66,12 @@ pub async fn feed(
yield order.to_sse_event();
}
Ok(()) = rx_cfds.changed() => {
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Taker,
network
).to_sse_event();
}
Ok(()) = rx_settlements.changed() => {
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Taker,
network
).to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
}
Ok(()) = rx_quote.changed() => {
let quote = rx_quote.borrow().clone();
yield quote.to_sse_event();
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Taker,
network
).to_sse_event();
}
}
}

22
daemon/tests/happy_path.rs

@ -2,13 +2,11 @@ use std::time::Duration;
use crate::harness::flow::{is_next_none, next, next_cfd, next_order, next_some};
use crate::harness::{
dummy_new_order, init_tracing, order_from_cfd, start_both, Maker, MakerConfig, Taker,
TakerConfig,
dummy_new_order, init_tracing, start_both, Maker, MakerConfig, Taker, TakerConfig,
};
use daemon::connection::ConnectionStatus;
use daemon::model::cfd::CfdState;
use daemon::model::Usd;
use daemon::projection::Identity;
use daemon::projection::{CfdState, Identity};
use maia::secp256k1_zkp::schnorrsig;
use rust_decimal_macros::dec;
use tokio::time::sleep;
@ -47,8 +45,8 @@ async fn taker_takes_order_and_maker_rejects() {
taker.take_order(received.clone(), Usd::new(dec!(10))).await;
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
assert_eq!(order_from_cfd(&taker_cfd), received);
assert_eq!(order_from_cfd(&maker_cfd), received);
assert_eq!(taker_cfd.order_id, received.id);
assert_eq!(maker_cfd.order_id, received.id);
assert!(matches!(
taker_cfd.state,
CfdState::OutgoingOrderRequest { .. }
@ -62,8 +60,8 @@ async fn taker_takes_order_and_maker_rejects() {
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
// TODO: More elaborate Cfd assertions
assert_eq!(order_from_cfd(&taker_cfd), received);
assert_eq!(order_from_cfd(&maker_cfd), received);
assert_eq!(taker_cfd.order_id, received.id);
assert_eq!(maker_cfd.order_id, received.id);
assert!(matches!(taker_cfd.state, CfdState::Rejected { .. }));
assert!(matches!(maker_cfd.state, CfdState::Rejected { .. }));
}
@ -105,8 +103,8 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() {
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
// TODO: More elaborate Cfd assertions
assert_eq!(taker_cfd.order.id, received.id);
assert_eq!(maker_cfd.order.id, received.id);
assert_eq!(taker_cfd.order_id, received.id);
assert_eq!(maker_cfd.order_id, received.id);
assert!(matches!(taker_cfd.state, CfdState::ContractSetup { .. }));
assert!(matches!(maker_cfd.state, CfdState::ContractSetup { .. }));
@ -115,8 +113,8 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() {
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
// TODO: More elaborate Cfd assertions
assert_eq!(taker_cfd.order.id, received.id);
assert_eq!(maker_cfd.order.id, received.id);
assert_eq!(taker_cfd.order_id, received.id);
assert_eq!(maker_cfd.order_id, received.id);
assert!(matches!(taker_cfd.state, CfdState::PendingOpen { .. }));
assert!(matches!(maker_cfd.state, CfdState::PendingOpen { .. }));
}

3
daemon/tests/harness/flow.rs

@ -1,6 +1,5 @@
use anyhow::{Context, Result};
use daemon::model::cfd::Cfd;
use daemon::projection::CfdOrder;
use daemon::projection::{Cfd, CfdOrder};
use daemon::tokio_ext::FutureExt;
use std::time::Duration;
use tokio::sync::watch;

8
daemon/tests/harness/mod.rs

@ -5,9 +5,9 @@ use crate::schnorrsig;
use ::bdk::bitcoin::Network;
use daemon::bitmex_price_feed::Quote;
use daemon::connection::{connect, ConnectionStatus};
use daemon::model::cfd::{Cfd, Role};
use daemon::model::cfd::Role;
use daemon::model::{self, Price, Timestamp, Usd};
use daemon::projection::{CfdOrder, Feeds, Identity};
use daemon::projection::{Cfd, CfdOrder, Feeds, Identity};
use daemon::seed::Seed;
use daemon::{
db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks,
@ -378,7 +378,3 @@ pub fn init_tracing() -> DefaultGuard {
guard
}
pub fn order_from_cfd(cfd: &Cfd) -> CfdOrder {
cfd.order.clone().into()
}

Loading…
Cancel
Save