diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index a7ebded..236445b 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -1,7 +1,7 @@ #![cfg_attr(not(test), warn(clippy::unwrap_used))] #![warn(clippy::disallowed_method)] use crate::db::load_all_cfds; -use crate::maker_cfd::{FromTaker, NewTakerOnline}; +use crate::maker_cfd::{FromTaker, TakerConnected}; use crate::model::cfd::{Cfd, Order, UpdateCfdProposals}; use crate::oracle::Attestation; use crate::tokio_ext::FutureExt; @@ -9,6 +9,7 @@ use anyhow::Result; use connection::ConnectionStatus; use futures::future::RemoteHandle; use maia::secp256k1_zkp::schnorrsig; +use maker_cfd::TakerDisconnected; use sqlx::SqlitePool; use std::future::Future; use std::time::Duration; @@ -117,7 +118,8 @@ where oracle_constructor: impl FnOnce(Vec, Box>) -> O, monitor_constructor: impl FnOnce(Box>, Vec) -> F, inc_conn_constructor: impl FnOnce( - Box>, + Box>, + Box>, Box>, ) -> T, settlement_interval: time::Duration, @@ -156,6 +158,7 @@ where tasks.add(inc_conn_ctx.run(inc_conn_constructor( Box::new(cfd_actor_addr.clone()), Box::new(cfd_actor_addr.clone()), + Box::new(cfd_actor_addr.clone()), ))); tasks.add( diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index b17a28d..19eb9bd 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -7,7 +7,7 @@ use daemon::auth::{self, MAKER_USERNAME}; use daemon::bitmex_price_feed::Quote; use daemon::db::load_all_cfds; use daemon::model::cfd::{Order, UpdateCfdProposals}; -use daemon::model::WalletInfo; +use daemon::model::{TakerId, WalletInfo}; use daemon::seed::Seed; use daemon::tokio_ext::FutureExt; use daemon::{ @@ -23,7 +23,6 @@ use std::path::PathBuf; use std::str::FromStr; use std::task::Poll; use tokio::sync::watch; -use tokio::sync::watch::channel; use tracing_subscriber::filter::LevelFilter; use xtra::prelude::*; use xtra::Actor; @@ -259,8 +258,14 @@ async fn main() -> Result<()> { monitor::Actor::new(electrum, channel, cfds) } }, - |channel0, channel1| { - maker_inc_connections::Actor::new(channel0, channel1, identity_sk, HEARTBEAT_INTERVAL) + |channel0, channel1, channel2| { + maker_inc_connections::Actor::new( + channel0, + channel1, + channel2, + identity_sk, + HEARTBEAT_INTERVAL, + ) }, SETTLEMENT_INTERVAL, N_PAYOUTS, @@ -272,17 +277,20 @@ async fn main() -> Result<()> { tasks.add(task); let cfds = load_all_cfds(&mut conn).await?; - let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone()); - let (order_feed_sender, order_feed_receiver) = channel::>(None); + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); + let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); let (update_cfd_feed_sender, update_cfd_feed_receiver) = - channel::(HashMap::new()); - let (quote_sender, quote_receiver) = channel::(init_quote); + watch::channel::(HashMap::new()); + let (quote_sender, quote_receiver) = watch::channel::(init_quote); + let (connected_takers_feed_sender, connected_takers_feed_receiver) = + watch::channel::>(Vec::new()); tasks.add(projection_context.run(projection::Actor::new( cfd_feed_sender, order_feed_sender, quote_sender, update_cfd_feed_sender, + connected_takers_feed_sender, ))); let listener_stream = futures::stream::poll_fn(move |ctx| { @@ -309,6 +317,7 @@ async fn main() -> Result<()> { .manage(cfd_action_channel) .manage(new_order_channel) .manage(cfd_feed_receiver) + .manage(connected_takers_feed_receiver) .manage(wallet_feed_receiver) .manage(auth_password) .manage(quote_receiver) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 1a5ea9c..10fcc2c 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -6,6 +6,7 @@ use crate::model::cfd::{ }; use crate::model::{Price, TakerId, Timestamp, Usd}; use crate::monitor::MonitorParams; +use crate::projection::Update; use crate::setup_contract::{RolloverParams, SetupParams}; use crate::tokio_ext::FutureExt; use crate::{ @@ -21,7 +22,7 @@ use futures::{future, SinkExt}; use maia::secp256k1_zkp::Signature; use sqlx::pool::PoolConnection; use sqlx::Sqlite; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use time::Duration; use xtra::prelude::*; @@ -41,7 +42,11 @@ pub struct NewOrder { pub max_quantity: Usd, } -pub struct NewTakerOnline { +pub struct TakerConnected { + pub id: TakerId, +} + +pub struct TakerDisconnected { pub id: TakerId, } @@ -75,6 +80,7 @@ pub struct Actor { // Maker needs to also store TakerId to be able to send a reply back current_pending_proposals: HashMap, current_agreed_proposals: HashMap, + connected_takers: HashSet, n_payouts: usize, } @@ -124,6 +130,7 @@ impl Actor { current_pending_proposals: HashMap::new(), current_agreed_proposals: HashMap::new(), n_payouts, + connected_takers: HashSet::new(), } } @@ -276,6 +283,18 @@ impl Actor { }; Ok((proposal.clone(), *taker_id)) } + + async fn update_connected_takers(&mut self) -> Result<()> { + self.projection_actor + .send(Update( + self.connected_takers + .clone() + .into_iter() + .collect::>(), + )) + .await?; + Ok(()) + } } impl Actor @@ -314,7 +333,7 @@ impl Actor where T: xtra::Handler, { - async fn handle_new_taker_online(&mut self, taker_id: TakerId) -> Result<()> { + async fn handle_taker_connected(&mut self, taker_id: TakerId) -> Result<()> { let mut conn = self.db.acquire().await?; let current_order = match self.current_order_id { @@ -332,6 +351,18 @@ where }) .await?; + if !self.connected_takers.insert(taker_id) { + tracing::warn!("Taker already connected: {:?}", &taker_id); + } + self.update_connected_takers().await?; + Ok(()) + } + + async fn handle_taker_disconnected(&mut self, taker_id: TakerId) -> Result<()> { + if !self.connected_takers.remove(&taker_id) { + tracing::warn!("Removed unknown taker: {:?}", &taker_id); + } + self.update_connected_takers().await?; Ok(()) } @@ -998,12 +1029,23 @@ where } #[async_trait] -impl Handler for Actor +impl Handler for Actor +where + T: xtra::Handler, +{ + async fn handle(&mut self, msg: TakerConnected, _ctx: &mut Context) { + log_error!(self.handle_taker_connected(msg.id)); + } +} + +#[async_trait] +impl Handler + for Actor where T: xtra::Handler, { - async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context) { - log_error!(self.handle_new_taker_online(msg.id)); + async fn handle(&mut self, msg: TakerDisconnected, _ctx: &mut Context) { + log_error!(self.handle_taker_disconnected(msg.id)); } } @@ -1116,7 +1158,11 @@ impl Message for NewOrder { type Result = Result<()>; } -impl Message for NewTakerOnline { +impl Message for TakerConnected { + type Result = (); +} + +impl Message for TakerDisconnected { type Result = (); } diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 4813681..8db4e07 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -1,4 +1,4 @@ -use crate::maker_cfd::{FromTaker, NewTakerOnline}; +use crate::maker_cfd::{FromTaker, TakerConnected, TakerDisconnected}; use crate::model::cfd::Order; use crate::model::TakerId; use crate::noise::TransportStateExt; @@ -36,7 +36,8 @@ pub enum ListenerMessage { pub struct Actor { write_connections: HashMap>>, - new_taker_channel: Box>, + taker_connected_channel: Box>, + taker_disconnected_channel: Box>, taker_msg_channel: Box>, noise_priv_key: x25519_dalek::StaticSecret, heartbeat_interval: Duration, @@ -45,14 +46,16 @@ pub struct Actor { impl Actor { pub fn new( - new_taker_channel: Box>, + taker_connected_channel: Box>, + taker_disconnected_channel: Box>, taker_msg_channel: Box>, noise_priv_key: x25519_dalek::StaticSecret, heartbeat_interval: Duration, ) -> Self { Self { write_connections: HashMap::new(), - new_taker_channel: new_taker_channel.clone_channel(), + taker_connected_channel: taker_connected_channel.clone_channel(), + taker_disconnected_channel: taker_disconnected_channel.clone_channel(), taker_msg_channel: taker_msg_channel.clone_channel(), noise_priv_key, heartbeat_interval, @@ -74,7 +77,12 @@ impl Actor { if conn.send(msg).await.is_err() { tracing::info!(%taker_id, "Failed to send {} to taker, removing connection", msg_str); - self.write_connections.remove(taker_id); + if self.write_connections.remove(taker_id).is_some() { + let _ = self + .taker_disconnected_channel + .send(maker_cfd::TakerDisconnected { id: *taker_id }) + .await; + } } Ok(()) @@ -108,6 +116,7 @@ impl Actor { // only allow outgoing messages while we are successfully reading incoming ones let heartbeat_interval = self.heartbeat_interval; + let taker_disconnected_channel = self.taker_disconnected_channel.clone_channel(); self.tasks.add(async move { let mut actor = send_to_socket::Actor::new(write, transport_state.clone()); @@ -121,6 +130,9 @@ impl Actor { .await; tracing::error!("Closing connection to taker {}", taker_id); + let _ = taker_disconnected_channel + .send(maker_cfd::TakerDisconnected { id: taker_id }) + .await; actor.shutdown().await; }); @@ -129,8 +141,8 @@ impl Actor { .insert(taker_id, out_msg_actor_address); let _ = self - .new_taker_channel - .send(maker_cfd::NewTakerOnline { id: taker_id }) + .taker_connected_channel + .send(maker_cfd::TakerConnected { id: taker_id }) .await; Ok(()) diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs index 1af3eac..5d69af8 100644 --- a/daemon/src/projection.rs +++ b/daemon/src/projection.rs @@ -1,4 +1,5 @@ use crate::bitmex_price_feed::Quote; +use crate::model::TakerId; use crate::{Cfd, Order, UpdateCfdProposals}; use tokio::sync::watch; use xtra_productivity::xtra_productivity; @@ -8,6 +9,9 @@ pub struct Actor { tx_order: watch::Sender>, tx_quote: watch::Sender, tx_settlements: watch::Sender, + // TODO: Use this channel to communicate maker status as well with generic + // ID of connected counterparties + tx_connected_takers: watch::Sender>, } impl Actor { @@ -16,12 +20,14 @@ impl Actor { tx_order: watch::Sender>, tx_quote: watch::Sender, tx_settlements: watch::Sender, + tx_connected_takers: watch::Sender>, ) -> Self { Self { tx_cfds, tx_order, tx_quote, tx_settlements, + tx_connected_takers, } } } @@ -42,6 +48,9 @@ impl Actor { fn handle(&mut self, msg: Update) { let _ = self.tx_settlements.send(msg.0); } + fn handle(&mut self, msg: Update>) { + let _ = self.tx_connected_takers.send(msg.0); + } } impl xtra::Actor for Actor {} diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 1126f5d..4a38318 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -2,7 +2,7 @@ use anyhow::Result; use bdk::bitcoin::Network; use daemon::auth::Authenticated; use daemon::model::cfd::{Cfd, Order, OrderId, Role, UpdateCfdProposals}; -use daemon::model::{Price, Usd, WalletInfo}; +use daemon::model::{Price, TakerId, Usd, WalletInfo}; use daemon::routes::EmbeddedFileExt; use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent}; use daemon::{bitmex_price_feed, maker_cfd, wallet}; @@ -20,6 +20,7 @@ use tokio::select; use tokio::sync::watch; use xtra::prelude::*; +#[allow(clippy::too_many_arguments)] #[rocket::get("/feed")] pub async fn maker_feed( rx_cfds: &State>>, @@ -27,6 +28,7 @@ pub async fn maker_feed( rx_wallet: &State>, rx_quote: &State>, rx_settlements: &State>, + rx_connected_takers: &State>>, network: &State, _auth: Authenticated, ) -> EventStream![] { @@ -35,6 +37,7 @@ pub async fn maker_feed( let mut rx_wallet = rx_wallet.inner().clone(); let mut rx_quote = rx_quote.inner().clone(); let mut rx_settlements = rx_settlements.inner().clone(); + let mut rx_connected_takers = rx_connected_takers.inner().clone(); let network = *network.inner(); EventStream! { @@ -54,6 +57,9 @@ pub async fn maker_feed( Role::Maker, network ).to_sse_event(); + let takers = rx_connected_takers.borrow().clone(); + yield takers.to_sse_event(); + loop{ select! { Ok(()) = rx_wallet.changed() => { @@ -64,6 +70,10 @@ pub async fn maker_feed( let order = rx_order.borrow().clone(); yield order.to_sse_event(); } + Ok(()) = rx_connected_takers.changed() => { + let takers = rx_connected_takers.borrow().clone(); + yield takers.to_sse_event(); + } Ok(()) = rx_cfds.changed() => { yield CfdsWithAuxData::new( &rx_cfds, diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 28c424e..b070b84 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -7,7 +7,7 @@ use daemon::bitmex_price_feed::Quote; use daemon::connection::connect; use daemon::db::load_all_cfds; use daemon::model::cfd::{Order, UpdateCfdProposals}; -use daemon::model::WalletInfo; +use daemon::model::{TakerId, WalletInfo}; use daemon::seed::Seed; use daemon::tokio_ext::FutureExt; use daemon::{ @@ -264,11 +264,18 @@ async fn main() -> Result<()> { channel::(HashMap::new()); let (quote_sender, quote_receiver) = channel::(init_quote); + // TODO: Use this channel to convey maker status. + // For now, the receiver is dropped instead of managed by Rocket to + // highlight that we're not using it + let (connected_takers_feed_sender, _connected_takers_feed_receiver) = + watch::channel::>(vec![]); + tasks.add(projection_context.run(projection::Actor::new( cfd_feed_sender, order_feed_sender, quote_sender, update_cfd_feed_sender, + connected_takers_feed_sender, ))); let possible_addresses = resolve_maker_addresses(&opts.maker).await?; diff --git a/daemon/src/to_sse_event.rs b/daemon/src/to_sse_event.rs index 85c20ba..b3408d0 100644 --- a/daemon/src/to_sse_event.rs +++ b/daemon/src/to_sse_event.rs @@ -2,7 +2,7 @@ use crate::connection::ConnectionStatus; use crate::model::cfd::{ Dlc, OrderId, Payout, Role, SettlementKind, UpdateCfdProposal, UpdateCfdProposals, }; -use crate::model::{Leverage, Position, Timestamp, TradingPair}; +use crate::model::{Leverage, Position, TakerId, Timestamp, TradingPair}; use crate::{bitmex_price_feed, model}; use bdk::bitcoin::{Amount, Network, SignedAmount, Txid}; use rocket::request::FromParam; @@ -327,6 +327,13 @@ impl ToSseEvent for CfdsWithAuxData { } } +impl ToSseEvent for Vec { + fn to_sse_event(&self) -> Event { + let takers = self.iter().map(|x| x.to_string()).collect::>(); + Event::json(&takers).event("takers") + } +} + impl ToSseEvent for Option { fn to_sse_event(&self) -> Event { let order = self.clone().map(|order| CfdOrder { diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index d699577..a3bca96 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -5,7 +5,7 @@ use crate::harness::{ }; use daemon::connection::ConnectionStatus; use daemon::model::cfd::CfdState; -use daemon::model::Usd; +use daemon::model::{TakerId, Usd}; use maia::secp256k1_zkp::schnorrsig; use rust_decimal_macros::dec; use tokio::time::sleep; @@ -154,3 +154,21 @@ async fn taker_notices_lack_of_maker() { next(taker.maker_status_feed()).await.unwrap(), ); } + +#[tokio::test] +async fn maker_notices_lack_of_taker() { + let _guard = init_tracing(); + + let (mut maker, taker) = start_both().await; + assert_eq!( + vec![taker.id], + next(maker.connected_takers_feed()).await.unwrap() + ); + + std::mem::drop(taker); + + assert_eq!( + Vec::::new(), + next(maker.connected_takers_feed()).await.unwrap() + ); +} diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index f383a0d..857bc82 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -6,7 +6,7 @@ use daemon::bitmex_price_feed::Quote; use daemon::connection::{connect, ConnectionStatus}; use daemon::maker_cfd::CfdAction; use daemon::model::cfd::{Cfd, Order, Origin, UpdateCfdProposals}; -use daemon::model::{Price, Timestamp, Usd}; +use daemon::model::{Price, TakerId, Timestamp, Usd}; use daemon::seed::Seed; use daemon::{ db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks, @@ -20,7 +20,6 @@ use std::task::Poll; use std::time::Duration; use tokio::net::TcpListener; use tokio::sync::watch; -use tokio::sync::watch::channel; use tracing::subscriber::DefaultGuard; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::util::SubscriberInitExt; @@ -59,6 +58,7 @@ pub struct Maker { pub identity_pk: x25519_dalek::PublicKey, cfd_feed_receiver: watch::Receiver>, order_feed_receiver: watch::Receiver>, + connected_takers_feed_receiver: watch::Receiver>, _tasks: Tasks, } @@ -71,6 +71,10 @@ impl Maker { &mut self.order_feed_receiver } + pub fn connected_takers_feed(&mut self) -> &mut watch::Receiver> { + &mut self.connected_takers_feed_receiver + } + pub async fn start( oracle_pk: schnorrsig::PublicKey, seed: Seed, @@ -100,10 +104,11 @@ impl Maker { oracle_pk, |_, _| oracle, |_, _| async { Ok(monitor) }, - |channel0, channel1| { + |channel0, channel1, channel2| { maker_inc_connections::Actor::new( channel0, channel1, + channel2, identity_sk, HEARTBEAT_INTERVAL_FOR_TEST, ) @@ -121,17 +126,20 @@ impl Maker { ask: Price::new(dec!(10000)).unwrap(), }; - let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]); - let (order_feed_sender, order_feed_receiver) = channel::>(None); + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(vec![]); + let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); let (update_cfd_feed_sender, _update_cfd_feed_receiver) = - channel::(HashMap::new()); - let (quote_sender, _) = channel::(dummy_quote); + watch::channel::(HashMap::new()); + let (quote_sender, _) = watch::channel::(dummy_quote); + let (connected_takers_feed_sender, connected_takers_feed_receiver) = + watch::channel::>(vec![]); tasks.add(projection_context.run(projection::Actor::new( cfd_feed_sender, order_feed_sender, quote_sender, update_cfd_feed_sender, + connected_takers_feed_sender, ))); let address = listener.local_addr().unwrap(); @@ -157,6 +165,7 @@ impl Maker { _tasks: tasks, cfd_feed_receiver, order_feed_receiver, + connected_takers_feed_receiver, } } @@ -192,6 +201,7 @@ impl Maker { /// Taker Test Setup pub struct Taker { + pub id: TakerId, pub system: daemon::TakerActorSystem, pub mocks: mocks::Mocks, cfd_feed_receiver: watch::Receiver>, @@ -219,7 +229,7 @@ impl Taker { ) -> Self { let seed = Seed::default(); - let (_, identity_sk) = seed.derive_identity(); + let (identity_pk, identity_sk) = seed.derive_identity(); let db = in_memory_db().await; @@ -255,16 +265,20 @@ impl Taker { ask: Price::new(dec!(10000)).unwrap(), }; - let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]); - let (order_feed_sender, order_feed_receiver) = channel::>(None); - let (update_cfd_feed_sender, _) = channel::(HashMap::new()); - let (quote_sender, _) = channel::(dummy_quote); + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(vec![]); + let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); + let (update_cfd_feed_sender, _) = watch::channel::(HashMap::new()); + let (quote_sender, _) = watch::channel::(dummy_quote); + + let (connected_takers_feed_sender, _connected_takers_feed_receiver) = + watch::channel::>(vec![]); tasks.add(projection_context.run(projection::Actor::new( cfd_feed_sender, order_feed_sender, quote_sender, update_cfd_feed_sender, + connected_takers_feed_sender, ))); tasks.add(connect( @@ -275,6 +289,7 @@ impl Taker { )); Self { + id: TakerId::new(identity_pk), system: taker, mocks, _tasks: tasks, diff --git a/maker-frontend/src/MakerApp.tsx b/maker-frontend/src/MakerApp.tsx index 768562d..4cbb8a1 100644 --- a/maker-frontend/src/MakerApp.tsx +++ b/maker-frontend/src/MakerApp.tsx @@ -20,6 +20,7 @@ import React, { useEffect, useState } from "react"; import { useAsync } from "react-async"; import { useEventSource } from "react-sse-hooks"; import { CfdTable } from "./components/cfdtables/CfdTable"; +import ConnectedTakers, { TakerId } from "./components/ConnectedTakers"; import CurrencyInputField from "./components/CurrencyInputField"; import CurrentPrice from "./components/CurrentPrice"; import createErrorToast from "./components/ErrorToast"; @@ -41,6 +42,8 @@ export default function App() { const order = useLatestEvent(source, "order", intoOrder); const walletInfo = useLatestEvent(source, "wallet"); const priceInfo = useLatestEvent(source, "quote"); + const takersOrUndefined = useLatestEvent(source, "takers"); + let takers = takersOrUndefined || []; const toast = useToast(); @@ -151,7 +154,10 @@ export default function App() { - {order && } + + + {order && } + diff --git a/maker-frontend/src/components/ConnectedTakers.tsx b/maker-frontend/src/components/ConnectedTakers.tsx new file mode 100644 index 0000000..8c81fa1 --- /dev/null +++ b/maker-frontend/src/components/ConnectedTakers.tsx @@ -0,0 +1,25 @@ +import { Heading, ListItem, UnorderedList, VStack } from "@chakra-ui/react"; +import React from "react"; + +export interface TakerId { + id: string; +} + +interface Props { + takers: TakerId[]; +} + +const ConnectedTakers = ({ takers }: Props) => { + return ( + + {"Connected takers: " + takers.length} + + {takers.map((taker) => { + return ({taker}); + })} + + + ); +}; + +export default ConnectedTakers;