Browse Source

Refactor `insert_new_cfd_state_by_order_id` to `append_cfd_state`

We almost always have the entire `Cfd` available when we call this
function. It is therefore much easier to simply pass the entire
`Cfd` in instead of selective data.
testing
Thomas Eizinger 3 years ago
parent
commit
ca93714746
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 19
      daemon/src/cfd_actors.rs
  2. 25
      daemon/src/db.rs
  3. 23
      daemon/src/housekeeping.rs
  4. 126
      daemon/src/maker_cfd.rs
  5. 93
      daemon/src/taker_cfd.rs

19
daemon/src/cfd_actors.rs

@ -7,22 +7,21 @@ use sqlx::Sqlite;
use tokio::sync::watch; use tokio::sync::watch;
pub async fn insert_cfd( pub async fn insert_cfd(
cfd: Cfd, cfd: &Cfd,
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
update_sender: &watch::Sender<Vec<Cfd>>, update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> { ) -> Result<()> {
db::insert_cfd(&cfd, conn).await?; db::insert_cfd(cfd, conn).await?;
update_sender.send(db::load_all_cfds(conn).await?)?; update_sender.send(db::load_all_cfds(conn).await?)?;
Ok(()) Ok(())
} }
pub async fn insert_new_cfd_state_by_order_id( pub async fn append_cfd_state(
order_id: OrderId, cfd: &Cfd,
new_state: &CfdState,
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
update_sender: &watch::Sender<Vec<Cfd>>, update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> { ) -> Result<()> {
db::insert_new_cfd_state_by_order_id(order_id, new_state, conn).await?; db::append_cfd_state(cfd, conn).await?;
update_sender.send(db::load_all_cfds(conn).await?)?; update_sender.send(db::load_all_cfds(conn).await?)?;
Ok(()) Ok(())
} }
@ -42,7 +41,7 @@ pub async fn try_cet_publication(
bail!("If we can get the CET we should be able to transition") bail!("If we can get the CET we should be able to transition")
} }
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?; append_cfd_state(cfd, conn, update_sender).await?;
} }
Err(not_ready_yet) => { Err(not_ready_yet) => {
tracing::debug!("{:#}", not_ready_yet); tracing::debug!("{:#}", not_ready_yet);
@ -69,7 +68,7 @@ pub async fn handle_monitoring_event(
return Ok(()); return Ok(());
} }
insert_new_cfd_state_by_order_id(order_id, &cfd.state, conn, update_sender).await?; append_cfd_state(&cfd, conn, update_sender).await?;
if let CfdState::OpenCommitted { .. } = cfd.state { if let CfdState::OpenCommitted { .. } = cfd.state {
try_cet_publication(&mut cfd, conn, wallet, update_sender).await?; try_cet_publication(&mut cfd, conn, wallet, update_sender).await?;
@ -98,7 +97,7 @@ pub async fn handle_commit(
bail!("If we can get the commit tx we should be able to transition") bail!("If we can get the commit tx we should be able to transition")
} }
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?; append_cfd_state(&cfd, conn, update_sender).await?;
tracing::info!("Commit transaction published on chain: {}", txid); tracing::info!("Commit transaction published on chain: {}", txid);
Ok(()) Ok(())
@ -137,7 +136,7 @@ pub async fn handle_oracle_attestation(
continue; continue;
} }
insert_new_cfd_state_by_order_id(cfd.order.id, &cfd.state, conn, update_sender).await?; append_cfd_state(cfd, conn, update_sender).await?;
if let Err(e) = try_cet_publication(cfd, conn, wallet, update_sender).await { if let Err(e) = try_cet_publication(cfd, conn, wallet, update_sender).await {
tracing::error!("Error when trying to publish CET: {:#}", e); tracing::error!("Error when trying to publish CET: {:#}", e);

25
daemon/src/db.rs

@ -160,22 +160,19 @@ pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow:
Ok(()) Ok(())
} }
pub async fn insert_new_cfd_state_by_order_id( pub async fn append_cfd_state(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> {
order_id: OrderId, let cfd_id = load_cfd_id_by_order_uuid(cfd.order.id, conn).await?;
new_state: &CfdState, let current_state = load_latest_cfd_state(cfd_id, conn)
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<()> {
let cfd_id = load_cfd_id_by_order_uuid(order_id, conn).await?;
let latest_cfd_state_in_db = load_latest_cfd_state(cfd_id, conn)
.await .await
.context("loading latest state failed")?; .context("loading latest state failed")?;
let new_state = &cfd.state;
if mem::discriminant(&latest_cfd_state_in_db) == mem::discriminant(new_state) { if mem::discriminant(&current_state) == mem::discriminant(new_state) {
// Since we have states where we add information this happens quite frequently // Since we have states where we add information this happens quite frequently
tracing::trace!( tracing::trace!(
"Same state transition for cfd with order_id {}: {}", "Same state transition for cfd with order_id {}: {}",
order_id, cfd.order.id,
latest_cfd_state_in_db current_state
); );
} }
@ -599,9 +596,7 @@ mod tests {
transition_timestamp: SystemTime::now(), transition_timestamp: SystemTime::now(),
}, },
}; };
insert_new_cfd_state_by_order_id(cfd_1.order.id, &cfd_1.state, &mut conn) append_cfd_state(&cfd_1, &mut conn).await.unwrap();
.await
.unwrap();
let cfds_from_db = load_all_cfds(&mut conn).await.unwrap(); let cfds_from_db = load_all_cfds(&mut conn).await.unwrap();
assert_eq!(vec![cfd_1.clone()], cfds_from_db); assert_eq!(vec![cfd_1.clone()], cfds_from_db);
@ -616,9 +611,7 @@ mod tests {
transition_timestamp: SystemTime::now(), transition_timestamp: SystemTime::now(),
}, },
}; };
insert_new_cfd_state_by_order_id(cfd_2.order.id, &cfd_2.state, &mut conn) append_cfd_state(&cfd_2, &mut conn).await.unwrap();
.await
.unwrap();
let cfds_from_db = load_all_cfds(&mut conn).await.unwrap(); let cfds_from_db = load_all_cfds(&mut conn).await.unwrap();
assert_eq!(vec![cfd_1, cfd_2], cfds_from_db); assert_eq!(vec![cfd_1, cfd_2], cfds_from_db);

