Browse Source

Wire in automatic CET publication

- Handles the Oracle's attestation in the cfd actor, transition according to the knowledge we have about the timelock expiry already.
- Tries to publish the CET if we are in `CetStatus::Ready`, i.e. both the attestation and timelock expiry happened. We try publishing if either event happened, and just print a log in case it's not ready yet.
- Tries to re-publish CET if we are in `PendingCet` upon restart.
- Re-triggers CET monitoring upon startup.
- Extends any state after `ContractSetup` to be able to store the attestation. (see below for ideas to change that)

Things that could be done different:
Currently we are carrying on the attestation through a lot of states - because we cannot just transition the user to `OpenCommitted` because it is a user decision to go for commit, but we have to keep the attestation around once it happened.
To reduce the state complexity, we could store the attestation independent of the state, but associated with the cfd.
This would make things a lot simpler, but we would then always have to go to the database to check if the attestation is already around (which might make other parts more complex).
feature/integration-tests
Daniel Karzel 3 years ago
parent
commit
159240cc9f
No known key found for this signature in database GPG Key ID: 30C3FC2E438ADB6E
  1. 196
      daemon/sqlx-data.json
  2. 231
      daemon/src/db.rs
  3. 8
      daemon/src/housekeeping.rs
  4. 55
      daemon/src/maker_cfd.rs
  5. 7
      daemon/src/model.rs
  6. 245
      daemon/src/model/cfd.rs
  7. 120
      daemon/src/monitor.rs
  8. 14
      daemon/src/oracle.rs
  9. 21
      daemon/src/setup_contract.rs
  10. 57
      daemon/src/taker_cfd.rs
  11. 4
      daemon/src/to_sse_event.rs
  12. 10
      frontend/src/components/Types.tsx

196
daemon/sqlx-data.json

