diff --git a/daemon/src/cleanup.rs b/daemon/src/cleanup.rs deleted file mode 100644 index cd2d7a4..0000000 --- a/daemon/src/cleanup.rs +++ /dev/null @@ -1,28 +0,0 @@ -use crate::db::{insert_new_cfd_state_by_order_id, load_all_cfds}; -use crate::model::cfd::{Cfd, CfdState, CfdStateCommon}; -use anyhow::Result; -use sqlx::pool::PoolConnection; -use sqlx::Sqlite; -use std::time::SystemTime; - -pub async fn transition_non_continue_cfds_to_setup_failed( - conn: &mut PoolConnection, -) -> Result<()> { - let cfds = load_all_cfds(conn).await?; - - for cfd in cfds.iter().filter(|cfd| Cfd::is_cleanup(cfd)) { - insert_new_cfd_state_by_order_id( - cfd.order.id, - CfdState::SetupFailed { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - info: format!("Was in state {} which cannot be continued.", cfd.state), - }, - conn, - ) - .await?; - } - - Ok(()) -} diff --git a/daemon/src/housekeeping.rs b/daemon/src/housekeeping.rs new file mode 100644 index 0000000..ca72834 --- /dev/null +++ b/daemon/src/housekeeping.rs @@ -0,0 +1,58 @@ +use crate::db::{insert_new_cfd_state_by_order_id, load_all_cfds}; +use crate::model::cfd::{Cfd, CfdState, CfdStateCommon}; +use crate::wallet::Wallet; +use anyhow::Result; +use sqlx::pool::PoolConnection; +use sqlx::Sqlite; +use std::time::SystemTime; + +pub async fn transition_non_continue_cfds_to_setup_failed( + conn: &mut PoolConnection, +) -> Result<()> { + let cfds = load_all_cfds(conn).await?; + + for cfd in cfds.iter().filter(|cfd| Cfd::is_cleanup(cfd)) { + insert_new_cfd_state_by_order_id( + cfd.order.id, + CfdState::SetupFailed { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + info: format!("Was in state {} which cannot be continued.", cfd.state), + }, + conn, + ) + .await?; + } + + Ok(()) +} + +pub async fn rebroadcast_transactions( + conn: &mut PoolConnection, + wallet: &Wallet, +) -> Result<()> { + let cfds = load_all_cfds(conn).await?; + + for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { + let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; + + tracing::info!("Lock transaction published with txid {}", txid); + } + + for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) { + let signed_refund_tx = cfd.refund_tx()?; + let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; + + tracing::info!("Refund transaction published on chain: {}", txid); + } + + for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) { + let signed_commit_tx = cfd.commit_tx()?; + let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?; + + tracing::info!("Commit transaction published on chain: {}", txid); + } + + Ok(()) +} diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 6f718f7..e95b189 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -21,8 +21,8 @@ use xtra::spawn::TokioGlobalSpawnExt; mod actors; mod auth; mod bitmex_price_feed; -mod cleanup; mod db; +mod housekeeping; mod keypair; mod logger; mod maker_cfd; @@ -151,7 +151,10 @@ async fn main() -> Result<()> { }; let mut conn = db.acquire().await.unwrap(); - cleanup::transition_non_continue_cfds_to_setup_failed(&mut conn) + housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn) + .await + .unwrap(); + housekeeping::rebroadcast_transactions(&mut conn, &wallet) .await .unwrap(); let cfds = load_all_cfds(&mut conn).await.unwrap(); @@ -174,11 +177,8 @@ async fn main() -> Result<()> { order_feed_sender, maker_inc_connections_address.clone(), monitor_actor_address.clone(), - cfds.clone(), oracle_actor_address, ) - .await - .unwrap() .create(None) .spawn_global(); diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 5c5042b..e1f393d 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -74,7 +74,7 @@ enum SetupState { impl Actor { #[allow(clippy::too_many_arguments)] - pub async fn new( + pub fn new( db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, @@ -82,30 +82,9 @@ impl Actor { order_feed_sender: watch::Sender>, takers: Address, monitor_actor: Address>, - cfds: Vec, oracle_actor: Address>>, - ) -> Result { - for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { - let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; - - tracing::info!("Lock transaction published with txid {}", txid); - } - - for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) { - let signed_refund_tx = cfd.refund_tx()?; - let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; - - tracing::info!("Refund transaction published on chain: {}", txid); - } - - for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) { - let signed_commit_tx = cfd.commit_tx()?; - let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?; - - tracing::info!("Commit transaction published on chain: {}", txid); - } - - Ok(Self { + ) -> Self { + Self { db, wallet, oracle_pk, @@ -117,7 +96,7 @@ impl Actor { setup_state: SetupState::None, latest_announcement: None, _oracle_actor: oracle_actor, - }) + } } async fn handle_new_order(&mut self, order: Order) -> Result<()> { diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 7d737c9..076a9f8 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -23,8 +23,8 @@ use xtra::Actor; mod actors; mod bitmex_price_feed; -mod cleanup; mod db; +mod housekeeping; mod keypair; mod logger; mod model; @@ -153,7 +153,10 @@ async fn main() -> Result<()> { }; let mut conn = db.acquire().await.unwrap(); - cleanup::transition_non_continue_cfds_to_setup_failed(&mut conn) + housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn) + .await + .unwrap(); + housekeeping::rebroadcast_transactions(&mut conn, &wallet) .await .unwrap(); let cfds = load_all_cfds(&mut conn).await.unwrap(); @@ -164,7 +167,6 @@ async fn main() -> Result<()> { .spawn_global(); let (monitor_actor_address, mut monitor_actor_context) = xtra::Context::new(None); - let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None); let mut conn = db.acquire().await.unwrap(); @@ -177,11 +179,8 @@ async fn main() -> Result<()> { order_feed_sender, send_to_maker, monitor_actor_address.clone(), - cfds.clone(), oracle_actor_address, ) - .await - .unwrap() .create(None) .spawn_global(); diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index cd11c06..7ba4717 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -66,7 +66,7 @@ pub struct Actor { impl Actor { #[allow(clippy::too_many_arguments)] - pub async fn new( + pub fn new( db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, @@ -74,30 +74,9 @@ impl Actor { order_feed_actor_inbox: watch::Sender>, send_to_maker: Address>, monitor_actor: Address>, - cfds: Vec, oracle_actor: Address>>, - ) -> Result { - for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { - let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?; - - tracing::info!("Lock transaction published with txid {}", txid); - } - - for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) { - let signed_refund_tx = cfd.refund_tx()?; - let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; - - tracing::info!("Refund transaction published on chain: {}", txid); - } - - for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) { - let signed_commit_tx = cfd.commit_tx()?; - let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?; - - tracing::info!("Commit transaction published on chain: {}", txid); - } - - Ok(Self { + ) -> Self { + Self { db, wallet, oracle_pk, @@ -108,7 +87,7 @@ impl Actor { setup_state: SetupState::None, latest_announcement: None, _oracle_actor: oracle_actor, - }) + } } async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> {