From 1560d866981a851a84fe675a3d6da71a2cebc6f2 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 1 Oct 2021 17:51:57 +1000 Subject: [PATCH] Move re-broadcasting functionality from actor ctor to housekeeping --- daemon/src/housekeeping.rs | 30 ++++++++++++++++++++++++++++++ daemon/src/maker.rs | 6 +++--- daemon/src/maker_cfd.rs | 29 ++++------------------------- daemon/src/taker.rs | 7 +++---- daemon/src/taker_cfd.rs | 29 ++++------------------------- 5 files changed, 44 insertions(+), 57 deletions(-) diff --git a/daemon/src/housekeeping.rs b/daemon/src/housekeeping.rs index cd2d7a4..ca72834 100644 --- a/daemon/src/housekeeping.rs +++ b/daemon/src/housekeeping.rs @@ -1,5 +1,6 @@ use crate::db::{insert_new_cfd_state_by_order_id, load_all_cfds}; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon}; +use crate::wallet::Wallet; use anyhow::Result; use sqlx::pool::PoolConnection; use sqlx::Sqlite; @@ -26,3 +27,32 @@ pub async fn transition_non_continue_cfds_to_setup_failed( Ok(()) } + +pub async fn rebroadcast_transactions( + conn: &mut PoolConnection, + wallet: &Wallet, +) -> Result<()> { + let cfds = load_all_cfds(conn).await?; + + for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { + let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; + + tracing::info!("Lock transaction published with txid {}", txid); + } + + for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) { + let signed_refund_tx = cfd.refund_tx()?; + let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; + + tracing::info!("Refund transaction published on chain: {}", txid); + } + + for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) { + let signed_commit_tx = cfd.commit_tx()?; + let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?; + + tracing::info!("Commit transaction published on chain: {}", txid); + } + + Ok(()) +} diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 795a0eb..e95b189 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -154,6 +154,9 @@ async fn main() -> Result<()> { housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn) .await .unwrap(); + housekeeping::rebroadcast_transactions(&mut conn, &wallet) + .await + .unwrap(); let cfds = load_all_cfds(&mut conn).await.unwrap(); let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); @@ -174,11 +177,8 @@ async fn main() -> Result<()> { order_feed_sender, maker_inc_connections_address.clone(), monitor_actor_address.clone(), - cfds.clone(), oracle_actor_address, ) - .await - .unwrap() .create(None) .spawn_global(); diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 5c5042b..e1f393d 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -74,7 +74,7 @@ enum SetupState { impl Actor { #[allow(clippy::too_many_arguments)] - pub async fn new( + pub fn new( db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, @@ -82,30 +82,9 @@ impl Actor { order_feed_sender: watch::Sender>, takers: Address, monitor_actor: Address>, - cfds: Vec, oracle_actor: Address>>, - ) -> Result { - for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { - let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; - - tracing::info!("Lock transaction published with txid {}", txid); - } - - for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) { - let signed_refund_tx = cfd.refund_tx()?; - let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; - - tracing::info!("Refund transaction published on chain: {}", txid); - } - - for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) { - let signed_commit_tx = cfd.commit_tx()?; - let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?; - - tracing::info!("Commit transaction published on chain: {}", txid); - } - - Ok(Self { + ) -> Self { + Self { db, wallet, oracle_pk, @@ -117,7 +96,7 @@ impl Actor { setup_state: SetupState::None, latest_announcement: None, _oracle_actor: oracle_actor, - }) + } } async fn handle_new_order(&mut self, order: Order) -> Result<()> { diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index da7892d..076a9f8 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -156,6 +156,9 @@ async fn main() -> Result<()> { housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn) .await .unwrap(); + housekeeping::rebroadcast_transactions(&mut conn, &wallet) + .await + .unwrap(); let cfds = load_all_cfds(&mut conn).await.unwrap(); let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); @@ -164,7 +167,6 @@ async fn main() -> Result<()> { .spawn_global(); let (monitor_actor_address, mut monitor_actor_context) = xtra::Context::new(None); - let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None); let mut conn = db.acquire().await.unwrap(); @@ -177,11 +179,8 @@ async fn main() -> Result<()> { order_feed_sender, send_to_maker, monitor_actor_address.clone(), - cfds.clone(), oracle_actor_address, ) - .await - .unwrap() .create(None) .spawn_global(); diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index cd11c06..7ba4717 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -66,7 +66,7 @@ pub struct Actor { impl Actor { #[allow(clippy::too_many_arguments)] - pub async fn new( + pub fn new( db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, @@ -74,30 +74,9 @@ impl Actor { order_feed_actor_inbox: watch::Sender>, send_to_maker: Address>, monitor_actor: Address>, - cfds: Vec, oracle_actor: Address>>, - ) -> Result { - for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { - let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; - - tracing::info!("Lock transaction published with txid {}", txid); - } - - for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) { - let signed_refund_tx = cfd.refund_tx()?; - let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; - - tracing::info!("Refund transaction published on chain: {}", txid); - } - - for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) { - let signed_commit_tx = cfd.commit_tx()?; - let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?; - - tracing::info!("Commit transaction published on chain: {}", txid); - } - - Ok(Self { + ) -> Self { + Self { db, wallet, oracle_pk, @@ -108,7 +87,7 @@ impl Actor { setup_state: SetupState::None, latest_announcement: None, _oracle_actor: oracle_actor, - }) + } } async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> {