diff --git a/daemon/migrations/20211220000000_introduce-event-sourcing.sql b/daemon/migrations/20211220000000_introduce-event-sourcing.sql new file mode 100644 index 0000000..1bd75fb --- /dev/null +++ b/daemon/migrations/20211220000000_introduce-event-sourcing.sql @@ -0,0 +1,29 @@ +drop table cfd_states; +drop table cfds; + +create table if not exists cfds +( + id integer primary key autoincrement, + uuid text unique not null, + position text not null, + initial_price text not null, + leverage integer not null, + settlement_time_interval_hours integer not null, + quantity_usd text not null, + counterparty_network_identity text not null, + role text not null +); + +create unique index if not exists cfds_uuid + on cfds (uuid); + +create table if not exists events +( + id integer primary key autoincrement, + cfd_id integer not null, + name text not null, + data text not null, + created_at text not null, + + foreign key (cfd_id) references cfds (id) +) diff --git a/daemon/prepare_db.sh b/daemon/prepare_db.sh index acc4355..29a9f4f 100755 --- a/daemon/prepare_db.sh +++ b/daemon/prepare_db.sh @@ -15,4 +15,4 @@ trap 'rm -f $TEMPDB' EXIT DATABASE_URL=sqlite:$TEMPDB cargo sqlx migrate run # prepare the sqlx-data.json rust mappings -DATABASE_URL=sqlite:./$DAEMON_DIR/$TEMPDB cargo sqlx prepare -- --bin taker +DATABASE_URL=sqlite:./$DAEMON_DIR/$TEMPDB SQLX_OFFLINE=true cargo sqlx prepare -- --bin taker diff --git a/daemon/sqlx-data.json b/daemon/sqlx-data.json index 17854d3..2b20a21 100644 --- a/daemon/sqlx-data.json +++ b/daemon/sqlx-data.json @@ -1,34 +1,34 @@ { "db": "SQLite", - "221a6283db798bacaba99e7e85130f9a8bbea1299d8cb99d272b1d478dc19775": { - "query": "\n select\n state\n from cfd_states\n where cfd_id = $1\n order by id desc\n limit 1;\n ", + "7f977cdcbd7287d249b0a467e48f6788d196b267e3df3970d614848b8c899a61": { + "query": "\n select\n uuid as \"uuid: crate::model::cfd::OrderId\"\n from\n cfds\n ", "describe": { "columns": [ { - "name": "state", + "name": "uuid: crate::model::cfd::OrderId", "ordinal": 0, "type_info": "Text" } ], "parameters": { - "Right": 1 + "Right": 0 }, "nullable": [ false ] } }, - "8708389be41d08359966b16ea018a0fd39acbf61981c1933b46d3b50bb430311": { - "query": "\n with state as (\n select\n cfd_id,\n state\n from cfd_states\n inner join cfds on cfds.id = cfd_states.cfd_id\n where cfd_states.id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n cfds.uuid as \"uuid: crate::model::cfd::OrderId\",\n cfds.trading_pair as \"trading_pair: crate::model::TradingPair\",\n cfds.position as \"position: crate::model::Position\",\n cfds.initial_price as \"initial_price: crate::model::Price\",\n cfds.leverage as \"leverage: crate::model::Leverage\",\n cfds.liquidation_price as \"liquidation_price: crate::model::Price\",\n cfds.creation_timestamp_seconds as \"creation_timestamp_seconds: crate::model::Timestamp\",\n cfds.settlement_time_interval_seconds as \"settlement_time_interval_secs: i64\",\n cfds.role as \"role: crate::model::cfd::Role\",\n cfds.fee_rate as \"fee_rate: u32\",\n cfds.quantity_usd as \"quantity_usd: crate::model::Usd\",\n cfds.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from cfds\n inner join state on state.cfd_id = cfds.id\n ", + "e8a672355cd8c799b6291ccb629837dcd3a3fa9d3954bb78d22ba98e99674341": { + "query": "\n select\n id as cfd_id,\n uuid as \"uuid: crate::model::cfd::OrderId\",\n position as \"position: crate::model::Position\",\n initial_price as \"initial_price: crate::model::Price\",\n leverage as \"leverage: crate::model::Leverage\",\n settlement_time_interval_hours,\n quantity_usd as \"quantity_usd: crate::model::Usd\",\n counterparty_network_identity as \"counterparty_network_identity: crate::model::Identity\",\n role as \"role: crate::model::cfd::Role\"\n from\n cfds\n where\n cfds.uuid = $1\n ", "describe": { "columns": [ { - "name": "uuid: crate::model::cfd::OrderId", + "name": "cfd_id", "ordinal": 0, - "type_info": "Text" + "type_info": "Int64" }, { - "name": "trading_pair: crate::model::TradingPair", + "name": "uuid: crate::model::cfd::OrderId", "ordinal": 1, "type_info": "Text" }, @@ -48,55 +48,31 @@ "type_info": "Int64" }, { - "name": "liquidation_price: crate::model::Price", + "name": "settlement_time_interval_hours", "ordinal": 5, - "type_info": "Text" - }, - { - "name": "creation_timestamp_seconds: crate::model::Timestamp", - "ordinal": 6, - "type_info": "Int64" - }, - { - "name": "settlement_time_interval_secs: i64", - "ordinal": 7, - "type_info": "Int64" - }, - { - "name": "role: crate::model::cfd::Role", - "ordinal": 8, - "type_info": "Text" - }, - { - "name": "fee_rate: u32", - "ordinal": 9, "type_info": "Int64" }, { "name": "quantity_usd: crate::model::Usd", - "ordinal": 10, + "ordinal": 6, "type_info": "Text" }, { - "name": "counterparty: crate::model::Identity", - "ordinal": 11, + "name": "counterparty_network_identity: crate::model::Identity", + "ordinal": 7, "type_info": "Text" }, { - "name": "state", - "ordinal": 12, + "name": "role: crate::model::cfd::Role", + "ordinal": 8, "type_info": "Text" } ], "parameters": { - "Right": 0 + "Right": 1 }, "nullable": [ - false, - false, - false, - false, - false, + true, false, false, false, @@ -108,112 +84,34 @@ ] } }, - "9f31d4002a7328b199a24d50149f2724706e2d391a94b76d7894983f5eb71c4b": { - "query": "\n select\n id\n from cfds\n where cfds.uuid = $1;\n ", + "fdf6b7cee19e20e6c3ba00a821b5d92949a707a23c9fc8ebbc4502ffd7b1a5f1": { + "query": "\n\n select\n name,\n data,\n created_at as \"created_at: crate::model::Timestamp\"\n from\n events\n where\n cfd_id = $1\n ", "describe": { "columns": [ { - "name": "id", - "ordinal": 0, - "type_info": "Int64" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - true - ] - } - }, - "a22bf53971e4e255ca63d16cc7eb37ded66b0c4c375828f13ae5c28aa975441e": { - "query": "\n with state as (\n select\n cfd_id,\n state\n from cfd_states\n inner join cfds on cfds.id = cfd_states.cfd_id\n where cfd_states.id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n cfds.uuid as \"uuid: crate::model::cfd::OrderId\",\n cfds.trading_pair as \"trading_pair: crate::model::TradingPair\",\n cfds.position as \"position: crate::model::Position\",\n cfds.initial_price as \"initial_price: crate::model::Price\",\n cfds.leverage as \"leverage: crate::model::Leverage\",\n cfds.liquidation_price as \"liquidation_price: crate::model::Price\",\n cfds.creation_timestamp_seconds as \"creation_timestamp_seconds: crate::model::Timestamp\",\n cfds.settlement_time_interval_seconds as \"settlement_time_interval_secs: i64\",\n cfds.role as \"role: crate::model::cfd::Role\",\n cfds.fee_rate as \"fee_rate: u32\",\n cfds.quantity_usd as \"quantity_usd: crate::model::Usd\",\n cfds.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from cfds\n inner join state on state.cfd_id = cfds.id\n\n where cfds.uuid = $1\n ", - "describe": { - "columns": [ - { - "name": "uuid: crate::model::cfd::OrderId", + "name": "name", "ordinal": 0, "type_info": "Text" }, { - "name": "trading_pair: crate::model::TradingPair", + "name": "data", "ordinal": 1, "type_info": "Text" }, { - "name": "position: crate::model::Position", + "name": "created_at: crate::model::Timestamp", "ordinal": 2, "type_info": "Text" - }, - { - "name": "initial_price: crate::model::Price", - "ordinal": 3, - "type_info": "Text" - }, - { - "name": "leverage: crate::model::Leverage", - "ordinal": 4, - "type_info": "Int64" - }, - { - "name": "liquidation_price: crate::model::Price", - "ordinal": 5, - "type_info": "Text" - }, - { - "name": "creation_timestamp_seconds: crate::model::Timestamp", - "ordinal": 6, - "type_info": "Int64" - }, - { - "name": "settlement_time_interval_secs: i64", - "ordinal": 7, - "type_info": "Int64" - }, - { - "name": "role: crate::model::cfd::Role", - "ordinal": 8, - "type_info": "Text" - }, - { - "name": "fee_rate: u32", - "ordinal": 9, - "type_info": "Int64" - }, - { - "name": "quantity_usd: crate::model::Usd", - "ordinal": 10, - "type_info": "Text" - }, - { - "name": "counterparty: crate::model::Identity", - "ordinal": 11, - "type_info": "Text" - }, - { - "name": "state", - "ordinal": 12, - "type_info": "Text" } ], "parameters": { "Right": 1 }, "nullable": [ - false, - false, - false, - false, - false, - false, - false, - false, - false, - false, false, false, false ] } } -} \ No newline at end of file +} diff --git a/daemon/src/auto_rollover.rs b/daemon/src/auto_rollover.rs index 02277ae..59aff5d 100644 --- a/daemon/src/auto_rollover.rs +++ b/daemon/src/auto_rollover.rs @@ -1,15 +1,11 @@ use crate::address_map::AddressMap; use crate::address_map::Stopping; -use crate::cfd_actors::append_cfd_state; +use crate::cfd_actors::load_cfd; use crate::connection; use crate::db; -use crate::db::load_cfd; -use crate::model::cfd::CfdState; -use crate::model::cfd::CfdStateCommon; use crate::model::cfd::OrderId; use crate::model::cfd::RolloverCompleted; use crate::monitor; -use crate::monitor::MonitorParams; use crate::oracle; use crate::projection; use crate::rollover_taker; @@ -27,7 +23,7 @@ pub struct Actor { oracle_pk: schnorrsig::PublicKey, projection_actor: Address, conn_actor: Address, - monitor_actor: Address, + _monitor_actor: Address, oracle_actor: Address, n_payouts: usize, @@ -51,7 +47,7 @@ impl Actor { oracle_pk, projection_actor, conn_actor, - monitor_actor, + _monitor_actor: monitor_actor, oracle_actor, n_payouts, rollover_actors: AddressMap::default(), @@ -70,21 +66,24 @@ where tracing::trace!("Checking all CFDs for rollover eligibility"); let mut conn = self.db.acquire().await?; - let cfds = db::load_all_cfds(&mut conn).await?; + let cfd_ids = db::load_all_cfd_ids(&mut conn).await?; let this = ctx .address() .expect("actor to be able to give address to itself"); - for cfd in cfds { - let disconnected = match self.rollover_actors.get_disconnected(cfd.id()) { + for id in cfd_ids { + let disconnected = match self.rollover_actors.get_disconnected(id) { Ok(disconnected) => disconnected, Err(_) => { - tracing::debug!(order_id=%cfd.id(), "Rollover already in progress"); + tracing::debug!(order_id=%id, "Rollover already in progress"); continue; } }; + // TODO: Shall this have a try_continue? + let cfd = load_cfd(id, &mut conn).await?; + let (addr, fut) = rollover_taker::Actor::new( (cfd, self.n_payouts), self.oracle_pk, @@ -105,7 +104,7 @@ where } } -#[xtra_productivity] +#[xtra_productivity(message_impl = false)] impl Actor where O: 'static, @@ -113,45 +112,8 @@ where M: xtra::Handler, O: xtra::Handler + xtra::Handler, { - async fn handle_rollover_completed(&mut self, msg: RolloverCompleted) -> Result<()> { - let (order_id, dlc) = match msg { - RolloverCompleted::Succeeded { - order_id, - payload: (dlc, _), - } => (order_id, dlc), - RolloverCompleted::Rejected { order_id, reason } => { - tracing::debug!(%order_id, "Not rolled over: {:#}", reason); - return Ok(()); - } - RolloverCompleted::Failed { order_id, error } => { - tracing::warn!(%order_id, "Rollover failed: {:#}", error); - return Ok(()); - } - }; - - let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd(order_id, &mut conn).await?; - *cfd.state_mut() = CfdState::Open { - common: CfdStateCommon::default(), - dlc: dlc.clone(), - attestation: None, - collaborative_close: None, - }; - - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - - self.monitor_actor - .send(monitor::StartMonitoring { - id: order_id, - params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()), - }) - .await?; - - self.oracle_actor - .send(oracle::MonitorAttestation { - event_id: dlc.settlement_event_id, - }) - .await?; + async fn handle_rollover_completed(&mut self, _: RolloverCompleted) -> Result<()> { + // TODO: Implement this in terms of event sourcing Ok(()) } diff --git a/daemon/src/bitmex_price_feed.rs b/daemon/src/bitmex_price_feed.rs index 27a62a4..9f810bb 100644 --- a/daemon/src/bitmex_price_feed.rs +++ b/daemon/src/bitmex_price_feed.rs @@ -125,7 +125,7 @@ pub enum StopReason { StreamEnded, } -#[derive(Clone, Debug)] +#[derive(Clone, Copy, Debug)] pub struct Quote { pub timestamp: Timestamp, pub bid: Price, diff --git a/daemon/src/cfd_actors.rs b/daemon/src/cfd_actors.rs index 838f226..a52a64c 100644 --- a/daemon/src/cfd_actors.rs +++ b/daemon/src/cfd_actors.rs @@ -1,18 +1,19 @@ use crate::db; -use crate::model::cfd::Attestation; use crate::model::cfd::Cfd; -use crate::model::cfd::CfdState; +use crate::model::cfd::CfdEvent; +use crate::model::cfd::Event; use crate::model::cfd::OrderId; use crate::monitor; use crate::oracle; use crate::projection; +use crate::projection::CfdsChanged; use crate::try_continue; use crate::wallet; -use anyhow::bail; use anyhow::Context; use anyhow::Result; use sqlx::pool::PoolConnection; use sqlx::Sqlite; +use sqlx::SqlitePool; pub async fn insert_cfd_and_update_feed( cfd: &Cfd, @@ -20,54 +21,80 @@ pub async fn insert_cfd_and_update_feed( projection_address: &xtra::Address, ) -> Result<()> { db::insert_cfd(cfd, conn).await?; - projection_address.send(projection::CfdsChanged).await??; + projection_address.send(projection::CfdsChanged).await?; Ok(()) } -pub async fn append_cfd_state( - cfd: &Cfd, - conn: &mut PoolConnection, - projection_address: &xtra::Address, -) -> Result<()> { - db::append_cfd_state(cfd, conn).await?; - projection_address.send(projection::CfdsChanged).await??; - Ok(()) -} - -pub async fn try_cet_publication( - cfd: &mut Cfd, - conn: &mut PoolConnection, +pub async fn handle_monitoring_event( + event: monitor::Event, + db: &SqlitePool, wallet: &xtra::Address, projection_address: &xtra::Address, ) -> Result<()> where W: xtra::Handler, { - match cfd.cet()? { - Ok(cet) => { - let txid = wallet - .send(wallet::TryBroadcastTransaction { tx: cet }) - .await? - .context("Failed to send transaction")?; - tracing::info!("CET published with txid {}", txid); + let mut conn = db.acquire().await?; - if cfd.handle_cet_sent()?.is_none() { - bail!("If we can get the CET we should be able to transition") - } + let order_id = event.order_id(); - append_cfd_state(cfd, conn, projection_address).await?; - } - Err(not_ready_yet) => { - tracing::debug!("{:#}", not_ready_yet); - return Ok(()); + let cfd = load_cfd(order_id, &mut conn).await?; + + let event = match event { + monitor::Event::LockFinality(_) => cfd.handle_lock_confirmed(), + monitor::Event::CommitFinality(_) => cfd.handle_commit_confirmed(), + monitor::Event::CloseFinality(_) => cfd.handle_collaborative_settlement_confirmed(), + monitor::Event::CetTimelockExpired(_) => { + if let Ok(event) = cfd.handle_cet_timelock_expired() { + event + } else { + return Ok(()); // Early return from a no-op + } } + monitor::Event::CetFinality(_) => cfd.handle_cet_confirmed(), + monitor::Event::RefundTimelockExpired(_) => cfd.handle_refund_timelock_expired(), + monitor::Event::RefundFinality(_) => cfd.handle_refund_confirmed(), + monitor::Event::RevokedTransactionFound(_) => cfd.handle_revoke_confirmed(), }; + db::append_event(event.clone(), &mut conn).await?; + post_process_event(event, wallet).await?; + projection_address.send(CfdsChanged).await?; + Ok(()) } -pub async fn handle_monitoring_event( - event: monitor::Event, +/// Load a CFD from the database and rehydrate as the [`model::cfd::Cfd`] aggregate. +pub async fn load_cfd(order_id: OrderId, conn: &mut PoolConnection) -> Result { + let ( + db::Cfd { + id, + position, + initial_price, + leverage, + settlement_interval, + counterparty_network_identity, + role, + quantity_usd, + }, + events, + ) = db::load_cfd(order_id, conn).await?; + let cfd = Cfd::rehydrate( + id, + position, + initial_price, + leverage, + settlement_interval, + quantity_usd, + counterparty_network_identity, + role, + events, + ); + Ok(cfd) +} + +pub async fn handle_commit( + order_id: OrderId, conn: &mut PoolConnection, wallet: &xtra::Address, projection_address: &xtra::Address, @@ -75,110 +102,79 @@ pub async fn handle_monitoring_event( where W: xtra::Handler, { - let order_id = event.order_id(); + let cfd = load_cfd(order_id, conn).await?; - let mut cfd = db::load_cfd(order_id, conn).await?; + let event = cfd.manual_commit_to_blockchain()?; + db::append_event(event.clone(), conn).await?; - if cfd.handle_monitoring_event(event)?.is_none() { - // early exit if there was not state change - // this is for cases where we are already in a final state - return Ok(()); - } - - append_cfd_state(&cfd, conn, projection_address).await?; + post_process_event(event, wallet).await?; - if let CfdState::OpenCommitted { .. } = cfd.state() { - try_cet_publication(&mut cfd, conn, wallet, projection_address).await?; - } else if let CfdState::PendingRefund { .. } = cfd.state() { - let signed_refund_tx = cfd.refund_tx()?; - let txid = wallet - .send(wallet::TryBroadcastTransaction { - tx: signed_refund_tx, - }) - .await? - .context("Failed to publish CET")?; + projection_address.send(CfdsChanged).await?; - tracing::info!("Refund transaction published on chain: {}", txid); - } Ok(()) } -pub async fn handle_commit( - order_id: OrderId, - conn: &mut PoolConnection, +pub async fn handle_oracle_attestation( + attestation: oracle::Attestation, + db: &SqlitePool, wallet: &xtra::Address, projection_address: &xtra::Address, ) -> Result<()> where W: xtra::Handler, { - let mut cfd = db::load_cfd(order_id, conn).await?; + let mut conn = db.acquire().await?; - let signed_commit_tx = cfd.commit_tx()?; + tracing::debug!( + "Learnt latest oracle attestation for event: {}", + attestation.id + ); + + for id in db::load_all_cfd_ids(&mut conn).await? { + let cfd = try_continue!(load_cfd(id, &mut conn).await); + let event = try_continue!(cfd + .decrypt_cet(&attestation) + .context("Failed to decrypt CET using attestation")); - let txid = wallet - .send(wallet::TryBroadcastTransaction { - tx: signed_commit_tx, - }) - .await? - .context("Failed to publish commit tx")?; + try_continue!(db::append_event(event.clone(), &mut conn) + .await + .context("Failed to append events")); - if cfd.handle_commit_tx_sent()?.is_none() { - bail!("If we can get the commit tx we should be able to transition") + if let Some(event) = event { + try_continue!(post_process_event(event, wallet).await) + } } - append_cfd_state(&cfd, conn, projection_address).await?; - tracing::info!("Commit transaction published on chain: {}", txid); + projection_address.send(CfdsChanged).await?; Ok(()) } -pub async fn handle_oracle_attestation( - attestation: oracle::Attestation, - conn: &mut PoolConnection, - wallet: &xtra::Address, - projection_address: &xtra::Address, -) -> Result<()> +async fn post_process_event(event: Event, wallet: &xtra::Address) -> Result<()> where W: xtra::Handler, { - tracing::debug!( - "Learnt latest oracle attestation for event: {}", - attestation.id - ); - - let mut cfds = db::load_all_cfds(conn).await?; + match event.event { + CfdEvent::OracleAttestedPostCetTimelock { cet, .. } + | CfdEvent::CetTimelockConfirmedPostOracleAttestation { cet } => { + let txid = wallet + .send(wallet::TryBroadcastTransaction { tx: cet }) + .await? + .context("Failed to broadcast CET")?; - for (cfd, dlc) in cfds - .iter_mut() - .filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc))) - { - if dlc.settlement_event_id != attestation.id { - // If this CFD is not interested in this attestation we ignore it - continue; + tracing::info!(%txid, "CET published"); } + CfdEvent::OracleAttestedPriorCetTimelock { commit_tx: tx, .. } + | CfdEvent::ManualCommit { tx } => { + let txid = wallet + .send(wallet::TryBroadcastTransaction { tx }) + .await? + .context("Failed to broadcast commit transaction")?; - let attestation = try_continue!(Attestation::new( - attestation.id, - attestation.price, - attestation.scalars.clone(), - dlc, - cfd.role(), - )); - - let new_state = try_continue!(cfd.handle_oracle_attestation(attestation)); - - if new_state.is_none() { - // if we don't transition to a new state after oracle attestation we ignore the cfd - // this is for cases where we cannot handle the attestation which should be in a - // final state - continue; + tracing::info!(%txid, "Commit transaction published"); } - try_continue!(append_cfd_state(cfd, conn, projection_address).await); - try_continue!(try_cet_publication(cfd, conn, wallet, projection_address) - .await - .context("Error when trying to publish CET")); + _ => {} } Ok(()) diff --git a/daemon/src/collab_settlement_maker.rs b/daemon/src/collab_settlement_maker.rs index 79f5b50..a377739 100644 --- a/daemon/src/collab_settlement_maker.rs +++ b/daemon/src/collab_settlement_maker.rs @@ -3,8 +3,7 @@ use crate::address_map::Stopping; use crate::maker_inc_connections; use crate::model::cfd::Cfd; use crate::model::cfd::CollaborativeSettlement; -use crate::model::cfd::MakerSettlementCompleted; -use crate::model::cfd::Role; +use crate::model::cfd::Completed; use crate::model::cfd::SettlementKind; use crate::model::cfd::SettlementProposal; use crate::model::Identity; @@ -19,7 +18,7 @@ use xtra_productivity::xtra_productivity; pub struct Actor { cfd: Cfd, projection: xtra::Address, - on_completed: Box>, + on_completed: Box>>, proposal: SettlementProposal, taker_id: Identity, connections: Box>, @@ -60,27 +59,19 @@ impl Actor { "Received signature for collaborative settlement" ); - let Initiated { sig_taker } = msg; - - let dlc = self.cfd.open_dlc().context("CFD was in wrong state")?; - let (tx, sig_maker) = dlc.close_transaction(&self.proposal)?; - let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?; - - let settlement = CollaborativeSettlement::new( - spend_tx.clone(), - dlc.script_pubkey_for(Role::Maker), - self.proposal.price, - )?; + let settlement = self + .cfd + .start_collaborative_settlement_maker(self.proposal.clone(), msg.sig_taker)?; self.update_proposal(None).await; - anyhow::Ok(MakerSettlementCompleted::Succeeded { + anyhow::Ok(Completed::Succeeded { order_id: self.cfd.id(), - payload: (settlement, dlc.script_pubkey_for(Role::Maker)), + payload: settlement, }) } .await - .unwrap_or_else(|e| MakerSettlementCompleted::Failed { + .unwrap_or_else(|e| Completed::Failed { order_id: self.cfd.id(), error: e, }); @@ -113,6 +104,10 @@ impl xtra::Actor for Actor { xtra::KeepRunning::StopAll } + + async fn stopped(mut self) { + let _ = self.update_proposal(None).await; + } } impl Actor { @@ -120,7 +115,7 @@ impl Actor { cfd: Cfd, proposal: SettlementProposal, projection: xtra::Address, - on_completed: &(impl MessageChannel + 'static), + on_completed: &(impl MessageChannel> + 'static), taker_id: Identity, connections: &(impl MessageChannel + 'static), (on_stopping0, on_stopping1): ( @@ -157,7 +152,7 @@ impl Actor { async fn complete( &mut self, - completed: MakerSettlementCompleted, + completed: Completed, ctx: &mut xtra::Context, ) { let _ = self @@ -200,7 +195,7 @@ impl Actor { .await .context("Failed inform taker about settlement decision") { - self.complete(MakerSettlementCompleted::Failed { order_id, error: e }, ctx) + self.complete(Completed::Failed { order_id, error: e }, ctx) .await; } } diff --git a/daemon/src/collab_settlement_taker.rs b/daemon/src/collab_settlement_taker.rs index ab6dc96..fc24a1b 100644 --- a/daemon/src/collab_settlement_taker.rs +++ b/daemon/src/collab_settlement_taker.rs @@ -3,14 +3,13 @@ use crate::address_map::Stopping; use crate::connection; use crate::model::cfd::Cfd; use crate::model::cfd::CollaborativeSettlement; +use crate::model::cfd::Completed; use crate::model::cfd::SettlementKind; use crate::model::cfd::SettlementProposal; -use crate::model::cfd::TakerSettlementCompleted; use crate::model::Price; use crate::projection; use crate::send_async_safe::SendAsyncSafe; use crate::wire; -use anyhow::Context; use anyhow::Result; use async_trait::async_trait; use xtra::prelude::MessageChannel; @@ -19,7 +18,7 @@ use xtra_productivity::xtra_productivity; pub struct Actor { cfd: Cfd, projection: xtra::Address, - on_completed: Box>, + on_completed: Box>>, connection: xtra::Address, proposal: SettlementProposal, } @@ -28,12 +27,12 @@ impl Actor { pub fn new( cfd: Cfd, projection: xtra::Address, - on_completed: impl MessageChannel + 'static, + on_completed: impl MessageChannel> + 'static, current_price: Price, connection: xtra::Address, n_payouts: usize, ) -> Result { - let proposal = cfd.calculate_settlement(current_price, n_payouts)?; + let proposal = cfd.start_collaborative_settlement_taker(current_price, n_payouts)?; Ok(Self { cfd, @@ -45,14 +44,6 @@ impl Actor { } async fn propose(&mut self, this: xtra::Address) -> Result<()> { - if !self.cfd.is_collaborative_settle_possible() { - anyhow::bail!( - "Settlement proposal not possible because for cfd {} is in state {} which cannot be collaboratively settled", - self.cfd.id(), - self.cfd.state() - ) - } - self.connection .send(connection::ProposeSettlement { timestamp: self.proposal.timestamp, @@ -77,9 +68,11 @@ impl Actor { self.update_proposal(None).await?; - let dlc = self.cfd.dlc().context("No DLC in CFD")?; - - let (tx, sig) = dlc.close_transaction(&self.proposal)?; + // TODO: This should happen within a dedicated state machine returned from + // start_collaborative_settlement + let (tx, sig, payout_script_pubkey) = self + .cfd + .sign_collaborative_close_transaction_taker(&self.proposal)?; self.connection .send_async_safe(wire::TakerToMaker::Settlement { @@ -90,7 +83,7 @@ impl Actor { Ok(CollaborativeSettlement::new( tx, - dlc.script_pubkey_for(self.cfd.role()), // TODO: Hardcode role to Taker? + payout_script_pubkey, self.proposal.price, )?) } @@ -121,7 +114,7 @@ impl Actor { async fn complete( &mut self, - completed: TakerSettlementCompleted, + completed: Completed, ctx: &mut xtra::Context, ) { let _ = self.on_completed.send(completed).await; @@ -137,7 +130,7 @@ impl xtra::Actor for Actor { if let Err(e) = self.propose(this).await { self.complete( - TakerSettlementCompleted::Failed { + Completed::Failed { order_id: self.cfd.id(), error: e, }, @@ -154,6 +147,10 @@ impl xtra::Actor for Actor { xtra::KeepRunning::StopAll } + + async fn stopped(mut self) { + let _ = self.update_proposal(None).await; + } } #[xtra_productivity] @@ -167,18 +164,18 @@ impl Actor { let completed = match msg { wire::maker_to_taker::Settlement::Confirm => match self.handle_confirmed().await { - Ok(settlement) => TakerSettlementCompleted::Succeeded { + Ok(settlement) => Completed::Succeeded { order_id, payload: settlement, }, - Err(e) => TakerSettlementCompleted::Failed { error: e, order_id }, + Err(e) => Completed::Failed { error: e, order_id }, }, wire::maker_to_taker::Settlement::Reject => { if let Err(e) = self.handle_rejected().await { // XXX: Should this be rejected_due_to(order_id, e) instead? - TakerSettlementCompleted::Failed { error: e, order_id } + Completed::Failed { error: e, order_id } } else { - TakerSettlementCompleted::rejected(order_id) + Completed::rejected(order_id) } } }; diff --git a/daemon/src/db.rs b/daemon/src/db.rs index 26219bd..b964162 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -1,12 +1,17 @@ -use crate::model::cfd::Cfd; -use crate::model::cfd::CfdState; +use crate::model; +use crate::model::cfd::CfdEvent; +use crate::model::cfd::Event; use crate::model::cfd::OrderId; -use anyhow::Context; +use crate::model::cfd::Role; +use crate::model::Identity; +use crate::model::Leverage; +use crate::model::Position; +use crate::model::Price; +use crate::model::Usd; use anyhow::Result; use sqlx::pool::PoolConnection; use sqlx::Sqlite; use sqlx::SqlitePool; -use std::mem; use time::Duration; pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> { @@ -14,280 +19,183 @@ pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> { Ok(()) } -pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection) -> anyhow::Result<()> { - let state = serde_json::to_string(&cfd.state())?; +pub async fn insert_cfd(cfd: &model::cfd::Cfd, conn: &mut PoolConnection) -> Result<()> { let query_result = sqlx::query( r#" insert into cfds ( uuid, - trading_pair, position, initial_price, leverage, - liquidation_price, - creation_timestamp_seconds, - settlement_time_interval_seconds, - role, - fee_rate, + settlement_time_interval_hours, quantity_usd, - counterparty - ) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12); - - insert into cfd_states ( - cfd_id, - state - ) - select - id as cfd_id, - $13 as state - from cfds - order by id desc limit 1; - "#, + counterparty_network_identity, + role + ) values ($1, $2, $3, $4, $5, $6, $7, $8)"#, ) .bind(&cfd.id()) - .bind(&cfd.trading_pair()) .bind(&cfd.position()) - .bind(&cfd.price()) + .bind(&cfd.initial_price()) .bind(&cfd.leverage()) - .bind(&cfd.liquidation_price()) - .bind(&cfd.creation_timestamp()) - .bind(&cfd.settlement_interval().whole_seconds()) + .bind(&cfd.settlement_time_interval_hours().whole_hours()) + .bind(&cfd.quantity()) + .bind(&cfd.counterparty_network_identity()) .bind(&cfd.role()) - .bind(&cfd.fee_rate()) - .bind(&cfd.quantity_usd()) - .bind(&cfd.counterparty()) - .bind(state) .execute(conn) - .await - .with_context(|| format!("Failed to insert CFD with id {}", cfd.id()))?; + .await?; - // Should be 2 because we insert into cfds and cfd_states - if query_result.rows_affected() != 2 { + if query_result.rows_affected() != 1 { anyhow::bail!("failed to insert cfd"); } Ok(()) } -pub async fn append_cfd_state(cfd: &Cfd, conn: &mut PoolConnection) -> anyhow::Result<()> { - let cfd_id = load_cfd_id_by_order_uuid(cfd.id(), conn).await?; - let current_state = load_latest_cfd_state(cfd_id, conn) - .await - .context("loading latest state failed")?; - let new_state = &cfd.state(); - - if mem::discriminant(¤t_state) == mem::discriminant(new_state) { - // Since we have states where we add information this happens quite frequently - tracing::trace!( - "Same state transition for cfd with order_id {}: {}", - cfd.id(), - current_state - ); - } +/// Appends an event to the `events` table. +/// +/// To make handling of `None` events more ergonomic, you can pass anything in here that implements +/// `Into