Browse Source

Don't store order in database

resilient-broadcast
Thomas Eizinger 3 years ago
committed by Daniel Karzel
parent
commit
4e7ae8bffe
No known key found for this signature in database GPG Key ID: 30C3FC2E438ADB6E
  1. 31
      daemon/migrations/20211210000000_remove-order-from-database.sql
  2. 214
      daemon/sqlx-data.json
  3. 14
      daemon/src/collab_settlement_maker.rs
  4. 14
      daemon/src/collab_settlement_taker.rs
  5. 442
      daemon/src/db.rs
  6. 50
      daemon/src/maker_cfd.rs
  7. 1
      daemon/src/model.rs
  8. 75
      daemon/src/model/cfd.rs
  9. 56
      daemon/src/monitor.rs
  10. 16
      daemon/src/projection.rs
  11. 28
      daemon/src/rollover_taker.rs
  12. 50
      daemon/src/setup_maker.rs
  13. 48
      daemon/src/setup_taker.rs
  14. 21
      daemon/src/taker_cfd.rs

31
daemon/migrations/20211210000000_remove-order-from-database.sql

@ -0,0 +1,31 @@
drop table cfd_states;
drop table cfds;
drop table orders;
create table if not exists cfds
(
id integer primary key autoincrement,
uuid text unique not null,
trading_pair text not null,
position text not null,
initial_price text not null,
leverage integer not null,
liquidation_price text not null,
creation_timestamp_seconds integer not null,
settlement_time_interval_seconds integer not null,
origin text not null,
oracle_event_id text not null,
fee_rate integer not null,
quantity_usd text not null,
counterparty text not null
);
create unique index if not exists cfd_uuid on cfds (uuid);
create table if not exists cfd_states
(
id integer primary key autoincrement,
cfd_id integer not null,
state text not null,
foreign key (cfd_id) references cfds (id)
);

214
daemon/sqlx-data.json

