Browse Source

Introduce process manager actor

In this first iteration we move saving the event and updating the UI (by sending `CfdChanged` to the projection actor) into the process manager.
Next iterations will more the post-processing into the process manager actor step by step until there is not event-related communication with other actors anymore.
master^2
Daniel Karzel 3 years ago
parent
commit
5a41cb312a
No known key found for this signature in database GPG Key ID: 30C3FC2E438ADB6E
  1. 47
      daemon/src/cfd_actors.rs
  2. 18
      daemon/src/lib.rs
  3. 42
      daemon/src/maker_cfd.rs
  4. 55
      daemon/src/process_manager.rs
  5. 41
      daemon/src/taker_cfd.rs

47
daemon/src/cfd_actors.rs

@ -5,8 +5,8 @@ use crate::model::cfd::Event;
use crate::model::cfd::OrderId; use crate::model::cfd::OrderId;
use crate::monitor; use crate::monitor;
use crate::oracle; use crate::oracle;
use crate::process_manager;
use crate::projection; use crate::projection;
use crate::projection::CfdsChanged;
use crate::try_continue; use crate::try_continue;
use crate::wallet; use crate::wallet;
use anyhow::Context; use anyhow::Context;
@ -29,7 +29,7 @@ pub async fn handle_monitoring_event<W>(
event: monitor::Event, event: monitor::Event,
db: &SqlitePool, db: &SqlitePool,
wallet: &xtra::Address<W>, wallet: &xtra::Address<W>,
projection_address: &xtra::Address<projection::Actor>, process_manager_address: &xtra::Address<process_manager::Actor>,
) -> Result<()> ) -> Result<()>
where where
W: xtra::Handler<wallet::TryBroadcastTransaction>, W: xtra::Handler<wallet::TryBroadcastTransaction>,
@ -57,9 +57,15 @@ where
monitor::Event::RevokedTransactionFound(_) => cfd.handle_revoke_confirmed(), monitor::Event::RevokedTransactionFound(_) => cfd.handle_revoke_confirmed(),
}; };
db::append_event(event.clone(), &mut conn).await?; if let Err(e) = process_manager_address
.send(process_manager::Event::new(event.clone()))
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
} else {
// TODO: Move into process manager
post_process_event(event, wallet).await?; post_process_event(event, wallet).await?;
projection_address.send(CfdsChanged).await?; }
Ok(()) Ok(())
} }
@ -97,7 +103,7 @@ pub async fn handle_commit<W>(
order_id: OrderId, order_id: OrderId,
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
wallet: &xtra::Address<W>, wallet: &xtra::Address<W>,
projection_address: &xtra::Address<projection::Actor>, process_manager_address: &xtra::Address<process_manager::Actor>,
) -> Result<()> ) -> Result<()>
where where
W: xtra::Handler<wallet::TryBroadcastTransaction>, W: xtra::Handler<wallet::TryBroadcastTransaction>,
@ -105,11 +111,15 @@ where
let cfd = load_cfd(order_id, conn).await?; let cfd = load_cfd(order_id, conn).await?;
let event = cfd.manual_commit_to_blockchain()?; let event = cfd.manual_commit_to_blockchain()?;
db::append_event(event.clone(), conn).await?; if let Err(e) = process_manager_address
.send(process_manager::Event::new(event.clone()))
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
} else {
// TODO: Move into process manager
post_process_event(event, wallet).await?; post_process_event(event, wallet).await?;
}
projection_address.send(CfdsChanged).await?;
Ok(()) Ok(())
} }
@ -118,7 +128,7 @@ pub async fn handle_oracle_attestation<W>(
attestation: oracle::Attestation, attestation: oracle::Attestation,
db: &SqlitePool, db: &SqlitePool,
wallet: &xtra::Address<W>, wallet: &xtra::Address<W>,
projection_address: &xtra::Address<projection::Actor>, process_manager_address: &xtra::Address<process_manager::Actor>,
) -> Result<()> ) -> Result<()>
where where
W: xtra::Handler<wallet::TryBroadcastTransaction>, W: xtra::Handler<wallet::TryBroadcastTransaction>,
@ -136,16 +146,19 @@ where
.decrypt_cet(&attestation) .decrypt_cet(&attestation)
.context("Failed to decrypt CET using attestation")); .context("Failed to decrypt CET using attestation"));
try_continue!(db::append_event(event.clone(), &mut conn)
.await
.context("Failed to append events"));
if let Some(event) = event { if let Some(event) = event {
try_continue!(post_process_event(event, wallet).await) // Note: ? OK, because if the actor is disconnected we can fail the loop
if let Err(e) = process_manager_address
.send(process_manager::Event::new(event.clone()))
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
} else {
// TODO: Move into process manager
try_continue!(post_process_event(event, wallet).await);
}
} }
} }
projection_address.send(CfdsChanged).await?;
Ok(()) Ok(())
} }

