From bf36f290ddb3373a038e42baf0b5e3045b2948b5 Mon Sep 17 00:00:00 2001 From: bonomat Date: Mon, 22 Nov 2021 13:48:38 +1100 Subject: [PATCH] Introduce projection actor The projection actor is responsible in preparing data for the HTTP API and consequently for the UI. While this is commit provides only the foundation, in the long run we can: - Reduce the logic happening in the rocket layer. `ToSseEvent` can likely go away. - Reduce the complexity for other actors in what needs to be updated when. All they should care about is sending updates to the projection actor. (mostly done) - Improve test coverage. With a dedicated actor that does the projection, we should be able to write assertions in our integration tests that are closer to the UI. --- daemon/src/bitmex_price_feed.rs | 12 +++-- daemon/src/cfd_actors.rs | 35 +++++++------ daemon/src/db.rs | 4 +- daemon/src/lib.rs | 34 ++---------- daemon/src/maker.rs | 36 ++++++++++--- daemon/src/maker_cfd.rs | 91 ++++++++++++++++----------------- daemon/src/projection.rs | 47 +++++++++++++++++ daemon/src/taker.rs | 36 +++++++++---- daemon/src/taker_cfd.rs | 78 ++++++++++++---------------- daemon/tests/harness/mod.rs | 70 ++++++++++++++++++++++--- 10 files changed, 274 insertions(+), 169 deletions(-) create mode 100644 daemon/src/projection.rs diff --git a/daemon/src/bitmex_price_feed.rs b/daemon/src/bitmex_price_feed.rs index d0adce6..2141622 100644 --- a/daemon/src/bitmex_price_feed.rs +++ b/daemon/src/bitmex_price_feed.rs @@ -1,15 +1,18 @@ use crate::model::{Price, Timestamp}; +use crate::projection; use anyhow::Result; use futures::{StreamExt, TryStreamExt}; use rust_decimal::Decimal; use std::convert::TryFrom; use std::future::Future; -use tokio::sync::watch; use tokio_tungstenite::tungstenite; +use xtra::prelude::MessageChannel; /// Connects to the BitMex price feed, returning the polling task and a watch channel that will /// always hold the last quote. -pub async fn new() -> Result<(impl Future, watch::Receiver)> { +pub async fn new( + msg_channel: impl MessageChannel>, +) -> Result<(impl Future, Quote)> { let (connection, _) = tokio_tungstenite::connect_async( "wss://www.bitmex.com/realtime?subscribe=quoteBin1m:XBTUSD", ) @@ -24,11 +27,10 @@ pub async fn new() -> Result<(impl Future, watch::Receiver)> tracing::info!("Connected to BitMex realtime API"); let first_quote = quotes.select_next_some().await?; - let (sender, receiver) = watch::channel(first_quote); let task = async move { while let Ok(Some(quote)) = quotes.try_next().await { - if sender.send(quote).is_err() { + if msg_channel.send(projection::Update(quote)).await.is_err() { break; // If the receiver dies, we can exit the loop. } } @@ -36,7 +38,7 @@ pub async fn new() -> Result<(impl Future, watch::Receiver)> tracing::warn!("Failed to read quote from websocket"); }; - Ok((task, receiver)) + Ok((task, first_quote)) } #[derive(Clone, Debug)] diff --git a/daemon/src/cfd_actors.rs b/daemon/src/cfd_actors.rs index f778f6d..f4c1306 100644 --- a/daemon/src/cfd_actors.rs +++ b/daemon/src/cfd_actors.rs @@ -1,27 +1,30 @@ use crate::model::cfd::{Attestation, Cfd, CfdState, OrderId}; -use crate::{db, monitor, oracle, try_continue, wallet}; +use crate::{db, monitor, oracle, projection, try_continue, wallet}; use anyhow::{bail, Context, Result}; use sqlx::pool::PoolConnection; use sqlx::Sqlite; -use tokio::sync::watch; pub async fn insert_cfd_and_send_to_feed( cfd: &Cfd, conn: &mut PoolConnection, - update_sender: &watch::Sender>, + projection_address: &xtra::Address, ) -> Result<()> { db::insert_cfd(cfd, conn).await?; - update_sender.send(db::load_all_cfds(conn).await?)?; + projection_address + .send(projection::Update(db::load_all_cfds(conn).await?)) + .await?; Ok(()) } pub async fn append_cfd_state( cfd: &Cfd, conn: &mut PoolConnection, - update_sender: &watch::Sender>, + projection_address: &xtra::Address, ) -> Result<()> { db::append_cfd_state(cfd, conn).await?; - update_sender.send(db::load_all_cfds(conn).await?)?; + projection_address + .send(projection::Update(db::load_all_cfds(conn).await?)) + .await?; Ok(()) } @@ -29,7 +32,7 @@ pub async fn try_cet_publication( cfd: &mut Cfd, conn: &mut PoolConnection, wallet: &xtra::Address, - update_sender: &watch::Sender>, + projection_address: &xtra::Address, ) -> Result<()> where W: xtra::Handler, @@ -46,7 +49,7 @@ where bail!("If we can get the CET we should be able to transition") } - append_cfd_state(cfd, conn, update_sender).await?; + append_cfd_state(cfd, conn, projection_address).await?; } Err(not_ready_yet) => { tracing::debug!("{:#}", not_ready_yet); @@ -61,7 +64,7 @@ pub async fn handle_monitoring_event( event: monitor::Event, conn: &mut PoolConnection, wallet: &xtra::Address, - update_sender: &watch::Sender>, + projection_address: &xtra::Address, ) -> Result<()> where W: xtra::Handler, @@ -76,10 +79,10 @@ where return Ok(()); } - append_cfd_state(&cfd, conn, update_sender).await?; + append_cfd_state(&cfd, conn, projection_address).await?; if let CfdState::OpenCommitted { .. } = cfd.state { - try_cet_publication(&mut cfd, conn, wallet, update_sender).await?; + try_cet_publication(&mut cfd, conn, wallet, projection_address).await?; } else if let CfdState::PendingRefund { .. } = cfd.state { let signed_refund_tx = cfd.refund_tx()?; let txid = wallet @@ -98,7 +101,7 @@ pub async fn handle_commit( order_id: OrderId, conn: &mut PoolConnection, wallet: &xtra::Address, - update_sender: &watch::Sender>, + projection_address: &xtra::Address, ) -> Result<()> where W: xtra::Handler, @@ -118,7 +121,7 @@ where bail!("If we can get the commit tx we should be able to transition") } - append_cfd_state(&cfd, conn, update_sender).await?; + append_cfd_state(&cfd, conn, projection_address).await?; tracing::info!("Commit transaction published on chain: {}", txid); Ok(()) @@ -128,7 +131,7 @@ pub async fn handle_oracle_attestation( attestation: oracle::Attestation, conn: &mut PoolConnection, wallet: &xtra::Address, - update_sender: &watch::Sender>, + projection_address: &xtra::Address, ) -> Result<()> where W: xtra::Handler, @@ -161,8 +164,8 @@ where continue; } - try_continue!(append_cfd_state(cfd, conn, update_sender).await); - try_continue!(try_cet_publication(cfd, conn, wallet, update_sender) + try_continue!(append_cfd_state(cfd, conn, projection_address).await); + try_continue!(try_cet_publication(cfd, conn, wallet, projection_address) .await .context("Error when trying to publish CET")); } diff --git a/daemon/src/db.rs b/daemon/src/db.rs index 1475679..429793b 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -529,16 +529,14 @@ pub async fn load_cfds_by_oracle_event_id( #[cfg(test)] mod tests { - use crate::cfd_actors; use pretty_assertions::assert_eq; use rand::Rng; use rust_decimal_macros::dec; use sqlx::SqlitePool; use time::macros::datetime; use time::OffsetDateTime; - use tokio::sync::watch; - use crate::db::{self, insert_order}; + use crate::db::insert_order; use crate::model::cfd::{Cfd, CfdState, Order, Origin}; use crate::model::{Price, Usd}; diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 164251d..5a5cc38 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -10,7 +10,6 @@ use connection::ConnectionStatus; use futures::future::RemoteHandle; use maia::secp256k1_zkp::schnorrsig; use sqlx::SqlitePool; -use std::collections::HashMap; use std::future::Future; use std::time::Duration; use tokio::sync::watch; @@ -36,6 +35,7 @@ mod noise; pub mod olivia; pub mod oracle; pub mod payout_curve; +pub mod projection; pub mod routes; pub mod seed; pub mod send_to_socket; @@ -75,9 +75,6 @@ impl Default for Tasks { pub struct MakerActorSystem { pub cfd_actor_addr: Address>, - pub cfd_feed_receiver: watch::Receiver>, - pub order_feed_receiver: watch::Receiver>, - pub update_cfd_feed_receiver: watch::Receiver, pub inc_conn_addr: Address, pub tasks: Tasks, } @@ -111,6 +108,7 @@ where ) -> T, settlement_time_interval_hours: time::Duration, n_payouts: usize, + projection_actor: Address, ) -> Result where F: Future>, @@ -119,11 +117,6 @@ where let cfds = load_all_cfds(&mut conn).await?; - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); - let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); - let (update_cfd_feed_sender, update_cfd_feed_receiver) = - watch::channel::(HashMap::new()); - let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None); let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None); let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None); @@ -135,9 +128,7 @@ where wallet_addr, settlement_time_interval_hours, oracle_pk, - cfd_feed_sender, - order_feed_sender, - update_cfd_feed_sender, + projection_actor, inc_conn_addr.clone(), monitor_addr.clone(), oracle_addr.clone(), @@ -182,9 +173,6 @@ where Ok(Self { cfd_actor_addr, - cfd_feed_receiver, - order_feed_receiver, - update_cfd_feed_receiver, inc_conn_addr, tasks, }) @@ -194,9 +182,6 @@ where pub struct TakerActorSystem { pub cfd_actor_addr: Address>, pub connection_actor_addr: Address, - pub cfd_feed_receiver: watch::Receiver>, - pub order_feed_receiver: watch::Receiver>, - pub update_cfd_feed_receiver: watch::Receiver, pub maker_online_status_feed_receiver: watch::Receiver, pub tasks: Tasks, } @@ -225,6 +210,7 @@ where monitor_constructor: impl FnOnce(Box>, Vec) -> F, n_payouts: usize, maker_heartbeat_interval: Duration, + projection_actor: Address, ) -> Result where F: Future>, @@ -233,11 +219,6 @@ where let cfds = load_all_cfds(&mut conn).await?; - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); - let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); - let (update_cfd_feed_sender, update_cfd_feed_receiver) = - watch::channel::(HashMap::new()); - let (maker_online_status_feed_sender, maker_online_status_feed_receiver) = watch::channel(ConnectionStatus::Offline); @@ -251,9 +232,7 @@ where db, wallet_addr, oracle_pk, - cfd_feed_sender, - order_feed_sender, - update_cfd_feed_sender, + projection_actor, Box::new(connection_actor_addr.clone()), monitor_addr.clone(), oracle_addr, @@ -301,9 +280,6 @@ where Ok(Self { cfd_actor_addr, connection_actor_addr, - cfd_feed_receiver, - order_feed_receiver, - update_cfd_feed_receiver, maker_online_status_feed_receiver, tasks, }) diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 6a32231..477edca 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -4,20 +4,25 @@ use bdk::bitcoin::Amount; use bdk::{bitcoin, FeeRate}; use clap::{Parser, Subcommand}; use daemon::auth::{self, MAKER_USERNAME}; +use daemon::bitmex_price_feed::Quote; +use daemon::db::load_all_cfds; +use daemon::model::cfd::{Order, UpdateCfdProposals}; use daemon::model::WalletInfo; use daemon::seed::Seed; use daemon::tokio_ext::FutureExt; use daemon::{ bitmex_price_feed, db, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle, - wallet, wallet_sync, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, + projection, wallet, wallet_sync, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, }; use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; +use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; use std::task::Poll; use tokio::sync::watch; +use tokio::sync::watch::channel; use tracing_subscriber::filter::LevelFilter; use xtra::prelude::*; use xtra::Actor; @@ -225,9 +230,6 @@ async fn main() -> Result<()> { let mut tasks = Tasks::default(); - let (task, quote_updates) = bitmex_price_feed::new().await?; - tasks.add(task); - let db = SqlitePool::connect_with( SqliteConnectOptions::new() .create_if_missing(true) @@ -247,11 +249,11 @@ async fn main() -> Result<()> { let settlement_time_interval_hours = time::Duration::hours(opts.settlement_time_interval_hours as i64); + + let (projection_actor, projection_context) = xtra::Context::new(None); + let MakerActorSystem { cfd_actor_addr, - cfd_feed_receiver, - order_feed_receiver, - update_cfd_feed_receiver, inc_conn_addr: incoming_connection_addr, tasks: _tasks, } = MakerActorSystem::new( @@ -270,9 +272,27 @@ async fn main() -> Result<()> { }, time::Duration::hours(opts.settlement_time_interval_hours as i64), N_PAYOUTS, + projection_actor.clone(), ) .await?; + let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?; + tasks.add(task); + + let cfds = load_all_cfds(&mut conn).await?; + let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone()); + let (order_feed_sender, order_feed_receiver) = channel::>(None); + let (update_cfd_feed_sender, update_cfd_feed_receiver) = + channel::(HashMap::new()); + let (quote_sender, quote_receiver) = channel::(init_quote); + + tasks.add(projection_context.run(projection::Actor::new( + cfd_feed_sender, + order_feed_sender, + quote_sender, + update_cfd_feed_sender, + ))); + let listener_stream = futures::stream::poll_fn(move |ctx| { let message = match futures::ready!(listener.poll_accept(ctx)) { Ok((stream, address)) => { @@ -299,7 +319,7 @@ async fn main() -> Result<()> { .manage(cfd_feed_receiver) .manage(wallet_feed_receiver) .manage(auth_password) - .manage(quote_updates) + .manage(quote_receiver) .manage(bitcoin_network) .mount( "/api", diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index a47356a..90666ad 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -2,12 +2,15 @@ use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::model::cfd::{ Cfd, CfdState, CfdStateCommon, CollaborativeSettlement, Dlc, Order, OrderId, Origin, Role, - RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, UpdateCfdProposals, + RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, }; use crate::model::{Price, TakerId, Timestamp, Usd}; use crate::monitor::MonitorParams; use crate::tokio_ext::FutureExt; -use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wallet, wire}; +use crate::{ + log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, wallet, wire, + UpdateCfdProposals, +}; use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -19,7 +22,6 @@ use sqlx::pool::PoolConnection; use sqlx::Sqlite; use std::collections::HashMap; use time::Duration; -use tokio::sync::watch; use xtra::prelude::*; pub enum CfdAction { @@ -62,9 +64,7 @@ pub struct Actor { wallet: Address, settlement_time_interval_hours: Duration, oracle_pk: schnorrsig::PublicKey, - cfd_feed_actor_inbox: watch::Sender>, - order_feed_sender: watch::Sender>, - update_cfd_feed_sender: watch::Sender, + projection_actor: Address, takers: Address, current_order_id: Option, monitor_actor: Address, @@ -102,9 +102,7 @@ impl Actor { wallet: Address, settlement_time_interval_hours: Duration, oracle_pk: schnorrsig::PublicKey, - cfd_feed_actor_inbox: watch::Sender>, - order_feed_sender: watch::Sender>, - update_cfd_feed_sender: watch::Sender, + projection_actor: Address, takers: Address, monitor_actor: Address, oracle_actor: Address, @@ -115,9 +113,7 @@ impl Actor { wallet, settlement_time_interval_hours, oracle_pk, - cfd_feed_actor_inbox, - order_feed_sender, - update_cfd_feed_sender, + projection_actor, takers, current_order_id: None, monitor_actor, @@ -165,7 +161,7 @@ impl Actor { taker_id, ), ); - self.send_pending_proposals()?; + self.send_pending_proposals().await?; Ok(()) } @@ -189,7 +185,7 @@ impl Actor { taker_id, ), ); - self.send_pending_proposals()?; + self.send_pending_proposals().await?; Ok(()) } @@ -235,21 +231,25 @@ impl Actor { /// Send pending proposals for the purposes of UI updates. /// Filters out the TakerIds, as they are an implementation detail inside of /// the actor - fn send_pending_proposals(&self) -> Result<()> { - Ok(self.update_cfd_feed_sender.send( - self.current_pending_proposals - .iter() - .map(|(order_id, (update_cfd, _))| (*order_id, (update_cfd.clone()))) - .collect(), - )?) + async fn send_pending_proposals(&self) -> Result<()> { + let pending_proposal = self + .current_pending_proposals + .iter() + .map(|(order_id, (update_cfd, _))| (*order_id, (update_cfd.clone()))) + .collect::(); + let _ = self + .projection_actor + .send(projection::Update(pending_proposal)) + .await?; + Ok(()) } /// Removes a proposal and updates the update cfd proposals' feed - fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { + async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { if self.current_pending_proposals.remove(order_id).is_none() { anyhow::bail!("Could not find proposal with order id: {}", &order_id) } - self.send_pending_proposals()?; + self.send_pending_proposals().await?; Ok(()) } @@ -283,26 +283,16 @@ where { async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { let mut conn = self.db.acquire().await?; - cfd_actors::handle_commit( - order_id, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; + cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor) + .await?; Ok(()) } async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { let mut conn = self.db.acquire().await?; - cfd_actors::handle_monitoring_event( - event, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; + cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor) + .await?; Ok(()) } @@ -312,7 +302,7 @@ where attestation, &mut conn, &self.wallet, - &self.cfd_feed_actor_inbox, + &self.projection_actor, ) .await?; Ok(()) @@ -361,11 +351,13 @@ where self.current_agreed_proposals .insert(order_id, self.get_settlement_proposal(order_id)?); self.remove_pending_proposal(&order_id) + .await .context("accepted settlement")?; } Err(e) => { tracing::warn!("Failed to notify taker of accepted settlement: {}", e); self.remove_pending_proposal(&order_id) + .await .context("accepted settlement")?; } } @@ -381,6 +373,7 @@ where // clean-up state ahead of sending to ensure consistency in case we fail to deliver the // message self.remove_pending_proposal(&order_id) + .await .context("rejected settlement")?; self.takers @@ -413,6 +406,7 @@ where // clean-up state ahead of sending to ensure consistency in case we fail to deliver the // message self.remove_pending_proposal(&order_id) + .await .context("rejected roll_over")?; self.takers @@ -472,7 +466,7 @@ where self.takers .send(maker_inc_connections::BroadcastOrder(None)) .await?; - self.order_feed_sender.send(None)?; + self.projection_actor.send(projection::Update(None)).await?; // 3. Insert CFD in DB let cfd = Cfd::new( @@ -485,7 +479,7 @@ where taker_id, }, ); - insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?; // 4. check if order has acceptable amounts and if not reject the cfd // Since rejection is tied to the cfd state at the moment we can only do this after creating @@ -533,7 +527,7 @@ where ) -> Result<()> { cfd.state = CfdState::rejected(); - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; self.takers .send(maker_inc_connections::TakerMessage { @@ -598,7 +592,7 @@ where // 4. Insert that we are in contract setup and refresh our own feed cfd.state = CfdState::contract_setup(); - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; // 5. Spawn away the contract setup let (sender, receiver) = mpsc::unbounded(); @@ -665,7 +659,7 @@ where info: e.to_string(), }; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; return Err(e); } @@ -677,7 +671,7 @@ where attestation: None, }; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; let txid = self .wallet @@ -802,6 +796,7 @@ where }; self.remove_pending_proposal(&order_id) + .await .context("accepted roll_over")?; Ok(()) } @@ -827,7 +822,7 @@ where attestation: None, collaborative_close: None, }; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; self.monitor_actor .send(monitor::StartMonitoring { @@ -887,7 +882,7 @@ where own_script_pubkey.clone(), proposal.price, )?)?; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?; @@ -976,7 +971,9 @@ where self.current_order_id.replace(order.id); // 3. Notify UI via feed - self.order_feed_sender.send(Some(order.clone()))?; + self.projection_actor + .send(projection::Update(Some(order.clone()))) + .await?; // 4. Inform connected takers self.takers diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs new file mode 100644 index 0000000..1af3eac --- /dev/null +++ b/daemon/src/projection.rs @@ -0,0 +1,47 @@ +use crate::bitmex_price_feed::Quote; +use crate::{Cfd, Order, UpdateCfdProposals}; +use tokio::sync::watch; +use xtra_productivity::xtra_productivity; + +pub struct Actor { + tx_cfds: watch::Sender>, + tx_order: watch::Sender>, + tx_quote: watch::Sender, + tx_settlements: watch::Sender, +} + +impl Actor { + pub fn new( + tx_cfds: watch::Sender>, + tx_order: watch::Sender>, + tx_quote: watch::Sender, + tx_settlements: watch::Sender, + ) -> Self { + Self { + tx_cfds, + tx_order, + tx_quote, + tx_settlements, + } + } +} + +pub struct Update(pub T); + +#[xtra_productivity] +impl Actor { + fn handle(&mut self, msg: Update>) { + let _ = self.tx_cfds.send(msg.0); + } + fn handle(&mut self, msg: Update>) { + let _ = self.tx_order.send(msg.0); + } + fn handle(&mut self, msg: Update) { + let _ = self.tx_quote.send(msg.0); + } + fn handle(&mut self, msg: Update) { + let _ = self.tx_settlements.send(msg.0); + } +} + +impl xtra::Actor for Actor {} diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index dabfd9a..6fa56ec 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -3,16 +3,20 @@ use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::{Address, Amount}; use bdk::{bitcoin, FeeRate}; use clap::{Parser, Subcommand}; +use daemon::bitmex_price_feed::Quote; use daemon::connection::ConnectionStatus; +use daemon::db::load_all_cfds; +use daemon::model::cfd::{Order, UpdateCfdProposals}; use daemon::model::WalletInfo; use daemon::seed::Seed; use daemon::tokio_ext::FutureExt; use daemon::{ - bitmex_price_feed, connection, db, housekeeping, logger, monitor, oracle, taker_cfd, wallet, - wallet_sync, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, + bitmex_price_feed, connection, db, housekeeping, logger, monitor, oracle, projection, + taker_cfd, wallet, wallet_sync, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, }; use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; +use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; @@ -20,6 +24,7 @@ use std::time::Duration; use tokio::sync::watch; use tokio::time::sleep; use tracing_subscriber::filter::LevelFilter; +use watch::channel; use xtra::prelude::MessageChannel; use xtra::Actor; @@ -209,9 +214,6 @@ async fn main() -> Result<()> { let mut tasks = Tasks::default(); - let (task, quote_updates) = bitmex_price_feed::new().await?; - tasks.add(task); - let figment = rocket::Config::figment() .merge(("address", opts.http_address.ip())) .merge(("port", opts.http_address.port())); @@ -233,12 +235,11 @@ async fn main() -> Result<()> { housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?; housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?; + let (projection_actor, projection_context) = xtra::Context::new(None); + let TakerActorSystem { cfd_actor_addr, connection_actor_addr, - cfd_feed_receiver, - order_feed_receiver, - update_cfd_feed_receiver, mut maker_online_status_feed_receiver, tasks: _tasks, } = TakerActorSystem::new( @@ -255,8 +256,25 @@ async fn main() -> Result<()> { }, N_PAYOUTS, HEARTBEAT_INTERVAL * 2, + projection_actor.clone(), ) .await?; + let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?; + tasks.add(task); + + let cfds = load_all_cfds(&mut conn).await?; + let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone()); + let (order_feed_sender, order_feed_receiver) = channel::>(None); + let (update_cfd_feed_sender, update_cfd_feed_receiver) = + channel::(HashMap::new()); + let (quote_sender, quote_receiver) = channel::(init_quote); + + tasks.add(projection_context.run(projection::Actor::new( + cfd_feed_sender, + order_feed_sender, + quote_sender, + update_cfd_feed_sender, + ))); connect(connection_actor_addr, opts.maker_id, opts.maker).await?; @@ -271,7 +289,7 @@ async fn main() -> Result<()> { .manage(cfd_action_channel) .manage(cfd_feed_receiver) .manage(wallet_feed_receiver) - .manage(quote_updates) + .manage(quote_receiver) .manage(bitcoin_network) .mount( "/api", diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 28c0738..7f039df 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -8,7 +8,7 @@ use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd}; use crate::monitor::{self, MonitorParams}; use crate::tokio_ext::FutureExt; use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; -use crate::{log_error, oracle, setup_contract, wallet, wire}; +use crate::{log_error, oracle, projection, setup_contract, wallet, wire}; use anyhow::{bail, Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -16,7 +16,6 @@ use futures::channel::mpsc; use futures::future::RemoteHandle; use futures::{future, SinkExt}; use std::collections::HashMap; -use tokio::sync::watch; use xtra::prelude::*; pub struct TakeOffer { @@ -67,9 +66,7 @@ pub struct Actor { db: sqlx::SqlitePool, wallet: Address, oracle_pk: schnorrsig::PublicKey, - cfd_feed_actor_inbox: watch::Sender>, - order_feed_actor_inbox: watch::Sender>, - update_cfd_feed_sender: watch::Sender, + projection_actor: Address, send_to_maker: Box>, monitor_actor: Address, setup_state: SetupState, @@ -90,9 +87,7 @@ where db: sqlx::SqlitePool, wallet: Address, oracle_pk: schnorrsig::PublicKey, - cfd_feed_actor_inbox: watch::Sender>, - order_feed_actor_inbox: watch::Sender>, - update_cfd_feed_sender: watch::Sender, + projection_actor: Address, send_to_maker: Box>, monitor_actor: Address, oracle_actor: Address, @@ -102,9 +97,7 @@ where db, wallet, oracle_pk, - cfd_feed_actor_inbox, - order_feed_actor_inbox, - update_cfd_feed_sender, + projection_actor, send_to_maker, monitor_actor, setup_state: SetupState::None, @@ -117,18 +110,19 @@ where } impl Actor { - fn send_pending_update_proposals(&self) -> Result<()> { + async fn send_pending_update_proposals(&self) -> Result<()> { Ok(self - .update_cfd_feed_sender - .send(self.current_pending_proposals.clone())?) + .projection_actor + .send(projection::Update(self.current_pending_proposals.clone())) + .await?) } /// Removes a proposal and updates the update cfd proposals' feed - fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { + async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { if self.current_pending_proposals.remove(order_id).is_none() { anyhow::bail!("Could not find proposal with order id: {}", &order_id) } - self.send_pending_update_proposals()?; + self.send_pending_update_proposals().await?; Ok(()) } @@ -158,12 +152,12 @@ impl Actor { CfdState::outgoing_order_request(), ); - insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?; // Cleanup own order feed, after inserting the cfd. // Due to the 1:1 relationship between order and cfd we can never create another cfd for the // same order id. - self.order_feed_actor_inbox.send(None)?; + self.projection_actor.send(projection::Update(None)).await?; self.send_to_maker .send(wire::TakerToMaker::TakeOrder { order_id, quantity }) @@ -181,13 +175,8 @@ where { async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { let mut conn = self.db.acquire().await?; - cfd_actors::handle_commit( - order_id, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; + cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor) + .await?; Ok(()) } @@ -218,7 +207,7 @@ where direction: SettlementKind::Outgoing, }, ); - self.send_pending_update_proposals()?; + self.send_pending_update_proposals().await?; self.send_to_maker .send(wire::TakerToMaker::ProposeSettlement { @@ -241,7 +230,7 @@ where async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> { tracing::info!(%order_id, "Settlement proposal got rejected"); - self.remove_pending_proposal(&order_id)?; + self.remove_pending_proposal(&order_id).await?; Ok(()) } @@ -250,6 +239,7 @@ where tracing::info!(%order_id, "Roll over proposal got rejected"); self.remove_pending_proposal(&order_id) + .await .context("rejected settlement")?; Ok(()) @@ -295,7 +285,7 @@ where let mut conn = self.db.acquire().await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; cfd.state = CfdState::rejected(); - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; Ok(()) } @@ -319,10 +309,12 @@ impl Actor { insert_order(&order, &mut conn).await?; } - self.order_feed_actor_inbox.send(Some(order))?; + self.projection_actor + .send(projection::Update(Some(order))) + .await?; } None => { - self.order_feed_actor_inbox.send(None)?; + self.projection_actor.send(projection::Update(None)).await?; } } Ok(()) @@ -339,7 +331,7 @@ where attestation, &mut conn, &self.wallet, - &self.cfd_feed_actor_inbox, + &self.projection_actor, ) .await?; Ok(()) @@ -347,13 +339,8 @@ where async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { let mut conn = self.db.acquire().await?; - cfd_actors::handle_monitoring_event( - event, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; + cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor) + .await?; Ok(()) } } @@ -379,7 +366,7 @@ where direction: SettlementKind::Outgoing, }, ); - self.send_pending_update_proposals()?; + self.send_pending_update_proposals().await?; self.send_to_maker .send(wire::TakerToMaker::ProposeRollOver { @@ -421,7 +408,7 @@ where info: e.to_string(), }; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; return Err(e); } @@ -435,7 +422,7 @@ where attestation: None, }; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; let txid = self .wallet @@ -490,7 +477,7 @@ where let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; cfd.state = CfdState::contract_setup(); - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; let offer_announcement = self .oracle_actor @@ -603,6 +590,7 @@ where }; self.remove_pending_proposal(&order_id) + .await .context("Could not remove accepted roll over")?; Ok(()) } @@ -629,7 +617,7 @@ where collaborative_close: None, }; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; self.monitor_actor .send(monitor::StartMonitoring { @@ -680,9 +668,9 @@ where dlc.script_pubkey_for(cfd.role()), proposal.price, )?)?; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - self.remove_pending_proposal(&order_id)?; + self.remove_pending_proposal(&order_id).await?; self.monitor_actor .send(monitor::CollaborativeSettlement { diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 42fe437..9f30445 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -2,19 +2,24 @@ use crate::harness::mocks::monitor::MonitorActor; use crate::harness::mocks::oracle::OracleActor; use crate::harness::mocks::wallet::WalletActor; use crate::schnorrsig; +use daemon::bitmex_price_feed::Quote; use daemon::connection::{Connect, ConnectionStatus}; use daemon::maker_cfd::CfdAction; -use daemon::model::cfd::{Cfd, Order, Origin}; -use daemon::model::{Price, Usd}; +use daemon::model::cfd::{Cfd, Order, Origin, UpdateCfdProposals}; +use daemon::model::{Price, Timestamp, Usd}; use daemon::seed::Seed; -use daemon::{db, maker_cfd, maker_inc_connections, taker_cfd, MakerActorSystem, Tasks}; +use daemon::{ + db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks, +}; use rust_decimal_macros::dec; use sqlx::SqlitePool; +use std::collections::HashMap; use std::net::SocketAddr; use std::str::FromStr; use std::task::Poll; use std::time::Duration; use tokio::sync::watch; +use tokio::sync::watch::channel; use tracing::subscriber::DefaultGuard; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::util::SubscriberInitExt; @@ -47,16 +52,18 @@ pub struct Maker { pub mocks: mocks::Mocks, pub listen_addr: SocketAddr, pub identity_pk: x25519_dalek::PublicKey, + cfd_feed_receiver: watch::Receiver>, + order_feed_receiver: watch::Receiver>, _tasks: Tasks, } impl Maker { pub fn cfd_feed(&mut self) -> &mut watch::Receiver> { - &mut self.system.cfd_feed_receiver + &mut self.cfd_feed_receiver } pub fn order_feed(&mut self) -> &mut watch::Receiver> { - &mut self.system.order_feed_receiver + &mut self.order_feed_receiver } pub async fn start(oracle_pk: schnorrsig::PublicKey) -> Self { @@ -75,6 +82,8 @@ impl Maker { let seed = Seed::default(); let (identity_pk, identity_sk) = seed.derive_identity(); + let (projection_actor, projection_context) = xtra::Context::new(None); + // system startup sends sync messages, mock them mocks.mock_sync_handlers().await; let maker = daemon::MakerActorSystem::new( @@ -93,10 +102,30 @@ impl Maker { }, settlement_time_interval_hours, N_PAYOUTS_FOR_TEST, + projection_actor.clone(), ) .await .unwrap(); + let dummy_quote = Quote { + timestamp: Timestamp::now().unwrap(), + bid: Price::new(dec!(10000)).unwrap(), + ask: Price::new(dec!(10000)).unwrap(), + }; + + let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]); + let (order_feed_sender, order_feed_receiver) = channel::>(None); + let (update_cfd_feed_sender, _update_cfd_feed_receiver) = + channel::(HashMap::new()); + let (quote_sender, _) = channel::(dummy_quote); + + tasks.add(projection_context.run(projection::Actor::new( + cfd_feed_sender, + order_feed_sender, + quote_sender, + update_cfd_feed_sender, + ))); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let address = listener.local_addr().unwrap(); @@ -120,6 +149,8 @@ impl Maker { listen_addr: address, mocks, _tasks: tasks, + cfd_feed_receiver, + order_feed_receiver, } } @@ -157,16 +188,18 @@ impl Maker { pub struct Taker { pub system: daemon::TakerActorSystem, pub mocks: mocks::Mocks, + cfd_feed_receiver: watch::Receiver>, + order_feed_receiver: watch::Receiver>, _tasks: Tasks, } impl Taker { pub fn cfd_feed(&mut self) -> &mut watch::Receiver> { - &mut self.system.cfd_feed_receiver + &mut self.cfd_feed_receiver } pub fn order_feed(&mut self) -> &mut watch::Receiver> { - &mut self.system.order_feed_receiver + &mut self.order_feed_receiver } pub fn maker_status_feed(&mut self) -> &mut watch::Receiver { @@ -192,6 +225,8 @@ impl Taker { let (wallet_addr, wallet_fut) = wallet.create(None).run(); tasks.add(wallet_fut); + let (projection_actor, projection_context) = xtra::Context::new(None); + // system startup sends sync messages, mock them mocks.mock_sync_handlers().await; let taker = daemon::TakerActorSystem::new( @@ -203,10 +238,29 @@ impl Taker { |_, _| async { Ok(monitor) }, N_PAYOUTS_FOR_TEST, HEARTBEAT_INTERVAL_FOR_TEST * 2, + projection_actor, ) .await .unwrap(); + let dummy_quote = Quote { + timestamp: Timestamp::now().unwrap(), + bid: Price::new(dec!(10000)).unwrap(), + ask: Price::new(dec!(10000)).unwrap(), + }; + + let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]); + let (order_feed_sender, order_feed_receiver) = channel::>(None); + let (update_cfd_feed_sender, _) = channel::(HashMap::new()); + let (quote_sender, _) = channel::(dummy_quote); + + tasks.add(projection_context.run(projection::Actor::new( + cfd_feed_sender, + order_feed_sender, + quote_sender, + update_cfd_feed_sender, + ))); + taker .connection_actor_addr .send(Connect { @@ -221,6 +275,8 @@ impl Taker { system: taker, mocks, _tasks: tasks, + order_feed_receiver, + cfd_feed_receiver, } }