diff --git a/daemon/sqlx-data.json b/daemon/sqlx-data.json index 3cd8045..83cf7c2 100644 --- a/daemon/sqlx-data.json +++ b/daemon/sqlx-data.json @@ -1,82 +1,95 @@ { "db": "SQLite", - "41fb3bea22bde82a79aee6096d579a16494a20cfcfb2e8cfffe0a56073460bbf": { - "query": "\n select\n cfds.id as cfd_id,\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n ", + "50abbb297394739ec9d85917f8c32aa8bcfa0bfe140b24e9eeda4ce8d30d4f8d": { + "query": "\n select\n state\n from cfd_states\n where cfd_id = ?\n order by id desc\n limit 1;\n ", "describe": { "columns": [ { - "name": "cfd_id", + "name": "state", "ordinal": 0, - "type_info": "Int64" - }, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false + ] + } + }, + "63c8fd1a1512b55c19feea463dbb6a0fdf98a325801ba8677e2a33fd3ee1ebb9": { + "query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n ", + "describe": { + "columns": [ { "name": "order_id", - "ordinal": 1, + "ordinal": 0, "type_info": "Text" }, { "name": "price", - "ordinal": 2, + "ordinal": 1, "type_info": "Text" }, { "name": "min_quantity", - "ordinal": 3, + "ordinal": 2, "type_info": "Text" }, { "name": "max_quantity", - "ordinal": 4, + "ordinal": 3, "type_info": "Text" }, { "name": "leverage", - "ordinal": 5, + "ordinal": 4, "type_info": "Int64" }, { "name": "trading_pair", - "ordinal": 6, + "ordinal": 5, "type_info": "Text" }, { "name": "position", - "ordinal": 7, + "ordinal": 6, "type_info": "Text" }, { "name": "origin", - "ordinal": 8, + "ordinal": 7, "type_info": "Text" }, { "name": "liquidation_price", - "ordinal": 9, + "ordinal": 8, "type_info": "Text" }, { "name": "creation_timestamp", - "ordinal": 10, + "ordinal": 9, "type_info": "Text" }, { "name": "term", - "ordinal": 11, + "ordinal": 10, "type_info": "Text" }, { "name": "oracle_event_id", - "ordinal": 12, + "ordinal": 11, "type_info": "Text" }, { "name": "quantity_usd", - "ordinal": 13, + "ordinal": 12, "type_info": "Text" }, { "name": "state", - "ordinal": 14, + "ordinal": 13, "type_info": "Text" } ], @@ -97,25 +110,6 @@ false, false, false, - false, - false - ] - } - }, - "50abbb297394739ec9d85917f8c32aa8bcfa0bfe140b24e9eeda4ce8d30d4f8d": { - "query": "\n select\n state\n from cfd_states\n where cfd_id = ?\n order by id desc\n limit 1;\n ", - "describe": { - "columns": [ - { - "name": "state", - "ordinal": 0, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ false ] } @@ -158,83 +152,174 @@ "nullable": [] } }, - "c404a4eebb7118fd4e716fe4155e011a52e12a3594b698ca4f0662674cd067f8": { - "query": "\n select\n cfds.id as cfd_id,\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n and orders.uuid = ?\n ", + "e0d3fe192c419be4fa65199d1e024ce8794e19a847be53038b7a8a33afb36bd2": { + "query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n and orders.uuid = ?\n ", "describe": { "columns": [ { - "name": "cfd_id", + "name": "order_id", "ordinal": 0, + "type_info": "Text" + }, + { + "name": "price", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "min_quantity", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "max_quantity", + "ordinal": 3, + "type_info": "Text" + }, + { + "name": "leverage", + "ordinal": 4, "type_info": "Int64" }, + { + "name": "trading_pair", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "position", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "origin", + "ordinal": 7, + "type_info": "Text" + }, + { + "name": "liquidation_price", + "ordinal": 8, + "type_info": "Text" + }, + { + "name": "creation_timestamp", + "ordinal": 9, + "type_info": "Text" + }, + { + "name": "term", + "ordinal": 10, + "type_info": "Text" + }, + { + "name": "oracle_event_id", + "ordinal": 11, + "type_info": "Text" + }, + { + "name": "quantity_usd", + "ordinal": 12, + "type_info": "Text" + }, + { + "name": "state", + "ordinal": 13, + "type_info": "Text" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false, + false + ] + } + }, + "e45a49eb1fb4cbddc046712d9e2df2db0c18c618f0fae2133e51d82eda6bec36": { + "query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n and orders.oracle_event_id = ?\n ", + "describe": { + "columns": [ { "name": "order_id", - "ordinal": 1, + "ordinal": 0, "type_info": "Text" }, { "name": "price", - "ordinal": 2, + "ordinal": 1, "type_info": "Text" }, { "name": "min_quantity", - "ordinal": 3, + "ordinal": 2, "type_info": "Text" }, { "name": "max_quantity", - "ordinal": 4, + "ordinal": 3, "type_info": "Text" }, { "name": "leverage", - "ordinal": 5, + "ordinal": 4, "type_info": "Int64" }, { "name": "trading_pair", - "ordinal": 6, + "ordinal": 5, "type_info": "Text" }, { "name": "position", - "ordinal": 7, + "ordinal": 6, "type_info": "Text" }, { "name": "origin", - "ordinal": 8, + "ordinal": 7, "type_info": "Text" }, { "name": "liquidation_price", - "ordinal": 9, + "ordinal": 8, "type_info": "Text" }, { "name": "creation_timestamp", - "ordinal": 10, + "ordinal": 9, "type_info": "Text" }, { "name": "term", - "ordinal": 11, + "ordinal": 10, "type_info": "Text" }, { "name": "oracle_event_id", - "ordinal": 12, + "ordinal": 11, "type_info": "Text" }, { "name": "quantity_usd", - "ordinal": 13, + "ordinal": 12, "type_info": "Text" }, { "name": "state", - "ordinal": 14, + "ordinal": 13, "type_info": "Text" } ], @@ -242,7 +327,6 @@ "Right": 1 }, "nullable": [ - true, false, false, false, diff --git a/daemon/src/db.rs b/daemon/src/db.rs index 0f933e7..adc5678 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -257,7 +257,6 @@ pub async fn load_cfd_by_order_id( let row = sqlx::query!( r#" select - cfds.id as cfd_id, orders.uuid as order_id, orders.initial_price as price, orders.min_quantity as min_quantity, @@ -335,7 +334,6 @@ pub async fn load_all_cfds(conn: &mut PoolConnection) -> anyhow::Result< let rows = sqlx::query!( r#" select - cfds.id as cfd_id, orders.uuid as order_id, orders.initial_price as price, orders.min_quantity as min_quantity, @@ -411,6 +409,91 @@ pub async fn load_all_cfds(conn: &mut PoolConnection) -> anyhow::Result< Ok(cfds) } +/// Loads all CFDs with the latest state as the CFD state +pub async fn load_cfds_by_oracle_event_id( + oracle_event_id: OracleEventId, + conn: &mut PoolConnection, +) -> anyhow::Result> { + let rows = sqlx::query!( + r#" + select + orders.uuid as order_id, + orders.initial_price as price, + orders.min_quantity as min_quantity, + orders.max_quantity as max_quantity, + orders.leverage as leverage, + orders.trading_pair as trading_pair, + orders.position as position, + orders.origin as origin, + orders.liquidation_price as liquidation_price, + orders.creation_timestamp as creation_timestamp, + orders.term as term, + orders.oracle_event_id, + cfds.quantity_usd as quantity_usd, + cfd_states.state as state + from cfds as cfds + inner join orders as orders on cfds.order_id = orders.id + inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id + where cfd_states.state in ( + select + state + from cfd_states + where cfd_id = cfds.id + order by id desc + limit 1 + ) + and orders.oracle_event_id = ? + "#, + oracle_event_id.0 + ) + .fetch_all(conn) + .await?; + + let cfds = rows + .iter() + .map(|row| { + let order_id = serde_json::from_str(row.order_id.as_str()).unwrap(); + let trading_pair = serde_json::from_str(row.trading_pair.as_str()).unwrap(); + let position: Position = serde_json::from_str(row.position.as_str()).unwrap(); + let price = serde_json::from_str(row.price.as_str()).unwrap(); + let min_quantity = serde_json::from_str(row.min_quantity.as_str()).unwrap(); + let max_quantity = serde_json::from_str(row.max_quantity.as_str()).unwrap(); + let leverage = Leverage(row.leverage.try_into().unwrap()); + let liquidation_price = serde_json::from_str(row.liquidation_price.as_str()).unwrap(); + let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap(); + let term = serde_json::from_str(row.term.as_str()).unwrap(); + let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap(); + let oracle_event_id = OracleEventId(row.oracle_event_id.clone()); + + let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap(); + let latest_state = serde_json::from_str(row.state.as_str()).unwrap(); + + let order = Order { + id: order_id, + trading_pair, + position, + price, + min_quantity, + max_quantity, + leverage, + liquidation_price, + creation_timestamp, + term, + origin, + oracle_event_id, + }; + + Cfd { + order, + quantity_usd: quantity, + state: latest_state, + } + }) + .collect(); + + Ok(cfds) +} + #[cfg(test)] mod tests { use std::fs::File; @@ -444,18 +527,9 @@ mod tests { let pool = setup_test_db().await; let mut conn = pool.acquire().await.unwrap(); - let order = Order::default(); - let cfd = Cfd::new( - order.clone(), - Usd(dec!(1000)), - CfdState::OutgoingOrderRequest { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - ); + let cfd = Cfd::default(); - insert_order(&order, &mut conn).await.unwrap(); + insert_order(&cfd.order, &mut conn).await.unwrap(); insert_cfd(cfd.clone(), &mut conn).await.unwrap(); let cfds_from_db = load_all_cfds(&mut conn).await.unwrap(); @@ -468,20 +542,10 @@ mod tests { let pool = setup_test_db().await; let mut conn = pool.acquire().await.unwrap(); - let order = Order::default(); - let cfd = Cfd::new( - order.clone(), - Usd(dec!(1000)), - CfdState::OutgoingOrderRequest { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - ); - - let order_id = order.id; + let cfd = Cfd::default(); + let order_id = cfd.order.id; - insert_order(&order, &mut conn).await.unwrap(); + insert_order(&cfd.order, &mut conn).await.unwrap(); insert_cfd(cfd.clone(), &mut conn).await.unwrap(); let cfd_from_db = load_cfd_by_order_id(order_id, &mut conn).await.unwrap(); @@ -493,62 +557,75 @@ mod tests { let pool = setup_test_db().await; let mut conn = pool.acquire().await.unwrap(); - let order = Order::default(); - let cfd = Cfd::new( - order.clone(), - Usd(dec!(1000)), - CfdState::OutgoingOrderRequest { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - ); - - let order_id = order.id; + let cfd = Cfd::default(); + let order_id = cfd.order.id; - insert_order(&order, &mut conn).await.unwrap(); + insert_order(&cfd.order, &mut conn).await.unwrap(); insert_cfd(cfd.clone(), &mut conn).await.unwrap(); let cfd_from_db = load_cfd_by_order_id(order_id, &mut conn).await.unwrap(); assert_eq!(cfd, cfd_from_db); - let order = Order::default(); - let cfd = Cfd::new( - order.clone(), - Usd(dec!(1000)), - CfdState::OutgoingOrderRequest { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - ); - - let order_id = order.id; + let cfd = Cfd::default(); + let order_id = cfd.order.id; - insert_order(&order, &mut conn).await.unwrap(); + insert_order(&cfd.order, &mut conn).await.unwrap(); insert_cfd(cfd.clone(), &mut conn).await.unwrap(); let cfd_from_db = load_cfd_by_order_id(order_id, &mut conn).await.unwrap(); assert_eq!(cfd, cfd_from_db); } + #[tokio::test] + async fn test_insert_and_load_cfd_by_oracle_event_id() { + let pool = setup_test_db().await; + let mut conn = pool.acquire().await.unwrap(); + + let oracle_event_id_1 = OracleEventId("dummy_1".to_string()); + let oracle_event_id_2 = OracleEventId("dummy_2".to_string()); + + let cfd_1 = Cfd::default() + .with_order(Order::default().with_oracle_event_id(oracle_event_id_1.clone())); + + insert_order(&cfd_1.order, &mut conn).await.unwrap(); + insert_cfd(cfd_1.clone(), &mut conn).await.unwrap(); + + let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_1.clone(), &mut conn) + .await + .unwrap(); + assert_eq!(vec![cfd_1.clone()], cfd_from_db); + + let cfd_2 = Cfd::default() + .with_order(Order::default().with_oracle_event_id(oracle_event_id_1.clone())); + + insert_order(&cfd_2.order, &mut conn).await.unwrap(); + insert_cfd(cfd_2.clone(), &mut conn).await.unwrap(); + + let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_1, &mut conn) + .await + .unwrap(); + assert_eq!(vec![cfd_1, cfd_2], cfd_from_db); + + let cfd_3 = Cfd::default() + .with_order(Order::default().with_oracle_event_id(oracle_event_id_2.clone())); + + insert_order(&cfd_3.order, &mut conn).await.unwrap(); + insert_cfd(cfd_3.clone(), &mut conn).await.unwrap(); + + let cfd_from_db = load_cfds_by_oracle_event_id(oracle_event_id_2, &mut conn) + .await + .unwrap(); + assert_eq!(vec![cfd_3], cfd_from_db); + } + #[tokio::test] async fn test_insert_new_cfd_state() { let pool = setup_test_db().await; let mut conn = pool.acquire().await.unwrap(); - let order = Order::default(); - let mut cfd = Cfd::new( - order.clone(), - Usd(dec!(1000)), - CfdState::OutgoingOrderRequest { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - ); + let mut cfd = Cfd::default(); - insert_order(&order, &mut conn).await.unwrap(); + insert_order(&cfd.order, &mut conn).await.unwrap(); insert_cfd(cfd.clone(), &mut conn).await.unwrap(); cfd.state = CfdState::Accepted { @@ -582,6 +659,27 @@ mod tests { pool } + impl Default for Cfd { + fn default() -> Self { + Cfd::new( + Order::default(), + Usd(dec!(1000)), + CfdState::OutgoingOrderRequest { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + }, + ) + } + } + + impl Cfd { + pub fn with_order(mut self, order: Order) -> Self { + self.order = order; + self + } + } + impl Default for Order { fn default() -> Self { Order::new( @@ -594,4 +692,11 @@ mod tests { .unwrap() } } + + impl Order { + pub fn with_oracle_event_id(mut self, oracle_event_id: OracleEventId) -> Self { + self.oracle_event_id = oracle_event_id; + self + } + } } diff --git a/daemon/src/housekeeping.rs b/daemon/src/housekeeping.rs index ca72834..2a4a51c 100644 --- a/daemon/src/housekeeping.rs +++ b/daemon/src/housekeeping.rs @@ -54,5 +54,13 @@ pub async fn rebroadcast_transactions( tracing::info!("Commit transaction published on chain: {}", txid); } + for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_cet(cfd)) { + // Double question-mark OK because if we are in PendingCet we must have been Ready before + let signed_cet = cfd.cet()??; + let txid = wallet.try_broadcast_transaction(signed_cet).await?; + + tracing::info!("CET published on chain: {}", txid); + } + Ok(()) } diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 2c4ab3c..00bd933 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -1,7 +1,7 @@ use crate::actors::log_error; use crate::db::{ insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, - load_cfd_by_order_id, load_order_by_id, + load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id, }; use crate::maker_inc_connections::TakerCommand; use crate::model::cfd::{ @@ -266,6 +266,7 @@ impl Actor { transition_timestamp: SystemTime::now(), }, dlc: dlc.clone(), + attestation: None, }, &mut conn, ) @@ -411,8 +412,6 @@ impl Actor { }) .await?; - let nonce_pks = offer_announcement.nonce_pks.clone(); - let contract_future = setup_contract::new( self.takers.clone().into_sink().with(move |msg| { future::ok(maker_inc_connections::TakerMessage { @@ -421,7 +420,7 @@ impl Actor { }) }), receiver, - (self.oracle_pk, nonce_pks), + (self.oracle_pk, offer_announcement.clone().into()), cfd, self.wallet.clone(), Role::Maker, @@ -572,10 +571,10 @@ impl Actor { self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; - // TODO: Not sure that should be done here... - // Consider bubbling the refund availability up to the user, and let user trigger - // transaction publication - if let CfdState::MustRefund { .. } = new_state { + // TODO: code duplication maker/taker + if let CfdState::OpenCommitted { .. } = new_state { + self.try_cet_publication(cfd).await?; + } else if let CfdState::MustRefund { .. } = new_state { let signed_refund_tx = cfd.refund_tx()?; let txid = self .wallet @@ -606,12 +605,44 @@ impl Actor { async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { tracing::debug!( "Learnt latest oracle attestation for event: {}", - attestation.id.0 + attestation.id ); - todo!( - "Update all CFDs which care about this particular attestation, based on the event ID" - ); + let mut conn = self.db.acquire().await?; + let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?; + + for mut cfd in cfds { + cfd.handle(CfdStateChangeEvent::OracleAttestation(attestation.clone()))?; + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?; + + self.try_cet_publication(cfd).await?; + } + + Ok(()) + } + + // TODO: code duplication maker/taker + async fn try_cet_publication(&mut self, mut cfd: Cfd) -> Result<()> { + let mut conn = self.db.acquire().await?; + + match cfd.cet()? { + Ok(cet) => { + let txid = self.wallet.try_broadcast_transaction(cet).await?; + tracing::info!("CET published with txid {}", txid); + + cfd.handle(CfdStateChangeEvent::CetSent)?; + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state, &mut conn).await?; + } + Err(not_ready_yet) => { + tracing::debug!( + "Attestation received but we are not ready to publish it yet: {:#}", + not_ready_yet + ); + return Ok(()); + } + }; + + Ok(()) } } diff --git a/daemon/src/model.rs b/daemon/src/model.rs index 0024a15..1aed42c 100644 --- a/daemon/src/model.rs +++ b/daemon/src/model.rs @@ -6,6 +6,7 @@ use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use bdk::bitcoin::{Address, Amount}; +use std::fmt; use std::time::SystemTime; use uuid::Uuid; @@ -108,3 +109,9 @@ pub struct WalletInfo { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct OracleEventId(pub String); + +impl Display for OracleEventId { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index 3818853..cd8dca6 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -1,5 +1,6 @@ use crate::model::{Leverage, OracleEventId, Percent, Position, TakerId, TradingPair, Usd}; use crate::monitor; +use crate::oracle::Attestation; use anyhow::{bail, Context, Result}; use bdk::bitcoin::secp256k1::{SecretKey, Signature}; use bdk::bitcoin::{Address, Amount, PublicKey, SignedAmount, Transaction}; @@ -173,9 +174,7 @@ pub enum CfdState { /// The taker sent an order to the maker to open the CFD but doesn't have a response yet. /// /// This state applies to taker only. - OutgoingOrderRequest { - common: CfdStateCommon, - }, + OutgoingOrderRequest { common: CfdStateCommon }, /// The maker received an order from the taker to open the CFD but doesn't have a response yet. /// @@ -188,29 +187,24 @@ pub enum CfdState { /// The maker has accepted the CFD take request, but the contract is not set up on chain yet. /// /// This state applies to taker and maker. - Accepted { - common: CfdStateCommon, - }, + Accepted { common: CfdStateCommon }, /// The maker rejected the CFD order. /// /// This state applies to taker and maker. /// This is a final state. - Rejected { - common: CfdStateCommon, - }, + Rejected { common: CfdStateCommon }, /// State used during contract setup. /// /// This state applies to taker and maker. /// All contract setup messages between taker and maker are expected to be sent in on scope. - ContractSetup { - common: CfdStateCommon, - }, + ContractSetup { common: CfdStateCommon }, PendingOpen { common: CfdStateCommon, dlc: Dlc, + attestation: Option, }, /// The CFD contract is set up on chain. @@ -219,6 +213,7 @@ pub enum CfdState { Open { common: CfdStateCommon, dlc: Dlc, + attestation: Option, }, /// The commit transaction was published but it not final yet @@ -228,6 +223,7 @@ pub enum CfdState { PendingCommit { common: CfdStateCommon, dlc: Dlc, + attestation: Option, }, // TODO: At the moment we are appending to this state. The way this is handled internally is @@ -243,19 +239,34 @@ pub enum CfdState { cet_status: CetStatus, }, - /// The CFD contract's refund transaction was published but it not final yet - MustRefund { + /// The CET was published on chain but is not final yet + /// + /// This state applies to taker and maker. + /// This state is needed, because otherwise the user does not get any feedback. + PendingCet { common: CfdStateCommon, dlc: Dlc, + cet_status: CetStatus, }, + /// The position was closed collaboratively or non-collaboratively + /// + /// This state applies to taker and maker. + /// This is a final state. + /// This is the final state for all happy-path scenarios where we had an open position and then + /// "settled" it. Settlement can be collaboratively or non-collaboratively (by publishing + /// commit + cet). + Closed { common: CfdStateCommon }, + + // TODO: Can be extended with CetStatus + /// The CFD contract's refund transaction was published but it not final yet + MustRefund { common: CfdStateCommon, dlc: Dlc }, + /// The Cfd was refunded and the refund transaction reached finality /// /// This state applies to taker and maker. /// This is a final state. - Refunded { - common: CfdStateCommon, - }, + Refunded { common: CfdStateCommon }, /// The Cfd was in a state that could not be continued after the application got interrupted /// @@ -273,8 +284,8 @@ pub enum CfdState { pub enum CetStatus { Unprepared, TimelockExpired, - OracleSigned(u64), - Ready(u64), + OracleSigned(Attestation), + Ready(Attestation), } impl CfdState { @@ -292,6 +303,8 @@ impl CfdState { CfdState::Refunded { common, .. } => common, CfdState::SetupFailed { common, .. } => common, CfdState::PendingCommit { common, .. } => common, + CfdState::PendingCet { common, .. } => common, + CfdState::Closed { common, .. } => common, }; *common @@ -341,6 +354,12 @@ impl fmt::Display for CfdState { CfdState::SetupFailed { .. } => { write!(f, "Setup Failed") } + CfdState::PendingCet { .. } => { + write!(f, "Pending CET") + } + CfdState::Closed { .. } => { + write!(f, "Closed") + } } } } @@ -506,6 +525,7 @@ impl Cfd { transition_timestamp: SystemTime::now(), }, dlc, + attestation: None, } } else { bail!( @@ -549,23 +569,34 @@ impl Cfd { }, CfdState::OpenCommitted { dlc, - cet_status: CetStatus::OracleSigned(price), + cet_status: CetStatus::OracleSigned(attestation), .. } => CfdState::OpenCommitted { common: CfdStateCommon { transition_timestamp: SystemTime::now(), }, dlc, - cet_status: CetStatus::Ready(price), + cet_status: CetStatus::Ready(attestation), }, - PendingOpen { dlc, .. } | Open { dlc, .. } => { - tracing::debug!(%order_id, "Was in unexpected state {}, jumping ahead to MustRefund", self.state); + PendingOpen { + dlc, attestation, .. + } + | Open { + dlc, attestation, .. + } + | PendingCommit { + dlc, attestation, .. + } => { + tracing::debug!(%order_id, "Was in unexpected state {}, jumping ahead to OpenCommitted", self.state); CfdState::OpenCommitted { common: CfdStateCommon { transition_timestamp: SystemTime::now(), }, dlc, - cet_status: CetStatus::TimelockExpired, + cet_status: match attestation { + None => CetStatus::TimelockExpired, + Some(attestation) => CetStatus::Ready(attestation), + }, } } _ => bail!( @@ -608,13 +639,21 @@ impl Cfd { }, } } - monitor::Event::CetFinality(_) => { - todo!("Implement state transition") - } + monitor::Event::CetFinality(_) => Closed { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + }, }, CfdStateChangeEvent::CommitTxSent => { - let dlc = if let Open { dlc, .. } | PendingOpen { dlc, .. } = self.state.clone() { - dlc + let (dlc, attestation) = if let PendingOpen { + dlc, attestation, .. + } + | Open { + dlc, attestation, .. + } = self.state.clone() + { + (dlc, attestation) } else { bail!( "Cannot transition to PendingCommit because of unexpected state {}", @@ -627,8 +666,66 @@ impl Cfd { transition_timestamp: SystemTime::now(), }, dlc, + attestation, } } + CfdStateChangeEvent::OracleAttestation(attestation) => match self.state.clone() { + CfdState::Open { dlc, .. } => CfdState::Open { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + attestation: Some(attestation), + }, + CfdState::PendingCommit { dlc, .. } => CfdState::PendingCommit { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + attestation: Some(attestation), + }, + CfdState::OpenCommitted { + dlc, + cet_status: CetStatus::Unprepared, + .. + } => CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::OracleSigned(attestation), + }, + CfdState::OpenCommitted { + dlc, + cet_status: CetStatus::TimelockExpired, + .. + } => CfdState::OpenCommitted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc, + cet_status: CetStatus::Ready(attestation), + }, + _ => bail!( + "Cannot transition to OpenCommitted because of unexpected state {}", + self.state + ), + }, + CfdStateChangeEvent::CetSent => match self.state.clone() { + CfdState::OpenCommitted { + common, + dlc, + cet_status, + } => CfdState::PendingCet { + common, + dlc, + cet_status, + }, + _ => bail!( + "Cannot transition to PendingCet because of unexpected state {}", + self.state + ), + }, }; self.state = new_state.clone(); @@ -689,7 +786,6 @@ impl Cfd { &dlc.identity, )); - // TODO: verify that the dlc's `publish` sk corresponds to the decryption_sk we need here let counterparty_sig = dlc.commit.1.decrypt(&dlc.publish)?; let counterparty_pubkey = dlc.identity_counterparty; @@ -703,6 +799,72 @@ impl Cfd { Ok(signed_commit_tx) } + pub fn cet(&self) -> Result> { + let (dlc, attestation) = match self.state.clone() { + CfdState::OpenCommitted { + dlc, + cet_status: CetStatus::Ready(attestation), + .. + } + | CfdState::PendingCet { + dlc, + cet_status: CetStatus::Ready(attestation), + .. + } => (dlc, attestation), + CfdState::OpenCommitted { .. } + | CfdState::Open { .. } + | CfdState::PendingCommit { .. } => { + return Ok(Err(NotReadyYet)); + } + _ => bail!("Cannot publish CET in state {}", self.state.clone()), + }; + + let cets = dlc + .cets + .get(&attestation.id) + .context("Unable to find oracle event id within the cets of the dlc")?; + + let Cet { + tx: cet, + adaptor_sig: encsig, + n_bits, + .. + } = cets + .iter() + .find(|Cet { range, .. }| range.contains(&attestation.price)) + .context("Price out of range of cets")?; + + let oracle_attestations = attestation.scalars; + + let mut decryption_sk = oracle_attestations[0]; + for oracle_attestation in oracle_attestations[1..*n_bits].iter() { + decryption_sk.add_assign(oracle_attestation.as_ref())?; + } + + let sig_hash = spending_tx_sighash( + cet, + &dlc.commit.2, + Amount::from_sat(dlc.commit.0.output[0].value), + ); + let our_sig = SECP256K1.sign(&sig_hash, &dlc.identity); + let our_pubkey = PublicKey::new(bdk::bitcoin::secp256k1::PublicKey::from_secret_key( + SECP256K1, + &dlc.identity, + )); + + let counterparty_sig = encsig.decrypt(&decryption_sk)?; + let counterparty_pubkey = dlc.identity_counterparty; + + let signed_cet = finalize_spend_transaction( + cet.clone(), + &dlc.commit.2, + (our_pubkey, our_sig), + (counterparty_pubkey, counterparty_sig), + )?; + + Ok(Ok(signed_cet)) + } + pub fn pending_open_dlc(&self) -> Option { if let CfdState::PendingOpen { dlc, .. } = self.state.clone() { Some(dlc) @@ -719,6 +881,10 @@ impl Cfd { matches!(self.state.clone(), CfdState::PendingCommit { .. }) } + pub fn is_pending_cet(&self) -> bool { + matches!(self.state.clone(), CfdState::PendingCet { .. }) + } + pub fn is_cleanup(&self) -> bool { matches!( self.state.clone(), @@ -734,12 +900,18 @@ impl Cfd { } } +#[derive(thiserror::Error, Debug, Clone, Copy)] +#[error("The cfd is not committed yet")] +pub struct NotReadyYet; + #[derive(Debug, Clone)] pub enum CfdStateChangeEvent { // TODO: group other events by actors into enums and add them here so we can bundle all // transitions into cfd.transition_to(...) Monitor(monitor::Event), CommitTxSent, + OracleAttestation(Attestation), + CetSent, } /// Returns the Profit/Loss (P/L) as Bitcoin. Losses are capped by the provided margin @@ -1078,6 +1250,17 @@ mod tests { } } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct Cet { + pub tx: Transaction, + pub adaptor_sig: EcdsaAdaptorSignature, + + // TODO: Range + number of digits (usize) could be represented as Digits similar to what we do + // in the protocol lib + pub range: RangeInclusive, + pub n_bits: usize, +} + /// Contains all data we've assembled about the CFD through the setup protocol. /// /// All contained signatures are the signatures of THE OTHER PARTY. @@ -1093,6 +1276,6 @@ pub struct Dlc { /// The fully signed lock transaction ready to be published on chain pub lock: (Transaction, Descriptor), pub commit: (Transaction, EcdsaAdaptorSignature, Descriptor), - pub cets: HashMap)>>, + pub cets: HashMap>, pub refund: (Transaction, Signature), } diff --git a/daemon/src/monitor.rs b/daemon/src/monitor.rs index 2d907c9..40cf801 100644 --- a/daemon/src/monitor.rs +++ b/daemon/src/monitor.rs @@ -1,6 +1,8 @@ use crate::actors::log_error; use crate::model::cfd::{CetStatus, Cfd, CfdState, Dlc, OrderId}; -use crate::oracle; +use crate::model::OracleEventId; +use crate::oracle::Attestation; +use crate::{model, oracle}; use anyhow::{Context, Result}; use async_trait::async_trait; use bdk::bitcoin::{PublicKey, Script, Txid}; @@ -21,11 +23,19 @@ pub struct StartMonitoring { pub params: MonitorParams, } +#[derive(Clone)] +pub struct Cet { + txid: Txid, + script: Script, + range: RangeInclusive, + n_bits: usize, +} + #[derive(Clone)] pub struct MonitorParams { lock: (Txid, Descriptor), commit: (Txid, Descriptor), - cets: HashMap)>>, + cets: HashMap>, refund: (Txid, Script, u32), } @@ -88,13 +98,19 @@ where actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); actor.monitor_refund_finality(¶ms,cfd.order.id); } - CfdState::OpenCommitted { dlc, cet_status, .. } => { + CfdState::OpenCommitted { dlc, cet_status, .. } + | CfdState::PendingCet { dlc, cet_status, .. } => { let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks()); actor.cfds.insert(cfd.order.id, params.clone()); match cet_status { - CetStatus::Unprepared - | CetStatus::OracleSigned(_) => { + CetStatus::Unprepared => { + actor.monitor_commit_cet_timelock(¶ms, cfd.order.id); + actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); + actor.monitor_refund_finality(¶ms,cfd.order.id); + } + CetStatus::OracleSigned(attestation) => { + actor.monitor_cet_finality(map_cets(dlc.cets, dlc.address.script_pubkey()), attestation, cfd.order.id)?; actor.monitor_commit_cet_timelock(¶ms, cfd.order.id); actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); actor.monitor_refund_finality(¶ms,cfd.order.id); @@ -103,17 +119,14 @@ where actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); actor.monitor_refund_finality(¶ms,cfd.order.id); } - CetStatus::Ready(_price) => { - // TODO: monitor CET finality - + CetStatus::Ready(attestation) => { + actor.monitor_cet_finality(map_cets(dlc.cets, dlc.address.script_pubkey()), attestation, cfd.order.id)?; actor.monitor_commit_refund_timelock(¶ms, cfd.order.id); actor.monitor_refund_finality(¶ms,cfd.order.id); } } } CfdState::MustRefund { dlc, .. } => { - // TODO: CET monitoring (?) - note: would require to add CetStatus information to MustRefund - let params = MonitorParams::from_dlc_and_timelocks(dlc.clone(), cfd.refund_timelock_in_blocks()); actor.cfds.insert(cfd.order.id, params.clone()); @@ -128,6 +141,7 @@ where | CfdState::ContractSetup { .. } // final states + | CfdState::Closed { .. } | CfdState::Rejected { .. } | CfdState::Refunded { .. } | CfdState::SetupFailed { .. } => () @@ -192,6 +206,36 @@ where .push((ScriptStatus::finality(), Event::RefundFinality(order_id))); } + fn monitor_cet_finality( + &mut self, + cets: HashMap>, + attestation: Attestation, + order_id: OrderId, + ) -> Result<()> { + let cets = cets + .get(&attestation.id) + .context("No CET for oracle event found")?; + + let (txid, script_pubkey) = cets + .iter() + .find_map( + |Cet { + txid, + script, + range, + .. + }| { range.contains(&attestation.price).then(|| (txid, script)) }, + ) + .context("No price range match for oracle attestation")?; + + self.awaiting_status + .entry((*txid, script_pubkey.clone())) + .or_default() + .push((ScriptStatus::finality(), Event::CetFinality(order_id))); + + Ok(()) + } + async fn sync(&mut self) -> Result<()> { // Fetch the latest block for storing the height. // We do not act on this subscription after this call, as we cannot rely on @@ -221,23 +265,7 @@ where async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { for (order_id, MonitorParams { cets, .. }) in self.cfds.clone().into_iter() { - let cets = cets - .get(&attestation.id.0) - .context("No CET for oracle event found")?; - let (txid, script_pubkey) = - match cets.iter().find_map(|(txid, script_pubkey, range)| { - range - .contains(&attestation.price) - .then(|| (txid, script_pubkey)) - }) { - Some(cet) => cet, - None => continue, - }; - - self.awaiting_status - .entry((*txid, script_pubkey.clone())) - .or_default() - .push((ScriptStatus::finality(), Event::CetFinality(order_id))); + self.monitor_cet_finality(cets, attestation.clone(), order_id)?; } Ok(()) @@ -469,18 +497,7 @@ impl MonitorParams { MonitorParams { lock: (dlc.lock.0.txid(), dlc.lock.1), commit: (dlc.commit.0.txid(), dlc.commit.2), - cets: dlc - .cets - .iter() - .map(|(event_id, cets)| { - ( - event_id.clone(), - cets.iter() - .map(|(tx, _, range)| (tx.txid(), script_pubkey.clone(), range.clone())) - .collect::>(), - ) - }) - .collect::>(), + cets: map_cets(dlc.cets, script_pubkey.clone()), refund: ( dlc.refund.0.txid(), script_pubkey, @@ -490,6 +507,31 @@ impl MonitorParams { } } +fn map_cets( + cets: HashMap>, + script_pubkey: Script, +) -> HashMap> { + cets.iter() + .map(|(event_id, cets)| { + ( + event_id.clone(), + cets.iter() + .map( + |model::cfd::Cet { + tx, range, n_bits, .. + }| Cet { + txid: tx.txid(), + script: script_pubkey.clone(), + range: range.clone(), + n_bits: *n_bits, + }, + ) + .collect::>(), + ) + }) + .collect() +} + impl xtra::Message for Event { type Result = (); } diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index 8a47ee1..fbf05d4 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -8,6 +8,7 @@ use futures::TryStreamExt; use rocket::time::format_description::FormatItem; use rocket::time::macros::format_description; use rocket::time::{Duration, OffsetDateTime, Time}; +use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::convert::TryFrom; @@ -208,12 +209,21 @@ pub struct Announcement { pub nonce_pks: Vec, } +impl From for cfd_protocol::Announcement { + fn from(announcement: Announcement) -> Self { + cfd_protocol::Announcement { + id: announcement.id.0, + nonce_pks: announcement.nonce_pks, + } + } +} + #[derive(Debug, Clone)] pub struct Announcements(pub [Announcement; 24]); // TODO: Implement real deserialization once price attestation is // implemented in `olivia` -#[derive(Debug, Clone, serde::Deserialize, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(try_from = "olivia_api::Response")] pub struct Attestation { pub id: OracleEventId, @@ -426,7 +436,7 @@ mod olivia_api { let deserialized = serde_json::from_str::(json).unwrap(); let expected = oracle::Attestation { - id: "/x/BitMEX/BXBT/2021-10-04T22:00:00.price[n:20]".to_string(), + id: OracleEventId("/x/BitMEX/BXBT/2021-10-04T22:00:00.price[n:20]".to_string()), price: 48935, scalars: vec![ "1327b3bd0f1faf45d6fed6c96d0c158da22a2033a6fed98bed036df0a4eef484" diff --git a/daemon/src/setup_contract.rs b/daemon/src/setup_contract.rs index 299bfe6..3a835ed 100644 --- a/daemon/src/setup_contract.rs +++ b/daemon/src/setup_contract.rs @@ -1,4 +1,5 @@ -use crate::model::cfd::{Cfd, Dlc, Role}; +use crate::model::cfd::{Cet, Cfd, Dlc, Role}; +use crate::model::OracleEventId; use crate::wallet::Wallet; use crate::wire::{Msg0, Msg1, Msg2, SetupMsg}; use crate::{model, payout_curve}; @@ -26,7 +27,7 @@ use std::ops::RangeInclusive; pub async fn new( mut sink: impl Sink + Unpin, mut stream: impl FusedStream + Unpin, - (oracle_pk, nonce_pks): (schnorrsig::PublicKey, Vec), + (oracle_pk, announcement): (schnorrsig::PublicKey, Announcement), cfd: Cfd, wallet: Wallet, role: Role, @@ -68,10 +69,7 @@ pub async fn new( } let payouts = HashMap::from_iter([( - Announcement { - id: "dummy_id_to_be_replaced".to_string(), - nonce_pks: nonce_pks.clone(), - }, + announcement.clone(), payout_curve::calculate( cfd.order.price, cfd.quantity_usd, @@ -142,7 +140,7 @@ pub async fn new( .context("Expect event to exist in msg")?; verify_cets( - (&oracle_pk, &nonce_pks), + (&oracle_pk, &announcement.nonce_pks), ¶ms.other, own_grouped_cets.cets.as_slice(), other_cets.as_slice(), @@ -207,10 +205,15 @@ pub async fn new( digits.range() ) })?; - Ok((tx, *other_encsig, digits.range())) + Ok(Cet { + tx, + adaptor_sig: *other_encsig, + range: digits.range(), + n_bits: digits.len(), + }) }) .collect::>>()?; - Ok((event_id, cets)) + Ok((OracleEventId(event_id), cets)) }) .collect::>>()?; diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 1b92081..82f3827 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -1,7 +1,7 @@ use crate::actors::log_error; use crate::db::{ insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, - load_cfd_by_order_id, load_order_by_id, + load_cfd_by_order_id, load_cfds_by_oracle_event_id, load_order_by_id, }; use crate::model::cfd::{ Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin, Role, @@ -262,15 +262,13 @@ impl Actor { }) .await?; - let nonce_pks = offer_announcement.nonce_pks.clone(); - let contract_future = setup_contract::new( self.send_to_maker .clone() .into_sink() .with(|msg| future::ok(wire::TakerToMaker::Protocol(msg))), receiver, - (self.oracle_pk, nonce_pks), + (self.oracle_pk, offer_announcement.clone().into()), cfd, self.wallet.clone(), Role::Taker, @@ -345,6 +343,7 @@ impl Actor { transition_timestamp: SystemTime::now(), }, dlc: dlc.clone(), + attestation: None, }, &mut conn, ) @@ -387,10 +386,10 @@ impl Actor { self.cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?; - // TODO: Not sure that should be done here... - // Consider bubbling the refund availability up to the user, and let user trigger - // transaction publication - if let CfdState::MustRefund { .. } = new_state { + // TODO: code duplicateion maker/taker + if let CfdState::OpenCommitted { .. } = new_state { + self.try_cet_publication(cfd).await?; + } else if let CfdState::MustRefund { .. } = new_state { let signed_refund_tx = cfd.refund_tx()?; let txid = self .wallet @@ -403,7 +402,7 @@ impl Actor { Ok(()) } - // TODO: Duplicated with maker + // TODO: code duplicateion maker/taker async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { let mut conn = self.db.acquire().await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; @@ -444,12 +443,44 @@ impl Actor { async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { tracing::debug!( "Learnt latest oracle attestation for event: {}", - attestation.id.0 + attestation.id ); - todo!( - "Update all CFDs which care about this particular attestation, based on the event ID" - ); + let mut conn = self.db.acquire().await?; + let cfds = load_cfds_by_oracle_event_id(attestation.id.clone(), &mut conn).await?; + + for mut cfd in cfds { + cfd.handle(CfdStateChangeEvent::OracleAttestation(attestation.clone()))?; + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state.clone(), &mut conn).await?; + + self.try_cet_publication(cfd).await?; + } + + Ok(()) + } + + // TODO: code duplication maker/taker + async fn try_cet_publication(&mut self, mut cfd: Cfd) -> Result<()> { + let mut conn = self.db.acquire().await?; + + match cfd.cet()? { + Ok(cet) => { + let txid = self.wallet.try_broadcast_transaction(cet).await?; + tracing::info!("CET published with txid {}", txid); + + cfd.handle(CfdStateChangeEvent::CetSent)?; + insert_new_cfd_state_by_order_id(cfd.order.id, cfd.state, &mut conn).await?; + } + Err(not_ready_yet) => { + tracing::debug!( + "Attestation received but we are not ready to publish it yet: {:#}", + not_ready_yet + ); + return Ok(()); + } + }; + + Ok(()) } } diff --git a/daemon/src/to_sse_event.rs b/daemon/src/to_sse_event.rs index 38974db..621f66f 100644 --- a/daemon/src/to_sse_event.rs +++ b/daemon/src/to_sse_event.rs @@ -66,11 +66,13 @@ pub enum CfdState { PendingOpen, Open, PendingCommit, + PendingCet, OpenCommitted, IncomingSettlementProposal, OutgoingSettlementProposal, IncomingRollOverProposal, OutgoingRollOverProposal, + Closed, MustRefund, Refunded, SetupFailed, @@ -251,6 +253,8 @@ fn to_cfd_state( model::cfd::CfdState::Refunded { .. } => CfdState::Refunded, model::cfd::CfdState::SetupFailed { .. } => CfdState::SetupFailed, model::cfd::CfdState::PendingCommit { .. } => CfdState::PendingCommit, + model::cfd::CfdState::PendingCet { .. } => CfdState::PendingCet, + model::cfd::CfdState::Closed { .. } => CfdState::Closed, }, Some(UpdateCfdProposal::RollOverProposal { direction: SettlementKind::Outgoing, diff --git a/frontend/src/components/Types.tsx b/frontend/src/components/Types.tsx index 0161752..acc3462 100644 --- a/frontend/src/components/Types.tsx +++ b/frontend/src/components/Types.tsx @@ -87,6 +87,10 @@ export class State { return "Refunded"; case StateKey.SETUP_FAILED: return "Setup Failed"; + case StateKey.PENDING_CET: + return "Pending CET"; + case StateKey.CLOSED: + return "Closed"; } } @@ -107,6 +111,7 @@ export class State { case StateKey.PENDING_COMMIT: case StateKey.OPEN_COMMITTED: case StateKey.MUST_REFUND: + case StateKey.PENDING_CET: return orange; case StateKey.OUTGOING_ORDER_REQUEST: @@ -119,6 +124,7 @@ export class State { case StateKey.PENDING_OPEN: case StateKey.REFUNDED: case StateKey.SETUP_FAILED: + case StateKey.CLOSED: return default_color; } } @@ -140,6 +146,7 @@ export class State { case StateKey.MUST_REFUND: case StateKey.OUTGOING_SETTLEMENT_PROPOSAL: case StateKey.OUTGOING_ROLL_OVER_PROPOSAL: + case StateKey.PENDING_CET: return StateGroupKey.OPEN; case StateKey.INCOMING_SETTLEMENT_PROPOSAL: @@ -151,6 +158,7 @@ export class State { case StateKey.REJECTED: case StateKey.REFUNDED: case StateKey.SETUP_FAILED: + case StateKey.CLOSED: return StateGroupKey.CLOSED; } } @@ -176,6 +184,7 @@ const enum StateKey { PENDING_OPEN = "PendingOpen", OPEN = "Open", PENDING_COMMIT = "PendingCommit", + PENDING_CET = "PendingCet", OPEN_COMMITTED = "OpenCommitted", OUTGOING_SETTLEMENT_PROPOSAL = "OutgoingSettlementProposal", INCOMING_SETTLEMENT_PROPOSAL = "IncomingSettlementProposal", @@ -184,6 +193,7 @@ const enum StateKey { MUST_REFUND = "MustRefund", REFUNDED = "Refunded", SETUP_FAILED = "SetupFailed", + CLOSED = "Closed", } export enum StateGroupKey {