Browse Source

Allow projection actor to retrieve latest Cfd state itself from DB

Instead of retrieving Cfds inside Cfd actors, send a msg that they changed and
retrieve them by the projection actor itself.
feature/force-stop-button
Mariusz Klochowicz 3 years ago
parent
commit
ad551e7161
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 10
      daemon/src/cfd_actors.rs
  2. 9
      daemon/src/maker.rs
  3. 4
      daemon/src/maker_cfd.rs
  4. 32
      daemon/src/projection.rs
  5. 9
      daemon/src/taker.rs
  6. 4
      daemon/src/taker_cfd.rs
  7. 12
      daemon/tests/harness/mod.rs

10
daemon/src/cfd_actors.rs

@ -4,15 +4,13 @@ use anyhow::{bail, Context, Result};
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
use sqlx::Sqlite; use sqlx::Sqlite;
pub async fn insert_cfd_and_send_to_feed( pub async fn insert_cfd_and_update_feed(
cfd: &Cfd, cfd: &Cfd,
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
projection_address: &xtra::Address<projection::Actor>, projection_address: &xtra::Address<projection::Actor>,
) -> Result<()> { ) -> Result<()> {
db::insert_cfd(cfd, conn).await?; db::insert_cfd(cfd, conn).await?;
projection_address projection_address.send(projection::CfdsChanged).await??;
.send(projection::Update(db::load_all_cfds(conn).await?))
.await?;
Ok(()) Ok(())
} }
@ -22,9 +20,7 @@ pub async fn append_cfd_state(
projection_address: &xtra::Address<projection::Actor>, projection_address: &xtra::Address<projection::Actor>,
) -> Result<()> { ) -> Result<()> {
db::append_cfd_state(cfd, conn).await?; db::append_cfd_state(cfd, conn).await?;
projection_address projection_address.send(projection::CfdsChanged).await??;
.send(projection::Update(db::load_all_cfds(conn).await?))
.await?;
Ok(()) Ok(())
} }

9
daemon/src/maker.rs

