Browse Source

Merge #256

256: 2nd cet trial fixes r=da-kami a=da-kami

I wish I could have had the brainpower and dicipline to do the last commit in multiple atomic commits. 
Quite a nightmare "fixing" all these things. Too many moving parts and complex code. 

`@thomaseizinger` maybe you can help me split it up?

Co-authored-by: Daniel Karzel <daniel@comit.network>
refactor/no-log-handler
bors[bot] 3 years ago
committed by GitHub
parent
commit
663cdec7e7
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      daemon/src/db.rs
  2. 57
      daemon/src/maker_cfd.rs
  3. 105
      daemon/src/model/cfd.rs
  4. 4
      daemon/src/monitor.rs
  5. 6
      daemon/src/oracle.rs
  6. 60
      daemon/src/taker_cfd.rs

5
daemon/src/db.rs

@ -172,10 +172,9 @@ pub async fn insert_new_cfd_state_by_order_id(
.await
.context("loading latest state failed")?;
// make sure that the new state is different than the current one to avoid that we save the same
// state twice
if mem::discriminant(&latest_cfd_state_in_db) == mem::discriminant(&new_state) {
tracing::warn!(
// Since we have states where we add information this happens quite frequently
tracing::trace!(
"Same state transition for cfd with order_id {}: {}",
order_id,
latest_cfd_state_in_db

57
daemon/src/maker_cfd.rs

@ -12,7 +12,7 @@ use crate::model::{TakerId, Usd};
use crate::monitor::MonitorParams;
use crate::wallet::Wallet;
use crate::{maker_inc_connections, monitor, oracle, setup_contract, wire};
use anyhow::{Context as _, Result};
use anyhow::{bail, Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc;
@ -617,8 +617,9 @@ impl Actor {
tracing::info!("Commit transaction published on chain: {}", txid);
let new_state = cfd.handle(CfdStateChangeEvent::CommitTxSent)?;
insert_new_cfd_state_by_order_id(cfd.order.id, new_state, &mut conn).await?;
if let Some(new_state) = cfd.handle(CfdStateChangeEvent::CommitTxSent)? {
insert_new_cfd_state_by_order_id(cfd.order.id, new_state, &mut conn).await?;
}
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
@ -785,15 +786,21 @@ impl Actor {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let new_state = cfd.handle(CfdStateChangeEvent::Monitor(event))?;
insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?;
if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() {
// early exit if there was not state change
// this is for cases where we are already in a final state
return Ok(());
}
insert_new_cfd_state_by_order_id(order_id, cfd.state.clone(), &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// TODO: code duplication maker/taker
if let CfdState::OpenCommitted { .. } = new_state {
if let CfdState::OpenCommitted { .. } = cfd.state {
self.try_cet_publication(cfd).await?;
} else if let CfdState::MustRefund { .. } = new_state {
} else if let CfdState::MustRefund { .. } = cfd.state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = self
.wallet
@ -816,10 +823,26 @@ impl Actor {
let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?;
for mut cfd in cfds {
cfd.handle(CfdStateChangeEvent::OracleAttestation(attestation.clone()))?;
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(
attestation.clone().into(),
))?
.is_none()
{
// if we don't transition to a new state after oracle attestation we ignore the cfd
// this is for cases where we cannot handle the attestation which should be in a
// final state
continue;
}
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
self.try_cet_publication(cfd).await?;
if let Err(e) = self.try_cet_publication(cfd).await {
tracing::error!("Error when trying to publish CET: {:#}", e);
continue;
}
}
Ok(())
@ -834,14 +857,18 @@ impl Actor {
let txid = self.wallet.try_broadcast_transaction(cet).await?;
tracing::info!("CET published with txid {}", txid);
cfd.handle(CfdStateChangeEvent::CetSent)?;
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state, &mut conn).await?;
if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() {
bail!("If we can get the CET we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
}
Err(not_ready_yet) => {
tracing::debug!(
"Attestation received but we are not ready to publish it yet: {:#}",
not_ready_yet
);
tracing::debug!("{:#}", not_ready_yet);
return Ok(());
}
};

105
daemon/src/model/cfd.rs

@ -1,6 +1,5 @@
use crate::model::{Leverage, OracleEventId, Percent, Position, TakerId, TradingPair, Usd};
use crate::monitor;
use crate::oracle::Attestation;
use crate::{monitor, oracle};
use anyhow::{bail, Context, Result};
use bdk::bitcoin::secp256k1::{SecretKey, Signature};
use bdk::bitcoin::{Address, Amount, PublicKey, Script, SignedAmount, Transaction, Txid};
@ -281,6 +280,33 @@ pub enum CfdState {
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Attestation {
pub id: OracleEventId,
pub price: u64,
pub scalars: Vec<SecretKey>,
}
impl From<oracle::Attestation> for Attestation {
fn from(attestation: oracle::Attestation) -> Self {
Attestation {
id: attestation.id,
price: attestation.price,
scalars: attestation.scalars,
}
}
}
impl From<Attestation> for oracle::Attestation {
fn from(attestation: Attestation) -> oracle::Attestation {
oracle::Attestation {
id: attestation.id,
price: attestation.price,
scalars: attestation.scalars,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", content = "payload")]
pub enum CetStatus {
@ -290,6 +316,17 @@ pub enum CetStatus {
Ready(Attestation),
}
impl fmt::Display for CetStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
CetStatus::Unprepared => write!(f, "Unprepared"),
CetStatus::TimelockExpired => write!(f, "TimelockExpired"),
CetStatus::OracleSigned(_) => write!(f, "OracleSigned"),
CetStatus::Ready(_) => write!(f, "Ready"),
}
}
}
impl CfdState {
fn get_common(&self) -> CfdStateCommon {
let common = match self {
@ -505,7 +542,7 @@ impl Cfd {
#[allow(dead_code)]
pub const CET_TIMELOCK: u32 = 12;
pub fn handle(&mut self, event: CfdStateChangeEvent) -> Result<CfdState> {
pub fn handle(&mut self, event: CfdStateChangeEvent) -> Result<Option<CfdState>> {
use CfdState::*;
// TODO: Display impl
@ -513,6 +550,16 @@ impl Cfd {
let order_id = self.order.id;
// early exit if already final
if let SetupFailed { .. } | Closed { .. } | Refunded { .. } = self.state.clone() {
tracing::trace!(
"Ignoring event {:?} because cfd already in state {}",
event,
self.state
);
return Ok(None);
}
let new_state = match event {
CfdStateChangeEvent::Monitor(event) => match event {
monitor::Event::LockFinality(_) => {
@ -543,11 +590,20 @@ impl Cfd {
}
}
monitor::Event::CommitFinality(_) => {
let dlc = if let PendingCommit { dlc, .. } = self.state.clone() {
dlc
} else if let PendingOpen { dlc, .. } | Open { dlc, .. } = self.state.clone() {
let (dlc, attestation) = if let PendingCommit {
dlc, attestation, ..
} = self.state.clone()
{
(dlc, attestation)
} else if let PendingOpen {
dlc, attestation, ..
}
| Open {
dlc, attestation, ..
} = self.state.clone()
{
tracing::debug!(%order_id, "Was in unexpected state {}, jumping ahead to OpenCommitted", self.state);
dlc
(dlc, attestation)
} else {
bail!(
"Cannot transition to OpenCommitted because of unexpected state {}",
@ -560,7 +616,11 @@ impl Cfd {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::Unprepared,
cet_status: if let Some(attestation) = attestation {
CetStatus::OracleSigned(attestation)
} else {
CetStatus::Unprepared
},
}
}
monitor::Event::CetTimelockExpired(_) => match self.state.clone() {
@ -620,7 +680,7 @@ impl Cfd {
dlc
} else {
bail!(
"Cannot transition to OpenCommitted because of unexpected state {}",
"Cannot transition to MustRefund because of unexpected state {}",
self.state
)
};
@ -681,7 +741,7 @@ impl Cfd {
}
}
CfdStateChangeEvent::OracleAttestation(attestation) => match self.state.clone() {
CfdState::Open { dlc, .. } => CfdState::Open {
CfdState::PendingOpen { dlc, .. } | CfdState::Open { dlc, .. } => CfdState::Open {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
@ -741,7 +801,7 @@ impl Cfd {
self.state = new_state.clone();
Ok(new_state)
Ok(Some(new_state))
}
pub fn refund_tx(&self) -> Result<Transaction> {
@ -822,10 +882,13 @@ impl Cfd {
cet_status: CetStatus::Ready(attestation),
..
} => (dlc, attestation),
CfdState::OpenCommitted { .. }
| CfdState::Open { .. }
| CfdState::PendingCommit { .. } => {
return Ok(Err(NotReadyYet));
CfdState::OpenCommitted { cet_status, .. } => {
return Ok(Err(NotReadyYet { cet_status }));
}
CfdState::Open { .. } | CfdState::PendingCommit { .. } => {
return Ok(Err(NotReadyYet {
cet_status: CetStatus::Unprepared,
}));
}
_ => bail!("Cannot publish CET in state {}", self.state.clone()),
};
@ -919,9 +982,11 @@ impl Cfd {
}
}
#[derive(thiserror::Error, Debug, Clone, Copy)]
#[error("The cfd is not ready for CET publication yet")]
pub struct NotReadyYet;
#[derive(thiserror::Error, Debug, Clone)]
#[error("The cfd is not ready for CET publication yet: {cet_status}")]
pub struct NotReadyYet {
cet_status: CetStatus,
}
#[derive(Debug, Clone)]
pub enum CfdStateChangeEvent {
@ -1301,9 +1366,9 @@ pub struct Dlc {
pub cets: HashMap<OracleEventId, Vec<Cet>>,
pub refund: (Transaction, Signature),
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_sat")]
pub maker_lock_amount: Amount,
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_btc")]
#[serde(with = "::bdk::bitcoin::util::amount::serde::as_sat")]
pub taker_lock_amount: Amount,
pub revoked_commit: Vec<RevokedCommit>,

4
daemon/src/monitor.rs

@ -111,7 +111,7 @@ where
actor.monitor_refund_finality(&params,cfd.order.id);
}
CetStatus::OracleSigned(attestation) => {
actor.monitor_cet_finality(map_cets(dlc.cets, dlc.maker_address.script_pubkey()), attestation, cfd.order.id)?;
actor.monitor_cet_finality(map_cets(dlc.cets, dlc.maker_address.script_pubkey()), attestation.into(), cfd.order.id)?;
actor.monitor_commit_cet_timelock(&params, cfd.order.id);
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
@ -121,7 +121,7 @@ where
actor.monitor_refund_finality(&params,cfd.order.id);
}
CetStatus::Ready(attestation) => {
actor.monitor_cet_finality(map_cets(dlc.cets, dlc.maker_address.script_pubkey()), attestation, cfd.order.id)?;
actor.monitor_cet_finality(map_cets(dlc.cets, dlc.maker_address.script_pubkey()), attestation.into(), cfd.order.id)?;
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
}

6
daemon/src/oracle.rs

@ -10,7 +10,7 @@ use reqwest::StatusCode;
use rocket::time::format_description::FormatItem;
use rocket::time::macros::format_description;
use rocket::time::{Duration, OffsetDateTime, Time};
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use std::collections::{HashMap, HashSet};
use std::ops::Add;
use time::ext::NumericalDuration;
@ -133,7 +133,7 @@ where
}
};
let attestation = dbg!(res).json::<Attestation>().await?;
let attestation = res.json::<Attestation>().await?;
self.cfd_actor_address
.clone()
@ -284,7 +284,7 @@ impl From<Announcement> for cfd_protocol::Announcement {
// TODO: Implement real deserialization once price attestation is
// implemented in `olivia`
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[derive(Debug, Clone, Deserialize, PartialEq)]
#[serde(try_from = "olivia_api::Response")]
pub struct Attestation {
pub id: OracleEventId,

60
daemon/src/taker_cfd.rs

@ -12,7 +12,7 @@ use crate::monitor::{self, MonitorParams};
use crate::wallet::Wallet;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg};
use crate::{oracle, send_to_socket, setup_contract, wire};
use anyhow::{Context as _, Result};
use anyhow::{bail, Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc;
@ -541,17 +541,21 @@ impl Actor {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let new_state = cfd.handle(CfdStateChangeEvent::Monitor(event))?;
if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() {
// early exit if there was not state change
// this is for cases where we are already in a final state
return Ok(());
}
insert_new_cfd_state_by_order_id(order_id, new_state.clone(), &mut conn).await?;
insert_new_cfd_state_by_order_id(order_id, cfd.state.clone(), &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// TODO: code duplicateion maker/taker
if let CfdState::OpenCommitted { .. } = new_state {
if let CfdState::OpenCommitted { .. } = cfd.state {
self.try_cet_publication(cfd).await?;
} else if let CfdState::MustRefund { .. } = new_state {
} else if let CfdState::MustRefund { .. } = cfd.state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = self
.wallet
@ -576,14 +580,16 @@ impl Actor {
.try_broadcast_transaction(signed_commit_tx)
.await?;
tracing::info!("Commit transaction published on chain: {}", txid);
let new_state = cfd.handle(CfdStateChangeEvent::CommitTxSent)?;
insert_new_cfd_state_by_order_id(cfd.order.id, new_state, &mut conn).await?;
if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() {
bail!("If we can get the commit tx we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
tracing::info!("Commit transaction published on chain: {}", txid);
Ok(())
}
@ -597,10 +603,26 @@ impl Actor {
let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?;
for mut cfd in cfds {
cfd.handle(CfdStateChangeEvent::OracleAttestation(attestation.clone()))?;
if cfd
.handle(CfdStateChangeEvent::OracleAttestation(
attestation.clone().into(),
))?
.is_none()
{
// if we don't transition to a new state after oracle attestation we ignore the cfd
// this is for cases where we cannot handle the attestation which should be in a
// final state
continue;
}
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
self.try_cet_publication(cfd).await?;
if let Err(e) = self.try_cet_publication(cfd).await {
tracing::error!("Error when trying to publish CET: {:#}", e);
continue;
}
}
Ok(())
@ -615,14 +637,18 @@ impl Actor {
let txid = self.wallet.try_broadcast_transaction(cet).await?;
tracing::info!("CET published with txid {}", txid);
cfd.handle(CfdStateChangeEvent::CetSent)?;
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state, &mut conn).await?;
if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() {
bail!("If we can get the CET we should be able to transition")
}
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
}
Err(not_ready_yet) => {
tracing::debug!(
"Attestation received but we are not ready to publish it yet: {:#}",
not_ready_yet
);
tracing::debug!("{:#}", not_ready_yet);
return Ok(());
}
};

Loading…
Cancel
Save