Browse Source

Merge #665

665: Refactor model to use event sourcing r=thomaseizinger a=da-kami

Make the change easy, then make the easy change.

## Work that makes / will make this change easier:

- [x] https://github.com/itchysats/itchysats/pull/796
- [x] https://github.com/itchysats/itchysats/pull/795
- [x] https://github.com/itchysats/itchysats/pull/791
- [x] https://github.com/itchysats/itchysats/pull/765
- [x] https://github.com/itchysats/itchysats/pull/743
- [x] https://github.com/itchysats/itchysats/pull/769
- [x] https://github.com/itchysats/itchysats/pull/746
- [x] https://github.com/itchysats/itchysats/pull/744
- [x] https://github.com/itchysats/itchysats/pull/740
- [x] https://github.com/itchysats/itchysats/pull/715
- [x] https://github.com/itchysats/itchysats/pull/707
- [x] https://github.com/itchysats/itchysats/pull/718
- [x] https://github.com/itchysats/itchysats/pull/708
- [x] https://github.com/itchysats/itchysats/pull/684
- [x] https://github.com/itchysats/itchysats/pull/704
- [x] https://github.com/itchysats/itchysats/pull/695
- [x] https://github.com/itchysats/itchysats/pull/690
- [x] https://github.com/itchysats/itchysats/pull/645
- [x] #743
- [x] https://github.com/itchysats/itchysats/pull/819
- [x] https://github.com/itchysats/itchysats/pull/775
- [x] https://github.com/itchysats/itchysats/pull/806
- [x] https://github.com/itchysats/itchysats/pull/863 (this reduces the number of merge conflicts during rebases)
- [x] https://github.com/itchysats/itchysats/pull/876 (this will make rebasing easier because `CfdAggregate` has getters)
- [ ] Move all failure cases for incoming new order into protocol actor (fetching oracle announcement, etc)
- [ ] Remove this check? 0a9240abe2/daemon/src/taker_cfd.rs (L211-L213)
- [x] https://github.com/itchysats/itchysats/pull/899
- [x] https://github.com/itchysats/itchysats/pull/900
- [x] https://github.com/itchysats/itchysats/pull/917

## Missing bits in this PR

- [x] Manually switch to `imort-granularity: item` before the next rebase
- [x] Delete old `Cfd` and rename `CfdAggregate` to `Cfd` (this will make rebasing a lot easier because there are no naming conflicts)
- [x] Create dedicated projection from events for projection actor
- [x] Create dedicated projection from events for monitoring actor (for startup)
- [ ] Grep for TODOs and fix them
- [ ] ~Consider introducing a central `DomainEventActor` right away instead of doing another iteration~
- [ ] ~Consider introducing a housekeeping task that moves closed CFDs into an archive table and prunes all events~

## Features to re-introduce

- [ ] fee-rate inside CFD
- [ ] unit tests covering rollover

Fixes https://github.com/itchysats/itchysats/issues/646.
Fixes https://github.com/itchysats/itchysats/issues/644.
Fixes https://github.com/itchysats/itchysats/issues/375 (will be obsolete).
Fixes https://github.com/itchysats/itchysats/issues/725 (will be obsolete due to change in DB model).
Fixes https://github.com/itchysats/itchysats/issues/780.

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
heartbeat-event-source
bors[bot] 3 years ago
committed by GitHub
parent
commit
770efb2053
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      daemon/migrations/20211220000000_introduce-event-sourcing.sql
  2. 2
      daemon/prepare_db.sh
  3. 148
      daemon/sqlx-data.json
  4. 64
      daemon/src/auto_rollover.rs
  5. 2
      daemon/src/bitmex_price_feed.rs
  6. 212
      daemon/src/cfd_actors.rs
  7. 35
      daemon/src/collab_settlement_maker.rs
  8. 43
      daemon/src/collab_settlement_taker.rs
  9. 463
      daemon/src/db.rs
  10. 91
      daemon/src/housekeeping.rs
  11. 6
      daemon/src/lib.rs
  12. 4
      daemon/src/maker.rs
  13. 383
      daemon/src/maker_cfd.rs
  14. 6
      daemon/src/maker_inc_connections.rs
  15. 11
      daemon/src/model.rs
  16. 2015
      daemon/src/model/cfd.rs
  17. 252
      daemon/src/monitor.rs
  18. 61
      daemon/src/oracle.rs
  19. 1015
      daemon/src/projection.rs
  20. 87
      daemon/src/rollover_maker.rs
  21. 37
      daemon/src/rollover_taker.rs
  22. 2
      daemon/src/routes_maker.rs
  23. 4
      daemon/src/routes_taker.rs
  24. 31
      daemon/src/send_async_safe.rs
  25. 11
      daemon/src/setup_contract.rs
  26. 49
      daemon/src/setup_maker.rs
  27. 40
      daemon/src/setup_taker.rs
  28. 4
      daemon/src/taker.rs
  29. 248
      daemon/src/taker_cfd.rs
  30. 2
      daemon/src/to_sse_event.rs
  31. 119
      daemon/src/tx.rs
  32. 6
      daemon/src/xtra_ext.rs
  33. 26
      daemon/tests/happy_path.rs
  34. 7
      daemon/tests/harness/flow.rs
  35. 2
      daemon/tests/harness/maia.rs
  36. 9
      daemon/tests/harness/mocks/mod.rs
  37. 14
      daemon/tests/harness/mod.rs
  38. 86
      docs/asset/mvp_maker_taker_db.puml
  39. 27
      maker-frontend/src/components/Types.tsx
  40. 28
      taker-frontend/src/types.ts

29
daemon/migrations/20211220000000_introduce-event-sourcing.sql

@ -0,0 +1,29 @@
drop table cfd_states;
drop table cfds;
create table if not exists cfds
(
id integer primary key autoincrement,
uuid text unique not null,
position text not null,
initial_price text not null,
leverage integer not null,
settlement_time_interval_hours integer not null,
quantity_usd text not null,
counterparty_network_identity text not null,
role text not null
);
create unique index if not exists cfds_uuid
on cfds (uuid);
create table if not exists events
(
id integer primary key autoincrement,
cfd_id integer not null,
name text not null,
data text not null,
created_at text not null,
foreign key (cfd_id) references cfds (id)
)

2
daemon/prepare_db.sh

@ -15,4 +15,4 @@ trap 'rm -f $TEMPDB' EXIT
DATABASE_URL=sqlite:$TEMPDB cargo sqlx migrate run
# prepare the sqlx-data.json rust mappings
DATABASE_URL=sqlite:./$DAEMON_DIR/$TEMPDB cargo sqlx prepare -- --bin taker
DATABASE_URL=sqlite:./$DAEMON_DIR/$TEMPDB SQLX_OFFLINE=true cargo sqlx prepare -- --bin taker

148
daemon/sqlx-data.json

