Browse Source

Post-processing for events concerning wallet, monitor, oracle

Removes a shitload of complexity.
Will cleanup impl blocks (i.e. combine impl blocks) in cfd actors in separate commit.
master^2
Daniel Karzel 3 years ago
parent
commit
818d3849fc
No known key found for this signature in database GPG Key ID: 30C3FC2E438ADB6E
  1. 66
      daemon/src/cfd_actors.rs
  2. 42
      daemon/src/lib.rs
  3. 174
      daemon/src/maker_cfd.rs
  4. 2
      daemon/src/monitor.rs
  5. 128
      daemon/src/process_manager.rs
  6. 4
      daemon/src/routes_maker.rs
  7. 3
      daemon/src/routes_taker.rs
  8. 152
      daemon/src/taker_cfd.rs
  9. 6
      daemon/tests/harness/mod.rs

66
daemon/src/cfd_actors.rs

@ -1,14 +1,11 @@
use crate::db;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdEvent;
use crate::model::cfd::Event;
use crate::model::cfd::OrderId;
use crate::monitor;
use crate::oracle;
use crate::process_manager;
use crate::projection;
use crate::try_continue;
use crate::wallet;
use anyhow::Context;
use anyhow::Result;
use sqlx::pool::PoolConnection;
@ -25,15 +22,11 @@ pub async fn insert_cfd_and_update_feed(
Ok(())
}
pub async fn handle_monitoring_event<W>(
pub async fn handle_monitoring_event(
event: monitor::Event,
db: &SqlitePool,
wallet: &xtra::Address<W>,
process_manager_address: &xtra::Address<process_manager::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
) -> Result<()> {
let mut conn = db.acquire().await?;
let order_id = event.order_id();
@ -62,9 +55,6 @@ where
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
} else {
// TODO: Move into process manager
post_process_event(event, wallet).await?;
}
Ok(())
@ -99,15 +89,11 @@ pub async fn load_cfd(order_id: OrderId, conn: &mut PoolConnection<Sqlite>) -> R
Ok(cfd)
}
pub async fn handle_commit<W>(
pub async fn handle_commit(
order_id: OrderId,
conn: &mut PoolConnection<Sqlite>,
wallet: &xtra::Address<W>,
process_manager_address: &xtra::Address<process_manager::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
) -> Result<()> {
let cfd = load_cfd(order_id, conn).await?;
let event = cfd.manual_commit_to_blockchain()?;
@ -116,23 +102,16 @@ where
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
} else {
// TODO: Move into process manager
post_process_event(event, wallet).await?;
}
Ok(())
}
pub async fn handle_oracle_attestation<W>(
pub async fn handle_oracle_attestation(
attestation: oracle::Attestation,
db: &SqlitePool,
wallet: &xtra::Address<W>,
process_manager_address: &xtra::Address<process_manager::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
) -> Result<()> {
let mut conn = db.acquire().await?;
tracing::debug!(
@ -153,41 +132,8 @@ where
.await?
{
tracing::error!("Sending event to process manager failed: {:#}", e);
} else {
// TODO: Move into process manager
try_continue!(post_process_event(event, wallet).await);
}
}
}
Ok(())
}
async fn post_process_event<W>(event: Event, wallet: &xtra::Address<W>) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
match event.event {
CfdEvent::OracleAttestedPostCetTimelock { cet, .. }
| CfdEvent::CetTimelockConfirmedPostOracleAttestation { cet } => {
let txid = wallet
.send(wallet::TryBroadcastTransaction { tx: cet })
.await?
.context("Failed to broadcast CET")?;
tracing::info!(%txid, "CET published");
}
CfdEvent::OracleAttestedPriorCetTimelock { commit_tx: tx, .. }
| CfdEvent::ManualCommit { tx } => {
let txid = wallet
.send(wallet::TryBroadcastTransaction { tx })
.await?
.context("Failed to broadcast commit transaction")?;
tracing::info!(%txid, "Commit transaction published");
}
_ => {}
}
Ok(())

42
daemon/src/lib.rs

