Browse Source

Merge #899

899: Rename database function for loading CFDs r=thomaseizinger a=thomaseizinger

With the removal of the order from the DB, we are loading a CFD by
its ID which just happens to be the order id but that is no longer
relevant at this stage.

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
remove-long-heartbeat-interval-in-debug-mode
bors[bot] 3 years ago
committed by GitHub
parent
commit
e8d37e43ef
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      daemon/src/cfd_actors.rs
  2. 11
      daemon/src/db.rs
  3. 20
      daemon/src/maker_cfd.rs
  4. 20
      daemon/src/taker_cfd.rs

4
daemon/src/cfd_actors.rs

@ -77,7 +77,7 @@ where
{
let order_id = event.order_id();
let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?;
let mut cfd = db::load_cfd(order_id, conn).await?;
if cfd.handle_monitoring_event(event)?.is_none() {
// early exit if there was not state change
@ -112,7 +112,7 @@ pub async fn handle_commit<W>(
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?;
let mut cfd = db::load_cfd(order_id, conn).await?;
let signed_commit_tx = cfd.commit_tx()?;

11
daemon/src/db.rs

@ -147,10 +147,7 @@ async fn load_latest_cfd_state(
Ok(latest_cfd_state_in_db)
}
pub async fn load_cfd_by_order_id(
order_id: OrderId,
conn: &mut PoolConnection<Sqlite>,
) -> Result<Cfd> {
pub async fn load_cfd(order_id: OrderId, conn: &mut PoolConnection<Sqlite>) -> Result<Cfd> {
let row = sqlx::query!(
r#"
with state as (
@ -310,7 +307,7 @@ mod tests {
let mut conn = setup_test_db().await;
let cfd = Cfd::dummy().insert(&mut conn).await;
let loaded = load_cfd_by_order_id(cfd.id(), &mut conn).await.unwrap();
let loaded = load_cfd(cfd.id(), &mut conn).await.unwrap();
assert_eq!(cfd, loaded)
}
@ -322,8 +319,8 @@ mod tests {
let cfd1 = Cfd::dummy().insert(&mut conn).await;
let cfd2 = Cfd::dummy().insert(&mut conn).await;
let loaded_1 = load_cfd_by_order_id(cfd1.id(), &mut conn).await.unwrap();
let loaded_2 = load_cfd_by_order_id(cfd2.id(), &mut conn).await.unwrap();
let loaded_1 = load_cfd(cfd1.id(), &mut conn).await.unwrap();
let loaded_2 = load_cfd(cfd2.id(), &mut conn).await.unwrap();
assert_eq!(cfd1, loaded_1);
assert_eq!(cfd2, loaded_2);

20
daemon/src/maker_cfd.rs

@ -4,7 +4,7 @@ use crate::cfd_actors::append_cfd_state;
use crate::cfd_actors::insert_cfd_and_update_feed;
use crate::cfd_actors::{self};
use crate::collab_settlement_maker;
use crate::db::load_cfd_by_order_id;
use crate::db::load_cfd;
use crate::log_error;
use crate::maker_inc_connections;
use crate::model::cfd::Cfd;
@ -160,7 +160,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
tracing::error!(%order_id, "Contract setup failed: {:#?}", error);
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::setup_failed(error.to_string());
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
@ -169,7 +169,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
async fn append_cfd_state_rejected(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
@ -303,7 +303,7 @@ where
// check if CFD is in open state, otherwise we should not proceed
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(proposal.order_id, &mut conn).await?;
let cfd = load_cfd(proposal.order_id, &mut conn).await?;
match cfd.state() {
CfdState::Open { .. } => (),
_ => {
@ -472,7 +472,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
tracing::debug!(%order_id, "Maker accepts order");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
self.setup_actors
.send(&order_id, setup_maker::Accepted)
@ -514,7 +514,7 @@ where
tracing::debug!(%order_id, "Maker rejects order");
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let cfd = load_cfd(order_id, &mut conn).await?;
let taker_id = match cfd.state() {
CfdState::IncomingOrderRequest { taker_id, .. } => taker_id,
@ -576,7 +576,7 @@ where
};
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
let tx = settlement.tx.clone();
cfd.handle_proposal_signed(settlement)
@ -630,7 +630,7 @@ where
let Completed { order_id, dlc } = msg;
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::Open {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
@ -681,7 +681,7 @@ where
})?;
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(proposal.order_id, &mut conn).await?;
let cfd = load_cfd(proposal.order_id, &mut conn).await?;
let this = ctx.address().expect("self to be alive");
let (addr, task) = collab_settlement_maker::Actor::new(
@ -770,7 +770,7 @@ where
};
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::PendingOpen {
common: CfdStateCommon::default(),

20
daemon/src/taker_cfd.rs

@ -5,7 +5,7 @@ use crate::cfd_actors::insert_cfd_and_update_feed;
use crate::cfd_actors::{self};
use crate::collab_settlement_taker;
use crate::connection;
use crate::db::load_cfd_by_order_id;
use crate::db::load_cfd;
use crate::log_error;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState;
@ -140,7 +140,7 @@ where
.with_context(|| format!("Settlement for order {} is already in progress", order_id))?;
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let cfd = load_cfd(order_id, &mut conn).await?;
let this = ctx
.address()
@ -182,7 +182,7 @@ where
let settlement_txid = settlement.tx.txid();
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
let dlc = cfd.dlc().context("No DLC in CFD")?;
cfd.handle_proposal_signed(settlement)?;
@ -208,7 +208,7 @@ impl<O, M, W> Actor<O, M, W> {
let mut conn = self.db.acquire().await?;
if load_cfd_by_order_id(order.id, &mut conn).await.is_ok() {
if load_cfd(order.id, &mut conn).await.is_ok() {
bail!("Received order {} from maker, but already have a cfd in the database for that order. The maker did not properly remove the order.", order.id)
}
@ -229,7 +229,7 @@ impl<O, M, W> Actor<O, M, W> {
tracing::debug!(%order_id, "Order rejected");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
@ -244,7 +244,7 @@ impl<O, M, W> Actor<O, M, W> {
tracing::error!(%order_id, "Contract setup failed: {:#?}", error);
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::setup_failed(error.to_string());
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
@ -255,7 +255,7 @@ impl<O, M, W> Actor<O, M, W> {
/// and update the corresponding projection.
async fn handle_setup_started(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::contract_setup();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
@ -380,7 +380,7 @@ where
};
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
tracing::info!("Setup complete, publishing on chain now");
@ -437,7 +437,7 @@ where
.with_context(|| format!("Rollover for order {} is already in progress", order_id))?;
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let cfd = load_cfd(order_id, &mut conn).await?;
let this = ctx
.address()
@ -481,7 +481,7 @@ where
};
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::Open {
common: CfdStateCommon::default(),
dlc: dlc.clone(),

Loading…
Cancel
Save