18
daemon/src/lib.rs

@ -6,6 +6,7 @@ use crate::maker_cfd::TakerConnected;
use crate::model::cfd::Cfd; use crate::model::cfd::Cfd;
use crate::model::cfd::Order; use crate::model::cfd::Order;
use crate::model::cfd::OrderId; use crate::model::cfd::OrderId;
use crate::model::cfd::Role;
use crate::model::Identity; use crate::model::Identity;
use crate::model::Price; use crate::model::Price;
use crate::model::Usd; use crate::model::Usd;
@ -54,6 +55,7 @@ mod noise;
pub mod olivia; pub mod olivia;
pub mod oracle; pub mod oracle;
pub mod payout_curve; pub mod payout_curve;
pub mod process_manager;
pub mod projection; pub mod projection;
pub mod rollover_maker; pub mod rollover_maker;
pub mod rollover_taker; pub mod rollover_taker;
@ -162,15 +164,23 @@ where
let (monitor_addr, monitor_ctx) = xtra::Context::new(None); let (monitor_addr, monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, oracle_ctx) = xtra::Context::new(None); let (oracle_addr, oracle_ctx) = xtra::Context::new(None);
let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None); let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None);
let (process_manager_addr, process_manager_ctx) = xtra::Context::new(None);
let mut tasks = Tasks::default(); let mut tasks = Tasks::default();
tasks.add(process_manager_ctx.run(process_manager::Actor::new(
db.clone(),
Role::Maker,
&projection_actor,
)));
let (cfd_actor_addr, cfd_actor_fut) = maker_cfd::Actor::new( let (cfd_actor_addr, cfd_actor_fut) = maker_cfd::Actor::new(
db, db,
wallet_addr.clone(), wallet_addr.clone(),
settlement_interval, settlement_interval,
oracle_pk, oracle_pk,
projection_actor, projection_actor,
process_manager_addr.clone(),
inc_conn_addr.clone(), inc_conn_addr.clone(),
monitor_addr.clone(), monitor_addr.clone(),
oracle_addr.clone(), oracle_addr.clone(),
@ -354,15 +364,23 @@ where
let (monitor_addr, monitor_ctx) = xtra::Context::new(None); let (monitor_addr, monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, oracle_ctx) = xtra::Context::new(None); let (oracle_addr, oracle_ctx) = xtra::Context::new(None);
let (process_manager_addr, process_manager_ctx) = xtra::Context::new(None);
let mut tasks = Tasks::default(); let mut tasks = Tasks::default();
tasks.add(process_manager_ctx.run(process_manager::Actor::new(
db.clone(),
Role::Taker,
&projection_actor,
)));
let (connection_actor_addr, connection_actor_ctx) = xtra::Context::new(None); let (connection_actor_addr, connection_actor_ctx) = xtra::Context::new(None);
let (cfd_actor_addr, cfd_actor_fut) = taker_cfd::Actor::new( let (cfd_actor_addr, cfd_actor_fut) = taker_cfd::Actor::new(
db.clone(), db.clone(),
wallet_actor_addr.clone(), wallet_actor_addr.clone(),
oracle_pk, oracle_pk,
projection_actor.clone(), projection_actor.clone(),
process_manager_addr,
connection_actor_addr.clone(), connection_actor_addr.clone(),
monitor_addr.clone(), monitor_addr.clone(),
oracle_addr.clone(), oracle_addr.clone(),

42
daemon/src/maker_cfd.rs

@ -4,7 +4,6 @@ use crate::cfd_actors;
use crate::cfd_actors::insert_cfd_and_update_feed; use crate::cfd_actors::insert_cfd_and_update_feed;
use crate::cfd_actors::load_cfd; use crate::cfd_actors::load_cfd;
use crate::collab_settlement_maker; use crate::collab_settlement_maker;
use crate::db::append_event;
use crate::maker_inc_connections; use crate::maker_inc_connections;
use crate::model; use crate::model;
use crate::model::cfd::Cfd; use crate::model::cfd::Cfd;
@ -24,6 +23,7 @@ use crate::model::Usd;
use crate::monitor; use crate::monitor;
use crate::monitor::MonitorParams; use crate::monitor::MonitorParams;
use crate::oracle; use crate::oracle;
use crate::process_manager;
use crate::projection; use crate::projection;
use crate::projection::Update; use crate::projection::Update;
use crate::rollover_maker; use crate::rollover_maker;
@ -97,6 +97,7 @@ pub struct Actor<O, M, T, W> {
settlement_interval: Duration, settlement_interval: Duration,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>, projection_actor: Address<projection::Actor>,
process_manager_actor: Address<process_manager::Actor>,
rollover_actors: AddressMap<OrderId, rollover_maker::Actor>, rollover_actors: AddressMap<OrderId, rollover_maker::Actor>,
takers: Address<T>, takers: Address<T>,
current_order: Option<Order>, current_order: Option<Order>,
@ -117,6 +118,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
settlement_interval: Duration, settlement_interval: Duration,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>, projection_actor: Address<projection::Actor>,
process_manager_actor: Address<process_manager::Actor>,
takers: Address<T>, takers: Address<T>,
monitor_actor: Address<M>, monitor_actor: Address<M>,
oracle_actor: Address<O>, oracle_actor: Address<O>,
@ -128,6 +130,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
settlement_interval, settlement_interval,
oracle_pk, oracle_pk,
projection_actor, projection_actor,
process_manager_actor,
rollover_actors: AddressMap::default(), rollover_actors: AddressMap::default(),
takers, takers,
current_order: None, current_order: None,
@ -352,7 +355,6 @@ where
.await?; .await?;
self.projection_actor.send(projection::Update(None)).await?; self.projection_actor.send(projection::Update(None)).await?;
insert_cfd_and_update_feed(&cfd, &mut conn, &self.projection_actor).await?; insert_cfd_and_update_feed(&cfd, &mut conn, &self.projection_actor).await?;
// 4. Try to get the oracle announcement, if that fails we should exit prior to changing any // 4. Try to get the oracle announcement, if that fails we should exit prior to changing any
@ -467,8 +469,13 @@ where
let cfd = load_cfd(order_id, &mut conn).await?; let cfd = load_cfd(order_id, &mut conn).await?;
let event = cfd.settle_collaboratively(msg)?; let event = cfd.settle_collaboratively(msg)?;
append_event(event.clone(), &mut conn).await?; if let Err(e) = self
self.projection_actor.send(projection::CfdsChanged).await?; .process_manager_actor
.send(process_manager::Event::new(event.clone()))
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
}
match event.event { match event.event {
CfdEvent::CollaborativeSettlementCompleted { CfdEvent::CollaborativeSettlementCompleted {
@ -529,7 +536,12 @@ where
let Commit { order_id } = msg; let Commit { order_id } = msg;
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor) cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.process_manager_actor,
)
.await?; .await?;
Ok(()) Ok(())
@ -652,9 +664,13 @@ where
let cfd = load_cfd(order_id, &mut conn).await?; let cfd = load_cfd(order_id, &mut conn).await?;
let event = cfd.setup_contract(msg)?; let event = cfd.setup_contract(msg)?;
append_event(event.clone(), &mut conn).await?; if let Err(e) = self
.process_manager_actor
self.projection_actor.send(projection::CfdsChanged).await?; .send(process_manager::Event::new(event.clone()))
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
}
let dlc = match event.event { let dlc = match event.event {
CfdEvent::ContractSetupCompleted { dlc } => dlc, CfdEvent::ContractSetupCompleted { dlc } => dlc,
@ -730,8 +746,12 @@ where
W: xtra::Handler<wallet::TryBroadcastTransaction>, W: xtra::Handler<wallet::TryBroadcastTransaction>,
{ {
async fn handle_monitor(&mut self, msg: monitor::Event) { async fn handle_monitor(&mut self, msg: monitor::Event) {
if let Err(e) = if let Err(e) = cfd_actors::handle_monitoring_event(
cfd_actors::handle_monitoring_event(msg, &self.db, &self.wallet, &self.projection_actor) msg,
&self.db,
&self.wallet,
&self.process_manager_actor,
)
.await .await
{ {
tracing::error!("Unable to handle monotoring event: {:#}", e) tracing::error!("Unable to handle monotoring event: {:#}", e)
@ -743,7 +763,7 @@ where
msg, msg,
&self.db, &self.db,
&self.wallet, &self.wallet,
&self.projection_actor, &self.process_manager_actor,
) )
.await .await
{ {

55
daemon/src/process_manager.rs

@ -0,0 +1,55 @@
use crate::db::append_event;
use crate::model::cfd;
use crate::model::cfd::Role;
use crate::projection;
use anyhow::Result;
use xtra::prelude::MessageChannel;
use xtra_productivity::xtra_productivity;
pub struct Actor {
db: sqlx::SqlitePool,
_role: Role,
cfds_changed: Box<dyn MessageChannel<projection::CfdsChanged>>,
}
pub struct Event(cfd::Event);
impl Event {
pub fn new(event: cfd::Event) -> Self {
Self(event)
}
}
impl Actor {
pub fn new(
db: sqlx::SqlitePool,
role: Role,
cfds_changed: &(impl MessageChannel<projection::CfdsChanged> + 'static),
) -> Self {
Self {
db,
_role: role,
cfds_changed: cfds_changed.clone_channel(),
}
}
}
#[xtra_productivity]
impl Actor {
fn handle(&mut self, msg: Event) -> Result<()> {
let event = msg.0;
// 1. Safe in DB
let mut conn = self.db.acquire().await?;
append_event(event.clone(), &mut conn).await?;
// TODO: 2. Post-process event by sending out messages
// 3. Update UI
self.cfds_changed.send(projection::CfdsChanged).await?;
Ok(())
}
}
impl xtra::Actor for Actor {}

41
daemon/src/taker_cfd.rs

@ -4,7 +4,6 @@ use crate::cfd_actors::insert_cfd_and_update_feed;
use crate::cfd_actors::load_cfd; use crate::cfd_actors::load_cfd;
use crate::collab_settlement_taker; use crate::collab_settlement_taker;
use crate::connection; use crate::connection;
use crate::db::append_event;
use crate::model::cfd::Cfd; use crate::model::cfd::Cfd;
use crate::model::cfd::CfdEvent; use crate::model::cfd::CfdEvent;
use crate::model::cfd::CollaborativeSettlement; use crate::model::cfd::CollaborativeSettlement;
@ -21,6 +20,7 @@ use crate::model::Usd;
use crate::monitor; use crate::monitor;
use crate::monitor::MonitorParams; use crate::monitor::MonitorParams;
use crate::oracle; use crate::oracle;
use crate::process_manager;
use crate::projection; use crate::projection;
use crate::setup_taker; use crate::setup_taker;
use crate::wallet; use crate::wallet;
@ -55,6 +55,7 @@ pub struct Actor<O, M, W> {
wallet: Address<W>, wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>, projection_actor: Address<projection::Actor>,
process_manager_actor: Address<process_manager::Actor>,
conn_actor: Address<connection::Actor>, conn_actor: Address<connection::Actor>,
monitor_actor: Address<M>, monitor_actor: Address<M>,
setup_actors: AddressMap<OrderId, setup_taker::Actor>, setup_actors: AddressMap<OrderId, setup_taker::Actor>,
@ -78,6 +79,7 @@ where
wallet: Address<W>, wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>, projection_actor: Address<projection::Actor>,
process_manager_actor: Address<process_manager::Actor>,
conn_actor: Address<connection::Actor>, conn_actor: Address<connection::Actor>,
monitor_actor: Address<M>, monitor_actor: Address<M>,
oracle_actor: Address<O>, oracle_actor: Address<O>,
@ -89,6 +91,7 @@ where
wallet, wallet,
oracle_pk, oracle_pk,
projection_actor, projection_actor,
process_manager_actor,
conn_actor, conn_actor,
monitor_actor, monitor_actor,
oracle_actor, oracle_actor,
@ -112,7 +115,12 @@ where
let Commit { order_id } = msg; let Commit { order_id } = msg;
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor) cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.process_manager_actor,
)
.await?; .await?;
Ok(()) Ok(())
} }
@ -171,8 +179,13 @@ where
let cfd = load_cfd(order_id, &mut conn).await?; let cfd = load_cfd(order_id, &mut conn).await?;
let event = cfd.settle_collaboratively(msg)?; let event = cfd.settle_collaboratively(msg)?;
append_event(event.clone(), &mut conn).await?; if let Err(e) = self
self.projection_actor.send(projection::CfdsChanged).await?; .process_manager_actor
.send(process_manager::Event::new(event.clone()))
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
}
match event.event { match event.event {
CfdEvent::CollaborativeSettlementCompleted { CfdEvent::CollaborativeSettlementCompleted {
@ -330,9 +343,13 @@ where
let cfd = load_cfd(order_id, &mut conn).await?; let cfd = load_cfd(order_id, &mut conn).await?;
let event = cfd.setup_contract(msg)?; let event = cfd.setup_contract(msg)?;
append_event(event.clone(), &mut conn).await?; if let Err(e) = self
.process_manager_actor
self.projection_actor.send(projection::CfdsChanged).await?; .send(process_manager::Event::new(event.clone()))
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
}
let dlc = match event.event { let dlc = match event.event {
CfdEvent::ContractSetupCompleted { dlc } => dlc, CfdEvent::ContractSetupCompleted { dlc } => dlc,
@ -395,8 +412,12 @@ where
W: xtra::Handler<wallet::TryBroadcastTransaction>, W: xtra::Handler<wallet::TryBroadcastTransaction>,
{ {
async fn handle_monitor(&mut self, msg: monitor::Event) { async fn handle_monitor(&mut self, msg: monitor::Event) {
if let Err(e) = if let Err(e) = cfd_actors::handle_monitoring_event(
cfd_actors::handle_monitoring_event(msg, &self.db, &self.wallet, &self.projection_actor) msg,
&self.db,
&self.wallet,
&self.process_manager_actor,
)
.await .await
{ {
tracing::error!("Unable to handle monotoring event: {:#}", e) tracing::error!("Unable to handle monotoring event: {:#}", e)
@ -408,7 +429,7 @@ where
msg, msg,
&self.db, &self.db,
&self.wallet, &self.wallet,
&self.projection_actor, &self.process_manager_actor,
) )
.await .await
{ {

Loading…
Cancel
Save