diff --git a/daemon/src/bitmex_price_feed.rs b/daemon/src/bitmex_price_feed.rs index d0adce6..2141622 100644 --- a/daemon/src/bitmex_price_feed.rs +++ b/daemon/src/bitmex_price_feed.rs @@ -1,15 +1,18 @@ use crate::model::{Price, Timestamp}; +use crate::projection; use anyhow::Result; use futures::{StreamExt, TryStreamExt}; use rust_decimal::Decimal; use std::convert::TryFrom; use std::future::Future; -use tokio::sync::watch; use tokio_tungstenite::tungstenite; +use xtra::prelude::MessageChannel; /// Connects to the BitMex price feed, returning the polling task and a watch channel that will /// always hold the last quote. -pub async fn new() -> Result<(impl Future, watch::Receiver)> { +pub async fn new( + msg_channel: impl MessageChannel>, +) -> Result<(impl Future, Quote)> { let (connection, _) = tokio_tungstenite::connect_async( "wss://www.bitmex.com/realtime?subscribe=quoteBin1m:XBTUSD", ) @@ -24,11 +27,10 @@ pub async fn new() -> Result<(impl Future, watch::Receiver)> tracing::info!("Connected to BitMex realtime API"); let first_quote = quotes.select_next_some().await?; - let (sender, receiver) = watch::channel(first_quote); let task = async move { while let Ok(Some(quote)) = quotes.try_next().await { - if sender.send(quote).is_err() { + if msg_channel.send(projection::Update(quote)).await.is_err() { break; // If the receiver dies, we can exit the loop. } } @@ -36,7 +38,7 @@ pub async fn new() -> Result<(impl Future, watch::Receiver)> tracing::warn!("Failed to read quote from websocket"); }; - Ok((task, receiver)) + Ok((task, first_quote)) } #[derive(Clone, Debug)] diff --git a/daemon/src/cfd_actors.rs b/daemon/src/cfd_actors.rs index 6a4692e..f4c1306 100644 --- a/daemon/src/cfd_actors.rs +++ b/daemon/src/cfd_actors.rs @@ -1,35 +1,30 @@ -use crate::db::load_cfd_by_order_id; use crate::model::cfd::{Attestation, Cfd, CfdState, OrderId}; -use crate::{db, monitor, oracle, try_continue, wallet}; +use crate::{db, monitor, oracle, projection, try_continue, wallet}; use anyhow::{bail, Context, Result}; use sqlx::pool::PoolConnection; use sqlx::Sqlite; -use tokio::sync::watch; pub async fn insert_cfd_and_send_to_feed( cfd: &Cfd, conn: &mut PoolConnection, - update_sender: &watch::Sender>, + projection_address: &xtra::Address, ) -> Result<()> { - if load_cfd_by_order_id(cfd.order.id, conn).await.is_ok() { - bail!( - "Cannot insert cfd because there is already a cfd for order id {}", - cfd.order.id - ) - } - db::insert_cfd(cfd, conn).await?; - update_sender.send(db::load_all_cfds(conn).await?)?; + projection_address + .send(projection::Update(db::load_all_cfds(conn).await?)) + .await?; Ok(()) } pub async fn append_cfd_state( cfd: &Cfd, conn: &mut PoolConnection, - update_sender: &watch::Sender>, + projection_address: &xtra::Address, ) -> Result<()> { db::append_cfd_state(cfd, conn).await?; - update_sender.send(db::load_all_cfds(conn).await?)?; + projection_address + .send(projection::Update(db::load_all_cfds(conn).await?)) + .await?; Ok(()) } @@ -37,7 +32,7 @@ pub async fn try_cet_publication( cfd: &mut Cfd, conn: &mut PoolConnection, wallet: &xtra::Address, - update_sender: &watch::Sender>, + projection_address: &xtra::Address, ) -> Result<()> where W: xtra::Handler, @@ -54,7 +49,7 @@ where bail!("If we can get the CET we should be able to transition") } - append_cfd_state(cfd, conn, update_sender).await?; + append_cfd_state(cfd, conn, projection_address).await?; } Err(not_ready_yet) => { tracing::debug!("{:#}", not_ready_yet); @@ -69,7 +64,7 @@ pub async fn handle_monitoring_event( event: monitor::Event, conn: &mut PoolConnection, wallet: &xtra::Address, - update_sender: &watch::Sender>, + projection_address: &xtra::Address, ) -> Result<()> where W: xtra::Handler, @@ -84,10 +79,10 @@ where return Ok(()); } - append_cfd_state(&cfd, conn, update_sender).await?; + append_cfd_state(&cfd, conn, projection_address).await?; if let CfdState::OpenCommitted { .. } = cfd.state { - try_cet_publication(&mut cfd, conn, wallet, update_sender).await?; + try_cet_publication(&mut cfd, conn, wallet, projection_address).await?; } else if let CfdState::PendingRefund { .. } = cfd.state { let signed_refund_tx = cfd.refund_tx()?; let txid = wallet @@ -106,7 +101,7 @@ pub async fn handle_commit( order_id: OrderId, conn: &mut PoolConnection, wallet: &xtra::Address, - update_sender: &watch::Sender>, + projection_address: &xtra::Address, ) -> Result<()> where W: xtra::Handler, @@ -126,7 +121,7 @@ where bail!("If we can get the commit tx we should be able to transition") } - append_cfd_state(&cfd, conn, update_sender).await?; + append_cfd_state(&cfd, conn, projection_address).await?; tracing::info!("Commit transaction published on chain: {}", txid); Ok(()) @@ -136,7 +131,7 @@ pub async fn handle_oracle_attestation( attestation: oracle::Attestation, conn: &mut PoolConnection, wallet: &xtra::Address, - update_sender: &watch::Sender>, + projection_address: &xtra::Address, ) -> Result<()> where W: xtra::Handler, @@ -169,8 +164,8 @@ where continue; } - try_continue!(append_cfd_state(cfd, conn, update_sender).await); - try_continue!(try_cet_publication(cfd, conn, wallet, update_sender) + try_continue!(append_cfd_state(cfd, conn, projection_address).await); + try_continue!(try_cet_publication(cfd, conn, wallet, projection_address) .await .context("Error when trying to publish CET")); } diff --git a/daemon/src/db.rs b/daemon/src/db.rs index b31ded7..429793b 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -1,6 +1,6 @@ use crate::model::cfd::{Cfd, CfdState, Order, OrderId}; use crate::model::{BitMexPriceEventId, Usd}; -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use rust_decimal::Decimal; use sqlx::pool::PoolConnection; use sqlx::{Sqlite, SqlitePool}; @@ -97,6 +97,13 @@ pub async fn load_order_by_id( } pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection) -> anyhow::Result<()> { + if load_cfd_by_order_id(cfd.order.id, conn).await.is_ok() { + bail!( + "Cannot insert cfd because there is already a cfd for order id {}", + cfd.order.id + ) + } + let state = serde_json::to_string(&cfd.state)?; let query_result = sqlx::query( r#" @@ -522,16 +529,14 @@ pub async fn load_cfds_by_oracle_event_id( #[cfg(test)] mod tests { - use crate::cfd_actors; use pretty_assertions::assert_eq; use rand::Rng; use rust_decimal_macros::dec; use sqlx::SqlitePool; use time::macros::datetime; use time::OffsetDateTime; - use tokio::sync::watch; - use crate::db::{self, insert_order}; + use crate::db::insert_order; use crate::model::cfd::{Cfd, CfdState, Order, Origin}; use crate::model::{Price, Usd}; @@ -557,32 +562,6 @@ mod tests { assert_eq!(vec![cfd], loaded); } - #[tokio::test] - async fn test_insert_like_cfd_actor() { - let mut conn = setup_test_db().await; - - let cfds = load_all_cfds(&mut conn).await.unwrap(); - - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); - - assert_eq!(cfd_feed_receiver.borrow().clone(), vec![]); - - let cfd_1 = Cfd::dummy(); - db::insert_order(&cfd_1.order, &mut conn).await.unwrap(); - cfd_actors::insert_cfd_and_send_to_feed(&cfd_1, &mut conn, &cfd_feed_sender) - .await - .unwrap(); - - assert_eq!(cfd_feed_receiver.borrow().clone(), vec![cfd_1.clone()]); - - let cfd_2 = Cfd::dummy(); - db::insert_order(&cfd_2.order, &mut conn).await.unwrap(); - cfd_actors::insert_cfd_and_send_to_feed(&cfd_2, &mut conn, &cfd_feed_sender) - .await - .unwrap(); - assert_eq!(cfd_feed_receiver.borrow().clone(), vec![cfd_1, cfd_2]); - } - #[tokio::test] async fn test_insert_and_load_cfd_by_order_id() { let mut conn = setup_test_db().await; @@ -716,6 +695,22 @@ mod tests { assert_eq!(data.len(), 100); } + #[tokio::test] + async fn inserting_two_cfds_with_same_order_id_should_fail() { + let mut conn = setup_test_db().await; + + let cfd = Cfd::dummy().insert(&mut conn).await; + + let error = insert_cfd(&cfd, &mut conn).await.err().unwrap(); + assert_eq!( + error.to_string(), + format!( + "Cannot insert cfd because there is already a cfd for order id {}", + cfd.order.id + ) + ); + } + fn random_simple_state() -> CfdState { match rand::thread_rng().gen_range(0, 5) { 0 => CfdState::outgoing_order_request(), diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 164251d..5a5cc38 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -10,7 +10,6 @@ use connection::ConnectionStatus; use futures::future::RemoteHandle; use maia::secp256k1_zkp::schnorrsig; use sqlx::SqlitePool; -use std::collections::HashMap; use std::future::Future; use std::time::Duration; use tokio::sync::watch; @@ -36,6 +35,7 @@ mod noise; pub mod olivia; pub mod oracle; pub mod payout_curve; +pub mod projection; pub mod routes; pub mod seed; pub mod send_to_socket; @@ -75,9 +75,6 @@ impl Default for Tasks { pub struct MakerActorSystem { pub cfd_actor_addr: Address>, - pub cfd_feed_receiver: watch::Receiver>, - pub order_feed_receiver: watch::Receiver>, - pub update_cfd_feed_receiver: watch::Receiver, pub inc_conn_addr: Address, pub tasks: Tasks, } @@ -111,6 +108,7 @@ where ) -> T, settlement_time_interval_hours: time::Duration, n_payouts: usize, + projection_actor: Address, ) -> Result where F: Future>, @@ -119,11 +117,6 @@ where let cfds = load_all_cfds(&mut conn).await?; - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); - let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); - let (update_cfd_feed_sender, update_cfd_feed_receiver) = - watch::channel::(HashMap::new()); - let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None); let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None); let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None); @@ -135,9 +128,7 @@ where wallet_addr, settlement_time_interval_hours, oracle_pk, - cfd_feed_sender, - order_feed_sender, - update_cfd_feed_sender, + projection_actor, inc_conn_addr.clone(), monitor_addr.clone(), oracle_addr.clone(), @@ -182,9 +173,6 @@ where Ok(Self { cfd_actor_addr, - cfd_feed_receiver, - order_feed_receiver, - update_cfd_feed_receiver, inc_conn_addr, tasks, }) @@ -194,9 +182,6 @@ where pub struct TakerActorSystem { pub cfd_actor_addr: Address>, pub connection_actor_addr: Address, - pub cfd_feed_receiver: watch::Receiver>, - pub order_feed_receiver: watch::Receiver>, - pub update_cfd_feed_receiver: watch::Receiver, pub maker_online_status_feed_receiver: watch::Receiver, pub tasks: Tasks, } @@ -225,6 +210,7 @@ where monitor_constructor: impl FnOnce(Box>, Vec) -> F, n_payouts: usize, maker_heartbeat_interval: Duration, + projection_actor: Address, ) -> Result where F: Future>, @@ -233,11 +219,6 @@ where let cfds = load_all_cfds(&mut conn).await?; - let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); - let (order_feed_sender, order_feed_receiver) = watch::channel::>(None); - let (update_cfd_feed_sender, update_cfd_feed_receiver) = - watch::channel::(HashMap::new()); - let (maker_online_status_feed_sender, maker_online_status_feed_receiver) = watch::channel(ConnectionStatus::Offline); @@ -251,9 +232,7 @@ where db, wallet_addr, oracle_pk, - cfd_feed_sender, - order_feed_sender, - update_cfd_feed_sender, + projection_actor, Box::new(connection_actor_addr.clone()), monitor_addr.clone(), oracle_addr, @@ -301,9 +280,6 @@ where Ok(Self { cfd_actor_addr, connection_actor_addr, - cfd_feed_receiver, - order_feed_receiver, - update_cfd_feed_receiver, maker_online_status_feed_receiver, tasks, }) diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 6a32231..477edca 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -4,20 +4,25 @@ use bdk::bitcoin::Amount; use bdk::{bitcoin, FeeRate}; use clap::{Parser, Subcommand}; use daemon::auth::{self, MAKER_USERNAME}; +use daemon::bitmex_price_feed::Quote; +use daemon::db::load_all_cfds; +use daemon::model::cfd::{Order, UpdateCfdProposals}; use daemon::model::WalletInfo; use daemon::seed::Seed; use daemon::tokio_ext::FutureExt; use daemon::{ bitmex_price_feed, db, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle, - wallet, wallet_sync, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, + projection, wallet, wallet_sync, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, }; use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; +use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; use std::task::Poll; use tokio::sync::watch; +use tokio::sync::watch::channel; use tracing_subscriber::filter::LevelFilter; use xtra::prelude::*; use xtra::Actor; @@ -225,9 +230,6 @@ async fn main() -> Result<()> { let mut tasks = Tasks::default(); - let (task, quote_updates) = bitmex_price_feed::new().await?; - tasks.add(task); - let db = SqlitePool::connect_with( SqliteConnectOptions::new() .create_if_missing(true) @@ -247,11 +249,11 @@ async fn main() -> Result<()> { let settlement_time_interval_hours = time::Duration::hours(opts.settlement_time_interval_hours as i64); + + let (projection_actor, projection_context) = xtra::Context::new(None); + let MakerActorSystem { cfd_actor_addr, - cfd_feed_receiver, - order_feed_receiver, - update_cfd_feed_receiver, inc_conn_addr: incoming_connection_addr, tasks: _tasks, } = MakerActorSystem::new( @@ -270,9 +272,27 @@ async fn main() -> Result<()> { }, time::Duration::hours(opts.settlement_time_interval_hours as i64), N_PAYOUTS, + projection_actor.clone(), ) .await?; + let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?; + tasks.add(task); + + let cfds = load_all_cfds(&mut conn).await?; + let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone()); + let (order_feed_sender, order_feed_receiver) = channel::>(None); + let (update_cfd_feed_sender, update_cfd_feed_receiver) = + channel::(HashMap::new()); + let (quote_sender, quote_receiver) = channel::(init_quote); + + tasks.add(projection_context.run(projection::Actor::new( + cfd_feed_sender, + order_feed_sender, + quote_sender, + update_cfd_feed_sender, + ))); + let listener_stream = futures::stream::poll_fn(move |ctx| { let message = match futures::ready!(listener.poll_accept(ctx)) { Ok((stream, address)) => { @@ -299,7 +319,7 @@ async fn main() -> Result<()> { .manage(cfd_feed_receiver) .manage(wallet_feed_receiver) .manage(auth_password) - .manage(quote_updates) + .manage(quote_receiver) .manage(bitcoin_network) .mount( "/api", diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index a47356a..90666ad 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -2,12 +2,15 @@ use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::model::cfd::{ Cfd, CfdState, CfdStateCommon, CollaborativeSettlement, Dlc, Order, OrderId, Origin, Role, - RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, UpdateCfdProposals, + RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, }; use crate::model::{Price, TakerId, Timestamp, Usd}; use crate::monitor::MonitorParams; use crate::tokio_ext::FutureExt; -use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wallet, wire}; +use crate::{ + log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, wallet, wire, + UpdateCfdProposals, +}; use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -19,7 +22,6 @@ use sqlx::pool::PoolConnection; use sqlx::Sqlite; use std::collections::HashMap; use time::Duration; -use tokio::sync::watch; use xtra::prelude::*; pub enum CfdAction { @@ -62,9 +64,7 @@ pub struct Actor { wallet: Address, settlement_time_interval_hours: Duration, oracle_pk: schnorrsig::PublicKey, - cfd_feed_actor_inbox: watch::Sender>, - order_feed_sender: watch::Sender>, - update_cfd_feed_sender: watch::Sender, + projection_actor: Address, takers: Address, current_order_id: Option, monitor_actor: Address, @@ -102,9 +102,7 @@ impl Actor { wallet: Address, settlement_time_interval_hours: Duration, oracle_pk: schnorrsig::PublicKey, - cfd_feed_actor_inbox: watch::Sender>, - order_feed_sender: watch::Sender>, - update_cfd_feed_sender: watch::Sender, + projection_actor: Address, takers: Address, monitor_actor: Address, oracle_actor: Address, @@ -115,9 +113,7 @@ impl Actor { wallet, settlement_time_interval_hours, oracle_pk, - cfd_feed_actor_inbox, - order_feed_sender, - update_cfd_feed_sender, + projection_actor, takers, current_order_id: None, monitor_actor, @@ -165,7 +161,7 @@ impl Actor { taker_id, ), ); - self.send_pending_proposals()?; + self.send_pending_proposals().await?; Ok(()) } @@ -189,7 +185,7 @@ impl Actor { taker_id, ), ); - self.send_pending_proposals()?; + self.send_pending_proposals().await?; Ok(()) } @@ -235,21 +231,25 @@ impl Actor { /// Send pending proposals for the purposes of UI updates. /// Filters out the TakerIds, as they are an implementation detail inside of /// the actor - fn send_pending_proposals(&self) -> Result<()> { - Ok(self.update_cfd_feed_sender.send( - self.current_pending_proposals - .iter() - .map(|(order_id, (update_cfd, _))| (*order_id, (update_cfd.clone()))) - .collect(), - )?) + async fn send_pending_proposals(&self) -> Result<()> { + let pending_proposal = self + .current_pending_proposals + .iter() + .map(|(order_id, (update_cfd, _))| (*order_id, (update_cfd.clone()))) + .collect::(); + let _ = self + .projection_actor + .send(projection::Update(pending_proposal)) + .await?; + Ok(()) } /// Removes a proposal and updates the update cfd proposals' feed - fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { + async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { if self.current_pending_proposals.remove(order_id).is_none() { anyhow::bail!("Could not find proposal with order id: {}", &order_id) } - self.send_pending_proposals()?; + self.send_pending_proposals().await?; Ok(()) } @@ -283,26 +283,16 @@ where { async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { let mut conn = self.db.acquire().await?; - cfd_actors::handle_commit( - order_id, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; + cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor) + .await?; Ok(()) } async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { let mut conn = self.db.acquire().await?; - cfd_actors::handle_monitoring_event( - event, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; + cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor) + .await?; Ok(()) } @@ -312,7 +302,7 @@ where attestation, &mut conn, &self.wallet, - &self.cfd_feed_actor_inbox, + &self.projection_actor, ) .await?; Ok(()) @@ -361,11 +351,13 @@ where self.current_agreed_proposals .insert(order_id, self.get_settlement_proposal(order_id)?); self.remove_pending_proposal(&order_id) + .await .context("accepted settlement")?; } Err(e) => { tracing::warn!("Failed to notify taker of accepted settlement: {}", e); self.remove_pending_proposal(&order_id) + .await .context("accepted settlement")?; } } @@ -381,6 +373,7 @@ where // clean-up state ahead of sending to ensure consistency in case we fail to deliver the // message self.remove_pending_proposal(&order_id) + .await .context("rejected settlement")?; self.takers @@ -413,6 +406,7 @@ where // clean-up state ahead of sending to ensure consistency in case we fail to deliver the // message self.remove_pending_proposal(&order_id) + .await .context("rejected roll_over")?; self.takers @@ -472,7 +466,7 @@ where self.takers .send(maker_inc_connections::BroadcastOrder(None)) .await?; - self.order_feed_sender.send(None)?; + self.projection_actor.send(projection::Update(None)).await?; // 3. Insert CFD in DB let cfd = Cfd::new( @@ -485,7 +479,7 @@ where taker_id, }, ); - insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?; // 4. check if order has acceptable amounts and if not reject the cfd // Since rejection is tied to the cfd state at the moment we can only do this after creating @@ -533,7 +527,7 @@ where ) -> Result<()> { cfd.state = CfdState::rejected(); - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; self.takers .send(maker_inc_connections::TakerMessage { @@ -598,7 +592,7 @@ where // 4. Insert that we are in contract setup and refresh our own feed cfd.state = CfdState::contract_setup(); - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; // 5. Spawn away the contract setup let (sender, receiver) = mpsc::unbounded(); @@ -665,7 +659,7 @@ where info: e.to_string(), }; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; return Err(e); } @@ -677,7 +671,7 @@ where attestation: None, }; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; let txid = self .wallet @@ -802,6 +796,7 @@ where }; self.remove_pending_proposal(&order_id) + .await .context("accepted roll_over")?; Ok(()) } @@ -827,7 +822,7 @@ where attestation: None, collaborative_close: None, }; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; self.monitor_actor .send(monitor::StartMonitoring { @@ -887,7 +882,7 @@ where own_script_pubkey.clone(), proposal.price, )?)?; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?; @@ -976,7 +971,9 @@ where self.current_order_id.replace(order.id); // 3. Notify UI via feed - self.order_feed_sender.send(Some(order.clone()))?; + self.projection_actor + .send(projection::Update(Some(order.clone()))) + .await?; // 4. Inform connected takers self.takers diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs new file mode 100644 index 0000000..1af3eac --- /dev/null +++ b/daemon/src/projection.rs @@ -0,0 +1,47 @@ +use crate::bitmex_price_feed::Quote; +use crate::{Cfd, Order, UpdateCfdProposals}; +use tokio::sync::watch; +use xtra_productivity::xtra_productivity; + +pub struct Actor { + tx_cfds: watch::Sender>, + tx_order: watch::Sender>, + tx_quote: watch::Sender, + tx_settlements: watch::Sender, +} + +impl Actor { + pub fn new( + tx_cfds: watch::Sender>, + tx_order: watch::Sender>, + tx_quote: watch::Sender, + tx_settlements: watch::Sender, + ) -> Self { + Self { + tx_cfds, + tx_order, + tx_quote, + tx_settlements, + } + } +} + +pub struct Update(pub T); + +#[xtra_productivity] +impl Actor { + fn handle(&mut self, msg: Update>) { + let _ = self.tx_cfds.send(msg.0); + } + fn handle(&mut self, msg: Update>) { + let _ = self.tx_order.send(msg.0); + } + fn handle(&mut self, msg: Update) { + let _ = self.tx_quote.send(msg.0); + } + fn handle(&mut self, msg: Update) { + let _ = self.tx_settlements.send(msg.0); + } +} + +impl xtra::Actor for Actor {} diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index dabfd9a..6fa56ec 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -3,16 +3,20 @@ use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::{Address, Amount}; use bdk::{bitcoin, FeeRate}; use clap::{Parser, Subcommand}; +use daemon::bitmex_price_feed::Quote; use daemon::connection::ConnectionStatus; +use daemon::db::load_all_cfds; +use daemon::model::cfd::{Order, UpdateCfdProposals}; use daemon::model::WalletInfo; use daemon::seed::Seed; use daemon::tokio_ext::FutureExt; use daemon::{ - bitmex_price_feed, connection, db, housekeeping, logger, monitor, oracle, taker_cfd, wallet, - wallet_sync, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, + bitmex_price_feed, connection, db, housekeeping, logger, monitor, oracle, projection, + taker_cfd, wallet, wallet_sync, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, }; use sqlx::sqlite::SqliteConnectOptions; use sqlx::SqlitePool; +use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; @@ -20,6 +24,7 @@ use std::time::Duration; use tokio::sync::watch; use tokio::time::sleep; use tracing_subscriber::filter::LevelFilter; +use watch::channel; use xtra::prelude::MessageChannel; use xtra::Actor; @@ -209,9 +214,6 @@ async fn main() -> Result<()> { let mut tasks = Tasks::default(); - let (task, quote_updates) = bitmex_price_feed::new().await?; - tasks.add(task); - let figment = rocket::Config::figment() .merge(("address", opts.http_address.ip())) .merge(("port", opts.http_address.port())); @@ -233,12 +235,11 @@ async fn main() -> Result<()> { housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?; housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?; + let (projection_actor, projection_context) = xtra::Context::new(None); + let TakerActorSystem { cfd_actor_addr, connection_actor_addr, - cfd_feed_receiver, - order_feed_receiver, - update_cfd_feed_receiver, mut maker_online_status_feed_receiver, tasks: _tasks, } = TakerActorSystem::new( @@ -255,8 +256,25 @@ async fn main() -> Result<()> { }, N_PAYOUTS, HEARTBEAT_INTERVAL * 2, + projection_actor.clone(), ) .await?; + let (task, init_quote) = bitmex_price_feed::new(projection_actor).await?; + tasks.add(task); + + let cfds = load_all_cfds(&mut conn).await?; + let (cfd_feed_sender, cfd_feed_receiver) = channel(cfds.clone()); + let (order_feed_sender, order_feed_receiver) = channel::>(None); + let (update_cfd_feed_sender, update_cfd_feed_receiver) = + channel::(HashMap::new()); + let (quote_sender, quote_receiver) = channel::(init_quote); + + tasks.add(projection_context.run(projection::Actor::new( + cfd_feed_sender, + order_feed_sender, + quote_sender, + update_cfd_feed_sender, + ))); connect(connection_actor_addr, opts.maker_id, opts.maker).await?; @@ -271,7 +289,7 @@ async fn main() -> Result<()> { .manage(cfd_action_channel) .manage(cfd_feed_receiver) .manage(wallet_feed_receiver) - .manage(quote_updates) + .manage(quote_receiver) .manage(bitcoin_network) .mount( "/api", diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 28c0738..7f039df 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -8,7 +8,7 @@ use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd}; use crate::monitor::{self, MonitorParams}; use crate::tokio_ext::FutureExt; use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; -use crate::{log_error, oracle, setup_contract, wallet, wire}; +use crate::{log_error, oracle, projection, setup_contract, wallet, wire}; use anyhow::{bail, Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -16,7 +16,6 @@ use futures::channel::mpsc; use futures::future::RemoteHandle; use futures::{future, SinkExt}; use std::collections::HashMap; -use tokio::sync::watch; use xtra::prelude::*; pub struct TakeOffer { @@ -67,9 +66,7 @@ pub struct Actor { db: sqlx::SqlitePool, wallet: Address, oracle_pk: schnorrsig::PublicKey, - cfd_feed_actor_inbox: watch::Sender>, - order_feed_actor_inbox: watch::Sender>, - update_cfd_feed_sender: watch::Sender, + projection_actor: Address, send_to_maker: Box>, monitor_actor: Address, setup_state: SetupState, @@ -90,9 +87,7 @@ where db: sqlx::SqlitePool, wallet: Address, oracle_pk: schnorrsig::PublicKey, - cfd_feed_actor_inbox: watch::Sender>, - order_feed_actor_inbox: watch::Sender>, - update_cfd_feed_sender: watch::Sender, + projection_actor: Address, send_to_maker: Box>, monitor_actor: Address, oracle_actor: Address, @@ -102,9 +97,7 @@ where db, wallet, oracle_pk, - cfd_feed_actor_inbox, - order_feed_actor_inbox, - update_cfd_feed_sender, + projection_actor, send_to_maker, monitor_actor, setup_state: SetupState::None, @@ -117,18 +110,19 @@ where } impl Actor { - fn send_pending_update_proposals(&self) -> Result<()> { + async fn send_pending_update_proposals(&self) -> Result<()> { Ok(self - .update_cfd_feed_sender - .send(self.current_pending_proposals.clone())?) + .projection_actor + .send(projection::Update(self.current_pending_proposals.clone())) + .await?) } /// Removes a proposal and updates the update cfd proposals' feed - fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { + async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> { if self.current_pending_proposals.remove(order_id).is_none() { anyhow::bail!("Could not find proposal with order id: {}", &order_id) } - self.send_pending_update_proposals()?; + self.send_pending_update_proposals().await?; Ok(()) } @@ -158,12 +152,12 @@ impl Actor { CfdState::outgoing_order_request(), ); - insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?; // Cleanup own order feed, after inserting the cfd. // Due to the 1:1 relationship between order and cfd we can never create another cfd for the // same order id. - self.order_feed_actor_inbox.send(None)?; + self.projection_actor.send(projection::Update(None)).await?; self.send_to_maker .send(wire::TakerToMaker::TakeOrder { order_id, quantity }) @@ -181,13 +175,8 @@ where { async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { let mut conn = self.db.acquire().await?; - cfd_actors::handle_commit( - order_id, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; + cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor) + .await?; Ok(()) } @@ -218,7 +207,7 @@ where direction: SettlementKind::Outgoing, }, ); - self.send_pending_update_proposals()?; + self.send_pending_update_proposals().await?; self.send_to_maker .send(wire::TakerToMaker::ProposeSettlement { @@ -241,7 +230,7 @@ where async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> { tracing::info!(%order_id, "Settlement proposal got rejected"); - self.remove_pending_proposal(&order_id)?; + self.remove_pending_proposal(&order_id).await?; Ok(()) } @@ -250,6 +239,7 @@ where tracing::info!(%order_id, "Roll over proposal got rejected"); self.remove_pending_proposal(&order_id) + .await .context("rejected settlement")?; Ok(()) @@ -295,7 +285,7 @@ where let mut conn = self.db.acquire().await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; cfd.state = CfdState::rejected(); - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; Ok(()) } @@ -319,10 +309,12 @@ impl Actor { insert_order(&order, &mut conn).await?; } - self.order_feed_actor_inbox.send(Some(order))?; + self.projection_actor + .send(projection::Update(Some(order))) + .await?; } None => { - self.order_feed_actor_inbox.send(None)?; + self.projection_actor.send(projection::Update(None)).await?; } } Ok(()) @@ -339,7 +331,7 @@ where attestation, &mut conn, &self.wallet, - &self.cfd_feed_actor_inbox, + &self.projection_actor, ) .await?; Ok(()) @@ -347,13 +339,8 @@ where async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { let mut conn = self.db.acquire().await?; - cfd_actors::handle_monitoring_event( - event, - &mut conn, - &self.wallet, - &self.cfd_feed_actor_inbox, - ) - .await?; + cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor) + .await?; Ok(()) } } @@ -379,7 +366,7 @@ where direction: SettlementKind::Outgoing, }, ); - self.send_pending_update_proposals()?; + self.send_pending_update_proposals().await?; self.send_to_maker .send(wire::TakerToMaker::ProposeRollOver { @@ -421,7 +408,7 @@ where info: e.to_string(), }; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; return Err(e); } @@ -435,7 +422,7 @@ where attestation: None, }; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; let txid = self .wallet @@ -490,7 +477,7 @@ where let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; cfd.state = CfdState::contract_setup(); - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; let offer_announcement = self .oracle_actor @@ -603,6 +590,7 @@ where }; self.remove_pending_proposal(&order_id) + .await .context("Could not remove accepted roll over")?; Ok(()) } @@ -629,7 +617,7 @@ where collaborative_close: None, }; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; self.monitor_actor .send(monitor::StartMonitoring { @@ -680,9 +668,9 @@ where dlc.script_pubkey_for(cfd.role()), proposal.price, )?)?; - append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - self.remove_pending_proposal(&order_id)?; + self.remove_pending_proposal(&order_id).await?; self.monitor_actor .send(monitor::CollaborativeSettlement { diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 42fe437..9f30445 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -2,19 +2,24 @@ use crate::harness::mocks::monitor::MonitorActor; use crate::harness::mocks::oracle::OracleActor; use crate::harness::mocks::wallet::WalletActor; use crate::schnorrsig; +use daemon::bitmex_price_feed::Quote; use daemon::connection::{Connect, ConnectionStatus}; use daemon::maker_cfd::CfdAction; -use daemon::model::cfd::{Cfd, Order, Origin}; -use daemon::model::{Price, Usd}; +use daemon::model::cfd::{Cfd, Order, Origin, UpdateCfdProposals}; +use daemon::model::{Price, Timestamp, Usd}; use daemon::seed::Seed; -use daemon::{db, maker_cfd, maker_inc_connections, taker_cfd, MakerActorSystem, Tasks}; +use daemon::{ + db, maker_cfd, maker_inc_connections, projection, taker_cfd, MakerActorSystem, Tasks, +}; use rust_decimal_macros::dec; use sqlx::SqlitePool; +use std::collections::HashMap; use std::net::SocketAddr; use std::str::FromStr; use std::task::Poll; use std::time::Duration; use tokio::sync::watch; +use tokio::sync::watch::channel; use tracing::subscriber::DefaultGuard; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::util::SubscriberInitExt; @@ -47,16 +52,18 @@ pub struct Maker { pub mocks: mocks::Mocks, pub listen_addr: SocketAddr, pub identity_pk: x25519_dalek::PublicKey, + cfd_feed_receiver: watch::Receiver>, + order_feed_receiver: watch::Receiver>, _tasks: Tasks, } impl Maker { pub fn cfd_feed(&mut self) -> &mut watch::Receiver> { - &mut self.system.cfd_feed_receiver + &mut self.cfd_feed_receiver } pub fn order_feed(&mut self) -> &mut watch::Receiver> { - &mut self.system.order_feed_receiver + &mut self.order_feed_receiver } pub async fn start(oracle_pk: schnorrsig::PublicKey) -> Self { @@ -75,6 +82,8 @@ impl Maker { let seed = Seed::default(); let (identity_pk, identity_sk) = seed.derive_identity(); + let (projection_actor, projection_context) = xtra::Context::new(None); + // system startup sends sync messages, mock them mocks.mock_sync_handlers().await; let maker = daemon::MakerActorSystem::new( @@ -93,10 +102,30 @@ impl Maker { }, settlement_time_interval_hours, N_PAYOUTS_FOR_TEST, + projection_actor.clone(), ) .await .unwrap(); + let dummy_quote = Quote { + timestamp: Timestamp::now().unwrap(), + bid: Price::new(dec!(10000)).unwrap(), + ask: Price::new(dec!(10000)).unwrap(), + }; + + let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]); + let (order_feed_sender, order_feed_receiver) = channel::>(None); + let (update_cfd_feed_sender, _update_cfd_feed_receiver) = + channel::(HashMap::new()); + let (quote_sender, _) = channel::(dummy_quote); + + tasks.add(projection_context.run(projection::Actor::new( + cfd_feed_sender, + order_feed_sender, + quote_sender, + update_cfd_feed_sender, + ))); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let address = listener.local_addr().unwrap(); @@ -120,6 +149,8 @@ impl Maker { listen_addr: address, mocks, _tasks: tasks, + cfd_feed_receiver, + order_feed_receiver, } } @@ -157,16 +188,18 @@ impl Maker { pub struct Taker { pub system: daemon::TakerActorSystem, pub mocks: mocks::Mocks, + cfd_feed_receiver: watch::Receiver>, + order_feed_receiver: watch::Receiver>, _tasks: Tasks, } impl Taker { pub fn cfd_feed(&mut self) -> &mut watch::Receiver> { - &mut self.system.cfd_feed_receiver + &mut self.cfd_feed_receiver } pub fn order_feed(&mut self) -> &mut watch::Receiver> { - &mut self.system.order_feed_receiver + &mut self.order_feed_receiver } pub fn maker_status_feed(&mut self) -> &mut watch::Receiver { @@ -192,6 +225,8 @@ impl Taker { let (wallet_addr, wallet_fut) = wallet.create(None).run(); tasks.add(wallet_fut); + let (projection_actor, projection_context) = xtra::Context::new(None); + // system startup sends sync messages, mock them mocks.mock_sync_handlers().await; let taker = daemon::TakerActorSystem::new( @@ -203,10 +238,29 @@ impl Taker { |_, _| async { Ok(monitor) }, N_PAYOUTS_FOR_TEST, HEARTBEAT_INTERVAL_FOR_TEST * 2, + projection_actor, ) .await .unwrap(); + let dummy_quote = Quote { + timestamp: Timestamp::now().unwrap(), + bid: Price::new(dec!(10000)).unwrap(), + ask: Price::new(dec!(10000)).unwrap(), + }; + + let (cfd_feed_sender, cfd_feed_receiver) = channel(vec![]); + let (order_feed_sender, order_feed_receiver) = channel::>(None); + let (update_cfd_feed_sender, _) = channel::(HashMap::new()); + let (quote_sender, _) = channel::(dummy_quote); + + tasks.add(projection_context.run(projection::Actor::new( + cfd_feed_sender, + order_feed_sender, + quote_sender, + update_cfd_feed_sender, + ))); + taker .connection_actor_addr .send(Connect { @@ -221,6 +275,8 @@ impl Taker { system: taker, mocks, _tasks: tasks, + order_feed_receiver, + cfd_feed_receiver, } }