Browse Source

Optional state transition and early exit

Reasoning:
The state transition can fail, hence the result.
It can, however, happen that there is no failure, but we still did not transition to a new state.
In such cases we don't want to save the state in the database because it would be an unwanted duplicated entry.
temporary-fast-timelocks
Daniel Karzel 3 years ago
parent
commit
9666b63cb7
No known key found for this signature in database GPG Key ID: 30C3FC2E438ADB6E
  1. 40
      daemon/src/maker_cfd.rs
  2. 14
      daemon/src/model/cfd.rs
  3. 45
      daemon/src/taker_cfd.rs

40
daemon/src/maker_cfd.rs

@ -12,7 +12,7 @@ use crate::model::{TakerId, Usd};
use crate::monitor::MonitorParams;
use crate::wallet::Wallet;
use crate::{maker_inc_connections, monitor, oracle, setup_contract, wire};
use anyhow::{Context as _, Result};
use anyhow::{bail, Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc;
@ -617,8 +617,9 @@ impl Actor {
tracing::info!("Commit transaction published on chain: {}", txid);
let new_state = cfd.handle(CfdStateChangeEvent::CommitTxSent)?;
if let Some(new_state) = cfd.handle(CfdStateChangeEvent::CommitTxSent)? {
insert_new_cfd_state_by_order_id(cfd.order.id, new_state, &mut conn).await?;
}
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
@ -785,15 +786,21 @@ impl Actor {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let new_state = cfd.handle(CfdStateChangeEvent::Monitor(event))?;
insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?;
if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() {
// early exit if there was not state change
// this is for cases where we are already in a final state
return Ok(());
}
insert_new_cfd_state_by_order_id(order_id, cfd.state.clone(), &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// TODO: code duplication maker/taker
if let CfdState::OpenCommitted { .. } = new_state {
if let CfdState::OpenCommitted { .. } = cfd.state {
self.try_cet_publication(cfd).await?;
} else if let CfdState::MustRefund { .. } = new_state {
} else if let CfdState::MustRefund { .. } = cfd.state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = self
.wallet
@ -816,9 +823,18 @@ impl Actor {
let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?;
for mut cfd in cfds {
cfd.handle(CfdStateChangeEvent::OracleAttestation(
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(
attestation.clone().into(),
))?;
))?
.is_none()
{
// if we don't transition to a new state after oracle attestation we ignore the cfd
// this is for cases where we cannot handle the attestation which should be in a
// final state
continue;
}
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?;
self.try_cet_publication(cfd).await?;
@ -836,8 +852,12 @@ impl Actor {
let txid = self.wallet.try_broadcast_transaction(cet).await?;
tracing::info!("CET published with txid {}", txid);
cfd.handle(CfdStateChangeEvent::CetSent)?;
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state, &mut conn).await?;
if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() {
bail!("If we can get the CET we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn)
.await?;
}
Err(not_ready_yet) => {
tracing::debug!("{:#}", not_ready_yet);

14
daemon/src/model/cfd.rs

@ -543,7 +543,7 @@ impl Cfd {
#[allow(dead_code)]
pub const CET_TIMELOCK: u32 = 12;
pub fn handle(&mut self, event: CfdStateChangeEvent) -> Result<CfdState> {
pub fn handle(&mut self, event: CfdStateChangeEvent) -> Result<Option<CfdState>> {
use CfdState::*;
// TODO: Display impl
@ -551,6 +551,16 @@ impl Cfd {
let order_id = self.order.id;
// early exit if already final
if let SetupFailed { .. } | Closed { .. } | Refunded { .. } = self.state.clone() {
tracing::trace!(
"Ignoring event {:?} because cfd already in state {}",
event,
self.state
);
return Ok(None);
}
let new_state = match event {
CfdStateChangeEvent::Monitor(event) => match event {
monitor::Event::LockFinality(_) => {
@ -792,7 +802,7 @@ impl Cfd {
self.state = new_state.clone();
Ok(new_state)
Ok(Some(new_state))
}
pub fn refund_tx(&self) -> Result<Transaction> {

45
daemon/src/taker_cfd.rs

@ -12,7 +12,7 @@ use crate::monitor::{self, MonitorParams};
use crate::wallet::Wallet;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg};
use crate::{oracle, send_to_socket, setup_contract, wire};
use anyhow::{Context as _, Result};
use anyhow::{bail, Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc;
@ -541,17 +541,21 @@ impl Actor {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let new_state = cfd.handle(CfdStateChangeEvent::Monitor(event))?;
if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() {
// early exit if there was not state change
// this is for cases where we are already in a final state
return Ok(());
}
insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?;
insert_new_cfd_state_by_order_id(order_id, cfd.state.clone(), &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// TODO: code duplicateion maker/taker
if let CfdState::OpenCommitted { .. } = new_state {
if let CfdState::OpenCommitted { .. } = cfd.state {
self.try_cet_publication(cfd).await?;
} else if let CfdState::MustRefund { .. } = new_state {
} else if let CfdState::MustRefund { .. } = cfd.state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = self
.wallet
@ -576,14 +580,16 @@ impl Actor {
.try_broadcast_transaction(signed_commit_tx)
.await?;
tracing::info!("Commit transaction published on chain: {}", txid);
let new_state = cfd.handle(CfdStateChangeEvent::CommitTxSent)?;
insert_new_cfd_state_by_order_id(cfd.order.id, new_state, &mut conn).await?;
if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() {
bail!("If we can get the commit tx we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
tracing::info!("Commit transaction published on chain: {}", txid);
Ok(())
}
@ -597,9 +603,18 @@ impl Actor {
let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?;
for mut cfd in cfds {
cfd.handle(CfdStateChangeEvent::OracleAttestation(
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(
attestation.clone().into(),
))?;
))?
.is_none()
{
// if we don't transition to a new state after oracle attestation we ignore the cfd
// this is for cases where we cannot handle the attestation which should be in a
// final state
continue;
}
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?;
self.try_cet_publication(cfd).await?;
@ -617,8 +632,12 @@ impl Actor {
let txid = self.wallet.try_broadcast_transaction(cet).await?;
tracing::info!("CET published with txid {}", txid);
cfd.handle(CfdStateChangeEvent::CetSent)?;
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state, &mut conn).await?;
if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() {
bail!("If we can get the CET we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn)
.await?;
}
Err(not_ready_yet) => {
tracing::debug!("{:#}", not_ready_yet);

Loading…
Cancel
Save