Browse Source

Refactor projection actor before adding new functionality

Create watch channels within the actor, returning a single struct called `Feeds`.
Manage the struct with Rocket and use it in SSE events.
debug-collab-settlement
Mariusz Klochowicz 3 years ago
parent
commit
ee9d81e7bc
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 30
      daemon/src/maker.rs
  2. 74
      daemon/src/projection.rs
  3. 24
      daemon/src/routes_maker.rs
  4. 23
      daemon/src/routes_taker.rs
  5. 34
      daemon/src/taker.rs
  6. 62
      daemon/tests/harness/mod.rs

30
daemon/src/maker.rs

@ -4,10 +4,8 @@ use bdk::bitcoin::Amount;
use bdk::{bitcoin, FeeRate};
use clap::{Parser, Subcommand};
use daemon::auth::{self, MAKER_USERNAME};
use daemon::bitmex_price_feed::Quote;
use daemon::db::load_all_cfds;
use daemon::model::cfd::{Order, UpdateCfdProposals};
use daemon::model::{TakerId, WalletInfo};
use daemon::model::WalletInfo;
use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
@ -17,7 +15,6 @@ use daemon::{
};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
@ -272,27 +269,14 @@ async fn main() -> Result<()> {
let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?;
tasks.add(task);
// TODO: Move to projection actor
let cfds = {
let mut conn = db.acquire().await?;
load_all_cfds(&mut conn).await?
};
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, quote_receiver) = watch::channel::<Quote>(init_quote);
let (connected_takers_feed_sender, connected_takers_feed_receiver) =
watch::channel::<Vec<TakerId>>(Vec::new());
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
connected_takers_feed_sender,
)));
let (proj_actor, projection_feeds) = projection::Actor::new(cfds, init_quote);
tasks.add(projection_context.run(proj_actor));
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
@ -309,14 +293,10 @@ async fn main() -> Result<()> {
tasks.add(wallet_sync::new(wallet.clone(), wallet_feed_sender));
rocket::custom(figment)
.manage(order_feed_receiver)
.manage(update_cfd_feed_receiver)
.manage(projection_feeds)
.manage(cfd_actor_addr)
.manage(cfd_feed_receiver)
.manage(connected_takers_feed_receiver)
.manage(wallet_feed_receiver)
.manage(auth_password)
.manage(quote_receiver)
.manage(bitcoin_network)
.manage(wallet)
.mount(

74
daemon/src/projection.rs

@ -1,3 +1,5 @@
use std::collections::HashMap;
use crate::bitmex_price_feed::Quote;
use crate::model::TakerId;
use crate::{Cfd, Order, UpdateCfdProposals};
@ -5,31 +7,55 @@ use tokio::sync::watch;
use xtra_productivity::xtra_productivity;
pub struct Actor {
tx_cfds: watch::Sender<Vec<Cfd>>,
tx_order: watch::Sender<Option<Order>>,
tx_quote: watch::Sender<Quote>,
tx_settlements: watch::Sender<UpdateCfdProposals>,
// TODO: Use this channel to communicate maker status as well with generic
// ID of connected counterparties
tx_connected_takers: watch::Sender<Vec<TakerId>>,
tx: Tx,
}
pub struct Feeds {
pub cfds: watch::Receiver<Vec<Cfd>>,
pub order: watch::Receiver<Option<Order>>,
pub quote: watch::Receiver<Quote>,
pub settlements: watch::Receiver<UpdateCfdProposals>,
pub connected_takers: watch::Receiver<Vec<TakerId>>,
}
impl Actor {
pub fn new(
tx_cfds: watch::Sender<Vec<Cfd>>,
tx_order: watch::Sender<Option<Order>>,
tx_quote: watch::Sender<Quote>,
tx_settlements: watch::Sender<UpdateCfdProposals>,
tx_connected_takers: watch::Sender<Vec<TakerId>>,
) -> Self {
pub fn new(init_cfds: Vec<Cfd>, init_quote: Quote) -> (Self, Feeds) {
let (tx_cfds, rx_cfds) = watch::channel(init_cfds);
let (tx_order, rx_order) = watch::channel(None);
let (tx_update_cfd_feed, rx_update_cfd_feed) = watch::channel(HashMap::new());
let (tx_quote, rx_quote) = watch::channel(init_quote);
let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new());
(
Self {
tx_cfds,
tx_order,
tx_quote,
tx_settlements,
tx_connected_takers,
tx: Tx {
cfds: tx_cfds,
order: tx_order,
quote: tx_quote,
settlements: tx_update_cfd_feed,
connected_takers: tx_connected_takers,
},
},
Feeds {
cfds: rx_cfds,
order: rx_order,
quote: rx_quote,
settlements: rx_update_cfd_feed,
connected_takers: rx_connected_takers,
},
)
}
}
/// Internal struct to keep all the senders around in one place
struct Tx {
pub cfds: watch::Sender<Vec<Cfd>>,
pub order: watch::Sender<Option<Order>>,
pub quote: watch::Sender<Quote>,
pub settlements: watch::Sender<UpdateCfdProposals>,
// TODO: Use this channel to communicate maker status as well with generic
// ID of connected counterparties
pub connected_takers: watch::Sender<Vec<TakerId>>,
}
pub struct Update<T>(pub T);
@ -37,19 +63,19 @@ pub struct Update<T>(pub T);
#[xtra_productivity]
impl Actor {
fn handle(&mut self, msg: Update<Vec<Cfd>>) {
let _ = self.tx_cfds.send(msg.0);
let _ = self.tx.cfds.send(msg.0);
}
fn handle(&mut self, msg: Update<Option<Order>>) {
let _ = self.tx_order.send(msg.0);
let _ = self.tx.order.send(msg.0);
}
fn handle(&mut self, msg: Update<Quote>) {
let _ = self.tx_quote.send(msg.0);
let _ = self.tx.quote.send(msg.0);
}
fn handle(&mut self, msg: Update<UpdateCfdProposals>) {
let _ = self.tx_settlements.send(msg.0);
let _ = self.tx.settlements.send(msg.0);
}
fn handle(&mut self, msg: Update<Vec<TakerId>>) {
let _ = self.tx_connected_takers.send(msg.0);
let _ = self.tx.connected_takers.send(msg.0);
}
}

24
daemon/src/routes_maker.rs

@ -1,11 +1,12 @@
use anyhow::Result;
use bdk::bitcoin::Network;
use daemon::auth::Authenticated;
use daemon::model::cfd::{Cfd, Order, OrderId, Role, UpdateCfdProposals};
use daemon::model::{Price, TakerId, Usd, WalletInfo};
use daemon::model::cfd::{OrderId, Role};
use daemon::model::{Price, Usd, WalletInfo};
use daemon::projection::Feeds;
use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent};
use daemon::{bitmex_price_feed, maker_cfd, maker_inc_connections, monitor, oracle, wallet};
use daemon::{maker_cfd, maker_inc_connections, monitor, oracle, wallet};
use http_api_problem::{HttpApiProblem, StatusCode};
use rocket::http::{ContentType, Header, Status};
use rocket::response::stream::EventStream;
@ -27,21 +28,18 @@ pub type Maker = xtra::Address<
#[allow(clippy::too_many_arguments)]
#[rocket::get("/feed")]
pub async fn maker_feed(
rx_cfds: &State<watch::Receiver<Vec<Cfd>>>,
rx_order: &State<watch::Receiver<Option<Order>>>,
rx: &State<Feeds>,
rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>,
rx_settlements: &State<watch::Receiver<UpdateCfdProposals>>,
rx_connected_takers: &State<watch::Receiver<Vec<TakerId>>>,
network: &State<Network>,
_auth: Authenticated,
) -> EventStream![] {
let mut rx_cfds = rx_cfds.inner().clone();
let mut rx_order = rx_order.inner().clone();
let rx = rx.inner();
let mut rx_cfds = rx.cfds.clone();
let mut rx_order = rx.order.clone();
let mut rx_wallet = rx_wallet.inner().clone();
let mut rx_quote = rx_quote.inner().clone();
let mut rx_settlements = rx_settlements.inner().clone();
let mut rx_connected_takers = rx_connected_takers.inner().clone();
let mut rx_quote = rx.quote.clone();
let mut rx_settlements = rx.settlements.clone();
let mut rx_connected_takers = rx.connected_takers.clone();
let network = *network.inner();
EventStream! {

23
daemon/src/routes_taker.rs

@ -1,10 +1,11 @@
use bdk::bitcoin::{Amount, Network};
use daemon::connection::ConnectionStatus;
use daemon::model::cfd::{calculate_long_margin, Cfd, Order, OrderId, Role, UpdateCfdProposals};
use daemon::model::cfd::{calculate_long_margin, OrderId, Role};
use daemon::model::{Leverage, Price, Usd, WalletInfo};
use daemon::projection::Feeds;
use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent};
use daemon::{bitmex_price_feed, taker_cfd, wallet};
use daemon::{taker_cfd, wallet};
use http_api_problem::{HttpApiProblem, StatusCode};
use rocket::http::{ContentType, Status};
use rocket::response::stream::EventStream;
@ -21,19 +22,17 @@ use xtra::prelude::*;
#[rocket::get("/feed")]
pub async fn feed(
rx_cfds: &State<watch::Receiver<Vec<Cfd>>>,
rx_order: &State<watch::Receiver<Option<Order>>>,
rx: &State<Feeds>,
rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>,
rx_settlements: &State<watch::Receiver<UpdateCfdProposals>>,
rx_maker_status: &State<watch::Receiver<ConnectionStatus>>,
network: &State<Network>,
) -> EventStream![] {
let mut rx_cfds = rx_cfds.inner().clone();
let mut rx_order = rx_order.inner().clone();
let rx = rx.inner();
let mut rx_cfds = rx.cfds.clone();
let mut rx_order = rx.order.clone();
let mut rx_quote = rx.quote.clone();
let mut rx_settlements = rx.settlements.clone();
let mut rx_wallet = rx_wallet.inner().clone();
let mut rx_quote = rx_quote.inner().clone();
let mut rx_settlements = rx_settlements.inner().clone();
let mut rx_maker_status = rx_maker_status.inner().clone();
let network = *network.inner();
@ -138,7 +137,7 @@ pub async fn post_cfd_action(
id: OrderId,
action: CfdAction,
cfd_action_channel: &State<Box<dyn MessageChannel<taker_cfd::CfdAction>>>,
quote_updates: &State<watch::Receiver<bitmex_price_feed::Quote>>,
feeds: &State<Feeds>,
) -> Result<status::Accepted<()>, HttpApiProblem> {
use taker_cfd::CfdAction::*;
let result = match action {
@ -153,7 +152,7 @@ pub async fn post_cfd_action(
}
CfdAction::Commit => cfd_action_channel.send(Commit { order_id: id }),
CfdAction::Settle => {
let current_price = quote_updates.borrow().for_taker();
let current_price = feeds.quote.borrow().for_taker();
cfd_action_channel.send(ProposeSettlement {
order_id: id,
current_price,

34
daemon/src/taker.rs

@ -3,11 +3,9 @@ use bdk::bitcoin::secp256k1::schnorrsig;
use bdk::bitcoin::{Address, Amount};
use bdk::{bitcoin, FeeRate};
use clap::{Parser, Subcommand};
use daemon::bitmex_price_feed::Quote;
use daemon::connection::connect;
use daemon::db::load_all_cfds;
use daemon::model::cfd::{Order, UpdateCfdProposals};
use daemon::model::{TakerId, WalletInfo};
use daemon::model::WalletInfo;
use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
@ -16,13 +14,11 @@ use daemon::{
};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter;
use watch::channel;
use xtra::prelude::MessageChannel;
use xtra::Actor;
@ -254,31 +250,14 @@ async fn main() -> Result<()> {
let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?;
tasks.add(task);
// TODO: Move to projection actor
let cfds = {
let mut conn = db.acquire().await?;
load_all_cfds(&mut conn).await?
};
let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, quote_receiver) = channel::<Quote>(init_quote);
// TODO: Use this channel to convey maker status.
// For now, the receiver is dropped instead of managed by Rocket to
// highlight that we're not using it
let (connected_takers_feed_sender, _connected_takers_feed_receiver) =
watch::channel::<Vec<TakerId>>(vec![]);
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
connected_takers_feed_sender,
)));
let (proj_actor, projection_feeds) = projection::Actor::new(cfds, init_quote);
tasks.add(projection_context.run(proj_actor));
let possible_addresses = resolve_maker_addresses(&opts.maker).await?;
@ -294,13 +273,10 @@ async fn main() -> Result<()> {
let cfd_action_channel = MessageChannel::<taker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);
let rocket = rocket::custom(figment)
.manage(order_feed_receiver)
.manage(update_cfd_feed_receiver)
.manage(projection_feeds)
.manage(take_offer_channel)
.manage(cfd_action_channel)
.manage(cfd_feed_receiver)
.manage(wallet_feed_receiver)
.manage(quote_receiver)
.manage(bitcoin_network)
.manage(wallet)
.manage(maker_online_status_feed_receiver)

62
daemon/tests/harness/mod.rs

@ -4,15 +4,15 @@ use crate::harness::mocks::wallet::WalletActor;
use crate::schnorrsig;
use daemon::bitmex_price_feed::Quote;
use daemon::connection::{connect, ConnectionStatus};
use daemon::model::cfd::{Cfd, Order, Origin, UpdateCfdProposals};
use daemon::model::cfd::{Cfd, Order, Origin};
use daemon::model::{Price, TakerId, Timestamp, Usd};
use daemon::projection::Feeds;
use daemon::seed::Seed;
use daemon::{
db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks,
};
use rust_decimal_macros::dec;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use std::task::Poll;
@ -94,25 +94,23 @@ pub struct Maker {
pub system:
MakerActorSystem<OracleActor, MonitorActor, maker_inc_connections::Actor, WalletActor>,
pub mocks: mocks::Mocks,
pub feeds: Feeds,
pub listen_addr: SocketAddr,
pub identity_pk: x25519_dalek::PublicKey,
cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
order_feed_receiver: watch::Receiver<Option<Order>>,
connected_takers_feed_receiver: watch::Receiver<Vec<TakerId>>,
_tasks: Tasks,
}
impl Maker {
pub fn cfd_feed(&mut self) -> &mut watch::Receiver<Vec<Cfd>> {
&mut self.cfd_feed_receiver
&mut self.feeds.cfds
}
pub fn order_feed(&mut self) -> &mut watch::Receiver<Option<Order>> {
&mut self.order_feed_receiver
&mut self.feeds.order
}
pub fn connected_takers_feed(&mut self) -> &mut watch::Receiver<Vec<TakerId>> {
&mut self.connected_takers_feed_receiver
&mut self.feeds.connected_takers
}
pub async fn start(config: &MakerConfig, listener: TcpListener) -> Self {
@ -162,21 +160,8 @@ impl Maker {
ask: Price::new(dec!(10000)).unwrap(),
};
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(vec![]);
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, _update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, _) = watch::channel::<Quote>(dummy_quote);
let (connected_takers_feed_sender, connected_takers_feed_receiver) =
watch::channel::<Vec<TakerId>>(vec![]);
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
connected_takers_feed_sender,
)));
let (proj_actor, feeds) = projection::Actor::new(vec![], dummy_quote);
tasks.add(projection_context.run(proj_actor));
let address = listener.local_addr().unwrap();
@ -195,13 +180,11 @@ impl Maker {
Self {
system: maker,
feeds,
identity_pk,
listen_addr: address,
mocks,
_tasks: tasks,
cfd_feed_receiver,
order_feed_receiver,
connected_takers_feed_receiver,
}
}
@ -240,18 +223,17 @@ pub struct Taker {
pub id: TakerId,
pub system: daemon::TakerActorSystem<OracleActor, MonitorActor, WalletActor>,
pub mocks: mocks::Mocks,
cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
order_feed_receiver: watch::Receiver<Option<Order>>,
pub feeds: Feeds,
_tasks: Tasks,
}
impl Taker {
pub fn cfd_feed(&mut self) -> &mut watch::Receiver<Vec<Cfd>> {
&mut self.cfd_feed_receiver
&mut self.feeds.cfds
}
pub fn order_feed(&mut self) -> &mut watch::Receiver<Option<Order>> {
&mut self.order_feed_receiver
&mut self.feeds.order
}
pub fn maker_status_feed(&mut self) -> &mut watch::Receiver<ConnectionStatus> {
@ -299,21 +281,8 @@ impl Taker {
ask: Price::new(dec!(10000)).unwrap(),
};
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(vec![]);
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, _) = watch::channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, _) = watch::channel::<Quote>(dummy_quote);
let (connected_takers_feed_sender, _connected_takers_feed_receiver) =
watch::channel::<Vec<TakerId>>(vec![]);
tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender,
order_feed_sender,
quote_sender,
update_cfd_feed_sender,
connected_takers_feed_sender,
)));
let (proj_actor, feeds) = projection::Actor::new(vec![], dummy_quote);
tasks.add(projection_context.run(proj_actor));
tasks.add(connect(
taker.maker_online_status_feed_receiver.clone(),
@ -325,10 +294,9 @@ impl Taker {
Self {
id: TakerId::new(identity_pk),
system: taker,
feeds,
mocks,
_tasks: tasks,
order_feed_receiver,
cfd_feed_receiver,
}
}

Loading…
Cancel
Save