Browse Source
Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com> Co-authored-by: Daniel Karzel <daniel@comit.network>feature/rollover-event-sourcing
Thomas Eizinger
3 years ago
40 changed files with 2269 additions and 3412 deletions
@ -0,0 +1,29 @@ |
|||||
|
drop table cfd_states; |
||||
|
drop table cfds; |
||||
|
|
||||
|
create table if not exists cfds |
||||
|
( |
||||
|
id integer primary key autoincrement, |
||||
|
uuid text unique not null, |
||||
|
position text not null, |
||||
|
initial_price text not null, |
||||
|
leverage integer not null, |
||||
|
settlement_time_interval_hours integer not null, |
||||
|
quantity_usd text not null, |
||||
|
counterparty_network_identity text not null, |
||||
|
role text not null |
||||
|
); |
||||
|
|
||||
|
create unique index if not exists cfds_uuid |
||||
|
on cfds (uuid); |
||||
|
|
||||
|
create table if not exists events |
||||
|
( |
||||
|
id integer primary key autoincrement, |
||||
|
cfd_id integer not null, |
||||
|
name text not null, |
||||
|
data text not null, |
||||
|
created_at text not null, |
||||
|
|
||||
|
foreign key (cfd_id) references cfds (id) |
||||
|
) |
@ -1,91 +0,0 @@ |
|||||
use crate::db::append_cfd_state; |
|
||||
use crate::db::load_all_cfds; |
|
||||
use crate::model::cfd::Cfd; |
|
||||
use crate::model::cfd::CfdState; |
|
||||
use crate::try_continue; |
|
||||
use crate::wallet; |
|
||||
use anyhow::Result; |
|
||||
use sqlx::pool::PoolConnection; |
|
||||
use sqlx::Sqlite; |
|
||||
use sqlx::SqlitePool; |
|
||||
use xtra::Address; |
|
||||
|
|
||||
/// Perform necessary housekeeping before actor system startup
|
|
||||
pub async fn new(db: &SqlitePool, wallet: &Address<wallet::Actor>) -> Result<()> { |
|
||||
let mut conn = db.acquire().await?; |
|
||||
|
|
||||
transition_non_continue_cfds_to_setup_failed(&mut conn).await?; |
|
||||
rebroadcast_transactions(&mut conn, wallet).await?; |
|
||||
Ok(()) |
|
||||
} |
|
||||
|
|
||||
async fn transition_non_continue_cfds_to_setup_failed( |
|
||||
conn: &mut PoolConnection<Sqlite>, |
|
||||
) -> Result<()> { |
|
||||
let mut cfds = load_all_cfds(conn).await?; |
|
||||
|
|
||||
for cfd in cfds.iter_mut().filter(|cfd| Cfd::is_cleanup(cfd)) { |
|
||||
*cfd.state_mut() = CfdState::setup_failed(format!( |
|
||||
"Was in state {} which cannot be continued.", |
|
||||
cfd.state() |
|
||||
)); |
|
||||
|
|
||||
append_cfd_state(cfd, conn).await?; |
|
||||
} |
|
||||
|
|
||||
Ok(()) |
|
||||
} |
|
||||
|
|
||||
async fn rebroadcast_transactions( |
|
||||
conn: &mut PoolConnection<Sqlite>, |
|
||||
wallet: &Address<wallet::Actor>, |
|
||||
) -> Result<()> { |
|
||||
let cfds = load_all_cfds(conn).await?; |
|
||||
|
|
||||
for dlc in cfds.iter().filter_map(Cfd::pending_open_dlc) { |
|
||||
let txid = try_continue!(wallet |
|
||||
.send(wallet::TryBroadcastTransaction { |
|
||||
tx: dlc.lock.0.clone() |
|
||||
}) |
|
||||
.await |
|
||||
.expect("if sending to actor fails here we are screwed anyway")); |
|
||||
tracing::info!("Lock transaction published with txid {}", txid); |
|
||||
} |
|
||||
|
|
||||
for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) { |
|
||||
let signed_refund_tx = cfd.refund_tx()?; |
|
||||
let txid = try_continue!(wallet |
|
||||
.send(wallet::TryBroadcastTransaction { |
|
||||
tx: signed_refund_tx |
|
||||
}) |
|
||||
.await |
|
||||
.expect("if sending to actor fails here we are screwed anyway")); |
|
||||
|
|
||||
tracing::info!("Refund transaction published on chain: {}", txid); |
|
||||
} |
|
||||
|
|
||||
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) { |
|
||||
let signed_commit_tx = cfd.commit_tx()?; |
|
||||
let txid = try_continue!(wallet |
|
||||
.send(wallet::TryBroadcastTransaction { |
|
||||
tx: signed_commit_tx |
|
||||
}) |
|
||||
.await |
|
||||
.expect("if sending to actor fails here we are screwed anyway")); |
|
||||
|
|
||||
tracing::info!("Commit transaction published on chain: {}", txid); |
|
||||
} |
|
||||
|
|
||||
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_cet(cfd)) { |
|
||||
// Double question mark OK because if we are in PendingCet we must have been Ready before
|
|
||||
let signed_cet = cfd.cet()??; |
|
||||
let txid = try_continue!(wallet |
|
||||
.send(wallet::TryBroadcastTransaction { tx: signed_cet }) |
|
||||
.await |
|
||||
.expect("if sending to actor fails here we are screwed anyway")); |
|
||||
|
|
||||
tracing::info!("CET published on chain: {}", txid); |
|
||||
} |
|
||||
|
|
||||
Ok(()) |
|
||||
} |
|
File diff suppressed because it is too large
File diff suppressed because it is too large
@ -1,119 +0,0 @@ |
|||||
use bdk::bitcoin::Network; |
|
||||
use bdk::bitcoin::Txid; |
|
||||
use serde::Serialize; |
|
||||
|
|
||||
use crate::model::cfd; |
|
||||
|
|
||||
#[derive(Debug, Clone, Serialize)] |
|
||||
pub struct TxUrl { |
|
||||
pub label: TxLabel, |
|
||||
pub url: String, |
|
||||
} |
|
||||
|
|
||||
/// Construct a mempool.space URL for a given txid
|
|
||||
pub fn to_mempool_url(txid: Txid, network: Network) -> String { |
|
||||
match network { |
|
||||
Network::Bitcoin => format!("https://mempool.space/tx/{}", txid), |
|
||||
Network::Testnet => format!("https://mempool.space/testnet/tx/{}", txid), |
|
||||
Network::Signet => format!("https://mempool.space/signet/tx/{}", txid), |
|
||||
Network::Regtest => txid.to_string(), |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
impl TxUrl { |
|
||||
pub fn new(txid: Txid, network: Network, label: TxLabel) -> Self { |
|
||||
Self { |
|
||||
label, |
|
||||
url: to_mempool_url(txid, network), |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
pub fn to_tx_url_list(state: cfd::CfdState, network: Network) -> Vec<TxUrl> { |
|
||||
use cfd::CfdState::*; |
|
||||
|
|
||||
let tx_ub = TxUrlBuilder::new(network); |
|
||||
|
|
||||
match state { |
|
||||
PendingOpen { dlc, .. } => { |
|
||||
vec![tx_ub.lock(&dlc)] |
|
||||
} |
|
||||
PendingCommit { dlc, .. } => vec![tx_ub.lock(&dlc), tx_ub.commit(&dlc)], |
|
||||
OpenCommitted { dlc, .. } => vec![tx_ub.lock(&dlc), tx_ub.commit(&dlc)], |
|
||||
Open { |
|
||||
dlc, |
|
||||
collaborative_close, |
|
||||
.. |
|
||||
} => { |
|
||||
let mut tx_urls = vec![tx_ub.lock(&dlc)]; |
|
||||
if let Some(collaborative_close) = collaborative_close { |
|
||||
tx_urls.push(tx_ub.collaborative_close(collaborative_close.tx.txid())); |
|
||||
} |
|
||||
tx_urls |
|
||||
} |
|
||||
PendingCet { |
|
||||
dlc, attestation, .. |
|
||||
} => vec![ |
|
||||
tx_ub.lock(&dlc), |
|
||||
tx_ub.commit(&dlc), |
|
||||
tx_ub.cet(attestation.txid()), |
|
||||
], |
|
||||
Closed { |
|
||||
payout: cfd::Payout::Cet(attestation), |
|
||||
.. |
|
||||
} => vec![tx_ub.cet(attestation.txid())], |
|
||||
Closed { |
|
||||
payout: cfd::Payout::CollaborativeClose(collaborative_close), |
|
||||
.. |
|
||||
} => { |
|
||||
vec![tx_ub.collaborative_close(collaborative_close.tx.txid())] |
|
||||
} |
|
||||
PendingRefund { dlc, .. } => vec![tx_ub.lock(&dlc), tx_ub.commit(&dlc), tx_ub.refund(&dlc)], |
|
||||
Refunded { dlc, .. } => vec![tx_ub.refund(&dlc)], |
|
||||
OutgoingOrderRequest { .. } |
|
||||
| IncomingOrderRequest { .. } |
|
||||
| Accepted { .. } |
|
||||
| Rejected { .. } |
|
||||
| ContractSetup { .. } |
|
||||
| SetupFailed { .. } => vec![], |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
struct TxUrlBuilder { |
|
||||
network: Network, |
|
||||
} |
|
||||
|
|
||||
impl TxUrlBuilder { |
|
||||
pub fn new(network: Network) -> Self { |
|
||||
Self { network } |
|
||||
} |
|
||||
|
|
||||
pub fn lock(&self, dlc: &cfd::Dlc) -> TxUrl { |
|
||||
TxUrl::new(dlc.lock.0.txid(), self.network, TxLabel::Lock) |
|
||||
} |
|
||||
|
|
||||
pub fn commit(&self, dlc: &cfd::Dlc) -> TxUrl { |
|
||||
TxUrl::new(dlc.commit.0.txid(), self.network, TxLabel::Commit) |
|
||||
} |
|
||||
|
|
||||
pub fn cet(&self, txid: Txid) -> TxUrl { |
|
||||
TxUrl::new(txid, self.network, TxLabel::Cet) |
|
||||
} |
|
||||
|
|
||||
pub fn collaborative_close(&self, txid: Txid) -> TxUrl { |
|
||||
TxUrl::new(txid, self.network, TxLabel::Collaborative) |
|
||||
} |
|
||||
|
|
||||
pub fn refund(&self, dlc: &cfd::Dlc) -> TxUrl { |
|
||||
TxUrl::new(dlc.refund.0.txid(), self.network, TxLabel::Refund) |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
#[derive(Debug, Clone, Serialize)] |
|
||||
pub enum TxLabel { |
|
||||
Lock, |
|
||||
Commit, |
|
||||
Cet, |
|
||||
Refund, |
|
||||
Collaborative, |
|
||||
} |
|
Loading…
Reference in new issue