Browse Source

Merge pull request #187 from comit-network/housekeeping

upload-correct-windows-binary
Thomas Eizinger 3 years ago
committed by GitHub
parent
commit
7f66504090
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      daemon/src/cleanup.rs
  2. 58
      daemon/src/housekeeping.rs
  3. 10
      daemon/src/maker.rs
  4. 29
      daemon/src/maker_cfd.rs
  5. 11
      daemon/src/taker.rs
  6. 29
      daemon/src/taker_cfd.rs

28
daemon/src/cleanup.rs

@ -1,28 +0,0 @@
use crate::db::{insert_new_cfd_state_by_order_id, load_all_cfds};
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon};
use anyhow::Result;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use std::time::SystemTime;
pub async fn transition_non_continue_cfds_to_setup_failed(
conn: &mut PoolConnection<Sqlite>,
) -> Result<()> {
let cfds = load_all_cfds(conn).await?;
for cfd in cfds.iter().filter(|cfd| Cfd::is_cleanup(cfd)) {
insert_new_cfd_state_by_order_id(
cfd.order.id,
CfdState::SetupFailed {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
info: format!("Was in state {} which cannot be continued.", cfd.state),
},
conn,
)
.await?;
}
Ok(())
}

58
daemon/src/housekeeping.rs

@ -0,0 +1,58 @@
use crate::db::{insert_new_cfd_state_by_order_id, load_all_cfds};
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon};
use crate::wallet::Wallet;
use anyhow::Result;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use std::time::SystemTime;
pub async fn transition_non_continue_cfds_to_setup_failed(
conn: &mut PoolConnection<Sqlite>,
) -> Result<()> {
let cfds = load_all_cfds(conn).await?;
for cfd in cfds.iter().filter(|cfd| Cfd::is_cleanup(cfd)) {
insert_new_cfd_state_by_order_id(
cfd.order.id,
CfdState::SetupFailed {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
info: format!("Was in state {} which cannot be continued.", cfd.state),
},
conn,
)
.await?;
}
Ok(())
}
pub async fn rebroadcast_transactions(
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
) -> Result<()> {
let cfds = load_all_cfds(conn).await?;
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) {
let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?;
tracing::info!("Lock transaction published with txid {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) {
let signed_commit_tx = cfd.commit_tx()?;
let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?;
tracing::info!("Commit transaction published on chain: {}", txid);
}
Ok(())
}

10
daemon/src/maker.rs

@ -21,8 +21,8 @@ use xtra::spawn::TokioGlobalSpawnExt;
mod actors;
mod auth;
mod bitmex_price_feed;
mod cleanup;
mod db;
mod housekeeping;
mod keypair;
mod logger;
mod maker_cfd;
@ -151,7 +151,10 @@ async fn main() -> Result<()> {
};
let mut conn = db.acquire().await.unwrap();
cleanup::transition_non_continue_cfds_to_setup_failed(&mut conn)
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn)
.await
.unwrap();
housekeeping::rebroadcast_transactions(&mut conn, &wallet)
.await
.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap();
@ -174,11 +177,8 @@ async fn main() -> Result<()> {
order_feed_sender,
maker_inc_connections_address.clone(),
monitor_actor_address.clone(),
cfds.clone(),
oracle_actor_address,
)
.await
.unwrap()
.create(None)
.spawn_global();

29
daemon/src/maker_cfd.rs

@ -74,7 +74,7 @@ enum SetupState {
impl Actor {
#[allow(clippy::too_many_arguments)]
pub async fn new(
pub fn new(
db: sqlx::SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
@ -82,30 +82,9 @@ impl Actor {
order_feed_sender: watch::Sender<Option<Order>>,
takers: Address<maker_inc_connections::Actor>,
monitor_actor: Address<monitor::Actor<Actor>>,
cfds: Vec<Cfd>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
) -> Result<Self> {
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) {
let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?;
tracing::info!("Lock transaction published with txid {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) {
let signed_commit_tx = cfd.commit_tx()?;
let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?;
tracing::info!("Commit transaction published on chain: {}", txid);
}
Ok(Self {
) -> Self {
Self {
db,
wallet,
oracle_pk,
@ -117,7 +96,7 @@ impl Actor {
setup_state: SetupState::None,
latest_announcement: None,
_oracle_actor: oracle_actor,
})
}
}
async fn handle_new_order(&mut self, order: Order) -> Result<()> {

11
daemon/src/taker.rs

@ -23,8 +23,8 @@ use xtra::Actor;
mod actors;
mod bitmex_price_feed;
mod cleanup;
mod db;
mod housekeeping;
mod keypair;
mod logger;
mod model;
@ -153,7 +153,10 @@ async fn main() -> Result<()> {
};
let mut conn = db.acquire().await.unwrap();
cleanup::transition_non_continue_cfds_to_setup_failed(&mut conn)
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn)
.await
.unwrap();
housekeeping::rebroadcast_transactions(&mut conn, &wallet)
.await
.unwrap();
let cfds = load_all_cfds(&mut conn).await.unwrap();
@ -164,7 +167,6 @@ async fn main() -> Result<()> {
.spawn_global();
let (monitor_actor_address, mut monitor_actor_context) = xtra::Context::new(None);
let (oracle_actor_address, mut oracle_actor_context) = xtra::Context::new(None);
let mut conn = db.acquire().await.unwrap();
@ -177,11 +179,8 @@ async fn main() -> Result<()> {
order_feed_sender,
send_to_maker,
monitor_actor_address.clone(),
cfds.clone(),
oracle_actor_address,
)
.await
.unwrap()
.create(None)
.spawn_global();

29
daemon/src/taker_cfd.rs

@ -66,7 +66,7 @@ pub struct Actor {
impl Actor {
#[allow(clippy::too_many_arguments)]
pub async fn new(
pub fn new(
db: sqlx::SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
@ -74,30 +74,9 @@ impl Actor {
order_feed_actor_inbox: watch::Sender<Option<Order>>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>,
cfds: Vec<Cfd>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
) -> Result<Self> {
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) {
let txid = wallet.try_broadcast_transaction(dlc.lock.0.clone()).await?;
tracing::info!("Lock transaction published with txid {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) {
let signed_commit_tx = cfd.commit_tx()?;
let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?;
tracing::info!("Commit transaction published on chain: {}", txid);
}
Ok(Self {
) -> Self {
Self {
db,
wallet,
oracle_pk,
@ -108,7 +87,7 @@ impl Actor {
setup_state: SetupState::None,
latest_announcement: None,
_oracle_actor: oracle_actor,
})
}
}
async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> {

Loading…
Cancel
Save