Browse Source

Bundle monitoring events that change cfd state

This goes towards handling all cfd state transitions in one place, so it becomes easier to reason about it.
Currently, state transitions are scattered over the code of the cfd actor.
This change bundles the actual transition in the cfd itself.
Inserting the new state into the db is still in the responsibility of the caller.

note: the `monitor` import in `taker.rs` should technically not be part of this commit, but was needed because I decided that the `CfdMonitoringEvent` should live in the `monitor` actor. But since that is not part of the `Cfd` in `model` the taker needs to know about the `monitor`.
fix-bad-api-calls
Daniel Karzel 3 years ago
parent
commit
4fb84cc58f
No known key found for this signature in database GPG Key ID: 30C3FC2E438ADB6E
  1. 287
      daemon/src/maker_cfd.rs
  2. 189
      daemon/src/model/cfd.rs
  3. 67
      daemon/src/monitor.rs
  4. 1
      daemon/src/taker.rs

287
daemon/src/maker_cfd.rs

@ -4,16 +4,13 @@ use crate::db::{
load_cfd_by_order_id, load_order_by_id,
};
use crate::maker_inc_connections::TakerCommand;
use crate::model::cfd::{CetStatus, Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId};
use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId};
use crate::model::{TakerId, Usd};
use crate::monitor::{
CetTimelockExpired, CommitFinality, LockFinality, MonitorParams, RefundFinality,
RefundTimelockExpired,
};
use crate::monitor::{CfdMonitoringEvent, MonitorParams};
use crate::wallet::Wallet;
use crate::wire::SetupMsg;
use crate::{maker_inc_connections, monitor, setup_contract_actor};
use anyhow::{bail, Result};
use anyhow::Result;
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use bdk::bitcoin::{Amount, PublicKey};
@ -414,209 +411,18 @@ impl Actor {
Ok(())
}
async fn handle_lock_finality(&mut self, msg: LockFinality) -> Result<()> {
let order_id = msg.0;
tracing::debug!(%order_id, "Lock transaction has reached finality");
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
use CfdState::*;
let dlc = match cfd.state {
PendingOpen { dlc, .. } => dlc,
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => bail!("Did not expect lock finality yet: ignoring"),
Open { .. } | OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => {
bail!("State already assumes lock finality: ignoring")
}
};
insert_new_cfd_state_by_order_id(
msg.0,
CfdState::Open {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
},
&mut conn,
)
.await?;
Ok(())
}
async fn handle_commit_finality(&mut self, msg: CommitFinality) -> Result<()> {
let order_id = msg.0;
tracing::debug!(%order_id, "Commit transaction has reached finality");
async fn handle_monitoring_event(&mut self, event: CfdMonitoringEvent) -> Result<()> {
let order_id = event.order_id();
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
use CfdState::*;
let dlc = match cfd.state {
Open { dlc, .. } => dlc,
PendingOpen { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
dlc
}
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => bail!("Did not expect commit finality yet: ignoring"),
OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => {
bail!("State already assumes commit finality: ignoring")
}
};
let new_state = cfd.transition_to(CfdStateChangeEvent::Monitor(event))?;
insert_new_cfd_state_by_order_id(
msg.0,
CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::Unprepared,
},
&mut conn,
)
.await?;
Ok(())
}
async fn handle_cet_timelock_expired(&mut self, msg: CetTimelockExpired) -> Result<()> {
let order_id = msg.0;
tracing::debug!(%order_id, "CET timelock has expired");
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
use CfdState::*;
let new_state = match cfd.state {
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::Unprepared,
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::TimelockExpired,
},
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::OracleSigned(price),
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::Ready(price),
},
PendingOpen { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::TimelockExpired,
}
}
Open { dlc, .. } => {
tracing::debug!(%order_id, "Was not aware of commit TX broadcast, jumping ahead");
CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::TimelockExpired,
}
}
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => bail!("Did not expect CET timelock expiry yet: ignoring"),
OpenCommitted {
cet_status: CetStatus::TimelockExpired,
..
}
| OpenCommitted {
cet_status: CetStatus::Ready(_),
..
} => bail!("State already assumes CET timelock expiry: ignoring"),
MustRefund { .. } | Refunded { .. } => {
bail!("Refund path does not care about CET timelock expiry: ignoring")
}
};
insert_new_cfd_state_by_order_id(msg.0, new_state, &mut conn).await?;
Ok(())
}
async fn handle_refund_timelock_expired(&mut self, msg: RefundTimelockExpired) -> Result<()> {
let order_id = msg.0;
tracing::debug!(%order_id, "Refund timelock has expired");
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
use CfdState::*;
let dlc = match cfd.state {
OpenCommitted { dlc, .. } => {
insert_new_cfd_state_by_order_id(
msg.0,
MustRefund {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc: dlc.clone(),
},
&mut conn,
)
.await?;
dlc
}
MustRefund { .. } | Refunded { .. } => {
bail!("State already assumes refund timelock expiry: ignoring")
}
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => bail!("Did not expect refund timelock expiry yet: ignoring"),
PendingOpen { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
dlc
}
Open { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead");
dlc
}
};
insert_new_cfd_state_by_order_id(
msg.0,
MustRefund {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc: dlc.clone(),
},
&mut conn,
)
.await?;
insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?;
// TODO: Consider sending a message to ourselves to trigger broadcasting refund?
if let CfdState::MustRefund { dlc, .. } = new_state {
let sig_hash = spending_tx_sighash(
&dlc.refund.0,
&dlc.commit.2,
@ -642,48 +448,7 @@ impl Actor {
.await?;
tracing::info!("Refund transaction published on chain: {}", txid);
Ok(())
}
async fn handle_refund_finality(&mut self, msg: RefundFinality) -> Result<()> {
let order_id = msg.0;
tracing::debug!(%order_id, "Refund transaction has reached finality");
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
use CfdState::*;
match cfd.state {
MustRefund { .. } => (),
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => bail!("Did not expect refund finality yet: ignoring"),
PendingOpen { .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
}
Open { .. } => {
tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead");
}
OpenCommitted { .. } => {
tracing::debug!(%order_id, "Was waiting on refund timelock expiry, jumping ahead");
}
Refunded { .. } => bail!("State already assumes refund finality: ignoring"),
};
insert_new_cfd_state_by_order_id(
msg.0,
CfdState::Refunded {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
&mut conn,
)
.await?;
Ok(())
}
@ -739,37 +504,9 @@ impl Handler<CfdSetupCompleted> for Actor {
}
#[async_trait]
impl Handler<LockFinality> for Actor {
async fn handle(&mut self, msg: LockFinality, _ctx: &mut Context<Self>) {
log_error!(self.handle_lock_finality(msg))
}
}
#[async_trait]
impl Handler<CommitFinality> for Actor {
async fn handle(&mut self, msg: CommitFinality, _ctx: &mut Context<Self>) {
log_error!(self.handle_commit_finality(msg))
}
}
#[async_trait]
impl Handler<CetTimelockExpired> for Actor {
async fn handle(&mut self, msg: CetTimelockExpired, _ctx: &mut Context<Self>) {
log_error!(self.handle_cet_timelock_expired(msg))
}
}
#[async_trait]
impl Handler<RefundTimelockExpired> for Actor {
async fn handle(&mut self, msg: RefundTimelockExpired, _ctx: &mut Context<Self>) {
log_error!(self.handle_refund_timelock_expired(msg))
}
}
#[async_trait]
impl Handler<RefundFinality> for Actor {
async fn handle(&mut self, msg: RefundFinality, _ctx: &mut Context<Self>) {
log_error!(self.handle_refund_finality(msg))
impl Handler<CfdMonitoringEvent> for Actor {
async fn handle(&mut self, msg: CfdMonitoringEvent, _ctx: &mut Context<Self>) {
log_error!(self.handle_monitoring_event(msg))
}
}

189
daemon/src/model/cfd.rs

@ -1,5 +1,6 @@
use crate::model::{Leverage, Position, TakerId, TradingPair, Usd};
use anyhow::Result;
use crate::monitor::CfdMonitoringEvent;
use anyhow::{bail, Result};
use bdk::bitcoin::secp256k1::{SecretKey, Signature};
use bdk::bitcoin::{Address, Amount, PublicKey, Transaction};
use bdk::descriptor::Descriptor;
@ -363,6 +364,192 @@ impl Cfd {
#[allow(dead_code)]
pub const CET_TIMELOCK: u32 = 12;
pub fn transition_to(&self, event: CfdStateChangeEvent) -> Result<CfdState> {
use CfdState::*;
// TODO: Display impl
tracing::info!("Cfd state change event {:?}", event);
let order_id = self.order.id;
let new_state = match event {
CfdStateChangeEvent::Monitor(event) => match event {
CfdMonitoringEvent::LockFinality(_) => match self.state.clone() {
PendingOpen { dlc, .. } => CfdState::Open {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
},
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => bail!("Did not expect lock finality yet: ignoring"),
Open { .. } | OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => {
bail!("State already assumes lock finality: ignoring")
}
},
CfdMonitoringEvent::CommitFinality(_) => {
let dlc = match self.state.clone() {
Open { dlc, .. } => dlc,
PendingOpen { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
dlc
}
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => {
bail!("Did not expect commit finality yet: ignoring")
}
OpenCommitted { .. } | MustRefund { .. } | Refunded { .. } => {
bail!("State already assumes commit finality: ignoring")
}
};
OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::Unprepared,
}
}
CfdMonitoringEvent::CetTimelockExpired(_) => match self.state.clone() {
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::Unprepared,
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::TimelockExpired,
},
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::OracleSigned(price),
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::Ready(price),
},
PendingOpen { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::TimelockExpired,
}
}
Open { dlc, .. } => {
tracing::debug!(%order_id, "Was not aware of commit TX broadcast, jumping ahead");
CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::TimelockExpired,
}
}
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => {
bail!("Did not expect CET timelock expiry yet: ignoring")
}
OpenCommitted {
cet_status: CetStatus::TimelockExpired,
..
}
| OpenCommitted {
cet_status: CetStatus::Ready(_),
..
} => bail!("State already assumes CET timelock expiry: ignoring"),
MustRefund { .. } | Refunded { .. } => {
bail!("Refund path does not care about CET timelock expiry: ignoring")
}
},
CfdMonitoringEvent::RefundTimelockExpired(_) => {
let dlc = match self.state.clone() {
OpenCommitted { dlc, .. } => dlc,
MustRefund { .. } | Refunded { .. } => {
bail!("State already assumes refund timelock expiry: ignoring")
}
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => {
bail!("Did not expect refund timelock expiry yet: ignoring")
}
PendingOpen { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
dlc
}
Open { dlc, .. } => {
tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead");
dlc
}
};
MustRefund {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
}
}
CfdMonitoringEvent::RefundFinality(_) => {
match self.state {
MustRefund { .. } => (),
OutgoingOrderRequest { .. } => unreachable!("taker-only state"),
IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. } => {
bail!("Did not expect refund finality yet: ignoring")
}
PendingOpen { .. } => {
tracing::debug!(%order_id, "Was waiting on lock finality, jumping ahead");
}
Open { .. } => {
tracing::debug!(%order_id, "Was waiting on CET timelock expiry, jumping ahead");
}
OpenCommitted { .. } => {
tracing::debug!(%order_id, "Was waiting on refund timelock expiry, jumping ahead");
}
Refunded { .. } => bail!("State already assumes refund finality: ignoring"),
}
Refunded {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
}
}
},
};
Ok(new_state)
}
}
#[derive(Debug, Clone)]
pub enum CfdStateChangeEvent {
// TODO: groupd other events by actors into enums and add them here so we can bundle all
// transitions into cfd.transition_to(...)
Monitor(CfdMonitoringEvent),
}
fn calculate_profit(

67
daemon/src/monitor.rs

@ -23,12 +23,7 @@ pub struct MonitorParams {
impl<T> Actor<T>
where
T: xtra::Actor
+ xtra::Handler<LockFinality>
+ xtra::Handler<CommitFinality>
+ xtra::Handler<CetTimelockExpired>
+ xtra::Handler<RefundTimelockExpired>
+ xtra::Handler<RefundFinality>,
T: xtra::Actor + xtra::Handler<CfdMonitoringEvent>,
{
pub fn new(
electrum_rpc_url: &str,
@ -59,7 +54,7 @@ where
lock_subscription.wait_until_final().await.unwrap();
cfd_actor_addr
.do_send_async(LockFinality(id))
.do_send_async(CfdMonitoringEvent::LockFinality(id))
.await
.unwrap();
}
@ -77,7 +72,7 @@ where
commit_subscription.wait_until_final().await.unwrap();
cfd_actor_addr
.do_send_async(CommitFinality(id))
.do_send_async(CfdMonitoringEvent::CommitFinality(id))
.await
.unwrap();
}
@ -93,7 +88,7 @@ where
.unwrap();
cfd_actor_addr
.do_send_async(CetTimelockExpired(id))
.do_send_async(CfdMonitoringEvent::CetTimelockExpired(id))
.await
.unwrap();
}
@ -110,7 +105,7 @@ where
.unwrap();
cfd_actor_addr
.do_send_async(RefundTimelockExpired(id))
.do_send_async(CfdMonitoringEvent::RefundTimelockExpired(id))
.await
.unwrap();
}
@ -126,7 +121,7 @@ where
refund_subscription.wait_until_final().await.unwrap();
cfd_actor_addr
.do_send_async(RefundFinality(id))
.do_send_async(CfdMonitoringEvent::RefundFinality(id))
.await
.unwrap();
}
@ -147,33 +142,30 @@ impl xtra::Message for StartMonitoring {
type Result = ();
}
pub struct LockFinality(pub OrderId);
impl xtra::Message for LockFinality {
type Result = ();
}
pub struct CommitFinality(pub OrderId);
impl xtra::Message for CommitFinality {
type Result = ();
}
pub struct CetTimelockExpired(pub OrderId);
impl xtra::Message for CetTimelockExpired {
type Result = ();
#[derive(Debug, Clone)]
pub enum CfdMonitoringEvent {
LockFinality(OrderId),
CommitFinality(OrderId),
CetTimelockExpired(OrderId),
RefundTimelockExpired(OrderId),
RefundFinality(OrderId),
}
pub struct RefundTimelockExpired(pub OrderId);
impl xtra::Message for RefundTimelockExpired {
type Result = ();
impl CfdMonitoringEvent {
pub fn order_id(&self) -> OrderId {
let order_id = match self {
CfdMonitoringEvent::LockFinality(order_id) => order_id,
CfdMonitoringEvent::CommitFinality(order_id) => order_id,
CfdMonitoringEvent::CetTimelockExpired(order_id) => order_id,
CfdMonitoringEvent::RefundTimelockExpired(order_id) => order_id,
CfdMonitoringEvent::RefundFinality(order_id) => order_id,
};
*order_id
}
}
pub struct RefundFinality(pub OrderId);
impl xtra::Message for RefundFinality {
impl xtra::Message for CfdMonitoringEvent {
type Result = ();
}
@ -192,12 +184,7 @@ impl<T> xtra::Actor for Actor<T> where T: xtra::Actor {}
#[async_trait]
impl<T> xtra::Handler<StartMonitoring> for Actor<T>
where
T: xtra::Actor
+ xtra::Handler<LockFinality>
+ xtra::Handler<CommitFinality>
+ xtra::Handler<CetTimelockExpired>
+ xtra::Handler<RefundTimelockExpired>
+ xtra::Handler<RefundFinality>,
T: xtra::Actor + xtra::Handler<CfdMonitoringEvent>,
{
async fn handle(&mut self, msg: StartMonitoring, _ctx: &mut xtra::Context<Self>) {
log_error!(self.handle_start_monitoring(msg));

1
daemon/src/taker.rs

@ -24,6 +24,7 @@ mod db;
mod keypair;
mod logger;
mod model;
mod monitor;
mod routes;
mod routes_taker;
mod seed;

Loading…
Cancel
Save