Browse Source

Merge #746

746: Use UI types in the projection actor r=klochowicz a=klochowicz



Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
chore/leaner-release-process
bors[bot] 3 years ago
committed by GitHub
parent
commit
637f62df9a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      daemon/src/maker.rs
  2. 90
      daemon/src/projection.rs
  3. 40
      daemon/src/routes_maker.rs
  4. 41
      daemon/src/routes_taker.rs
  5. 4
      daemon/src/taker.rs
  6. 8
      daemon/src/to_sse_event.rs
  7. 22
      daemon/tests/happy_path.rs
  8. 3
      daemon/tests/harness/flow.rs
  9. 15
      daemon/tests/harness/mod.rs

4
daemon/src/maker.rs

@ -5,6 +5,7 @@ use bdk::{bitcoin, FeeRate};
use clap::{Parser, Subcommand};
use daemon::auth::{self, MAKER_USERNAME};
use daemon::db::load_all_cfds;
use daemon::model::cfd::Role;
use daemon::model::WalletInfo;
use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
@ -275,7 +276,8 @@ async fn main() -> Result<()> {
load_all_cfds(&mut conn).await?
};
let (proj_actor, projection_feeds) = projection::Actor::new(cfds, init_quote);
let (proj_actor, projection_feeds) =
projection::Actor::new(Role::Maker, bitcoin_network, cfds, init_quote);
tasks.add(projection_context.run(proj_actor));
let listener_stream = futures::stream::poll_fn(move |ctx| {

90
daemon/src/projection.rs

@ -13,22 +13,33 @@ use xtra_productivity::xtra_productivity;
pub struct Actor {
tx: Tx,
state: State,
}
pub struct Feeds {
pub quote: watch::Receiver<Quote>,
pub order: watch::Receiver<Option<CfdOrder>>,
pub connected_takers: watch::Receiver<Vec<Identity>>,
// TODO: Convert items below here into projections
pub cfds: watch::Receiver<Vec<ModelCfd>>,
pub settlements: watch::Receiver<UpdateCfdProposals>,
pub cfds: watch::Receiver<Vec<Cfd>>,
}
impl Actor {
pub fn new(init_cfds: Vec<ModelCfd>, init_quote: bitmex_price_feed::Quote) -> (Self, Feeds) {
let (tx_cfds, rx_cfds) = watch::channel(init_cfds);
pub fn new(
role: Role,
network: Network,
init_cfds: Vec<ModelCfd>,
init_quote: bitmex_price_feed::Quote,
) -> (Self, Feeds) {
let state = State {
role,
network,
cfds: init_cfds,
proposals: HashMap::new(),
quote: init_quote.clone(),
};
let (tx_cfds, rx_cfds) = watch::channel(state.to_cfds());
let (tx_order, rx_order) = watch::channel(None);
let (tx_update_cfd_feed, rx_update_cfd_feed) = watch::channel(HashMap::new());
let (tx_quote, rx_quote) = watch::channel(init_quote.into());
let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new());
@ -38,15 +49,14 @@ impl Actor {
cfds: tx_cfds,
order: tx_order,
quote: tx_quote,
settlements: tx_update_cfd_feed,
connected_takers: tx_connected_takers,
},
state,
},
Feeds {
cfds: rx_cfds,
order: rx_order,
quote: rx_quote,
settlements: rx_update_cfd_feed,
connected_takers: rx_connected_takers,
},
)
@ -55,15 +65,49 @@ impl Actor {
/// Internal struct to keep all the senders around in one place
struct Tx {
pub cfds: watch::Sender<Vec<ModelCfd>>,
pub cfds: watch::Sender<Vec<Cfd>>,
pub order: watch::Sender<Option<CfdOrder>>,
pub quote: watch::Sender<Quote>,
pub settlements: watch::Sender<UpdateCfdProposals>,
// TODO: Use this channel to communicate maker status as well with generic
// ID of connected counterparties
pub connected_takers: watch::Sender<Vec<Identity>>,
}
/// Internal struct to keep state in one place
struct State {
role: Role,
network: Network,
quote: bitmex_price_feed::Quote,
proposals: UpdateCfdProposals,
cfds: Vec<ModelCfd>,
}
impl State {
pub fn to_cfds(&self) -> Vec<Cfd> {
// FIXME: starting with the intermediate struct, only temporarily
let temp = CfdsWithAuxData::new(
self.cfds.clone(),
self.quote.clone(),
self.proposals.clone(),
self.role,
self.network,
);
temp.into()
}
pub fn update_proposals(&mut self, proposals: UpdateCfdProposals) {
let _ = std::mem::replace(&mut self.proposals, proposals);
}
pub fn update_quote(&mut self, quote: bitmex_price_feed::Quote) {
let _ = std::mem::replace(&mut self.quote, quote);
}
pub fn update_cfds(&mut self, cfds: Vec<ModelCfd>) {
let _ = std::mem::replace(&mut self.cfds, cfds);
}
}
pub struct Update<T>(pub T);
#[xtra_productivity]
@ -72,13 +116,18 @@ impl Actor {
let _ = self.tx.order.send(msg.0.map(|x| x.into()));
}
fn handle(&mut self, msg: Update<bitmex_price_feed::Quote>) {
let _ = self.tx.quote.send(msg.0.into());
let quote = msg.0;
self.state.update_quote(quote.clone());
let _ = self.tx.quote.send(quote.into());
let _ = self.tx.cfds.send(self.state.to_cfds());
}
fn handle(&mut self, msg: Update<Vec<ModelCfd>>) {
let _ = self.tx.cfds.send(msg.0);
self.state.update_cfds(msg.0);
let _ = self.tx.cfds.send(self.state.to_cfds());
}
fn handle(&mut self, msg: Update<UpdateCfdProposals>) {
let _ = self.tx.settlements.send(msg.0);
self.state.update_proposals(msg.0);
let _ = self.tx.cfds.send(self.state.to_cfds());
}
fn handle(&mut self, msg: Update<Vec<model::Identity>>) {
let _ = self
@ -396,8 +445,8 @@ pub struct Cfd {
pub expiry_timestamp: OffsetDateTime,
}
impl From<&CfdsWithAuxData> for Vec<Cfd> {
fn from(input: &CfdsWithAuxData) -> Self {
impl From<CfdsWithAuxData> for Vec<Cfd> {
fn from(input: CfdsWithAuxData) -> Self {
let current_price = input.current_price;
let network = input.network;
@ -460,22 +509,19 @@ pub struct CfdsWithAuxData {
impl CfdsWithAuxData {
pub fn new(
rx_cfds: &watch::Receiver<Vec<model::cfd::Cfd>>,
rx_quote: &watch::Receiver<Quote>,
rx_updates: &watch::Receiver<UpdateCfdProposals>,
cfds: Vec<model::cfd::Cfd>,
quote: bitmex_price_feed::Quote,
pending_proposals: UpdateCfdProposals,
role: Role,
network: Network,
) -> Self {
let quote: bitmex_price_feed::Quote = rx_quote.borrow().clone().into();
let current_price = match role {
Role::Maker => quote.for_maker(),
Role::Taker => quote.for_taker(),
};
let pending_proposals = rx_updates.borrow().clone();
CfdsWithAuxData {
cfds: rx_cfds.borrow().clone(),
cfds,
current_price,
pending_proposals,
network,

40
daemon/src/routes_maker.rs

@ -1,9 +1,9 @@
use anyhow::Result;
use bdk::bitcoin::Network;
use daemon::auth::Authenticated;
use daemon::model::cfd::{OrderId, Role};
use daemon::model::cfd::OrderId;
use daemon::model::{Price, Usd, WalletInfo};
use daemon::projection::{CfdAction, CfdsWithAuxData, Feeds};
use daemon::projection::{CfdAction, Feeds};
use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::ToSseEvent;
use daemon::{maker_cfd, maker_inc_connections, monitor, oracle, wallet};
@ -30,7 +30,6 @@ pub type Maker = xtra::Address<
pub async fn maker_feed(
rx: &State<Feeds>,
rx_wallet: &State<watch::Receiver<WalletInfo>>,
network: &State<Network>,
_auth: Authenticated,
) -> EventStream![] {
let rx = rx.inner();
@ -38,9 +37,7 @@ pub async fn maker_feed(
let mut rx_order = rx.order.clone();
let mut rx_wallet = rx_wallet.inner().clone();
let mut rx_quote = rx.quote.clone();
let mut rx_settlements = rx.settlements.clone();
let mut rx_connected_takers = rx.connected_takers.clone();
let network = *network.inner();
EventStream! {
let wallet_info = rx_wallet.borrow().clone();
@ -52,12 +49,8 @@ pub async fn maker_feed(
let quote = rx_quote.borrow().clone();
yield quote.to_sse_event();
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Maker, network
).to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
let takers = rx_connected_takers.borrow().clone();
yield takers.to_sse_event();
@ -77,33 +70,12 @@ pub async fn maker_feed(
yield takers.to_sse_event();
}
Ok(()) = rx_cfds.changed() => {
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Maker,
network
).to_sse_event();
}
Ok(()) = rx_settlements.changed() => {
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Maker,
network
).to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
}
Ok(()) = rx_quote.changed() => {
let quote = rx_quote.borrow().clone();
yield quote.to_sse_event();
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Maker,
network
).to_sse_event();
}
}
}

41
daemon/src/routes_taker.rs

@ -1,8 +1,8 @@
use bdk::bitcoin::{Amount, Network};
use daemon::connection::ConnectionStatus;
use daemon::model::cfd::{calculate_long_margin, OrderId, Role};
use daemon::model::cfd::{calculate_long_margin, OrderId};
use daemon::model::{Leverage, Price, Usd, WalletInfo};
use daemon::projection::{CfdAction, CfdsWithAuxData, Feeds};
use daemon::projection::{CfdAction, Feeds};
use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::ToSseEvent;
use daemon::{bitmex_price_feed, monitor, oracle, taker_cfd, tx, wallet};
@ -27,16 +27,13 @@ pub async fn feed(
rx: &State<Feeds>,
rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_maker_status: &State<watch::Receiver<ConnectionStatus>>,
network: &State<Network>,
) -> EventStream![] {
let rx = rx.inner();
let mut rx_cfds = rx.cfds.clone();
let mut rx_order = rx.order.clone();
let mut rx_quote = rx.quote.clone();
let mut rx_settlements = rx.settlements.clone();
let mut rx_wallet = rx_wallet.inner().clone();
let mut rx_maker_status = rx_maker_status.inner().clone();
let network = *network.inner();
EventStream! {
let wallet_info = rx_wallet.borrow().clone();
@ -51,13 +48,8 @@ pub async fn feed(
let quote = rx_quote.borrow().clone();
yield quote.to_sse_event();
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Taker,
network
).to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
loop{
select! {
@ -74,33 +66,12 @@ pub async fn feed(
yield order.to_sse_event();
}
Ok(()) = rx_cfds.changed() => {
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Taker,
network
).to_sse_event();
}
Ok(()) = rx_settlements.changed() => {
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Taker,
network
).to_sse_event();
let cfds = rx_cfds.borrow().clone();
yield cfds.to_sse_event();
}
Ok(()) = rx_quote.changed() => {
let quote = rx_quote.borrow().clone();
yield quote.to_sse_event();
yield CfdsWithAuxData::new(
&rx_cfds,
&rx_quote,
&rx_settlements,
Role::Taker,
network
).to_sse_event();
}
}
}

4
daemon/src/taker.rs

@ -5,6 +5,7 @@ use bdk::{bitcoin, FeeRate};
use clap::{Parser, Subcommand};
use daemon::connection::connect;
use daemon::db::load_all_cfds;
use daemon::model::cfd::Role;
use daemon::model::WalletInfo;
use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
@ -255,7 +256,8 @@ async fn main() -> Result<()> {
load_all_cfds(&mut conn).await?
};
let (proj_actor, projection_feeds) = projection::Actor::new(cfds, init_quote);
let (proj_actor, projection_feeds) =
projection::Actor::new(Role::Taker, bitcoin_network, cfds, init_quote);
tasks.add(projection_context.run(proj_actor));
let possible_addresses = resolve_maker_addresses(&opts.maker).await?;

8
daemon/src/to_sse_event.rs

@ -1,7 +1,7 @@
use crate::connection::ConnectionStatus;
use crate::model;
use crate::model::Timestamp;
use crate::projection::{Cfd, CfdAction, CfdOrder, CfdsWithAuxData, Identity, Quote};
use crate::projection::{Cfd, CfdAction, CfdOrder, Identity, Quote};
use bdk::bitcoin::Amount;
use rocket::request::FromParam;
use rocket::response::stream::Event;
@ -20,11 +20,9 @@ pub trait ToSseEvent {
fn to_sse_event(&self) -> Event;
}
impl ToSseEvent for CfdsWithAuxData {
// TODO: This conversion can fail, we might want to change the API
impl ToSseEvent for Vec<Cfd> {
fn to_sse_event(&self) -> Event {
let cfds: Vec<Cfd> = self.into();
Event::json(&cfds).event("cfds")
Event::json(&self).event("cfds")
}
}

22
daemon/tests/happy_path.rs

@ -2,13 +2,11 @@ use std::time::Duration;
use crate::harness::flow::{is_next_none, next, next_cfd, next_order, next_some};
use crate::harness::{
dummy_new_order, init_tracing, order_from_cfd, start_both, Maker, MakerConfig, Taker,
TakerConfig,
dummy_new_order, init_tracing, start_both, Maker, MakerConfig, Taker, TakerConfig,
};
use daemon::connection::ConnectionStatus;
use daemon::model::cfd::CfdState;
use daemon::model::Usd;
use daemon::projection::Identity;
use daemon::projection::{CfdState, Identity};
use maia::secp256k1_zkp::schnorrsig;
use rust_decimal_macros::dec;
use tokio::time::sleep;
@ -47,8 +45,8 @@ async fn taker_takes_order_and_maker_rejects() {
taker.take_order(received.clone(), Usd::new(dec!(10))).await;
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
assert_eq!(order_from_cfd(&taker_cfd), received);
assert_eq!(order_from_cfd(&maker_cfd), received);
assert_eq!(taker_cfd.order_id, received.id);
assert_eq!(maker_cfd.order_id, received.id);
assert!(matches!(
taker_cfd.state,
CfdState::OutgoingOrderRequest { .. }
@ -62,8 +60,8 @@ async fn taker_takes_order_and_maker_rejects() {
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
// TODO: More elaborate Cfd assertions
assert_eq!(order_from_cfd(&taker_cfd), received);
assert_eq!(order_from_cfd(&maker_cfd), received);
assert_eq!(taker_cfd.order_id, received.id);
assert_eq!(maker_cfd.order_id, received.id);
assert!(matches!(taker_cfd.state, CfdState::Rejected { .. }));
assert!(matches!(maker_cfd.state, CfdState::Rejected { .. }));
}
@ -105,8 +103,8 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() {
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
// TODO: More elaborate Cfd assertions
assert_eq!(taker_cfd.order.id, received.id);
assert_eq!(maker_cfd.order.id, received.id);
assert_eq!(taker_cfd.order_id, received.id);
assert_eq!(maker_cfd.order_id, received.id);
assert!(matches!(taker_cfd.state, CfdState::ContractSetup { .. }));
assert!(matches!(maker_cfd.state, CfdState::ContractSetup { .. }));
@ -115,8 +113,8 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() {
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
// TODO: More elaborate Cfd assertions
assert_eq!(taker_cfd.order.id, received.id);
assert_eq!(maker_cfd.order.id, received.id);
assert_eq!(taker_cfd.order_id, received.id);
assert_eq!(maker_cfd.order_id, received.id);
assert!(matches!(taker_cfd.state, CfdState::PendingOpen { .. }));
assert!(matches!(maker_cfd.state, CfdState::PendingOpen { .. }));
}

3
daemon/tests/harness/flow.rs

@ -1,6 +1,5 @@
use anyhow::{Context, Result};
use daemon::model::cfd::Cfd;
use daemon::projection::CfdOrder;
use daemon::projection::{Cfd, CfdOrder};
use daemon::tokio_ext::FutureExt;
use std::time::Duration;
use tokio::sync::watch;

15
daemon/tests/harness/mod.rs

@ -2,11 +2,12 @@ use crate::harness::mocks::monitor::MonitorActor;
use crate::harness::mocks::oracle::OracleActor;
use crate::harness::mocks::wallet::WalletActor;
use crate::schnorrsig;
use ::bdk::bitcoin::Network;
use daemon::bitmex_price_feed::Quote;
use daemon::connection::{connect, ConnectionStatus};
use daemon::model::cfd::Cfd;
use daemon::model::cfd::Role;
use daemon::model::{self, Price, Timestamp, Usd};
use daemon::projection::{CfdOrder, Feeds, Identity};
use daemon::projection::{Cfd, CfdOrder, Feeds, Identity};
use daemon::seed::Seed;
use daemon::{
db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks,
@ -179,7 +180,8 @@ impl Maker {
ask: Price::new(dec!(10000)).unwrap(),
};
let (proj_actor, feeds) = projection::Actor::new(vec![], dummy_quote);
let (proj_actor, feeds) =
projection::Actor::new(Role::Maker, Network::Testnet, vec![], dummy_quote);
tasks.add(projection_context.run(proj_actor));
let address = listener.local_addr().unwrap();
@ -300,7 +302,8 @@ impl Taker {
ask: Price::new(dec!(10000)).unwrap(),
};
let (proj_actor, feeds) = projection::Actor::new(vec![], dummy_quote);
let (proj_actor, feeds) =
projection::Actor::new(Role::Taker, Network::Testnet, vec![], dummy_quote);
tasks.add(projection_context.run(proj_actor));
tasks.add(connect(
@ -375,7 +378,3 @@ pub fn init_tracing() -> DefaultGuard {
guard
}
pub fn order_from_cfd(cfd: &Cfd) -> CfdOrder {
cfd.order.clone().into()
}

Loading…
Cancel
Save