@ -111,22 +111,18 @@ impl Tasks {
}
}
pub struct MakerActorSystem<O, M, T, W> {
pub cfd_actor_addr: Address<maker_cfd::Actor<O, M, T, W>>,
pub struct MakerActorSystem<O, T, W> {
pub cfd_actor_addr: Address<maker_cfd::Actor<O, T, W>>,
wallet_actor_addr: Address<W>,
inc_conn_addr: Address<T>,
_tasks: Tasks,
}
impl<O, M, T, W> MakerActorSystem<O, M, T, W>
impl<O, T, W> MakerActorSystem<O, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<maker_inc_connections::ConfirmOrder>
@ -142,7 +138,7 @@ where
+ xtra::Handler<wallet::Withdraw>,
{
#[allow(clippy::too_many_arguments)]
pub async fn new<FO, FM>(
pub async fn new<FO, FM, M>(
db: SqlitePool,
wallet_addr: Address<W>,
oracle_pk: schnorrsig::PublicKey,
@ -158,6 +154,10 @@ where
projection_actor: Address<projection::Actor>,
) -> Result<Self>
where
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
FO: Future<Output = Result<O>>,
FM: Future<Output = Result<M>>,
{
@ -172,6 +172,10 @@ where
db.clone(),
Role::Maker,
&projection_actor,
&wallet_addr,
&monitor_addr,
&monitor_addr,
&oracle_addr,
)));
let (cfd_actor_addr, cfd_actor_fut) = maker_cfd::Actor::new(
@ -182,7 +186,6 @@ where
projection_actor,
process_manager_addr.clone(),
inc_conn_addr.clone(),
monitor_addr.clone(),
oracle_addr.clone(),
n_payouts,
)
@ -318,23 +321,19 @@ where
}
}
pub struct TakerActorSystem<O, M, W> {
pub cfd_actor_addr: Address<taker_cfd::Actor<O, M, W>>,
pub struct TakerActorSystem<O, W> {
pub cfd_actor_addr: Address<taker_cfd::Actor<O, W>>,
pub connection_actor_addr: Address<connection::Actor>,
pub maker_online_status_feed_receiver: watch::Receiver<ConnectionStatus>,
wallet_actor_addr: Address<W>,
_tasks: Tasks,
}
impl<O, M, W> TakerActorSystem<O, M, W>
impl<O, W> TakerActorSystem<O, W>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::TryBroadcastTransaction>
@ -342,7 +341,7 @@ where
+ xtra::Handler<wallet::Reinitialise>,
{
#[allow(clippy::too_many_arguments)]
pub async fn new<FM, FO>(
pub async fn new<FM, FO, M>(
db: SqlitePool,
wallet_actor_addr: Address<W>,
oracle_pk: schnorrsig::PublicKey,
@ -356,6 +355,10 @@ where
maker_identity: Identity,
) -> Result<Self>
where
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
FO: Future<Output = Result<O>>,
FM: Future<Output = Result<M>>,
{
@ -372,6 +375,10 @@ where
db.clone(),
Role::Taker,
&projection_actor,
&wallet_actor_addr,
&monitor_addr,
&monitor_addr,
&oracle_addr,
)));
let (connection_actor_addr, connection_actor_ctx) = xtra::Context::new(None);
@ -382,7 +389,6 @@ where
projection_actor.clone(),
process_manager_addr,
connection_actor_addr.clone(),
monitor_addr.clone(),
oracle_addr.clone(),
n_payouts,
maker_identity,

174
daemon/src/maker_cfd.rs

@ -7,7 +7,6 @@ use crate::collab_settlement_maker;
use crate::maker_inc_connections;
use crate::model;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdEvent;
use crate::model::cfd::CollaborativeSettlement;
use crate::model::cfd::Order;
use crate::model::cfd::OrderId;
@ -21,7 +20,6 @@ use crate::model::Position;
use crate::model::Price;
use crate::model::Usd;
use crate::monitor;
use crate::monitor::MonitorParams;
use crate::oracle;
use crate::process_manager;
use crate::projection;
@ -34,7 +32,6 @@ use crate::wallet;
use crate::wire;
use crate::wire::TakerToMaker;
use crate::Tasks;
use anyhow::bail;
use anyhow::Context as _;
use anyhow::Result;
use async_trait::async_trait;
@ -91,7 +88,7 @@ pub struct FromTaker {
pub msg: wire::TakerToMaker,
}
pub struct Actor<O, M, T, W> {
pub struct Actor<O, T, W> {
db: sqlx::SqlitePool,
wallet: Address<W>,
settlement_interval: Duration,
@ -101,7 +98,6 @@ pub struct Actor<O, M, T, W> {
rollover_actors: AddressMap<OrderId, rollover_maker::Actor>,
takers: Address<T>,
current_order: Option<Order>,
monitor_actor: Address<M>,
setup_actors: AddressMap<OrderId, setup_maker::Actor>,
settlement_actors: AddressMap<OrderId, collab_settlement_maker::Actor>,
oracle_actor: Address<O>,
@ -110,7 +106,7 @@ pub struct Actor<O, M, T, W> {
tasks: Tasks,
}
impl<O, M, T, W> Actor<O, M, T, W> {
impl<O, T, W> Actor<O, T, W> {
#[allow(clippy::too_many_arguments)]
pub fn new(
db: sqlx::SqlitePool,
@ -120,7 +116,6 @@ impl<O, M, T, W> Actor<O, M, T, W> {
projection_actor: Address<projection::Actor>,
process_manager_actor: Address<process_manager::Actor>,
takers: Address<T>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
n_payouts: usize,
) -> Self {
@ -134,7 +129,6 @@ impl<O, M, T, W> Actor<O, M, T, W> {
rollover_actors: AddressMap::default(),
takers,
current_order: None,
monitor_actor,
setup_actors: AddressMap::default(),
oracle_actor,
n_payouts,
@ -157,7 +151,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
}
}
impl<O, M, T, W> Actor<O, M, T, W>
impl<O, T, W> Actor<O, T, W>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
@ -186,7 +180,7 @@ where
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W> {
impl<O, T, W> Actor<O, T, W> {
async fn handle_accept_rollover(&mut self, msg: AcceptRollOver) -> Result<()> {
if self
.rollover_actors
@ -214,10 +208,9 @@ impl<O, M, T, W> Actor<O, M, T, W> {
}
}
impl<O, M, T, W> Actor<O, M, T, W>
impl<O, T, W> Actor<O, T, W>
where
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<Stopping<rollover_maker::Actor>>
+ xtra::Handler<RollOverProposed>,
@ -276,16 +269,15 @@ where
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W> {
impl<O, T, W> Actor<O, T, W> {
async fn handle_rollover_actor_stopping(&mut self, msg: Stopping<rollover_maker::Actor>) {
self.rollover_actors.gc(msg);
}
}
impl<O, M, T, W> Actor<O, M, T, W>
impl<O, T, W> Actor<O, T, W>
where
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
T: xtra::Handler<maker_inc_connections::ConfirmOrder>
+ xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
@ -390,7 +382,7 @@ where
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W> {
impl<O, T, W> Actor<O, T, W> {
async fn handle_accept_order(&mut self, msg: AcceptOrder) -> Result<()> {
let AcceptOrder { order_id } = msg;
@ -441,7 +433,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W> {
impl<O, T, W> Actor<O, T, W> {
async fn handle_setup_actor_stopping(&mut self, message: Stopping<setup_maker::Actor>) {
self.setup_actors.gc(message);
}
@ -455,11 +447,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W>
where
M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
impl<O, T, W> Actor<O, T, W> {
async fn handle_settlement_completed(
&mut self,
msg: model::cfd::Completed<CollaborativeSettlement>,
@ -477,80 +465,24 @@ where
tracing::error!("Sending event to process manager failed: {:#}", e);
}
match event.event {
CfdEvent::CollaborativeSettlementCompleted {
spend_tx, script, ..
} => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: spend_tx })
.await?
.context("Broadcasting close transaction")?;
tracing::info!(%order_id, "Close transaction published with txid {}", txid);
self.monitor_actor
.send(monitor::CollaborativeSettlement {
order_id,
tx: (txid, script),
})
.await?;
}
CfdEvent::CollaborativeSettlementRejected { commit_tx } => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::info!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
CfdEvent::CollaborativeSettlementFailed { commit_tx } => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::warn!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
_ => bail!("Unexpected event {:?}", event.event),
}
Ok(())
}
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
impl<O, T, W> Actor<O, T, W> {
async fn handle_commit(&mut self, msg: Commit) -> Result<()> {
let Commit { order_id } = msg;
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.process_manager_actor,
)
.await?;
cfd_actors::handle_commit(order_id, &mut conn, &self.process_manager_actor).await?;
Ok(())
}
}
impl<O, M, T, W> Actor<O, M, T, W>
impl<O, T, W> Actor<O, T, W>
where
M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation>,
{
async fn handle_roll_over_completed(&mut self, _: Completed) -> Result<()> {
@ -560,10 +492,9 @@ where
}
}
impl<O, M, T, W> Actor<O, M, T, W>
impl<O, T, W> Actor<O, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring> + xtra::Handler<monitor::CollaborativeSettlement>,
T: xtra::Handler<maker_inc_connections::settlement::Response>
+ xtra::Handler<Stopping<collab_settlement_maker::Actor>>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
@ -608,7 +539,7 @@ where
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
impl<O, T, W> Actor<O, T, W>
where
T: xtra::Handler<maker_inc_connections::BroadcastOrder>,
{
@ -652,12 +583,7 @@ where
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
impl<O, T, W> Actor<O, T, W> {
async fn handle_setup_completed(&mut self, msg: SetupCompleted) -> Result<()> {
let order_id = msg.order_id();
let mut conn = self.db.acquire().await?;
@ -672,44 +598,12 @@ where
tracing::error!("Sending event to process manager failed: {:#}", e);
}
let dlc = match event.event {
CfdEvent::ContractSetupCompleted { dlc } => dlc,
CfdEvent::OfferRejected | CfdEvent::ContractSetupFailed => {
return Ok(());
}
_ => bail!("Unexpected event {:?}", event.event),
};
tracing::info!("Setup complete, publishing on chain now");
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction {
tx: dlc.lock.0.clone(),
})
.await??;
tracing::info!("Lock transaction published with txid {}", txid);
self.monitor_actor
.send(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::new(dlc.clone()),
})
.await?;
self.oracle_actor
.send(oracle::MonitorAttestation {
event_id: dlc.settlement_event_id,
})
.await?;
Ok(())
}
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<TakerConnected> for Actor<O, M, T, W>
impl<O: 'static, T: 'static, W: 'static> Handler<TakerConnected> for Actor<O, T, W>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
@ -719,8 +613,7 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<TakerDisconnected>
for Actor<O, M, T, W>
impl<O: 'static, T: 'static, W: 'static> Handler<TakerDisconnected> for Actor<O, T, W>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
@ -730,9 +623,8 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<Completed> for Actor<O, M, T, W>
impl<O: 'static, T: 'static, W: 'static> Handler<Completed> for Actor<O, T, W>
where
M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation>,
{
async fn handle(&mut self, msg: Completed, _ctx: &mut Context<Self>) -> Result<()> {
@ -741,31 +633,18 @@ where
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
impl<O, T, W> Actor<O, T, W> {
async fn handle_monitor(&mut self, msg: monitor::Event) {
if let Err(e) = cfd_actors::handle_monitoring_event(
msg,
&self.db,
&self.wallet,
&self.process_manager_actor,
)
.await
if let Err(e) =
cfd_actors::handle_monitoring_event(msg, &self.db, &self.process_manager_actor).await
{
tracing::error!("Unable to handle monotoring event: {:#}", e)
}
}
async fn handle_attestation(&mut self, msg: oracle::Attestation) {
if let Err(e) = cfd_actors::handle_oracle_attestation(
msg,
&self.db,
&self.wallet,
&self.process_manager_actor,
)
.await
if let Err(e) =
cfd_actors::handle_oracle_attestation(msg, &self.db, &self.process_manager_actor).await
{
tracing::warn!("Failed to handle oracle attestation: {:#}", e)
}
@ -773,10 +652,9 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<FromTaker> for Actor<O, M, T, W>
impl<O: 'static, T: 'static, W: 'static> Handler<FromTaker> for Actor<O, T, W>
where
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring> + xtra::Handler<monitor::CollaborativeSettlement>,
T: xtra::Handler<maker_inc_connections::ConfirmOrder>
+ xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
@ -879,4 +757,4 @@ impl Message for FromTaker {
type Result = ();
}
impl<O: 'static, M: 'static, T: 'static, W: 'static> xtra::Actor for Actor<O, M, T, W> {}
impl<O: 'static, T: 'static, W: 'static> xtra::Actor for Actor<O, T, W> {}

2
daemon/src/monitor.rs

@ -100,7 +100,7 @@ impl Cfd {
// might become relevant. See also https://github.com/itchysats/itchysats/issues/605 and https://github.com/itchysats/itchysats/issues/236.
fn apply(self, event: cfd::Event) -> Self {
match event.event {
CfdEvent::ContractSetupCompleted { dlc } => Self {
CfdEvent::ContractSetupCompleted { dlc, .. } => Self {
params: Some(MonitorParams::new(dlc)),
monitor_lock_finality: true,
monitor_commit_finality: true,

128
daemon/src/process_manager.rs

@ -1,15 +1,25 @@
use crate::db::append_event;
use crate::model::cfd;
use crate::model::cfd::CfdEvent;
use crate::model::cfd::Role;
use crate::monitor;
use crate::monitor::MonitorParams;
use crate::oracle;
use crate::projection;
use crate::wallet;
use anyhow::Context;
use anyhow::Result;
use xtra::prelude::MessageChannel;
use xtra_productivity::xtra_productivity;
pub struct Actor {
db: sqlx::SqlitePool,
_role: Role,
role: Role,
cfds_changed: Box<dyn MessageChannel<projection::CfdsChanged>>,
try_broadcast_transaction: Box<dyn MessageChannel<wallet::TryBroadcastTransaction>>,
start_monitoring: Box<dyn MessageChannel<monitor::StartMonitoring>>,
monitor_collaborative_settlement: Box<dyn MessageChannel<monitor::CollaborativeSettlement>>,
monitor_attestation: Box<dyn MessageChannel<oracle::MonitorAttestation>>,
}
pub struct Event(cfd::Event);
@ -25,11 +35,20 @@ impl Actor {
db: sqlx::SqlitePool,
role: Role,
cfds_changed: &(impl MessageChannel<projection::CfdsChanged> + 'static),
try_broadcast_transaction: &(impl MessageChannel<wallet::TryBroadcastTransaction> + 'static),
start_monitoring: &(impl MessageChannel<monitor::StartMonitoring> + 'static),
monitor_collaborative_settlement: &(impl MessageChannel<monitor::CollaborativeSettlement>
+ 'static),
monitor_attestation: &(impl MessageChannel<oracle::MonitorAttestation> + 'static),
) -> Self {
Self {
db,
_role: role,
role,
cfds_changed: cfds_changed.clone_channel(),
try_broadcast_transaction: try_broadcast_transaction.clone_channel(),
start_monitoring: start_monitoring.clone_channel(),
monitor_collaborative_settlement: monitor_collaborative_settlement.clone_channel(),
monitor_attestation: monitor_attestation.clone_channel(),
}
}
}
@ -43,7 +62,110 @@ impl Actor {
let mut conn = self.db.acquire().await?;
append_event(event.clone(), &mut conn).await?;
// TODO: 2. Post-process event by sending out messages
// 2. Post process event
match event.event {
CfdEvent::ContractSetupCompleted { dlc } => {
tracing::info!("Setup complete, publishing on chain now");
let lock_tx = dlc.lock.0.clone();
let txid = self
.try_broadcast_transaction
.send(wallet::TryBroadcastTransaction { tx: lock_tx })
.await??;
tracing::info!("Lock transaction published with txid {}", txid);
self.start_monitoring
.send(monitor::StartMonitoring {
id: event.id,
params: MonitorParams::new(dlc.clone()),
})
.await?;
self.monitor_attestation
.send(oracle::MonitorAttestation {
event_id: dlc.settlement_event_id,
})
.await?;
}
CfdEvent::CollaborativeSettlementCompleted {
spend_tx, script, ..
} => {
let txid = match self.role {
Role::Maker => {
let txid = self
.try_broadcast_transaction
.send(wallet::TryBroadcastTransaction { tx: spend_tx })
.await?
.context("Broadcasting close transaction")?;
tracing::info!(order_id=%event.id, "Close transaction published with txid {}", txid);
txid
}
Role::Taker => {
// TODO: Publish the tx once the collaborative settlement is symmetric,
// allowing the taker to publish as well.
let txid = spend_tx.txid();
tracing::info!(order_id=%event.id, "Collaborative settlement completed successfully {}", txid);
txid
}
};
self.monitor_collaborative_settlement
.send(monitor::CollaborativeSettlement {
order_id: event.id,
tx: (txid, script),
})
.await?;
}
CfdEvent::CollaborativeSettlementRejected { commit_tx } => {
let txid = self
.try_broadcast_transaction
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::info!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
CfdEvent::CollaborativeSettlementFailed { commit_tx } => {
let txid = self
.try_broadcast_transaction
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::warn!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
CfdEvent::OracleAttestedPostCetTimelock { cet, .. }
| CfdEvent::CetTimelockConfirmedPostOracleAttestation { cet } => {
let txid = self
.try_broadcast_transaction
.send(wallet::TryBroadcastTransaction { tx: cet })
.await?
.context("Failed to broadcast CET")?;
tracing::info!(%txid, "CET published");
}
CfdEvent::OracleAttestedPriorCetTimelock { commit_tx: tx, .. }
| CfdEvent::ManualCommit { tx } => {
let txid = self
.try_broadcast_transaction
.send(wallet::TryBroadcastTransaction { tx })
.await?
.context("Failed to broadcast commit transaction")?;
tracing::info!(%txid, "Commit transaction published");
}
_ => {} // TODO: Monitor post processing for rollover
}
// 3. Update UI
self.cfds_changed.send(projection::CfdsChanged).await?;

4
daemon/src/routes_maker.rs

@ -7,7 +7,6 @@ use daemon::model::Identity;
use daemon::model::Price;
use daemon::model::Usd;
use daemon::model::WalletInfo;
use daemon::monitor;
use daemon::oracle;
use daemon::projection::Cfd;
use daemon::projection::CfdAction;
@ -33,8 +32,7 @@ use std::path::PathBuf;
use tokio::select;
use tokio::sync::watch;
pub type Maker =
MakerActorSystem<oracle::Actor, monitor::Actor, maker_inc_connections::Actor, wallet::Actor>;
pub type Maker = MakerActorSystem<oracle::Actor, maker_inc_connections::Actor, wallet::Actor>;
#[allow(clippy::too_many_arguments)]
#[rocket::get("/feed")]

3
daemon/src/routes_taker.rs

@ -8,7 +8,6 @@ use daemon::model::Leverage;
use daemon::model::Price;
use daemon::model::Usd;
use daemon::model::WalletInfo;
use daemon::monitor;
use daemon::oracle;
use daemon::projection;
use daemon::projection::CfdAction;
@ -34,7 +33,7 @@ use std::path::PathBuf;
use tokio::select;
use tokio::sync::watch;
type Taker = TakerActorSystem<oracle::Actor, monitor::Actor, wallet::Actor>;
type Taker = TakerActorSystem<oracle::Actor, wallet::Actor>;
#[rocket::get("/feed")]
pub async fn feed(

152
daemon/src/taker_cfd.rs

@ -5,7 +5,6 @@ use crate::cfd_actors::load_cfd;
use crate::collab_settlement_taker;
use crate::connection;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdEvent;
use crate::model::cfd::CollaborativeSettlement;
use crate::model::cfd::Completed;
use crate::model::cfd::Order;
@ -18,17 +17,14 @@ use crate::model::Position;
use crate::model::Price;
use crate::model::Usd;
use crate::monitor;
use crate::monitor::MonitorParams;
use crate::oracle;
use crate::process_manager;
use crate::projection;
use crate::setup_taker;
use crate::wallet;
use crate::Tasks;
use anyhow::bail;
use anyhow::Context as _;
use anyhow::Result;
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use xtra::prelude::*;
use xtra::Actor as _;
@ -50,14 +46,13 @@ pub struct Commit {
pub order_id: OrderId,
}
pub struct Actor<O, M, W> {
pub struct Actor<O, W> {
db: sqlx::SqlitePool,
wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>,
process_manager_actor: Address<process_manager::Actor>,
conn_actor: Address<connection::Actor>,
monitor_actor: Address<M>,
setup_actors: AddressMap<OrderId, setup_taker::Actor>,
collab_settlement_actors: AddressMap<OrderId, collab_settlement_taker::Actor>,
oracle_actor: Address<O>,
@ -67,7 +62,7 @@ pub struct Actor<O, M, W> {
maker_identity: Identity,
}
impl<O, M, W> Actor<O, M, W>
impl<O, W> Actor<O, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
@ -81,7 +76,6 @@ where
projection_actor: Address<projection::Actor>,
process_manager_actor: Address<process_manager::Actor>,
conn_actor: Address<connection::Actor>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
n_payouts: usize,
maker_identity: Identity,
@ -93,7 +87,6 @@ where
projection_actor,
process_manager_actor,
conn_actor,
monitor_actor,
oracle_actor,
n_payouts,
setup_actors: AddressMap::default(),
@ -106,22 +99,15 @@ where
}
#[xtra_productivity]
impl<O, M, W> Actor<O, M, W>
impl<O, W> Actor<O, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
M: xtra::Handler<monitor::CollaborativeSettlement>,
{
async fn handle_commit(&mut self, msg: Commit) -> Result<()> {
let Commit { order_id } = msg;
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.process_manager_actor,
)
.await?;
cfd_actors::handle_commit(order_id, &mut conn, &self.process_manager_actor).await?;
Ok(())
}
@ -165,11 +151,7 @@ where
}
#[xtra_productivity(message_impl = false)]
impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
M: xtra::Handler<monitor::CollaborativeSettlement>,
{
impl<O, W> Actor<O, W> {
async fn handle_settlement_completed(
&mut self,
msg: Completed<CollaborativeSettlement>,
@ -187,55 +169,11 @@ where
tracing::error!("Sending event to process manager failed: {:#}", e);
}
match event.event {
CfdEvent::CollaborativeSettlementCompleted {
spend_tx, script, ..
} => {
// TODO: Publish the tx once the collaborative settlement is symmetric, allowing the
// taker to publish as well.
let txid = spend_tx.txid();
tracing::info!(%order_id, "Collaborative settlement completed successfully {}", txid);
self.monitor_actor
.send(monitor::CollaborativeSettlement {
order_id,
tx: (txid, script),
})
.await?;
}
CfdEvent::CollaborativeSettlementRejected { commit_tx } => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::info!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
CfdEvent::CollaborativeSettlementFailed { commit_tx } => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::warn!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
_ => bail!("Unexpected event {:?}", event.event),
}
Ok(())
}
}
impl<O, M, W> Actor<O, M, W> {
impl<O, W> Actor<O, W> {
async fn handle_new_order(&mut self, order: Option<Order>) -> Result<()> {
tracing::trace!("new order {:?}", order);
match order {
@ -257,7 +195,7 @@ impl<O, M, W> Actor<O, M, W> {
}
#[xtra_productivity]
impl<O, M, W> Actor<O, M, W>
impl<O, W> Actor<O, W>
where
Self: xtra::Handler<SetupCompleted>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
@ -331,12 +269,8 @@ where
}
}
impl<O, M, W> Actor<O, M, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
#[xtra_productivity(message_impl = false)]
impl<O, W> Actor<O, W> {
async fn handle_setup_completed(&mut self, msg: SetupCompleted) -> Result<()> {
let mut conn = self.db.acquire().await?;
let order_id = msg.order_id();
@ -351,91 +285,37 @@ where
tracing::error!("Sending event to process manager failed: {:#}", e);
}
let dlc = match event.event {
CfdEvent::ContractSetupCompleted { dlc } => dlc,
CfdEvent::OfferRejected | CfdEvent::ContractSetupFailed => {
return Ok(());
}
_ => bail!("Unexpected event {:?}", event.event),
};
tracing::info!("Setup complete, publishing on chain now");
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction {
tx: dlc.lock.0.clone(),
})
.await??;
tracing::info!("Lock transaction published with txid {}", txid);
self.monitor_actor
.send(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::new(dlc.clone()),
})
.await?;
self.oracle_actor
.send(oracle::MonitorAttestation {
event_id: dlc.settlement_event_id,
})
.await?;
Ok(())
}
}
#[xtra_productivity]
impl<O, M, W> Actor<O, M, W> {
impl<O, W> Actor<O, W> {
async fn handle_current_order(&mut self, msg: CurrentOrder) -> Result<()> {
self.handle_new_order(msg.0).await
}
}
#[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<SetupCompleted> for Actor<O, M, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: SetupCompleted, _ctx: &mut Context<Self>) -> Result<()> {
self.handle_setup_completed(msg).await
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, W> Actor<O, M, W>
impl<O, W> Actor<O, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_monitor(&mut self, msg: monitor::Event) {
if let Err(e) = cfd_actors::handle_monitoring_event(
msg,
&self.db,
&self.wallet,
&self.process_manager_actor,
)
.await
if let Err(e) =
cfd_actors::handle_monitoring_event(msg, &self.db, &self.process_manager_actor).await
{
tracing::error!("Unable to handle monotoring event: {:#}", e)
}
}
async fn handle_attestation(&mut self, msg: oracle::Attestation) {
if let Err(e) = cfd_actors::handle_oracle_attestation(
msg,
&self.db,
&self.wallet,
&self.process_manager_actor,
)
.await
if let Err(e) =
cfd_actors::handle_oracle_attestation(msg, &self.db, &self.process_manager_actor).await
{
tracing::warn!("Failed to handle oracle attestation: {:#}", e)
}
}
}
impl<O: 'static, M: 'static, W: 'static> xtra::Actor for Actor<O, M, W> {}
impl<O: 'static, W: 'static> xtra::Actor for Actor<O, W> {}

6
daemon/tests/harness/mod.rs

@ -1,4 +1,3 @@
use crate::harness::mocks::monitor::MonitorActor;
use crate::harness::mocks::oracle::OracleActor;
use crate::harness::mocks::wallet::WalletActor;
use crate::schnorrsig;
@ -116,8 +115,7 @@ impl Default for TakerConfig {
/// Maker Test Setup
pub struct Maker {
pub system:
MakerActorSystem<OracleActor, MonitorActor, maker_inc_connections::Actor, WalletActor>,
pub system: MakerActorSystem<OracleActor, maker_inc_connections::Actor, WalletActor>,
pub mocks: mocks::Mocks,
pub feeds: Feeds,
pub listen_addr: SocketAddr,
@ -238,7 +236,7 @@ impl Maker {
/// Taker Test Setup
pub struct Taker {
pub id: Identity,
pub system: daemon::TakerActorSystem<OracleActor, MonitorActor, WalletActor>,
pub system: daemon::TakerActorSystem<OracleActor, WalletActor>,
pub mocks: mocks::Mocks,
pub feeds: Feeds,
_tasks: Tasks,

Loading…
Cancel
Save