@ -4,7 +4,6 @@ use bdk::bitcoin::Amount;
use bdk::{bitcoin, FeeRate}; use bdk::{bitcoin, FeeRate};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use daemon::auth::{self, MAKER_USERNAME}; use daemon::auth::{self, MAKER_USERNAME};
use daemon::db::load_all_cfds;
use daemon::model::cfd::Role; use daemon::model::cfd::Role;
use daemon::model::WalletInfo; use daemon::model::WalletInfo;
use daemon::seed::Seed; use daemon::seed::Seed;
@ -276,14 +275,8 @@ async fn main() -> Result<()> {
.send(bitmex_price_feed::Connect) .send(bitmex_price_feed::Connect)
.await??; .await??;
let cfds = {
let mut conn = db.acquire().await?;
load_all_cfds(&mut conn).await?
};
let (proj_actor, projection_feeds) = let (proj_actor, projection_feeds) =
projection::Actor::new(Role::Maker, bitcoin_network, cfds, init_quote); projection::Actor::new(db.clone(), Role::Maker, bitcoin_network, init_quote).await?;
tasks.add(projection_context.run(proj_actor)); tasks.add(projection_context.run(proj_actor));
let listener_stream = futures::stream::poll_fn(move |ctx| { let listener_stream = futures::stream::poll_fn(move |ctx| {

4
daemon/src/maker_cfd.rs

@ -1,5 +1,5 @@
use crate::address_map::{AddressMap, Stopping}; use crate::address_map::{AddressMap, Stopping};
use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed}; use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::model::cfd::{ use crate::model::cfd::{
Cfd, CfdState, CfdStateCommon, CollaborativeSettlement, Dlc, Order, OrderId, Origin, Role, Cfd, CfdState, CfdStateCommon, CollaborativeSettlement, Dlc, Order, OrderId, Origin, Role,
@ -478,7 +478,7 @@ where
taker_id, taker_id,
}, },
); );
insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?; insert_cfd_and_update_feed(&cfd, &mut conn, &self.projection_actor).await?;
// 4. Try to get the oracle announcement, if that fails we should exit prior to changing any // 4. Try to get the oracle announcement, if that fails we should exit prior to changing any
// state // state

32
daemon/src/projection.rs

@ -5,7 +5,7 @@ use crate::model::cfd::{
UpdateCfdProposal, UpdateCfdProposal,
}; };
use crate::model::{Leverage, Position, Timestamp, TradingPair}; use crate::model::{Leverage, Position, Timestamp, TradingPair};
use crate::{bitmex_price_feed, model, tx, Order, UpdateCfdProposals}; use crate::{bitmex_price_feed, db, model, tx, Order, UpdateCfdProposals};
use anyhow::Result; use anyhow::Result;
use bdk::bitcoin::{Amount, Network, SignedAmount}; use bdk::bitcoin::{Amount, Network, SignedAmount};
use itertools::Itertools; use itertools::Itertools;
@ -31,7 +31,12 @@ pub struct UpdateRollOverProposal {
/// (replaces previously stored values) /// (replaces previously stored values)
pub struct Update<T>(pub T); pub struct Update<T>(pub T);
/// Message indicating that the Cfds in the projection need to be reloaded, as at
/// least one of the Cfds has changed.
pub struct CfdsChanged;
pub struct Actor { pub struct Actor {
db: sqlx::SqlitePool,
tx: Tx, tx: Tx,
state: State, state: State,
} }
@ -44,12 +49,15 @@ pub struct Feeds {
} }
impl Actor { impl Actor {
pub fn new( pub async fn new(
db: sqlx::SqlitePool,
role: Role, role: Role,
network: Network, network: Network,
init_cfds: Vec<ModelCfd>,
init_quote: bitmex_price_feed::Quote, init_quote: bitmex_price_feed::Quote,
) -> (Self, Feeds) { ) -> Result<(Self, Feeds)> {
let mut conn = db.acquire().await?;
let init_cfds = db::load_all_cfds(&mut conn).await?;
let state = State { let state = State {
role, role,
network, network,
@ -63,8 +71,9 @@ impl Actor {
let (tx_quote, rx_quote) = watch::channel(init_quote.into()); let (tx_quote, rx_quote) = watch::channel(init_quote.into());
let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new()); let (tx_connected_takers, rx_connected_takers) = watch::channel(Vec::new());
( Ok((
Self { Self {
db,
tx: Tx { tx: Tx {
cfds: tx_cfds, cfds: tx_cfds,
order: tx_order, order: tx_order,
@ -79,7 +88,7 @@ impl Actor {
quote: rx_quote, quote: rx_quote,
connected_takers: rx_connected_takers, connected_takers: rx_connected_takers,
}, },
) ))
} }
} }
@ -153,6 +162,13 @@ impl State {
#[xtra_productivity] #[xtra_productivity]
impl Actor { impl Actor {
fn handle(&mut self, _: CfdsChanged) -> Result<()> {
let mut conn = self.db.acquire().await?;
let cfds = db::load_all_cfds(&mut conn).await?;
self.state.update_cfds(cfds);
let _ = self.tx.cfds.send(self.state.to_cfds());
Ok(())
}
fn handle(&mut self, msg: Update<Option<Order>>) { fn handle(&mut self, msg: Update<Option<Order>>) {
let _ = self.tx.order.send(msg.0.map(|x| x.into())); let _ = self.tx.order.send(msg.0.map(|x| x.into()));
} }
@ -162,10 +178,6 @@ impl Actor {
let _ = self.tx.quote.send(quote.into()); let _ = self.tx.quote.send(quote.into());
let _ = self.tx.cfds.send(self.state.to_cfds()); let _ = self.tx.cfds.send(self.state.to_cfds());
} }
fn handle(&mut self, msg: Update<Vec<ModelCfd>>) {
self.state.update_cfds(msg.0);
let _ = self.tx.cfds.send(self.state.to_cfds());
}
fn handle(&mut self, msg: Update<Vec<model::Identity>>) { fn handle(&mut self, msg: Update<Vec<model::Identity>>) {
let _ = self let _ = self
.tx .tx

9
daemon/src/taker.rs

@ -4,7 +4,6 @@ use bdk::bitcoin::{Address, Amount};
use bdk::{bitcoin, FeeRate}; use bdk::{bitcoin, FeeRate};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use daemon::connection::connect; use daemon::connection::connect;
use daemon::db::load_all_cfds;
use daemon::model::cfd::Role; use daemon::model::cfd::Role;
use daemon::model::{Identity, WalletInfo}; use daemon::model::{Identity, WalletInfo};
use daemon::seed::Seed; use daemon::seed::Seed;
@ -259,14 +258,8 @@ async fn main() -> Result<()> {
.send(bitmex_price_feed::Connect) .send(bitmex_price_feed::Connect)
.await??; .await??;
let cfds = {
let mut conn = db.acquire().await?;
load_all_cfds(&mut conn).await?
};
let (proj_actor, projection_feeds) = let (proj_actor, projection_feeds) =
projection::Actor::new(Role::Taker, bitcoin_network, cfds, init_quote); projection::Actor::new(db.clone(), Role::Taker, bitcoin_network, init_quote).await?;
tasks.add(projection_context.run(proj_actor)); tasks.add(projection_context.run(proj_actor));
let possible_addresses = resolve_maker_addresses(&opts.maker).await?; let possible_addresses = resolve_maker_addresses(&opts.maker).await?;

4
daemon/src/taker_cfd.rs

@ -1,5 +1,5 @@
use crate::address_map::AddressMap; use crate::address_map::AddressMap;
use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed}; use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::model::cfd::{ use crate::model::cfd::{
Cfd, CfdState, CfdStateCommon, Completed, Dlc, Order, OrderId, Origin, Role, RollOverProposal, Cfd, CfdState, CfdStateCommon, Completed, Dlc, Order, OrderId, Origin, Role, RollOverProposal,
@ -416,7 +416,7 @@ where
CfdState::outgoing_order_request(), CfdState::outgoing_order_request(),
); );
insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?; insert_cfd_and_update_feed(&cfd, &mut conn, &self.projection_actor).await?;
// Cleanup own order feed, after inserting the cfd. // Cleanup own order feed, after inserting the cfd.
// Due to the 1:1 relationship between order and cfd we can never create another cfd for the // Due to the 1:1 relationship between order and cfd we can never create another cfd for the

12
daemon/tests/harness/mod.rs

@ -147,7 +147,7 @@ impl Maker {
// system startup sends sync messages, mock them // system startup sends sync messages, mock them
mocks.mock_sync_handlers().await; mocks.mock_sync_handlers().await;
let maker = daemon::MakerActorSystem::new( let maker = daemon::MakerActorSystem::new(
db, db.clone(),
wallet_addr, wallet_addr,
config.oracle_pk, config.oracle_pk,
|_, _| oracle, |_, _| oracle,
@ -169,7 +169,9 @@ impl Maker {
.unwrap(); .unwrap();
let (proj_actor, feeds) = let (proj_actor, feeds) =
projection::Actor::new(Role::Maker, Network::Testnet, vec![], dummy_quote()); projection::Actor::new(db, Role::Maker, Network::Testnet, dummy_quote())
.await
.unwrap();
tasks.add(projection_context.run(proj_actor)); tasks.add(projection_context.run(proj_actor));
let address = listener.local_addr().unwrap(); let address = listener.local_addr().unwrap();
@ -280,7 +282,7 @@ impl Taker {
// system startup sends sync messages, mock them // system startup sends sync messages, mock them
mocks.mock_sync_handlers().await; mocks.mock_sync_handlers().await;
let taker = daemon::TakerActorSystem::new( let taker = daemon::TakerActorSystem::new(
db, db.clone(),
wallet_addr, wallet_addr,
config.oracle_pk, config.oracle_pk,
identity_sk, identity_sk,
@ -295,7 +297,9 @@ impl Taker {
.unwrap(); .unwrap();
let (proj_actor, feeds) = let (proj_actor, feeds) =
projection::Actor::new(Role::Taker, Network::Testnet, vec![], dummy_quote()); projection::Actor::new(db, Role::Taker, Network::Testnet, dummy_quote())
.await
.unwrap();
tasks.add(projection_context.run(proj_actor)); tasks.add(projection_context.run(proj_actor));
tasks.add(connect( tasks.add(connect(

Loading…
Cancel
Save