Browse Source

Merge #663

663: Add connected takers feed r=klochowicz a=klochowicz

- add a new watch channel with information about connected takers from the maker's actor system
- test the watch channel behaviour with a unit test
- add a list of connected takers to maker's UI (for testing/monitoring)

This PR will enable us to write more elaborate assertions (e.g. reconnection of the same taker & resuming its operation) and also enables us to gauge the length & amount of inteactions with the maker

Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
debug-collab-settlement
bors[bot] 3 years ago
committed by GitHub
parent
commit
563a4f3c74
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      daemon/src/lib.rs
  2. 25
      daemon/src/maker.rs
  3. 60
      daemon/src/maker_cfd.rs
  4. 26
      daemon/src/maker_inc_connections.rs
  5. 9
      daemon/src/projection.rs
  6. 12
      daemon/src/routes_maker.rs
  7. 9
      daemon/src/taker.rs
  8. 9
      daemon/src/to_sse_event.rs
  9. 20
      daemon/tests/happy_path.rs
  10. 39
      daemon/tests/harness/mod.rs
  11. 8
      maker-frontend/src/MakerApp.tsx
  12. 25
      maker-frontend/src/components/ConnectedTakers.tsx

7
daemon/src/lib.rs

@ -1,7 +1,7 @@
#![cfg_attr(not(test), warn(clippy::unwrap_used))] #![cfg_attr(not(test), warn(clippy::unwrap_used))]
#![warn(clippy::disallowed_method)] #![warn(clippy::disallowed_method)]
use crate::db::load_all_cfds; use crate::db::load_all_cfds;
use crate::maker_cfd::{FromTaker, NewTakerOnline}; use crate::maker_cfd::{FromTaker, TakerConnected};
use crate::model::cfd::{Cfd, Order, UpdateCfdProposals}; use crate::model::cfd::{Cfd, Order, UpdateCfdProposals};
use crate::oracle::Attestation; use crate::oracle::Attestation;
use crate::tokio_ext::FutureExt; use crate::tokio_ext::FutureExt;
@ -9,6 +9,7 @@ use anyhow::Result;
use connection::ConnectionStatus; use connection::ConnectionStatus;
use futures::future::RemoteHandle; use futures::future::RemoteHandle;
use maia::secp256k1_zkp::schnorrsig; use maia::secp256k1_zkp::schnorrsig;
use maker_cfd::TakerDisconnected;
use sqlx::SqlitePool; use sqlx::SqlitePool;
use std::future::Future; use std::future::Future;
use std::time::Duration; use std::time::Duration;
@ -117,7 +118,8 @@ where
oracle_constructor: impl FnOnce(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O, oracle_constructor: impl FnOnce(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F, monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
inc_conn_constructor: impl FnOnce( inc_conn_constructor: impl FnOnce(
Box<dyn MessageChannel<NewTakerOnline>>, Box<dyn MessageChannel<TakerConnected>>,
Box<dyn MessageChannel<TakerDisconnected>>,
Box<dyn MessageChannel<FromTaker>>, Box<dyn MessageChannel<FromTaker>>,
) -> T, ) -> T,
settlement_interval: time::Duration, settlement_interval: time::Duration,
@ -156,6 +158,7 @@ where
tasks.add(inc_conn_ctx.run(inc_conn_constructor( tasks.add(inc_conn_ctx.run(inc_conn_constructor(
Box::new(cfd_actor_addr.clone()), Box::new(cfd_actor_addr.clone()),
Box::new(cfd_actor_addr.clone()), Box::new(cfd_actor_addr.clone()),
Box::new(cfd_actor_addr.clone()),
))); )));
tasks.add( tasks.add(

25
daemon/src/maker.rs

@ -7,7 +7,7 @@ use daemon::auth::{self, MAKER_USERNAME};
use daemon::bitmex_price_feed::Quote; use daemon::bitmex_price_feed::Quote;
use daemon::db::load_all_cfds; use daemon::db::load_all_cfds;
use daemon::model::cfd::{Order, UpdateCfdProposals}; use daemon::model::cfd::{Order, UpdateCfdProposals};
use daemon::model::WalletInfo; use daemon::model::{TakerId, WalletInfo};
use daemon::seed::Seed; use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt; use daemon::tokio_ext::FutureExt;
use daemon::{ use daemon::{
@ -23,7 +23,6 @@ use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::task::Poll; use std::task::Poll;
use tokio::sync::watch; use tokio::sync::watch;
use tokio::sync::watch::channel;
use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*; use xtra::prelude::*;
use xtra::Actor; use xtra::Actor;
@ -259,8 +258,14 @@ async fn main() -> Result<()> {
monitor::Actor::new(electrum, channel, cfds) monitor::Actor::new(electrum, channel, cfds)
} }
}, },
|channel0, channel1| { |channel0, channel1, channel2| {
maker_inc_connections::Actor::new(channel0, channel1, identity_sk, HEARTBEAT_INTERVAL) maker_inc_connections::Actor::new(
channel0,
channel1,
channel2,
identity_sk,
HEARTBEAT_INTERVAL,
)
}, },
SETTLEMENT_INTERVAL, SETTLEMENT_INTERVAL,
N_PAYOUTS, N_PAYOUTS,
@ -272,17 +277,20 @@ async fn main() -> Result<()> {
tasks.add(task); tasks.add(task);
let cfds = load_all_cfds(&mut conn).await?; let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone()); let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None); let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) = let (update_cfd_feed_sender, update_cfd_feed_receiver) =
channel::<UpdateCfdProposals>(HashMap::new()); watch::channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, quote_receiver) = channel::<Quote>(init_quote); let (quote_sender, quote_receiver) = watch::channel::<Quote>(init_quote);
let (connected_takers_feed_sender, connected_takers_feed_receiver) =
watch::channel::<Vec<TakerId>>(Vec::new());
tasks.add(projection_context.run(projection::Actor::new( tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender, cfd_feed_sender,
order_feed_sender, order_feed_sender,
quote_sender, quote_sender,
update_cfd_feed_sender, update_cfd_feed_sender,
connected_takers_feed_sender,
))); )));
let listener_stream = futures::stream::poll_fn(move |ctx| { let listener_stream = futures::stream::poll_fn(move |ctx| {
@ -309,6 +317,7 @@ async fn main() -> Result<()> {
.manage(cfd_action_channel) .manage(cfd_action_channel)
.manage(new_order_channel) .manage(new_order_channel)
.manage(cfd_feed_receiver) .manage(cfd_feed_receiver)
.manage(connected_takers_feed_receiver)
.manage(wallet_feed_receiver) .manage(wallet_feed_receiver)
.manage(auth_password) .manage(auth_password)
.manage(quote_receiver) .manage(quote_receiver)

60
daemon/src/maker_cfd.rs

@ -6,6 +6,7 @@ use crate::model::cfd::{
}; };
use crate::model::{Price, TakerId, Timestamp, Usd}; use crate::model::{Price, TakerId, Timestamp, Usd};
use crate::monitor::MonitorParams; use crate::monitor::MonitorParams;
use crate::projection::Update;
use crate::setup_contract::{RolloverParams, SetupParams}; use crate::setup_contract::{RolloverParams, SetupParams};
use crate::tokio_ext::FutureExt; use crate::tokio_ext::FutureExt;
use crate::{ use crate::{
@ -21,7 +22,7 @@ use futures::{future, SinkExt};
use maia::secp256k1_zkp::Signature; use maia::secp256k1_zkp::Signature;
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
use sqlx::Sqlite; use sqlx::Sqlite;
use std::collections::HashMap; use std::collections::{HashMap, HashSet};
use time::Duration; use time::Duration;
use xtra::prelude::*; use xtra::prelude::*;
@ -41,7 +42,11 @@ pub struct NewOrder {
pub max_quantity: Usd, pub max_quantity: Usd,
} }
pub struct NewTakerOnline { pub struct TakerConnected {
pub id: TakerId,
}
pub struct TakerDisconnected {
pub id: TakerId, pub id: TakerId,
} }
@ -75,6 +80,7 @@ pub struct Actor<O, M, T, W> {
// Maker needs to also store TakerId to be able to send a reply back // Maker needs to also store TakerId to be able to send a reply back
current_pending_proposals: HashMap<OrderId, (UpdateCfdProposal, TakerId)>, current_pending_proposals: HashMap<OrderId, (UpdateCfdProposal, TakerId)>,
current_agreed_proposals: HashMap<OrderId, (SettlementProposal, TakerId)>, current_agreed_proposals: HashMap<OrderId, (SettlementProposal, TakerId)>,
connected_takers: HashSet<TakerId>,
n_payouts: usize, n_payouts: usize,
} }
@ -124,6 +130,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
current_pending_proposals: HashMap::new(), current_pending_proposals: HashMap::new(),
current_agreed_proposals: HashMap::new(), current_agreed_proposals: HashMap::new(),
n_payouts, n_payouts,
connected_takers: HashSet::new(),
} }
} }
@ -276,6 +283,18 @@ impl<O, M, T, W> Actor<O, M, T, W> {
}; };
Ok((proposal.clone(), *taker_id)) Ok((proposal.clone(), *taker_id))
} }
async fn update_connected_takers(&mut self) -> Result<()> {
self.projection_actor
.send(Update(
self.connected_takers
.clone()
.into_iter()
.collect::<Vec<TakerId>>(),
))
.await?;
Ok(())
}
} }
impl<O, M, T, W> Actor<O, M, T, W> impl<O, M, T, W> Actor<O, M, T, W>
@ -314,7 +333,7 @@ impl<O, M, T, W> Actor<O, M, T, W>
where where
T: xtra::Handler<maker_inc_connections::TakerMessage>, T: xtra::Handler<maker_inc_connections::TakerMessage>,
{ {
async fn handle_new_taker_online(&mut self, taker_id: TakerId) -> Result<()> { async fn handle_taker_connected(&mut self, taker_id: TakerId) -> Result<()> {
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
let current_order = match self.current_order_id { let current_order = match self.current_order_id {
@ -332,6 +351,18 @@ where
}) })
.await?; .await?;
if !self.connected_takers.insert(taker_id) {
tracing::warn!("Taker already connected: {:?}", &taker_id);
}
self.update_connected_takers().await?;
Ok(())
}
async fn handle_taker_disconnected(&mut self, taker_id: TakerId) -> Result<()> {
if !self.connected_takers.remove(&taker_id) {
tracing::warn!("Removed unknown taker: {:?}", &taker_id);
}
self.update_connected_takers().await?;
Ok(()) Ok(())
} }
@ -998,12 +1029,23 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<NewTakerOnline> for Actor<O, M, T, W> impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<TakerConnected> for Actor<O, M, T, W>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
async fn handle(&mut self, msg: TakerConnected, _ctx: &mut Context<Self>) {
log_error!(self.handle_taker_connected(msg.id));
}
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<TakerDisconnected>
for Actor<O, M, T, W>
where where
T: xtra::Handler<maker_inc_connections::TakerMessage>, T: xtra::Handler<maker_inc_connections::TakerMessage>,
{ {
async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: TakerDisconnected, _ctx: &mut Context<Self>) {
log_error!(self.handle_new_taker_online(msg.id)); log_error!(self.handle_taker_disconnected(msg.id));
} }
} }
@ -1116,7 +1158,11 @@ impl Message for NewOrder {
type Result = Result<()>; type Result = Result<()>;
} }
impl Message for NewTakerOnline { impl Message for TakerConnected {
type Result = ();
}
impl Message for TakerDisconnected {
type Result = (); type Result = ();
} }

