diff --git a/daemon/src/cfd_actors.rs b/daemon/src/cfd_actors.rs index f4c1306..be7808f 100644 --- a/daemon/src/cfd_actors.rs +++ b/daemon/src/cfd_actors.rs @@ -4,15 +4,13 @@ use anyhow::{bail, Context, Result}; use sqlx::pool::PoolConnection; use sqlx::Sqlite; -pub async fn insert_cfd_and_send_to_feed( +pub async fn insert_cfd_and_update_feed( cfd: &Cfd, conn: &mut PoolConnection, projection_address: &xtra::Address, ) -> Result<()> { db::insert_cfd(cfd, conn).await?; - projection_address - .send(projection::Update(db::load_all_cfds(conn).await?)) - .await?; + projection_address.send(projection::CfdsChanged).await??; Ok(()) } @@ -22,9 +20,7 @@ pub async fn append_cfd_state( projection_address: &xtra::Address, ) -> Result<()> { db::append_cfd_state(cfd, conn).await?; - projection_address - .send(projection::Update(db::load_all_cfds(conn).await?)) - .await?; + projection_address.send(projection::CfdsChanged).await??; Ok(()) } diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index fc8afe1..7a9a16e 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -4,7 +4,6 @@ use bdk::bitcoin::Amount; use bdk::{bitcoin, FeeRate}; use clap::{Parser, Subcommand}; use daemon::auth::{self, MAKER_USERNAME}; -use daemon::db::load_all_cfds; use daemon::model::cfd::Role; use daemon::model::WalletInfo; use daemon::seed::Seed; @@ -276,14 +275,8 @@ async fn main() -> Result<()> { .send(bitmex_price_feed::Connect) .await??; - let cfds = { - let mut conn = db.acquire().await?; - - load_all_cfds(&mut conn).await? - }; - let (proj_actor, projection_feeds) = - projection::Actor::new(Role::Maker, bitcoin_network, cfds, init_quote); + projection::Actor::new(db.clone(), Role::Maker, bitcoin_network, init_quote).await?; tasks.add(projection_context.run(proj_actor)); let listener_stream = futures::stream::poll_fn(move |ctx| { diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 863264b..0b0227d 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -1,5 +1,5 @@ use crate::address_map::{AddressMap, Stopping}; -use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed}; +use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::model::cfd::{ Cfd, CfdState, CfdStateCommon, CollaborativeSettlement, Dlc, Order, OrderId, Origin, Role, @@ -478,7 +478,7 @@ where taker_id, }, ); - insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?; + insert_cfd_and_update_feed(&cfd, &mut conn, &self.projection_actor).await?; // 4. Try to get the oracle announcement, if that fails we should exit prior to changing any // state diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs index 841d9f5..899f907 100644 --- a/daemon/src/projection.rs +++ b/daemon/src/projection.rs @@ -5,7 +5,7 @@ use crate::model::cfd::{ UpdateCfdProposal, }; use crate::model::{Leverage, Position, Timestamp, TradingPair}; -use crate::{bitmex_price_feed, model, tx, Order, UpdateCfdProposals}; +use crate::{bitmex_price_feed, db, model, tx, Order, UpdateCfdProposals}; use anyhow::Result; use bdk::bitcoin::{Amount, Network, SignedAmount}; use itertools::Itertools; @@ -31,7 +31,12 @@ pub struct UpdateRollOverProposal { /// (replaces previously stored values) pub struct Update(pub T); +/// Message indicating that the Cfds in the projection need to be reloaded, as at +/// least one of the Cfds has changed. +pub struct CfdsChanged; + pub struct Actor { + db: sqlx::SqlitePool, tx: Tx, state: State, } @@ -44,12 +49,15 @@ pub struct Feeds { } impl Actor { - pub fn new( + pub async fn new( + db: sqlx::SqlitePool, role: Role, network: Network, - init_cfds: Vec, init_quote: bitmex_price_feed::Quote, - ) -> (Self, Feeds) { + ) -> Result<(Self, Feeds)> { + let mut conn = db.acquire().await?; + let init_cfds = db::load_all_cfds(&mut conn).await?; + let state = State { role, network, @@ -63,8 +71,9 @@ impl Actor { let (tx_quote, rx_quote) = watch::channel(init_quote.into()); let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new()); - ( + Ok(( Self { + db, tx: Tx { cfds: tx_cfds, order: tx_order, @@ -79,7 +88,7 @@ impl Actor { quote: rx_quote, connected_takers: rx_connected_takers, }, - ) + )) } } @@ -153,6 +162,13 @@ impl State { #[xtra_productivity] impl Actor { + fn handle(&mut self, _: CfdsChanged) -> Result<()> { + let mut conn = self.db.acquire().await?; + let cfds = db::load_all_cfds(&mut conn).await?; + self.state.update_cfds(cfds); + let _ = self.tx.cfds.send(self.state.to_cfds()); + Ok(()) + } fn handle(&mut self, msg: Update>) { let _ = self.tx.order.send(msg.0.map(|x| x.into())); } @@ -162,10 +178,6 @@ impl Actor { let _ = self.tx.quote.send(quote.into()); let _ = self.tx.cfds.send(self.state.to_cfds()); } - fn handle(&mut self, msg: Update>) { - self.state.update_cfds(msg.0); - let _ = self.tx.cfds.send(self.state.to_cfds()); - } fn handle(&mut self, msg: Update>) { let _ = self .tx diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 9f93899..371e16d 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -4,7 +4,6 @@ use bdk::bitcoin::{Address, Amount}; use bdk::{bitcoin, FeeRate}; use clap::{Parser, Subcommand}; use daemon::connection::connect; -use daemon::db::load_all_cfds; use daemon::model::cfd::Role; use daemon::model::{Identity, WalletInfo}; use daemon::seed::Seed; @@ -259,14 +258,8 @@ async fn main() -> Result<()> { .send(bitmex_price_feed::Connect) .await??; - let cfds = { - let mut conn = db.acquire().await?; - - load_all_cfds(&mut conn).await? - }; - let (proj_actor, projection_feeds) = - projection::Actor::new(Role::Taker, bitcoin_network, cfds, init_quote); + projection::Actor::new(db.clone(), Role::Taker, bitcoin_network, init_quote).await?; tasks.add(projection_context.run(proj_actor)); let possible_addresses = resolve_maker_addresses(&opts.maker).await?; diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index c78ba00..be53d63 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -1,5 +1,5 @@ use crate::address_map::AddressMap; -use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed}; +use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::model::cfd::{ Cfd, CfdState, CfdStateCommon, Completed, Dlc, Order, OrderId, Origin, Role, RollOverProposal, @@ -416,7 +416,7 @@ where CfdState::outgoing_order_request(), ); - insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?; + insert_cfd_and_update_feed(&cfd, &mut conn, &self.projection_actor).await?; // Cleanup own order feed, after inserting the cfd. // Due to the 1:1 relationship between order and cfd we can never create another cfd for the diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index d6c4eec..a1cc76f 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -147,7 +147,7 @@ impl Maker { // system startup sends sync messages, mock them mocks.mock_sync_handlers().await; let maker = daemon::MakerActorSystem::new( - db, + db.clone(), wallet_addr, config.oracle_pk, |_, _| oracle, @@ -169,7 +169,9 @@ impl Maker { .unwrap(); let (proj_actor, feeds) = - projection::Actor::new(Role::Maker, Network::Testnet, vec![], dummy_quote()); + projection::Actor::new(db, Role::Maker, Network::Testnet, dummy_quote()) + .await + .unwrap(); tasks.add(projection_context.run(proj_actor)); let address = listener.local_addr().unwrap(); @@ -280,7 +282,7 @@ impl Taker { // system startup sends sync messages, mock them mocks.mock_sync_handlers().await; let taker = daemon::TakerActorSystem::new( - db, + db.clone(), wallet_addr, config.oracle_pk, identity_sk, @@ -295,7 +297,9 @@ impl Taker { .unwrap(); let (proj_actor, feeds) = - projection::Actor::new(Role::Taker, Network::Testnet, vec![], dummy_quote()); + projection::Actor::new(db, Role::Taker, Network::Testnet, dummy_quote()) + .await + .unwrap(); tasks.add(projection_context.run(proj_actor)); tasks.add(connect(