Browse Source

Merge #967

967: Introduce process manager actor r=da-kami a=da-kami

Resolves #956

1. Event Saving: Only the process manager saves events into the dabase. The initial CFD creation (prior to events) is done in the cfd actors.
2. Event post-processing: Only the process manager handles post-processing of events
3. UI updates through projection: Only the process manager updates the UI after an event is generated. (Note: Upon initial cfd creation the UI is updated through the cfd actor when inserting the cfd; Upon startup the projection actor updates itself)

As a result of this the cfd actor now does not know about the monitor anymore; and furthermore we don't have to keep bounds for wallet/oracle on most of the function calls that are handled by the process manager. This massively reduces the trait bounds and impl blocks in the cfd actors.

Co-authored-by: Daniel Karzel <daniel@comit.network>
master preview
bors[bot] 3 years ago
committed by GitHub
parent
commit
61c7bcbb32
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 97
      daemon/src/cfd_actors.rs
  2. 60
      daemon/src/lib.rs
  3. 320
      daemon/src/maker_cfd.rs
  4. 2
      daemon/src/monitor.rs
  5. 177
      daemon/src/process_manager.rs
  6. 4
      daemon/src/routes_maker.rs
  7. 3
      daemon/src/routes_taker.rs
  8. 249
      daemon/src/taker_cfd.rs
  9. 6
      daemon/tests/harness/mod.rs

97
daemon/src/cfd_actors.rs

@ -1,14 +1,11 @@
use crate::db;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdEvent;
use crate::model::cfd::Event;
use crate::model::cfd::OrderId;
use crate::monitor;
use crate::oracle;
use crate::process_manager;
use crate::projection;
use crate::projection::CfdsChanged;
use crate::try_continue;
use crate::wallet;
use anyhow::Context;
use anyhow::Result;
use sqlx::pool::PoolConnection;
@ -25,15 +22,11 @@ pub async fn insert_cfd_and_update_feed(
Ok(())
}
pub async fn handle_monitoring_event<W>(
pub async fn handle_monitoring_event(
event: monitor::Event,
db: &SqlitePool,
wallet: &xtra::Address<W>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
process_manager: &xtra::Address<process_manager::Actor>,
) -> Result<()> {
let mut conn = db.acquire().await?;
let order_id = event.order_id();
@ -57,9 +50,12 @@ where
monitor::Event::RevokedTransactionFound(_) => cfd.handle_revoke_confirmed(),
};
db::append_event(event.clone(), &mut conn).await?;
post_process_event(event, wallet).await?;
projection_address.send(CfdsChanged).await?;
if let Err(e) = process_manager
.send(process_manager::Event::new(event.clone()))
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
}
Ok(())
}
@ -93,36 +89,29 @@ pub async fn load_cfd(order_id: OrderId, conn: &mut PoolConnection<Sqlite>) -> R
Ok(cfd)
}
pub async fn handle_commit<W>(
pub async fn handle_commit(
order_id: OrderId,
conn: &mut PoolConnection<Sqlite>,
wallet: &xtra::Address<W>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
process_manager: &xtra::Address<process_manager::Actor>,
) -> Result<()> {
let cfd = load_cfd(order_id, conn).await?;
let event = cfd.manual_commit_to_blockchain()?;
db::append_event(event.clone(), conn).await?;
post_process_event(event, wallet).await?;
projection_address.send(CfdsChanged).await?;
if let Err(e) = process_manager
.send(process_manager::Event::new(event.clone()))
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
}
Ok(())
}
pub async fn handle_oracle_attestation<W>(
pub async fn handle_oracle_attestation(
attestation: oracle::Attestation,
db: &SqlitePool,
wallet: &xtra::Address<W>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
process_manager: &xtra::Address<process_manager::Actor>,
) -> Result<()> {
let mut conn = db.acquire().await?;
tracing::debug!(
@ -136,45 +125,15 @@ where
.decrypt_cet(&attestation)
.context("Failed to decrypt CET using attestation"));
try_continue!(db::append_event(event.clone(), &mut conn)
.await
.context("Failed to append events"));
if let Some(event) = event {
try_continue!(post_process_event(event, wallet).await)
}
}
projection_address.send(CfdsChanged).await?;
Ok(())
}
async fn post_process_event<W>(event: Event, wallet: &xtra::Address<W>) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
match event.event {
CfdEvent::OracleAttestedPostCetTimelock { cet, .. }
| CfdEvent::CetTimelockConfirmedPostOracleAttestation { cet } => {
let txid = wallet
.send(wallet::TryBroadcastTransaction { tx: cet })
// Note: ? OK, because if the actor is disconnected we can fail the loop
if let Err(e) = process_manager
.send(process_manager::Event::new(event.clone()))
.await?
.context("Failed to broadcast CET")?;
tracing::info!(%txid, "CET published");
}
CfdEvent::OracleAttestedPriorCetTimelock { commit_tx: tx, .. }
| CfdEvent::ManualCommit { tx } => {
let txid = wallet
.send(wallet::TryBroadcastTransaction { tx })
.await?
.context("Failed to broadcast commit transaction")?;
tracing::info!(%txid, "Commit transaction published");
{
tracing::error!("Sending event to process manager failed: {:#}", e);
}
}
_ => {}
}
Ok(())