@ -1,95 +1,5 @@
{
"db": "SQLite",
"047070578f79ceaa739c1b307d138ee30e3a8730c4ac357213801e23d73f2841": {
"query": "\n select\n uuid as \"uuid: crate::model::cfd::OrderId\",\n trading_pair as \"trading_pair: crate::model::TradingPair\",\n position as \"position: crate::model::Position\",\n initial_price as \"initial_price: crate::model::Price\",\n min_quantity as \"min_quantity: crate::model::Usd\",\n max_quantity as \"max_quantity: crate::model::Usd\",\n leverage as \"leverage: crate::model::Leverage\",\n liquidation_price as \"liquidation_price: crate::model::Price\",\n creation_timestamp_seconds as \"ts_secs: crate::model::Timestamp\",\n settlement_time_interval_seconds as \"settlement_time_interval_secs: i64\",\n origin as \"origin: crate::model::cfd::Origin\",\n oracle_event_id as \"oracle_event_id: crate::model::BitMexPriceEventId\",\n fee_rate as \"fee_rate: u32\"\n from\n orders\n where\n uuid = $1\n ",
"describe": {
"columns": [
{
"name": "uuid: crate::model::cfd::OrderId",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "trading_pair: crate::model::TradingPair",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "position: crate::model::Position",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "initial_price: crate::model::Price",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "min_quantity: crate::model::Usd",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "max_quantity: crate::model::Usd",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "leverage: crate::model::Leverage",
"ordinal": 6,
"type_info": "Int64"
},
{
"name": "liquidation_price: crate::model::Price",
"ordinal": 7,
"type_info": "Text"
},
{
"name": "ts_secs: crate::model::Timestamp",
"ordinal": 8,
"type_info": "Int64"
},
{
"name": "settlement_time_interval_secs: i64",
"ordinal": 9,
"type_info": "Int64"
},
{
"name": "origin: crate::model::cfd::Origin",
"ordinal": 10,
"type_info": "Text"
},
{
"name": "oracle_event_id: crate::model::BitMexPriceEventId",
"ordinal": 11,
"type_info": "Text"
},
{
"name": "fee_rate: u32",
"ordinal": 12,
"type_info": "Null"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false
]
}
},
"221a6283db798bacaba99e7e85130f9a8bbea1299d8cb99d272b1d478dc19775": {
"query": "\n select\n state\n from cfd_states\n where cfd_id = $1\n order by id desc\n limit 1;\n ",
"describe": {
@ -108,8 +18,8 @@
]
}
},
"28d8b9ddd2dd85a9096200e6abd170a09ed35e1c905c081e535e19800016cd7d": {
"query": "\n with ord as (\n select\n id as order_id,\n uuid,\n trading_pair,\n position,\n initial_price,\n min_quantity,\n max_quantity,\n leverage,\n liquidation_price,\n creation_timestamp_seconds as ts_secs,\n settlement_time_interval_seconds as settlement_time_interval_secs,\n origin,\n oracle_event_id,\n fee_rate\n from orders\n ),\n\n cfd as (\n select\n ord.order_id,\n id as cfd_id,\n quantity_usd,\n counterparty\n from cfds\n inner join ord on ord.order_id = cfds.order_id\n ),\n\n state as (\n select\n id as state_id,\n cfd.order_id,\n cfd.quantity_usd,\n cfd.counterparty,\n state\n from cfd_states\n inner join cfd on cfd.cfd_id = cfd_states.cfd_id\n where id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n ord.uuid as \"uuid: crate::model::cfd::OrderId\",\n ord.trading_pair as \"trading_pair: crate::model::TradingPair\",\n ord.position as \"position: crate::model::Position\",\n ord.initial_price as \"initial_price: crate::model::Price\",\n ord.min_quantity as \"min_quantity: crate::model::Usd\",\n ord.max_quantity as \"max_quantity: crate::model::Usd\",\n ord.leverage as \"leverage: crate::model::Leverage\",\n ord.liquidation_price as \"liquidation_price: crate::model::Price\",\n ord.ts_secs as \"ts_secs: crate::model::Timestamp\",\n ord.settlement_time_interval_secs as \"settlement_time_interval_secs: i64\",\n ord.origin as \"origin: crate::model::cfd::Origin\",\n ord.oracle_event_id as \"oracle_event_id: crate::model::BitMexPriceEventId\",\n ord.fee_rate as \"fee_rate: u32\",\n state.quantity_usd as \"quantity_usd: crate::model::Usd\",\n state.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from ord\n inner join state on state.order_id = ord.order_id\n\n where ord.uuid = $1\n ",
"3efef7791c6a33eff33426e4b68e1f908be8da5efd2f991ba8e67df99f7ab360": {
"query": "\n with state as (\n select\n cfd_id,\n state\n from cfd_states\n inner join cfds on cfds.id = cfd_states.cfd_id\n where cfd_states.id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n cfds.uuid as \"uuid: crate::model::cfd::OrderId\",\n cfds.trading_pair as \"trading_pair: crate::model::TradingPair\",\n cfds.position as \"position: crate::model::Position\",\n cfds.initial_price as \"initial_price: crate::model::Price\",\n cfds.leverage as \"leverage: crate::model::Leverage\",\n cfds.liquidation_price as \"liquidation_price: crate::model::Price\",\n cfds.creation_timestamp_seconds as \"creation_timestamp_seconds: crate::model::Timestamp\",\n cfds.settlement_time_interval_seconds as \"settlement_time_interval_secs: i64\",\n cfds.origin as \"origin: crate::model::cfd::Origin\",\n cfds.oracle_event_id as \"oracle_event_id: crate::model::BitMexPriceEventId\",\n cfds.fee_rate as \"fee_rate: u32\",\n cfds.quantity_usd as \"quantity_usd: crate::model::Usd\",\n cfds.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from cfds\n inner join state on state.cfd_id = cfds.id\n\n where cfds.oracle_event_id = $1\n ",
"describe": {
"columns": [
{
@ -132,64 +42,54 @@
"ordinal": 3,
"type_info": "Text"
},
{
"name": "min_quantity: crate::model::Usd",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "max_quantity: crate::model::Usd",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "leverage: crate::model::Leverage",
"ordinal": 6,
"ordinal": 4,
"type_info": "Int64"
},
{
"name": "liquidation_price: crate::model::Price",
"ordinal": 7,
"ordinal": 5,
"type_info": "Text"
},
{
"name": "ts_secs: crate::model::Timestamp",
"ordinal": 8,
"name": "creation_timestamp_seconds: crate::model::Timestamp",
"ordinal": 6,
"type_info": "Int64"
},
{
"name": "settlement_time_interval_secs: i64",
"ordinal": 9,
"ordinal": 7,
"type_info": "Int64"
},
{
"name": "origin: crate::model::cfd::Origin",
"ordinal": 10,
"ordinal": 8,
"type_info": "Text"
},
{
"name": "oracle_event_id: crate::model::BitMexPriceEventId",
"ordinal": 11,
"ordinal": 9,
"type_info": "Text"
},
{
"name": "fee_rate: u32",
"ordinal": 12,
"type_info": "Null"
"ordinal": 10,
"type_info": "Int64"
},
{
"name": "quantity_usd: crate::model::Usd",
"ordinal": 13,
"ordinal": 11,
"type_info": "Text"
},
{
"name": "counterparty: crate::model::Identity",
"ordinal": 14,
"ordinal": 12,
"type_info": "Text"
},
{
"name": "state",
"ordinal": 15,
"ordinal": 13,
"type_info": "Text"
}
],
@ -210,14 +110,12 @@
false,
false,
false,
false,
false,
false
]
}
},
"8cbe349911b35d8e79763d64b4f5813b4bd98f12e0bba5ada84d2cae8b08ef4f": {
"query": "\n select\n id\n from cfds\n where order_uuid = $1;\n ",
"9f31d4002a7328b199a24d50149f2724706e2d391a94b76d7894983f5eb71c4b": {
"query": "\n select\n id\n from cfds\n where cfds.uuid = $1;\n ",
"describe": {
"columns": [
{
@ -234,8 +132,8 @@
]
}
},
"ef4fb6c58e79051bd09ad04f59b7896df5228c6c848c999149668d7c733115c2": {
"query": "\n with ord as (\n select\n id as order_id,\n uuid,\n trading_pair,\n position,\n initial_price,\n min_quantity,\n max_quantity,\n leverage,\n liquidation_price,\n creation_timestamp_seconds as ts_secs,\n settlement_time_interval_seconds as settlement_time_interval_secs,\n origin,\n oracle_event_id,\n fee_rate\n from orders\n ),\n\n cfd as (\n select\n ord.order_id,\n id as cfd_id,\n quantity_usd,\n counterparty\n from cfds\n inner join ord on ord.order_id = cfds.order_id\n ),\n\n state as (\n select\n id as state_id,\n cfd.order_id,\n cfd.quantity_usd,\n cfd.counterparty,\n state\n from cfd_states\n inner join cfd on cfd.cfd_id = cfd_states.cfd_id\n where id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n ord.uuid as \"uuid: crate::model::cfd::OrderId\",\n ord.trading_pair as \"trading_pair: crate::model::TradingPair\",\n ord.position as \"position: crate::model::Position\",\n ord.initial_price as \"initial_price: crate::model::Price\",\n ord.min_quantity as \"min_quantity: crate::model::Usd\",\n ord.max_quantity as \"max_quantity: crate::model::Usd\",\n ord.leverage as \"leverage: crate::model::Leverage\",\n ord.liquidation_price as \"liquidation_price: crate::model::Price\",\n ord.ts_secs as \"ts_secs: crate::model::Timestamp\",\n ord.settlement_time_interval_secs as \"settlement_time_interval_secs: i64\",\n ord.origin as \"origin: crate::model::cfd::Origin\",\n ord.oracle_event_id as \"oracle_event_id: crate::model::BitMexPriceEventId\",\n ord.fee_rate as \"fee_rate: u32\",\n state.quantity_usd as \"quantity_usd: crate::model::Usd\",\n state.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from ord\n inner join state on state.order_id = ord.order_id\n ",
"c64374031a424b78b1061d05d0087d79c7251fe6da1dd3cb5d146d1e2b4dd12f": {
"query": "\n with state as (\n select\n cfd_id,\n state\n from cfd_states\n inner join cfds on cfds.id = cfd_states.cfd_id\n where cfd_states.id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n cfds.uuid as \"uuid: crate::model::cfd::OrderId\",\n cfds.trading_pair as \"trading_pair: crate::model::TradingPair\",\n cfds.position as \"position: crate::model::Position\",\n cfds.initial_price as \"initial_price: crate::model::Price\",\n cfds.leverage as \"leverage: crate::model::Leverage\",\n cfds.liquidation_price as \"liquidation_price: crate::model::Price\",\n cfds.creation_timestamp_seconds as \"creation_timestamp_seconds: crate::model::Timestamp\",\n cfds.settlement_time_interval_seconds as \"settlement_time_interval_secs: i64\",\n cfds.origin as \"origin: crate::model::cfd::Origin\",\n cfds.oracle_event_id as \"oracle_event_id: crate::model::BitMexPriceEventId\",\n cfds.fee_rate as \"fee_rate: u32\",\n cfds.quantity_usd as \"quantity_usd: crate::model::Usd\",\n cfds.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from cfds\n inner join state on state.cfd_id = cfds.id\n ",
"describe": {
"columns": [
{
@ -258,64 +156,54 @@
"ordinal": 3,
"type_info": "Text"
},
{
"name": "min_quantity: crate::model::Usd",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "max_quantity: crate::model::Usd",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "leverage: crate::model::Leverage",
"ordinal": 6,
"ordinal": 4,
"type_info": "Int64"
},
{
"name": "liquidation_price: crate::model::Price",
"ordinal": 7,
"ordinal": 5,
"type_info": "Text"
},
{
"name": "ts_secs: crate::model::Timestamp",
"ordinal": 8,
"name": "creation_timestamp_seconds: crate::model::Timestamp",
"ordinal": 6,
"type_info": "Int64"
},
{
"name": "settlement_time_interval_secs: i64",
"ordinal": 9,
"ordinal": 7,
"type_info": "Int64"
},
{
"name": "origin: crate::model::cfd::Origin",
"ordinal": 10,
"ordinal": 8,
"type_info": "Text"
},
{
"name": "oracle_event_id: crate::model::BitMexPriceEventId",
"ordinal": 11,
"ordinal": 9,
"type_info": "Text"
},
{
"name": "fee_rate: u32",
"ordinal": 12,
"type_info": "Null"
"ordinal": 10,
"type_info": "Int64"
},
{
"name": "quantity_usd: crate::model::Usd",
"ordinal": 13,
"ordinal": 11,
"type_info": "Text"
},
{
"name": "counterparty: crate::model::Identity",
"ordinal": 14,
"ordinal": 12,
"type_info": "Text"
},
{
"name": "state",
"ordinal": 15,
"ordinal": 13,
"type_info": "Text"
}
],
@ -336,14 +224,12 @@
false,
false,
false,
false,
false,
false
]
}
},
"f6ac595731d61e166b2afaa4923e9f05c75cfad6d1d734ab43e7567d0e29adec": {
"query": "\n with ord as (\n select\n id as order_id,\n uuid,\n trading_pair,\n position,\n initial_price,\n min_quantity,\n max_quantity,\n leverage,\n liquidation_price,\n creation_timestamp_seconds as ts_secs,\n settlement_time_interval_seconds as settlement_time_interval_secs,\n origin,\n oracle_event_id,\n fee_rate\n from orders\n ),\n\n cfd as (\n select\n ord.order_id,\n id as cfd_id,\n quantity_usd,\n counterparty\n from cfds\n inner join ord on ord.order_id = cfds.order_id\n ),\n\n state as (\n select\n id as state_id,\n cfd.order_id,\n cfd.quantity_usd,\n cfd.counterparty,\n state\n from cfd_states\n inner join cfd on cfd.cfd_id = cfd_states.cfd_id\n where id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n ord.uuid as \"uuid: crate::model::cfd::OrderId\",\n ord.trading_pair as \"trading_pair: crate::model::TradingPair\",\n ord.position as \"position: crate::model::Position\",\n ord.initial_price as \"initial_price: crate::model::Price\",\n ord.min_quantity as \"min_quantity: crate::model::Usd\",\n ord.max_quantity as \"max_quantity: crate::model::Usd\",\n ord.leverage as \"leverage: crate::model::Leverage\",\n ord.liquidation_price as \"liquidation_price: crate::model::Price\",\n ord.ts_secs as \"ts_secs: crate::model::Timestamp\",\n ord.settlement_time_interval_secs as \"settlement_time_interval_secs: i64\",\n ord.origin as \"origin: crate::model::cfd::Origin\",\n ord.oracle_event_id as \"oracle_event_id: crate::model::BitMexPriceEventId\",\n ord.fee_rate as \"fee_rate: u32\",\n state.quantity_usd as \"quantity_usd: crate::model::Usd\",\n state.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from ord\n inner join state on state.order_id = ord.order_id\n\n where ord.oracle_event_id = $1\n ",
"ff0e5909f36d03a19434acef0bf307c5b8a70beae6f619853f4e3ce5a8c53b61": {
"query": "\n with state as (\n select\n cfd_id,\n state\n from cfd_states\n inner join cfds on cfds.id = cfd_states.cfd_id\n where cfd_states.id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n cfds.uuid as \"uuid: crate::model::cfd::OrderId\",\n cfds.trading_pair as \"trading_pair: crate::model::TradingPair\",\n cfds.position as \"position: crate::model::Position\",\n cfds.initial_price as \"initial_price: crate::model::Price\",\n cfds.leverage as \"leverage: crate::model::Leverage\",\n cfds.liquidation_price as \"liquidation_price: crate::model::Price\",\n cfds.creation_timestamp_seconds as \"creation_timestamp_seconds: crate::model::Timestamp\",\n cfds.settlement_time_interval_seconds as \"settlement_time_interval_secs: i64\",\n cfds.origin as \"origin: crate::model::cfd::Origin\",\n cfds.oracle_event_id as \"oracle_event_id: crate::model::BitMexPriceEventId\",\n cfds.fee_rate as \"fee_rate: u32\",\n cfds.quantity_usd as \"quantity_usd: crate::model::Usd\",\n cfds.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from cfds\n inner join state on state.cfd_id = cfds.id\n\n where cfds.uuid = $1\n ",
"describe": {
"columns": [
{
@ -366,64 +252,54 @@
"ordinal": 3,
"type_info": "Text"
},
{
"name": "min_quantity: crate::model::Usd",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "max_quantity: crate::model::Usd",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "leverage: crate::model::Leverage",
"ordinal": 6,
"ordinal": 4,
"type_info": "Int64"
},
{
"name": "liquidation_price: crate::model::Price",
"ordinal": 7,
"ordinal": 5,
"type_info": "Text"
},
{
"name": "ts_secs: crate::model::Timestamp",
"ordinal": 8,
"name": "creation_timestamp_seconds: crate::model::Timestamp",
"ordinal": 6,
"type_info": "Int64"
},
{
"name": "settlement_time_interval_secs: i64",
"ordinal": 9,
"ordinal": 7,
"type_info": "Int64"
},
{
"name": "origin: crate::model::cfd::Origin",
"ordinal": 10,
"ordinal": 8,
"type_info": "Text"
},
{
"name": "oracle_event_id: crate::model::BitMexPriceEventId",
"ordinal": 11,
"ordinal": 9,
"type_info": "Text"
},
{
"name": "fee_rate: u32",
"ordinal": 12,
"type_info": "Null"
"ordinal": 10,
"type_info": "Int64"
},
{
"name": "quantity_usd: crate::model::Usd",
"ordinal": 13,
"ordinal": 11,
"type_info": "Text"
},
{
"name": "counterparty: crate::model::Identity",
"ordinal": 14,
"ordinal": 12,
"type_info": "Text"
},
{
"name": "state",
"ordinal": 15,
"ordinal": 13,
"type_info": "Text"
}
],
@ -444,8 +320,6 @@
false,
false,
false,
false,
false,
false
]
}

14
daemon/src/collab_settlement_maker.rs

@ -45,7 +45,7 @@ pub struct Initiated {
#[xtra_productivity]
impl Actor {
async fn handle(&mut self, _: Accepted, ctx: &mut xtra::Context<Self>) {
let order_id = self.cfd.order.id;
let order_id = self.cfd.id;
tracing::info!(%order_id, "Settlement proposal accepted");
@ -54,7 +54,7 @@ impl Actor {
}
async fn handle(&mut self, _: Rejected, ctx: &mut xtra::Context<Self>) {
let order_id = self.cfd.order.id;
let order_id = self.cfd.id;
tracing::info!(%order_id, "Settlement proposal rejected");
@ -65,7 +65,7 @@ impl Actor {
async fn handle(&mut self, msg: Initiated, ctx: &mut xtra::Context<Self>) {
let completed = async {
tracing::info!(
order_id = %self.cfd.order.id,
order_id = %self.cfd.id,
taker_id = %self.taker_id,
"Received signature for collaborative settlement"
);
@ -85,14 +85,14 @@ impl Actor {
self.update_proposal(None).await;
anyhow::Ok(Completed::Confirmed {
order_id: self.cfd.order.id,
order_id: self.cfd.id,
settlement,
script_pubkey: dlc.script_pubkey_for(Role::Maker),
})
}
.await
.unwrap_or_else(|e| Completed::Failed {
order_id: self.cfd.order.id,
order_id: self.cfd.id,
error: e,
});
@ -154,7 +154,7 @@ impl Actor {
if let Err(e) = self
.projection
.send(projection::UpdateSettlementProposal {
order: self.cfd.order.id,
order: self.cfd.id,
proposal,
})
.await
@ -191,7 +191,7 @@ impl Actor {
decision: maker_inc_connections::settlement::Decision,
ctx: &mut xtra::Context<Self>,
) {
let order_id = self.cfd.order.id;
let order_id = self.cfd.id;
if let Err(e) = self
.connections

14
daemon/src/collab_settlement_taker.rs

@ -41,7 +41,7 @@ impl Actor {
if !self.cfd.is_collaborative_settle_possible() {
anyhow::bail!(
"Settlement proposal not possible because for cfd {} is in state {} which cannot be collaboratively settled",
self.cfd.order.id,
self.cfd.id,
self.cfd.state
)
}
@ -53,7 +53,7 @@ impl Actor {
maker: self.proposal.maker,
price: self.proposal.price,
address: this,
order_id: self.cfd.order.id,
order_id: self.cfd.id,
})
.await??;
@ -64,7 +64,7 @@ impl Actor {
}
async fn handle_confirmed(&mut self) -> Result<CollaborativeSettlement> {
let order_id = self.cfd.order.id;
let order_id = self.cfd.id;
tracing::info!(%order_id, "Settlement proposal got accepted");
@ -93,7 +93,7 @@ impl Actor {
}
async fn handle_rejected(&mut self) -> Result<()> {
let order_id = self.cfd.order.id;
let order_id = self.cfd.id;
tracing::info!(%order_id, "Settlement proposal got rejected");
@ -108,7 +108,7 @@ impl Actor {
) -> Result<()> {
self.projection
.send(projection::UpdateSettlementProposal {
order: self.cfd.order.id,
order: self.cfd.id,
proposal,
})
.await?;
@ -131,7 +131,7 @@ impl xtra::Actor for Actor {
if let Err(e) = self.propose(this).await {
self.complete(
Completed::Failed {
order_id: self.cfd.order.id,
order_id: self.cfd.id,
error: e,
},
ctx,
@ -170,7 +170,7 @@ impl Actor {
msg: wire::maker_to_taker::Settlement,
ctx: &mut xtra::Context<Self>,
) {
let order_id = self.cfd.order.id;
let order_id = self.cfd.id;
let completed = match msg {
wire::maker_to_taker::Settlement::Confirm => match self.handle_confirmed().await {

442
daemon/src/db.rs

@ -1,6 +1,6 @@
use crate::model::cfd::{Cfd, CfdState, Order, OrderId};
use crate::model::cfd::{Cfd, CfdState, OrderId};
use crate::model::BitMexPriceEventId;
use anyhow::{bail, Context, Result};
use anyhow::{Context, Result};
use sqlx::pool::PoolConnection;
use sqlx::{Sqlite, SqlitePool};
use std::mem;
@ -11,118 +11,25 @@ pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> {
Ok(())
}
pub async fn insert_order(order: &Order, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> {
pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> {
let state = serde_json::to_string(&cfd.state)?;
let query_result = sqlx::query(
r#"insert into orders (
r#"
insert into cfds (
uuid,
trading_pair,
position,
initial_price,
min_quantity,
max_quantity,
leverage,
liquidation_price,
creation_timestamp_seconds,
settlement_time_interval_seconds,
origin,
oracle_event_id,
fee_rate
) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)"#,
)
.bind(&order.id)
.bind(&order.trading_pair)
.bind(&order.position)
.bind(&order.price)
.bind(&order.min_quantity)
.bind(&order.max_quantity)
.bind(&order.leverage)
.bind(&order.liquidation_price)
.bind(&order.creation_timestamp.seconds())
.bind(&order.settlement_interval.whole_seconds())
.bind(&order.origin)
.bind(&order.oracle_event_id)
.bind(&order.fee_rate)
.execute(conn)
.await?;
if query_result.rows_affected() != 1 {
anyhow::bail!("failed to insert order");
}
Ok(())
}
pub async fn load_order_by_id(
id: OrderId,
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<Order> {
let row = sqlx::query!(
r#"
select
uuid as "uuid: crate::model::cfd::OrderId",
trading_pair as "trading_pair: crate::model::TradingPair",
position as "position: crate::model::Position",
initial_price as "initial_price: crate::model::Price",
min_quantity as "min_quantity: crate::model::Usd",
max_quantity as "max_quantity: crate::model::Usd",
leverage as "leverage: crate::model::Leverage",
liquidation_price as "liquidation_price: crate::model::Price",
creation_timestamp_seconds as "ts_secs: crate::model::Timestamp",
settlement_time_interval_seconds as "settlement_time_interval_secs: i64",
origin as "origin: crate::model::cfd::Origin",
oracle_event_id as "oracle_event_id: crate::model::BitMexPriceEventId",
fee_rate as "fee_rate: u32"
from
orders
where
uuid = $1
"#,
id
)
.fetch_one(conn)
.await?;
Ok(Order {
id: row.uuid,
trading_pair: row.trading_pair,
position: row.position,
price: row.initial_price,
min_quantity: row.min_quantity,
max_quantity: row.max_quantity,
leverage: row.leverage,
liquidation_price: row.liquidation_price,
creation_timestamp: row.ts_secs,
settlement_interval: Duration::new(row.settlement_time_interval_secs, 0),
origin: row.origin,
oracle_event_id: row.oracle_event_id,
fee_rate: row.fee_rate,
})
}
pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> {
if load_cfd_by_order_id(cfd.order.id, conn).await.is_ok() {
bail!(
"Cannot insert cfd because there is already a cfd for order id {}",
cfd.order.id
)
}
let state = serde_json::to_string(&cfd.state)?;
let query_result = sqlx::query(
r#"
insert into cfds (
order_id,
order_uuid,
fee_rate,
quantity_usd,
counterparty
)
select
id as order_id,
uuid as order_uuid,
$2 as quantity_usd,
$3 as counterparty
from orders
where uuid = $1;
) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13);
insert into cfd_states (
cfd_id,
@ -130,17 +37,28 @@ pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow:
)
select
id as cfd_id,
$4 as state
$14 as state
from cfds
order by id desc limit 1;
"#,
)
.bind(&cfd.order.id)
.bind(&cfd.id)
.bind(&cfd.trading_pair)
.bind(&cfd.position)
.bind(&cfd.price)
.bind(&cfd.leverage)
.bind(&cfd.liquidation_price)
.bind(&cfd.creation_timestamp)
.bind(&cfd.settlement_interval.whole_seconds())
.bind(&cfd.origin)
.bind(&cfd.oracle_event_id)
.bind(&cfd.fee_rate)
.bind(&cfd.quantity_usd)
.bind(&cfd.counterparty)
.bind(state)
.execute(conn)
.await?;
.await
.with_context(|| format!("Failed to insert CFD with id {}", cfd.id))?;
// Should be 2 because we insert into cfds and cfd_states
if query_result.rows_affected() != 2 {
@ -151,7 +69,7 @@ pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow:
}
pub async fn append_cfd_state(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> {
let cfd_id = load_cfd_id_by_order_uuid(cfd.order.id, conn).await?;
let cfd_id = load_cfd_id_by_order_uuid(cfd.id, conn).await?;
let current_state = load_latest_cfd_state(cfd_id, conn)
.await
.context("loading latest state failed")?;
@ -161,7 +79,7 @@ pub async fn append_cfd_state(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> a
// Since we have states where we add information this happens quite frequently
tracing::trace!(
"Same state transition for cfd with order_id {}: {}",
cfd.order.id,
cfd.id,
current_state
);
}
@ -193,7 +111,7 @@ async fn load_cfd_id_by_order_uuid(
select
id
from cfds
where order_uuid = $1;
where cfds.uuid = $1;
"#,
order_uuid
)
@ -234,45 +152,13 @@ pub async fn load_cfd_by_order_id(
) -> Result<Cfd> {
let row = sqlx::query!(
r#"
with ord as (
with state as (
select
id as order_id,
uuid,
trading_pair,
position,
initial_price,
min_quantity,
max_quantity,
leverage,
liquidation_price,
creation_timestamp_seconds as ts_secs,
settlement_time_interval_seconds as settlement_time_interval_secs,
origin,
oracle_event_id,
fee_rate
from orders
),
cfd as (
select
ord.order_id,
id as cfd_id,
quantity_usd,
counterparty
from cfds
inner join ord on ord.order_id = cfds.order_id
),
state as (
select
id as state_id,
cfd.order_id,
cfd.quantity_usd,
cfd.counterparty,
cfd_id,
state
from cfd_states
inner join cfd on cfd.cfd_id = cfd_states.cfd_id
where id in (
inner join cfds on cfds.id = cfd_states.cfd_id
where cfd_states.id in (
select
max(id) as id
from cfd_states
@ -281,54 +167,46 @@ pub async fn load_cfd_by_order_id(
)
select
ord.uuid as "uuid: crate::model::cfd::OrderId",
ord.trading_pair as "trading_pair: crate::model::TradingPair",
ord.position as "position: crate::model::Position",
ord.initial_price as "initial_price: crate::model::Price",
ord.min_quantity as "min_quantity: crate::model::Usd",
ord.max_quantity as "max_quantity: crate::model::Usd",
ord.leverage as "leverage: crate::model::Leverage",
ord.liquidation_price as "liquidation_price: crate::model::Price",
ord.ts_secs as "ts_secs: crate::model::Timestamp",
ord.settlement_time_interval_secs as "settlement_time_interval_secs: i64",
ord.origin as "origin: crate::model::cfd::Origin",
ord.oracle_event_id as "oracle_event_id: crate::model::BitMexPriceEventId",
ord.fee_rate as "fee_rate: u32",
state.quantity_usd as "quantity_usd: crate::model::Usd",
state.counterparty as "counterparty: crate::model::Identity",
cfds.uuid as "uuid: crate::model::cfd::OrderId",
cfds.trading_pair as "trading_pair: crate::model::TradingPair",
cfds.position as "position: crate::model::Position",
cfds.initial_price as "initial_price: crate::model::Price",
cfds.leverage as "leverage: crate::model::Leverage",
cfds.liquidation_price as "liquidation_price: crate::model::Price",
cfds.creation_timestamp_seconds as "creation_timestamp_seconds: crate::model::Timestamp",
cfds.settlement_time_interval_seconds as "settlement_time_interval_secs: i64",
cfds.origin as "origin: crate::model::cfd::Origin",
cfds.oracle_event_id as "oracle_event_id: crate::model::BitMexPriceEventId",
cfds.fee_rate as "fee_rate: u32",
cfds.quantity_usd as "quantity_usd: crate::model::Usd",
cfds.counterparty as "counterparty: crate::model::Identity",
state.state
from ord
inner join state on state.order_id = ord.order_id
from cfds
inner join state on state.cfd_id = cfds.id
where ord.uuid = $1
where cfds.uuid = $1
"#,
order_id
)
.fetch_one(conn)
.await?;
let order = Order {
// TODO:
// still have the use of serde_json::from_str() here, which will be dealt with
// via https://github.com/comit-network/hermes/issues/290
Ok(Cfd {
id: row.uuid,
trading_pair: row.trading_pair,
position: row.position,
price: row.initial_price,
min_quantity: row.min_quantity,
max_quantity: row.max_quantity,
leverage: row.leverage,
liquidation_price: row.liquidation_price,
creation_timestamp: row.ts_secs,
creation_timestamp: row.creation_timestamp_seconds,
settlement_interval: Duration::new(row.settlement_time_interval_secs, 0),
origin: row.origin,
oracle_event_id: row.oracle_event_id,
fee_rate: row.fee_rate,
};
// TODO:
// still have the use of serde_json::from_str() here, which will be dealt with
// via https://github.com/comit-network/hermes/issues/290
Ok(Cfd {
order,
quantity_usd: row.quantity_usd,
state: serde_json::from_str(row.state.as_str())?,
counterparty: row.counterparty,
@ -339,45 +217,13 @@ pub async fn load_cfd_by_order_id(
pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<Vec<Cfd>> {
let rows = sqlx::query!(
r#"
with ord as (
select
id as order_id,
uuid,
trading_pair,
position,
initial_price,
min_quantity,
max_quantity,
leverage,
liquidation_price,
creation_timestamp_seconds as ts_secs,
settlement_time_interval_seconds as settlement_time_interval_secs,
origin,
oracle_event_id,
fee_rate
from orders
),
cfd as (
with state as (
select
ord.order_id,
id as cfd_id,
quantity_usd,
counterparty
from cfds
inner join ord on ord.order_id = cfds.order_id
),
state as (
select
id as state_id,
cfd.order_id,
cfd.quantity_usd,
cfd.counterparty,
cfd_id,
state
from cfd_states
inner join cfd on cfd.cfd_id = cfd_states.cfd_id
where id in (
inner join cfds on cfds.id = cfd_states.cfd_id
where cfd_states.id in (
select
max(id) as id
from cfd_states
@ -386,25 +232,23 @@ pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<
)
select
ord.uuid as "uuid: crate::model::cfd::OrderId",
ord.trading_pair as "trading_pair: crate::model::TradingPair",
ord.position as "position: crate::model::Position",
ord.initial_price as "initial_price: crate::model::Price",
ord.min_quantity as "min_quantity: crate::model::Usd",
ord.max_quantity as "max_quantity: crate::model::Usd",
ord.leverage as "leverage: crate::model::Leverage",
ord.liquidation_price as "liquidation_price: crate::model::Price",
ord.ts_secs as "ts_secs: crate::model::Timestamp",
ord.settlement_time_interval_secs as "settlement_time_interval_secs: i64",
ord.origin as "origin: crate::model::cfd::Origin",
ord.oracle_event_id as "oracle_event_id: crate::model::BitMexPriceEventId",
ord.fee_rate as "fee_rate: u32",
state.quantity_usd as "quantity_usd: crate::model::Usd",
state.counterparty as "counterparty: crate::model::Identity",
cfds.uuid as "uuid: crate::model::cfd::OrderId",
cfds.trading_pair as "trading_pair: crate::model::TradingPair",
cfds.position as "position: crate::model::Position",
cfds.initial_price as "initial_price: crate::model::Price",
cfds.leverage as "leverage: crate::model::Leverage",
cfds.liquidation_price as "liquidation_price: crate::model::Price",
cfds.creation_timestamp_seconds as "creation_timestamp_seconds: crate::model::Timestamp",
cfds.settlement_time_interval_seconds as "settlement_time_interval_secs: i64",
cfds.origin as "origin: crate::model::cfd::Origin",
cfds.oracle_event_id as "oracle_event_id: crate::model::BitMexPriceEventId",
cfds.fee_rate as "fee_rate: u32",
cfds.quantity_usd as "quantity_usd: crate::model::Usd",
cfds.counterparty as "counterparty: crate::model::Identity",
state.state
from ord
inner join state on state.order_id = ord.order_id
from cfds
inner join state on state.cfd_id = cfds.id
"#
)
.fetch_all(conn)
@ -413,24 +257,18 @@ pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<
let cfds = rows
.into_iter()
.map(|row| {
let order = Order {
Ok(Cfd {
id: row.uuid,
trading_pair: row.trading_pair,
position: row.position,
price: row.initial_price,
min_quantity: row.min_quantity,
max_quantity: row.max_quantity,
leverage: row.leverage,
liquidation_price: row.liquidation_price,
creation_timestamp: row.ts_secs,
creation_timestamp: row.creation_timestamp_seconds,
settlement_interval: Duration::new(row.settlement_time_interval_secs, 0),
origin: row.origin,
oracle_event_id: row.oracle_event_id,
fee_rate: row.fee_rate,
};
Ok(Cfd {
order,
quantity_usd: row.quantity_usd,
state: serde_json::from_str(row.state.as_str())?,
counterparty: row.counterparty,
@ -449,45 +287,13 @@ pub async fn load_cfds_by_oracle_event_id(
let event_id = oracle_event_id.to_string();
let rows = sqlx::query!(
r#"
with ord as (
select
id as order_id,
uuid,
trading_pair,
position,
initial_price,
min_quantity,
max_quantity,
leverage,
liquidation_price,
creation_timestamp_seconds as ts_secs,
settlement_time_interval_seconds as settlement_time_interval_secs,
origin,
oracle_event_id,
fee_rate
from orders
),
cfd as (
with state as (
select
ord.order_id,
id as cfd_id,
quantity_usd,
counterparty
from cfds
inner join ord on ord.order_id = cfds.order_id
),
state as (
select
id as state_id,
cfd.order_id,
cfd.quantity_usd,
cfd.counterparty,
cfd_id,
state
from cfd_states
inner join cfd on cfd.cfd_id = cfd_states.cfd_id
where id in (
inner join cfds on cfds.id = cfd_states.cfd_id
where cfd_states.id in (
select
max(id) as id
from cfd_states
@ -496,27 +302,25 @@ pub async fn load_cfds_by_oracle_event_id(
)
select
ord.uuid as "uuid: crate::model::cfd::OrderId",
ord.trading_pair as "trading_pair: crate::model::TradingPair",
ord.position as "position: crate::model::Position",
ord.initial_price as "initial_price: crate::model::Price",
ord.min_quantity as "min_quantity: crate::model::Usd",
ord.max_quantity as "max_quantity: crate::model::Usd",
ord.leverage as "leverage: crate::model::Leverage",
ord.liquidation_price as "liquidation_price: crate::model::Price",
ord.ts_secs as "ts_secs: crate::model::Timestamp",
ord.settlement_time_interval_secs as "settlement_time_interval_secs: i64",
ord.origin as "origin: crate::model::cfd::Origin",
ord.oracle_event_id as "oracle_event_id: crate::model::BitMexPriceEventId",
ord.fee_rate as "fee_rate: u32",
state.quantity_usd as "quantity_usd: crate::model::Usd",
state.counterparty as "counterparty: crate::model::Identity",
cfds.uuid as "uuid: crate::model::cfd::OrderId",
cfds.trading_pair as "trading_pair: crate::model::TradingPair",
cfds.position as "position: crate::model::Position",
cfds.initial_price as "initial_price: crate::model::Price",
cfds.leverage as "leverage: crate::model::Leverage",
cfds.liquidation_price as "liquidation_price: crate::model::Price",
cfds.creation_timestamp_seconds as "creation_timestamp_seconds: crate::model::Timestamp",
cfds.settlement_time_interval_seconds as "settlement_time_interval_secs: i64",
cfds.origin as "origin: crate::model::cfd::Origin",
cfds.oracle_event_id as "oracle_event_id: crate::model::BitMexPriceEventId",
cfds.fee_rate as "fee_rate: u32",
cfds.quantity_usd as "quantity_usd: crate::model::Usd",
cfds.counterparty as "counterparty: crate::model::Identity",
state.state
from ord
inner join state on state.order_id = ord.order_id
from cfds
inner join state on state.cfd_id = cfds.id
where ord.oracle_event_id = $1
where cfds.oracle_event_id = $1
"#,
event_id
)
@ -526,24 +330,18 @@ pub async fn load_cfds_by_oracle_event_id(
let cfds = rows
.into_iter()
.map(|row| {
let order = Order {
Ok(Cfd {
id: row.uuid,
trading_pair: row.trading_pair,
position: row.position,
price: row.initial_price,
min_quantity: row.min_quantity,
max_quantity: row.max_quantity,
leverage: row.leverage,
liquidation_price: row.liquidation_price,
creation_timestamp: row.ts_secs,
creation_timestamp: row.creation_timestamp_seconds,
settlement_interval: Duration::new(row.settlement_time_interval_secs, 0),
origin: row.origin,
oracle_event_id: row.oracle_event_id,
fee_rate: row.fee_rate,
};
Ok(Cfd {
order,
quantity_usd: row.quantity_usd,
state: serde_json::from_str(row.state.as_str())?,
counterparty: row.counterparty,
@ -557,7 +355,6 @@ pub async fn load_cfds_by_oracle_event_id(
#[cfg(test)]
mod tests {
use super::*;
use crate::db::insert_order;
use crate::model::cfd::{Cfd, CfdState, Order, Origin};
use crate::model::{Identity, Price, Usd};
use crate::seed::Seed;
@ -568,16 +365,6 @@ mod tests {
use time::macros::datetime;
use time::OffsetDateTime;
#[tokio::test]
async fn test_insert_and_load_order() {
let mut conn = setup_test_db().await;
let order = Order::dummy().insert(&mut conn).await;
let loaded = load_order_by_id(order.id, &mut conn).await.unwrap();
assert_eq!(order, loaded);
}
#[tokio::test]
async fn test_insert_and_load_cfd() {
let mut conn = setup_test_db().await;
@ -593,7 +380,7 @@ mod tests {
let mut conn = setup_test_db().await;
let cfd = Cfd::dummy().insert(&mut conn).await;
let loaded = load_cfd_by_order_id(cfd.order.id, &mut conn).await.unwrap();
let loaded = load_cfd_by_order_id(cfd.id, &mut conn).await.unwrap();
assert_eq!(cfd, loaded)
}
@ -605,12 +392,8 @@ mod tests {
let cfd1 = Cfd::dummy().insert(&mut conn).await;
let cfd2 = Cfd::dummy().insert(&mut conn).await;
let loaded_1 = load_cfd_by_order_id(cfd1.order.id, &mut conn)
.await
.unwrap();
let loaded_2 = load_cfd_by_order_id(cfd2.order.id, &mut conn)
.await
.unwrap();
let loaded_1 = load_cfd_by_order_id(cfd1.id, &mut conn).await.unwrap();
let loaded_2 = load_cfd_by_order_id(cfd2.id, &mut conn).await.unwrap();
assert_eq!(cfd1, loaded_1);
assert_eq!(cfd2, loaded_2);
@ -669,21 +452,6 @@ mod tests {
assert_eq!(vec![cfd_1, cfd_2], cfds_from_db);
}
#[tokio::test]
async fn test_insert_order_without_cfd_associates_with_correct_cfd() {
let mut conn = setup_test_db().await;
// Insert an order without a CFD
let _order_1 = Order::dummy().insert(&mut conn).await;
// Insert a CFD (this also inserts an order)
let cfd_1 = Cfd::dummy().insert(&mut conn).await;
let all_cfds = load_all_cfds(&mut conn).await.unwrap();
assert_eq!(all_cfds, vec![cfd_1]);
}
// test more data; test will add 100 cfds to the database, with each
// having a random number of random updates. Final results are deterministic.
#[tokio::test]
@ -704,7 +472,7 @@ mod tests {
}
// verify current state is correct
let loaded_by_order_id = load_cfd_by_order_id(cfd.order.id, &mut conn).await.unwrap();
let loaded_by_order_id = load_cfd_by_order_id(cfd.id, &mut conn).await.unwrap();
assert_eq!(loaded_by_order_id, cfd);
// load_cfds_by_oracle_event_id can return multiple CFDs
@ -729,10 +497,10 @@ mod tests {
let error = insert_cfd(&cfd, &mut conn).await.err().unwrap();
assert_eq!(
error.to_string(),
format!("{:#}", error),
format!(
"Cannot insert cfd because there is already a cfd for order id {}",
cfd.order.id
"Failed to insert CFD with id {}: error returned from database: UNIQUE constraint failed: cfds.uuid: UNIQUE constraint failed: cfds.uuid",
cfd.id,
)
);
}
@ -780,14 +548,13 @@ mod tests {
/// Insert this [`Cfd`] into the database, returning the instance for further chaining.
async fn insert(self, conn: &mut PoolConnection<Sqlite>) -> Self {
insert_order(&self.order, conn).await.unwrap();
insert_cfd(&self, conn).await.unwrap();
self
}
fn with_event_id(mut self, id: BitMexPriceEventId) -> Self {
self.order.oracle_event_id = id;
self.oracle_event_id = id;
self
}
}
@ -805,12 +572,5 @@ mod tests {
)
.unwrap()
}
/// Insert this [`Order`] into the database, returning the instance for further chaining.
async fn insert(self, conn: &mut PoolConnection<Sqlite>) -> Self {
insert_order(&self, conn).await.unwrap();
self
}
}
}

50
daemon/src/maker_cfd.rs

@ -1,6 +1,6 @@
use crate::address_map::{AddressMap, Stopping};
use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::db::load_cfd_by_order_id;
use crate::model::cfd::{
Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId, Origin, Role, RollOverProposal,
SettlementKind, SettlementProposal, UpdateCfdProposal,
@ -84,7 +84,7 @@ pub struct Actor<O, M, T, W> {
oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>,
takers: Address<T>,
current_order_id: Option<OrderId>,
current_order: Option<Order>,
monitor_actor: Address<M>,
setup_actors: AddressMap<OrderId, setup_maker::Actor>,
settlement_actors: AddressMap<OrderId, collab_settlement_maker::Actor>,
@ -126,7 +126,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
oracle_pk,
projection_actor,
takers,
current_order_id: None,
current_order: None,
monitor_actor,
setup_actors: AddressMap::default(),
roll_over_state: RollOverState::None,
@ -294,20 +294,13 @@ where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
async fn handle_taker_connected(&mut self, taker_id: Identity) -> Result<()> {
let mut conn = self.db.acquire().await?;
let current_order = match self.current_order_id {
Some(current_order_id) => Some(load_order_by_id(current_order_id, &mut conn).await?),
None => None,
};
// Need to use `do_send_async` here because we are being invoked from the
// `maker_inc_connections::Actor`. Using `send` would result in a deadlock.
#[allow(clippy::disallowed_method)]
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::CurrentOrder(current_order),
msg: wire::MakerToTaker::CurrentOrder(self.current_order.clone()),
})
.await?;
@ -339,7 +332,7 @@ where
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::RejectOrder(cfd.order.id),
msg: wire::MakerToTaker::RejectOrder(cfd.id),
})
.await??;
@ -381,10 +374,8 @@ where
let mut conn = self.db.acquire().await?;
// 1. Validate if order is still valid
let current_order = match self.current_order_id {
Some(current_order_id) if current_order_id == order_id => {
load_order_by_id(current_order_id, &mut conn).await?
}
let current_order = match &self.current_order {
Some(current_order) if current_order.id == order_id => current_order.clone(),
_ => {
// An outdated order on the taker side does not require any state change on the
// maker. notifying the taker with a specific message should be sufficient.
@ -407,7 +398,7 @@ where
// The order is removed before we update the state, because the maker might react on the
// state change. Once we know that we go for either an accept/reject scenario we
// have to remove the current order.
self.current_order_id = None;
self.current_order = None;
// Need to use `do_send_async` here because invoking the
// corresponding handler can result in a deadlock with another
@ -437,15 +428,16 @@ where
// state
let announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id))
.send(oracle::GetAnnouncement(cfd.oracle_event_id))
.await??;
// 5. Start up contract setup actor
let this = ctx
.address()
.expect("actor to be able to give address to itself");
let (addr, fut) = setup_maker::Actor::new(
(cfd.order, cfd.quantity_usd, self.n_payouts),
(cfd, current_order, self.n_payouts),
(self.oracle_pk, announcement),
&self.wallet,
&self.wallet,
@ -678,7 +670,7 @@ where
let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
let oracle_event_id = oracle::next_announcement_after(
time::OffsetDateTime::now_utc() + cfd.order.settlement_interval,
time::OffsetDateTime::now_utc() + cfd.settlement_interval,
)?;
let announcement = self
.oracle_actor
@ -707,11 +699,11 @@ where
receiver,
(self.oracle_pk, announcement),
RolloverParams::new(
cfd.order.price,
cfd.price,
cfd.quantity_usd,
cfd.order.leverage,
cfd.leverage,
cfd.refund_timelock_in_blocks(),
cfd.order.fee_rate,
cfd.fee_rate,
),
Role::Maker,
dlc,
@ -874,19 +866,15 @@ where
fee_rate,
)?;
// 1. Save to DB
let mut conn = self.db.acquire().await?;
insert_order(&order, &mut conn).await?;
// 2. Update actor state to current order
self.current_order_id.replace(order.id);
// 1. Update actor state to current order
self.current_order.replace(order.clone());
// 3. Notify UI via feed
// 2. Notify UI via feed
self.projection_actor
.send(projection::Update(Some(order.clone())))
.await?;
// 4. Inform connected takers
// 3. Inform connected takers
self.takers
.send(maker_inc_connections::BroadcastOrder(Some(order)))
.await?;

1
daemon/src/model.rs

@ -533,6 +533,7 @@ impl str::FromStr for BitMexPriceEventId {
impl_sqlx_type_display_from_str!(BitMexPriceEventId);
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, sqlx::Type)]
#[sqlx(transparent)]
pub struct Timestamp(i64);
impl Timestamp {

75
daemon/src/model/cfd.rs

@ -574,7 +574,30 @@ pub type UpdateCfdProposals = HashMap<OrderId, UpdateCfdProposal>;
/// Represents a cfd (including state)
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Cfd {
pub order: Order,
pub id: OrderId,
pub trading_pair: TradingPair,
pub position: Position,
pub price: Price,
pub leverage: Leverage,
pub liquidation_price: Price,
pub creation_timestamp: Timestamp,
/// The duration that will be used for calculating the settlement timestamp
pub settlement_interval: Duration,
pub origin: Origin,
/// The id of the event to be used for price attestation
///
/// The maker includes this into the Order based on the Oracle announcement to be used.
pub oracle_event_id: BitMexPriceEventId,
pub fee_rate: u32,
pub quantity_usd: Usd,
pub state: CfdState,
@ -586,19 +609,27 @@ pub struct Cfd {
impl Cfd {
pub fn new(order: Order, quantity: Usd, state: CfdState, counterparty: Identity) -> Self {
Cfd {
order,
id: order.id,
quantity_usd: quantity,
state,
trading_pair: order.trading_pair,
position: order.position,
price: order.price,
leverage: order.leverage,
liquidation_price: order.liquidation_price,
creation_timestamp: Timestamp::now(),
settlement_interval: order.settlement_interval,
origin: order.origin,
oracle_event_id: order.oracle_event_id,
fee_rate: order.fee_rate,
counterparty,
}
}
pub fn margin(&self) -> Result<Amount> {
let margin = match self.position() {
Position::Long => {
calculate_long_margin(self.order.price, self.quantity_usd, self.order.leverage)
}
Position::Short => calculate_short_margin(self.order.price, self.quantity_usd),
Position::Long => calculate_long_margin(self.price, self.quantity_usd, self.leverage),
Position::Short => calculate_short_margin(self.price, self.quantity_usd),
};
Ok(margin)
@ -606,10 +637,8 @@ impl Cfd {
pub fn counterparty_margin(&self) -> Result<Amount> {
let margin = match self.position() {
Position::Long => calculate_short_margin(self.order.price, self.quantity_usd),
Position::Short => {
calculate_long_margin(self.order.price, self.quantity_usd, self.order.leverage)
}
Position::Long => calculate_short_margin(self.price, self.quantity_usd),
Position::Short => calculate_long_margin(self.price, self.quantity_usd, self.leverage),
};
Ok(margin)
@ -624,10 +653,10 @@ impl Cfd {
};
let (p_n_l, p_n_l_percent) = calculate_profit(
self.order.price,
self.price,
closing_price,
self.quantity_usd,
self.order.leverage,
self.leverage,
self.position(),
)?;
@ -639,12 +668,8 @@ impl Cfd {
current_price: Price,
n_payouts: usize,
) -> Result<SettlementProposal> {
let payout_curve = payout_curve::calculate(
self.order.price,
self.quantity_usd,
self.order.leverage,
n_payouts,
)?;
let payout_curve =
payout_curve::calculate(self.price, self.quantity_usd, self.leverage, n_payouts)?;
let payout = {
let current_price = current_price.try_into_u64()?;
@ -655,7 +680,7 @@ impl Cfd {
};
let settlement = SettlementProposal {
order_id: self.order.id,
order_id: self.id,
timestamp: Timestamp::now(),
taker: *payout.taker_amount(),
maker: *payout.maker_amount(),
@ -666,11 +691,11 @@ impl Cfd {
}
pub fn position(&self) -> Position {
match self.order.origin {
Origin::Ours => self.order.position,
match self.origin {
Origin::Ours => self.position,
// If the order is not our own we take the counter-position in the CFD
Origin::Theirs => match self.order.position {
Origin::Theirs => match self.position {
Position::Long => Position::Short,
Position::Short => Position::Long,
},
@ -678,7 +703,7 @@ impl Cfd {
}
pub fn refund_timelock_in_blocks(&self) -> u32 {
(self.order.settlement_interval * Cfd::REFUND_THRESHOLD)
(self.settlement_interval * Cfd::REFUND_THRESHOLD)
.as_blocks()
.ceil() as u32
}
@ -708,7 +733,7 @@ impl Cfd {
pub fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<Option<CfdState>> {
use CfdState::*;
let order_id = self.order.id;
let order_id = self.id;
// early exit if already final
if let SetupFailed { .. } | Closed { .. } | Refunded { .. } = self.state.clone() {
@ -1152,7 +1177,7 @@ impl Cfd {
}
pub fn role(&self) -> Role {
self.order.origin.into()
self.origin.into()
}
pub fn dlc(&self) -> Option<Dlc> {

56
daemon/src/monitor.rs

@ -79,66 +79,66 @@ impl Actor<bdk::electrum_client::Client> {
// In PendingOpen we know the complete dlc setup and assume that the lock transaction will be published
CfdState::PendingOpen { dlc, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
actor.monitor_all(&params, cfd.order.id);
actor.cfds.insert(cfd.id, params.clone());
actor.monitor_all(&params, cfd.id);
}
CfdState::Open { dlc, .. } | CfdState::PendingCommit { dlc, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
actor.cfds.insert(cfd.id, params.clone());
actor.monitor_commit_finality(&params, cfd.order.id);
actor.monitor_commit_cet_timelock(&params, cfd.order.id);
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
actor.monitor_commit_finality(&params, cfd.id);
actor.monitor_commit_cet_timelock(&params, cfd.id);
actor.monitor_commit_refund_timelock(&params, cfd.id);
actor.monitor_refund_finality(&params,cfd.id);
if let Some(model::cfd::CollaborativeSettlement { tx, ..}
) = cfd.state.get_collaborative_close() {
let close_params = (tx.txid(),
tx.output.first().context("transaction has zero outputs")?.script_pubkey.clone());
actor.monitor_close_finality(close_params,cfd.order.id);
actor.monitor_close_finality(close_params,cfd.id);
}
}
CfdState::OpenCommitted { dlc, cet_status, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
actor.cfds.insert(cfd.id, params.clone());
match cet_status {
CetStatus::Unprepared => {
actor.monitor_commit_cet_timelock(&params, cfd.order.id);
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
actor.monitor_commit_cet_timelock(&params, cfd.id);
actor.monitor_commit_refund_timelock(&params, cfd.id);
actor.monitor_refund_finality(&params,cfd.id);
}
CetStatus::OracleSigned(attestation) => {
actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.order.id)?;
actor.monitor_commit_cet_timelock(&params, cfd.order.id);
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.id)?;
actor.monitor_commit_cet_timelock(&params, cfd.id);
actor.monitor_commit_refund_timelock(&params, cfd.id);
actor.monitor_refund_finality(&params,cfd.id);
}
CetStatus::TimelockExpired => {
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
actor.monitor_commit_refund_timelock(&params, cfd.id);
actor.monitor_refund_finality(&params,cfd.id);
}
CetStatus::Ready(attestation) => {
actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.order.id)?;
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.id)?;
actor.monitor_commit_refund_timelock(&params, cfd.id);
actor.monitor_refund_finality(&params,cfd.id);
}
}
}
CfdState::PendingCet { dlc, attestation, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
actor.cfds.insert(cfd.id, params.clone());
actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.order.id)?;
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.id)?;
actor.monitor_commit_refund_timelock(&params, cfd.id);
actor.monitor_refund_finality(&params,cfd.id);
}
CfdState::PendingRefund { dlc, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone());
actor.cfds.insert(cfd.id, params.clone());
actor.monitor_commit_refund_timelock(&params, cfd.order.id);
actor.monitor_refund_finality(&params,cfd.order.id);
actor.monitor_commit_refund_timelock(&params, cfd.id);
actor.monitor_refund_finality(&params,cfd.id);
}
// too early to monitor

16
daemon/src/projection.rs

@ -520,21 +520,21 @@ impl From<CfdsWithAuxData> for Vec<Cfd> {
}
})
.unwrap_or_else(|| {
tracing::debug!(order_id = %cfd.order.id, "Unable to calculate profit/loss without current price");
tracing::debug!(order_id = %cfd.id, "Unable to calculate profit/loss without current price");
(None, None)
});
let pending_proposal = input.pending_proposals.get(&cfd.order.id);
let pending_proposal = input.pending_proposals.get(&cfd.id);
let state = to_cfd_state(&cfd.state, pending_proposal);
Cfd {
order_id: cfd.order.id,
initial_price: cfd.order.price.into(),
leverage: cfd.order.leverage,
trading_pair: cfd.order.trading_pair.clone(),
order_id: cfd.id,
initial_price: cfd.price.into(),
leverage: cfd.leverage,
trading_pair: cfd.trading_pair.clone(),
position: cfd.position(),
liquidation_price: cfd.order.liquidation_price.into(),
liquidation_price: cfd.liquidation_price.into(),
quantity_usd: cfd.quantity_usd.into(),
profit_btc,
profit_percent,
@ -548,7 +548,7 @@ impl From<CfdsWithAuxData> for Vec<Cfd> {
margin_counterparty: cfd.counterparty_margin().expect("margin to be available"),
details: to_cfd_details(cfd, network),
expiry_timestamp: match cfd.expiry_timestamp() {
None => cfd.order.oracle_event_id.timestamp(),
None => cfd.oracle_event_id.timestamp(),
Some(timestamp) => timestamp,
},
counterparty: cfd.counterparty.into(),

28
daemon/src/rollover_taker.rs

@ -58,7 +58,7 @@ impl Actor {
async fn propose(&self, this: xtra::Address<Self>) -> Result<()> {
self.maker
.send(connection::ProposeRollOver {
order_id: self.cfd.order.id,
order_id: self.cfd.id,
timestamp: self.timestamp,
address: this,
})
@ -66,7 +66,7 @@ impl Actor {
self.update_proposal(Some((
RollOverProposal {
order_id: self.cfd.order.id,
order_id: self.cfd.id,
timestamp: self.timestamp,
},
SettlementKind::Outgoing,
@ -88,7 +88,7 @@ impl Actor {
.await?
.with_context(|| format!("Announcement {} not found", oracle_event_id))?;
let order_id = self.cfd.order.id;
let order_id = self.cfd.id;
tracing::info!(%order_id, "Rollover proposal got accepted");
self.update_proposal(None).await?;
@ -104,11 +104,11 @@ impl Actor {
receiver,
(self.oracle_pk, announcement),
RolloverParams::new(
self.cfd.order.price,
self.cfd.price,
self.cfd.quantity_usd,
self.cfd.order.leverage,
self.cfd.leverage,
self.cfd.refund_timelock_in_blocks(),
self.cfd.order.fee_rate,
self.cfd.fee_rate,
),
Role::Taker,
self.cfd.dlc().context("No DLC in CFD")?,
@ -129,7 +129,7 @@ impl Actor {
}
async fn handle_rejected(&self) -> Result<()> {
let order_id = self.cfd.order.id;
let order_id = self.cfd.id;
tracing::info!(%order_id, "Rollover proposal got rejected");
self.update_proposal(None).await?;
@ -153,7 +153,7 @@ impl Actor {
) -> Result<()> {
self.projection
.send(UpdateRollOverProposal {
order: self.cfd.order.id,
order: self.cfd.id,
proposal,
})
.await?;
@ -175,7 +175,7 @@ impl xtra::Actor for Actor {
if let Err(e) = self.propose(this).await {
self.complete(
Completed::Failed {
order_id: self.cfd.order.id,
order_id: self.cfd.id,
error: e,
},
ctx,
@ -207,7 +207,7 @@ impl Actor {
if let Err(error) = self.handle_confirmed(msg, ctx).await {
self.complete(
Completed::Failed {
order_id: self.cfd.order.id,
order_id: self.cfd.id,
error,
},
ctx,
@ -217,7 +217,7 @@ impl Actor {
}
pub async fn reject_rollover(&mut self, _: RollOverRejected, ctx: &mut xtra::Context<Self>) {
let order_id = self.cfd.order.id;
let order_id = self.cfd.id;
let completed = if let Err(error) = self.handle_rejected().await {
Completed::Failed { order_id, error }
} else {
@ -234,7 +234,7 @@ impl Actor {
) {
self.complete(
Completed::UpdatedContract {
order_id: self.cfd.order.id,
order_id: self.cfd.id,
dlc: msg.dlc,
},
ctx,
@ -249,7 +249,7 @@ impl Actor {
) {
self.complete(
Completed::Failed {
order_id: self.cfd.order.id,
order_id: self.cfd.id,
error: msg.error,
},
ctx,
@ -265,7 +265,7 @@ impl Actor {
if let Err(error) = self.forward_protocol_msg(msg).await {
self.complete(
Completed::Failed {
order_id: self.cfd.order.id,
order_id: self.cfd.id,
error,
},
ctx,

50
daemon/src/setup_maker.rs

@ -1,6 +1,6 @@
use crate::address_map::{ActorName, Stopping};
use crate::model::cfd::{Cfd, CfdState, Dlc, Order, OrderId, Role};
use crate::model::{Identity, Usd};
use crate::model::cfd::{Cfd, Dlc, Order, OrderId, Role};
use crate::model::Identity;
use crate::oracle::Announcement;
use crate::setup_contract::{self, SetupParams};
use crate::tokio_ext::spawn_fallible;
@ -15,8 +15,8 @@ use xtra::prelude::MessageChannel;
use xtra_productivity::xtra_productivity;
pub struct Actor {
cfd: Cfd,
order: Order,
quantity: Usd,
n_payouts: usize,
oracle_pk: schnorrsig::PublicKey,
announcement: Announcement,
@ -32,7 +32,7 @@ pub struct Actor {
impl Actor {
pub fn new(
(order, quantity, n_payouts): (Order, Usd, usize),
(cfd, order, n_payouts): (Cfd, Order, usize),
(oracle_pk, announcement): (schnorrsig::PublicKey, Announcement),
build_party_params: &(impl MessageChannel<wallet::BuildPartyParams> + 'static),
sign: &(impl MessageChannel<wallet::Sign> + 'static),
@ -48,8 +48,8 @@ impl Actor {
),
) -> Self {
Self {
cfd,
order,
quantity,
n_payouts,
oracle_pk,
announcement,
@ -65,13 +65,7 @@ impl Actor {
}
async fn contract_setup(&mut self, this: xtra::Address<Self>) -> Result<()> {
let order_id = self.order.id;
let cfd = Cfd::new(
self.order.clone(),
self.quantity,
CfdState::contract_setup(),
self.taker_id,
);
let order_id = self.cfd.id;
let (sender, receiver) = mpsc::unbounded();
// store the writing end to forward messages from the taker to
@ -89,13 +83,13 @@ impl Actor {
receiver,
(self.oracle_pk, self.announcement.clone()),
SetupParams::new(
cfd.margin()?,
cfd.counterparty_margin()?,
cfd.order.price,
cfd.quantity_usd,
cfd.order.leverage,
cfd.refund_timelock_in_blocks(),
cfd.order.fee_rate,
self.cfd.margin()?,
self.cfd.counterparty_margin()?,
self.cfd.price,
self.cfd.quantity_usd,
self.cfd.leverage,
self.cfd.refund_timelock_in_blocks(),
self.cfd.fee_rate,
),
self.build_party_params.clone_channel(),
self.sign.clone_channel(),
@ -125,7 +119,7 @@ impl Actor {
#[xtra_productivity]
impl Actor {
fn handle(&mut self, _msg: Accepted, ctx: &mut xtra::Context<Self>) {
let order_id = self.order.id;
let order_id = self.cfd.id;
tracing::info!(%order_id, "Maker accepts an order");
let this = ctx
@ -160,7 +154,7 @@ impl Actor {
}
fn handle(&mut self, _msg: Rejected, ctx: &mut xtra::Context<Self>) {
self.complete(Completed::Rejected(self.order.id), ctx).await;
self.complete(Completed::Rejected(self.cfd.id), ctx).await;
}
fn handle(&mut self, msg: SetupSucceeded, ctx: &mut xtra::Context<Self>) {
@ -202,25 +196,25 @@ impl Actor {
#[async_trait]
impl xtra::Actor for Actor {
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
let quantity = self.quantity;
let order = self.order.clone();
if quantity < order.min_quantity || quantity > order.max_quantity {
let quantity = self.cfd.quantity_usd;
let cfd = self.cfd.clone();
if quantity < self.order.min_quantity || quantity > self.order.max_quantity {
tracing::info!(
"Order rejected: quantity {} not in range [{}, {}]",
quantity,
order.min_quantity,
order.max_quantity
self.order.min_quantity,
self.order.max_quantity
);
let _ = self
.taker
.send(maker_inc_connections::TakerMessage {
taker_id: self.taker_id,
msg: wire::MakerToTaker::RejectOrder(order.id),
msg: wire::MakerToTaker::RejectOrder(cfd.id),
})
.await;
self.complete(Completed::Rejected(order.id), ctx).await;
self.complete(Completed::Rejected(cfd.id), ctx).await;
}
}

48
daemon/src/setup_taker.rs

@ -1,5 +1,4 @@
use crate::model::cfd::{Cfd, CfdState, Completed, Dlc, Order, OrderId, Role};
use crate::model::{Identity, Usd};
use crate::model::cfd::{Cfd, CfdState, Completed, Dlc, OrderId, Role};
use crate::oracle::Announcement;
use crate::setup_contract::{self, SetupParams};
use crate::tokio_ext::spawn_fallible;
@ -14,8 +13,7 @@ use xtra::prelude::*;
use xtra_productivity::xtra_productivity;
pub struct Actor {
order: Order,
quantity: Usd,
cfd: Cfd,
n_payouts: usize,
oracle_pk: schnorrsig::PublicKey,
announcement: Announcement,
@ -25,24 +23,21 @@ pub struct Actor {
on_accepted: Box<dyn MessageChannel<Started>>,
on_completed: Box<dyn MessageChannel<Completed>>,
setup_msg_sender: Option<UnboundedSender<SetupMsg>>,
maker_identity: Identity,
}
impl Actor {
#[allow(clippy::too_many_arguments)]
pub fn new(
(order, quantity, n_payouts): (Order, Usd, usize),
(cfd, n_payouts): (Cfd, usize),
(oracle_pk, announcement): (schnorrsig::PublicKey, Announcement),
build_party_params: &(impl MessageChannel<wallet::BuildPartyParams> + 'static),
sign: &(impl MessageChannel<wallet::Sign> + 'static),
maker: xtra::Address<connection::Actor>,
on_accepted: &(impl MessageChannel<Started> + 'static),
on_completed: &(impl MessageChannel<Completed> + 'static),
maker_identity: Identity,
) -> Self {
Self {
order,
quantity,
cfd,
n_payouts,
oracle_pk,
announcement,
@ -52,7 +47,6 @@ impl Actor {
on_accepted: on_accepted.clone_channel(),
on_completed: on_completed.clone_channel(),
setup_msg_sender: None,
maker_identity,
}
}
}
@ -60,20 +54,16 @@ impl Actor {
#[xtra_productivity]
impl Actor {
fn handle(&mut self, _: Accepted, ctx: &mut xtra::Context<Self>) -> Result<()> {
let order_id = self.order.id;
let order_id = self.cfd.id;
self.cfd.state = CfdState::contract_setup();
tracing::info!(%order_id, "Order got accepted");
// inform the `taker_cfd::Actor` about the start of contract
// setup, so that the db and UI can be updated accordingly
self.on_accepted.send(Started(order_id)).await?;
let cfd = Cfd::new(
self.order.clone(),
self.quantity,
CfdState::contract_setup(),
self.maker_identity,
);
let (sender, receiver) = mpsc::unbounded::<SetupMsg>();
// store the writing end to forward messages from the maker to
// the spawned contract setup task
@ -85,13 +75,13 @@ impl Actor {
receiver,
(self.oracle_pk, self.announcement.clone()),
SetupParams::new(
cfd.margin()?,
cfd.counterparty_margin()?,
cfd.order.price,
cfd.quantity_usd,
cfd.order.leverage,
cfd.refund_timelock_in_blocks(),
cfd.order.fee_rate,
self.cfd.margin()?,
self.cfd.counterparty_margin()?,
self.cfd.price,
self.cfd.quantity_usd,
self.cfd.leverage,
self.cfd.refund_timelock_in_blocks(),
self.cfd.fee_rate,
),
self.build_party_params.clone_channel(),
self.sign.clone_channel(),
@ -113,7 +103,7 @@ impl Actor {
}
fn handle(&mut self, msg: Rejected, ctx: &mut xtra::Context<Self>) -> Result<()> {
let order_id = self.order.id;
let order_id = self.cfd.id;
tracing::info!(%order_id, "Order got rejected");
if msg.is_invalid_order {
@ -175,14 +165,14 @@ impl xtra::Actor for Actor {
let res = self
.maker
.send(connection::TakeOrder {
order_id: self.order.id,
quantity: self.quantity,
order_id: self.cfd.id,
quantity: self.cfd.quantity_usd,
address,
})
.await;
if let Err(e) = res {
tracing::warn!(%self.order.id, "Stopping setup_taker actor: {}", e);
tracing::warn!(%self.cfd.id, "Stopping setup_taker actor: {}", e);
ctx.stop()
}
}

21
daemon/src/taker_cfd.rs

@ -1,6 +1,6 @@
use crate::address_map::{AddressMap, Stopping};
use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::db::load_cfd_by_order_id;
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Completed, Order, OrderId, Origin, Role};
use crate::model::{Identity, Price, Usd};
use crate::monitor::{self, MonitorParams};
@ -48,6 +48,7 @@ pub struct Actor<O, M, W> {
oracle_actor: Address<O>,
n_payouts: usize,
tasks: Tasks,
current_order: Option<Order>,
maker_identity: Identity,
}
@ -82,6 +83,7 @@ where
collab_settlement_actors: AddressMap::default(),
rollover_actors: AddressMap::default(),
tasks: Tasks::default(),
current_order: None,
maker_identity,
}
}
@ -190,10 +192,7 @@ impl<O, M, W> Actor<O, M, W> {
bail!("Received order {} from maker, but already have a cfd in the database for that order. The maker did not properly remove the order.", order.id)
}
if load_order_by_id(order.id, &mut conn).await.is_err() {
// only insert the order if we don't know it yet
insert_order(&order, &mut conn).await?;
}
self.current_order = Some(order.clone());
self.projection_actor
.send(projection::Update(Some(order)))
@ -290,7 +289,10 @@ where
let mut conn = self.db.acquire().await?;
let current_order = load_order_by_id(order_id, &mut conn).await?;
let current_order = self
.current_order
.clone()
.context("No current order from maker")?;
tracing::info!("Taking current order: {:?}", &current_order);
@ -310,22 +312,21 @@ where
let announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id))
.send(oracle::GetAnnouncement(cfd.oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?;
.with_context(|| format!("Announcement {} not found", cfd.oracle_event_id))?;
let this = ctx
.address()
.expect("actor to be able to give address to itself");
let (addr, fut) = setup_taker::Actor::new(
(current_order, quantity, self.n_payouts),
(cfd, self.n_payouts),
(self.oracle_pk, announcement),
&self.wallet,
&self.wallet,
self.conn_actor.clone(),
&this,
&this,
self.maker_identity,
)
.create(None)
.run();

Loading…
Cancel
Save