Browse Source

Merge #719

719: Use projection actor for connected takers r=klochowicz a=klochowicz

Last small stage before tackling the Cfd projection

Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
debug-collab-settlement
bors[bot] 3 years ago
committed by GitHub
parent
commit
6ec94e9520
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      daemon/src/projection.rs
  2. 7
      daemon/src/to_sse_event.rs
  3. 5
      daemon/tests/happy_path.rs
  4. 6
      daemon/tests/harness/mod.rs

29
daemon/src/projection.rs

@ -1,10 +1,11 @@
use std::collections::HashMap;
use crate::model::cfd::OrderId;
use crate::model::{Identity, Leverage, Position, Timestamp, TradingPair};
use crate::model::{Leverage, Position, Timestamp, TradingPair};
use crate::{bitmex_price_feed, model, Cfd, Order, UpdateCfdProposals};
use itertools::Itertools;
use rust_decimal::Decimal;
use serde::Serialize;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use xtra_productivity::xtra_productivity;
@ -15,10 +16,10 @@ pub struct Actor {
pub struct Feeds {
pub quote: watch::Receiver<Quote>,
pub order: watch::Receiver<Option<CfdOrder>>,
pub connected_takers: watch::Receiver<Vec<Identity>>,
// TODO: Convert items below here into projections
pub cfds: watch::Receiver<Vec<Cfd>>,
pub settlements: watch::Receiver<UpdateCfdProposals>,
pub connected_takers: watch::Receiver<Vec<Identity>>,
}
impl Actor {
@ -77,8 +78,11 @@ impl Actor {
fn handle(&mut self, msg: Update<UpdateCfdProposals>) {
let _ = self.tx.settlements.send(msg.0);
}
fn handle(&mut self, msg: Update<Vec<Identity>>) {
let _ = self.tx.connected_takers.send(msg.0);
fn handle(&mut self, msg: Update<Vec<model::Identity>>) {
let _ = self
.tx
.connected_takers
.send(msg.0.iter().map(|x| x.into()).collect_vec());
}
}
@ -240,3 +244,18 @@ impl From<Order> for CfdOrder {
}
}
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, derive_more::Display)]
pub struct Identity(String);
impl From<&model::Identity> for Identity {
fn from(id: &model::Identity) -> Self {
Self(id.to_string())
}
}
impl From<model::Identity> for Identity {
fn from(id: model::Identity) -> Self {
Self(id.to_string())
}
}

7
daemon/src/to_sse_event.rs

@ -2,8 +2,8 @@ use crate::connection::ConnectionStatus;
use crate::model::cfd::{
Dlc, OrderId, Payout, Role, SettlementKind, UpdateCfdProposal, UpdateCfdProposals,
};
use crate::model::{Identity, Leverage, Position, Timestamp, TradingPair};
use crate::projection::{CfdOrder, Price, Quote, Usd};
use crate::model::{Leverage, Position, Timestamp, TradingPair};
use crate::projection::{CfdOrder, Identity, Price, Quote, Usd};
use crate::{bitmex_price_feed, model};
use bdk::bitcoin::{Amount, Network, SignedAmount, Txid};
use rocket::request::FromParam;
@ -255,8 +255,7 @@ impl ToSseEvent for CfdsWithAuxData {
impl ToSseEvent for Vec<Identity> {
fn to_sse_event(&self) -> Event {
let takers = self.iter().map(|x| x.to_string()).collect::<Vec<_>>();
Event::json(&takers).event("takers")
Event::json(&self).event("takers")
}
}

5
daemon/tests/happy_path.rs

@ -7,7 +7,8 @@ use crate::harness::{
};
use daemon::connection::ConnectionStatus;
use daemon::model::cfd::CfdState;
use daemon::model::{Identity, Usd};
use daemon::model::Usd;
use daemon::projection::Identity;
use maia::secp256k1_zkp::schnorrsig;
use rust_decimal_macros::dec;
use tokio::time::sleep;
@ -170,7 +171,7 @@ async fn maker_notices_lack_of_taker() {
let (mut maker, taker) = start_both().await;
assert_eq!(
vec![taker.id],
vec![taker.id.clone()],
next(maker.connected_takers_feed()).await.unwrap()
);

6
daemon/tests/harness/mod.rs

@ -5,8 +5,8 @@ use crate::schnorrsig;
use daemon::bitmex_price_feed::Quote;
use daemon::connection::{connect, ConnectionStatus};
use daemon::model::cfd::Cfd;
use daemon::model::{Identity, Price, Timestamp, Usd};
use daemon::projection::{CfdOrder, Feeds};
use daemon::model::{self, Price, Timestamp, Usd};
use daemon::projection::{CfdOrder, Feeds, Identity};
use daemon::seed::Seed;
use daemon::{
db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks,
@ -311,7 +311,7 @@ impl Taker {
));
Self {
id: Identity::new(identity_pk),
id: model::Identity::new(identity_pk).into(),
system: taker,
feeds,
mocks,

Loading…
Cancel
Save