Browse Source

Merge #398

398: Don't fail for loops r=da-kami a=da-kami

We should not fail for loops where we loop over cfds to process state updates based on information we learned / restart. 
We use a macro to wrap all fallible code in the loop to just print an error and continue. 

Note: I went over the codebase in search of other for loops, maybe someone else can do another grep :)

Co-authored-by: Daniel Karzel <daniel@comit.network>
hotfix/0.1.1 0.1.0
bors[bot] 3 years ago
committed by GitHub
parent
commit
5891ad627a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 36
      daemon/src/cfd_actors.rs
  2. 13
      daemon/src/housekeeping.rs
  3. 1
      daemon/src/lib.rs
  4. 13
      daemon/src/try_continue.rs

36
daemon/src/cfd_actors.rs

@ -1,8 +1,8 @@
use crate::db::load_cfd_by_order_id; use crate::db::load_cfd_by_order_id;
use crate::model::cfd::{Attestation, Cfd, CfdState, CfdStateChangeEvent, OrderId}; use crate::model::cfd::{Attestation, Cfd, CfdState, CfdStateChangeEvent, OrderId};
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::{db, monitor, oracle}; use crate::{db, monitor, oracle, try_continue};
use anyhow::{bail, Result}; use anyhow::{bail, Context, Result};
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
use sqlx::Sqlite; use sqlx::Sqlite;
use tokio::sync::watch; use tokio::sync::watch;
@ -128,28 +128,28 @@ pub async fn handle_oracle_attestation(
.iter_mut() .iter_mut()
.filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc))) .filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc)))
{ {
if cfd let attestation = try_continue!(Attestation::new(
.handle(CfdStateChangeEvent::OracleAttestation(Attestation::new( attestation.id,
attestation.id, attestation.price,
attestation.price, attestation.scalars.clone(),
attestation.scalars.clone(), dlc,
dlc, cfd.role(),
cfd.role(), ));
)?))?
.is_none() let new_state =
{ try_continue!(cfd.handle(CfdStateChangeEvent::OracleAttestation(attestation)));
if new_state.is_none() {
// if we don't transition to a new state after oracle attestation we ignore the cfd // if we don't transition to a new state after oracle attestation we ignore the cfd
// this is for cases where we cannot handle the attestation which should be in a // this is for cases where we cannot handle the attestation which should be in a
// final state // final state
continue; continue;
} }
append_cfd_state(cfd, conn, update_sender).await?; try_continue!(append_cfd_state(cfd, conn, update_sender).await);
try_continue!(try_cet_publication(cfd, conn, wallet, update_sender)
if let Err(e) = try_cet_publication(cfd, conn, wallet, update_sender).await { .await
tracing::error!("Error when trying to publish CET: {:#}", e); .context("Error when trying to publish CET"));
continue;
}
} }
Ok(()) Ok(())

13
daemon/src/housekeeping.rs

@ -1,10 +1,10 @@
use crate::db::{append_cfd_state, load_all_cfds}; use crate::db::{append_cfd_state, load_all_cfds};
use crate::model::cfd::{Cfd, CfdState}; use crate::model::cfd::{Cfd, CfdState};
use crate::try_continue;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use anyhow::Result; use anyhow::Result;
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
use sqlx::Sqlite; use sqlx::Sqlite;
pub async fn transition_non_continue_cfds_to_setup_failed( pub async fn transition_non_continue_cfds_to_setup_failed(
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
) -> Result<()> { ) -> Result<()> {
@ -29,29 +29,28 @@ pub async fn rebroadcast_transactions(
let cfds = load_all_cfds(conn).await?; let cfds = load_all_cfds(conn).await?;
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) {
let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; let txid = try_continue!(wallet.try_broadcast_transaction(dlc.lock.0.clone()).await);
tracing::info!("Lock transaction published with txid {}", txid); tracing::info!("Lock transaction published with txid {}", txid);
} }
for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) { for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) {
let signed_refund_tx = cfd.refund_tx()?; let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; let txid = try_continue!(wallet.try_broadcast_transaction(signed_refund_tx).await);
tracing::info!("Refund transaction published on chain: {}", txid); tracing::info!("Refund transaction published on chain: {}", txid);
} }
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) { for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) {
let signed_commit_tx = cfd.commit_tx()?; let signed_commit_tx = cfd.commit_tx()?;
let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?; let txid = try_continue!(wallet.try_broadcast_transaction(signed_commit_tx).await);
tracing::info!("Commit transaction published on chain: {}", txid); tracing::info!("Commit transaction published on chain: {}", txid);
} }
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_cet(cfd)) { for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_cet(cfd)) {
// Double question-mark OK because if we are in PendingCet we must have been Ready before // Double question mark OK because if we are in PendingCet we must have been Ready before
let signed_cet = cfd.cet()??; let signed_cet = cfd.cet()??;
let txid = wallet.try_broadcast_transaction(signed_cet).await?; let txid = try_continue!(wallet.try_broadcast_transaction(signed_cet).await);
tracing::info!("CET published on chain: {}", txid); tracing::info!("CET published on chain: {}", txid);
} }

1
daemon/src/lib.rs

@ -22,6 +22,7 @@ pub mod setup_contract;
pub mod taker_cfd; pub mod taker_cfd;
pub mod to_sse_event; pub mod to_sse_event;
pub mod tokio_ext; pub mod tokio_ext;
pub mod try_continue;
pub mod wallet; pub mod wallet;
pub mod wallet_sync; pub mod wallet_sync;
pub mod wire; pub mod wire;

13
daemon/src/try_continue.rs

@ -0,0 +1,13 @@
/// Wrapper for errors in loop that logs error and continues
#[macro_export]
macro_rules! try_continue {
($result:expr) => {
match $result {
Ok(value) => value,
Err(e) => {
tracing::error!("{:#}", e);
continue;
}
}
};
}
Loading…
Cancel
Save