26
daemon/src/maker_inc_connections.rs

@ -1,4 +1,4 @@
use crate::maker_cfd::{FromTaker, NewTakerOnline}; use crate::maker_cfd::{FromTaker, TakerConnected, TakerDisconnected};
use crate::model::cfd::Order; use crate::model::cfd::Order;
use crate::model::TakerId; use crate::model::TakerId;
use crate::noise::TransportStateExt; use crate::noise::TransportStateExt;
@ -36,7 +36,8 @@ pub enum ListenerMessage {
pub struct Actor { pub struct Actor {
write_connections: HashMap<TakerId, Address<send_to_socket::Actor<wire::MakerToTaker>>>, write_connections: HashMap<TakerId, Address<send_to_socket::Actor<wire::MakerToTaker>>>,
new_taker_channel: Box<dyn MessageChannel<NewTakerOnline>>, taker_connected_channel: Box<dyn MessageChannel<TakerConnected>>,
taker_disconnected_channel: Box<dyn MessageChannel<TakerDisconnected>>,
taker_msg_channel: Box<dyn MessageChannel<FromTaker>>, taker_msg_channel: Box<dyn MessageChannel<FromTaker>>,
noise_priv_key: x25519_dalek::StaticSecret, noise_priv_key: x25519_dalek::StaticSecret,
heartbeat_interval: Duration, heartbeat_interval: Duration,
@ -45,14 +46,16 @@ pub struct Actor {
impl Actor { impl Actor {
pub fn new( pub fn new(
new_taker_channel: Box<dyn MessageChannel<NewTakerOnline>>, taker_connected_channel: Box<dyn MessageChannel<TakerConnected>>,
taker_disconnected_channel: Box<dyn MessageChannel<TakerDisconnected>>,
taker_msg_channel: Box<dyn MessageChannel<FromTaker>>, taker_msg_channel: Box<dyn MessageChannel<FromTaker>>,
noise_priv_key: x25519_dalek::StaticSecret, noise_priv_key: x25519_dalek::StaticSecret,
heartbeat_interval: Duration, heartbeat_interval: Duration,
) -> Self { ) -> Self {
Self { Self {
write_connections: HashMap::new(), write_connections: HashMap::new(),
new_taker_channel: new_taker_channel.clone_channel(), taker_connected_channel: taker_connected_channel.clone_channel(),
taker_disconnected_channel: taker_disconnected_channel.clone_channel(),
taker_msg_channel: taker_msg_channel.clone_channel(), taker_msg_channel: taker_msg_channel.clone_channel(),
noise_priv_key, noise_priv_key,
heartbeat_interval, heartbeat_interval,
@ -74,7 +77,12 @@ impl Actor {
if conn.send(msg).await.is_err() { if conn.send(msg).await.is_err() {
tracing::info!(%taker_id, "Failed to send {} to taker, removing connection", msg_str); tracing::info!(%taker_id, "Failed to send {} to taker, removing connection", msg_str);
self.write_connections.remove(taker_id); if self.write_connections.remove(taker_id).is_some() {
let _ = self
.taker_disconnected_channel
.send(maker_cfd::TakerDisconnected { id: *taker_id })
.await;
}
} }
Ok(()) Ok(())
@ -108,6 +116,7 @@ impl Actor {
// only allow outgoing messages while we are successfully reading incoming ones // only allow outgoing messages while we are successfully reading incoming ones
let heartbeat_interval = self.heartbeat_interval; let heartbeat_interval = self.heartbeat_interval;
let taker_disconnected_channel = self.taker_disconnected_channel.clone_channel();
self.tasks.add(async move { self.tasks.add(async move {
let mut actor = send_to_socket::Actor::new(write, transport_state.clone()); let mut actor = send_to_socket::Actor::new(write, transport_state.clone());
@ -121,6 +130,9 @@ impl Actor {
.await; .await;
tracing::error!("Closing connection to taker {}", taker_id); tracing::error!("Closing connection to taker {}", taker_id);
let _ = taker_disconnected_channel
.send(maker_cfd::TakerDisconnected { id: taker_id })
.await;
actor.shutdown().await; actor.shutdown().await;
}); });
@ -129,8 +141,8 @@ impl Actor {
.insert(taker_id, out_msg_actor_address); .insert(taker_id, out_msg_actor_address);
let _ = self let _ = self
.new_taker_channel .taker_connected_channel
.send(maker_cfd::NewTakerOnline { id: taker_id }) .send(maker_cfd::TakerConnected { id: taker_id })
.await; .await;
Ok(()) Ok(())

9
daemon/src/projection.rs

@ -1,4 +1,5 @@
use crate::bitmex_price_feed::Quote; use crate::bitmex_price_feed::Quote;
use crate::model::TakerId;
use crate::{Cfd, Order, UpdateCfdProposals}; use crate::{Cfd, Order, UpdateCfdProposals};
use tokio::sync::watch; use tokio::sync::watch;
use xtra_productivity::xtra_productivity; use xtra_productivity::xtra_productivity;
@ -8,6 +9,9 @@ pub struct Actor {
tx_order: watch::Sender<Option<Order>>, tx_order: watch::Sender<Option<Order>>,
tx_quote: watch::Sender<Quote>, tx_quote: watch::Sender<Quote>,
tx_settlements: watch::Sender<UpdateCfdProposals>, tx_settlements: watch::Sender<UpdateCfdProposals>,
// TODO: Use this channel to communicate maker status as well with generic
// ID of connected counterparties
tx_connected_takers: watch::Sender<Vec<TakerId>>,
} }
impl Actor { impl Actor {
@ -16,12 +20,14 @@ impl Actor {
tx_order: watch::Sender<Option<Order>>, tx_order: watch::Sender<Option<Order>>,
tx_quote: watch::Sender<Quote>, tx_quote: watch::Sender<Quote>,
tx_settlements: watch::Sender<UpdateCfdProposals>, tx_settlements: watch::Sender<UpdateCfdProposals>,
tx_connected_takers: watch::Sender<Vec<TakerId>>,
) -> Self { ) -> Self {
Self { Self {
tx_cfds, tx_cfds,
tx_order, tx_order,
tx_quote, tx_quote,
tx_settlements, tx_settlements,
tx_connected_takers,
} }
} }
} }
@ -42,6 +48,9 @@ impl Actor {
fn handle(&mut self, msg: Update<UpdateCfdProposals>) { fn handle(&mut self, msg: Update<UpdateCfdProposals>) {
let _ = self.tx_settlements.send(msg.0); let _ = self.tx_settlements.send(msg.0);
} }
fn handle(&mut self, msg: Update<Vec<TakerId>>) {
let _ = self.tx_connected_takers.send(msg.0);
}
} }
impl xtra::Actor for Actor {} impl xtra::Actor for Actor {}

12
daemon/src/routes_maker.rs

@ -2,7 +2,7 @@ use anyhow::Result;
use bdk::bitcoin::Network; use bdk::bitcoin::Network;
use daemon::auth::Authenticated; use daemon::auth::Authenticated;
use daemon::model::cfd::{Cfd, Order, OrderId, Role, UpdateCfdProposals}; use daemon::model::cfd::{Cfd, Order, OrderId, Role, UpdateCfdProposals};
use daemon::model::{Price, Usd, WalletInfo}; use daemon::model::{Price, TakerId, Usd, WalletInfo};
use daemon::routes::EmbeddedFileExt; use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent}; use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent};
use daemon::{bitmex_price_feed, maker_cfd, wallet}; use daemon::{bitmex_price_feed, maker_cfd, wallet};
@ -20,6 +20,7 @@ use tokio::select;
use tokio::sync::watch; use tokio::sync::watch;
use xtra::prelude::*; use xtra::prelude::*;
#[allow(clippy::too_many_arguments)]
#[rocket::get("/feed")] #[rocket::get("/feed")]
pub async fn maker_feed( pub async fn maker_feed(
rx_cfds: &State<watch::Receiver<Vec<Cfd>>>, rx_cfds: &State<watch::Receiver<Vec<Cfd>>>,
@ -27,6 +28,7 @@ pub async fn maker_feed(
rx_wallet: &State<watch::Receiver<WalletInfo>>, rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>, rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>,
rx_settlements: &State<watch::Receiver<UpdateCfdProposals>>, rx_settlements: &State<watch::Receiver<UpdateCfdProposals>>,
rx_connected_takers: &State<watch::Receiver<Vec<TakerId>>>,
network: &State<Network>, network: &State<Network>,
_auth: Authenticated, _auth: Authenticated,
) -> EventStream![] { ) -> EventStream![] {
@ -35,6 +37,7 @@ pub async fn maker_feed(
let mut rx_wallet = rx_wallet.inner().clone(); let mut rx_wallet = rx_wallet.inner().clone();
let mut rx_quote = rx_quote.inner().clone(); let mut rx_quote = rx_quote.inner().clone();
let mut rx_settlements = rx_settlements.inner().clone(); let mut rx_settlements = rx_settlements.inner().clone();
let mut rx_connected_takers = rx_connected_takers.inner().clone();
let network = *network.inner(); let network = *network.inner();
EventStream! { EventStream! {
@ -54,6 +57,9 @@ pub async fn maker_feed(
Role::Maker, network Role::Maker, network
).to_sse_event(); ).to_sse_event();
let takers = rx_connected_takers.borrow().clone();
yield takers.to_sse_event();
loop{ loop{
select! { select! {
Ok(()) = rx_wallet.changed() => { Ok(()) = rx_wallet.changed() => {
@ -64,6 +70,10 @@ pub async fn maker_feed(
let order = rx_order.borrow().clone(); let order = rx_order.borrow().clone();
yield order.to_sse_event(); yield order.to_sse_event();
} }
Ok(()) = rx_connected_takers.changed() => {
let takers = rx_connected_takers.borrow().clone();
yield takers.to_sse_event();
}
Ok(()) = rx_cfds.changed() => { Ok(()) = rx_cfds.changed() => {
yield CfdsWithAuxData::new( yield CfdsWithAuxData::new(
&rx_cfds, &rx_cfds,

9
daemon/src/taker.rs

@ -7,7 +7,7 @@ use daemon::bitmex_price_feed::Quote;
use daemon::connection::connect; use daemon::connection::connect;
use daemon::db::load_all_cfds; use daemon::db::load_all_cfds;
use daemon::model::cfd::{Order, UpdateCfdProposals}; use daemon::model::cfd::{Order, UpdateCfdProposals};
use daemon::model::WalletInfo; use daemon::model::{TakerId, WalletInfo};
use daemon::seed::Seed; use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt; use daemon::tokio_ext::FutureExt;
use daemon::{ use daemon::{
@ -264,11 +264,18 @@ async fn main() -> Result<()> {
channel::<UpdateCfdProposals>(HashMap::new()); channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, quote_receiver) = channel::<Quote>(init_quote); let (quote_sender, quote_receiver) = channel::<Quote>(init_quote);
// TODO: Use this channel to convey maker status.
// For now, the receiver is dropped instead of managed by Rocket to
// highlight that we're not using it
let (connected_takers_feed_sender, _connected_takers_feed_receiver) =
watch::channel::<Vec<TakerId>>(vec![]);
tasks.add(projection_context.run(projection::Actor::new( tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender, cfd_feed_sender,
order_feed_sender, order_feed_sender,
quote_sender, quote_sender,
update_cfd_feed_sender, update_cfd_feed_sender,
connected_takers_feed_sender,
))); )));
let possible_addresses = resolve_maker_addresses(&opts.maker).await?; let possible_addresses = resolve_maker_addresses(&opts.maker).await?;

9
daemon/src/to_sse_event.rs

@ -2,7 +2,7 @@ use crate::connection::ConnectionStatus;
use crate::model::cfd::{ use crate::model::cfd::{
Dlc, OrderId, Payout, Role, SettlementKind, UpdateCfdProposal, UpdateCfdProposals, Dlc, OrderId, Payout, Role, SettlementKind, UpdateCfdProposal, UpdateCfdProposals,
}; };
use crate::model::{Leverage, Position, Timestamp, TradingPair}; use crate::model::{Leverage, Position, TakerId, Timestamp, TradingPair};
use crate::{bitmex_price_feed, model}; use crate::{bitmex_price_feed, model};
use bdk::bitcoin::{Amount, Network, SignedAmount, Txid}; use bdk::bitcoin::{Amount, Network, SignedAmount, Txid};
use rocket::request::FromParam; use rocket::request::FromParam;
@ -327,6 +327,13 @@ impl ToSseEvent for CfdsWithAuxData {
} }
} }
impl ToSseEvent for Vec<TakerId> {
fn to_sse_event(&self) -> Event {
let takers = self.iter().map(|x| x.to_string()).collect::<Vec<_>>();
Event::json(&takers).event("takers")
}
}
impl ToSseEvent for Option<model::cfd::Order> { impl ToSseEvent for Option<model::cfd::Order> {
fn to_sse_event(&self) -> Event { fn to_sse_event(&self) -> Event {
let order = self.clone().map(|order| CfdOrder { let order = self.clone().map(|order| CfdOrder {

20
daemon/tests/happy_path.rs

@ -5,7 +5,7 @@ use crate::harness::{
}; };
use daemon::connection::ConnectionStatus; use daemon::connection::ConnectionStatus;
use daemon::model::cfd::CfdState; use daemon::model::cfd::CfdState;
use daemon::model::Usd; use daemon::model::{TakerId, Usd};
use maia::secp256k1_zkp::schnorrsig; use maia::secp256k1_zkp::schnorrsig;
use rust_decimal_macros::dec; use rust_decimal_macros::dec;
use tokio::time::sleep; use tokio::time::sleep;
@ -154,3 +154,21 @@ async fn taker_notices_lack_of_maker() {
next(taker.maker_status_feed()).await.unwrap(), next(taker.maker_status_feed()).await.unwrap(),
); );
} }
#[tokio::test]
async fn maker_notices_lack_of_taker() {
let _guard = init_tracing();
let (mut maker, taker) = start_both().await;
assert_eq!(
vec![taker.id],
next(maker.connected_takers_feed()).await.unwrap()
);
std::mem::drop(taker);
assert_eq!(
Vec::<TakerId>::new(),
next(maker.connected_takers_feed()).await.unwrap()
);
}

39
daemon/tests/harness/mod.rs

@ -6,7 +6,7 @@ use daemon::bitmex_price_feed::Quote;
use daemon::connection::{connect, ConnectionStatus}; use daemon::connection::{connect, ConnectionStatus};
use daemon::maker_cfd::CfdAction; use daemon::maker_cfd::CfdAction;
use daemon::model::cfd::{Cfd, Order, Origin, UpdateCfdProposals}; use daemon::model::cfd::{Cfd, Order, Origin, UpdateCfdProposals};
use daemon::model::{Price, Timestamp, Usd}; use daemon::model::{Price, TakerId, Timestamp, Usd};
use daemon::seed::Seed; use daemon::seed::Seed;
use daemon::{ use daemon::{
db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks, db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks,
@ -20,7 +20,6 @@ use std::task::Poll;
use std::time::Duration; use std::time::Duration;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::watch; use tokio::sync::watch;
use tokio::sync::watch::channel;
use tracing::subscriber::DefaultGuard; use tracing::subscriber::DefaultGuard;
use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::util::SubscriberInitExt;
@ -59,6 +58,7 @@ pub struct Maker {
pub identity_pk: x25519_dalek::PublicKey, pub identity_pk: x25519_dalek::PublicKey,
cfd_feed_receiver: watch::Receiver<Vec<Cfd>>, cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
order_feed_receiver: watch::Receiver<Option<Order>>, order_feed_receiver: watch::Receiver<Option<Order>>,
connected_takers_feed_receiver: watch::Receiver<Vec<TakerId>>,
_tasks: Tasks, _tasks: Tasks,
} }
@ -71,6 +71,10 @@ impl Maker {
&mut self.order_feed_receiver &mut self.order_feed_receiver
} }
pub fn connected_takers_feed(&mut self) -> &mut watch::Receiver<Vec<TakerId>> {
&mut self.connected_takers_feed_receiver
}
pub async fn start( pub async fn start(
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
seed: Seed, seed: Seed,
@ -100,10 +104,11 @@ impl Maker {
oracle_pk, oracle_pk,
|_, _| oracle, |_, _| oracle,
|_, _| async { Ok(monitor) }, |_, _| async { Ok(monitor) },
|channel0, channel1| { |channel0, channel1, channel2| {
maker_inc_connections::Actor::new( maker_inc_connections::Actor::new(
channel0, channel0,
channel1, channel1,
channel2,
identity_sk, identity_sk,
HEARTBEAT_INTERVAL_FOR_TEST, HEARTBEAT_INTERVAL_FOR_TEST,
) )
@ -121,17 +126,20 @@ impl Maker {
ask: Price::new(dec!(10000)).unwrap(), ask: Price::new(dec!(10000)).unwrap(),
}; };
let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]); let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(vec![]);
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None); let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, _update_cfd_feed_receiver) = let (update_cfd_feed_sender, _update_cfd_feed_receiver) =
channel::<UpdateCfdProposals>(HashMap::new()); watch::channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, _) = channel::<Quote>(dummy_quote); let (quote_sender, _) = watch::channel::<Quote>(dummy_quote);
let (connected_takers_feed_sender, connected_takers_feed_receiver) =
watch::channel::<Vec<TakerId>>(vec![]);
tasks.add(projection_context.run(projection::Actor::new( tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender, cfd_feed_sender,
order_feed_sender, order_feed_sender,
quote_sender, quote_sender,
update_cfd_feed_sender, update_cfd_feed_sender,
connected_takers_feed_sender,
))); )));
let address = listener.local_addr().unwrap(); let address = listener.local_addr().unwrap();
@ -157,6 +165,7 @@ impl Maker {
_tasks: tasks, _tasks: tasks,
cfd_feed_receiver, cfd_feed_receiver,
order_feed_receiver, order_feed_receiver,
connected_takers_feed_receiver,
} }
} }
@ -192,6 +201,7 @@ impl Maker {
/// Taker Test Setup /// Taker Test Setup
pub struct Taker { pub struct Taker {
pub id: TakerId,
pub system: daemon::TakerActorSystem<OracleActor, MonitorActor, WalletActor>, pub system: daemon::TakerActorSystem<OracleActor, MonitorActor, WalletActor>,
pub mocks: mocks::Mocks, pub mocks: mocks::Mocks,
cfd_feed_receiver: watch::Receiver<Vec<Cfd>>, cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
@ -219,7 +229,7 @@ impl Taker {
) -> Self { ) -> Self {
let seed = Seed::default(); let seed = Seed::default();
let (_, identity_sk) = seed.derive_identity(); let (identity_pk, identity_sk) = seed.derive_identity();
let db = in_memory_db().await; let db = in_memory_db().await;
@ -255,16 +265,20 @@ impl Taker {
ask: Price::new(dec!(10000)).unwrap(), ask: Price::new(dec!(10000)).unwrap(),
}; };
let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]); let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(vec![]);
let (order_feed_sender, order_feed_receiver) = channel::<Option<Order>>(None); let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, _) = channel::<UpdateCfdProposals>(HashMap::new()); let (update_cfd_feed_sender, _) = watch::channel::<UpdateCfdProposals>(HashMap::new());
let (quote_sender, _) = channel::<Quote>(dummy_quote); let (quote_sender, _) = watch::channel::<Quote>(dummy_quote);
let (connected_takers_feed_sender, _connected_takers_feed_receiver) =
watch::channel::<Vec<TakerId>>(vec![]);
tasks.add(projection_context.run(projection::Actor::new( tasks.add(projection_context.run(projection::Actor::new(
cfd_feed_sender, cfd_feed_sender,
order_feed_sender, order_feed_sender,
quote_sender, quote_sender,
update_cfd_feed_sender, update_cfd_feed_sender,
connected_takers_feed_sender,
))); )));
tasks.add(connect( tasks.add(connect(
@ -275,6 +289,7 @@ impl Taker {
)); ));
Self { Self {
id: TakerId::new(identity_pk),
system: taker, system: taker,
mocks, mocks,
_tasks: tasks, _tasks: tasks,

8
maker-frontend/src/MakerApp.tsx

@ -20,6 +20,7 @@ import React, { useEffect, useState } from "react";
import { useAsync } from "react-async"; import { useAsync } from "react-async";
import { useEventSource } from "react-sse-hooks"; import { useEventSource } from "react-sse-hooks";
import { CfdTable } from "./components/cfdtables/CfdTable"; import { CfdTable } from "./components/cfdtables/CfdTable";
import ConnectedTakers, { TakerId } from "./components/ConnectedTakers";
import CurrencyInputField from "./components/CurrencyInputField"; import CurrencyInputField from "./components/CurrencyInputField";
import CurrentPrice from "./components/CurrentPrice"; import CurrentPrice from "./components/CurrentPrice";
import createErrorToast from "./components/ErrorToast"; import createErrorToast from "./components/ErrorToast";
@ -41,6 +42,8 @@ export default function App() {
const order = useLatestEvent<Order>(source, "order", intoOrder); const order = useLatestEvent<Order>(source, "order", intoOrder);
const walletInfo = useLatestEvent<WalletInfo>(source, "wallet"); const walletInfo = useLatestEvent<WalletInfo>(source, "wallet");
const priceInfo = useLatestEvent<PriceInfo>(source, "quote"); const priceInfo = useLatestEvent<PriceInfo>(source, "quote");
const takersOrUndefined = useLatestEvent<TakerId[]>(source, "takers");
let takers = takersOrUndefined || [];
const toast = useToast(); const toast = useToast();
@ -151,7 +154,10 @@ export default function App() {
</GridItem> </GridItem>
</Grid> </Grid>
</VStack> </VStack>
{order && <OrderTile order={order} />} <VStack>
<ConnectedTakers takers={takers} />
{order && <OrderTile order={order} />}
</VStack>
<Box width="40%" /> <Box width="40%" />
</HStack> </HStack>

25
maker-frontend/src/components/ConnectedTakers.tsx

@ -0,0 +1,25 @@
import { Heading, ListItem, UnorderedList, VStack } from "@chakra-ui/react";
import React from "react";
export interface TakerId {
id: string;
}
interface Props {
takers: TakerId[];
}
const ConnectedTakers = ({ takers }: Props) => {
return (
<VStack spacing={3}>
<Heading size={"sm"} padding={2}>{"Connected takers: " + takers.length}</Heading>
<UnorderedList>
{takers.map((taker) => {
return (<ListItem>{taker}</ListItem>);
})}
</UnorderedList>
</VStack>
);
};
export default ConnectedTakers;
Loading…
Cancel
Save