From 159240cc9fc4e424c71cc57c70231dbc2ab8a47c Mon Sep 17 00:00:00 2001 From: Daniel Karzel Date: Wed, 6 Oct 2021 19:14:05 +1100 Subject: [PATCH] Wire in automatic CET publication - Handles the Oracle's attestation in the cfd actor, transition according to the knowledge we have about the timelock expiry already. - Tries to publish the CET if we are in `CetStatus::Ready`, i.e. both the attestation and timelock expiry happened. We try publishing if either event happened, and just print a log in case it's not ready yet. - Tries to re-publish CET if we are in `PendingCet` upon restart. - Re-triggers CET monitoring upon startup. - Extends any state after `ContractSetup` to be able to store the attestation. (see below for ideas to change that) Things that could be done different: Currently we are carrying on the attestation through a lot of states - because we cannot just transition the user to `OpenCommitted` because it is a user decision to go for commit, but we have to keep the attestation around once it happened. To reduce the state complexity, we could store the attestation independent of the state, but associated with the cfd. This would make things a lot simpler, but we would then always have to go to the database to check if the attestation is already around (which might make other parts more complex). --- daemon/sqlx-data.json | 196 +++++++++++++++++------- daemon/src/db.rs | 231 ++++++++++++++++++++-------- daemon/src/housekeeping.rs | 8 + daemon/src/maker_cfd.rs | 55 +++++-- daemon/src/model.rs | 7 + daemon/src/model/cfd.rs | 245 ++++++++++++++++++++++++++---- daemon/src/monitor.rs | 120 ++++++++++----- daemon/src/oracle.rs | 14 +- daemon/src/setup_contract.rs | 21 +-- daemon/src/taker_cfd.rs | 57 +++++-- daemon/src/to_sse_event.rs | 4 + frontend/src/components/Types.tsx | 10 ++ 12 files changed, 743 insertions(+), 225 deletions(-) diff --git a/daemon/sqlx-data.json b/daemon/sqlx-data.json index 3cd8045..83cf7c2 100644 --- a/daemon/sqlx-data.json +++ b/daemon/sqlx-data.json @@ -1,82 +1,95 @@ { "db": "SQLite", - "41fb3bea22bde82a79aee6096d579a16494a20cfcfb2e8cfffe0a56073460bbf": { - "query": "\n select\n cfds.id as cfd_id,\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n ", + "50abbb297394739ec9d85917f8c32aa8bcfa0bfe140b24e9eeda4ce8d30d4f8d": { + "query": "\n select\n state\n from cfd_states\n where cfd_id = ?\n order by id desc\n limit 1;\n ", "describe": { "columns": [ { - "name": "cfd_id", + "name": "state", "ordinal": 0, - "type_info": "Int64" - }, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false + ] + } + }, + "63c8fd1a1512b55c19feea463dbb6a0fdf98a325801ba8677e2a33fd3ee1ebb9": { + "query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n ", + "describe": { + "columns": [ { "name": "order_id", - "ordinal": 1, + "ordinal": 0, "type_info": "Text" }, { "name": "price", - "ordinal": 2, + "ordinal": 1, "type_info": "Text" }, { "name": "min_quantity", - "ordinal": 3, + "ordinal": 2, "type_info": "Text" }, { "name": "max_quantity", - "ordinal": 4, + "ordinal": 3, "type_info": "Text" }, { "name": "leverage", - "ordinal": 5, + "ordinal": 4, "type_info": "Int64" }, { "name": "trading_pair", - "ordinal": 6, + "ordinal": 5, "type_info": "Text" }, { "name": "position", - "ordinal": 7, + "ordinal": 6, "type_info": "Text" }, { "name": "origin", - "ordinal": 8, + "ordinal": 7, "type_info": "Text" }, { "name": "liquidation_price", - "ordinal": 9, + "ordinal": 8, "type_info": "Text" }, { "name": "creation_timestamp", - "ordinal": 10, + "ordinal": 9, "type_info": "Text" }, { "name": "term", - "ordinal": 11, + "ordinal": 10, "type_info": "Text" }, { "name": "oracle_event_id", - "ordinal": 12, + "ordinal": 11, "type_info": "Text" }, { "name": "quantity_usd", - "ordinal": 13, + "ordinal": 12, "type_info": "Text" }, { "name": "state", - "ordinal": 14, + "ordinal": 13, "type_info": "Text" } ], @@ -97,25 +110,6 @@ false, false, false, - false, - false - ] - } - }, - "50abbb297394739ec9d85917f8c32aa8bcfa0bfe140b24e9eeda4ce8d30d4f8d": { - "query": "\n select\n state\n from cfd_states\n where cfd_id = ?\n order by id desc\n limit 1;\n ", - "describe": { - "columns": [ - { - "name": "state", - "ordinal": 0, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ false ] } @@ -158,83 +152,174 @@ "nullable": [] } }, - "c404a4eebb7118fd4e716fe4155e011a52e12a3594b698ca4f0662674cd067f8": { - "query": "\n select\n cfds.id as cfd_id,\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n and orders.uuid = ?\n ", + "e0d3fe192c419be4fa65199d1e024ce8794e19a847be53038b7a8a33afb36bd2": { + "query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n and orders.uuid = ?\n ", "describe": { "columns": [ { - "name": "cfd_id", + "name": "order_id", "ordinal": 0, + "type_info": "Text" + }, + { + "name": "price", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "min_quantity", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "max_quantity", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "leverage", + "ordinal": 4, "type_info": "Int64" }, + { + "name": "trading_pair", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "position", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "origin", + "ordinal": 7, + "type_info": "Text" + }, + { + "name": "liquidation_price", + "ordinal": 8, + "type_info": "Text" + }, + { + "name": "creation_timestamp", + "ordinal": 9, + "type_info": "Text" + }, + { + "name": "term", + "ordinal": 10, + "type_info": "Text" + }, + { + "name": "oracle_event_id", + "ordinal": 11, + "type_info": "Text" + }, + { + "name": "quantity_usd", + "ordinal": 12, + "type_info": "Text" + }, + { + "name": "state", + "ordinal": 13, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false + ] + } + }, + "e45a49eb1fb4cbddc046712d9e2df2db0c18c618f0fae2133e51d82eda6bec36": { + "query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n and orders.oracle_event_id = ?\n ", + "describe": { + "columns": [ { "name": "order_id", - "ordinal": 1, + "ordinal": 0, "type_info": "Text" }, { "name": "price", - "ordinal": 2, + "ordinal": 1, "type_info": "Text" }, { "name": "min_quantity", - "ordinal": 3, + "ordinal": 2, "type_info": "Text" }, { "name": "max_quantity", - "ordinal": 4, + "ordinal": 3, "type_info": "Text" }, { "name": "leverage", - "ordinal": 5, + "ordinal": 4, "type_info": "Int64" }, { "name": "trading_pair", - "ordinal": 6, + "ordinal": 5, "type_info": "Text" }, { "name": "position", - "ordinal": 7, + "ordinal": 6, "type_info": "Text" }, { "name": "origin", - "ordinal": 8, + "ordinal": 7, "type_info": "Text" }, { "name": "liquidation_price", - "ordinal": 9, + "ordinal": 8, "type_info": "Text" }, { "name": "creation_timestamp", - "ordinal": 10, + "ordinal": 9, "type_info": "Text" }, { "name": "term", - "ordinal": 11, + "ordinal": 10, "type_info": "Text" }, { "name": "oracle_event_id", - "ordinal": 12, + "ordinal": 11, "type_info": "Text" }, { "name": "quantity_usd", - "ordinal": 13, + "ordinal": 12, "type_info": "Text" }, { "name": "state", - "ordinal": 14, + "ordinal": 13, "type_info": "Text" } ], @@ -242,7 +327,6 @@ "Right": 1 }, "nullable": [ - true, false, false, false, diff --git a/daemon/src/db.rs b/daemon/src/db.rs index 0f933e7..adc5678 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -257,7 +257,6 @@ pub async fn load_cfd_by_order_id( let row = sqlx::query!( r#" select - cfds.id as cfd_id, orders.uuid as order_id, orders.initial_price as price, orders.min_quantity as min_quantity, @@ -335,7 +334,6 @@ pub async fn load_all_cfds(conn: &mut PoolConnection) -> anyhow::Result< let rows = sqlx::query!( r#" select - cfds.id as cfd_id, orders.uuid as order_id, orders.initial_price as price, orders.min_quantity as min_quantity, @@ -411,6 +409,91 @@ pub async fn load_all_cfds(conn: &mut PoolConnection) -> anyhow::Result< Ok(cfds) } +/// Loads all CFDs with the latest state as the CFD state +pub async fn load_cfds_by_oracle_event_id( + oracle_event_id: OracleEventId, + conn: &mut PoolConnection, +) -> anyhow::Result> { + let rows = sqlx::query!( + r#" + select + orders.uuid as order_id, + orders.initial_price as price, + orders.min_quantity as min_quantity, + orders.max_quantity as max_quantity, + orders.leverage as leverage, + orders.trading_pair as trading_pair, + orders.position as position, + orders.origin as origin, + orders.liquidation_price as liquidation_price, + orders.creation_timestamp as creation_timestamp, + orders.term as term, + orders.oracle_event_id, + cfds.quantity_usd as quantity_usd, + cfd_states.state as state + from cfds as cfds + inner join orders as orders on cfds.order_id = orders.id + inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id + where cfd_states.state in ( + select + state + from cfd_states + where cfd_id = cfds.id + order by id desc + limit 1 + ) + and orders.oracle_event_id = ? + "#, + oracle_event_id.0 + ) + .fetch_all(conn) + .await?; + + let cfds = rows + .iter() + .map(|row| { + let order_id = serde_json::from_str(row.order_id.as_str()).unwrap(); + let trading_pair = serde_json::from_str(row.trading_pair.as_str()).unwrap(); + let position: Position = serde_json::from_str(row.position.as_str()).unwrap(); + let price = serde_json::from_str(row.price.as_str()).unwrap(); + let min_quantity = serde_json::from_str(row.min_quantity.as_str()).unwrap(); + let max_quantity = serde_json::from_str(row.max_quantity.as_str()).unwrap(); + let leverage = Leverage(row.leverage.try_into().unwrap()); + let liquidation_price = serde_json::from_str(row.liquidation_price.as_str()).unwrap(); + let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap(); + let term = serde_json::from_str(row.term.as_str()).unwrap(); + let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap(); + let oracle_event_id = OracleEventId(row.oracle_event_id.clone()); + + let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap(); + let latest_state = serde_json::from_str(row.state.as_str()).unwrap(); + + let order = Order { + id: order_id, + trading_pair, + position, + price, + min_quantity, + max_quantity, + leverage, + liquidation_price, + creation_timestamp, + term, + origin, + oracle_event_id, + }; + + Cfd { + order, + quantity_usd: quantity, + state: latest_state, + } + }) + .collect(); + + Ok(cfds) +} + #[cfg(test)] mod tests { use std::fs::File; @@ -444,18 +527,9 @@ mod tests { let pool = setup_test_db().await; let mut conn = pool.acquire().await.unwrap(); - let order = Order::default(); - let cfd = Cfd::new( - order.clone(), - Usd(dec!(1000)), - CfdState::OutgoingOrderRequest { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - ); + let cfd = Cfd::default(); - insert_order(&order, &mut conn).await.unwrap(); + insert_order(&cfd.order, &mut conn).await.unwrap(); insert_cfd(cfd.clone(), &mut conn).await.unwrap(); let cfds_from_db = load_all_cfds(&mut conn).await.unwrap(); @@ -468,20 +542,10 @@ mod tests { let pool = setup_test_db().await; let mut conn = pool.acquire().await.unwrap(); - let order = Order::default(); - let cfd = Cfd::new( - order.clone(), - Usd(dec!(1000)), - CfdState::OutgoingOrderRequest { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - ); - - let order_id = order.id; + let cfd = Cfd::default(); + let order_id = cfd.order.id; - insert_order(&order, &mut conn).await.unwrap(); + insert_order(&cfd.order, &mut conn).await.unwrap(); insert_cfd(cfd.clone(), &mut conn).await.unwrap(); let cfd_from_db = load_cfd_by_order_id(order_id, &mut conn).await.unwrap(); @@ -493,62 +557,75 @@ mod tests { let pool = setup_test_db().await; let mut conn = pool.acquire().await.unwrap(); - let order = Order::default(); - let cfd = Cfd::new( - order.clone(), - Usd(dec!(1000)), - CfdState::OutgoingOrderRequest { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - ); - - let order_id = order.id; + let cfd = Cfd::default(); + let order_id = cfd.order.id; - insert_order(&order, &mut conn).await.unwrap(); + insert_order(&cfd.order, &mut conn).await.unwrap(); insert_cfd(cfd.clone(), &mut conn).await.unwrap(); let cfd_from_db = load_cfd_by_order_id(order_id, &mut conn).await.unwrap(); assert_eq!(cfd, cfd_from_db); - let order = Order::default(); - let cfd = Cfd::new( - order.clone(), - Usd(dec!(1000)), - CfdState::OutgoingOrderRequest { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - ); - - let order_id = order.id; + let cfd = Cfd::default(); + let order_id = cfd.order.id; - insert_order(&order, &mut conn).await.unwrap(); + insert_order(&cfd.order, &mut conn).await.unwrap(); insert_cfd(cfd.clone(), &mut conn).await.unwrap(); let cfd_from_db = load_cfd_by_order_id(order_id, &mut conn).await.unwrap(); assert_eq!(cfd, cfd_from_db); } + #[tokio::test] + async fn test_insert_and_load_cfd_by_oracle_event_id() { + let pool = setup_test_db().await; + let mut conn = pool.acquire().await.unwrap(); + + let oracle_event_id_1 = OracleEventId("dummy_1".to_string()); + let oracle_event_id_2 = OracleEventId("dummy_2".to_string()); + + let cfd_1 = Cfd::default() + .with_order(Order::default().with_oracle_event_id(oracle_event_id_1.clone())); + + insert_order(&cfd_1.order, &mut conn).await.unwrap(); + insert_cfd(cfd_1.clone(), &mut conn).await.unwrap(); + + let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_1.clone(), &mut conn) + .await + .unwrap(); + assert_eq!(vec![cfd_1.clone()], cfd_from_db); + + let cfd_2 = Cfd::default() + .with_order(Order::default().with_oracle_event_id(oracle_event_id_1.clone())); + + insert_order(&cfd_2.order, &mut conn).await.unwrap(); + insert_cfd(cfd_2.clone(), &mut conn).await.unwrap(); + + let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_1, &mut conn) + .await + .unwrap(); + assert_eq!(vec![cfd_1, cfd_2], cfd_from_db); + + let cfd_3 = Cfd::default() + .with_order(Order::default().with_oracle_event_id(oracle_event_id_2.clone())); + + insert_order(&cfd_3.order, &mut conn).await.unwrap(); + insert_cfd(cfd_3.clone(), &mut conn).await.unwrap(); + + let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_2, &mut conn) + .await + .unwrap(); + assert_eq!(vec![cfd_3], cfd_from_db); + } + #[tokio::test] async fn test_insert_new_cfd_state() { let pool = setup_test_db().await; let mut conn = pool.acquire().await.unwrap(); - let order = Order::default(); - let mut cfd = Cfd::new( - order.clone(), - Usd(dec!(1000)), - CfdState::OutgoingOrderRequest { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - ); + let mut cfd = Cfd::default(); - insert_order(&order, &mut conn).await.unwrap(); + insert_order(&cfd.order, &mut conn).await.unwrap(); insert_cfd(cfd.clone(), &mut conn).await.unwrap(); cfd.state = CfdState::Accepted { @@ -582,6 +659,27 @@ mod tests { pool } + impl Default for Cfd { + fn default() -> Self { + Cfd::new( + Order::default(), + Usd(dec!(1000)), + CfdState::OutgoingOrderRequest { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + }, + ) + } + } + + impl Cfd { + pub fn with_order(mut self, order: Order) -> Self { + self.order = order; + self + } + } + impl Default for Order { fn default() -> Self { Order::new( @@ -594,4 +692,11 @@ mod tests { .unwrap() } } + + impl Order { + pub fn with_oracle_event_id(mut self, oracle_event_id: OracleEventId) -> Self { + self.oracle_event_id = oracle_event_id; + self + } + } } diff --git a/daemon/src/housekeeping.rs b/daemon/src/housekeeping.rs index ca72834..2a4a51c 100644 --- a/daemon/src/housekeeping.rs +++ b/daemon/src/housekeeping.rs @@ -54,5 +54,13 @@ pub async fn rebroadcast_transactions( tracing::info!("Commit transaction published on chain: {}", txid); } + for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_cet(cfd)) { + // Double question-mark OK because if we are in PendingCet we must have been Ready before + let signed_cet = cfd.cet()??; + let txid = wallet.try_broadcast_transaction(signed_cet).await?; + + tracing::info!("CET published on chain: {}", txid); + } + Ok(()) } diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 2c4ab3c..00bd933 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -1,7 +1,7 @@ use crate::actors::log_error; use crate::db::{ insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, - load_cfd_by_order_id, load_order_by_id, + load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id, }; use crate::maker_inc_connections::TakerCommand; use crate::model::cfd::{ @@ -266,6 +266,7 @@ impl Actor { transition_timestamp: SystemTime::now(), }, dlc: dlc.clone(), + attestation: None, }, &mut conn, ) @@ -411,8 +412,6 @@ impl Actor { }) .await?; - let nonce_pks = offer_announcement.nonce_pks.clone(); - let contract_future = setup_contract::new( self.takers.clone().into_sink().with(move |msg| { future::ok(maker_inc_connections::TakerMessage { @@ -421,7 +420,7 @@ impl Actor { }) }), receiver, - (self.oracle_pk, nonce_pks), + (self.oracle_pk, offer_announcement.clone().into()), cfd, self.wallet.clone(), Role::Maker, @@ -572,10 +571,10 @@ impl Actor { self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; - // TODO: Not sure that should be done here... - // Consider bubbling the refund availability up to the user, and let user trigger - // transaction publication - if let CfdState::MustRefund { .. } = new_state { + // TODO: code duplication maker/taker + if let CfdState::OpenCommitted { .. } = new_state { + self.try_cet_publication(cfd).await?; + } else if let CfdState::MustRefund { .. } = new_state { let signed_refund_tx = cfd.refund_tx()?; let txid = self .wallet @@ -606,12 +605,44 @@ impl Actor { async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { tracing::debug!( "Learnt latest oracle attestation for event: {}", - attestation.id.0 + attestation.id ); - todo!( - "Update all CFDs which care about this particular attestation, based on the event ID" - ); + let mut conn = self.db.acquire().await?; + let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?; + + for mut cfd in cfds { + cfd.handle(CfdStateChangeEvent::OracleAttestation(attestation.clone()))?; + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?; + + self.try_cet_publication(cfd).await?; + } + + Ok(()) + } + + // TODO: code duplication maker/taker + async fn try_cet_publication(&mut self, mut cfd: Cfd) -> Result<()> { + let mut conn = self.db.acquire().await?; + + match cfd.cet()? { + Ok(cet) => { + let txid = self.wallet.try_broadcast_transaction(cet).await?; + tracing::info!("CET published with txid {}", txid); + + cfd.handle(CfdStateChangeEvent::CetSent)?; + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state, &mut conn).await?; + } + Err(not_ready_yet) => { + tracing::debug!( + "Attestation received but we are not ready to publish it yet: {:#}", + not_ready_yet + ); + return Ok(()); + } + }; + + Ok(()) } } diff --git a/daemon/src/model.rs b/daemon/src/model.rs index 0024a15..1aed42c 100644 --- a/daemon/src/model.rs +++ b/daemon/src/model.rs @@ -6,6 +6,7 @@ use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use bdk::bitcoin::{Address, Amount}; +use std::fmt; use std::time::SystemTime; use uuid::Uuid; @@ -108,3 +109,9 @@ pub struct WalletInfo { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct OracleEventId(pub String); + +impl Display for OracleEventId { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index 3818853..cd8dca6 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -1,5 +1,6 @@ use crate::model::{Leverage, OracleEventId, Percent, Position, TakerId, TradingPair, Usd}; use crate::monitor; +use crate::oracle::Attestation; use anyhow::{bail, Context, Result}; use bdk::bitcoin::secp256k1::{SecretKey, Signature}; use bdk::bitcoin::{Address, Amount, PublicKey, SignedAmount, Transaction}; @@ -173,9 +174,7 @@ pub enum CfdState { /// The taker sent an order to the maker to open the CFD but doesn't have a response yet. /// /// This state applies to taker only. - OutgoingOrderRequest { - common: CfdStateCommon, - }, + OutgoingOrderRequest { common: CfdStateCommon }, /// The maker received an order from the taker to open the CFD but doesn't have a response yet. /// @@ -188,29 +187,24 @@ pub enum CfdState { /// The maker has accepted the CFD take request, but the contract is not set up on chain yet. /// /// This state applies to taker and maker. - Accepted { - common: CfdStateCommon, - }, + Accepted { common: CfdStateCommon }, /// The maker rejected the CFD order. /// /// This state applies to taker and maker. /// This is a final state. - Rejected { - common: CfdStateCommon, - }, + Rejected { common: CfdStateCommon }, /// State used during contract setup. /// /// This state applies to taker and maker. /// All contract setup messages between taker and maker are expected to be sent in on scope. - ContractSetup { - common: CfdStateCommon, - }, + ContractSetup { common: CfdStateCommon }, PendingOpen { common: CfdStateCommon, dlc: Dlc, + attestation: Option, }, /// The CFD contract is set up on chain. @@ -219,6 +213,7 @@ pub enum CfdState { Open { common: CfdStateCommon, dlc: Dlc, + attestation: Option, }, /// The commit transaction was published but it not final yet @@ -228,6 +223,7 @@ pub enum CfdState { PendingCommit { common: CfdStateCommon, dlc: Dlc, + attestation: Option, }, // TODO: At the moment we are appending to this state. The way this is handled internally is @@ -243,19 +239,34 @@ pub enum CfdState { cet_status: CetStatus, }, - /// The CFD contract's refund transaction was published but it not final yet - MustRefund { + /// The CET was published on chain but is not final yet + /// + /// This state applies to taker and maker. + /// This state is needed, because otherwise the user does not get any feedback. + PendingCet { common: CfdStateCommon, dlc: Dlc, + cet_status: CetStatus, }, + /// The position was closed collaboratively or non-collaboratively + /// + /// This state applies to taker and maker. + /// This is a final state. + /// This is the final state for all happy-path scenarios where we had an open position and then + /// "settled" it. Settlement can be collaboratively or non-collaboratively (by publishing + /// commit + cet). + Closed { common: CfdStateCommon }, + + // TODO: Can be extended with CetStatus + /// The CFD contract's refund transaction was published but it not final yet + MustRefund { common: CfdStateCommon, dlc: Dlc }, + /// The Cfd was refunded and the refund transaction reached finality /// /// This state applies to taker and maker. /// This is a final state. - Refunded { - common: CfdStateCommon, - }, + Refunded { common: CfdStateCommon }, /// The Cfd was in a state that could not be continued after the application got interrupted /// @@ -273,8 +284,8 @@ pub enum CfdState { pub enum CetStatus { Unprepared, TimelockExpired, - OracleSigned(u64), - Ready(u64), + OracleSigned(Attestation), + Ready(Attestation), } impl CfdState { @@ -292,6 +303,8 @@ impl CfdState { CfdState::Refunded { common, .. } => common, CfdState::SetupFailed { common, .. } => common, CfdState::PendingCommit { common, .. } => common, + CfdState::PendingCet { common, .. } => common, + CfdState::Closed { common, .. } => common, }; *common @@ -341,6 +354,12 @@ impl fmt::Display for CfdState { CfdState::SetupFailed { .. } => { write!(f, "Setup Failed") } + CfdState::PendingCet { .. } => { + write!(f, "Pending CET") + } + CfdState::Closed { .. } => { + write!(f, "Closed") + } } } } @@ -506,6 +525,7 @@ impl Cfd { transition_timestamp: SystemTime::now(), }, dlc, + attestation: None, } } else { bail!( @@ -549,23 +569,34 @@ impl Cfd { }, CfdState::OpenCommitted { dlc, - cet_status: CetStatus::OracleSigned(price), + cet_status: CetStatus::OracleSigned(attestation), .. } => CfdState::OpenCommitted { common: CfdStateCommon { transition_timestamp: SystemTime::now(), }, dlc, - cet_status: CetStatus::Ready(price), + cet_status: CetStatus::Ready(attestation), }, - PendingOpen { dlc, .. } | Open { dlc, .. } => { - tracing::debug!(%order_id, "Was in unexpected state {}, jumping ahead to MustRefund", self.state); + PendingOpen { + dlc, attestation, .. + } + | Open { + dlc, attestation, .. + } + | PendingCommit { + dlc, attestation, .. + } => { + tracing::debug!(%order_id, "Was in unexpected state {}, jumping ahead to OpenCommitted", self.state); CfdState::OpenCommitted { common: CfdStateCommon { transition_timestamp: SystemTime::now(), }, dlc, - cet_status: CetStatus::TimelockExpired, + cet_status: match attestation { + None => CetStatus::TimelockExpired, + Some(attestation) => CetStatus::Ready(attestation), + }, } } _ => bail!( @@ -608,13 +639,21 @@ impl Cfd { }, } } - monitor::Event::CetFinality(_) => { - todo!("Implement state transition") - } + monitor::Event::CetFinality(_) => Closed { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + }, }, CfdStateChangeEvent::CommitTxSent => { - let dlc = if let Open { dlc, .. } | PendingOpen { dlc, .. } = self.state.clone() { - dlc + let (dlc, attestation) = if let PendingOpen { + dlc, attestation, .. + } + | Open { + dlc, attestation, .. + } = self.state.clone() + { + (dlc, attestation) } else { bail!( "Cannot transition to PendingCommit because of unexpected state {}", @@ -627,8 +666,66 @@ impl Cfd { transition_timestamp: SystemTime::now(), }, dlc, + attestation, } } + CfdStateChangeEvent::OracleAttestation(attestation) => match self.state.clone() { + CfdState::Open { dlc, .. } => CfdState::Open { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + attestation: Some(attestation), + }, + CfdState::PendingCommit { dlc, .. } => CfdState::PendingCommit { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + attestation: Some(attestation), + }, + CfdState::OpenCommitted { + dlc, + cet_status: CetStatus::Unprepared, + .. + } => CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::OracleSigned(attestation), + }, + CfdState::OpenCommitted { + dlc, + cet_status: CetStatus::TimelockExpired, + .. + } => CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::Ready(attestation), + }, + _ => bail!( + "Cannot transition to OpenCommitted because of unexpected state {}", + self.state + ), + }, + CfdStateChangeEvent::CetSent => match self.state.clone() { + CfdState::OpenCommitted { + common, + dlc, + cet_status, + } => CfdState::PendingCet { + common, + dlc, + cet_status, + }, + _ => bail!( + "Cannot transition to PendingCet because of unexpected state {}", + self.state + ), + }, }; self.state = new_state.clone(); @@ -689,7 +786,6 @@ impl Cfd { &dlc.identity, )); - // TODO: verify that the dlc's `publish` sk corresponds to the decryption_sk we need here let counterparty_sig = dlc.commit.1.decrypt(&dlc.publish)?; let counterparty_pubkey = dlc.identity_counterparty; @@ -703,6 +799,72 @@ impl Cfd { Ok(signed_commit_tx) } + pub fn cet(&self) -> Result> { + let (dlc, attestation) = match self.state.clone() { + CfdState::OpenCommitted { + dlc, + cet_status: CetStatus::Ready(attestation), + .. + } + | CfdState::PendingCet { + dlc, + cet_status: CetStatus::Ready(attestation), + .. + } => (dlc, attestation), + CfdState::OpenCommitted { .. } + | CfdState::Open { .. } + | CfdState::PendingCommit { .. } => { + return Ok(Err(NotReadyYet)); + } + _ => bail!("Cannot publish CET in state {}", self.state.clone()), + }; + + let cets = dlc + .cets + .get(&attestation.id) + .context("Unable to find oracle event id within the cets of the dlc")?; + + let Cet { + tx: cet, + adaptor_sig: encsig, + n_bits, + .. + } = cets + .iter() + .find(|Cet { range, .. }| range.contains(&attestation.price)) + .context("Price out of range of cets")?; + + let oracle_attestations = attestation.scalars; + + let mut decryption_sk = oracle_attestations[0]; + for oracle_attestation in oracle_attestations[1..*n_bits].iter() { + decryption_sk.add_assign(oracle_attestation.as_ref())?; + } + + let sig_hash = spending_tx_sighash( + cet, + &dlc.commit.2, + Amount::from_sat(dlc.commit.0.output[0].value), + ); + let our_sig = SECP256K1.sign(&sig_hash, &dlc.identity); + let our_pubkey = PublicKey::new(bdk::bitcoin::secp256k1::PublicKey::from_secret_key( + SECP256K1, + &dlc.identity, + )); + + let counterparty_sig = encsig.decrypt(&decryption_sk)?; + let counterparty_pubkey = dlc.identity_counterparty; + + let signed_cet = finalize_spend_transaction( + cet.clone(), + &dlc.commit.2, + (our_pubkey, our_sig), + (counterparty_pubkey, counterparty_sig), + )?; + + Ok(Ok(signed_cet)) + } + pub fn pending_open_dlc(&self) -> Option { if let CfdState::PendingOpen { dlc, .. } = self.state.clone() { Some(dlc) @@ -719,6 +881,10 @@ impl Cfd { matches!(self.state.clone(), CfdState::PendingCommit { .. }) } + pub fn is_pending_cet(&self) -> bool { + matches!(self.state.clone(), CfdState::PendingCet { .. }) + } + pub fn is_cleanup(&self) -> bool { matches!( self.state.clone(), @@ -734,12 +900,18 @@ impl Cfd { } } +#[derive(thiserror::Error, Debug, Clone, Copy)] +#[error("The cfd is not committed yet")] +pub struct NotReadyYet; + #[derive(Debug, Clone)] pub enum CfdStateChangeEvent { // TODO: group other events by actors into enums and add them here so we can bundle all // transitions into cfd.transition_to(...) Monitor(monitor::Event), CommitTxSent, + OracleAttestation(Attestation), + CetSent, } /// Returns the Profit/Loss (P/L) as Bitcoin. Losses are capped by the provided margin @@ -1078,6 +1250,17 @@ mod tests { } } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct Cet { + pub tx: Transaction, + pub adaptor_sig: EcdsaAdaptorSignature, + + // TODO: Range + number of digits (usize) could be represented as Digits similar to what we do + // in the protocol lib + pub range: RangeInclusive, + pub n_bits: usize, +} + /// Contains all data we've assembled about the CFD through the setup protocol. /// /// All contained signatures are the signatures of THE OTHER PARTY. @@ -1093,6 +1276,6 @@ pub struct Dlc { /// The fully signed lock transaction ready to be published on chain pub lock: (Transaction, Descriptor), pub commit: (Transaction, EcdsaAdaptorSignature, Descriptor), - pub cets: HashMap)>>, + pub cets: HashMap>, pub refund: (Transaction, Signature), } diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index 2d907c9..40cf801 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -1,6 +1,8 @@ use crate::actors::log_error; use crate::model::cfd::{CetStatus, Cfd, CfdState, Dlc, OrderId}; -use crate::oracle; +use crate::model::OracleEventId; +use crate::oracle::Attestation; +use crate::{model, oracle}; use anyhow::{Context, Result}; use async_trait::async_trait; use bdk::bitcoin::{PublicKey, Script, Txid}; @@ -21,11 +23,19 @@ pub struct StartMonitoring { pub params: MonitorParams, } +#[derive(Clone)] +pub struct Cet { + txid: Txid, + script: Script, + range: RangeInclusive, + n_bits: usize, +} + #[derive(Clone)] pub struct MonitorParams { lock: (Txid, Descriptor), commit: (Txid, Descriptor), - cets: HashMap)>>, + cets: HashMap>, refund: (Txid, Script, u32), } @@ -88,13 +98,19 @@ where actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); actor.monitor_refund_finality(¶ms,cfd.order.id); } - CfdState::OpenCommitted { dlc, cet_status, .. } => { + CfdState::OpenCommitted { dlc, cet_status, .. } + | CfdState::PendingCet { dlc, cet_status, .. } => { let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks()); actor.cfds.insert(cfd.order.id, params.clone()); match cet_status { - CetStatus::Unprepared - | CetStatus::OracleSigned(_) => { + CetStatus::Unprepared => { + actor.monitor_commit_cet_timelock(¶ms, cfd.order.id); + actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); + actor.monitor_refund_finality(¶ms,cfd.order.id); + } + CetStatus::OracleSigned(attestation) => { + actor.monitor_cet_finality(map_cets(dlc.cets, dlc.address.script_pubkey()), attestation, cfd.order.id)?; actor.monitor_commit_cet_timelock(¶ms, cfd.order.id); actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); actor.monitor_refund_finality(¶ms,cfd.order.id); @@ -103,17 +119,14 @@ where actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); actor.monitor_refund_finality(¶ms,cfd.order.id); } - CetStatus::Ready(_price) => { - // TODO: monitor CET finality - + CetStatus::Ready(attestation) => { + actor.monitor_cet_finality(map_cets(dlc.cets, dlc.address.script_pubkey()), attestation, cfd.order.id)?; actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); actor.monitor_refund_finality(¶ms,cfd.order.id); } } } CfdState::MustRefund { dlc, .. } => { - // TODO: CET monitoring (?) - note: would require to add CetStatus information to MustRefund - let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks()); actor.cfds.insert(cfd.order.id, params.clone()); @@ -128,6 +141,7 @@ where | CfdState::ContractSetup { .. } // final states + | CfdState::Closed { .. } | CfdState::Rejected { .. } | CfdState::Refunded { .. } | CfdState::SetupFailed { .. } => () @@ -192,6 +206,36 @@ where .push((ScriptStatus::finality(), Event::RefundFinality(order_id))); } + fn monitor_cet_finality( + &mut self, + cets: HashMap>, + attestation: Attestation, + order_id: OrderId, + ) -> Result<()> { + let cets = cets + .get(&attestation.id) + .context("No CET for oracle event found")?; + + let (txid, script_pubkey) = cets + .iter() + .find_map( + |Cet { + txid, + script, + range, + .. + }| { range.contains(&attestation.price).then(|| (txid, script)) }, + ) + .context("No price range match for oracle attestation")?; + + self.awaiting_status + .entry((*txid, script_pubkey.clone())) + .or_default() + .push((ScriptStatus::finality(), Event::CetFinality(order_id))); + + Ok(()) + } + async fn sync(&mut self) -> Result<()> { // Fetch the latest block for storing the height. // We do not act on this subscription after this call, as we cannot rely on @@ -221,23 +265,7 @@ where async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { for (order_id, MonitorParams { cets, .. }) in self.cfds.clone().into_iter() { - let cets = cets - .get(&attestation.id.0) - .context("No CET for oracle event found")?; - let (txid, script_pubkey) = - match cets.iter().find_map(|(txid, script_pubkey, range)| { - range - .contains(&attestation.price) - .then(|| (txid, script_pubkey)) - }) { - Some(cet) => cet, - None => continue, - }; - - self.awaiting_status - .entry((*txid, script_pubkey.clone())) - .or_default() - .push((ScriptStatus::finality(), Event::CetFinality(order_id))); + self.monitor_cet_finality(cets, attestation.clone(), order_id)?; } Ok(()) @@ -469,18 +497,7 @@ impl MonitorParams { MonitorParams { lock: (dlc.lock.0.txid(), dlc.lock.1), commit: (dlc.commit.0.txid(), dlc.commit.2), - cets: dlc - .cets - .iter() - .map(|(event_id, cets)| { - ( - event_id.clone(), - cets.iter() - .map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range.clone())) - .collect::>(), - ) - }) - .collect::>(), + cets: map_cets(dlc.cets, script_pubkey.clone()), refund: ( dlc.refund.0.txid(), script_pubkey, @@ -490,6 +507,31 @@ impl MonitorParams { } } +fn map_cets( + cets: HashMap>, + script_pubkey: Script, +) -> HashMap> { + cets.iter() + .map(|(event_id, cets)| { + ( + event_id.clone(), + cets.iter() + .map( + |model::cfd::Cet { + tx, range, n_bits, .. + }| Cet { + txid: tx.txid(), + script: script_pubkey.clone(), + range: range.clone(), + n_bits: *n_bits, + }, + ) + .collect::>(), + ) + }) + .collect() +} + impl xtra::Message for Event { type Result = (); } diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index 8a47ee1..fbf05d4 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -8,6 +8,7 @@ use futures::TryStreamExt; use rocket::time::format_description::FormatItem; use rocket::time::macros::format_description; use rocket::time::{Duration, OffsetDateTime, Time}; +use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::convert::TryFrom; @@ -208,12 +209,21 @@ pub struct Announcement { pub nonce_pks: Vec, } +impl From for cfd_protocol::Announcement { + fn from(announcement: Announcement) -> Self { + cfd_protocol::Announcement { + id: announcement.id.0, + nonce_pks: announcement.nonce_pks, + } + } +} + #[derive(Debug, Clone)] pub struct Announcements(pub [Announcement; 24]); // TODO: Implement real deserialization once price attestation is // implemented in `olivia` -#[derive(Debug, Clone, serde::Deserialize, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(try_from = "olivia_api::Response")] pub struct Attestation { pub id: OracleEventId, @@ -426,7 +436,7 @@ mod olivia_api { let deserialized = serde_json::from_str::(json).unwrap(); let expected = oracle::Attestation { - id: "/x/BitMEX/BXBT/2021-10-04T22:00:00.price[n:20]".to_string(), + id: OracleEventId("/x/BitMEX/BXBT/2021-10-04T22:00:00.price[n:20]".to_string()), price: 48935, scalars: vec![ "1327b3bd0f1faf45d6fed6c96d0c158da22a2033a6fed98bed036df0a4eef484" diff --git a/daemon/src/setup_contract.rs b/daemon/src/setup_contract.rs index 299bfe6..3a835ed 100644 --- a/daemon/src/setup_contract.rs +++ b/daemon/src/setup_contract.rs @@ -1,4 +1,5 @@ -use crate::model::cfd::{Cfd, Dlc, Role}; +use crate::model::cfd::{Cet, Cfd, Dlc, Role}; +use crate::model::OracleEventId; use crate::wallet::Wallet; use crate::wire::{Msg0, Msg1, Msg2, SetupMsg}; use crate::{model, payout_curve}; @@ -26,7 +27,7 @@ use std::ops::RangeInclusive; pub async fn new( mut sink: impl Sink + Unpin, mut stream: impl FusedStream + Unpin, - (oracle_pk, nonce_pks): (schnorrsig::PublicKey, Vec), + (oracle_pk, announcement): (schnorrsig::PublicKey, Announcement), cfd: Cfd, wallet: Wallet, role: Role, @@ -68,10 +69,7 @@ pub async fn new( } let payouts = HashMap::from_iter([( - Announcement { - id: "dummy_id_to_be_replaced".to_string(), - nonce_pks: nonce_pks.clone(), - }, + announcement.clone(), payout_curve::calculate( cfd.order.price, cfd.quantity_usd, @@ -142,7 +140,7 @@ pub async fn new( .context("Expect event to exist in msg")?; verify_cets( - (&oracle_pk, &nonce_pks), + (&oracle_pk, &announcement.nonce_pks), ¶ms.other, own_grouped_cets.cets.as_slice(), other_cets.as_slice(), @@ -207,10 +205,15 @@ pub async fn new( digits.range() ) })?; - Ok((tx, *other_encsig, digits.range())) + Ok(Cet { + tx, + adaptor_sig: *other_encsig, + range: digits.range(), + n_bits: digits.len(), + }) }) .collect::>>()?; - Ok((event_id, cets)) + Ok((OracleEventId(event_id), cets)) }) .collect::>>()?; diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 1b92081..82f3827 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -1,7 +1,7 @@ use crate::actors::log_error; use crate::db::{ insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, - load_cfd_by_order_id, load_order_by_id, + load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id, }; use crate::model::cfd::{ Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin, Role, @@ -262,15 +262,13 @@ impl Actor { }) .await?; - let nonce_pks = offer_announcement.nonce_pks.clone(); - let contract_future = setup_contract::new( self.send_to_maker .clone() .into_sink() .with(|msg| future::ok(wire::TakerToMaker::Protocol(msg))), receiver, - (self.oracle_pk, nonce_pks), + (self.oracle_pk, offer_announcement.clone().into()), cfd, self.wallet.clone(), Role::Taker, @@ -345,6 +343,7 @@ impl Actor { transition_timestamp: SystemTime::now(), }, dlc: dlc.clone(), + attestation: None, }, &mut conn, ) @@ -387,10 +386,10 @@ impl Actor { self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; - // TODO: Not sure that should be done here... - // Consider bubbling the refund availability up to the user, and let user trigger - // transaction publication - if let CfdState::MustRefund { .. } = new_state { + // TODO: code duplicateion maker/taker + if let CfdState::OpenCommitted { .. } = new_state { + self.try_cet_publication(cfd).await?; + } else if let CfdState::MustRefund { .. } = new_state { let signed_refund_tx = cfd.refund_tx()?; let txid = self .wallet @@ -403,7 +402,7 @@ impl Actor { Ok(()) } - // TODO: Duplicated with maker + // TODO: code duplicateion maker/taker async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { let mut conn = self.db.acquire().await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; @@ -444,12 +443,44 @@ impl Actor { async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { tracing::debug!( "Learnt latest oracle attestation for event: {}", - attestation.id.0 + attestation.id ); - todo!( - "Update all CFDs which care about this particular attestation, based on the event ID" - ); + let mut conn = self.db.acquire().await?; + let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?; + + for mut cfd in cfds { + cfd.handle(CfdStateChangeEvent::OracleAttestation(attestation.clone()))?; + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?; + + self.try_cet_publication(cfd).await?; + } + + Ok(()) + } + + // TODO: code duplication maker/taker + async fn try_cet_publication(&mut self, mut cfd: Cfd) -> Result<()> { + let mut conn = self.db.acquire().await?; + + match cfd.cet()? { + Ok(cet) => { + let txid = self.wallet.try_broadcast_transaction(cet).await?; + tracing::info!("CET published with txid {}", txid); + + cfd.handle(CfdStateChangeEvent::CetSent)?; + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state, &mut conn).await?; + } + Err(not_ready_yet) => { + tracing::debug!( + "Attestation received but we are not ready to publish it yet: {:#}", + not_ready_yet + ); + return Ok(()); + } + }; + + Ok(()) } } diff --git a/daemon/src/to_sse_event.rs b/daemon/src/to_sse_event.rs index 38974db..621f66f 100644 --- a/daemon/src/to_sse_event.rs +++ b/daemon/src/to_sse_event.rs @@ -66,11 +66,13 @@ pub enum CfdState { PendingOpen, Open, PendingCommit, + PendingCet, OpenCommitted, IncomingSettlementProposal, OutgoingSettlementProposal, IncomingRollOverProposal, OutgoingRollOverProposal, + Closed, MustRefund, Refunded, SetupFailed, @@ -251,6 +253,8 @@ fn to_cfd_state( model::cfd::CfdState::Refunded { .. } => CfdState::Refunded, model::cfd::CfdState::SetupFailed { .. } => CfdState::SetupFailed, model::cfd::CfdState::PendingCommit { .. } => CfdState::PendingCommit, + model::cfd::CfdState::PendingCet { .. } => CfdState::PendingCet, + model::cfd::CfdState::Closed { .. } => CfdState::Closed, }, Some(UpdateCfdProposal::RollOverProposal { direction: SettlementKind::Outgoing, diff --git a/frontend/src/components/Types.tsx b/frontend/src/components/Types.tsx index 0161752..acc3462 100644 --- a/frontend/src/components/Types.tsx +++ b/frontend/src/components/Types.tsx @@ -87,6 +87,10 @@ export class State { return "Refunded"; case StateKey.SETUP_FAILED: return "Setup Failed"; + case StateKey.PENDING_CET: + return "Pending CET"; + case StateKey.CLOSED: + return "Closed"; } } @@ -107,6 +111,7 @@ export class State { case StateKey.PENDING_COMMIT: case StateKey.OPEN_COMMITTED: case StateKey.MUST_REFUND: + case StateKey.PENDING_CET: return orange; case StateKey.OUTGOING_ORDER_REQUEST: @@ -119,6 +124,7 @@ export class State { case StateKey.PENDING_OPEN: case StateKey.REFUNDED: case StateKey.SETUP_FAILED: + case StateKey.CLOSED: return default_color; } } @@ -140,6 +146,7 @@ export class State { case StateKey.MUST_REFUND: case StateKey.OUTGOING_SETTLEMENT_PROPOSAL: case StateKey.OUTGOING_ROLL_OVER_PROPOSAL: + case StateKey.PENDING_CET: return StateGroupKey.OPEN; case StateKey.INCOMING_SETTLEMENT_PROPOSAL: @@ -151,6 +158,7 @@ export class State { case StateKey.REJECTED: case StateKey.REFUNDED: case StateKey.SETUP_FAILED: + case StateKey.CLOSED: return StateGroupKey.CLOSED; } } @@ -176,6 +184,7 @@ const enum StateKey { PENDING_OPEN = "PendingOpen", OPEN = "Open", PENDING_COMMIT = "PendingCommit", + PENDING_CET = "PendingCet", OPEN_COMMITTED = "OpenCommitted", OUTGOING_SETTLEMENT_PROPOSAL = "OutgoingSettlementProposal", INCOMING_SETTLEMENT_PROPOSAL = "IncomingSettlementProposal", @@ -184,6 +193,7 @@ const enum StateKey { MUST_REFUND = "MustRefund", REFUNDED = "Refunded", SETUP_FAILED = "SetupFailed", + CLOSED = "Closed", } export enum StateGroupKey {