@ -1,82 +1,95 @@
{
"db": "SQLite",
"41fb3bea22bde82a79aee6096d579a16494a20cfcfb2e8cfffe0a56073460bbf": {
"query": "\n select\n cfds.id as cfd_id,\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n ",
"50abbb297394739ec9d85917f8c32aa8bcfa0bfe140b24e9eeda4ce8d30d4f8d": {
"query": "\n select\n state\n from cfd_states\n where cfd_id = ?\n order by id desc\n limit 1;\n ",
"describe": {
"columns": [
{
"name": "cfd_id",
"name": "state",
"ordinal": 0,
"type_info": "Int64"
},
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false
]
}
},
"63c8fd1a1512b55c19feea463dbb6a0fdf98a325801ba8677e2a33fd3ee1ebb9": {
"query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n ",
"describe": {
"columns": [
{
"name": "order_id",
"ordinal": 1,
"ordinal": 0,
"type_info": "Text"
},
{
"name": "price",
"ordinal": 2,
"ordinal": 1,
"type_info": "Text"
},
{
"name": "min_quantity",
"ordinal": 3,
"ordinal": 2,
"type_info": "Text"
},
{
"name": "max_quantity",
"ordinal": 4,
"ordinal": 3,
"type_info": "Text"
},
{
"name": "leverage",
"ordinal": 5,
"ordinal": 4,
"type_info": "Int64"
},
{
"name": "trading_pair",
"ordinal": 6,
"ordinal": 5,
"type_info": "Text"
},
{
"name": "position",
"ordinal": 7,
"ordinal": 6,
"type_info": "Text"
},
{
"name": "origin",
"ordinal": 8,
"ordinal": 7,
"type_info": "Text"
},
{
"name": "liquidation_price",
"ordinal": 9,
"ordinal": 8,
"type_info": "Text"
},
{
"name": "creation_timestamp",
"ordinal": 10,
"ordinal": 9,
"type_info": "Text"
},
{
"name": "term",
"ordinal": 11,
"ordinal": 10,
"type_info": "Text"
},
{
"name": "oracle_event_id",
"ordinal": 12,
"ordinal": 11,
"type_info": "Text"
},
{
"name": "quantity_usd",
"ordinal": 13,
"ordinal": 12,
"type_info": "Text"
},
{
"name": "state",
"ordinal": 14,
"ordinal": 13,
"type_info": "Text"
}
],
@ -97,25 +110,6 @@
false,
false,
false,
false,
false
]
}
},
"50abbb297394739ec9d85917f8c32aa8bcfa0bfe140b24e9eeda4ce8d30d4f8d": {
"query": "\n select\n state\n from cfd_states\n where cfd_id = ?\n order by id desc\n limit 1;\n ",
"describe": {
"columns": [
{
"name": "state",
"ordinal": 0,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false
]
}
@ -158,83 +152,174 @@
"nullable": []
}
},
"c404a4eebb7118fd4e716fe4155e011a52e12a3594b698ca4f0662674cd067f8": {
"query": "\n select\n cfds.id as cfd_id,\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n and orders.uuid = ?\n ",
"e0d3fe192c419be4fa65199d1e024ce8794e19a847be53038b7a8a33afb36bd2": {
"query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n and orders.uuid = ?\n ",
"describe": {
"columns": [
{
"name": "cfd_id",
"name": "order_id",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "price",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "min_quantity",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "max_quantity",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "leverage",
"ordinal": 4,
"type_info": "Int64"
},
{
"name": "trading_pair",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "position",
"ordinal": 6,
"type_info": "Text"
},
{
"name": "origin",
"ordinal": 7,
"type_info": "Text"
},
{
"name": "liquidation_price",
"ordinal": 8,
"type_info": "Text"
},
{
"name": "creation_timestamp",
"ordinal": 9,
"type_info": "Text"
},
{
"name": "term",
"ordinal": 10,
"type_info": "Text"
},
{
"name": "oracle_event_id",
"ordinal": 11,
"type_info": "Text"
},
{
"name": "quantity_usd",
"ordinal": 12,
"type_info": "Text"
},
{
"name": "state",
"ordinal": 13,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false
]
}
},
"e45a49eb1fb4cbddc046712d9e2df2db0c18c618f0fae2133e51d82eda6bec36": {
"query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n and orders.oracle_event_id = ?\n ",
"describe": {
"columns": [
{
"name": "order_id",
"ordinal": 1,
"ordinal": 0,
"type_info": "Text"
},
{
"name": "price",
"ordinal": 2,
"ordinal": 1,
"type_info": "Text"
},
{
"name": "min_quantity",
"ordinal": 3,
"ordinal": 2,
"type_info": "Text"
},
{
"name": "max_quantity",
"ordinal": 4,
"ordinal": 3,
"type_info": "Text"
},
{
"name": "leverage",
"ordinal": 5,
"ordinal": 4,
"type_info": "Int64"
},
{
"name": "trading_pair",
"ordinal": 6,
"ordinal": 5,
"type_info": "Text"
},
{
"name": "position",
"ordinal": 7,
"ordinal": 6,
"type_info": "Text"
},
{
"name": "origin",
"ordinal": 8,
"ordinal": 7,
"type_info": "Text"
},
{
"name": "liquidation_price",
"ordinal": 9,
"ordinal": 8,
"type_info": "Text"
},
{
"name": "creation_timestamp",
"ordinal": 10,
"ordinal": 9,
"type_info": "Text"
},
{
"name": "term",
"ordinal": 11,
"ordinal": 10,
"type_info": "Text"
},
{
"name": "oracle_event_id",
"ordinal": 12,
"ordinal": 11,
"type_info": "Text"
},
{
"name": "quantity_usd",
"ordinal": 13,
"ordinal": 12,
"type_info": "Text"
},
{
"name": "state",
"ordinal": 14,
"ordinal": 13,
"type_info": "Text"
}
],
@ -242,7 +327,6 @@
"Right": 1
},
"nullable": [
true,
false,
false,
false,

231
daemon/src/db.rs

@ -257,7 +257,6 @@ pub async fn load_cfd_by_order_id(
let row = sqlx::query!(
r#"
select
cfds.id as cfd_id,
orders.uuid as order_id,
orders.initial_price as price,
orders.min_quantity as min_quantity,
@ -335,7 +334,6 @@ pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<
let rows = sqlx::query!(
r#"
select
cfds.id as cfd_id,
orders.uuid as order_id,
orders.initial_price as price,
orders.min_quantity as min_quantity,
@ -411,6 +409,91 @@ pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<
Ok(cfds)
}
/// Loads all CFDs with the latest state as the CFD state
pub async fn load_cfds_by_oracle_event_id(
oracle_event_id: OracleEventId,
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<Vec<Cfd>> {
let rows = sqlx::query!(
r#"
select
orders.uuid as order_id,
orders.initial_price as price,
orders.min_quantity as min_quantity,
orders.max_quantity as max_quantity,
orders.leverage as leverage,
orders.trading_pair as trading_pair,
orders.position as position,
orders.origin as origin,
orders.liquidation_price as liquidation_price,
orders.creation_timestamp as creation_timestamp,
orders.term as term,
orders.oracle_event_id,
cfds.quantity_usd as quantity_usd,
cfd_states.state as state
from cfds as cfds
inner join orders as orders on cfds.order_id = orders.id
inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id
where cfd_states.state in (
select
state
from cfd_states
where cfd_id = cfds.id
order by id desc
limit 1
)
and orders.oracle_event_id = ?
"#,
oracle_event_id.0
)
.fetch_all(conn)
.await?;
let cfds = rows
.iter()
.map(|row| {
let order_id = serde_json::from_str(row.order_id.as_str()).unwrap();
let trading_pair = serde_json::from_str(row.trading_pair.as_str()).unwrap();
let position: Position = serde_json::from_str(row.position.as_str()).unwrap();
let price = serde_json::from_str(row.price.as_str()).unwrap();
let min_quantity = serde_json::from_str(row.min_quantity.as_str()).unwrap();
let max_quantity = serde_json::from_str(row.max_quantity.as_str()).unwrap();
let leverage = Leverage(row.leverage.try_into().unwrap());
let liquidation_price = serde_json::from_str(row.liquidation_price.as_str()).unwrap();
let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap();
let term = serde_json::from_str(row.term.as_str()).unwrap();
let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap();
let oracle_event_id = OracleEventId(row.oracle_event_id.clone());
let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap();
let latest_state = serde_json::from_str(row.state.as_str()).unwrap();
let order = Order {
id: order_id,
trading_pair,
position,
price,
min_quantity,
max_quantity,
leverage,
liquidation_price,
creation_timestamp,
term,
origin,
oracle_event_id,
};
Cfd {
order,
quantity_usd: quantity,
state: latest_state,
}
})
.collect();
Ok(cfds)
}
#[cfg(test)]
mod tests {
use std::fs::File;
@ -444,18 +527,9 @@ mod tests {
let pool = setup_test_db().await;
let mut conn = pool.acquire().await.unwrap();
let order = Order::default();
let cfd = Cfd::new(
order.clone(),
Usd(dec!(1000)),
CfdState::OutgoingOrderRequest {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
);
let cfd = Cfd::default();
insert_order(&order, &mut conn).await.unwrap();
insert_order(&cfd.order, &mut conn).await.unwrap();
insert_cfd(cfd.clone(), &mut conn).await.unwrap();
let cfds_from_db = load_all_cfds(&mut conn).await.unwrap();
@ -468,20 +542,10 @@ mod tests {
let pool = setup_test_db().await;
let mut conn = pool.acquire().await.unwrap();
let order = Order::default();
let cfd = Cfd::new(
order.clone(),
Usd(dec!(1000)),
CfdState::OutgoingOrderRequest {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
);
let order_id = order.id;
let cfd = Cfd::default();
let order_id = cfd.order.id;
insert_order(&order, &mut conn).await.unwrap();
insert_order(&cfd.order, &mut conn).await.unwrap();
insert_cfd(cfd.clone(), &mut conn).await.unwrap();
let cfd_from_db = load_cfd_by_order_id(order_id, &mut conn).await.unwrap();
@ -493,62 +557,75 @@ mod tests {
let pool = setup_test_db().await;
let mut conn = pool.acquire().await.unwrap();
let order = Order::default();
let cfd = Cfd::new(
order.clone(),
Usd(dec!(1000)),
CfdState::OutgoingOrderRequest {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
);
let order_id = order.id;
let cfd = Cfd::default();
let order_id = cfd.order.id;
insert_order(&order, &mut conn).await.unwrap();
insert_order(&cfd.order, &mut conn).await.unwrap();
insert_cfd(cfd.clone(), &mut conn).await.unwrap();
let cfd_from_db = load_cfd_by_order_id(order_id, &mut conn).await.unwrap();
assert_eq!(cfd, cfd_from_db);
let order = Order::default();
let cfd = Cfd::new(
order.clone(),
Usd(dec!(1000)),
CfdState::OutgoingOrderRequest {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
);
let order_id = order.id;
let cfd = Cfd::default();
let order_id = cfd.order.id;
insert_order(&order, &mut conn).await.unwrap();
insert_order(&cfd.order, &mut conn).await.unwrap();
insert_cfd(cfd.clone(), &mut conn).await.unwrap();
let cfd_from_db = load_cfd_by_order_id(order_id, &mut conn).await.unwrap();
assert_eq!(cfd, cfd_from_db);
}
#[tokio::test]
async fn test_insert_and_load_cfd_by_oracle_event_id() {
let pool = setup_test_db().await;
let mut conn = pool.acquire().await.unwrap();
let oracle_event_id_1 = OracleEventId("dummy_1".to_string());
let oracle_event_id_2 = OracleEventId("dummy_2".to_string());
let cfd_1 = Cfd::default()
.with_order(Order::default().with_oracle_event_id(oracle_event_id_1.clone()));
insert_order(&cfd_1.order, &mut conn).await.unwrap();
insert_cfd(cfd_1.clone(), &mut conn).await.unwrap();
let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_1.clone(), &mut conn)
.await
.unwrap();
assert_eq!(vec![cfd_1.clone()], cfd_from_db);
let cfd_2 = Cfd::default()
.with_order(Order::default().with_oracle_event_id(oracle_event_id_1.clone()));
insert_order(&cfd_2.order, &mut conn).await.unwrap();
insert_cfd(cfd_2.clone(), &mut conn).await.unwrap();
let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_1, &mut conn)
.await
.unwrap();
assert_eq!(vec![cfd_1, cfd_2], cfd_from_db);
let cfd_3 = Cfd::default()
.with_order(Order::default().with_oracle_event_id(oracle_event_id_2.clone()));
insert_order(&cfd_3.order, &mut conn).await.unwrap();
insert_cfd(cfd_3.clone(), &mut conn).await.unwrap();
let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_2, &mut conn)
.await
.unwrap();
assert_eq!(vec![cfd_3], cfd_from_db);
}
#[tokio::test]
async fn test_insert_new_cfd_state() {
let pool = setup_test_db().await;
let mut conn = pool.acquire().await.unwrap();
let order = Order::default();
let mut cfd = Cfd::new(
order.clone(),
Usd(dec!(1000)),
CfdState::OutgoingOrderRequest {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
);
let mut cfd = Cfd::default();
insert_order(&order, &mut conn).await.unwrap();
insert_order(&cfd.order, &mut conn).await.unwrap();
insert_cfd(cfd.clone(), &mut conn).await.unwrap();
cfd.state = CfdState::Accepted {
@ -582,6 +659,27 @@ mod tests {
pool
}
impl Default for Cfd {
fn default() -> Self {
Cfd::new(
Order::default(),
Usd(dec!(1000)),
CfdState::OutgoingOrderRequest {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
)
}
}
impl Cfd {
pub fn with_order(mut self, order: Order) -> Self {
self.order = order;
self
}
}
impl Default for Order {
fn default() -> Self {
Order::new(
@ -594,4 +692,11 @@ mod tests {
.unwrap()
}
}
impl Order {
pub fn with_oracle_event_id(mut self, oracle_event_id: OracleEventId) -> Self {
self.oracle_event_id = oracle_event_id;
self
}
}
}

8
daemon/src/housekeeping.rs

@ -54,5 +54,13 @@ pub async fn rebroadcast_transactions(
tracing::info!("Commit transaction published on chain: {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_cet(cfd)) {
// Double question-mark OK because if we are in PendingCet we must have been Ready before
let signed_cet = cfd.cet()??;
let txid = wallet.try_broadcast_transaction(signed_cet).await?;
tracing::info!("CET published on chain: {}", txid);
}
Ok(())
}

55
daemon/src/maker_cfd.rs

@ -1,7 +1,7 @@
use crate::actors::log_error;
use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds,
load_cfd_by_order_id, load_order_by_id,
load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id,
};
use crate::maker_inc_connections::TakerCommand;
use crate::model::cfd::{
@ -266,6 +266,7 @@ impl Actor {
transition_timestamp: SystemTime::now(),
},
dlc: dlc.clone(),
attestation: None,
},
&mut conn,
)
@ -411,8 +412,6 @@ impl Actor {
})
.await?;
let nonce_pks = offer_announcement.nonce_pks.clone();
let contract_future = setup_contract::new(
self.takers.clone().into_sink().with(move |msg| {
future::ok(maker_inc_connections::TakerMessage {
@ -421,7 +420,7 @@ impl Actor {
})
}),
receiver,
(self.oracle_pk, nonce_pks),
(self.oracle_pk, offer_announcement.clone().into()),
cfd,
self.wallet.clone(),
Role::Maker,
@ -572,10 +571,10 @@ impl Actor {
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// TODO: Not sure that should be done here...
// Consider bubbling the refund availability up to the user, and let user trigger
// transaction publication
if let CfdState::MustRefund { .. } = new_state {
// TODO: code duplication maker/taker
if let CfdState::OpenCommitted { .. } = new_state {
self.try_cet_publication(cfd).await?;
} else if let CfdState::MustRefund { .. } = new_state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = self
.wallet
@ -606,12 +605,44 @@ impl Actor {
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
attestation.id.0
attestation.id
);
todo!(
"Update all CFDs which care about this particular attestation, based on the event ID"
);
let mut conn = self.db.acquire().await?;
let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?;
for mut cfd in cfds {
cfd.handle(CfdStateChangeEvent::OracleAttestation(attestation.clone()))?;
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?;
self.try_cet_publication(cfd).await?;
}
Ok(())
}
// TODO: code duplication maker/taker
async fn try_cet_publication(&mut self, mut cfd: Cfd) -> Result<()> {
let mut conn = self.db.acquire().await?;
match cfd.cet()? {
Ok(cet) => {
let txid = self.wallet.try_broadcast_transaction(cet).await?;
tracing::info!("CET published with txid {}", txid);
cfd.handle(CfdStateChangeEvent::CetSent)?;
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state, &mut conn).await?;
}
Err(not_ready_yet) => {
tracing::debug!(
"Attestation received but we are not ready to publish it yet: {:#}",
not_ready_yet
);
return Ok(());
}
};
Ok(())
}
}

7
daemon/src/model.rs

@ -6,6 +6,7 @@ use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use bdk::bitcoin::{Address, Amount};
use std::fmt;
use std::time::SystemTime;
use uuid::Uuid;
@ -108,3 +109,9 @@ pub struct WalletInfo {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct OracleEventId(pub String);
impl Display for OracleEventId {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}

245
daemon/src/model/cfd.rs

@ -1,5 +1,6 @@
use crate::model::{Leverage, OracleEventId, Percent, Position, TakerId, TradingPair, Usd};
use crate::monitor;
use crate::oracle::Attestation;
use anyhow::{bail, Context, Result};
use bdk::bitcoin::secp256k1::{SecretKey, Signature};
use bdk::bitcoin::{Address, Amount, PublicKey, SignedAmount, Transaction};
@ -173,9 +174,7 @@ pub enum CfdState {
/// The taker sent an order to the maker to open the CFD but doesn't have a response yet.
///
/// This state applies to taker only.
OutgoingOrderRequest {
common: CfdStateCommon,
},
OutgoingOrderRequest { common: CfdStateCommon },
/// The maker received an order from the taker to open the CFD but doesn't have a response yet.
///
@ -188,29 +187,24 @@ pub enum CfdState {
/// The maker has accepted the CFD take request, but the contract is not set up on chain yet.
///
/// This state applies to taker and maker.
Accepted {
common: CfdStateCommon,
},
Accepted { common: CfdStateCommon },
/// The maker rejected the CFD order.
///
/// This state applies to taker and maker.
/// This is a final state.
Rejected {
common: CfdStateCommon,
},
Rejected { common: CfdStateCommon },
/// State used during contract setup.
///
/// This state applies to taker and maker.
/// All contract setup messages between taker and maker are expected to be sent in on scope.
ContractSetup {
common: CfdStateCommon,
},
ContractSetup { common: CfdStateCommon },
PendingOpen {
common: CfdStateCommon,
dlc: Dlc,
attestation: Option<Attestation>,
},
/// The CFD contract is set up on chain.
@ -219,6 +213,7 @@ pub enum CfdState {
Open {
common: CfdStateCommon,
dlc: Dlc,
attestation: Option<Attestation>,
},
/// The commit transaction was published but it not final yet
@ -228,6 +223,7 @@ pub enum CfdState {
PendingCommit {
common: CfdStateCommon,
dlc: Dlc,
attestation: Option<Attestation>,
},
// TODO: At the moment we are appending to this state. The way this is handled internally is
@ -243,19 +239,34 @@ pub enum CfdState {
cet_status: CetStatus,
},
/// The CFD contract's refund transaction was published but it not final yet
MustRefund {
/// The CET was published on chain but is not final yet
///
/// This state applies to taker and maker.
/// This state is needed, because otherwise the user does not get any feedback.
PendingCet {
common: CfdStateCommon,
dlc: Dlc,
cet_status: CetStatus,
},
/// The position was closed collaboratively or non-collaboratively
///
/// This state applies to taker and maker.
/// This is a final state.
/// This is the final state for all happy-path scenarios where we had an open position and then
/// "settled" it. Settlement can be collaboratively or non-collaboratively (by publishing
/// commit + cet).
Closed { common: CfdStateCommon },
// TODO: Can be extended with CetStatus
/// The CFD contract's refund transaction was published but it not final yet
MustRefund { common: CfdStateCommon, dlc: Dlc },
/// The Cfd was refunded and the refund transaction reached finality
///
/// This state applies to taker and maker.
/// This is a final state.
Refunded {
common: CfdStateCommon,
},
Refunded { common: CfdStateCommon },
/// The Cfd was in a state that could not be continued after the application got interrupted
///
@ -273,8 +284,8 @@ pub enum CfdState {
pub enum CetStatus {
Unprepared,
TimelockExpired,
OracleSigned(u64),
Ready(u64),
OracleSigned(Attestation),
Ready(Attestation),
}
impl CfdState {
@ -292,6 +303,8 @@ impl CfdState {
CfdState::Refunded { common, .. } => common,
CfdState::SetupFailed { common, .. } => common,
CfdState::PendingCommit { common, .. } => common,
CfdState::PendingCet { common, .. } => common,
CfdState::Closed { common, .. } => common,
};
*common
@ -341,6 +354,12 @@ impl fmt::Display for CfdState {
CfdState::SetupFailed { .. } => {
write!(f, "Setup Failed")
}
CfdState::PendingCet { .. } => {
write!(f, "Pending CET")
}
CfdState::Closed { .. } => {
write!(f, "Closed")
}
}
}
}
@ -506,6 +525,7 @@ impl Cfd {
transition_timestamp: SystemTime::now(),
},
dlc,
attestation: None,
}
} else {
bail!(
@ -549,23 +569,34 @@ impl Cfd {
},
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::OracleSigned(price),
cet_status: CetStatus::OracleSigned(attestation),
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::Ready(price),
cet_status: CetStatus::Ready(attestation),
},
PendingOpen { dlc, .. } | Open { dlc, .. } => {
tracing::debug!(%order_id, "Was in unexpected state {}, jumping ahead to MustRefund", self.state);
PendingOpen {
dlc, attestation, ..
}
| Open {
dlc, attestation, ..
}
| PendingCommit {
dlc, attestation, ..
} => {
tracing::debug!(%order_id, "Was in unexpected state {}, jumping ahead to OpenCommitted", self.state);
CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::TimelockExpired,
cet_status: match attestation {
None => CetStatus::TimelockExpired,
Some(attestation) => CetStatus::Ready(attestation),
},
}
}
_ => bail!(
@ -608,13 +639,21 @@ impl Cfd {
},
}
}
monitor::Event::CetFinality(_) => {
todo!("Implement state transition")
}
monitor::Event::CetFinality(_) => Closed {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
},
CfdStateChangeEvent::CommitTxSent => {
let dlc = if let Open { dlc, .. } | PendingOpen { dlc, .. } = self.state.clone() {
dlc
let (dlc, attestation) = if let PendingOpen {
dlc, attestation, ..
}
| Open {
dlc, attestation, ..
} = self.state.clone()
{
(dlc, attestation)
} else {
bail!(
"Cannot transition to PendingCommit because of unexpected state {}",
@ -627,8 +666,66 @@ impl Cfd {
transition_timestamp: SystemTime::now(),
},
dlc,
attestation,
}
}
CfdStateChangeEvent::OracleAttestation(attestation) => match self.state.clone() {
CfdState::Open { dlc, .. } => CfdState::Open {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
attestation: Some(attestation),
},
CfdState::PendingCommit { dlc, .. } => CfdState::PendingCommit {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
attestation: Some(attestation),
},
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::Unprepared,
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::OracleSigned(attestation),
},
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::TimelockExpired,
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc,
cet_status: CetStatus::Ready(attestation),
},
_ => bail!(
"Cannot transition to OpenCommitted because of unexpected state {}",
self.state
),
},
CfdStateChangeEvent::CetSent => match self.state.clone() {
CfdState::OpenCommitted {
common,
dlc,
cet_status,
} => CfdState::PendingCet {
common,
dlc,
cet_status,
},
_ => bail!(
"Cannot transition to PendingCet because of unexpected state {}",
self.state
),
},
};
self.state = new_state.clone();
@ -689,7 +786,6 @@ impl Cfd {
&dlc.identity,
));
// TODO: verify that the dlc's `publish` sk corresponds to the decryption_sk we need here
let counterparty_sig = dlc.commit.1.decrypt(&dlc.publish)?;
let counterparty_pubkey = dlc.identity_counterparty;
@ -703,6 +799,72 @@ impl Cfd {
Ok(signed_commit_tx)
}
pub fn cet(&self) -> Result<Result<Transaction, NotReadyYet>> {
let (dlc, attestation) = match self.state.clone() {
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::Ready(attestation),
..
}
| CfdState::PendingCet {
dlc,
cet_status: CetStatus::Ready(attestation),
..
} => (dlc, attestation),
CfdState::OpenCommitted { .. }
| CfdState::Open { .. }
| CfdState::PendingCommit { .. } => {
return Ok(Err(NotReadyYet));
}
_ => bail!("Cannot publish CET in state {}", self.state.clone()),
};
let cets = dlc
.cets
.get(&attestation.id)
.context("Unable to find oracle event id within the cets of the dlc")?;
let Cet {
tx: cet,
adaptor_sig: encsig,
n_bits,
..
} = cets
.iter()
.find(|Cet { range, .. }| range.contains(&attestation.price))
.context("Price out of range of cets")?;
let oracle_attestations = attestation.scalars;
let mut decryption_sk = oracle_attestations[0];
for oracle_attestation in oracle_attestations[1..*n_bits].iter() {
decryption_sk.add_assign(oracle_attestation.as_ref())?;
}
let sig_hash = spending_tx_sighash(
cet,
&dlc.commit.2,
Amount::from_sat(dlc.commit.0.output[0].value),
);
let our_sig = SECP256K1.sign(&sig_hash, &dlc.identity);
let our_pubkey = PublicKey::new(bdk::bitcoin::secp256k1::PublicKey::from_secret_key(
SECP256K1,
&dlc.identity,
));
let counterparty_sig = encsig.decrypt(&decryption_sk)?;
let counterparty_pubkey = dlc.identity_counterparty;
let signed_cet = finalize_spend_transaction(
cet.clone(),
&dlc.commit.2,
(our_pubkey, our_sig),
(counterparty_pubkey, counterparty_sig),
)?;
Ok(Ok(signed_cet))
}
pub fn pending_open_dlc(&self) -> Option<Dlc> {
if let CfdState::PendingOpen { dlc, .. } = self.state.clone() {
Some(dlc)
@ -719,6 +881,10 @@ impl Cfd {
matches!(self.state.clone(), CfdState::PendingCommit { .. })
}
pub fn is_pending_cet(&self) -> bool {
matches!(self.state.clone(), CfdState::PendingCet { .. })
}
pub fn is_cleanup(&self) -> bool {
matches!(
self.state.clone(),
@ -734,12 +900,18 @@ impl Cfd {
}
}
#[derive(thiserror::Error, Debug, Clone, Copy)]
#[error("The cfd is not committed yet")]
pub struct NotReadyYet;
#[derive(Debug, Clone)]
pub enum CfdStateChangeEvent {
// TODO: group other events by actors into enums and add them here so we can bundle all
// transitions into cfd.transition_to(...)
Monitor(monitor::Event),
CommitTxSent,
OracleAttestation(Attestation),
CetSent,
}
/// Returns the Profit/Loss (P/L) as Bitcoin. Losses are capped by the provided margin
@ -1078,6 +1250,17 @@ mod tests {
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Cet {
pub tx: Transaction,
pub adaptor_sig: EcdsaAdaptorSignature,
// TODO: Range + number of digits (usize) could be represented as Digits similar to what we do
// in the protocol lib
pub range: RangeInclusive<u64>,
pub n_bits: usize,
}
/// Contains all data we've assembled about the CFD through the setup protocol.
///
/// All contained signatures are the signatures of THE OTHER PARTY.
@ -1093,6 +1276,6 @@ pub struct Dlc {
/// The fully signed lock transaction ready to be published on chain
pub lock: (Transaction, Descriptor<PublicKey>),
pub commit: (Transaction, EcdsaAdaptorSignature, Descriptor<PublicKey>),
pub cets: HashMap<String, Vec<(Transaction, EcdsaAdaptorSignature, RangeInclusive<u64>)>>,
pub cets: HashMap<OracleEventId, Vec<Cet>>,
pub refund: (Transaction, Signature),
}

120
daemon/src/monitor.rs

@ -1,6 +1,8 @@
use crate::actors::log_error;
use crate::model::cfd::{CetStatus, Cfd, CfdState, Dlc, OrderId};
use crate::oracle;
use crate::model::OracleEventId;
use crate::oracle::Attestation;
use crate::{model, oracle};
use anyhow::{Context, Result};
use async_trait::async_trait;
use bdk::bitcoin::{PublicKey, Script, Txid};
@ -21,11 +23,19 @@ pub struct StartMonitoring {
pub params: MonitorParams,
}
#[derive(Clone)]
pub struct Cet {
txid: Txid,
script: Script,
range: RangeInclusive<u64>,
n_bits: usize,
}
#[derive(Clone)]
pub struct MonitorParams {
lock: (Txid, Descriptor<PublicKey>),
commit: (Txid, Descriptor<PublicKey>),
cets: HashMap<String, Vec<(Txid, Script, RangeInclusive<u64>)>>,
cets: HashMap<OracleEventId, Vec<Cet>>,
refund: (Txid, Script, u32),
}
@ -88,13 +98,19 @@ where
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
}
CfdState::OpenCommitted { dlc, cet_status, .. } => {
CfdState::OpenCommitted { dlc, cet_status, .. }
| CfdState::PendingCet { dlc, cet_status, .. } => {
let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
match cet_status {
CetStatus::Unprepared
| CetStatus::OracleSigned(_) => {
CetStatus::Unprepared => {
actor.monitor_commit_cet_timelock(&params, cfd.order.id);
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
}
CetStatus::OracleSigned(attestation) => {
actor.monitor_cet_finality(map_cets(dlc.cets, dlc.address.script_pubkey()), attestation, cfd.order.id)?;
actor.monitor_commit_cet_timelock(&params, cfd.order.id);
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
@ -103,17 +119,14 @@ where
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
}
CetStatus::Ready(_price) => {
// TODO: monitor CET finality
CetStatus::Ready(attestation) => {
actor.monitor_cet_finality(map_cets(dlc.cets, dlc.address.script_pubkey()), attestation, cfd.order.id)?;
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
}
}
}
CfdState::MustRefund { dlc, .. } => {
// TODO: CET monitoring (?) - note: would require to add CetStatus information to MustRefund
let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
@ -128,6 +141,7 @@ where
| CfdState::ContractSetup { .. }
// final states
| CfdState::Closed { .. }
| CfdState::Rejected { .. }
| CfdState::Refunded { .. }
| CfdState::SetupFailed { .. } => ()
@ -192,6 +206,36 @@ where
.push((ScriptStatus::finality(), Event::RefundFinality(order_id)));
}
fn monitor_cet_finality(
&mut self,
cets: HashMap<OracleEventId, Vec<Cet>>,
attestation: Attestation,
order_id: OrderId,
) -> Result<()> {
let cets = cets
.get(&attestation.id)
.context("No CET for oracle event found")?;
let (txid, script_pubkey) = cets
.iter()
.find_map(
|Cet {
txid,
script,
range,
..
}| { range.contains(&attestation.price).then(|| (txid, script)) },
)
.context("No price range match for oracle attestation")?;
self.awaiting_status
.entry((*txid, script_pubkey.clone()))
.or_default()
.push((ScriptStatus::finality(), Event::CetFinality(order_id)));
Ok(())
}
async fn sync(&mut self) -> Result<()> {
// Fetch the latest block for storing the height.
// We do not act on this subscription after this call, as we cannot rely on
@ -221,23 +265,7 @@ where
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
for (order_id, MonitorParams { cets, .. }) in self.cfds.clone().into_iter() {
let cets = cets
.get(&attestation.id.0)
.context("No CET for oracle event found")?;
let (txid, script_pubkey) =
match cets.iter().find_map(|(txid, script_pubkey, range)| {
range
.contains(&attestation.price)
.then(|| (txid, script_pubkey))
}) {
Some(cet) => cet,
None => continue,
};
self.awaiting_status
.entry((*txid, script_pubkey.clone()))
.or_default()
.push((ScriptStatus::finality(), Event::CetFinality(order_id)));
self.monitor_cet_finality(cets, attestation.clone(), order_id)?;
}
Ok(())
@ -469,18 +497,7 @@ impl MonitorParams {
MonitorParams {
lock: (dlc.lock.0.txid(), dlc.lock.1),
commit: (dlc.commit.0.txid(), dlc.commit.2),
cets: dlc
.cets
.iter()
.map(|(event_id, cets)| {
(
event_id.clone(),
cets.iter()
.map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range.clone()))
.collect::<Vec<_>>(),
)
})
.collect::<HashMap<_, _>>(),
cets: map_cets(dlc.cets, script_pubkey.clone()),
refund: (
dlc.refund.0.txid(),
script_pubkey,
@ -490,6 +507,31 @@ impl MonitorParams {
}
}
fn map_cets(
cets: HashMap<OracleEventId, Vec<model::cfd::Cet>>,
script_pubkey: Script,
) -> HashMap<OracleEventId, Vec<Cet>> {
cets.iter()
.map(|(event_id, cets)| {
(
event_id.clone(),
cets.iter()
.map(
|model::cfd::Cet {
tx, range, n_bits, ..
}| Cet {
txid: tx.txid(),
script: script_pubkey.clone(),
range: range.clone(),
n_bits: *n_bits,
},
)
.collect::<Vec<_>>(),
)
})
.collect()
}
impl xtra::Message for Event {
type Result = ();
}

14
daemon/src/oracle.rs

@ -8,6 +8,7 @@ use futures::TryStreamExt;
use rocket::time::format_description::FormatItem;
use rocket::time::macros::format_description;
use rocket::time::{Duration, OffsetDateTime, Time};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::convert::TryFrom;
@ -208,12 +209,21 @@ pub struct Announcement {
pub nonce_pks: Vec<schnorrsig::PublicKey>,
}
impl From<Announcement> for cfd_protocol::Announcement {
fn from(announcement: Announcement) -> Self {
cfd_protocol::Announcement {
id: announcement.id.0,
nonce_pks: announcement.nonce_pks,
}
}
}
#[derive(Debug, Clone)]
pub struct Announcements(pub [Announcement; 24]);
// TODO: Implement real deserialization once price attestation is
// implemented in `olivia`
#[derive(Debug, Clone, serde::Deserialize, PartialEq)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(try_from = "olivia_api::Response")]
pub struct Attestation {
pub id: OracleEventId,
@ -426,7 +436,7 @@ mod olivia_api {
let deserialized = serde_json::from_str::<oracle::Attestation>(json).unwrap();
let expected = oracle::Attestation {
id: "/x/BitMEX/BXBT/2021-10-04T22:00:00.price[n:20]".to_string(),
id: OracleEventId("/x/BitMEX/BXBT/2021-10-04T22:00:00.price[n:20]".to_string()),
price: 48935,
scalars: vec![
"1327b3bd0f1faf45d6fed6c96d0c158da22a2033a6fed98bed036df0a4eef484"

21
daemon/src/setup_contract.rs

@ -1,4 +1,5 @@
use crate::model::cfd::{Cfd, Dlc, Role};
use crate::model::cfd::{Cet, Cfd, Dlc, Role};
use crate::model::OracleEventId;
use crate::wallet::Wallet;
use crate::wire::{Msg0, Msg1, Msg2, SetupMsg};
use crate::{model, payout_curve};
@ -26,7 +27,7 @@ use std::ops::RangeInclusive;
pub async fn new(
mut sink: impl Sink<SetupMsg, Error = anyhow::Error> + Unpin,
mut stream: impl FusedStream<Item = SetupMsg> + Unpin,
(oracle_pk, nonce_pks): (schnorrsig::PublicKey, Vec<schnorrsig::PublicKey>),
(oracle_pk, announcement): (schnorrsig::PublicKey, Announcement),
cfd: Cfd,
wallet: Wallet,
role: Role,
@ -68,10 +69,7 @@ pub async fn new(
}
let payouts = HashMap::from_iter([(
Announcement {
id: "dummy_id_to_be_replaced".to_string(),
nonce_pks: nonce_pks.clone(),
},
announcement.clone(),
payout_curve::calculate(
cfd.order.price,
cfd.quantity_usd,
@ -142,7 +140,7 @@ pub async fn new(
.context("Expect event to exist in msg")?;
verify_cets(
(&oracle_pk, &nonce_pks),
(&oracle_pk, &announcement.nonce_pks),
&params.other,
own_grouped_cets.cets.as_slice(),
other_cets.as_slice(),
@ -207,10 +205,15 @@ pub async fn new(
digits.range()
)
})?;
Ok((tx, *other_encsig, digits.range()))
Ok(Cet {
tx,
adaptor_sig: *other_encsig,
range: digits.range(),
n_bits: digits.len(),
})
})
.collect::<Result<Vec<_>>>()?;
Ok((event_id, cets))
Ok((OracleEventId(event_id), cets))
})
.collect::<Result<HashMap<_, _>>>()?;

57
daemon/src/taker_cfd.rs

@ -1,7 +1,7 @@
use crate::actors::log_error;
use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds,
load_cfd_by_order_id, load_order_by_id,
load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id,
};
use crate::model::cfd::{
Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin, Role,
@ -262,15 +262,13 @@ impl Actor {
})
.await?;
let nonce_pks = offer_announcement.nonce_pks.clone();
let contract_future = setup_contract::new(
self.send_to_maker
.clone()
.into_sink()
.with(|msg| future::ok(wire::TakerToMaker::Protocol(msg))),
receiver,
(self.oracle_pk, nonce_pks),
(self.oracle_pk, offer_announcement.clone().into()),
cfd,
self.wallet.clone(),
Role::Taker,
@ -345,6 +343,7 @@ impl Actor {
transition_timestamp: SystemTime::now(),
},
dlc: dlc.clone(),
attestation: None,
},
&mut conn,
)
@ -387,10 +386,10 @@ impl Actor {
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// TODO: Not sure that should be done here...
// Consider bubbling the refund availability up to the user, and let user trigger
// transaction publication
if let CfdState::MustRefund { .. } = new_state {
// TODO: code duplicateion maker/taker
if let CfdState::OpenCommitted { .. } = new_state {
self.try_cet_publication(cfd).await?;
} else if let CfdState::MustRefund { .. } = new_state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = self
.wallet
@ -403,7 +402,7 @@ impl Actor {
Ok(())
}
// TODO: Duplicated with maker
// TODO: code duplicateion maker/taker
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
@ -444,12 +443,44 @@ impl Actor {
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
attestation.id.0
attestation.id
);
todo!(
"Update all CFDs which care about this particular attestation, based on the event ID"
);
let mut conn = self.db.acquire().await?;
let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?;
for mut cfd in cfds {
cfd.handle(CfdStateChangeEvent::OracleAttestation(attestation.clone()))?;
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?;
self.try_cet_publication(cfd).await?;
}
Ok(())
}
// TODO: code duplication maker/taker
async fn try_cet_publication(&mut self, mut cfd: Cfd) -> Result<()> {
let mut conn = self.db.acquire().await?;
match cfd.cet()? {
Ok(cet) => {
let txid = self.wallet.try_broadcast_transaction(cet).await?;
tracing::info!("CET published with txid {}", txid);
cfd.handle(CfdStateChangeEvent::CetSent)?;
insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state, &mut conn).await?;
}
Err(not_ready_yet) => {
tracing::debug!(
"Attestation received but we are not ready to publish it yet: {:#}",
not_ready_yet
);
return Ok(());
}
};
Ok(())
}
}

4
daemon/src/to_sse_event.rs

@ -66,11 +66,13 @@ pub enum CfdState {
PendingOpen,
Open,
PendingCommit,
PendingCet,
OpenCommitted,
IncomingSettlementProposal,
OutgoingSettlementProposal,
IncomingRollOverProposal,
OutgoingRollOverProposal,
Closed,
MustRefund,
Refunded,
SetupFailed,
@ -251,6 +253,8 @@ fn to_cfd_state(
model::cfd::CfdState::Refunded { .. } => CfdState::Refunded,
model::cfd::CfdState::SetupFailed { .. } => CfdState::SetupFailed,
model::cfd::CfdState::PendingCommit { .. } => CfdState::PendingCommit,
model::cfd::CfdState::PendingCet { .. } => CfdState::PendingCet,
model::cfd::CfdState::Closed { .. } => CfdState::Closed,
},
Some(UpdateCfdProposal::RollOverProposal {
direction: SettlementKind::Outgoing,

10
frontend/src/components/Types.tsx

@ -87,6 +87,10 @@ export class State {
return "Refunded";
case StateKey.SETUP_FAILED:
return "Setup Failed";
case StateKey.PENDING_CET:
return "Pending CET";
case StateKey.CLOSED:
return "Closed";
}
}
@ -107,6 +111,7 @@ export class State {
case StateKey.PENDING_COMMIT:
case StateKey.OPEN_COMMITTED:
case StateKey.MUST_REFUND:
case StateKey.PENDING_CET:
return orange;
case StateKey.OUTGOING_ORDER_REQUEST:
@ -119,6 +124,7 @@ export class State {
case StateKey.PENDING_OPEN:
case StateKey.REFUNDED:
case StateKey.SETUP_FAILED:
case StateKey.CLOSED:
return default_color;
}
}
@ -140,6 +146,7 @@ export class State {
case StateKey.MUST_REFUND:
case StateKey.OUTGOING_SETTLEMENT_PROPOSAL:
case StateKey.OUTGOING_ROLL_OVER_PROPOSAL:
case StateKey.PENDING_CET:
return StateGroupKey.OPEN;
case StateKey.INCOMING_SETTLEMENT_PROPOSAL:
@ -151,6 +158,7 @@ export class State {
case StateKey.REJECTED:
case StateKey.REFUNDED:
case StateKey.SETUP_FAILED:
case StateKey.CLOSED:
return StateGroupKey.CLOSED;
}
}
@ -176,6 +184,7 @@ const enum StateKey {
PENDING_OPEN = "PendingOpen",
OPEN = "Open",
PENDING_COMMIT = "PendingCommit",
PENDING_CET = "PendingCet",
OPEN_COMMITTED = "OpenCommitted",
OUTGOING_SETTLEMENT_PROPOSAL = "OutgoingSettlementProposal",
INCOMING_SETTLEMENT_PROPOSAL = "IncomingSettlementProposal",
@ -184,6 +193,7 @@ const enum StateKey {
MUST_REFUND = "MustRefund",
REFUNDED = "Refunded",
SETUP_FAILED = "SetupFailed",
CLOSED = "Closed",
}
export enum StateGroupKey {

Loading…
Cancel
Save