@ -1,34 +1,34 @@
{
"db": "SQLite",
"221a6283db798bacaba99e7e85130f9a8bbea1299d8cb99d272b1d478dc19775": {
"query": "\n select\n state\n from cfd_states\n where cfd_id = $1\n order by id desc\n limit 1;\n ",
"7f977cdcbd7287d249b0a467e48f6788d196b267e3df3970d614848b8c899a61": {
"query": "\n select\n uuid as \"uuid: crate::model::cfd::OrderId\"\n from\n cfds\n ",
"describe": {
"columns": [
{
"name": "state",
"name": "uuid: crate::model::cfd::OrderId",
"ordinal": 0,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
"Right": 0
},
"nullable": [
false
]
}
},
"8708389be41d08359966b16ea018a0fd39acbf61981c1933b46d3b50bb430311": {
"query": "\n with state as (\n select\n cfd_id,\n state\n from cfd_states\n inner join cfds on cfds.id = cfd_states.cfd_id\n where cfd_states.id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n cfds.uuid as \"uuid: crate::model::cfd::OrderId\",\n cfds.trading_pair as \"trading_pair: crate::model::TradingPair\",\n cfds.position as \"position: crate::model::Position\",\n cfds.initial_price as \"initial_price: crate::model::Price\",\n cfds.leverage as \"leverage: crate::model::Leverage\",\n cfds.liquidation_price as \"liquidation_price: crate::model::Price\",\n cfds.creation_timestamp_seconds as \"creation_timestamp_seconds: crate::model::Timestamp\",\n cfds.settlement_time_interval_seconds as \"settlement_time_interval_secs: i64\",\n cfds.role as \"role: crate::model::cfd::Role\",\n cfds.fee_rate as \"fee_rate: u32\",\n cfds.quantity_usd as \"quantity_usd: crate::model::Usd\",\n cfds.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from cfds\n inner join state on state.cfd_id = cfds.id\n ",
"e8a672355cd8c799b6291ccb629837dcd3a3fa9d3954bb78d22ba98e99674341": {
"query": "\n select\n id as cfd_id,\n uuid as \"uuid: crate::model::cfd::OrderId\",\n position as \"position: crate::model::Position\",\n initial_price as \"initial_price: crate::model::Price\",\n leverage as \"leverage: crate::model::Leverage\",\n settlement_time_interval_hours,\n quantity_usd as \"quantity_usd: crate::model::Usd\",\n counterparty_network_identity as \"counterparty_network_identity: crate::model::Identity\",\n role as \"role: crate::model::cfd::Role\"\n from\n cfds\n where\n cfds.uuid = $1\n ",
"describe": {
"columns": [
{
"name": "uuid: crate::model::cfd::OrderId",
"name": "cfd_id",
"ordinal": 0,
"type_info": "Text"
"type_info": "Int64"
},
{
"name": "trading_pair: crate::model::TradingPair",
"name": "uuid: crate::model::cfd::OrderId",
"ordinal": 1,
"type_info": "Text"
},
@ -48,55 +48,31 @@
"type_info": "Int64"
},
{
"name": "liquidation_price: crate::model::Price",
"name": "settlement_time_interval_hours",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "creation_timestamp_seconds: crate::model::Timestamp",
"ordinal": 6,
"type_info": "Int64"
},
{
"name": "settlement_time_interval_secs: i64",
"ordinal": 7,
"type_info": "Int64"
},
{
"name": "role: crate::model::cfd::Role",
"ordinal": 8,
"type_info": "Text"
},
{
"name": "fee_rate: u32",
"ordinal": 9,
"type_info": "Int64"
},
{
"name": "quantity_usd: crate::model::Usd",
"ordinal": 10,
"ordinal": 6,
"type_info": "Text"
},
{
"name": "counterparty: crate::model::Identity",
"ordinal": 11,
"name": "counterparty_network_identity: crate::model::Identity",
"ordinal": 7,
"type_info": "Text"
},
{
"name": "state",
"ordinal": 12,
"name": "role: crate::model::cfd::Role",
"ordinal": 8,
"type_info": "Text"
}
],
"parameters": {
"Right": 0
"Right": 1
},
"nullable": [
false,
false,
false,
false,
false,
true,
false,
false,
false,
@ -108,112 +84,34 @@
]
}
},
"9f31d4002a7328b199a24d50149f2724706e2d391a94b76d7894983f5eb71c4b": {
"query": "\n select\n id\n from cfds\n where cfds.uuid = $1;\n ",
"fdf6b7cee19e20e6c3ba00a821b5d92949a707a23c9fc8ebbc4502ffd7b1a5f1": {
"query": "\n\n select\n name,\n data,\n created_at as \"created_at: crate::model::Timestamp\"\n from\n events\n where\n cfd_id = $1\n ",
"describe": {
"columns": [
{
"name": "id",
"ordinal": 0,
"type_info": "Int64"
}
],
"parameters": {
"Right": 1
},
"nullable": [
true
]
}
},
"a22bf53971e4e255ca63d16cc7eb37ded66b0c4c375828f13ae5c28aa975441e": {
"query": "\n with state as (\n select\n cfd_id,\n state\n from cfd_states\n inner join cfds on cfds.id = cfd_states.cfd_id\n where cfd_states.id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n cfds.uuid as \"uuid: crate::model::cfd::OrderId\",\n cfds.trading_pair as \"trading_pair: crate::model::TradingPair\",\n cfds.position as \"position: crate::model::Position\",\n cfds.initial_price as \"initial_price: crate::model::Price\",\n cfds.leverage as \"leverage: crate::model::Leverage\",\n cfds.liquidation_price as \"liquidation_price: crate::model::Price\",\n cfds.creation_timestamp_seconds as \"creation_timestamp_seconds: crate::model::Timestamp\",\n cfds.settlement_time_interval_seconds as \"settlement_time_interval_secs: i64\",\n cfds.role as \"role: crate::model::cfd::Role\",\n cfds.fee_rate as \"fee_rate: u32\",\n cfds.quantity_usd as \"quantity_usd: crate::model::Usd\",\n cfds.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from cfds\n inner join state on state.cfd_id = cfds.id\n\n where cfds.uuid = $1\n ",
"describe": {
"columns": [
{
"name": "uuid: crate::model::cfd::OrderId",
"name": "name",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "trading_pair: crate::model::TradingPair",
"name": "data",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "position: crate::model::Position",
"name": "created_at: crate::model::Timestamp",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "initial_price: crate::model::Price",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "leverage: crate::model::Leverage",
"ordinal": 4,
"type_info": "Int64"
},
{
"name": "liquidation_price: crate::model::Price",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "creation_timestamp_seconds: crate::model::Timestamp",
"ordinal": 6,
"type_info": "Int64"
},
{
"name": "settlement_time_interval_secs: i64",
"ordinal": 7,
"type_info": "Int64"
},
{
"name": "role: crate::model::cfd::Role",
"ordinal": 8,
"type_info": "Text"
},
{
"name": "fee_rate: u32",
"ordinal": 9,
"type_info": "Int64"
},
{
"name": "quantity_usd: crate::model::Usd",
"ordinal": 10,
"type_info": "Text"
},
{
"name": "counterparty: crate::model::Identity",
"ordinal": 11,
"type_info": "Text"
},
{
"name": "state",
"ordinal": 12,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false
]
}
}
}
}

64
daemon/src/auto_rollover.rs

@ -1,15 +1,11 @@
use crate::address_map::AddressMap;
use crate::address_map::Stopping;
use crate::cfd_actors::append_cfd_state;
use crate::cfd_actors::load_cfd;
use crate::connection;
use crate::db;
use crate::db::load_cfd;
use crate::model::cfd::CfdState;
use crate::model::cfd::CfdStateCommon;
use crate::model::cfd::OrderId;
use crate::model::cfd::RolloverCompleted;
use crate::monitor;
use crate::monitor::MonitorParams;
use crate::oracle;
use crate::projection;
use crate::rollover_taker;
@ -27,7 +23,7 @@ pub struct Actor<O, M> {
oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>,
conn_actor: Address<connection::Actor>,
monitor_actor: Address<M>,
_monitor_actor: Address<M>,
oracle_actor: Address<O>,
n_payouts: usize,
@ -51,7 +47,7 @@ impl<O, M> Actor<O, M> {
oracle_pk,
projection_actor,
conn_actor,
monitor_actor,
_monitor_actor: monitor_actor,
oracle_actor,
n_payouts,
rollover_actors: AddressMap::default(),
@ -70,21 +66,24 @@ where
tracing::trace!("Checking all CFDs for rollover eligibility");
let mut conn = self.db.acquire().await?;
let cfds = db::load_all_cfds(&mut conn).await?;
let cfd_ids = db::load_all_cfd_ids(&mut conn).await?;
let this = ctx
.address()
.expect("actor to be able to give address to itself");
for cfd in cfds {
let disconnected = match self.rollover_actors.get_disconnected(cfd.id()) {
for id in cfd_ids {
let disconnected = match self.rollover_actors.get_disconnected(id) {
Ok(disconnected) => disconnected,
Err(_) => {
tracing::debug!(order_id=%cfd.id(), "Rollover already in progress");
tracing::debug!(order_id=%id, "Rollover already in progress");
continue;
}
};
// TODO: Shall this have a try_continue?
let cfd = load_cfd(id, &mut conn).await?;
let (addr, fut) = rollover_taker::Actor::new(
(cfd, self.n_payouts),
self.oracle_pk,
@ -105,7 +104,7 @@ where
}
}
#[xtra_productivity]
#[xtra_productivity(message_impl = false)]
impl<O, M> Actor<O, M>
where
O: 'static,
@ -113,45 +112,8 @@ where
M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>,
{
async fn handle_rollover_completed(&mut self, msg: RolloverCompleted) -> Result<()> {
let (order_id, dlc) = match msg {
RolloverCompleted::Succeeded {
order_id,
payload: (dlc, _),
} => (order_id, dlc),
RolloverCompleted::Rejected { order_id, reason } => {
tracing::debug!(%order_id, "Not rolled over: {:#}", reason);
return Ok(());
}
RolloverCompleted::Failed { order_id, error } => {
tracing::warn!(%order_id, "Rollover failed: {:#}", error);
return Ok(());
}
};
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::Open {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
collaborative_close: None,
};
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.monitor_actor
.send(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()),
})
.await?;
self.oracle_actor
.send(oracle::MonitorAttestation {
event_id: dlc.settlement_event_id,
})
.await?;
async fn handle_rollover_completed(&mut self, _: RolloverCompleted) -> Result<()> {
// TODO: Implement this in terms of event sourcing
Ok(())
}

2
daemon/src/bitmex_price_feed.rs

@ -125,7 +125,7 @@ pub enum StopReason {
StreamEnded,
}
#[derive(Clone, Debug)]
#[derive(Clone, Copy, Debug)]
pub struct Quote {
pub timestamp: Timestamp,
pub bid: Price,

212
daemon/src/cfd_actors.rs

@ -1,18 +1,19 @@
use crate::db;
use crate::model::cfd::Attestation;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState;
use crate::model::cfd::CfdEvent;
use crate::model::cfd::Event;
use crate::model::cfd::OrderId;
use crate::monitor;
use crate::oracle;
use crate::projection;
use crate::projection::CfdsChanged;
use crate::try_continue;
use crate::wallet;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use sqlx::SqlitePool;
pub async fn insert_cfd_and_update_feed(
cfd: &Cfd,
@ -20,54 +21,80 @@ pub async fn insert_cfd_and_update_feed(
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()> {
db::insert_cfd(cfd, conn).await?;
projection_address.send(projection::CfdsChanged).await??;
projection_address.send(projection::CfdsChanged).await?;
Ok(())
}
pub async fn append_cfd_state(
cfd: &Cfd,
conn: &mut PoolConnection<Sqlite>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()> {
db::append_cfd_state(cfd, conn).await?;
projection_address.send(projection::CfdsChanged).await??;
Ok(())
}
pub async fn try_cet_publication<W>(
cfd: &mut Cfd,
conn: &mut PoolConnection<Sqlite>,
pub async fn handle_monitoring_event<W>(
event: monitor::Event,
db: &SqlitePool,
wallet: &xtra::Address<W>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
match cfd.cet()? {
Ok(cet) => {
let txid = wallet
.send(wallet::TryBroadcastTransaction { tx: cet })
.await?
.context("Failed to send transaction")?;
tracing::info!("CET published with txid {}", txid);
let mut conn = db.acquire().await?;
if cfd.handle_cet_sent()?.is_none() {
bail!("If we can get the CET we should be able to transition")
}
let order_id = event.order_id();
append_cfd_state(cfd, conn, projection_address).await?;
}
Err(not_ready_yet) => {
tracing::debug!("{:#}", not_ready_yet);
return Ok(());
let cfd = load_cfd(order_id, &mut conn).await?;
let event = match event {
monitor::Event::LockFinality(_) => cfd.handle_lock_confirmed(),
monitor::Event::CommitFinality(_) => cfd.handle_commit_confirmed(),
monitor::Event::CloseFinality(_) => cfd.handle_collaborative_settlement_confirmed(),
monitor::Event::CetTimelockExpired(_) => {
if let Ok(event) = cfd.handle_cet_timelock_expired() {
event
} else {
return Ok(()); // Early return from a no-op
}
}
monitor::Event::CetFinality(_) => cfd.handle_cet_confirmed(),
monitor::Event::RefundTimelockExpired(_) => cfd.handle_refund_timelock_expired(),
monitor::Event::RefundFinality(_) => cfd.handle_refund_confirmed(),
monitor::Event::RevokedTransactionFound(_) => cfd.handle_revoke_confirmed(),
};
db::append_event(event.clone(), &mut conn).await?;
post_process_event(event, wallet).await?;
projection_address.send(CfdsChanged).await?;
Ok(())
}
pub async fn handle_monitoring_event<W>(
event: monitor::Event,
/// Load a CFD from the database and rehydrate as the [`model::cfd::Cfd`] aggregate.
pub async fn load_cfd(order_id: OrderId, conn: &mut PoolConnection<Sqlite>) -> Result<Cfd> {
let (
db::Cfd {
id,
position,
initial_price,
leverage,
settlement_interval,
counterparty_network_identity,
role,
quantity_usd,
},
events,
) = db::load_cfd(order_id, conn).await?;
let cfd = Cfd::rehydrate(
id,
position,
initial_price,
leverage,
settlement_interval,
quantity_usd,
counterparty_network_identity,
role,
events,
);
Ok(cfd)
}
pub async fn handle_commit<W>(
order_id: OrderId,
conn: &mut PoolConnection<Sqlite>,
wallet: &xtra::Address<W>,
projection_address: &xtra::Address<projection::Actor>,
@ -75,110 +102,79 @@ pub async fn handle_monitoring_event<W>(
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
let order_id = event.order_id();
let cfd = load_cfd(order_id, conn).await?;
let mut cfd = db::load_cfd(order_id, conn).await?;
let event = cfd.manual_commit_to_blockchain()?;
db::append_event(event.clone(), conn).await?;
if cfd.handle_monitoring_event(event)?.is_none() {
// early exit if there was not state change
// this is for cases where we are already in a final state
return Ok(());
}
append_cfd_state(&cfd, conn, projection_address).await?;
post_process_event(event, wallet).await?;
if let CfdState::OpenCommitted { .. } = cfd.state() {
try_cet_publication(&mut cfd, conn, wallet, projection_address).await?;
} else if let CfdState::PendingRefund { .. } = cfd.state() {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet
.send(wallet::TryBroadcastTransaction {
tx: signed_refund_tx,
})
.await?
.context("Failed to publish CET")?;
projection_address.send(CfdsChanged).await?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
Ok(())
}
pub async fn handle_commit<W>(
order_id: OrderId,
conn: &mut PoolConnection<Sqlite>,
pub async fn handle_oracle_attestation<W>(
attestation: oracle::Attestation,
db: &SqlitePool,
wallet: &xtra::Address<W>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
let mut cfd = db::load_cfd(order_id, conn).await?;
let mut conn = db.acquire().await?;
let signed_commit_tx = cfd.commit_tx()?;
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
attestation.id
);
for id in db::load_all_cfd_ids(&mut conn).await? {
let cfd = try_continue!(load_cfd(id, &mut conn).await);
let event = try_continue!(cfd
.decrypt_cet(&attestation)
.context("Failed to decrypt CET using attestation"));
let txid = wallet
.send(wallet::TryBroadcastTransaction {
tx: signed_commit_tx,
})
.await?
.context("Failed to publish commit tx")?;
try_continue!(db::append_event(event.clone(), &mut conn)
.await
.context("Failed to append events"));
if cfd.handle_commit_tx_sent()?.is_none() {
bail!("If we can get the commit tx we should be able to transition")
if let Some(event) = event {
try_continue!(post_process_event(event, wallet).await)
}
}
append_cfd_state(&cfd, conn, projection_address).await?;
tracing::info!("Commit transaction published on chain: {}", txid);
projection_address.send(CfdsChanged).await?;
Ok(())
}
pub async fn handle_oracle_attestation<W>(
attestation: oracle::Attestation,
conn: &mut PoolConnection<Sqlite>,
wallet: &xtra::Address<W>,
projection_address: &xtra::Address<projection::Actor>,
) -> Result<()>
async fn post_process_event<W>(event: Event, wallet: &xtra::Address<W>) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
attestation.id
);
let mut cfds = db::load_all_cfds(conn).await?;
match event.event {
CfdEvent::OracleAttestedPostCetTimelock { cet, .. }
| CfdEvent::CetTimelockConfirmedPostOracleAttestation { cet } => {
let txid = wallet
.send(wallet::TryBroadcastTransaction { tx: cet })
.await?
.context("Failed to broadcast CET")?;
for (cfd, dlc) in cfds
.iter_mut()
.filter_map(|cfd| cfd.dlc().map(|dlc| (cfd, dlc)))
{
if dlc.settlement_event_id != attestation.id {
// If this CFD is not interested in this attestation we ignore it
continue;
tracing::info!(%txid, "CET published");
}
CfdEvent::OracleAttestedPriorCetTimelock { commit_tx: tx, .. }
| CfdEvent::ManualCommit { tx } => {
let txid = wallet
.send(wallet::TryBroadcastTransaction { tx })
.await?
.context("Failed to broadcast commit transaction")?;
let attestation = try_continue!(Attestation::new(
attestation.id,
attestation.price,
attestation.scalars.clone(),
dlc,
cfd.role(),
));
let new_state = try_continue!(cfd.handle_oracle_attestation(attestation));
if new_state.is_none() {
// if we don't transition to a new state after oracle attestation we ignore the cfd
// this is for cases where we cannot handle the attestation which should be in a
// final state
continue;
tracing::info!(%txid, "Commit transaction published");
}
try_continue!(append_cfd_state(cfd, conn, projection_address).await);
try_continue!(try_cet_publication(cfd, conn, wallet, projection_address)
.await
.context("Error when trying to publish CET"));
_ => {}
}
Ok(())

35
daemon/src/collab_settlement_maker.rs

@ -3,8 +3,7 @@ use crate::address_map::Stopping;
use crate::maker_inc_connections;
use crate::model::cfd::Cfd;
use crate::model::cfd::CollaborativeSettlement;
use crate::model::cfd::MakerSettlementCompleted;
use crate::model::cfd::Role;
use crate::model::cfd::Completed;
use crate::model::cfd::SettlementKind;
use crate::model::cfd::SettlementProposal;
use crate::model::Identity;
@ -19,7 +18,7 @@ use xtra_productivity::xtra_productivity;
pub struct Actor {
cfd: Cfd,
projection: xtra::Address<projection::Actor>,
on_completed: Box<dyn MessageChannel<MakerSettlementCompleted>>,
on_completed: Box<dyn MessageChannel<Completed<CollaborativeSettlement>>>,
proposal: SettlementProposal,
taker_id: Identity,
connections: Box<dyn MessageChannel<maker_inc_connections::settlement::Response>>,
@ -60,27 +59,19 @@ impl Actor {
"Received signature for collaborative settlement"
);
let Initiated { sig_taker } = msg;
let dlc = self.cfd.open_dlc().context("CFD was in wrong state")?;
let (tx, sig_maker) = dlc.close_transaction(&self.proposal)?;
let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?;
let settlement = CollaborativeSettlement::new(
spend_tx.clone(),
dlc.script_pubkey_for(Role::Maker),
self.proposal.price,
)?;
let settlement = self
.cfd
.start_collaborative_settlement_maker(self.proposal.clone(), msg.sig_taker)?;
self.update_proposal(None).await;
anyhow::Ok(MakerSettlementCompleted::Succeeded {
anyhow::Ok(Completed::Succeeded {
order_id: self.cfd.id(),
payload: (settlement, dlc.script_pubkey_for(Role::Maker)),
payload: settlement,
})
}
.await
.unwrap_or_else(|e| MakerSettlementCompleted::Failed {
.unwrap_or_else(|e| Completed::Failed {
order_id: self.cfd.id(),
error: e,
});
@ -113,6 +104,10 @@ impl xtra::Actor for Actor {
xtra::KeepRunning::StopAll
}
async fn stopped(mut self) {
let _ = self.update_proposal(None).await;
}
}
impl Actor {
@ -120,7 +115,7 @@ impl Actor {
cfd: Cfd,
proposal: SettlementProposal,
projection: xtra::Address<projection::Actor>,
on_completed: &(impl MessageChannel<MakerSettlementCompleted> + 'static),
on_completed: &(impl MessageChannel<Completed<CollaborativeSettlement>> + 'static),
taker_id: Identity,
connections: &(impl MessageChannel<maker_inc_connections::settlement::Response> + 'static),
(on_stopping0, on_stopping1): (
@ -157,7 +152,7 @@ impl Actor {
async fn complete(
&mut self,
completed: MakerSettlementCompleted,
completed: Completed<CollaborativeSettlement>,
ctx: &mut xtra::Context<Self>,
) {
let _ = self
@ -200,7 +195,7 @@ impl Actor {
.await
.context("Failed inform taker about settlement decision")
{
self.complete(MakerSettlementCompleted::Failed { order_id, error: e }, ctx)
self.complete(Completed::Failed { order_id, error: e }, ctx)
.await;
}
}

43
daemon/src/collab_settlement_taker.rs

@ -3,14 +3,13 @@ use crate::address_map::Stopping;
use crate::connection;
use crate::model::cfd::Cfd;
use crate::model::cfd::CollaborativeSettlement;
use crate::model::cfd::Completed;
use crate::model::cfd::SettlementKind;
use crate::model::cfd::SettlementProposal;
use crate::model::cfd::TakerSettlementCompleted;
use crate::model::Price;
use crate::projection;
use crate::send_async_safe::SendAsyncSafe;
use crate::wire;
use anyhow::Context;
use anyhow::Result;
use async_trait::async_trait;
use xtra::prelude::MessageChannel;
@ -19,7 +18,7 @@ use xtra_productivity::xtra_productivity;
pub struct Actor {
cfd: Cfd,
projection: xtra::Address<projection::Actor>,
on_completed: Box<dyn MessageChannel<TakerSettlementCompleted>>,
on_completed: Box<dyn MessageChannel<Completed<CollaborativeSettlement>>>,
connection: xtra::Address<connection::Actor>,
proposal: SettlementProposal,
}
@ -28,12 +27,12 @@ impl Actor {
pub fn new(
cfd: Cfd,
projection: xtra::Address<projection::Actor>,
on_completed: impl MessageChannel<TakerSettlementCompleted> + 'static,
on_completed: impl MessageChannel<Completed<CollaborativeSettlement>> + 'static,
current_price: Price,
connection: xtra::Address<connection::Actor>,
n_payouts: usize,
) -> Result<Self> {
let proposal = cfd.calculate_settlement(current_price, n_payouts)?;
let proposal = cfd.start_collaborative_settlement_taker(current_price, n_payouts)?;
Ok(Self {
cfd,
@ -45,14 +44,6 @@ impl Actor {
}
async fn propose(&mut self, this: xtra::Address<Self>) -> Result<()> {
if !self.cfd.is_collaborative_settle_possible() {
anyhow::bail!(
"Settlement proposal not possible because for cfd {} is in state {} which cannot be collaboratively settled",
self.cfd.id(),
self.cfd.state()
)
}
self.connection
.send(connection::ProposeSettlement {
timestamp: self.proposal.timestamp,
@ -77,9 +68,11 @@ impl Actor {
self.update_proposal(None).await?;
let dlc = self.cfd.dlc().context("No DLC in CFD")?;
let (tx, sig) = dlc.close_transaction(&self.proposal)?;
// TODO: This should happen within a dedicated state machine returned from
// start_collaborative_settlement
let (tx, sig, payout_script_pubkey) = self
.cfd
.sign_collaborative_close_transaction_taker(&self.proposal)?;
self.connection
.send_async_safe(wire::TakerToMaker::Settlement {
@ -90,7 +83,7 @@ impl Actor {
Ok(CollaborativeSettlement::new(
tx,
dlc.script_pubkey_for(self.cfd.role()), // TODO: Hardcode role to Taker?
payout_script_pubkey,
self.proposal.price,
)?)
}
@ -121,7 +114,7 @@ impl Actor {
async fn complete(
&mut self,
completed: TakerSettlementCompleted,
completed: Completed<CollaborativeSettlement>,
ctx: &mut xtra::Context<Self>,
) {
let _ = self.on_completed.send(completed).await;
@ -137,7 +130,7 @@ impl xtra::Actor for Actor {
if let Err(e) = self.propose(this).await {
self.complete(
TakerSettlementCompleted::Failed {
Completed::Failed {
order_id: self.cfd.id(),
error: e,
},
@ -154,6 +147,10 @@ impl xtra::Actor for Actor {
xtra::KeepRunning::StopAll
}
async fn stopped(mut self) {
let _ = self.update_proposal(None).await;
}
}
#[xtra_productivity]
@ -167,18 +164,18 @@ impl Actor {
let completed = match msg {
wire::maker_to_taker::Settlement::Confirm => match self.handle_confirmed().await {
Ok(settlement) => TakerSettlementCompleted::Succeeded {
Ok(settlement) => Completed::Succeeded {
order_id,
payload: settlement,
},
Err(e) => TakerSettlementCompleted::Failed { error: e, order_id },
Err(e) => Completed::Failed { error: e, order_id },
},
wire::maker_to_taker::Settlement::Reject => {
if let Err(e) = self.handle_rejected().await {
// XXX: Should this be rejected_due_to(order_id, e) instead?
TakerSettlementCompleted::Failed { error: e, order_id }
Completed::Failed { error: e, order_id }
} else {
TakerSettlementCompleted::rejected(order_id)
Completed::rejected(order_id)
}
}
};

463
daemon/src/db.rs

@ -1,12 +1,17 @@
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState;
use crate::model;
use crate::model::cfd::CfdEvent;
use crate::model::cfd::Event;
use crate::model::cfd::OrderId;
use anyhow::Context;
use crate::model::cfd::Role;
use crate::model::Identity;
use crate::model::Leverage;
use crate::model::Position;
use crate::model::Price;
use crate::model::Usd;
use anyhow::Result;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use sqlx::SqlitePool;
use std::mem;
use time::Duration;
pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> {
@ -14,280 +19,183 @@ pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> {
Ok(())
}
pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> {
let state = serde_json::to_string(&cfd.state())?;
pub async fn insert_cfd(cfd: &model::cfd::Cfd, conn: &mut PoolConnection<Sqlite>) -> Result<()> {
let query_result = sqlx::query(
r#"
insert into cfds (
uuid,
trading_pair,
position,
initial_price,
leverage,
liquidation_price,
creation_timestamp_seconds,
settlement_time_interval_seconds,
role,
fee_rate,
settlement_time_interval_hours,
quantity_usd,
counterparty
) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12);
insert into cfd_states (
cfd_id,
state
)
select
id as cfd_id,
$13 as state
from cfds
order by id desc limit 1;
"#,
counterparty_network_identity,
role
) values ($1, $2, $3, $4, $5, $6, $7, $8)"#,
)
.bind(&cfd.id())
.bind(&cfd.trading_pair())
.bind(&cfd.position())
.bind(&cfd.price())
.bind(&cfd.initial_price())
.bind(&cfd.leverage())
.bind(&cfd.liquidation_price())
.bind(&cfd.creation_timestamp())
.bind(&cfd.settlement_interval().whole_seconds())
.bind(&cfd.settlement_time_interval_hours().whole_hours())
.bind(&cfd.quantity())
.bind(&cfd.counterparty_network_identity())
.bind(&cfd.role())
.bind(&cfd.fee_rate())
.bind(&cfd.quantity_usd())
.bind(&cfd.counterparty())
.bind(state)
.execute(conn)
.await
.with_context(|| format!("Failed to insert CFD with id {}", cfd.id()))?;
.await?;
// Should be 2 because we insert into cfds and cfd_states
if query_result.rows_affected() != 2 {
if query_result.rows_affected() != 1 {
anyhow::bail!("failed to insert cfd");
}
Ok(())
}
pub async fn append_cfd_state(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> {
let cfd_id = load_cfd_id_by_order_uuid(cfd.id(), conn).await?;
let current_state = load_latest_cfd_state(cfd_id, conn)
.await
.context("loading latest state failed")?;
let new_state = &cfd.state();
if mem::discriminant(&current_state) == mem::discriminant(new_state) {
// Since we have states where we add information this happens quite frequently
tracing::trace!(
"Same state transition for cfd with order_id {}: {}",
cfd.id(),
current_state
);
}
/// Appends an event to the `events` table.
///
/// To make handling of `None` events more ergonomic, you can pass anything in here that implements
/// `Into<Option>` event.
pub async fn append_event(
event: impl Into<Option<Event>>,
conn: &mut PoolConnection<Sqlite>,
) -> Result<()> {
let event = match event.into() {
Some(event) => event,
None => return Ok(()),
};
let cfd_state = serde_json::to_string(new_state)?;
let (event_name, event_data) = event.event.to_json();
sqlx::query(
r#"
insert into cfd_states (
let query_result = sqlx::query(
r##"
insert into events (
cfd_id,
state
) values ($1, $2);
"#,
name,
data,
created_at
) values (
(select id from cfds where cfds.uuid = $1),
$2, $3, $4
)"##,
)
.bind(cfd_id)
.bind(cfd_state)
.bind(&event.id)
.bind(&event_name)
.bind(&event_data)
.bind(&event.timestamp)
.execute(conn)
.await?;
if query_result.rows_affected() != 1 {
anyhow::bail!("failed to insert event");
}
Ok(())
}
async fn load_cfd_id_by_order_uuid(
order_uuid: OrderId,
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<i64> {
let cfd_id = sqlx::query!(
r#"
select
id
from cfds
where cfds.uuid = $1;
"#,
order_uuid
)
.fetch_one(conn)
.await?;
let cfd_id = cfd_id.id.context("No cfd found")?;
Ok(cfd_id)
// TODO: Make sqlx directly instantiate this struct instead of mapping manually. Need to create
// newtype for `settlement_interval`.
pub struct Cfd {
pub id: OrderId,
pub position: Position,
pub initial_price: Price,
pub leverage: Leverage,
pub settlement_interval: Duration,
pub quantity_usd: Usd,
pub counterparty_network_identity: Identity,
pub role: Role,
}
async fn load_latest_cfd_state(
cfd_id: i64,
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<CfdState> {
let latest_cfd_state = sqlx::query!(
pub async fn load_cfd(id: OrderId, conn: &mut PoolConnection<Sqlite>) -> Result<(Cfd, Vec<Event>)> {
let cfd_row = sqlx::query!(
r#"
select
state
from cfd_states
where cfd_id = $1
order by id desc
limit 1;
"#,
cfd_id
select
id as cfd_id,
uuid as "uuid: crate::model::cfd::OrderId",
position as "position: crate::model::Position",
initial_price as "initial_price: crate::model::Price",
leverage as "leverage: crate::model::Leverage",
settlement_time_interval_hours,
quantity_usd as "quantity_usd: crate::model::Usd",
counterparty_network_identity as "counterparty_network_identity: crate::model::Identity",
role as "role: crate::model::cfd::Role"
from
cfds
where
cfds.uuid = $1
"#,
id
)
.fetch_one(conn)
.fetch_one(&mut *conn)
.await?;
let latest_cfd_state_in_db: CfdState = serde_json::from_str(latest_cfd_state.state.as_str())?;
Ok(latest_cfd_state_in_db)
}
pub async fn load_cfd(order_id: OrderId, conn: &mut PoolConnection<Sqlite>) -> Result<Cfd> {
let row = sqlx::query!(
let cfd = Cfd {
id: cfd_row.uuid,
position: cfd_row.position,
initial_price: cfd_row.initial_price,
leverage: cfd_row.leverage,
settlement_interval: Duration::hours(cfd_row.settlement_time_interval_hours),
quantity_usd: cfd_row.quantity_usd,
counterparty_network_identity: cfd_row.counterparty_network_identity,
role: cfd_row.role,
};
let events = sqlx::query!(
r#"
with state as (
select
cfd_id,
state
from cfd_states
inner join cfds on cfds.id = cfd_states.cfd_id
where cfd_states.id in (
select
max(id) as id
from cfd_states
group by (cfd_id)
)
)
select
cfds.uuid as "uuid: crate::model::cfd::OrderId",
cfds.trading_pair as "trading_pair: crate::model::TradingPair",
cfds.position as "position: crate::model::Position",
cfds.initial_price as "initial_price: crate::model::Price",
cfds.leverage as "leverage: crate::model::Leverage",
cfds.liquidation_price as "liquidation_price: crate::model::Price",
cfds.creation_timestamp_seconds as "creation_timestamp_seconds: crate::model::Timestamp",
cfds.settlement_time_interval_seconds as "settlement_time_interval_secs: i64",
cfds.role as "role: crate::model::cfd::Role",
cfds.fee_rate as "fee_rate: u32",
cfds.quantity_usd as "quantity_usd: crate::model::Usd",
cfds.counterparty as "counterparty: crate::model::Identity",
state.state
from cfds
inner join state on state.cfd_id = cfds.id
where cfds.uuid = $1
"#,
order_id
name,
data,
created_at as "created_at: crate::model::Timestamp"
from
events
where
cfd_id = $1
"#,
cfd_row.cfd_id
)
.fetch_one(conn)
.await?;
.fetch_all(&mut *conn)
.await?
.into_iter()
.map(|row| {
Ok(Event {
timestamp: row.created_at,
id,
event: CfdEvent::from_json(row.name, row.data)?,
})
})
.collect::<Result<Vec<_>>>()?;
// TODO:
// still have the use of serde_json::from_str() here, which will be dealt with
// via https://github.com/comit-network/hermes/issues/290
Ok(Cfd::new(
row.uuid,
row.trading_pair,
row.position,
row.initial_price,
row.leverage,
row.liquidation_price,
row.creation_timestamp_seconds,
Duration::new(row.settlement_time_interval_secs, 0),
row.role,
row.fee_rate,
row.quantity_usd,
serde_json::from_str(row.state.as_str())?,
row.counterparty,
))
Ok((cfd, events))
}
/// Loads all CFDs with the latest state as the CFD state
pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<Vec<Cfd>> {
let rows = sqlx::query!(
pub async fn load_all_cfd_ids(conn: &mut PoolConnection<Sqlite>) -> Result<Vec<OrderId>> {
let ids = sqlx::query!(
r#"
with state as (
select
cfd_id,
state
from cfd_states
inner join cfds on cfds.id = cfd_states.cfd_id
where cfd_states.id in (
select
max(id) as id
from cfd_states
group by (cfd_id)
)
)
select
cfds.uuid as "uuid: crate::model::cfd::OrderId",
cfds.trading_pair as "trading_pair: crate::model::TradingPair",
cfds.position as "position: crate::model::Position",
cfds.initial_price as "initial_price: crate::model::Price",
cfds.leverage as "leverage: crate::model::Leverage",
cfds.liquidation_price as "liquidation_price: crate::model::Price",
cfds.creation_timestamp_seconds as "creation_timestamp_seconds: crate::model::Timestamp",
cfds.settlement_time_interval_seconds as "settlement_time_interval_secs: i64",
cfds.role as "role: crate::model::cfd::Role",
cfds.fee_rate as "fee_rate: u32",
cfds.quantity_usd as "quantity_usd: crate::model::Usd",
cfds.counterparty as "counterparty: crate::model::Identity",
state.state
from cfds
inner join state on state.cfd_id = cfds.id
"#
uuid as "uuid: crate::model::cfd::OrderId"
from
cfds
"#
)
.fetch_all(conn)
.await?;
let cfds = rows
.into_iter()
.map(|row| {
Ok(Cfd::new(
row.uuid,
row.trading_pair,
row.position,
row.initial_price,
row.leverage,
row.liquidation_price,
row.creation_timestamp_seconds,
Duration::new(row.settlement_time_interval_secs, 0),
row.role,
row.fee_rate,
row.quantity_usd,
serde_json::from_str(row.state.as_str())?,
row.counterparty,
))
})
.collect::<Result<Vec<_>>>()?;
.fetch_all(&mut *conn)
.await?
.into_iter()
.map(|r| r.uuid)
.collect();
Ok(cfds)
Ok(ids)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState;
use crate::model::cfd::Role;
use crate::model::Identity;
use crate::model::Leverage;
use crate::model::Position;
use crate::model::Price;
use crate::model::Timestamp;
use crate::model::TradingPair;
use crate::model::Usd;
use crate::seed::Seed;
use crate::SETTLEMENT_INTERVAL;
use pretty_assertions::assert_eq;
use rust_decimal_macros::dec;
use sqlx::SqlitePool;
@ -297,73 +205,60 @@ mod tests {
let mut conn = setup_test_db().await;
let cfd = Cfd::dummy().insert(&mut conn).await;
let loaded = load_all_cfds(&mut conn).await.unwrap();
assert_eq!(vec![cfd], loaded);
let (
super::Cfd {
id,
position,
initial_price,
leverage,
settlement_interval,
quantity_usd,
counterparty_network_identity,
role,
},
_,
) = load_cfd(cfd.id(), &mut conn).await.unwrap();
assert_eq!(cfd.id(), id);
assert_eq!(cfd.position(), position);
assert_eq!(cfd.initial_price(), initial_price);
assert_eq!(cfd.leverage(), leverage);
assert_eq!(cfd.settlement_time_interval_hours(), settlement_interval);
assert_eq!(cfd.quantity(), quantity_usd);
assert_eq!(
cfd.counterparty_network_identity(),
counterparty_network_identity
);
assert_eq!(cfd.role(), role);
}
#[tokio::test]
async fn test_insert_and_load_cfd_by_order_id() {
async fn test_append_events() {
let mut conn = setup_test_db().await;
let cfd = Cfd::dummy().insert(&mut conn).await;
let loaded = load_cfd(cfd.id(), &mut conn).await.unwrap();
assert_eq!(cfd, loaded)
}
#[tokio::test]
async fn test_insert_and_load_cfd_by_order_id_multiple() {
let mut conn = setup_test_db().await;
let cfd1 = Cfd::dummy().insert(&mut conn).await;
let cfd2 = Cfd::dummy().insert(&mut conn).await;
let loaded_1 = load_cfd(cfd1.id(), &mut conn).await.unwrap();
let loaded_2 = load_cfd(cfd2.id(), &mut conn).await.unwrap();
assert_eq!(cfd1, loaded_1);
assert_eq!(cfd2, loaded_2);
}
#[tokio::test]
async fn test_insert_new_cfd_state_and_load_with_multiple_cfd() {
let mut conn = setup_test_db().await;
let mut cfd_1 = Cfd::dummy().insert(&mut conn).await;
*cfd_1.state_mut() = CfdState::accepted();
append_cfd_state(&cfd_1, &mut conn).await.unwrap();
let cfds_from_db = load_all_cfds(&mut conn).await.unwrap();
assert_eq!(vec![cfd_1.clone()], cfds_from_db);
let mut cfd_2 = Cfd::dummy().insert(&mut conn).await;
let cfds_from_db = load_all_cfds(&mut conn).await.unwrap();
assert_eq!(vec![cfd_1.clone(), cfd_2.clone()], cfds_from_db);
*cfd_2.state_mut() = CfdState::rejected();
append_cfd_state(&cfd_2, &mut conn).await.unwrap();
let timestamp = Timestamp::now();
let cfds_from_db = load_all_cfds(&mut conn).await.unwrap();
assert_eq!(vec![cfd_1, cfd_2], cfds_from_db);
}
let event1 = Event {
timestamp,
id: cfd.id(),
event: CfdEvent::OfferRejected,
};
#[tokio::test]
async fn inserting_two_cfds_with_same_order_id_should_fail() {
let mut conn = setup_test_db().await;
append_event(event1.clone(), &mut conn).await.unwrap();
let (_, events) = load_cfd(cfd.id(), &mut conn).await.unwrap();
assert_eq!(events, vec![event1.clone()]);
let cfd = Cfd::dummy().insert(&mut conn).await;
let event2 = Event {
timestamp,
id: cfd.id(),
event: CfdEvent::RevokeConfirmed,
};
let error = insert_cfd(&cfd, &mut conn).await.err().unwrap();
assert_eq!(
format!("{:#}", error),
format!(
"Failed to insert CFD with id {}: error returned from database: UNIQUE constraint failed: cfds.uuid: UNIQUE constraint failed: cfds.uuid",
cfd.id(),
)
);
append_event(event2.clone(), &mut conn).await.unwrap();
let (_, events) = load_cfd(cfd.id(), &mut conn).await.unwrap();
assert_eq!(events, vec![event1, event2])
}
async fn setup_test_db() -> PoolConnection<Sqlite> {
@ -376,23 +271,17 @@ mod tests {
impl Cfd {
fn dummy() -> Self {
let (pub_key, _) = Seed::default().derive_identity();
let dummy_identity = Identity::new(pub_key);
Cfd::new(
Self::new(
OrderId::default(),
TradingPair::BtcUsd,
Position::Long,
Price::new(dec!(1000)).unwrap(),
Price::new(dec!(60_000)).unwrap(),
Leverage::new(2).unwrap(),
Price::new(dec!(400)).unwrap(),
Timestamp::now(),
SETTLEMENT_INTERVAL,
Duration::hours(24),
Role::Taker,
1,
Usd::new(dec!(1000)),
CfdState::outgoing_order_request(),
dummy_identity,
Usd::new(dec!(1_000)),
"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
.parse()
.unwrap(),
)
}

91
daemon/src/housekeeping.rs

@ -1,91 +0,0 @@
use crate::db::append_cfd_state;
use crate::db::load_all_cfds;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState;
use crate::try_continue;
use crate::wallet;
use anyhow::Result;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use sqlx::SqlitePool;
use xtra::Address;
/// Perform necessary housekeeping before actor system startup
pub async fn new(db: &SqlitePool, wallet: &Address<wallet::Actor>) -> Result<()> {
let mut conn = db.acquire().await?;
transition_non_continue_cfds_to_setup_failed(&mut conn).await?;
rebroadcast_transactions(&mut conn, wallet).await?;
Ok(())
}
async fn transition_non_continue_cfds_to_setup_failed(
conn: &mut PoolConnection<Sqlite>,
) -> Result<()> {
let mut cfds = load_all_cfds(conn).await?;
for cfd in cfds.iter_mut().filter(|cfd| Cfd::is_cleanup(cfd)) {
*cfd.state_mut() = CfdState::setup_failed(format!(
"Was in state {} which cannot be continued.",
cfd.state()
));
append_cfd_state(cfd, conn).await?;
}
Ok(())
}
async fn rebroadcast_transactions(
conn: &mut PoolConnection<Sqlite>,
wallet: &Address<wallet::Actor>,
) -> Result<()> {
let cfds = load_all_cfds(conn).await?;
for dlc in cfds.iter().filter_map(Cfd::pending_open_dlc) {
let txid = try_continue!(wallet
.send(wallet::TryBroadcastTransaction {
tx: dlc.lock.0.clone()
})
.await
.expect("if sending to actor fails here we are screwed anyway"));
tracing::info!("Lock transaction published with txid {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) {
let signed_refund_tx = cfd.refund_tx()?;
let txid = try_continue!(wallet
.send(wallet::TryBroadcastTransaction {
tx: signed_refund_tx
})
.await
.expect("if sending to actor fails here we are screwed anyway"));
tracing::info!("Refund transaction published on chain: {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) {
let signed_commit_tx = cfd.commit_tx()?;
let txid = try_continue!(wallet
.send(wallet::TryBroadcastTransaction {
tx: signed_commit_tx
})
.await
.expect("if sending to actor fails here we are screwed anyway"));
tracing::info!("Commit transaction published on chain: {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_cet(cfd)) {
// Double question mark OK because if we are in PendingCet we must have been Ready before
let signed_cet = cfd.cet()??;
let txid = try_continue!(wallet
.send(wallet::TryBroadcastTransaction { tx: signed_cet })
.await
.expect("if sending to actor fails here we are screwed anyway"));
tracing::info!("CET published on chain: {}", txid);
}
Ok(())
}

6
daemon/src/lib.rs

@ -6,7 +6,6 @@ use crate::maker_cfd::TakerConnected;
use crate::model::cfd::Cfd;
use crate::model::cfd::Order;
use crate::model::cfd::OrderId;
use crate::model::cfd::UpdateCfdProposals;
use crate::model::Identity;
use crate::model::Price;
use crate::model::Usd;
@ -45,7 +44,6 @@ pub mod collab_settlement_taker;
pub mod connection;
pub mod db;
pub mod fan_out;
pub mod housekeeping;
pub mod keypair;
pub mod logger;
pub mod maker_cfd;
@ -70,7 +68,6 @@ pub mod taker_cfd;
pub mod to_sse_event;
pub mod tokio_ext;
pub mod try_continue;
pub mod tx;
pub mod wallet;
pub mod wire;
pub mod xtra_ext;
@ -375,7 +372,7 @@ where
.create(None)
.run();
let (_auto_rollover_address, auto_rollover_fut) = auto_rollover::Actor::new(
let (auto_rollover_address, auto_rollover_fut) = auto_rollover::Actor::new(
db,
oracle_pk,
projection_actor,
@ -386,6 +383,7 @@ where
)
.create(None)
.run();
std::mem::forget(auto_rollover_address); // leak this address to avoid shutdown
tasks.add(cfd_actor_fut);
tasks.add(auto_rollover_fut);

4
daemon/src/maker.rs

@ -10,7 +10,6 @@ use daemon::auth;
use daemon::auth::MAKER_USERNAME;
use daemon::bitmex_price_feed;
use daemon::db;
use daemon::housekeeping;
use daemon::logger;
use daemon::maker_inc_connections;
use daemon::model::cfd::Role;
@ -236,7 +235,6 @@ async fn main() -> Result<()> {
.context("Db migrations failed")?;
// Create actors
housekeeping::new(&db, &wallet).await?;
let (projection_actor, projection_context) = xtra::Context::new(None);
@ -275,7 +273,7 @@ async fn main() -> Result<()> {
tasks.add(task);
let (proj_actor, projection_feeds) =
projection::Actor::new(db.clone(), Role::Maker, bitcoin_network).await?;
projection::Actor::new(db.clone(), Role::Maker, bitcoin_network);
tasks.add(projection_context.run(proj_actor));
maker.listen_on(listener);

383
daemon/src/maker_cfd.rs

@ -1,15 +1,15 @@
use crate::address_map::AddressMap;
use crate::address_map::Stopping;
use crate::cfd_actors::append_cfd_state;
use crate::cfd_actors;
use crate::cfd_actors::insert_cfd_and_update_feed;
use crate::cfd_actors::{self};
use crate::cfd_actors::load_cfd;
use crate::collab_settlement_maker;
use crate::db::load_cfd;
use crate::db::append_event;
use crate::maker_inc_connections;
use crate::model;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState;
use crate::model::cfd::CfdStateCommon;
use crate::model::cfd::MakerSettlementCompleted;
use crate::model::cfd::CfdEvent;
use crate::model::cfd::CollaborativeSettlement;
use crate::model::cfd::Order;
use crate::model::cfd::OrderId;
use crate::model::cfd::Origin;
@ -18,8 +18,8 @@ use crate::model::cfd::RolloverProposal;
use crate::model::cfd::SettlementProposal;
use crate::model::cfd::SetupCompleted;
use crate::model::Identity;
use crate::model::Position;
use crate::model::Price;
use crate::model::Timestamp;
use crate::model::Usd;
use crate::monitor;
use crate::monitor::MonitorParams;
@ -34,14 +34,14 @@ use crate::wallet;
use crate::wire;
use crate::wire::TakerToMaker;
use crate::Tasks;
use anyhow::bail;
use anyhow::Context as _;
use anyhow::Result;
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use std::collections::HashSet;
use time::Duration;
use time::OffsetDateTime;
use xtra::prelude::*;
use xtra::Actor as _;
use xtra_productivity::xtra_productivity;
@ -152,54 +152,6 @@ impl<O, M, T, W> Actor<O, M, T, W> {
.await?;
Ok(())
}
async fn append_cfd_state_setup_failed(
&mut self,
order_id: OrderId,
error: anyhow::Error,
) -> Result<()> {
tracing::error!(%order_id, "Contract setup failed: {:#?}", error);
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::setup_failed(error.to_string());
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
Ok(())
}
async fn append_cfd_state_rejected(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
Ok(())
}
}
impl<O, M, T, W> Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.projection_actor,
)
.await?;
Ok(())
}
}
impl<O, M, T, W> Actor<O, M, T, W>
@ -228,26 +180,6 @@ where
self.update_connected_takers().await?;
Ok(())
}
async fn reject_order(
&mut self,
taker_id: Identity,
mut cfd: Cfd,
mut conn: PoolConnection<Sqlite>,
) -> Result<()> {
*cfd.state_mut() = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::RejectOrder(cfd.id()),
})
.await??;
Ok(())
}
}
#[xtra_productivity]
@ -302,15 +234,10 @@ where
proposal.order_id
);
// check if CFD is in open state, otherwise we should not proceed
let mut conn = self.db.acquire().await?;
let cfd = load_cfd(proposal.order_id, &mut conn).await?;
match cfd.state() {
CfdState::Open { .. } => (),
_ => {
anyhow::bail!("Order is in invalid state. Cannot propose roll over.")
}
};
cfd.is_rollover_possible(OffsetDateTime::now_utc())?;
let this = ctx.address().expect("acquired own address");
@ -406,6 +333,14 @@ where
}
};
let cfd = Cfd::from_order(
current_order.clone(),
Position::Short,
quantity,
taker_id,
Role::Maker,
);
// 2. Remove current order
// The order is removed before we update the state, because the maker might react on the
// state change. Once we know that we go for either an accept/reject scenario we
@ -418,19 +353,6 @@ where
self.projection_actor.send(projection::Update(None)).await?;
// 3. Insert CFD in DB
let cfd = Cfd::from_order(
current_order.clone(),
quantity,
CfdState::IncomingOrderRequest {
common: CfdStateCommon {
transition_timestamp: Timestamp::now(),
},
taker_id,
},
taker_id,
Role::Maker,
);
insert_cfd_and_update_feed(&cfd, &mut conn, &self.projection_actor).await?;
// 4. Try to get the oracle announcement, if that fails we should exit prior to changing any
@ -472,59 +394,23 @@ impl<O, M, T, W> Actor<O, M, T, W> {
tracing::debug!(%order_id, "Maker accepts order");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
self.setup_actors
.send(&order_id, setup_maker::Accepted)
.await
.with_context(|| format!("No active contract setup for order {}", order_id))?;
*cfd.state_mut() = CfdState::contract_setup();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
Ok(())
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W> {
async fn handle_setup_actor_stopping(&mut self, message: Stopping<setup_maker::Actor>) {
self.setup_actors.gc(message);
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W> {
async fn handle_settlement_actor_stopping(
&mut self,
message: Stopping<collab_settlement_maker::Actor>,
) {
self.settlement_actors.gc(message);
}
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
async fn handle_reject_order(&mut self, msg: RejectOrder) -> Result<()> {
let RejectOrder { order_id } = msg;
tracing::debug!(%order_id, "Maker rejects order");
let mut conn = self.db.acquire().await?;
let cfd = load_cfd(order_id, &mut conn).await?;
let taker_id = match cfd.state() {
CfdState::IncomingOrderRequest { taker_id, .. } => taker_id,
_ => {
anyhow::bail!("Order is in invalid state. Ignoring trying to reject it.")
}
};
self.reject_order(*taker_id, cfd, conn).await?;
self.setup_actors
.send(&order_id, setup_maker::Rejected)
.await
.with_context(|| format!("No active contract setup for order {}", order_id))?;
Ok(())
}
@ -552,49 +438,83 @@ where
}
}
#[xtra_productivity]
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W> {
async fn handle_setup_actor_stopping(&mut self, message: Stopping<setup_maker::Actor>) {
self.setup_actors.gc(message);
}
async fn handle_settlement_actor_stopping(
&mut self,
message: Stopping<collab_settlement_maker::Actor>,
) {
self.settlement_actors.gc(message);
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W>
where
M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_settlement_completed(&mut self, msg: MakerSettlementCompleted) -> Result<()> {
let (order_id, settlement, script_pubkey) = match msg {
MakerSettlementCompleted::Succeeded {
order_id,
payload: (settlement, script_pubkey),
} => (order_id, settlement, script_pubkey),
MakerSettlementCompleted::Rejected { .. } => {
return Ok(());
}
MakerSettlementCompleted::Failed { order_id, error } => {
tracing::warn!(%order_id, "Collaborative settlement failed: {:#}", error);
return Ok(());
}
};
async fn handle_settlement_completed(
&mut self,
msg: model::cfd::Completed<CollaborativeSettlement>,
) -> Result<()> {
let order_id = msg.order_id();
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
let cfd = load_cfd(order_id, &mut conn).await?;
let tx = settlement.tx.clone();
cfd.handle_proposal_signed(settlement)
.context("Failed to update state with collaborative settlement")?;
let event = cfd.settle_collaboratively(msg)?;
append_event(event.clone(), &mut conn).await?;
self.projection_actor.send(projection::CfdsChanged).await?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
match event.event {
CfdEvent::CollaborativeSettlementCompleted {
spend_tx, script, ..
} => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: spend_tx })
.await?
.context("Broadcasting close transaction")?;
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx })
.await?
.context("Broadcasting close transaction")?;
tracing::info!(%order_id, "Close transaction published with txid {}", txid);
tracing::info!(%order_id, "Close transaction published with txid {}", txid);
self.monitor_actor
.send(monitor::CollaborativeSettlement {
order_id,
tx: (txid, script_pubkey),
})
.await?;
self.monitor_actor
.send(monitor::CollaborativeSettlement {
order_id,
tx: (txid, script),
})
.await?;
}
CfdEvent::CollaborativeSettlementRejected { commit_tx } => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::info!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
CfdEvent::CollaborativeSettlementFailed { commit_tx } => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::warn!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
_ => bail!("Unexpected event {:?}", event.event),
}
Ok(())
}
@ -621,33 +541,8 @@ where
M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation>,
{
async fn handle_roll_over_completed(&mut self, msg: Completed) -> Result<()> {
// We handle rollover success in the maker_cfd::Actor instead of the rollover_maker::Actor
// because we do not have access to the DB in the rollover_maker::Actor
let Completed { order_id, dlc } = msg;
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::Open {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
collaborative_close: None,
};
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.monitor_actor
.send(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()),
})
.await?;
self.oracle_actor
.send(oracle::MonitorAttestation {
event_id: dlc.settlement_event_id,
})
.await?;
async fn handle_roll_over_completed(&mut self, _: Completed) -> Result<()> {
// TODO: Implement this in terms of event sourcing
Ok(())
}
@ -752,31 +647,24 @@ where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_setup_completed(&mut self, msg: SetupCompleted) -> Result<()> {
let (order_id, dlc) = match msg {
SetupCompleted::Succeeded {
order_id,
payload: (dlc, _),
} => (order_id, dlc),
SetupCompleted::Failed { order_id, error } => {
self.append_cfd_state_setup_failed(order_id, error).await?;
return anyhow::Ok(());
}
SetupCompleted::Rejected { order_id, .. } => {
self.append_cfd_state_rejected(order_id).await?;
return anyhow::Ok(());
}
};
let order_id = msg.order_id();
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::PendingOpen {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
let cfd = load_cfd(order_id, &mut conn).await?;
let event = cfd.setup_contract(msg)?;
append_event(event.clone(), &mut conn).await?;
self.projection_actor.send(projection::CfdsChanged).await?;
let dlc = match event.event {
CfdEvent::ContractSetupCompleted { dlc } => dlc,
CfdEvent::OfferRejected | CfdEvent::ContractSetupFailed => {
return Ok(());
}
_ => bail!("Unexpected event {:?}", event.event),
};
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
tracing::info!("Setup complete, publishing on chain now");
let txid = self
.wallet
@ -790,7 +678,7 @@ where
self.monitor_actor
.send(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()),
params: MonitorParams::new(dlc.clone()),
})
.await?;
@ -836,13 +724,31 @@ where
}
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<monitor::Event> for Actor<O, M, T, W>
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) -> Result<()> {
self.handle_monitoring_event(msg).await
async fn handle_monitor(&mut self, msg: monitor::Event) {
if let Err(e) =
cfd_actors::handle_monitoring_event(msg, &self.db, &self.wallet, &self.projection_actor)
.await
{
tracing::error!("Unable to handle monotoring event: {:#}", e)
}
}
async fn handle_attestation(&mut self, msg: oracle::Attestation) {
if let Err(e) = cfd_actors::handle_oracle_attestation(
msg,
&self.db,
&self.wallet,
&self.projection_actor,
)
.await
{
tracing::warn!("Failed to handle oracle attestation: {:#}", e)
}
}
}
@ -863,15 +769,15 @@ where
+ xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(
&mut self,
FromTaker { taker_id, msg }: FromTaker,
ctx: &mut Context<Self>,
) -> Result<()> {
async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, ctx: &mut Context<Self>) {
match msg {
wire::TakerToMaker::TakeOrder { order_id, quantity } => {
self.handle_take_order(taker_id, order_id, quantity, ctx)
.await?
if let Err(e) = self
.handle_take_order(taker_id, order_id, quantity, ctx)
.await
{
tracing::error!("Error when handling order take request: {:#}", e)
}
}
wire::TakerToMaker::Settlement {
order_id,
@ -933,21 +839,6 @@ where
TakerToMaker::Hello(_) => {
unreachable!("The Hello message is not sent to the cfd actor")
}
};
Ok(())
}
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<oracle::Attestation>
for Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {
if let Err(e) = self.handle_oracle_attestation(msg).await {
tracing::warn!("Failed to handle oracle attestation: {:#}", e)
}
}
}
@ -965,7 +856,7 @@ impl Message for Completed {
}
impl Message for FromTaker {
type Result = Result<()>;
type Result = ();
}
impl<O: 'static, M: 'static, T: 'static, W: 'static> xtra::Actor for Actor<O, M, T, W> {}

6
daemon/src/maker_inc_connections.rs

@ -230,7 +230,7 @@ impl Actor {
let this = ctx.address().expect("self to be alive");
let read_fut = async move {
while let Ok(Some(msg)) = read.try_next().await {
let res = this.send(FromTaker { taker_id, msg }).log_failure("").await;
let res = this.send(FromTaker { taker_id, msg }).await;
if res.is_err() {
break;
@ -381,7 +381,7 @@ impl Actor {
#[xtra_productivity(message_impl = false)]
impl Actor {
async fn handle_msg_from_taker(&mut self, msg: FromTaker) -> Result<()> {
async fn handle_msg_from_taker(&mut self, msg: FromTaker) {
let msg_str = msg.msg.to_string();
tracing::trace!(target = "wire", taker_id = %msg.taker_id, "Received {}", msg_str);
@ -423,8 +423,6 @@ impl Actor {
let _ = self.taker_msg_channel.send(msg);
}
}
Ok(())
}
async fn handle_setup_actor_stopping(&mut self, message: Stopping<setup_maker::Actor>) {

11
daemon/src/model.rs

@ -6,6 +6,7 @@ use bdk::bitcoin::Address;
use bdk::bitcoin::Amount;
use bdk::bitcoin::Denomination;
use chrono::DateTime;
use derive_more::Display;
use reqwest::Url;
use rust_decimal::prelude::ToPrimitive;
use rust_decimal::Decimal;
@ -164,6 +165,12 @@ impl Leverage {
}
}
impl fmt::Display for Leverage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "x{}", self.0)
}
}
// add impl's to do algebra with Usd, Leverage, and ExhangeRate as required
impl Mul<Leverage> for Usd {
type Output = Usd;
@ -395,7 +402,7 @@ pub enum TradingPair {
BtcUsd,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, sqlx::Type)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, sqlx::Type, Display)]
pub enum Position {
Long,
Short,
@ -706,7 +713,7 @@ mod tests {
}
#[test]
fn roundtrip_taker_id_serde() {
fn roundtrip_identity_serde() {
let id = Identity::new(x25519_dalek::PublicKey::from([42u8; 32]));
serde_test::assert_tokens(

2015
daemon/src/model/cfd.rs

File diff suppressed because it is too large

252
daemon/src/monitor.rs

@ -1,10 +1,10 @@
use crate::db;
use crate::model;
use crate::model::cfd::CetStatus;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState;
use crate::model::cfd;
use crate::model::cfd::CfdEvent;
use crate::model::cfd::Dlc;
use crate::model::cfd::OrderId;
use crate::model::cfd::CET_TIMELOCK;
use crate::model::BitMexPriceEventId;
use crate::oracle;
use crate::oracle::Attestation;
@ -47,6 +47,8 @@ pub struct CollaborativeSettlement {
pub tx: (Txid, Script),
}
// TODO: The design of this struct causes a lot of marshalling und unmarshelling that is quite
// unnecessary. Should be taken apart so we can handle all cases individually!
#[derive(Clone)]
pub struct MonitorParams {
lock: (Txid, Descriptor<PublicKey>),
@ -59,6 +61,8 @@ pub struct MonitorParams {
pub struct Sync;
// TODO: Send messages to the projection actor upon finality events so we send out updates.
// -> Might as well just send out all events independent of sending to the cfd actor.
pub struct Actor<C = bdk::electrum_client::Client> {
cfds: HashMap<OrderId, MonitorParams>,
event_channel: Box<dyn StrongMessageChannel<Event>>,
@ -69,14 +73,109 @@ pub struct Actor<C = bdk::electrum_client::Client> {
tasks: Tasks,
}
/// Read-model of the CFD for the monitoring actor.
#[derive(Default)]
struct Cfd {
params: Option<MonitorParams>,
monitor_lock_finality: bool,
monitor_commit_finality: bool,
monitor_cet_timelock: bool,
monitor_refund_timelock: bool,
monitor_refund_finality: bool,
monitor_revoked_commit_transactions: bool,
// Ideally, all of the above would be like this.
monitor_collaborative_settlement_finality: Option<(Txid, Script)>,
}
impl Cfd {
// TODO: Ideally, we would only set the specific monitoring events to `true` that occur _next_,
// like lock_finality after contract-setup. However, this would require that
// - either the monitoring actor is smart enough to know that it needs to monitor for
// commit-finality after lock-finality
// - or some other actor tells it to do that
//
// At the moment, neither of those two is the case which is why we set everything to true that
// might become relevant. See also https://github.com/itchysats/itchysats/issues/605 and https://github.com/itchysats/itchysats/issues/236.
fn apply(self, event: cfd::Event) -> Self {
match event.event {
CfdEvent::ContractSetupCompleted { dlc } => Self {
params: Some(MonitorParams::new(dlc)),
monitor_lock_finality: true,
monitor_commit_finality: true,
monitor_cet_timelock: true,
monitor_refund_timelock: true,
monitor_refund_finality: true,
monitor_revoked_commit_transactions: false,
monitor_collaborative_settlement_finality: None,
},
CfdEvent::RolloverCompleted { dlc } => {
Self {
params: Some(MonitorParams::new(dlc)),
monitor_lock_finality: false, // Lock is already final after rollover.
monitor_commit_finality: true,
monitor_cet_timelock: true,
monitor_refund_timelock: true,
monitor_refund_finality: true,
monitor_revoked_commit_transactions: true, /* After rollover, the other party
* might publish old states. */
monitor_collaborative_settlement_finality: None,
}
}
CfdEvent::CollaborativeSettlementCompleted {
spend_tx, script, ..
} => {
Self {
monitor_lock_finality: false, // Lock is already final if we collab settle.
monitor_commit_finality: true, // The other party might still want to race us.
monitor_collaborative_settlement_finality: Some((spend_tx.txid(), script)),
..self
}
}
CfdEvent::ContractSetupFailed
| CfdEvent::OfferRejected
| CfdEvent::RolloverRejected => {
Self::default() // all false / empty
}
CfdEvent::LockConfirmed => Self {
monitor_lock_finality: false,
..self
},
CfdEvent::CommitConfirmed => Self {
monitor_commit_finality: false,
..self
},
// final states, don't monitor anything
CfdEvent::CetConfirmed
| CfdEvent::RefundConfirmed
| CfdEvent::CollaborativeSettlementConfirmed => Self::default(),
CfdEvent::CetTimelockConfirmedPriorOracleAttestation
| CfdEvent::CetTimelockConfirmedPostOracleAttestation { .. } => Self {
monitor_cet_timelock: false,
..self
},
CfdEvent::RefundTimelockConfirmed { .. } => Self {
monitor_refund_timelock: false,
..self
},
CfdEvent::RolloverFailed
| CfdEvent::ManualCommit { .. }
| CfdEvent::OracleAttestedPostCetTimelock { .. }
| CfdEvent::OracleAttestedPriorCetTimelock { .. }
| CfdEvent::CollaborativeSettlementRejected { .. }
| CfdEvent::CollaborativeSettlementFailed { .. } => self,
CfdEvent::RevokeConfirmed => todo!("Deal with revoked"),
}
}
}
impl Actor<bdk::electrum_client::Client> {
pub async fn new(
db: SqlitePool,
electrum_rpc_url: String,
event_channel: Box<dyn StrongMessageChannel<Event>>,
) -> Result<Self> {
let cfds = db::load_all_cfds(&mut db.acquire().await?).await?;
let client = bdk::electrum_client::Client::new(&electrum_rpc_url)
.context("Failed to initialize Electrum RPC client")?;
@ -96,84 +195,55 @@ impl Actor<bdk::electrum_client::Client> {
tasks: Tasks::default(),
};
for cfd in cfds {
match cfd.state().clone() {
// In PendingOpen we know the complete dlc setup and assume that the lock transaction will be published
CfdState::PendingOpen { dlc, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.id(), params.clone());
actor.monitor_all(&params, cfd.id());
}
CfdState::Open { dlc, .. } | CfdState::PendingCommit { dlc, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.id(), params.clone());
actor.monitor_commit_finality(&params, cfd.id());
actor.monitor_commit_cet_timelock(&params, cfd.id());
actor.monitor_commit_refund_timelock(&params, cfd.id());
actor.monitor_refund_finality(&params,cfd.id());
if let Some(model::cfd::CollaborativeSettlement { tx, ..}
) = cfd.state().get_collaborative_close() {
let close_params = (tx.txid(),
tx.output.first().context("transaction has zero outputs")?.script_pubkey.clone());
actor.monitor_close_finality(close_params,cfd.id());
}
}
CfdState::OpenCommitted { dlc, cet_status, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.id(), params.clone());
match cet_status {
CetStatus::Unprepared => {
actor.monitor_commit_cet_timelock(&params, cfd.id());
actor.monitor_commit_refund_timelock(&params, cfd.id());
actor.monitor_refund_finality(&params,cfd.id());
}
CetStatus::OracleSigned(attestation) => {
actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.id())?;
actor.monitor_commit_cet_timelock(&params, cfd.id());
actor.monitor_commit_refund_timelock(&params, cfd.id());
actor.monitor_refund_finality(&params,cfd.id());
}
CetStatus::TimelockExpired => {
actor.monitor_commit_refund_timelock(&params, cfd.id());
actor.monitor_refund_finality(&params,cfd.id());
}
CetStatus::Ready(attestation) => {
actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.id())?;
actor.monitor_commit_refund_timelock(&params, cfd.id());
actor.monitor_refund_finality(&params,cfd.id());
}
}
}
CfdState::PendingCet { dlc, attestation, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.id(), params.clone());
let mut conn = db.acquire().await?;
actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.id())?;
actor.monitor_commit_refund_timelock(&params, cfd.id());
actor.monitor_refund_finality(&params,cfd.id());
}
CfdState::PendingRefund { dlc, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.id(), params.clone());
for id in db::load_all_cfd_ids(&mut conn).await? {
let (_, events) = db::load_cfd(id, &mut conn).await?;
actor.monitor_commit_refund_timelock(&params, cfd.id());
actor.monitor_refund_finality(&params,cfd.id());
}
let Cfd {
params,
monitor_lock_finality,
monitor_commit_finality,
monitor_cet_timelock,
monitor_refund_timelock,
monitor_refund_finality,
monitor_revoked_commit_transactions,
monitor_collaborative_settlement_finality,
} = events.into_iter().fold(Cfd::default(), Cfd::apply);
let params = match params {
None => continue,
Some(params) => params,
};
actor.cfds.insert(id, params.clone());
// too early to monitor
CfdState::OutgoingOrderRequest { .. }
| CfdState::IncomingOrderRequest { .. }
| CfdState::Accepted { .. }
| CfdState::ContractSetup { .. }
// final states
| CfdState::Closed { .. }
| CfdState::Rejected { .. }
| CfdState::Refunded { .. }
| CfdState::SetupFailed { .. } => ()
if monitor_lock_finality {
actor.monitor_lock_finality(&params, id);
}
if monitor_commit_finality {
actor.monitor_commit_finality(&params, id)
}
if monitor_cet_timelock {
actor.monitor_commit_cet_timelock(&params, id);
}
if monitor_refund_timelock {
actor.monitor_commit_refund_timelock(&params, id);
}
if monitor_refund_finality {
actor.monitor_refund_finality(&params, id);
}
if monitor_revoked_commit_transactions {
actor.monitor_revoked_commit_transactions(&params, id);
}
if let Some(params) = monitor_collaborative_settlement_finality {
actor.monitor_close_finality(params, id);
}
}
@ -220,7 +290,7 @@ where
.entry((params.commit.0, params.commit.1.script_pubkey()))
.or_default()
.push((
ScriptStatus::with_confirmations(Cfd::CET_TIMELOCK),
ScriptStatus::with_confirmations(CET_TIMELOCK),
Event::CetTimelockExpired(order_id),
));
}
@ -438,7 +508,7 @@ where
for (target_status, event) in reached_monitoring_target {
tracing::info!(%txid, target = %target_status, current = %status, "Bitcoin transaction reached monitoring target");
self.event_channel.send(event).await??;
self.event_channel.send(event).await?;
}
}
}
@ -581,17 +651,13 @@ impl Event {
}
impl MonitorParams {
pub fn new(dlc: Dlc, refund_timelock_in_blocks: u32) -> Self {
pub fn new(dlc: Dlc) -> Self {
let script_pubkey = dlc.maker_address.script_pubkey();
MonitorParams {
lock: (dlc.lock.0.txid(), dlc.lock.1),
commit: (dlc.commit.0.txid(), dlc.commit.2),
cets: map_cets(dlc.cets),
refund: (
dlc.refund.0.txid(),
script_pubkey,
refund_timelock_in_blocks,
),
refund: (dlc.refund.0.txid(), script_pubkey, dlc.refund_timelock),
revoked_commits: dlc
.revoked_commit
.iter()
@ -633,7 +699,7 @@ fn map_cets(
}
impl xtra::Message for Event {
type Result = Result<()>;
type Result = ();
}
impl xtra::Message for Sync {
@ -681,6 +747,7 @@ where
);
}
}
#[async_trait]
impl<C> xtra::Handler<Sync> for Actor<C>
where
@ -703,6 +770,7 @@ impl xtra::Handler<oracle::Attestation> for Actor {
#[cfg(test)]
mod tests {
use super::*;
use crate::model::cfd::CET_TIMELOCK;
use bdk::bitcoin::blockdata::block;
use bdk::electrum_client::Batch;
use bdk::electrum_client::Error;
@ -736,7 +804,7 @@ mod tests {
vec![
(ScriptStatus::finality(), commit_finality.clone()),
(
ScriptStatus::with_confirmations(Cfd::CET_TIMELOCK),
ScriptStatus::with_confirmations(CET_TIMELOCK),
refund_expired.clone(),
),
],
@ -873,10 +941,8 @@ mod tests {
#[async_trait]
impl xtra::Handler<Event> for MessageRecordingActor {
async fn handle(&mut self, message: Event, _ctx: &mut xtra::Context<Self>) -> Result<()> {
async fn handle(&mut self, message: Event, _ctx: &mut xtra::Context<Self>) {
self.events.push(message);
Ok(())
}
}

61
daemon/src/oracle.rs

@ -1,5 +1,6 @@
use crate::db;
use crate::model::cfd::CfdState;
use crate::model::cfd::CfdEvent;
use crate::model::cfd::Event;
use crate::model::BitMexPriceEventId;
use crate::tokio_ext;
use crate::try_continue;
@ -68,39 +69,49 @@ struct NewAttestationFetched {
attestation: Attestation,
}
#[derive(Default)]
struct Cfd {
pending_attestation: Option<BitMexPriceEventId>,
}
impl Cfd {
fn apply(self, event: Event) -> Self {
let settlement_event_id = match event.event {
CfdEvent::ContractSetupCompleted { dlc } => dlc.settlement_event_id,
CfdEvent::RolloverCompleted { dlc } => dlc.settlement_event_id,
// TODO: There might be a few cases where we do not need to monitor the attestation,
// e.g. when we already agreed to collab. settle. Ignoring it for now
// because I don't want to think about it and it doesn't cause much harm to do the
// monitoring :)
_ => return self,
};
// we can comfortably overwrite what was there because events are processed in order, thus
// old attestations don't matter.
Self {
pending_attestation: Some(settlement_event_id),
}
}
}
impl Actor {
pub async fn new(
db: SqlitePool,
attestation_channel: Box<dyn StrongMessageChannel<Attestation>>,
announcement_lookahead: Duration,
) -> Result<Self> {
let cfds = db::load_all_cfds(&mut db.acquire().await?).await?;
let mut pending_attestations = HashSet::new();
for cfd in cfds {
match cfd.state().clone() {
CfdState::PendingOpen { dlc, ..}
| CfdState::Open { dlc, .. }
| CfdState::PendingCommit { dlc, .. }
| CfdState::OpenCommitted { dlc, .. }
| CfdState::PendingCet { dlc, .. } =>
{
pending_attestations.insert(dlc.settlement_event_id);
}
let mut conn = db.acquire().await?;
for id in db::load_all_cfd_ids(&mut conn).await? {
let (_, events) = db::load_cfd(id, &mut conn).await?;
let cfd = events
.into_iter()
.fold(Cfd::default(), |cfd, event| cfd.apply(event));
// Irrelevant for restart
CfdState::OutgoingOrderRequest { .. }
| CfdState::IncomingOrderRequest { .. }
| CfdState::Accepted { .. }
| CfdState::Rejected { .. }
| CfdState::ContractSetup { .. }
// Final states
| CfdState::Closed { .. }
| CfdState::PendingRefund { .. }
| CfdState::Refunded { .. }
| CfdState::SetupFailed { .. } => ()
if let Some(pending_attestation) = cfd.pending_attestation {
pending_attestations.insert(pending_attestation);
}
}

1015
daemon/src/projection.rs

File diff suppressed because it is too large

87
daemon/src/rollover_maker.rs

@ -6,16 +6,13 @@ use crate::model::cfd::OrderId;
use crate::model::cfd::Role;
use crate::model::cfd::RolloverProposal;
use crate::model::cfd::SettlementKind;
use crate::model::cfd::UpdateCfdProposal;
use crate::model::Identity;
use crate::oracle;
use crate::oracle::GetAnnouncement;
use crate::projection;
use crate::projection::try_into_update_rollover_proposal;
use crate::projection::UpdateRollOverProposal;
use crate::schnorrsig;
use crate::setup_contract;
use crate::setup_contract::RolloverParams;
use crate::tokio_ext::spawn_fallible;
use crate::wire;
use crate::wire::MakerToTaker;
@ -89,18 +86,13 @@ impl xtra::Actor for Actor {
}
async fn started(&mut self, _ctx: &mut Context<Self>) {
let new_proposal = UpdateCfdProposal::RollOverProposal {
proposal: self.proposal.clone(),
direction: SettlementKind::Incoming,
};
let _ = self
.update_proposal(Some((self.proposal.clone(), SettlementKind::Incoming)))
.await;
}
self.projection_actor
.send(
try_into_update_rollover_proposal(new_proposal)
.expect("update cfd proposal is rollover proposal"),
)
.await
.expect("projection actor is running");
async fn stopped(self) {
let _ = self.update_proposal(None).await;
}
}
@ -136,7 +128,7 @@ impl Actor {
}
}
async fn update_contract(&mut self, dlc: Dlc, ctx: &mut xtra::Context<Self>) -> Result<()> {
async fn complete(&mut self, dlc: Dlc, ctx: &mut xtra::Context<Self>) -> Result<()> {
let msg = Completed {
order_id: self.cfd.id(),
dlc,
@ -145,41 +137,50 @@ impl Actor {
.send(msg)
.log_failure("Failed to report rollover completion")
.await?;
ctx.stop();
Ok(())
}
async fn fail(&mut self, ctx: &mut xtra::Context<Self>, error: anyhow::Error) {
tracing::info!(id = %self.cfd.id(), %error, "Rollover failed");
if let Err(err) = self
.projection_actor
.send(projection::UpdateRollOverProposal {
async fn update_proposal(
&self,
proposal: Option<(RolloverProposal, SettlementKind)>,
) -> Result<()> {
self.projection_actor
.send(UpdateRollOverProposal {
order: self.cfd.id(),
proposal: None,
proposal,
})
.await
{
tracing::error!(%err, "projection actor unreachable when attempting to fail rollover");
}
.await?;
Ok(())
}
async fn fail(&mut self, ctx: &mut xtra::Context<Self>, error: anyhow::Error) {
tracing::info!(id = %self.cfd.id(), %error, "Rollover failed");
ctx.stop();
}
async fn accept(&mut self, ctx: &mut xtra::Context<Self>) -> Result<()> {
let order_id = self.cfd.id();
if self.sent_from_taker.is_some() {
tracing::warn!(%order_id, "Rollover already active");
return Ok(());
}
let (sender, receiver) = mpsc::unbounded();
self.sent_from_taker = Some(sender);
tracing::debug!(%order_id, "Maker accepts a roll_over proposal" );
let cfd = self.cfd.clone();
let (rollover_params, dlc, interval) = self.cfd.start_rollover()?;
let dlc = cfd.open_dlc().expect("CFD was in wrong state");
let oracle_event_id = oracle::next_announcement_after(
time::OffsetDateTime::now_utc() + cfd.settlement_interval(),
)?;
let oracle_event_id =
oracle::next_announcement_after(time::OffsetDateTime::now_utc() + interval)?;
let taker_id = self.taker_id;
@ -193,12 +194,7 @@ impl Actor {
})
.await??;
self.projection_actor
.send(UpdateRollOverProposal {
order: order_id,
proposal: None,
})
.await?;
let _ = self.update_proposal(None).await;
let announcement = self
.oracle_actor
@ -215,13 +211,7 @@ impl Actor {
}),
receiver,
(self.oracle_pk, announcement),
RolloverParams::new(
cfd.price(),
cfd.quantity_usd(),
cfd.leverage(),
cfd.refund_timelock_in_blocks(),
cfd.fee_rate(),
),
rollover_params,
Role::Maker,
dlc,
self.n_payouts,
@ -250,12 +240,7 @@ impl Actor {
msg: MakerToTaker::RejectRollOver(self.cfd.id()),
})
.await??;
self.projection_actor
.send(UpdateRollOverProposal {
order: self.cfd.id(),
proposal: None,
})
.await?;
ctx.stop();
Ok(())
@ -308,7 +293,7 @@ impl Actor {
msg: RolloverSucceeded,
ctx: &mut xtra::Context<Self>,
) {
if let Err(err) = self.update_contract(msg.dlc.clone(), ctx).await {
if let Err(err) = self.complete(msg.dlc.clone(), ctx).await {
self.fail(ctx, err).await;
}
}

37
daemon/src/rollover_taker.rs

@ -15,7 +15,6 @@ use crate::oracle::GetAnnouncement;
use crate::projection;
use crate::projection::UpdateRollOverProposal;
use crate::setup_contract;
use crate::setup_contract::RolloverParams;
use crate::tokio_ext::spawn_fallible;
use crate::wire;
use crate::wire::RollOverMsg;
@ -117,6 +116,7 @@ impl Actor {
self.update_proposal(None).await?;
let (rollover_params, dlc, _) = self.cfd.start_rollover()?;
let (sender, receiver) = mpsc::unbounded::<RollOverMsg>();
// store the writing end to forward messages from the maker to
// the spawned rollover task
@ -128,15 +128,9 @@ impl Actor {
}),
receiver,
(self.oracle_pk, announcement),
RolloverParams::new(
self.cfd.price(),
self.cfd.quantity_usd(),
self.cfd.leverage(),
self.cfd.refund_timelock_in_blocks(),
self.cfd.fee_rate(),
),
rollover_params,
Role::Taker,
self.cfd.dlc().context("No DLC in CFD")?,
dlc,
self.n_payouts,
);
@ -153,15 +147,6 @@ impl Actor {
Ok(())
}
async fn handle_rejected(&self) -> Result<()> {
let order_id = self.cfd.id();
tracing::info!(%order_id, "Rollover proposal got rejected");
self.update_proposal(None).await?;
Ok(())
}
pub async fn forward_protocol_msg(&mut self, msg: wire::RollOverMsg) -> Result<()> {
let sender = self
.rollover_msg_sender
@ -196,7 +181,7 @@ impl Actor {
#[async_trait]
impl xtra::Actor for Actor {
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
if let Err(e) = self.cfd.can_roll_over(OffsetDateTime::now_utc()) {
if let Err(e) = self.cfd.is_rollover_possible(OffsetDateTime::now_utc()) {
self.complete(
match e {
CannotRollover::NoDlc => RolloverCompleted::Failed {
@ -262,6 +247,10 @@ impl xtra::Actor for Actor {
xtra::KeepRunning::StopAll
}
async fn stopped(self) {
let _ = self.update_proposal(None).await;
}
}
#[xtra_productivity]
@ -285,13 +274,11 @@ impl Actor {
pub async fn reject_rollover(&mut self, _: RollOverRejected, ctx: &mut xtra::Context<Self>) {
let order_id = self.cfd.id();
let completed = if let Err(error) = self.handle_rejected().await {
RolloverCompleted::Failed { order_id, error }
} else {
RolloverCompleted::rejected(order_id)
};
self.complete(completed, ctx).await;
tracing::info!(%order_id, "Rollover proposal got rejected");
self.complete(RolloverCompleted::rejected(order_id), ctx)
.await;
}
pub async fn handle_rollover_succeeded(

2
daemon/src/routes_maker.rs

@ -3,6 +3,7 @@ use bdk::bitcoin::Network;
use daemon::auth::Authenticated;
use daemon::maker_inc_connections;
use daemon::model::cfd::OrderId;
use daemon::model::Identity;
use daemon::model::Price;
use daemon::model::Usd;
use daemon::model::WalletInfo;
@ -11,7 +12,6 @@ use daemon::oracle;
use daemon::projection::Cfd;
use daemon::projection::CfdAction;
use daemon::projection::Feeds;
use daemon::projection::Identity;
use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::ToSseEvent;
use daemon::wallet;

4
daemon/src/routes_taker.rs

@ -10,11 +10,11 @@ use daemon::model::Usd;
use daemon::model::WalletInfo;
use daemon::monitor;
use daemon::oracle;
use daemon::projection;
use daemon::projection::CfdAction;
use daemon::projection::Feeds;
use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::ToSseEvent;
use daemon::tx;
use daemon::wallet;
use daemon::TakerActorSystem;
use http_api_problem::HttpApiProblem;
@ -258,5 +258,5 @@ pub async fn post_withdraw_request(
.detail(e.to_string())
})?;
Ok(tx::to_mempool_url(txid, *network.inner()))
Ok(projection::to_mempool_url(txid, *network.inner()))
}

31
daemon/src/send_async_safe.rs

@ -1,4 +1,5 @@
use async_trait::async_trait;
use std::fmt;
#[async_trait]
pub trait SendAsyncSafe<M, R>
@ -32,7 +33,35 @@ impl<A, M, E> SendAsyncSafe<M, Result<(), E>> for xtra::Address<A>
where
A: xtra::Handler<M>,
M: xtra::Message<Result = Result<(), E>>,
E: std::error::Error + Send,
E: fmt::Display + Send,
{
async fn send_async_safe(&self, msg: M) -> Result<(), xtra::Disconnected> {
if !self.is_connected() {
return Err(xtra::Disconnected);
}
let send_fut = self.send(msg);
#[allow(clippy::disallowed_method)]
tokio::spawn(async {
let e = match send_fut.await {
Ok(Err(e)) => format!("{:#}", e),
Err(e) => format!("{:#}", e),
Ok(Ok(())) => return,
};
tracing::warn!("Async message invocation failed: {:#}", e)
});
Ok(())
}
}
#[async_trait]
impl<M, E> SendAsyncSafe<M, Result<(), E>> for Box<dyn xtra::prelude::MessageChannel<M>>
where
M: xtra::Message<Result = Result<(), E>>,
E: fmt::Display + Send,
{
async fn send_async_safe(&self, msg: M) -> Result<(), xtra::Disconnected> {
if !self.is_connected() {

11
daemon/src/setup_contract.rs

@ -1,8 +1,8 @@
use crate::model;
use crate::model::cfd::Cet;
use crate::model::cfd::Dlc;
use crate::model::cfd::RevokedCommit;
use crate::model::cfd::Role;
use crate::model::cfd::CET_TIMELOCK;
use crate::model::Leverage;
use crate::model::Price;
use crate::model::Usd;
@ -157,7 +157,7 @@ pub async fn new(
(params.maker().clone(), *params.maker_punish()),
(params.taker().clone(), *params.taker_punish()),
oracle_pk,
(model::cfd::Cfd::CET_TIMELOCK, setup_params.refund_timelock),
(CET_TIMELOCK, setup_params.refund_timelock),
payouts,
sk,
setup_params.fee_rate,
@ -336,6 +336,7 @@ pub async fn new(
taker_lock_amount: params.taker().lock_amount,
revoked_commit: Vec::new(),
settlement_event_id,
refund_timelock: setup_params.refund_timelock,
})
}
@ -453,10 +454,7 @@ pub async fn roll_over(
taker_punish_params,
),
oracle_pk,
(
model::cfd::Cfd::CET_TIMELOCK,
rollover_params.refund_timelock,
),
(CET_TIMELOCK, rollover_params.refund_timelock),
payouts,
sk,
rollover_params.fee_rate,
@ -645,6 +643,7 @@ pub async fn roll_over(
taker_lock_amount,
revoked_commit,
settlement_event_id: announcement.id,
refund_timelock: rollover_params.refund_timelock,
})
}

49
daemon/src/setup_maker.rs

@ -1,6 +1,7 @@
use crate::address_map::ActorName;
use crate::address_map::Stopping;
use crate::maker_inc_connections;
use crate::maker_inc_connections::TakerMessage;
use crate::model::cfd::Cfd;
use crate::model::cfd::Dlc;
use crate::model::cfd::Order;
@ -9,11 +10,12 @@ use crate::model::cfd::Role;
use crate::model::cfd::SetupCompleted;
use crate::model::Identity;
use crate::oracle::Announcement;
use crate::send_async_safe::SendAsyncSafe;
use crate::setup_contract;
use crate::setup_contract::SetupParams;
use crate::tokio_ext::spawn_fallible;
use crate::wallet;
use crate::wire;
use crate::wire::MakerToTaker;
use crate::wire::SetupMsg;
use crate::xtra_ext::LogFailure;
use anyhow::Context;
@ -85,25 +87,18 @@ impl Actor {
// the spawned contract setup task
self.setup_msg_sender = Some(sender);
let taker_id = self.taker_id;
let (setup_params, identity) = self.cfd.start_contract_setup()?;
let contract_future = setup_contract::new(
self.taker.sink().with(move |msg| {
future::ok(maker_inc_connections::TakerMessage {
taker_id,
taker_id: identity,
msg: wire::MakerToTaker::Protocol { order_id, msg },
})
}),
receiver,
(self.oracle_pk, self.announcement.clone()),
SetupParams::new(
self.cfd.margin()?,
self.cfd.counterparty_margin()?,
self.cfd.price(),
self.cfd.quantity_usd(),
self.cfd.leverage(),
self.cfd.refund_timelock_in_blocks(),
self.cfd.fee_rate(),
),
setup_params,
self.build_party_params.clone_channel(),
self.sign.clone_channel(),
Role::Maker,
@ -137,6 +132,12 @@ impl Actor {
impl Actor {
fn handle(&mut self, _msg: Accepted, ctx: &mut xtra::Context<Self>) {
let order_id = self.cfd.id();
if self.setup_msg_sender.is_some() {
tracing::warn!(%order_id, "Contract setup already active");
return;
}
tracing::info!(%order_id, "Maker accepts an order");
let this = ctx
@ -171,8 +172,23 @@ impl Actor {
}
fn handle(&mut self, _msg: Rejected, ctx: &mut xtra::Context<Self>) {
self.complete(SetupCompleted::rejected(self.cfd.id()), ctx)
let _ = self
.taker
.send(TakerMessage {
taker_id: self.taker_id,
msg: MakerToTaker::RejectOrder(self.cfd.id()),
})
.log_failure("Failed to reject order to taker")
.await;
// We cannot use completed here because we are sending a message to ourselves and using
// `send` would be a deadlock!
let _ = self
.on_completed
.send_async_safe(SetupCompleted::rejected(self.cfd.id()))
.await;
ctx.stop();
}
fn handle(&mut self, msg: SetupSucceeded, ctx: &mut xtra::Context<Self>) {
@ -208,8 +224,7 @@ impl Actor {
#[async_trait]
impl xtra::Actor for Actor {
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
let quantity = self.cfd.quantity_usd();
let cfd = self.cfd.clone();
let quantity = self.cfd.quantity();
if quantity < self.order.min_quantity || quantity > self.order.max_quantity {
let reason = format!(
"Order rejected: quantity {} not in range [{}, {}]",
@ -221,12 +236,12 @@ impl xtra::Actor for Actor {
.taker
.send(maker_inc_connections::TakerMessage {
taker_id: self.taker_id,
msg: wire::MakerToTaker::RejectOrder(cfd.id()),
msg: wire::MakerToTaker::RejectOrder(self.cfd.id()),
})
.await;
self.complete(
SetupCompleted::rejected_due_to(cfd.id(), anyhow::format_err!(reason)),
SetupCompleted::rejected_due_to(self.cfd.id(), anyhow::format_err!(reason)),
ctx,
)
.await;

40
daemon/src/setup_taker.rs

@ -1,14 +1,12 @@
use crate::address_map;
use crate::connection;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState;
use crate::model::cfd::Dlc;
use crate::model::cfd::OrderId;
use crate::model::cfd::Role;
use crate::model::cfd::SetupCompleted;
use crate::oracle::Announcement;
use crate::setup_contract;
use crate::setup_contract::SetupParams;
use crate::tokio_ext::spawn_fallible;
use crate::wallet;
use crate::wire;
@ -33,7 +31,6 @@ pub struct Actor {
build_party_params: Box<dyn MessageChannel<wallet::BuildPartyParams>>,
sign: Box<dyn MessageChannel<wallet::Sign>>,
maker: xtra::Address<connection::Actor>,
on_accepted: Box<dyn MessageChannel<Started>>,
on_completed: Box<dyn MessageChannel<SetupCompleted>>,
setup_msg_sender: Option<UnboundedSender<SetupMsg>>,
}
@ -46,7 +43,6 @@ impl Actor {
build_party_params: &(impl MessageChannel<wallet::BuildPartyParams> + 'static),
sign: &(impl MessageChannel<wallet::Sign> + 'static),
maker: xtra::Address<connection::Actor>,
on_accepted: &(impl MessageChannel<Started> + 'static),
on_completed: &(impl MessageChannel<SetupCompleted> + 'static),
) -> Self {
Self {
@ -57,7 +53,6 @@ impl Actor {
build_party_params: build_party_params.clone_channel(),
sign: sign.clone_channel(),
maker,
on_accepted: on_accepted.clone_channel(),
on_completed: on_completed.clone_channel(),
setup_msg_sender: None,
}
@ -68,18 +63,9 @@ impl Actor {
impl Actor {
fn handle(&mut self, _: Accepted, ctx: &mut xtra::Context<Self>) -> Result<()> {
let order_id = self.cfd.id();
*self.cfd.state_mut() = CfdState::contract_setup();
tracing::info!(%order_id, "Order got accepted");
// inform the `taker_cfd::Actor` about the start of contract
// setup, so that the db and UI can be updated accordingly
self.on_accepted
.send(Started(order_id))
.log_failure("Failed to inform about contract setup start")
.await?;
let (setup_params, _) = self.cfd.start_contract_setup()?;
let (sender, receiver) = mpsc::unbounded::<SetupMsg>();
// store the writing end to forward messages from the maker to
// the spawned contract setup task
@ -90,15 +76,7 @@ impl Actor {
.with(move |msg| future::ok(wire::TakerToMaker::Protocol { order_id, msg })),
receiver,
(self.oracle_pk, self.announcement.clone()),
SetupParams::new(
self.cfd.margin()?,
self.cfd.counterparty_margin()?,
self.cfd.price(),
self.cfd.quantity_usd(),
self.cfd.leverage(),
self.cfd.refund_timelock_in_blocks(),
self.cfd.fee_rate(),
),
setup_params,
self.build_party_params.clone_channel(),
self.sign.clone_channel(),
Role::Taker,
@ -184,7 +162,7 @@ impl xtra::Actor for Actor {
.maker
.send(connection::TakeOrder {
order_id: self.cfd.id(),
quantity: self.cfd.quantity_usd(),
quantity: self.cfd.quantity(),
address,
})
.await;
@ -201,10 +179,6 @@ impl xtra::Actor for Actor {
/// by the maker.
pub struct Accepted;
/// Message sent from the `setup_taker::Actor` to the
/// `taker_cfd::Actor` to notify that the contract setup has started.
pub struct Started(pub OrderId);
/// Message sent from the `connection::Actor` to the
/// `setup_taker::Actor` to notify that the order taken was rejected
/// by the maker.
@ -245,14 +219,6 @@ impl Rejected {
}
}
impl xtra::Message for Started {
type Result = Result<()>;
}
impl xtra::Message for SetupCompleted {
type Result = Result<()>;
}
impl address_map::ActorName for Actor {
fn actor_name() -> String {
"Taker contract setup".to_string()

4
daemon/src/taker.rs

@ -10,7 +10,6 @@ use clap::Subcommand;
use daemon::bitmex_price_feed;
use daemon::connection::connect;
use daemon::db;
use daemon::housekeeping;
use daemon::logger;
use daemon::model::cfd::Role;
use daemon::model::Identity;
@ -229,7 +228,6 @@ async fn main() -> Result<()> {
.context("Db migrations failed")?;
// Create actors
housekeeping::new(&db, &wallet).await?;
let (projection_actor, projection_context) = xtra::Context::new(None);
@ -262,7 +260,7 @@ async fn main() -> Result<()> {
tasks.add(task);
let (proj_actor, projection_feeds) =
projection::Actor::new(db.clone(), Role::Taker, bitcoin_network).await?;
projection::Actor::new(db.clone(), Role::Taker, bitcoin_network);
tasks.add(projection_context.run(proj_actor));
let possible_addresses = resolve_maker_addresses(&opts.maker).await?;

248
daemon/src/taker_cfd.rs

@ -1,20 +1,21 @@
use crate::address_map::AddressMap;
use crate::cfd_actors::append_cfd_state;
use crate::cfd_actors;
use crate::cfd_actors::insert_cfd_and_update_feed;
use crate::cfd_actors::{self};
use crate::cfd_actors::load_cfd;
use crate::collab_settlement_taker;
use crate::connection;
use crate::db::load_cfd;
use crate::db::append_event;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState;
use crate::model::cfd::CfdStateCommon;
use crate::model::cfd::CfdEvent;
use crate::model::cfd::CollaborativeSettlement;
use crate::model::cfd::Completed;
use crate::model::cfd::Order;
use crate::model::cfd::OrderId;
use crate::model::cfd::Origin;
use crate::model::cfd::Role;
use crate::model::cfd::SetupCompleted;
use crate::model::cfd::TakerSettlementCompleted;
use crate::model::Identity;
use crate::model::Position;
use crate::model::Price;
use crate::model::Usd;
use crate::monitor;
@ -153,37 +154,69 @@ where
Ok(())
}
}
async fn handle_settlement_completed(&mut self, msg: TakerSettlementCompleted) -> Result<()> {
let (order_id, settlement) = match msg {
TakerSettlementCompleted::Succeeded {
order_id,
payload: settlement,
} => (order_id, settlement),
TakerSettlementCompleted::Rejected { order_id, reason } => {
tracing::debug!(%order_id, "Collaborative settlement failed: {:#}", reason);
return Ok(());
}
TakerSettlementCompleted::Failed { order_id, error } => {
tracing::warn!(%order_id, "Collaborative settlement failed: {:#}", error);
return Ok(());
}
};
let settlement_txid = settlement.tx.txid();
#[xtra_productivity(message_impl = false)]
impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
M: xtra::Handler<monitor::CollaborativeSettlement>,
{
async fn handle_settlement_completed(
&mut self,
msg: Completed<CollaborativeSettlement>,
) -> Result<()> {
let order_id = msg.order_id();
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
let dlc = cfd.dlc().context("No DLC in CFD")?;
cfd.handle_proposal_signed(settlement)?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let cfd = load_cfd(order_id, &mut conn).await?;
self.monitor_actor
.send(monitor::CollaborativeSettlement {
order_id,
tx: (settlement_txid, dlc.script_pubkey_for(Role::Taker)),
})
.await?;
let event = cfd.settle_collaboratively(msg)?;
append_event(event.clone(), &mut conn).await?;
self.projection_actor.send(projection::CfdsChanged).await?;
match event.event {
CfdEvent::CollaborativeSettlementCompleted {
spend_tx, script, ..
} => {
// TODO: Publish the tx once the collaborative settlement is symmetric, allowing the
// taker to publish as well.
let txid = spend_tx.txid();
tracing::info!(%order_id, "Collaborative settlement completed successfully {}", txid);
self.monitor_actor
.send(monitor::CollaborativeSettlement {
order_id,
tx: (txid, script),
})
.await?;
}
CfdEvent::CollaborativeSettlementRejected { commit_tx } => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::info!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
CfdEvent::CollaborativeSettlementFailed { commit_tx } => {
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction { tx: commit_tx })
.await?
.context("Broadcasting commit transaction")?;
tracing::warn!(
"Closing non-collaboratively. Commit tx published with txid {}",
txid
)
}
_ => bail!("Unexpected event {:?}", event.event),
}
Ok(())
}
@ -196,12 +229,6 @@ impl<O, M, W> Actor<O, M, W> {
Some(mut order) => {
order.origin = Origin::Theirs;
let mut conn = self.db.acquire().await?;
if load_cfd(order.id, &mut conn).await.is_ok() {
bail!("Received order {} from maker, but already have a cfd in the database for that order. The maker did not properly remove the order.", order.id)
}
self.current_order = Some(order.clone());
self.projection_actor
@ -214,67 +241,6 @@ impl<O, M, W> Actor<O, M, W> {
}
Ok(())
}
async fn append_cfd_state_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Order rejected");
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
Ok(())
}
async fn append_cfd_state_setup_failed(
&mut self,
order_id: OrderId,
error: anyhow::Error,
) -> Result<()> {
tracing::error!(%order_id, "Contract setup failed: {:#?}", error);
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::setup_failed(error.to_string());
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
Ok(())
}
/// Set the state of the CFD in the database to `ContractSetup`
/// and update the corresponding projection.
async fn handle_setup_started(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::contract_setup();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
Ok(())
}
}
impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.projection_actor,
)
.await?;
Ok(())
}
}
#[xtra_productivity]
@ -306,10 +272,13 @@ where
tracing::info!("Taking current order: {:?}", &current_order);
// We create the cfd here without any events yet, only static data
// Once the contract setup completes (rejected / accepted / failed) the first event will be
// recorded
let cfd = Cfd::from_order(
current_order.clone(),
Position::Long,
quantity,
CfdState::outgoing_order_request(),
self.maker_identity,
Role::Taker,
);
@ -337,7 +306,6 @@ where
&self.wallet,
self.conn_actor.clone(),
&this,
&this,
)
.create(None)
.run();
@ -357,33 +325,24 @@ where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_setup_completed(&mut self, msg: SetupCompleted) -> Result<()> {
let (order_id, dlc) = match msg {
SetupCompleted::Succeeded {
order_id,
payload: (dlc, _),
} => (order_id, dlc),
SetupCompleted::Rejected { order_id, .. } => {
self.append_cfd_state_rejected(order_id).await?;
return Ok(());
}
SetupCompleted::Failed { order_id, error } => {
self.append_cfd_state_setup_failed(order_id, error).await?;
return Ok(());
}
};
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
let order_id = msg.order_id();
tracing::info!("Setup complete, publishing on chain now");
let cfd = load_cfd(order_id, &mut conn).await?;
let event = cfd.setup_contract(msg)?;
append_event(event.clone(), &mut conn).await?;
self.projection_actor.send(projection::CfdsChanged).await?;
*cfd.state_mut() = CfdState::PendingOpen {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
let dlc = match event.event {
CfdEvent::ContractSetupCompleted { dlc } => dlc,
CfdEvent::OfferRejected | CfdEvent::ContractSetupFailed => {
return Ok(());
}
_ => bail!("Unexpected event {:?}", event.event),
};
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
tracing::info!("Setup complete, publishing on chain now");
let txid = self
.wallet
@ -397,7 +356,7 @@ where
self.monitor_actor
.send(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()),
params: MonitorParams::new(dlc.clone()),
})
.await?;
@ -430,33 +389,32 @@ where
}
}
#[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<monitor::Event> for Actor<O, M, W>
#[xtra_productivity(message_impl = false)]
impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) -> Result<()> {
self.handle_monitoring_event(msg).await
async fn handle_monitor(&mut self, msg: monitor::Event) {
if let Err(e) =
cfd_actors::handle_monitoring_event(msg, &self.db, &self.wallet, &self.projection_actor)
.await
{
tracing::error!("Unable to handle monotoring event: {:#}", e)
}
}
}
#[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<oracle::Attestation> for Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {
if let Err(e) = self.handle_oracle_attestation(msg).await {
async fn handle_attestation(&mut self, msg: oracle::Attestation) {
if let Err(e) = cfd_actors::handle_oracle_attestation(
msg,
&self.db,
&self.wallet,
&self.projection_actor,
)
.await
{
tracing::warn!("Failed to handle oracle attestation: {:#}", e)
}
}
}
#[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<setup_taker::Started> for Actor<O, M, W> {
async fn handle(&mut self, msg: setup_taker::Started, _ctx: &mut Context<Self>) -> Result<()> {
self.handle_setup_started(msg.0).await
}
}
impl<O: 'static, M: 'static, W: 'static> xtra::Actor for Actor<O, M, W> {}

2
daemon/src/to_sse_event.rs

@ -1,10 +1,10 @@
use crate::connection;
use crate::model;
use crate::model::Identity;
use crate::model::Timestamp;
use crate::projection::Cfd;
use crate::projection::CfdAction;
use crate::projection::CfdOrder;
use crate::projection::Identity;
use crate::projection::Quote;
use crate::to_sse_event::ConnectionCloseReason::MakerVersionOutdated;
use crate::to_sse_event::ConnectionCloseReason::TakerVersionOutdated;

119
daemon/src/tx.rs

@ -1,119 +0,0 @@
use bdk::bitcoin::Network;
use bdk::bitcoin::Txid;
use serde::Serialize;
use crate::model::cfd;
#[derive(Debug, Clone, Serialize)]
pub struct TxUrl {
pub label: TxLabel,
pub url: String,
}
/// Construct a mempool.space URL for a given txid
pub fn to_mempool_url(txid: Txid, network: Network) -> String {
match network {
Network::Bitcoin => format!("https://mempool.space/tx/{}", txid),
Network::Testnet => format!("https://mempool.space/testnet/tx/{}", txid),
Network::Signet => format!("https://mempool.space/signet/tx/{}", txid),
Network::Regtest => txid.to_string(),
}
}
impl TxUrl {
pub fn new(txid: Txid, network: Network, label: TxLabel) -> Self {
Self {
label,
url: to_mempool_url(txid, network),
}
}
}
pub fn to_tx_url_list(state: cfd::CfdState, network: Network) -> Vec<TxUrl> {
use cfd::CfdState::*;
let tx_ub = TxUrlBuilder::new(network);
match state {
PendingOpen { dlc, .. } => {
vec![tx_ub.lock(&dlc)]
}
PendingCommit { dlc, .. } => vec![tx_ub.lock(&dlc), tx_ub.commit(&dlc)],
OpenCommitted { dlc, .. } => vec![tx_ub.lock(&dlc), tx_ub.commit(&dlc)],
Open {
dlc,
collaborative_close,
..
} => {
let mut tx_urls = vec![tx_ub.lock(&dlc)];
if let Some(collaborative_close) = collaborative_close {
tx_urls.push(tx_ub.collaborative_close(collaborative_close.tx.txid()));
}
tx_urls
}
PendingCet {
dlc, attestation, ..
} => vec![
tx_ub.lock(&dlc),
tx_ub.commit(&dlc),
tx_ub.cet(attestation.txid()),
],
Closed {
payout: cfd::Payout::Cet(attestation),
..
} => vec![tx_ub.cet(attestation.txid())],
Closed {
payout: cfd::Payout::CollaborativeClose(collaborative_close),
..
} => {
vec![tx_ub.collaborative_close(collaborative_close.tx.txid())]
}
PendingRefund { dlc, .. } => vec![tx_ub.lock(&dlc), tx_ub.commit(&dlc), tx_ub.refund(&dlc)],
Refunded { dlc, .. } => vec![tx_ub.refund(&dlc)],
OutgoingOrderRequest { .. }
| IncomingOrderRequest { .. }
| Accepted { .. }
| Rejected { .. }
| ContractSetup { .. }
| SetupFailed { .. } => vec![],
}
}
struct TxUrlBuilder {
network: Network,
}
impl TxUrlBuilder {
pub fn new(network: Network) -> Self {
Self { network }
}
pub fn lock(&self, dlc: &cfd::Dlc) -> TxUrl {
TxUrl::new(dlc.lock.0.txid(), self.network, TxLabel::Lock)
}
pub fn commit(&self, dlc: &cfd::Dlc) -> TxUrl {
TxUrl::new(dlc.commit.0.txid(), self.network, TxLabel::Commit)
}
pub fn cet(&self, txid: Txid) -> TxUrl {
TxUrl::new(txid, self.network, TxLabel::Cet)
}
pub fn collaborative_close(&self, txid: Txid) -> TxUrl {
TxUrl::new(txid, self.network, TxLabel::Collaborative)
}
pub fn refund(&self, dlc: &cfd::Dlc) -> TxUrl {
TxUrl::new(dlc.refund.0.txid(), self.network, TxLabel::Refund)
}
}
#[derive(Debug, Clone, Serialize)]
pub enum TxLabel {
Lock,
Commit,
Cet,
Refund,
Collaborative,
}

6
daemon/src/xtra_ext.rs

@ -1,4 +1,5 @@
use async_trait::async_trait;
use std::fmt;
use xtra::address;
use xtra::message_channel;
use xtra::Actor;
@ -31,9 +32,10 @@ where
}
#[async_trait]
impl<M> LogFailure for message_channel::SendFuture<M>
impl<M, E> LogFailure for message_channel::SendFuture<M>
where
M: xtra::Message<Result = anyhow::Result<()>>,
M: xtra::Message<Result = anyhow::Result<(), E>>,
E: fmt::Display + Send,
{
async fn log_failure(self, context: &str) -> Result<(), Disconnected> {
if let Err(e) = self.await? {

26
daemon/tests/happy_path.rs

@ -14,11 +14,11 @@ use crate::harness::Taker;
use crate::harness::TakerConfig;
use daemon::connection::ConnectionStatus;
use daemon::model::cfd::OrderId;
use daemon::model::Identity;
use daemon::model::Usd;
use daemon::monitor::Event;
use daemon::oracle;
use daemon::projection::CfdState;
use daemon::projection::Identity;
use maia::secp256k1_zkp::schnorrsig;
use rust_decimal_macros::dec;
use std::time::Duration;
@ -70,13 +70,10 @@ async fn taker_takes_order_and_maker_rejects() {
.unwrap();
taker.mocks.mock_oracle_announcement().await;
maker.mocks.mock_oracle_announcement().await;
taker.take_order(received.clone(), Usd::new(dec!(10))).await;
let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
assert_eq!(taker_cfd.order_id, received.id);
assert_eq!(maker_cfd.order_id, received.id);
assert_eq!(taker_cfd.state, CfdState::OutgoingOrderRequest);
assert_eq!(maker_cfd.state, CfdState::IncomingOrderRequest);
assert_next_state!(CfdState::PendingSetup, maker, taker, received.id);
maker.reject_take_request(received.clone()).await;
@ -101,7 +98,7 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() {
maker.mocks.mock_oracle_announcement().await;
taker.take_order(received.clone(), Usd::new(dec!(5))).await;
let (_, _) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
assert_next_state!(CfdState::PendingSetup, maker, taker, received.id);
maker.mocks.mock_party_params().await;
taker.mocks.mock_party_params().await;
@ -115,17 +112,13 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() {
maker.mocks.mock_monitor_start_monitoring().await;
taker.mocks.mock_monitor_start_monitoring().await;
maker.accept_take_request(received.clone()).await;
assert_next_state!(CfdState::ContractSetup, maker, taker, received.id);
maker.mocks.mock_wallet_sign_and_broadcast().await;
taker.mocks.mock_wallet_sign_and_broadcast().await;
maker.accept_take_request(received.clone()).await;
assert_next_state!(CfdState::PendingOpen, maker, taker, received.id);
deliver_event!(maker, taker, Event::LockFinality(received.id));
assert_next_state!(CfdState::Open, maker, taker, received.id);
}
@ -238,7 +231,7 @@ async fn maker_notices_lack_of_taker() {
let (mut maker, taker) = start_both().await;
assert_eq!(
vec![taker.id.clone()],
vec![taker.id],
next(maker.connected_takers_feed()).await.unwrap()
);
@ -287,7 +280,7 @@ async fn start_from_open_cfd_state(announcement: oracle::Announcement) -> (Maker
.await;
taker.take_order(received.clone(), Usd::new(dec!(5))).await;
let (_, _) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap();
assert_next_state!(CfdState::PendingSetup, maker, taker, received.id);
maker.mocks.mock_party_params().await;
taker.mocks.mock_party_params().await;
@ -301,16 +294,13 @@ async fn start_from_open_cfd_state(announcement: oracle::Announcement) -> (Maker
maker.mocks.mock_monitor_start_monitoring().await;
taker.mocks.mock_monitor_start_monitoring().await;
maker.accept_take_request(received.clone()).await;
assert_next_state!(CfdState::ContractSetup, maker, taker, received.id);
maker.mocks.mock_wallet_sign_and_broadcast().await;
taker.mocks.mock_wallet_sign_and_broadcast().await;
maker.accept_take_request(received.clone()).await;
assert_next_state!(CfdState::PendingOpen, maker, taker, received.id);
deliver_event!(maker, taker, Event::LockFinality(received.id));
assert_next_state!(CfdState::Open, maker, taker, received.id);
(maker, taker, received.id)

7
daemon/tests/harness/flow.rs

@ -1,3 +1,4 @@
use anyhow::ensure;
use anyhow::Context;
use anyhow::Result;
use daemon::projection::Cfd;
@ -5,6 +6,7 @@ use daemon::projection::CfdOrder;
use daemon::tokio_ext::FutureExt;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;
/// Waiting time for the time on the watch channel before returning error
const NEXT_WAIT_TIME: Duration = Duration::from_secs(if cfg!(debug_assertions) { 180 } else { 30 });
@ -16,11 +18,12 @@ pub async fn next_cfd(
rx_a: &mut watch::Receiver<Vec<Cfd>>,
rx_b: &mut watch::Receiver<Vec<Cfd>>,
) -> Result<(Cfd, Cfd)> {
sleep(Duration::from_secs(1)).await; // TODO: Try to remove this workaround
let (a, b) = tokio::join!(next(rx_a), next(rx_b));
let (a, b) = (a?, b?);
assert_eq!(a.len(), 1);
assert_eq!(b.len(), 1);
ensure!(a.len() == 1, "expected 1 cfd on feed");
ensure!(b.len() == 1, "expected 1 cfd on feed");
Ok((a.first().unwrap().clone(), b.first().unwrap().clone()))
}

2
daemon/tests/harness/maia.rs

@ -1,6 +1,6 @@
use daemon::model::BitMexPriceEventId;
use daemon::oracle;
use daemon::oracle::Attestation;
use daemon::oracle::{self};
use maia::secp256k1_zkp::schnorrsig;
use maia::secp256k1_zkp::SecretKey;
use std::str::FromStr;

9
daemon/tests/harness/mocks/mod.rs

@ -1,13 +1,10 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::MutexGuard;
use self::monitor::MonitorActor;
use self::oracle::OracleActor;
use self::wallet::WalletActor;
use super::maia::OliviaData;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::MutexGuard;
pub mod monitor;
pub mod oracle;

14
daemon/tests/harness/mod.rs

@ -8,16 +8,16 @@ use daemon::connection::ConnectionStatus;
use daemon::db;
use daemon::maker_cfd;
use daemon::maker_inc_connections;
use daemon::model;
use daemon::model::cfd::OrderId;
use daemon::model::cfd::Role;
use daemon::model::Identity;
use daemon::model::Price;
use daemon::model::Usd;
use daemon::model::{self};
use daemon::projection;
use daemon::projection::Cfd;
use daemon::projection::CfdOrder;
use daemon::projection::Feeds;
use daemon::projection::Identity;
use daemon::seed::Seed;
use daemon::taker_cfd;
use daemon::MakerActorSystem;
@ -179,9 +179,7 @@ impl Maker {
.await
.unwrap();
let (proj_actor, feeds) = projection::Actor::new(db, Role::Maker, Network::Testnet)
.await
.unwrap();
let (proj_actor, feeds) = projection::Actor::new(db, Role::Maker, Network::Testnet);
tasks.add(projection_context.run(proj_actor));
let address = listener.local_addr().unwrap();
@ -296,9 +294,7 @@ impl Taker {
.await
.unwrap();
let (proj_actor, feeds) = projection::Actor::new(db, Role::Taker, Network::Testnet)
.await
.unwrap();
let (proj_actor, feeds) = projection::Actor::new(db, Role::Taker, Network::Testnet);
tasks.add(projection_context.run(proj_actor));
tasks.add(connect(
@ -309,7 +305,7 @@ impl Taker {
));
Self {
id: model::Identity::new(identity_pk).into(),
id: model::Identity::new(identity_pk),
system: taker,
feeds,
mocks,

86
docs/asset/mvp_maker_taker_db.puml

@ -6,32 +6,86 @@ hide circle
' avoid problems with angled crows feet
skinparam linetype ortho
entity "orders" as order {
*id : number <<PK>> <<generated>>
entity "cfds" as cfds {
id <<PK>> <<auto-increment>>
--
...
uuid <<unique>>
--
position
initial_price
leverage
settlement_time_interval_hours
quantity
counterparty_network_identity
}
entity "cfds" as cfd {
*id : number <<PK>> <<generated>>
--
*order_id : text <<FK>>
--
quantity_usd: long
creation_timestamp: Date
entity "events" as events {
id <<PK>> <<auto-increment>>
--
cfd_id <<FK>>
--
name: text
data: json
timestamp: date
}
entity "cfd_states" as cfd_states {
*id : number <<PK>> <<generated>>
cfds }o--|| events
entity "taker::CfdAggregate" as taker_cfd_aggregate {
--
--
state: blob
start_contract_setup() -> ContractSetupParams
complete_contract_setup() -> Event::Dlc
start_rollover()
complete_rollover()
start_collaborative_settlement()
complete_collaborative_settlement()
start_non_collaborative_settlement()
lock_confirmed()
commit_confirmed()
cet_confirmed()
refund_confirmed()
collaborative_settlement_confirmed()
cet_timelock_expired()
refund_timelock_expired()
oracle_attested()
}
note left: state de-/serialized \nfrom rust state enum \nthis is not backwards\ncompatible, but that's \nOK for the MVP
entity "maker::CfdAggregate" as maker_cfd_aggregate {
--
--
start_contract_setup() -> ContractSetupParams
complete_contract_setup() -> Event::Dlc
rollover_requested() -> RolloverParams
accept_rollover()
reject_rollover()
complete_rollover()
order ||--|| cfd
collaborative_settlement_requested()
accept_collaborative_settlement()
reject_collaborative_settlement()
complete_collaborative_settlement()
cfd ||--|{ cfd_states
start_non_collaborative_settlement()
lock_confirmed()
commit_confirmed()
cet_confirmed()
refund_confirmed()
collaborative_settlement_confirmed()
cet_timelock_expired()
refund_timelock_expired()
oracle_attested()
}
entity "Event" as taker_event {
}
@enduml

27
maker-frontend/src/components/Types.tsx

@ -69,16 +69,10 @@ export class State {
public getLabel(): string {
switch (this.key) {
case StateKey.INCOMING_ORDER_REQUEST:
return "Order Requested";
case StateKey.OUTGOING_ORDER_REQUEST:
return "Order Requested";
case StateKey.ACCEPTED:
return "Accepted";
case StateKey.PENDING_SETUP:
return "Setting up";
case StateKey.REJECTED:
return "Rejected";
case StateKey.CONTRACT_SETUP:
return "Contract Setup";
case StateKey.PENDING_OPEN:
return "Pending Open";
case StateKey.OPEN:
@ -117,7 +111,6 @@ export class State {
const orange = "orange";
switch (this.key) {
case StateKey.ACCEPTED:
case StateKey.OPEN:
return green;
@ -131,13 +124,11 @@ export class State {
case StateKey.PENDING_CLOSE:
return orange;
case StateKey.OUTGOING_ORDER_REQUEST:
case StateKey.INCOMING_ORDER_REQUEST:
case StateKey.PENDING_SETUP:
case StateKey.OUTGOING_SETTLEMENT_PROPOSAL:
case StateKey.INCOMING_SETTLEMENT_PROPOSAL:
case StateKey.INCOMING_ROLL_OVER_PROPOSAL:
case StateKey.OUTGOING_ROLL_OVER_PROPOSAL:
case StateKey.CONTRACT_SETUP:
case StateKey.PENDING_OPEN:
case StateKey.REFUNDED:
case StateKey.SETUP_FAILED:
@ -148,12 +139,7 @@ export class State {
public getGroup(): StateGroupKey {
switch (this.key) {
case StateKey.INCOMING_ORDER_REQUEST:
return StateGroupKey.PENDING_ORDER;
case StateKey.OUTGOING_ORDER_REQUEST:
case StateKey.ACCEPTED:
case StateKey.CONTRACT_SETUP:
case StateKey.PENDING_SETUP:
return StateGroupKey.OPENING;
case StateKey.PENDING_OPEN:
@ -195,11 +181,8 @@ export enum Action {
}
const enum StateKey {
OUTGOING_ORDER_REQUEST = "OutgoingOrderRequest",
INCOMING_ORDER_REQUEST = "IncomingOrderRequest",
ACCEPTED = "Accepted",
PENDING_SETUP = "PendingSetup",
REJECTED = "Rejected",
CONTRACT_SETUP = "ContractSetup",
PENDING_OPEN = "PendingOpen",
OPEN = "Open",
PENDING_CLOSE = "PendingClose",

28
taker-frontend/src/types.ts

@ -86,16 +86,10 @@ export class State {
public getLabel(): string {
switch (this.key) {
case StateKey.INCOMING_ORDER_REQUEST:
return "Order Requested";
case StateKey.OUTGOING_ORDER_REQUEST:
return "Order Requested";
case StateKey.ACCEPTED:
return "Accepted";
case StateKey.PENDING_SETUP:
return "Setting up";
case StateKey.REJECTED:
return "Rejected";
case StateKey.CONTRACT_SETUP:
return "Contract Setup";
case StateKey.PENDING_OPEN:
return "Pending Open";
case StateKey.OPEN:
@ -134,7 +128,6 @@ export class State {
const orange = "orange";
switch (this.key) {
case StateKey.ACCEPTED:
case StateKey.OPEN:
return green;
@ -148,13 +141,11 @@ export class State {
case StateKey.PENDING_CLOSE:
return orange;
case StateKey.OUTGOING_ORDER_REQUEST:
case StateKey.INCOMING_ORDER_REQUEST:
case StateKey.PENDING_SETUP:
case StateKey.OUTGOING_SETTLEMENT_PROPOSAL:
case StateKey.INCOMING_SETTLEMENT_PROPOSAL:
case StateKey.INCOMING_ROLL_OVER_PROPOSAL:
case StateKey.OUTGOING_ROLL_OVER_PROPOSAL:
case StateKey.CONTRACT_SETUP:
case StateKey.PENDING_OPEN:
case StateKey.REFUNDED:
case StateKey.SETUP_FAILED:
@ -165,12 +156,7 @@ export class State {
public getGroup(): StateGroupKey {
switch (this.key) {
case StateKey.INCOMING_ORDER_REQUEST:
return StateGroupKey.PENDING_ORDER;
case StateKey.OUTGOING_ORDER_REQUEST:
case StateKey.ACCEPTED:
case StateKey.CONTRACT_SETUP:
case StateKey.PENDING_SETUP:
return StateGroupKey.OPENING;
case StateKey.PENDING_OPEN:
@ -200,11 +186,8 @@ export class State {
}
export const enum StateKey {
OUTGOING_ORDER_REQUEST = "OutgoingOrderRequest",
INCOMING_ORDER_REQUEST = "IncomingOrderRequest",
ACCEPTED = "Accepted",
PENDING_SETUP = "PendingSetup",
REJECTED = "Rejected",
CONTRACT_SETUP = "ContractSetup",
PENDING_OPEN = "PendingOpen",
OPEN = "Open",
PENDING_CLOSE = "PendingClose",
@ -224,7 +207,6 @@ export const enum StateKey {
export enum StateGroupKey {
/// A CFD which is still being set up (not on chain yet)
OPENING = "Opening",
PENDING_ORDER = "Pending Order",
/// A CFD that is an ongoing open position (on chain)
OPEN = "Open",
PENDING_SETTLEMENT = "Pending Settlement",

Loading…
Cancel
Save