From 7b40667d4d427a6e52b4dc9c71f14eead424a9fd Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Wed, 20 Oct 2021 17:17:06 +1100 Subject: [PATCH 1/3] Macro to trace errors in loop and continue --- daemon/src/lib.rs | 1 + daemon/src/try_continue.rs | 13 +++++++++++++ 2 files changed, 14 insertions(+) create mode 100644 daemon/src/try_continue.rs diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index def8c6c..b02ff37 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -22,6 +22,7 @@ pub mod setup_contract; pub mod taker_cfd; pub mod to_sse_event; pub mod tokio_ext; +pub mod try_continue; pub mod wallet; pub mod wallet_sync; pub mod wire; diff --git a/daemon/src/try_continue.rs b/daemon/src/try_continue.rs new file mode 100644 index 0000000..5fa3dc3 --- /dev/null +++ b/daemon/src/try_continue.rs @@ -0,0 +1,13 @@ +/// Wrapper for errors in loop that logs error and continues +#[macro_export] +macro_rules! try_continue { + ($result:expr) => { + match $result { + Ok(value) => value, + Err(e) => { + tracing::error!("{:#}", e); + continue; + } + } + }; +} From c798035402194cba4c0704abfe99602f58207189 Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Wed, 20 Oct 2021 17:22:19 +1100 Subject: [PATCH 2/3] Don't fail re-broadcasting tx loop --- daemon/src/housekeeping.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/daemon/src/housekeeping.rs b/daemon/src/housekeeping.rs index 32056bc..dd5d225 100644 --- a/daemon/src/housekeeping.rs +++ b/daemon/src/housekeeping.rs @@ -1,10 +1,10 @@ use crate::db::{append_cfd_state, load_all_cfds}; use crate::model::cfd::{Cfd, CfdState}; +use crate::try_continue; use crate::wallet::Wallet; use anyhow::Result; use sqlx::pool::PoolConnection; use sqlx::Sqlite; - pub async fn transition_non_continue_cfds_to_setup_failed( conn: &mut PoolConnection, ) -> Result<()> { @@ -29,29 +29,28 @@ pub async fn rebroadcast_transactions( let cfds = load_all_cfds(conn).await?; for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { - let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; - + let txid = try_continue!(wallet.try_broadcast_transaction(dlc.lock.0.clone()).await); tracing::info!("Lock transaction published with txid {}", txid); } for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) { let signed_refund_tx = cfd.refund_tx()?; - let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; + let txid = try_continue!(wallet.try_broadcast_transaction(signed_refund_tx).await); tracing::info!("Refund transaction published on chain: {}", txid); } for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) { let signed_commit_tx = cfd.commit_tx()?; - let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?; + let txid = try_continue!(wallet.try_broadcast_transaction(signed_commit_tx).await); tracing::info!("Commit transaction published on chain: {}", txid); } for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_cet(cfd)) { - // Double question-mark OK because if we are in PendingCet we must have been Ready before + // Double question mark OK because if we are in PendingCet we must have been Ready before let signed_cet = cfd.cet()??; - let txid = wallet.try_broadcast_transaction(signed_cet).await?; + let txid = try_continue!(wallet.try_broadcast_transaction(signed_cet).await); tracing::info!("CET published on chain: {}", txid); } From c9c03b7c88f359686f3d780138c8fadd3acb973c Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Wed, 20 Oct 2021 17:22:53 +1100 Subject: [PATCH 3/3] Don't fail in oracle attestation loop --- daemon/src/cfd_actors.rs | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/daemon/src/cfd_actors.rs b/daemon/src/cfd_actors.rs index 8484812..a82b0ec 100644 --- a/daemon/src/cfd_actors.rs +++ b/daemon/src/cfd_actors.rs @@ -1,7 +1,7 @@ use crate::model::cfd::{Attestation, Cfd, CfdState, CfdStateChangeEvent, OrderId}; use crate::wallet::Wallet; -use crate::{db, monitor, oracle}; -use anyhow::{bail, Result}; +use crate::{db, monitor, oracle, try_continue}; +use anyhow::{bail, Context, Result}; use sqlx::pool::PoolConnection; use sqlx::Sqlite; use tokio::sync::watch; @@ -120,28 +120,28 @@ pub async fn handle_oracle_attestation( .iter_mut() .filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc))) { - if cfd - .handle(CfdStateChangeEvent::OracleAttestation(Attestation::new( - attestation.id, - attestation.price, - attestation.scalars.clone(), - dlc, - cfd.role(), - )?))? - .is_none() - { + let attestation = try_continue!(Attestation::new( + attestation.id, + attestation.price, + attestation.scalars.clone(), + dlc, + cfd.role(), + )); + + let new_state = + try_continue!(cfd.handle(CfdStateChangeEvent::OracleAttestation(attestation))); + + if new_state.is_none() { // if we don't transition to a new state after oracle attestation we ignore the cfd // this is for cases where we cannot handle the attestation which should be in a // final state continue; } - append_cfd_state(cfd, conn, update_sender).await?; - - if let Err(e) = try_cet_publication(cfd, conn, wallet, update_sender).await { - tracing::error!("Error when trying to publish CET: {:#}", e); - continue; - } + try_continue!(append_cfd_state(cfd, conn, update_sender).await); + try_continue!(try_cet_publication(cfd, conn, wallet, update_sender) + .await + .context("Error when trying to publish CET")); } Ok(())