Browse Source

Move re-broadcasting functionality from actor ctor to housekeeping

upload-correct-windows-binary
Thomas Eizinger 3 years ago
parent
commit
1560d86698
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 30
      daemon/src/housekeeping.rs
  2. 6
      daemon/src/maker.rs
  3. 29
      daemon/src/maker_cfd.rs
  4. 7
      daemon/src/taker.rs
  5. 29
      daemon/src/taker_cfd.rs

30
daemon/src/housekeeping.rs

@ -1,5 +1,6 @@
use crate::db::{insert_new_cfd_state_by_order_id, load_all_cfds}; use crate::db::{insert_new_cfd_state_by_order_id, load_all_cfds};
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon}; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon};
use crate::wallet::Wallet;
use anyhow::Result; use anyhow::Result;
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
use sqlx::Sqlite; use sqlx::Sqlite;
@ -26,3 +27,32 @@ pub async fn transition_non_continue_cfds_to_setup_failed(
Ok(()) Ok(())
} }
pub async fn rebroadcast_transactions(
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
) -> Result<()> {
let cfds = load_all_cfds(conn).await?;
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) {
let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?;
tracing::info!("Lock transaction published with txid {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) {
let signed_commit_tx = cfd.commit_tx()?;
let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?;
tracing::info!("Commit transaction published on chain: {}", txid);
}
Ok(())
}

6
daemon/src/maker.rs

@ -154,6 +154,9 @@ async fn main() -> Result<()> {
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn) housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn)
.await .await
.unwrap(); .unwrap();
housekeeping::rebroadcast_transactions(&mut conn, &wallet)
.await
.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap(); let cfds = load_all_cfds(&mut conn).await.unwrap();
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
@ -174,11 +177,8 @@ async fn main() -> Result<()> {
order_feed_sender, order_feed_sender,
maker_inc_connections_address.clone(), maker_inc_connections_address.clone(),
monitor_actor_address.clone(), monitor_actor_address.clone(),
cfds.clone(),
oracle_actor_address, oracle_actor_address,
) )
.await
.unwrap()
.create(None) .create(None)
.spawn_global(); .spawn_global();

29
daemon/src/maker_cfd.rs

@ -74,7 +74,7 @@ enum SetupState {
impl Actor { impl Actor {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn new( pub fn new(
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
@ -82,30 +82,9 @@ impl Actor {
order_feed_sender: watch::Sender<Option<Order>>, order_feed_sender: watch::Sender<Option<Order>>,
takers: Address<maker_inc_connections::Actor>, takers: Address<maker_inc_connections::Actor>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
cfds: Vec<Cfd>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>, oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
) -> Result<Self> { ) -> Self {
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { Self {
let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?;
tracing::info!("Lock transaction published with txid {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) {
let signed_commit_tx = cfd.commit_tx()?;
let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?;
tracing::info!("Commit transaction published on chain: {}", txid);
}
Ok(Self {
db, db,
wallet, wallet,
oracle_pk, oracle_pk,
@ -117,7 +96,7 @@ impl Actor {
setup_state: SetupState::None, setup_state: SetupState::None,
latest_announcement: None, latest_announcement: None,
_oracle_actor: oracle_actor, _oracle_actor: oracle_actor,
}) }
} }
async fn handle_new_order(&mut self, order: Order) -> Result<()> { async fn handle_new_order(&mut self, order: Order) -> Result<()> {

7
daemon/src/taker.rs

@ -156,6 +156,9 @@ async fn main() -> Result<()> {
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn) housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn)
.await .await
.unwrap(); .unwrap();
housekeeping::rebroadcast_transactions(&mut conn, &wallet)
.await
.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap(); let cfds = load_all_cfds(&mut conn).await.unwrap();
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
@ -164,7 +167,6 @@ async fn main() -> Result<()> {
.spawn_global(); .spawn_global();
let (monitor_actor_address, mut monitor_actor_context) = xtra::Context::new(None); let (monitor_actor_address, mut monitor_actor_context) = xtra::Context::new(None);
let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None); let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None);
let mut conn = db.acquire().await.unwrap(); let mut conn = db.acquire().await.unwrap();
@ -177,11 +179,8 @@ async fn main() -> Result<()> {
order_feed_sender, order_feed_sender,
send_to_maker, send_to_maker,
monitor_actor_address.clone(), monitor_actor_address.clone(),
cfds.clone(),
oracle_actor_address, oracle_actor_address,
) )
.await
.unwrap()
.create(None) .create(None)
.spawn_global(); .spawn_global();

29
daemon/src/taker_cfd.rs

@ -66,7 +66,7 @@ pub struct Actor {
impl Actor { impl Actor {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn new( pub fn new(
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
@ -74,30 +74,9 @@ impl Actor {
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
cfds: Vec<Cfd>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>, oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
) -> Result<Self> { ) -> Self {
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { Self {
let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?;
tracing::info!("Lock transaction published with txid {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) {
let signed_commit_tx = cfd.commit_tx()?;
let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?;
tracing::info!("Commit transaction published on chain: {}", txid);
}
Ok(Self {
db, db,
wallet, wallet,
oracle_pk, oracle_pk,
@ -108,7 +87,7 @@ impl Actor {
setup_state: SetupState::None, setup_state: SetupState::None,
latest_announcement: None, latest_announcement: None,
_oracle_actor: oracle_actor, _oracle_actor: oracle_actor,
}) }
} }
async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> { async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> {

Loading…
Cancel
Save