23
daemon/src/housekeeping.rs

@ -1,4 +1,4 @@
use crate::db::{insert_new_cfd_state_by_order_id, load_all_cfds}; use crate::db::{append_cfd_state, load_all_cfds};
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon}; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon};
use crate::wallet::Wallet; use crate::wallet::Wallet;
use anyhow::Result; use anyhow::Result;
@ -9,20 +9,17 @@ use std::time::SystemTime;
pub async fn transition_non_continue_cfds_to_setup_failed( pub async fn transition_non_continue_cfds_to_setup_failed(
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
) -> Result<()> { ) -> Result<()> {
let cfds = load_all_cfds(conn).await?; let mut cfds = load_all_cfds(conn).await?;
for cfd in cfds.iter().filter(|cfd| Cfd::is_cleanup(cfd)) { for cfd in cfds.iter_mut().filter(|cfd| Cfd::is_cleanup(cfd)) {
insert_new_cfd_state_by_order_id( cfd.state = CfdState::SetupFailed {
cfd.order.id, common: CfdStateCommon {
&CfdState::SetupFailed { transition_timestamp: SystemTime::now(),
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
info: format!("Was in state {} which cannot be continued.", cfd.state),
}, },
conn, info: format!("Was in state {} which cannot be continued.", cfd.state),
) };
.await?;
append_cfd_state(cfd, conn).await?;
} }
Ok(()) Ok(())

126
daemon/src/maker_cfd.rs

@ -1,4 +1,4 @@
use crate::cfd_actors::{self, insert_cfd, insert_new_cfd_state_by_order_id}; use crate::cfd_actors::{self, append_cfd_state, insert_cfd};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::maker_inc_connections::TakerCommand; use crate::maker_inc_connections::TakerCommand;
use crate::model::cfd::{ use crate::model::cfd::{
@ -292,13 +292,7 @@ impl Actor {
proposal.price, proposal.price,
), ),
))?; ))?;
insert_new_cfd_state_by_order_id( append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?; let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?;
@ -405,19 +399,16 @@ impl Actor {
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
insert_new_cfd_state_by_order_id( let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
order_id, cfd.state = CfdState::PendingOpen {
&CfdState::PendingOpen { common: CfdStateCommon {
common: CfdStateCommon { transition_timestamp: SystemTime::now(),
transition_timestamp: SystemTime::now(),
},
dlc: dlc.clone(),
attestation: None,
}, },
&mut conn, dlc: dlc.clone(),
&self.cfd_feed_actor_inbox, attestation: None,
) };
.await?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
let txid = self let txid = self
.wallet .wallet
@ -426,8 +417,6 @@ impl Actor {
tracing::info!("Lock transaction published with txid {}", txid); tracing::info!("Lock transaction published with txid {}", txid);
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
self.monitor_actor self.monitor_actor
.do_send_async(monitor::StartMonitoring { .do_send_async(monitor::StartMonitoring {
id: order_id, id: order_id,
@ -453,22 +442,17 @@ impl Actor {
self.roll_over_state = RollOverState::None; self.roll_over_state = RollOverState::None;
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
insert_new_cfd_state_by_order_id( let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
order_id, cfd.state = CfdState::Open {
&CfdState::Open { common: CfdStateCommon {
common: CfdStateCommon { transition_timestamp: SystemTime::now(),
transition_timestamp: SystemTime::now(),
},
dlc: dlc.clone(),
attestation: None,
collaborative_close: None,
}, },
&mut conn, dlc: dlc.clone(),
&self.cfd_feed_actor_inbox, attestation: None,
) collaborative_close: None,
.await?; };
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.monitor_actor self.monitor_actor
.do_send_async(monitor::StartMonitoring { .do_send_async(monitor::StartMonitoring {
@ -513,20 +497,7 @@ impl Actor {
} }
}; };
// 2. check if order has acceptable amounts // 2. Create a new CFD
if quantity < current_order.min_quantity || quantity > current_order.max_quantity {
tracing::warn!(
"Order rejected because quantity {} was out of bounds. It was either <{} or >{}",
quantity,
current_order.min_quantity,
current_order.max_quantity
);
self.reject_order(taker_id, order_id, conn).await?;
return Ok(());
}
// 3. Insert CFD in DB
let cfd = Cfd::new( let cfd = Cfd::new(
current_order.clone(), current_order.clone(),
quantity, quantity,
@ -537,7 +508,20 @@ impl Actor {
taker_id, taker_id,
}, },
); );
insert_cfd(cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; insert_cfd(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
// 3. check if order has acceptable amounts
if quantity < current_order.min_quantity || quantity > current_order.max_quantity {
tracing::warn!(
"Order rejected because quantity {} was out of bounds. It was either <{} or >{}",
quantity,
current_order.min_quantity,
current_order.max_quantity
);
self.reject_order(taker_id, cfd, conn).await?;
return Ok(());
}
// 4. Remove current order // 4. Remove current order
self.current_order_id = None; self.current_order_id = None;
@ -563,7 +547,7 @@ impl Actor {
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
// 1. Validate if order is still valid // 1. Validate if order is still valid
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let taker_id = match cfd { let taker_id = match cfd {
Cfd { Cfd {
state: CfdState::IncomingOrderRequest { taker_id, .. }, state: CfdState::IncomingOrderRequest { taker_id, .. },
@ -583,17 +567,13 @@ impl Actor {
.with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?; .with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?;
// 3. Insert that we are in contract setup and refresh our own feed // 3. Insert that we are in contract setup and refresh our own feed
insert_new_cfd_state_by_order_id( cfd.state = CfdState::ContractSetup {
order_id, common: CfdStateCommon {
&CfdState::ContractSetup { transition_timestamp: SystemTime::now(),
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
}, },
&mut conn, };
&self.cfd_feed_actor_inbox,
) append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
.await?;
// 4. Notify the taker that we are ready for contract setup // 4. Notify the taker that we are ready for contract setup
// Use `.send` here to ensure we only continue once the message has been sent // Use `.send` here to ensure we only continue once the message has been sent
@ -658,7 +638,7 @@ impl Actor {
} }
}; };
self.reject_order(taker_id, order_id, conn).await?; self.reject_order(taker_id, cfd, conn).await?;
Ok(()) Ok(())
} }
@ -671,26 +651,22 @@ impl Actor {
async fn reject_order( async fn reject_order(
&mut self, &mut self,
taker_id: TakerId, taker_id: TakerId,
order_id: OrderId, mut cfd: Cfd,
mut conn: PoolConnection<Sqlite>, mut conn: PoolConnection<Sqlite>,
) -> Result<()> { ) -> Result<()> {
// Update order in db // Update order in db
insert_new_cfd_state_by_order_id( cfd.state = CfdState::Rejected {
order_id, common: CfdStateCommon {
&CfdState::Rejected { transition_timestamp: SystemTime::now(),
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
}, },
&mut conn, };
&self.cfd_feed_actor_inbox,
) append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
.await?;
self.takers self.takers
.do_send_async(maker_inc_connections::TakerMessage { .do_send_async(maker_inc_connections::TakerMessage {
taker_id, taker_id,
command: TakerCommand::NotifyOrderRejected { id: order_id }, command: TakerCommand::NotifyOrderRejected { id: cfd.order.id },
}) })
.await?; .await?;

93
daemon/src/taker_cfd.rs

@ -1,4 +1,4 @@
use crate::cfd_actors::{self, insert_cfd, insert_new_cfd_state_by_order_id}; use crate::cfd_actors::{self, append_cfd_state, insert_cfd};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::model::cfd::{ use crate::model::cfd::{
Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, Order, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, Order,
@ -156,7 +156,7 @@ impl Actor {
}, },
); );
insert_cfd(cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; insert_cfd(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.send_to_maker self.send_to_maker
.do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity }) .do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity })
@ -268,19 +268,14 @@ impl Actor {
} }
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
insert_new_cfd_state_by_order_id( let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
order_id, cfd.state = CfdState::ContractSetup {
&CfdState::ContractSetup { common: CfdStateCommon {
common: CfdStateCommon { transition_timestamp: SystemTime::now(),
transition_timestamp: SystemTime::now(),
},
}, },
&mut conn, };
&self.cfd_feed_actor_inbox,
)
.await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
let offer_announcement = self let offer_announcement = self
.oracle_actor .oracle_actor
@ -326,17 +321,14 @@ impl Actor {
tracing::debug!(%order_id, "Order rejected"); tracing::debug!(%order_id, "Order rejected");
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
insert_new_cfd_state_by_order_id( let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
order_id, cfd.state = CfdState::Rejected {
&CfdState::Rejected { common: CfdStateCommon {
common: CfdStateCommon { transition_timestamp: SystemTime::now(),
transition_timestamp: SystemTime::now(),
},
}, },
&mut conn, };
&self.cfd_feed_actor_inbox,
) append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
.await?;
Ok(()) Ok(())
} }
@ -366,13 +358,7 @@ impl Actor {
cfd.handle(CfdStateChangeEvent::ProposalSigned( cfd.handle(CfdStateChangeEvent::ProposalSigned(
CollaborativeSettlement::new(tx, dlc.script_pubkey_for(cfd.role()), proposal.price), CollaborativeSettlement::new(tx, dlc.script_pubkey_for(cfd.role()), proposal.price),
))?; ))?;
insert_new_cfd_state_by_order_id( append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
cfd.order.id,
&cfd.state,
&mut conn,
&self.cfd_feed_actor_inbox,
)
.await?;
self.remove_pending_proposal(&order_id)?; self.remove_pending_proposal(&order_id)?;
@ -489,20 +475,16 @@ impl Actor {
tracing::info!("Setup complete, publishing on chain now"); tracing::info!("Setup complete, publishing on chain now");
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
insert_new_cfd_state_by_order_id( cfd.state = CfdState::PendingOpen {
order_id, common: CfdStateCommon {
&CfdState::PendingOpen { transition_timestamp: SystemTime::now(),
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc: dlc.clone(),
attestation: None,
}, },
&mut conn, dlc: dlc.clone(),
&self.cfd_feed_actor_inbox, attestation: None,
) };
.await?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
let txid = self let txid = self
.wallet .wallet
@ -511,8 +493,6 @@ impl Actor {
tracing::info!("Lock transaction published with txid {}", txid); tracing::info!("Lock transaction published with txid {}", txid);
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
self.monitor_actor self.monitor_actor
.do_send_async(monitor::StartMonitoring { .do_send_async(monitor::StartMonitoring {
id: order_id, id: order_id,
@ -538,22 +518,17 @@ impl Actor {
self.roll_over_state = RollOverState::None; self.roll_over_state = RollOverState::None;
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
insert_new_cfd_state_by_order_id( let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
order_id, cfd.state = CfdState::Open {
&CfdState::Open { common: CfdStateCommon {
common: CfdStateCommon { transition_timestamp: SystemTime::now(),
transition_timestamp: SystemTime::now(),
},
dlc: dlc.clone(),
attestation: None,
collaborative_close: None,
}, },
&mut conn, dlc: dlc.clone(),
&self.cfd_feed_actor_inbox, attestation: None,
) collaborative_close: None,
.await?; };
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.monitor_actor self.monitor_actor
.do_send_async(monitor::StartMonitoring { .do_send_async(monitor::StartMonitoring {

Loading…
Cancel
Save