60
daemon/src/lib.rs

@ -6,6 +6,7 @@ use crate::maker_cfd::TakerConnected;
use crate::model::cfd::Cfd;
use crate::model::cfd::Order;
use crate::model::cfd::OrderId;
use crate::model::cfd::Role;
use crate::model::Identity;
use crate::model::Price;
use crate::model::Usd;
@ -54,6 +55,7 @@ mod noise;
pub mod olivia;
pub mod oracle;
pub mod payout_curve;
pub mod process_manager;
pub mod projection;
pub mod rollover_maker;
pub mod rollover_taker;
@ -109,22 +111,18 @@ impl Tasks {
}
}
pub struct MakerActorSystem<O, M, T, W> {
pub cfd_actor_addr: Address<maker_cfd::Actor<O, M, T, W>>,
pub struct MakerActorSystem<O, T, W> {
pub cfd_actor_addr: Address<maker_cfd::Actor<O, T, W>>,
wallet_actor_addr: Address<W>,
inc_conn_addr: Address<T>,
_tasks: Tasks,
}
impl<O, M, T, W> MakerActorSystem<O, M, T, W>
impl<O, T, W> MakerActorSystem<O, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<maker_inc_connections::ConfirmOrder>
@ -140,7 +138,7 @@ where
+ xtra::Handler<wallet::Withdraw>,
{
#[allow(clippy::too_many_arguments)]
pub async fn new<FO, FM>(
pub async fn new<FO, FM, M>(
db: SqlitePool,
wallet_addr: Address<W>,
oracle_pk: schnorrsig::PublicKey,
@ -156,23 +154,38 @@ where
projection_actor: Address<projection::Actor>,
) -> Result<Self>
where
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
FO: Future<Output = Result<O>>,
FM: Future<Output = Result<M>>,
{
let (monitor_addr, monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, oracle_ctx) = xtra::Context::new(None);
let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None);
let (process_manager_addr, process_manager_ctx) = xtra::Context::new(None);
let mut tasks = Tasks::default();
tasks.add(process_manager_ctx.run(process_manager::Actor::new(
db.clone(),
Role::Maker,
&projection_actor,
&wallet_addr,
&monitor_addr,
&monitor_addr,
&oracle_addr,
)));
let (cfd_actor_addr, cfd_actor_fut) = maker_cfd::Actor::new(
db,
wallet_addr.clone(),
settlement_interval,
oracle_pk,
projection_actor,
process_manager_addr.clone(),
inc_conn_addr.clone(),
monitor_addr.clone(),
oracle_addr.clone(),
n_payouts,
)
@ -308,23 +321,19 @@ where
}
}
pub struct TakerActorSystem<O, M, W> {
pub cfd_actor_addr: Address<taker_cfd::Actor<O, M, W>>,
pub struct TakerActorSystem<O, W> {
pub cfd_actor_addr: Address<taker_cfd::Actor<O, W>>,
pub connection_actor_addr: Address<connection::Actor>,
pub maker_online_status_feed_receiver: watch::Receiver<ConnectionStatus>,
wallet_actor_addr: Address<W>,
_tasks: Tasks,
}
impl<O, M, W> TakerActorSystem<O, M, W>
impl<O, W> TakerActorSystem<O, W>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::TryBroadcastTransaction>
@ -332,7 +341,7 @@ where
+ xtra::Handler<wallet::Reinitialise>,
{
#[allow(clippy::too_many_arguments)]
pub async fn new<FM, FO>(
pub async fn new<FM, FO, M>(
db: SqlitePool,
wallet_actor_addr: Address<W>,
oracle_pk: schnorrsig::PublicKey,
@ -346,6 +355,10 @@ where
maker_identity: Identity,
) -> Result<Self>
where
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
FO: Future<Output = Result<O>>,
FM: Future<Output = Result<M>>,
{
@ -354,17 +367,28 @@ where
let (monitor_addr, monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, oracle_ctx) = xtra::Context::new(None);
let (process_manager_addr, process_manager_ctx) = xtra::Context::new(None);
let mut tasks = Tasks::default();
tasks.add(process_manager_ctx.run(process_manager::Actor::new(
db.clone(),
Role::Taker,
&projection_actor,
&wallet_actor_addr,
&monitor_addr,
&monitor_addr,
&oracle_addr,
)));
let (connection_actor_addr, connection_actor_ctx) = xtra::Context::new(None);
let (cfd_actor_addr, cfd_actor_fut) = taker_cfd::Actor::new(
db.clone(),
wallet_actor_addr.clone(),
oracle_pk,
projection_actor.clone(),
process_manager_addr,
connection_actor_addr.clone(),
monitor_addr.clone(),
oracle_addr.clone(),
n_payouts,
maker_identity,

320
daemon/src/maker_cfd.rs

@ -4,11 +4,9 @@ use crate::cfd_actors;
use crate::cfd_actors::insert_cfd_and_update_feed;
use crate::cfd_actors::load_cfd;
use crate::collab_settlement_maker;
use crate::db::append_event;
use crate::maker_inc_connections;
use crate::model;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdEvent;
use crate::model::cfd::CollaborativeSettlement;
use crate::model::cfd::Order;
use crate::model::cfd::OrderId;
@ -22,8 +20,8 @@ use crate::model::Position;
use crate::model::Price;
use crate::model::Usd;
use crate::monitor;
use crate::monitor::MonitorParams;
use crate::oracle;
use crate::process_manager;
use crate::projection;
use crate::projection::Update;
use crate::rollover_maker;
@ -34,7 +32,6 @@ use crate::wallet;
use crate::wire;
use crate::wire::TakerToMaker;
use crate::Tasks;
use anyhow::bail;
use anyhow::Context as _;
use anyhow::Result;
use async_trait::async_trait;
@ -91,16 +88,16 @@ pub struct FromTaker {
pub msg: wire::TakerToMaker,
}
pub struct Actor<O, M, T, W> {
pub struct Actor<O, T, W> {
db: sqlx::SqlitePool,
wallet: Address<W>,
settlement_interval: Duration,
oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>,
process_manager_actor: Address<process_manager::Actor>,
rollover_actors: AddressMap<OrderId, rollover_maker::Actor>,
takers: Address<T>,
current_order: Option<Order>,
monitor_actor: Address<M>,
setup_actors: AddressMap<OrderId, setup_maker::Actor>,
settlement_actors: AddressMap<OrderId, collab_settlement_maker::Actor>,
oracle_actor: Address<O>,
@ -109,7 +106,7 @@ pub struct Actor<O, M, T, W> {
tasks: Tasks,
}
impl<O, M, T, W> Actor<O, M, T, W> {
impl<O, T, W> Actor<O, T, W> {
#[allow(clippy::too_many_arguments)]
pub fn new(
db: sqlx::SqlitePool,
@ -117,8 +114,8 @@ impl<O, M, T, W> Actor<O, M, T, W> {
settlement_interval: Duration,
oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>,
process_manager_actor: Address<process_manager::Actor>,
takers: Address<T>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
n_payouts: usize,
) -> Self {
@ -128,10 +125,10 @@ impl<O, M, T, W> Actor<O, M, T, W> {
settlement_interval,
oracle_pk,
projection_actor,
process_manager_actor,
rollover_actors: AddressMap::default(),
takers,
current_order: None,
monitor_actor,
setup_actors: AddressMap::default(),
oracle_actor,
n_payouts,
@ -154,7 +151,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
}
}
impl<O, M, T, W> Actor<O, M, T, W>
impl<O, T, W> Actor<O, T, W>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
@ -182,39 +179,9 @@ where
}
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W> {
async fn handle_accept_rollover(&mut self, msg: AcceptRollOver) -> Result<()> {
if self
.rollover_actors
.send(&msg.order_id, rollover_maker::AcceptRollOver)
.await
.is_err()
{
tracing::warn!(%msg.order_id, "No active rollover");
}
Ok(())
}
async fn handle_reject_rollover(&mut self, msg: RejectRollOver) -> Result<()> {
if self
.rollover_actors
.send(&msg.order_id, rollover_maker::RejectRollOver)
.await
.is_err()
{
tracing::warn!(%msg.order_id, "No active rollover");
}
Ok(())
}
}
impl<O, M, T, W> Actor<O, M, T, W>
impl<O, T, W> Actor<O, T, W>
where
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<Stopping<rollover_maker::Actor>>
+ xtra::Handler<RollOverProposed>,
@ -272,17 +239,9 @@ where
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W> {
async fn handle_rollover_actor_stopping(&mut self, msg: Stopping<rollover_maker::Actor>) {
self.rollover_actors.gc(msg);
}
}
impl<O, M, T, W> Actor<O, M, T, W>
impl<O, T, W> Actor<O, T, W>
where
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
T: xtra::Handler<maker_inc_connections::ConfirmOrder>
+ xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
@ -352,7 +311,6 @@ where
.await?;
self.projection_actor.send(projection::Update(None)).await?;
insert_cfd_and_update_feed(&cfd, &mut conn, &self.projection_actor).await?;
// 4. Try to get the oracle announcement, if that fails we should exit prior to changing any
@ -388,7 +346,7 @@ where
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W> {
impl<O, T, W> Actor<O, T, W> {
async fn handle_accept_order(&mut self, msg: AcceptOrder) -> Result<()> {
let AcceptOrder { order_id } = msg;
@ -436,28 +394,66 @@ impl<O, M, T, W> Actor<O, M, T, W> {
Ok(())
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W> {
async fn handle_setup_actor_stopping(&mut self, message: Stopping<setup_maker::Actor>) {
self.setup_actors.gc(message);
async fn handle_accept_rollover(&mut self, msg: AcceptRollOver) -> Result<()> {
if self
.rollover_actors
.send(&msg.order_id, rollover_maker::AcceptRollOver)
.await
.is_err()
{
tracing::warn!(%msg.order_id, "No active rollover");
}
Ok(())
}
async fn handle_settlement_actor_stopping(
&mut self,
message: Stopping<collab_settlement_maker::Actor>,
) {
self.settlement_actors.gc(message);
async fn handle_reject_rollover(&mut self, msg: RejectRollOver) -> Result<()> {
if self
.rollover_actors
.send(&msg.order_id, rollover_maker::RejectRollOver)
.await
.is_err()
{
tracing::warn!(%msg.order_id, "No active rollover");
}
Ok(())
}
async fn handle_commit(&mut self, msg: Commit) -> Result<()> {
let Commit { order_id } = msg;
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(order_id, &mut conn, &self.process_manager_actor).await?;
Ok(())
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W>
where
M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
impl<O, T, W> Actor<O, T, W> {
async fn handle_setup_completed(&mut self, msg: SetupCompleted) -> Result<()> {
let order_id = msg.order_id();
let mut conn = self.db.acquire().await?;
let cfd = load_cfd(order_id, &mut conn).await?;
let event = cfd.setup_contract(msg)?;
if let Err(e) = self
.process_manager_actor
.send(process_manager::Event::new(event.clone()))
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
}
Ok(())
}
async fn handle_setup_actor_stopping(&mut self, message: Stopping<setup_maker::Actor>) {
self.setup_actors.gc(message);
}
async fn handle_settlement_completed(
&mut self,
msg: model::cfd::Completed<CollaborativeSettlement>,
@ -467,78 +463,47 @@ where
let cfd = load_cfd(order_id, &mut conn).await?;
let event = cfd.settle_collaboratively(msg)?;
append_event(event.clone(), &mut conn).await?;
self.projection_actor.send(projection::CfdsChanged).await?;
match event.event {
CfdEvent::CollaborativeSettlementCompleted {
spend_tx, script, ..
} => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: spend_tx })
.await?
.context("Broadcasting close transaction")?;
tracing::info!(%order_id, "Close transaction published with txid {}", txid);
self.monitor_actor
.send(monitor::CollaborativeSettlement {
order_id,
tx: (txid, script),
})
.await?;
}
CfdEvent::CollaborativeSettlementRejected { commit_tx } => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::info!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
CfdEvent::CollaborativeSettlementFailed { commit_tx } => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::warn!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
_ => bail!("Unexpected event {:?}", event.event),
if let Err(e) = self
.process_manager_actor
.send(process_manager::Event::new(event.clone()))
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
}
Ok(())
}
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_commit(&mut self, msg: Commit) -> Result<()> {
let Commit { order_id } = msg;
async fn handle_settlement_actor_stopping(
&mut self,
message: Stopping<collab_settlement_maker::Actor>,
) {
self.settlement_actors.gc(message);
}
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor)
.await?;
async fn handle_monitor(&mut self, msg: monitor::Event) {
if let Err(e) =
cfd_actors::handle_monitoring_event(msg, &self.db, &self.process_manager_actor).await
{
tracing::error!("Unable to handle monotoring event: {:#}", e)
}
}
Ok(())
async fn handle_attestation(&mut self, msg: oracle::Attestation) {
if let Err(e) =
cfd_actors::handle_oracle_attestation(msg, &self.db, &self.process_manager_actor).await
{
tracing::warn!("Failed to handle oracle attestation: {:#}", e)
}
}
async fn handle_rollover_actor_stopping(&mut self, msg: Stopping<rollover_maker::Actor>) {
self.rollover_actors.gc(msg);
}
}
impl<O, M, T, W> Actor<O, M, T, W>
impl<O, T, W> Actor<O, T, W>
where
M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation>,
{
async fn handle_roll_over_completed(&mut self, _: Completed) -> Result<()> {
@ -548,10 +513,9 @@ where
}
}
impl<O, M, T, W> Actor<O, M, T, W>
impl<O, T, W> Actor<O, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring> + xtra::Handler<monitor::CollaborativeSettlement>,
T: xtra::Handler<maker_inc_connections::settlement::Response>
+ xtra::Handler<Stopping<collab_settlement_maker::Actor>>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
@ -596,7 +560,7 @@ where
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
impl<O, T, W> Actor<O, T, W>
where
T: xtra::Handler<maker_inc_connections::BroadcastOrder>,
{
@ -639,61 +603,8 @@ where
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_setup_completed(&mut self, msg: SetupCompleted) -> Result<()> {
let order_id = msg.order_id();
let mut conn = self.db.acquire().await?;
let cfd = load_cfd(order_id, &mut conn).await?;
let event = cfd.setup_contract(msg)?;
append_event(event.clone(), &mut conn).await?;
self.projection_actor.send(projection::CfdsChanged).await?;
let dlc = match event.event {
CfdEvent::ContractSetupCompleted { dlc } => dlc,
CfdEvent::OfferRejected | CfdEvent::ContractSetupFailed => {
return Ok(());
}
_ => bail!("Unexpected event {:?}", event.event),
};
tracing::info!("Setup complete, publishing on chain now");
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction {
tx: dlc.lock.0.clone(),
})
.await??;
tracing::info!("Lock transaction published with txid {}", txid);
self.monitor_actor
.send(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::new(dlc.clone()),
})
.await?;
self.oracle_actor
.send(oracle::MonitorAttestation {
event_id: dlc.settlement_event_id,
})
.await?;
Ok(())
}
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<TakerConnected> for Actor<O, M, T, W>
impl<O: 'static, T: 'static, W: 'static> Handler<TakerConnected> for Actor<O, T, W>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
@ -703,8 +614,7 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<TakerDisconnected>
for Actor<O, M, T, W>
impl<O: 'static, T: 'static, W: 'static> Handler<TakerDisconnected> for Actor<O, T, W>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
@ -714,9 +624,8 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<Completed> for Actor<O, M, T, W>
impl<O: 'static, T: 'static, W: 'static> Handler<Completed> for Actor<O, T, W>
where
M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation>,
{
async fn handle(&mut self, msg: Completed, _ctx: &mut Context<Self>) -> Result<()> {
@ -724,39 +633,10 @@ where
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_monitor(&mut self, msg: monitor::Event) {
if let Err(e) =
cfd_actors::handle_monitoring_event(msg, &self.db, &self.wallet, &self.projection_actor)
.await
{
tracing::error!("Unable to handle monotoring event: {:#}", e)
}
}
async fn handle_attestation(&mut self, msg: oracle::Attestation) {
if let Err(e) = cfd_actors::handle_oracle_attestation(
msg,
&self.db,
&self.wallet,
&self.projection_actor,
)
.await
{
tracing::warn!("Failed to handle oracle attestation: {:#}", e)
}
}
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<FromTaker> for Actor<O, M, T, W>
impl<O: 'static, T: 'static, W: 'static> Handler<FromTaker> for Actor<O, T, W>
where
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring> + xtra::Handler<monitor::CollaborativeSettlement>,
T: xtra::Handler<maker_inc_connections::ConfirmOrder>
+ xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
@ -859,4 +739,4 @@ impl Message for FromTaker {
type Result = ();
}
impl<O: 'static, M: 'static, T: 'static, W: 'static> xtra::Actor for Actor<O, M, T, W> {}
impl<O: 'static, T: 'static, W: 'static> xtra::Actor for Actor<O, T, W> {}

2
daemon/src/monitor.rs

@ -100,7 +100,7 @@ impl Cfd {
// might become relevant. See also https://github.com/itchysats/itchysats/issues/605 and https://github.com/itchysats/itchysats/issues/236.
fn apply(self, event: cfd::Event) -> Self {
match event.event {
CfdEvent::ContractSetupCompleted { dlc } => Self {
CfdEvent::ContractSetupCompleted { dlc, .. } => Self {
params: Some(MonitorParams::new(dlc)),
monitor_lock_finality: true,
monitor_commit_finality: true,

177
daemon/src/process_manager.rs

@ -0,0 +1,177 @@
use crate::db::append_event;
use crate::model::cfd;
use crate::model::cfd::CfdEvent;
use crate::model::cfd::Role;
use crate::monitor;
use crate::monitor::MonitorParams;
use crate::oracle;
use crate::projection;
use crate::wallet;
use anyhow::Context;
use anyhow::Result;
use xtra::prelude::MessageChannel;
use xtra_productivity::xtra_productivity;
pub struct Actor {
db: sqlx::SqlitePool,
role: Role,
cfds_changed: Box<dyn MessageChannel<projection::CfdsChanged>>,
try_broadcast_transaction: Box<dyn MessageChannel<wallet::TryBroadcastTransaction>>,
start_monitoring: Box<dyn MessageChannel<monitor::StartMonitoring>>,
monitor_collaborative_settlement: Box<dyn MessageChannel<monitor::CollaborativeSettlement>>,
monitor_attestation: Box<dyn MessageChannel<oracle::MonitorAttestation>>,
}
pub struct Event(cfd::Event);
impl Event {
pub fn new(event: cfd::Event) -> Self {
Self(event)
}
}
impl Actor {
pub fn new(
db: sqlx::SqlitePool,
role: Role,
cfds_changed: &(impl MessageChannel<projection::CfdsChanged> + 'static),
try_broadcast_transaction: &(impl MessageChannel<wallet::TryBroadcastTransaction> + 'static),
start_monitoring: &(impl MessageChannel<monitor::StartMonitoring> + 'static),
monitor_collaborative_settlement: &(impl MessageChannel<monitor::CollaborativeSettlement>
+ 'static),
monitor_attestation: &(impl MessageChannel<oracle::MonitorAttestation> + 'static),
) -> Self {
Self {
db,
role,
cfds_changed: cfds_changed.clone_channel(),
try_broadcast_transaction: try_broadcast_transaction.clone_channel(),
start_monitoring: start_monitoring.clone_channel(),
monitor_collaborative_settlement: monitor_collaborative_settlement.clone_channel(),
monitor_attestation: monitor_attestation.clone_channel(),
}
}
}
#[xtra_productivity]
impl Actor {
fn handle(&mut self, msg: Event) -> Result<()> {
let event = msg.0;
// 1. Safe in DB
let mut conn = self.db.acquire().await?;
append_event(event.clone(), &mut conn).await?;
// 2. Post process event
match event.event {
CfdEvent::ContractSetupCompleted { dlc } => {
tracing::info!("Setup complete, publishing on chain now");
let lock_tx = dlc.lock.0.clone();
let txid = self
.try_broadcast_transaction
.send(wallet::TryBroadcastTransaction { tx: lock_tx })
.await??;
tracing::info!("Lock transaction published with txid {}", txid);
self.start_monitoring
.send(monitor::StartMonitoring {
id: event.id,
params: MonitorParams::new(dlc.clone()),
})
.await?;
self.monitor_attestation
.send(oracle::MonitorAttestation {
event_id: dlc.settlement_event_id,
})
.await?;
}
CfdEvent::CollaborativeSettlementCompleted {
spend_tx, script, ..
} => {
let txid = match self.role {
Role::Maker => {
let txid = self
.try_broadcast_transaction
.send(wallet::TryBroadcastTransaction { tx: spend_tx })
.await?
.context("Broadcasting close transaction")?;
tracing::info!(order_id=%event.id, "Close transaction published with txid {}", txid);
txid
}
Role::Taker => {
// TODO: Publish the tx once the collaborative settlement is symmetric,
// allowing the taker to publish as well.
let txid = spend_tx.txid();
tracing::info!(order_id=%event.id, "Collaborative settlement completed successfully {}", txid);
txid
}
};
self.monitor_collaborative_settlement
.send(monitor::CollaborativeSettlement {
order_id: event.id,
tx: (txid, script),
})
.await?;
}
CfdEvent::CollaborativeSettlementRejected { commit_tx } => {
let txid = self
.try_broadcast_transaction
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::info!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
CfdEvent::CollaborativeSettlementFailed { commit_tx } => {
let txid = self
.try_broadcast_transaction
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::warn!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
CfdEvent::OracleAttestedPostCetTimelock { cet, .. }
| CfdEvent::CetTimelockConfirmedPostOracleAttestation { cet } => {
let txid = self
.try_broadcast_transaction
.send(wallet::TryBroadcastTransaction { tx: cet })
.await?
.context("Failed to broadcast CET")?;
tracing::info!(%txid, "CET published");
}
CfdEvent::OracleAttestedPriorCetTimelock { commit_tx: tx, .. }
| CfdEvent::ManualCommit { tx } => {
let txid = self
.try_broadcast_transaction
.send(wallet::TryBroadcastTransaction { tx })
.await?
.context("Failed to broadcast commit transaction")?;
tracing::info!(%txid, "Commit transaction published");
}
_ => {} // TODO: Monitor post processing for rollover
}
// 3. Update UI
self.cfds_changed.send(projection::CfdsChanged).await?;
Ok(())
}
}
impl xtra::Actor for Actor {}

4
daemon/src/routes_maker.rs

@ -7,7 +7,6 @@ use daemon::model::Identity;
use daemon::model::Price;
use daemon::model::Usd;
use daemon::model::WalletInfo;
use daemon::monitor;
use daemon::oracle;
use daemon::projection::Cfd;
use daemon::projection::CfdAction;
@ -33,8 +32,7 @@ use std::path::PathBuf;
use tokio::select;
use tokio::sync::watch;
pub type Maker =
MakerActorSystem<oracle::Actor, monitor::Actor, maker_inc_connections::Actor, wallet::Actor>;
pub type Maker = MakerActorSystem<oracle::Actor, maker_inc_connections::Actor, wallet::Actor>;
#[allow(clippy::too_many_arguments)]
#[rocket::get("/feed")]

3
daemon/src/routes_taker.rs

@ -8,7 +8,6 @@ use daemon::model::Leverage;
use daemon::model::Price;
use daemon::model::Usd;
use daemon::model::WalletInfo;
use daemon::monitor;
use daemon::oracle;
use daemon::projection;
use daemon::projection::CfdAction;
@ -34,7 +33,7 @@ use std::path::PathBuf;
use tokio::select;
use tokio::sync::watch;
type Taker = TakerActorSystem<oracle::Actor, monitor::Actor, wallet::Actor>;
type Taker = TakerActorSystem<oracle::Actor, wallet::Actor>;
#[rocket::get("/feed")]
pub async fn feed(

249
daemon/src/taker_cfd.rs

@ -4,9 +4,7 @@ use crate::cfd_actors::insert_cfd_and_update_feed;
use crate::cfd_actors::load_cfd;
use crate::collab_settlement_taker;
use crate::connection;
use crate::db::append_event;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdEvent;
use crate::model::cfd::CollaborativeSettlement;
use crate::model::cfd::Completed;
use crate::model::cfd::Order;
@ -19,16 +17,14 @@ use crate::model::Position;
use crate::model::Price;
use crate::model::Usd;
use crate::monitor;
use crate::monitor::MonitorParams;
use crate::oracle;
use crate::process_manager;
use crate::projection;
use crate::setup_taker;
use crate::wallet;
use crate::Tasks;
use anyhow::bail;
use anyhow::Context as _;
use anyhow::Result;
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use xtra::prelude::*;
use xtra::Actor as _;
@ -50,13 +46,13 @@ pub struct Commit {
pub order_id: OrderId,
}
pub struct Actor<O, M, W> {
pub struct Actor<O, W> {
db: sqlx::SqlitePool,
wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>,
process_manager_actor: Address<process_manager::Actor>,
conn_actor: Address<connection::Actor>,
monitor_actor: Address<M>,
setup_actors: AddressMap<OrderId, setup_taker::Actor>,
collab_settlement_actors: AddressMap<OrderId, collab_settlement_taker::Actor>,
oracle_actor: Address<O>,
@ -66,7 +62,7 @@ pub struct Actor<O, M, W> {
maker_identity: Identity,
}
impl<O, M, W> Actor<O, M, W>
impl<O, W> Actor<O, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
@ -78,8 +74,8 @@ where
wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>,
process_manager_actor: Address<process_manager::Actor>,
conn_actor: Address<connection::Actor>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
n_payouts: usize,
maker_identity: Identity,
@ -89,8 +85,8 @@ where
wallet,
oracle_pk,
projection_actor,
process_manager_actor,
conn_actor,
monitor_actor,
oracle_actor,
n_payouts,
setup_actors: AddressMap::default(),
@ -103,17 +99,36 @@ where
}
#[xtra_productivity]
impl<O, M, W> Actor<O, M, W>
impl<O, W> Actor<O, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
M: xtra::Handler<monitor::CollaborativeSettlement>,
{
async fn handle_current_order(&mut self, msg: CurrentOrder) -> Result<()> {
let order = msg.0;
tracing::trace!("new order {:?}", order);
match order {
Some(mut order) => {
order.origin = Origin::Theirs;
self.current_order = Some(order.clone());
self.projection_actor
.send(projection::Update(Some(order)))
.await?;
}
None => {
self.projection_actor.send(projection::Update(None)).await?;
}
}
Ok(())
}
async fn handle_commit(&mut self, msg: Commit) -> Result<()> {
let Commit { order_id } = msg;
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor)
.await?;
cfd_actors::handle_commit(order_id, &mut conn, &self.process_manager_actor).await?;
Ok(())
}
@ -157,11 +172,24 @@ where
}
#[xtra_productivity(message_impl = false)]
impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
M: xtra::Handler<monitor::CollaborativeSettlement>,
{
impl<O, W> Actor<O, W> {
async fn handle_setup_completed(&mut self, msg: SetupCompleted) -> Result<()> {
let mut conn = self.db.acquire().await?;
let order_id = msg.order_id();
let cfd = load_cfd(order_id, &mut conn).await?;
let event = cfd.setup_contract(msg)?;
if let Err(e) = self
.process_manager_actor
.send(process_manager::Event::new(event.clone()))
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
}
Ok(())
}
async fn handle_settlement_completed(
&mut self,
msg: Completed<CollaborativeSettlement>,
@ -171,80 +199,36 @@ where
let cfd = load_cfd(order_id, &mut conn).await?;
let event = cfd.settle_collaboratively(msg)?;
append_event(event.clone(), &mut conn).await?;
self.projection_actor.send(projection::CfdsChanged).await?;
match event.event {
CfdEvent::CollaborativeSettlementCompleted {
spend_tx, script, ..
} => {
// TODO: Publish the tx once the collaborative settlement is symmetric, allowing the
// taker to publish as well.
let txid = spend_tx.txid();
tracing::info!(%order_id, "Collaborative settlement completed successfully {}", txid);
self.monitor_actor
.send(monitor::CollaborativeSettlement {
order_id,
tx: (txid, script),
})
.await?;
}
CfdEvent::CollaborativeSettlementRejected { commit_tx } => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::info!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
CfdEvent::CollaborativeSettlementFailed { commit_tx } => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::warn!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
_ => bail!("Unexpected event {:?}", event.event),
if let Err(e) = self
.process_manager_actor
.send(process_manager::Event::new(event.clone()))
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
}
Ok(())
}
}
impl<O, M, W> Actor<O, M, W> {
async fn handle_new_order(&mut self, order: Option<Order>) -> Result<()> {
tracing::trace!("new order {:?}", order);
match order {
Some(mut order) => {
order.origin = Origin::Theirs;
self.current_order = Some(order.clone());
async fn handle_monitor(&mut self, msg: monitor::Event) {
if let Err(e) =
cfd_actors::handle_monitoring_event(msg, &self.db, &self.process_manager_actor).await
{
tracing::error!("Unable to handle monotoring event: {:#}", e)
}
}
self.projection_actor
.send(projection::Update(Some(order)))
.await?;
}
None => {
self.projection_actor.send(projection::Update(None)).await?;
}
async fn handle_attestation(&mut self, msg: oracle::Attestation) {
if let Err(e) =
cfd_actors::handle_oracle_attestation(msg, &self.db, &self.process_manager_actor).await
{
tracing::warn!("Failed to handle oracle attestation: {:#}", e)
}
Ok(())
}
}
#[xtra_productivity]
impl<O, M, W> Actor<O, M, W>
impl<O, W> Actor<O, W>
where
Self: xtra::Handler<SetupCompleted>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
@ -318,103 +302,4 @@ where
}
}
impl<O, M, W> Actor<O, M, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_setup_completed(&mut self, msg: SetupCompleted) -> Result<()> {
let mut conn = self.db.acquire().await?;
let order_id = msg.order_id();
let cfd = load_cfd(order_id, &mut conn).await?;
let event = cfd.setup_contract(msg)?;
append_event(event.clone(), &mut conn).await?;
self.projection_actor.send(projection::CfdsChanged).await?;
let dlc = match event.event {
CfdEvent::ContractSetupCompleted { dlc } => dlc,
CfdEvent::OfferRejected | CfdEvent::ContractSetupFailed => {
return Ok(());
}
_ => bail!("Unexpected event {:?}", event.event),
};
tracing::info!("Setup complete, publishing on chain now");
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction {
tx: dlc.lock.0.clone(),
})
.await??;
tracing::info!("Lock transaction published with txid {}", txid);
self.monitor_actor
.send(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::new(dlc.clone()),
})
.await?;
self.oracle_actor
.send(oracle::MonitorAttestation {
event_id: dlc.settlement_event_id,
})
.await?;
Ok(())
}
}
#[xtra_productivity]
impl<O, M, W> Actor<O, M, W> {
async fn handle_current_order(&mut self, msg: CurrentOrder) -> Result<()> {
self.handle_new_order(msg.0).await
}
}
#[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<SetupCompleted> for Actor<O, M, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: SetupCompleted, _ctx: &mut Context<Self>) -> Result<()> {
self.handle_setup_completed(msg).await
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_monitor(&mut self, msg: monitor::Event) {
if let Err(e) =
cfd_actors::handle_monitoring_event(msg, &self.db, &self.wallet, &self.projection_actor)
.await
{
tracing::error!("Unable to handle monotoring event: {:#}", e)
}
}
async fn handle_attestation(&mut self, msg: oracle::Attestation) {
if let Err(e) = cfd_actors::handle_oracle_attestation(
msg,
&self.db,
&self.wallet,
&self.projection_actor,
)
.await
{
tracing::warn!("Failed to handle oracle attestation: {:#}", e)
}
}
}
impl<O: 'static, M: 'static, W: 'static> xtra::Actor for Actor<O, M, W> {}
impl<O: 'static, W: 'static> xtra::Actor for Actor<O, W> {}

6
daemon/tests/harness/mod.rs

@ -1,4 +1,3 @@
use crate::harness::mocks::monitor::MonitorActor;
use crate::harness::mocks::oracle::OracleActor;
use crate::harness::mocks::wallet::WalletActor;
use crate::schnorrsig;
@ -116,8 +115,7 @@ impl Default for TakerConfig {
/// Maker Test Setup
pub struct Maker {
pub system:
MakerActorSystem<OracleActor, MonitorActor, maker_inc_connections::Actor, WalletActor>,
pub system: MakerActorSystem<OracleActor, maker_inc_connections::Actor, WalletActor>,
pub mocks: mocks::Mocks,
pub feeds: Feeds,
pub listen_addr: SocketAddr,
@ -238,7 +236,7 @@ impl Maker {
/// Taker Test Setup
pub struct Taker {
pub id: Identity,
pub system: daemon::TakerActorSystem<OracleActor, MonitorActor, WalletActor>,
pub system: daemon::TakerActorSystem<OracleActor, WalletActor>,
pub mocks: mocks::Mocks,
pub feeds: Feeds,
_tasks: Tasks,

Loading…
Cancel
Save