Browse Source

Merge #819

819: Don't store order in database r=da-kami a=thomaseizinger

Todo:

- [x] Needs manual testing

Tested opening a CFD locally. All fine.

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
resilient-broadcast
bors[bot] 3 years ago
committed by GitHub
parent
commit
31c5858d73
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 31
      daemon/migrations/20211210000000_remove-order-from-database.sql
  2. 214
      daemon/sqlx-data.json
  3. 14
      daemon/src/collab_settlement_maker.rs
  4. 14
      daemon/src/collab_settlement_taker.rs
  5. 442
      daemon/src/db.rs
  6. 50
      daemon/src/maker_cfd.rs
  7. 1
      daemon/src/model.rs
  8. 75
      daemon/src/model/cfd.rs
  9. 56
      daemon/src/monitor.rs
  10. 16
      daemon/src/projection.rs
  11. 28
      daemon/src/rollover_taker.rs
  12. 50
      daemon/src/setup_maker.rs
  13. 48
      daemon/src/setup_taker.rs
  14. 21
      daemon/src/taker_cfd.rs

31
daemon/migrations/20211210000000_remove-order-from-database.sql

@ -0,0 +1,31 @@
drop table cfd_states;
drop table cfds;
drop table orders;
create table if not exists cfds
(
id integer primary key autoincrement,
uuid text unique not null,
trading_pair text not null,
position text not null,
initial_price text not null,
leverage integer not null,
liquidation_price text not null,
creation_timestamp_seconds integer not null,
settlement_time_interval_seconds integer not null,
origin text not null,
oracle_event_id text not null,
fee_rate integer not null,
quantity_usd text not null,
counterparty text not null
);
create unique index if not exists cfd_uuid on cfds (uuid);
create table if not exists cfd_states
(
id integer primary key autoincrement,
cfd_id integer not null,
state text not null,
foreign key (cfd_id) references cfds (id)
);

214
daemon/sqlx-data.json

@ -1,95 +1,5 @@
{ {
"db": "SQLite", "db": "SQLite",
"047070578f79ceaa739c1b307d138ee30e3a8730c4ac357213801e23d73f2841": {
"query": "\n select\n uuid as \"uuid: crate::model::cfd::OrderId\",\n trading_pair as \"trading_pair: crate::model::TradingPair\",\n position as \"position: crate::model::Position\",\n initial_price as \"initial_price: crate::model::Price\",\n min_quantity as \"min_quantity: crate::model::Usd\",\n max_quantity as \"max_quantity: crate::model::Usd\",\n leverage as \"leverage: crate::model::Leverage\",\n liquidation_price as \"liquidation_price: crate::model::Price\",\n creation_timestamp_seconds as \"ts_secs: crate::model::Timestamp\",\n settlement_time_interval_seconds as \"settlement_time_interval_secs: i64\",\n origin as \"origin: crate::model::cfd::Origin\",\n oracle_event_id as \"oracle_event_id: crate::model::BitMexPriceEventId\",\n fee_rate as \"fee_rate: u32\"\n from\n orders\n where\n uuid = $1\n ",
"describe": {
"columns": [
{
"name": "uuid: crate::model::cfd::OrderId",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "trading_pair: crate::model::TradingPair",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "position: crate::model::Position",
"ordinal": 2,
"type_info": "Text"
},
{
"name": "initial_price: crate::model::Price",
"ordinal": 3,
"type_info": "Text"
},
{
"name": "min_quantity: crate::model::Usd",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "max_quantity: crate::model::Usd",
"ordinal": 5,
"type_info": "Text"
},
{
"name": "leverage: crate::model::Leverage",
"ordinal": 6,
"type_info": "Int64"
},
{
"name": "liquidation_price: crate::model::Price",
"ordinal": 7,
"type_info": "Text"
},
{
"name": "ts_secs: crate::model::Timestamp",
"ordinal": 8,
"type_info": "Int64"
},
{
"name": "settlement_time_interval_secs: i64",
"ordinal": 9,
"type_info": "Int64"
},
{
"name": "origin: crate::model::cfd::Origin",
"ordinal": 10,
"type_info": "Text"
},
{
"name": "oracle_event_id: crate::model::BitMexPriceEventId",
"ordinal": 11,
"type_info": "Text"
},
{
"name": "fee_rate: u32",
"ordinal": 12,
"type_info": "Null"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
false
]
}
},
"221a6283db798bacaba99e7e85130f9a8bbea1299d8cb99d272b1d478dc19775": { "221a6283db798bacaba99e7e85130f9a8bbea1299d8cb99d272b1d478dc19775": {
"query": "\n select\n state\n from cfd_states\n where cfd_id = $1\n order by id desc\n limit 1;\n ", "query": "\n select\n state\n from cfd_states\n where cfd_id = $1\n order by id desc\n limit 1;\n ",
"describe": { "describe": {
@ -108,8 +18,8 @@
] ]
} }
}, },
"28d8b9ddd2dd85a9096200e6abd170a09ed35e1c905c081e535e19800016cd7d": { "3efef7791c6a33eff33426e4b68e1f908be8da5efd2f991ba8e67df99f7ab360": {
"query": "\n with ord as (\n select\n id as order_id,\n uuid,\n trading_pair,\n position,\n initial_price,\n min_quantity,\n max_quantity,\n leverage,\n liquidation_price,\n creation_timestamp_seconds as ts_secs,\n settlement_time_interval_seconds as settlement_time_interval_secs,\n origin,\n oracle_event_id,\n fee_rate\n from orders\n ),\n\n cfd as (\n select\n ord.order_id,\n id as cfd_id,\n quantity_usd,\n counterparty\n from cfds\n inner join ord on ord.order_id = cfds.order_id\n ),\n\n state as (\n select\n id as state_id,\n cfd.order_id,\n cfd.quantity_usd,\n cfd.counterparty,\n state\n from cfd_states\n inner join cfd on cfd.cfd_id = cfd_states.cfd_id\n where id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n ord.uuid as \"uuid: crate::model::cfd::OrderId\",\n ord.trading_pair as \"trading_pair: crate::model::TradingPair\",\n ord.position as \"position: crate::model::Position\",\n ord.initial_price as \"initial_price: crate::model::Price\",\n ord.min_quantity as \"min_quantity: crate::model::Usd\",\n ord.max_quantity as \"max_quantity: crate::model::Usd\",\n ord.leverage as \"leverage: crate::model::Leverage\",\n ord.liquidation_price as \"liquidation_price: crate::model::Price\",\n ord.ts_secs as \"ts_secs: crate::model::Timestamp\",\n ord.settlement_time_interval_secs as \"settlement_time_interval_secs: i64\",\n ord.origin as \"origin: crate::model::cfd::Origin\",\n ord.oracle_event_id as \"oracle_event_id: crate::model::BitMexPriceEventId\",\n ord.fee_rate as \"fee_rate: u32\",\n state.quantity_usd as \"quantity_usd: crate::model::Usd\",\n state.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from ord\n inner join state on state.order_id = ord.order_id\n\n where ord.uuid = $1\n ", "query": "\n with state as (\n select\n cfd_id,\n state\n from cfd_states\n inner join cfds on cfds.id = cfd_states.cfd_id\n where cfd_states.id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n cfds.uuid as \"uuid: crate::model::cfd::OrderId\",\n cfds.trading_pair as \"trading_pair: crate::model::TradingPair\",\n cfds.position as \"position: crate::model::Position\",\n cfds.initial_price as \"initial_price: crate::model::Price\",\n cfds.leverage as \"leverage: crate::model::Leverage\",\n cfds.liquidation_price as \"liquidation_price: crate::model::Price\",\n cfds.creation_timestamp_seconds as \"creation_timestamp_seconds: crate::model::Timestamp\",\n cfds.settlement_time_interval_seconds as \"settlement_time_interval_secs: i64\",\n cfds.origin as \"origin: crate::model::cfd::Origin\",\n cfds.oracle_event_id as \"oracle_event_id: crate::model::BitMexPriceEventId\",\n cfds.fee_rate as \"fee_rate: u32\",\n cfds.quantity_usd as \"quantity_usd: crate::model::Usd\",\n cfds.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from cfds\n inner join state on state.cfd_id = cfds.id\n\n where cfds.oracle_event_id = $1\n ",
"describe": { "describe": {
"columns": [ "columns": [
{ {
@ -132,64 +42,54 @@
"ordinal": 3, "ordinal": 3,
"type_info": "Text" "type_info": "Text"
}, },
{
"name": "min_quantity: crate::model::Usd",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "max_quantity: crate::model::Usd",
"ordinal": 5,
"type_info": "Text"
},
{ {
"name": "leverage: crate::model::Leverage", "name": "leverage: crate::model::Leverage",
"ordinal": 6, "ordinal": 4,
"type_info": "Int64" "type_info": "Int64"
}, },
{ {
"name": "liquidation_price: crate::model::Price", "name": "liquidation_price: crate::model::Price",
"ordinal": 7, "ordinal": 5,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "ts_secs: crate::model::Timestamp", "name": "creation_timestamp_seconds: crate::model::Timestamp",
"ordinal": 8, "ordinal": 6,
"type_info": "Int64" "type_info": "Int64"
}, },
{ {
"name": "settlement_time_interval_secs: i64", "name": "settlement_time_interval_secs: i64",
"ordinal": 9, "ordinal": 7,
"type_info": "Int64" "type_info": "Int64"
}, },
{ {
"name": "origin: crate::model::cfd::Origin", "name": "origin: crate::model::cfd::Origin",
"ordinal": 10, "ordinal": 8,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "oracle_event_id: crate::model::BitMexPriceEventId", "name": "oracle_event_id: crate::model::BitMexPriceEventId",
"ordinal": 11, "ordinal": 9,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "fee_rate: u32", "name": "fee_rate: u32",
"ordinal": 12, "ordinal": 10,
"type_info": "Null" "type_info": "Int64"
}, },
{ {
"name": "quantity_usd: crate::model::Usd", "name": "quantity_usd: crate::model::Usd",
"ordinal": 13, "ordinal": 11,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "counterparty: crate::model::Identity", "name": "counterparty: crate::model::Identity",
"ordinal": 14, "ordinal": 12,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "state", "name": "state",
"ordinal": 15, "ordinal": 13,
"type_info": "Text" "type_info": "Text"
} }
], ],
@ -210,14 +110,12 @@
false, false,
false, false,
false, false,
false,
false,
false false
] ]
} }
}, },
"8cbe349911b35d8e79763d64b4f5813b4bd98f12e0bba5ada84d2cae8b08ef4f": { "9f31d4002a7328b199a24d50149f2724706e2d391a94b76d7894983f5eb71c4b": {
"query": "\n select\n id\n from cfds\n where order_uuid = $1;\n ", "query": "\n select\n id\n from cfds\n where cfds.uuid = $1;\n ",
"describe": { "describe": {
"columns": [ "columns": [
{ {
@ -234,8 +132,8 @@
] ]
} }
}, },
"ef4fb6c58e79051bd09ad04f59b7896df5228c6c848c999149668d7c733115c2": { "c64374031a424b78b1061d05d0087d79c7251fe6da1dd3cb5d146d1e2b4dd12f": {
"query": "\n with ord as (\n select\n id as order_id,\n uuid,\n trading_pair,\n position,\n initial_price,\n min_quantity,\n max_quantity,\n leverage,\n liquidation_price,\n creation_timestamp_seconds as ts_secs,\n settlement_time_interval_seconds as settlement_time_interval_secs,\n origin,\n oracle_event_id,\n fee_rate\n from orders\n ),\n\n cfd as (\n select\n ord.order_id,\n id as cfd_id,\n quantity_usd,\n counterparty\n from cfds\n inner join ord on ord.order_id = cfds.order_id\n ),\n\n state as (\n select\n id as state_id,\n cfd.order_id,\n cfd.quantity_usd,\n cfd.counterparty,\n state\n from cfd_states\n inner join cfd on cfd.cfd_id = cfd_states.cfd_id\n where id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n ord.uuid as \"uuid: crate::model::cfd::OrderId\",\n ord.trading_pair as \"trading_pair: crate::model::TradingPair\",\n ord.position as \"position: crate::model::Position\",\n ord.initial_price as \"initial_price: crate::model::Price\",\n ord.min_quantity as \"min_quantity: crate::model::Usd\",\n ord.max_quantity as \"max_quantity: crate::model::Usd\",\n ord.leverage as \"leverage: crate::model::Leverage\",\n ord.liquidation_price as \"liquidation_price: crate::model::Price\",\n ord.ts_secs as \"ts_secs: crate::model::Timestamp\",\n ord.settlement_time_interval_secs as \"settlement_time_interval_secs: i64\",\n ord.origin as \"origin: crate::model::cfd::Origin\",\n ord.oracle_event_id as \"oracle_event_id: crate::model::BitMexPriceEventId\",\n ord.fee_rate as \"fee_rate: u32\",\n state.quantity_usd as \"quantity_usd: crate::model::Usd\",\n state.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from ord\n inner join state on state.order_id = ord.order_id\n ", "query": "\n with state as (\n select\n cfd_id,\n state\n from cfd_states\n inner join cfds on cfds.id = cfd_states.cfd_id\n where cfd_states.id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n cfds.uuid as \"uuid: crate::model::cfd::OrderId\",\n cfds.trading_pair as \"trading_pair: crate::model::TradingPair\",\n cfds.position as \"position: crate::model::Position\",\n cfds.initial_price as \"initial_price: crate::model::Price\",\n cfds.leverage as \"leverage: crate::model::Leverage\",\n cfds.liquidation_price as \"liquidation_price: crate::model::Price\",\n cfds.creation_timestamp_seconds as \"creation_timestamp_seconds: crate::model::Timestamp\",\n cfds.settlement_time_interval_seconds as \"settlement_time_interval_secs: i64\",\n cfds.origin as \"origin: crate::model::cfd::Origin\",\n cfds.oracle_event_id as \"oracle_event_id: crate::model::BitMexPriceEventId\",\n cfds.fee_rate as \"fee_rate: u32\",\n cfds.quantity_usd as \"quantity_usd: crate::model::Usd\",\n cfds.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from cfds\n inner join state on state.cfd_id = cfds.id\n ",
"describe": { "describe": {
"columns": [ "columns": [
{ {
@ -258,64 +156,54 @@
"ordinal": 3, "ordinal": 3,
"type_info": "Text" "type_info": "Text"
}, },
{
"name": "min_quantity: crate::model::Usd",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "max_quantity: crate::model::Usd",
"ordinal": 5,
"type_info": "Text"
},
{ {
"name": "leverage: crate::model::Leverage", "name": "leverage: crate::model::Leverage",
"ordinal": 6, "ordinal": 4,
"type_info": "Int64" "type_info": "Int64"
}, },
{ {
"name": "liquidation_price: crate::model::Price", "name": "liquidation_price: crate::model::Price",
"ordinal": 7, "ordinal": 5,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "ts_secs: crate::model::Timestamp", "name": "creation_timestamp_seconds: crate::model::Timestamp",
"ordinal": 8, "ordinal": 6,
"type_info": "Int64" "type_info": "Int64"
}, },
{ {
"name": "settlement_time_interval_secs: i64", "name": "settlement_time_interval_secs: i64",
"ordinal": 9, "ordinal": 7,
"type_info": "Int64" "type_info": "Int64"
}, },
{ {
"name": "origin: crate::model::cfd::Origin", "name": "origin: crate::model::cfd::Origin",
"ordinal": 10, "ordinal": 8,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "oracle_event_id: crate::model::BitMexPriceEventId", "name": "oracle_event_id: crate::model::BitMexPriceEventId",
"ordinal": 11, "ordinal": 9,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "fee_rate: u32", "name": "fee_rate: u32",
"ordinal": 12, "ordinal": 10,
"type_info": "Null" "type_info": "Int64"
}, },
{ {
"name": "quantity_usd: crate::model::Usd", "name": "quantity_usd: crate::model::Usd",
"ordinal": 13, "ordinal": 11,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "counterparty: crate::model::Identity", "name": "counterparty: crate::model::Identity",
"ordinal": 14, "ordinal": 12,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "state", "name": "state",
"ordinal": 15, "ordinal": 13,
"type_info": "Text" "type_info": "Text"
} }
], ],
@ -336,14 +224,12 @@
false, false,
false, false,
false, false,
false,
false,
false false
] ]
} }
}, },
"f6ac595731d61e166b2afaa4923e9f05c75cfad6d1d734ab43e7567d0e29adec": { "ff0e5909f36d03a19434acef0bf307c5b8a70beae6f619853f4e3ce5a8c53b61": {
"query": "\n with ord as (\n select\n id as order_id,\n uuid,\n trading_pair,\n position,\n initial_price,\n min_quantity,\n max_quantity,\n leverage,\n liquidation_price,\n creation_timestamp_seconds as ts_secs,\n settlement_time_interval_seconds as settlement_time_interval_secs,\n origin,\n oracle_event_id,\n fee_rate\n from orders\n ),\n\n cfd as (\n select\n ord.order_id,\n id as cfd_id,\n quantity_usd,\n counterparty\n from cfds\n inner join ord on ord.order_id = cfds.order_id\n ),\n\n state as (\n select\n id as state_id,\n cfd.order_id,\n cfd.quantity_usd,\n cfd.counterparty,\n state\n from cfd_states\n inner join cfd on cfd.cfd_id = cfd_states.cfd_id\n where id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n ord.uuid as \"uuid: crate::model::cfd::OrderId\",\n ord.trading_pair as \"trading_pair: crate::model::TradingPair\",\n ord.position as \"position: crate::model::Position\",\n ord.initial_price as \"initial_price: crate::model::Price\",\n ord.min_quantity as \"min_quantity: crate::model::Usd\",\n ord.max_quantity as \"max_quantity: crate::model::Usd\",\n ord.leverage as \"leverage: crate::model::Leverage\",\n ord.liquidation_price as \"liquidation_price: crate::model::Price\",\n ord.ts_secs as \"ts_secs: crate::model::Timestamp\",\n ord.settlement_time_interval_secs as \"settlement_time_interval_secs: i64\",\n ord.origin as \"origin: crate::model::cfd::Origin\",\n ord.oracle_event_id as \"oracle_event_id: crate::model::BitMexPriceEventId\",\n ord.fee_rate as \"fee_rate: u32\",\n state.quantity_usd as \"quantity_usd: crate::model::Usd\",\n state.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from ord\n inner join state on state.order_id = ord.order_id\n\n where ord.oracle_event_id = $1\n ", "query": "\n with state as (\n select\n cfd_id,\n state\n from cfd_states\n inner join cfds on cfds.id = cfd_states.cfd_id\n where cfd_states.id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n cfds.uuid as \"uuid: crate::model::cfd::OrderId\",\n cfds.trading_pair as \"trading_pair: crate::model::TradingPair\",\n cfds.position as \"position: crate::model::Position\",\n cfds.initial_price as \"initial_price: crate::model::Price\",\n cfds.leverage as \"leverage: crate::model::Leverage\",\n cfds.liquidation_price as \"liquidation_price: crate::model::Price\",\n cfds.creation_timestamp_seconds as \"creation_timestamp_seconds: crate::model::Timestamp\",\n cfds.settlement_time_interval_seconds as \"settlement_time_interval_secs: i64\",\n cfds.origin as \"origin: crate::model::cfd::Origin\",\n cfds.oracle_event_id as \"oracle_event_id: crate::model::BitMexPriceEventId\",\n cfds.fee_rate as \"fee_rate: u32\",\n cfds.quantity_usd as \"quantity_usd: crate::model::Usd\",\n cfds.counterparty as \"counterparty: crate::model::Identity\",\n state.state\n\n from cfds\n inner join state on state.cfd_id = cfds.id\n\n where cfds.uuid = $1\n ",
"describe": { "describe": {
"columns": [ "columns": [
{ {
@ -366,64 +252,54 @@
"ordinal": 3, "ordinal": 3,
"type_info": "Text" "type_info": "Text"
}, },
{
"name": "min_quantity: crate::model::Usd",
"ordinal": 4,
"type_info": "Text"
},
{
"name": "max_quantity: crate::model::Usd",
"ordinal": 5,
"type_info": "Text"
},
{ {
"name": "leverage: crate::model::Leverage", "name": "leverage: crate::model::Leverage",
"ordinal": 6, "ordinal": 4,
"type_info": "Int64" "type_info": "Int64"
}, },
{ {
"name": "liquidation_price: crate::model::Price", "name": "liquidation_price: crate::model::Price",
"ordinal": 7, "ordinal": 5,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "ts_secs: crate::model::Timestamp", "name": "creation_timestamp_seconds: crate::model::Timestamp",
"ordinal": 8, "ordinal": 6,
"type_info": "Int64" "type_info": "Int64"
}, },
{ {
"name": "settlement_time_interval_secs: i64", "name": "settlement_time_interval_secs: i64",
"ordinal": 9, "ordinal": 7,
"type_info": "Int64" "type_info": "Int64"
}, },
{ {
"name": "origin: crate::model::cfd::Origin", "name": "origin: crate::model::cfd::Origin",
"ordinal": 10, "ordinal": 8,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "oracle_event_id: crate::model::BitMexPriceEventId", "name": "oracle_event_id: crate::model::BitMexPriceEventId",
"ordinal": 11, "ordinal": 9,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "fee_rate: u32", "name": "fee_rate: u32",
"ordinal": 12, "ordinal": 10,
"type_info": "Null" "type_info": "Int64"
}, },
{ {
"name": "quantity_usd: crate::model::Usd", "name": "quantity_usd: crate::model::Usd",
"ordinal": 13, "ordinal": 11,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "counterparty: crate::model::Identity", "name": "counterparty: crate::model::Identity",
"ordinal": 14, "ordinal": 12,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "state", "name": "state",
"ordinal": 15, "ordinal": 13,
"type_info": "Text" "type_info": "Text"
} }
], ],
@ -444,8 +320,6 @@
false, false,
false, false,
false, false,
false,
false,
false false
] ]
} }

14
daemon/src/collab_settlement_maker.rs

@ -45,7 +45,7 @@ pub struct Initiated {
#[xtra_productivity] #[xtra_productivity]
impl Actor { impl Actor {
async fn handle(&mut self, _: Accepted, ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, _: Accepted, ctx: &mut xtra::Context<Self>) {
let order_id = self.cfd.order.id; let order_id = self.cfd.id;
tracing::info!(%order_id, "Settlement proposal accepted"); tracing::info!(%order_id, "Settlement proposal accepted");
@ -54,7 +54,7 @@ impl Actor {
} }
async fn handle(&mut self, _: Rejected, ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, _: Rejected, ctx: &mut xtra::Context<Self>) {
let order_id = self.cfd.order.id; let order_id = self.cfd.id;
tracing::info!(%order_id, "Settlement proposal rejected"); tracing::info!(%order_id, "Settlement proposal rejected");
@ -65,7 +65,7 @@ impl Actor {
async fn handle(&mut self, msg: Initiated, ctx: &mut xtra::Context<Self>) { async fn handle(&mut self, msg: Initiated, ctx: &mut xtra::Context<Self>) {
let completed = async { let completed = async {
tracing::info!( tracing::info!(
order_id = %self.cfd.order.id, order_id = %self.cfd.id,
taker_id = %self.taker_id, taker_id = %self.taker_id,
"Received signature for collaborative settlement" "Received signature for collaborative settlement"
); );
@ -85,14 +85,14 @@ impl Actor {
self.update_proposal(None).await; self.update_proposal(None).await;
anyhow::Ok(Completed::Confirmed { anyhow::Ok(Completed::Confirmed {
order_id: self.cfd.order.id, order_id: self.cfd.id,
settlement, settlement,
script_pubkey: dlc.script_pubkey_for(Role::Maker), script_pubkey: dlc.script_pubkey_for(Role::Maker),
}) })
} }
.await .await
.unwrap_or_else(|e| Completed::Failed { .unwrap_or_else(|e| Completed::Failed {
order_id: self.cfd.order.id, order_id: self.cfd.id,
error: e, error: e,
}); });
@ -154,7 +154,7 @@ impl Actor {
if let Err(e) = self if let Err(e) = self
.projection .projection
.send(projection::UpdateSettlementProposal { .send(projection::UpdateSettlementProposal {
order: self.cfd.order.id, order: self.cfd.id,
proposal, proposal,
}) })
.await .await
@ -191,7 +191,7 @@ impl Actor {
decision: maker_inc_connections::settlement::Decision, decision: maker_inc_connections::settlement::Decision,
ctx: &mut xtra::Context<Self>, ctx: &mut xtra::Context<Self>,
) { ) {
let order_id = self.cfd.order.id; let order_id = self.cfd.id;
if let Err(e) = self if let Err(e) = self
.connections .connections

14
daemon/src/collab_settlement_taker.rs

@ -41,7 +41,7 @@ impl Actor {
if !self.cfd.is_collaborative_settle_possible() { if !self.cfd.is_collaborative_settle_possible() {
anyhow::bail!( anyhow::bail!(
"Settlement proposal not possible because for cfd {} is in state {} which cannot be collaboratively settled", "Settlement proposal not possible because for cfd {} is in state {} which cannot be collaboratively settled",
self.cfd.order.id, self.cfd.id,
self.cfd.state self.cfd.state
) )
} }
@ -53,7 +53,7 @@ impl Actor {
maker: self.proposal.maker, maker: self.proposal.maker,
price: self.proposal.price, price: self.proposal.price,
address: this, address: this,
order_id: self.cfd.order.id, order_id: self.cfd.id,
}) })
.await??; .await??;
@ -64,7 +64,7 @@ impl Actor {
} }
async fn handle_confirmed(&mut self) -> Result<CollaborativeSettlement> { async fn handle_confirmed(&mut self) -> Result<CollaborativeSettlement> {
let order_id = self.cfd.order.id; let order_id = self.cfd.id;
tracing::info!(%order_id, "Settlement proposal got accepted"); tracing::info!(%order_id, "Settlement proposal got accepted");
@ -93,7 +93,7 @@ impl Actor {
} }
async fn handle_rejected(&mut self) -> Result<()> { async fn handle_rejected(&mut self) -> Result<()> {
let order_id = self.cfd.order.id; let order_id = self.cfd.id;
tracing::info!(%order_id, "Settlement proposal got rejected"); tracing::info!(%order_id, "Settlement proposal got rejected");
@ -108,7 +108,7 @@ impl Actor {
) -> Result<()> { ) -> Result<()> {
self.projection self.projection
.send(projection::UpdateSettlementProposal { .send(projection::UpdateSettlementProposal {
order: self.cfd.order.id, order: self.cfd.id,
proposal, proposal,
}) })
.await?; .await?;
@ -131,7 +131,7 @@ impl xtra::Actor for Actor {
if let Err(e) = self.propose(this).await { if let Err(e) = self.propose(this).await {
self.complete( self.complete(
Completed::Failed { Completed::Failed {
order_id: self.cfd.order.id, order_id: self.cfd.id,
error: e, error: e,
}, },
ctx, ctx,
@ -170,7 +170,7 @@ impl Actor {
msg: wire::maker_to_taker::Settlement, msg: wire::maker_to_taker::Settlement,
ctx: &mut xtra::Context<Self>, ctx: &mut xtra::Context<Self>,
) { ) {
let order_id = self.cfd.order.id; let order_id = self.cfd.id;
let completed = match msg { let completed = match msg {
wire::maker_to_taker::Settlement::Confirm => match self.handle_confirmed().await { wire::maker_to_taker::Settlement::Confirm => match self.handle_confirmed().await {

442
daemon/src/db.rs

@ -1,6 +1,6 @@
use crate::model::cfd::{Cfd, CfdState, Order, OrderId}; use crate::model::cfd::{Cfd, CfdState, OrderId};
use crate::model::BitMexPriceEventId; use crate::model::BitMexPriceEventId;
use anyhow::{bail, Context, Result}; use anyhow::{Context, Result};
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
use sqlx::{Sqlite, SqlitePool}; use sqlx::{Sqlite, SqlitePool};
use std::mem; use std::mem;
@ -11,118 +11,25 @@ pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> {
Ok(()) Ok(())
} }
pub async fn insert_order(order: &Order, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> { pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> {
let state = serde_json::to_string(&cfd.state)?;
let query_result = sqlx::query( let query_result = sqlx::query(
r#"insert into orders ( r#"
insert into cfds (
uuid, uuid,
trading_pair, trading_pair,
position, position,
initial_price, initial_price,
min_quantity,
max_quantity,
leverage, leverage,
liquidation_price, liquidation_price,
creation_timestamp_seconds, creation_timestamp_seconds,
settlement_time_interval_seconds, settlement_time_interval_seconds,
origin, origin,
oracle_event_id, oracle_event_id,
fee_rate fee_rate,
) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)"#,
)
.bind(&order.id)
.bind(&order.trading_pair)
.bind(&order.position)
.bind(&order.price)
.bind(&order.min_quantity)
.bind(&order.max_quantity)
.bind(&order.leverage)
.bind(&order.liquidation_price)
.bind(&order.creation_timestamp.seconds())
.bind(&order.settlement_interval.whole_seconds())
.bind(&order.origin)
.bind(&order.oracle_event_id)
.bind(&order.fee_rate)
.execute(conn)
.await?;
if query_result.rows_affected() != 1 {
anyhow::bail!("failed to insert order");
}
Ok(())
}
pub async fn load_order_by_id(
id: OrderId,
conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<Order> {
let row = sqlx::query!(
r#"
select
uuid as "uuid: crate::model::cfd::OrderId",
trading_pair as "trading_pair: crate::model::TradingPair",
position as "position: crate::model::Position",
initial_price as "initial_price: crate::model::Price",
min_quantity as "min_quantity: crate::model::Usd",
max_quantity as "max_quantity: crate::model::Usd",
leverage as "leverage: crate::model::Leverage",
liquidation_price as "liquidation_price: crate::model::Price",
creation_timestamp_seconds as "ts_secs: crate::model::Timestamp",
settlement_time_interval_seconds as "settlement_time_interval_secs: i64",
origin as "origin: crate::model::cfd::Origin",
oracle_event_id as "oracle_event_id: crate::model::BitMexPriceEventId",
fee_rate as "fee_rate: u32"
from
orders
where
uuid = $1
"#,
id
)
.fetch_one(conn)
.await?;
Ok(Order {
id: row.uuid,
trading_pair: row.trading_pair,
position: row.position,
price: row.initial_price,
min_quantity: row.min_quantity,
max_quantity: row.max_quantity,
leverage: row.leverage,
liquidation_price: row.liquidation_price,
creation_timestamp: row.ts_secs,
settlement_interval: Duration::new(row.settlement_time_interval_secs, 0),
origin: row.origin,
oracle_event_id: row.oracle_event_id,
fee_rate: row.fee_rate,
})
}
pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> {
if load_cfd_by_order_id(cfd.order.id, conn).await.is_ok() {
bail!(
"Cannot insert cfd because there is already a cfd for order id {}",
cfd.order.id
)
}
let state = serde_json::to_string(&cfd.state)?;
let query_result = sqlx::query(
r#"
insert into cfds (
order_id,
order_uuid,
quantity_usd, quantity_usd,
counterparty counterparty
) ) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13);
select
id as order_id,
uuid as order_uuid,
$2 as quantity_usd,
$3 as counterparty
from orders
where uuid = $1;
insert into cfd_states ( insert into cfd_states (
cfd_id, cfd_id,
@ -130,17 +37,28 @@ pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow:
) )
select select
id as cfd_id, id as cfd_id,
$4 as state $14 as state
from cfds from cfds
order by id desc limit 1; order by id desc limit 1;
"#, "#,
) )
.bind(&cfd.order.id) .bind(&cfd.id)
.bind(&cfd.trading_pair)
.bind(&cfd.position)
.bind(&cfd.price)
.bind(&cfd.leverage)
.bind(&cfd.liquidation_price)
.bind(&cfd.creation_timestamp)
.bind(&cfd.settlement_interval.whole_seconds())
.bind(&cfd.origin)
.bind(&cfd.oracle_event_id)
.bind(&cfd.fee_rate)
.bind(&cfd.quantity_usd) .bind(&cfd.quantity_usd)
.bind(&cfd.counterparty) .bind(&cfd.counterparty)
.bind(state) .bind(state)
.execute(conn) .execute(conn)
.await?; .await
.with_context(|| format!("Failed to insert CFD with id {}", cfd.id))?;
// Should be 2 because we insert into cfds and cfd_states // Should be 2 because we insert into cfds and cfd_states
if query_result.rows_affected() != 2 { if query_result.rows_affected() != 2 {
@ -151,7 +69,7 @@ pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow:
} }
pub async fn append_cfd_state(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> { pub async fn append_cfd_state(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> {
let cfd_id = load_cfd_id_by_order_uuid(cfd.order.id, conn).await?; let cfd_id = load_cfd_id_by_order_uuid(cfd.id, conn).await?;
let current_state = load_latest_cfd_state(cfd_id, conn) let current_state = load_latest_cfd_state(cfd_id, conn)
.await .await
.context("loading latest state failed")?; .context("loading latest state failed")?;
@ -161,7 +79,7 @@ pub async fn append_cfd_state(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> a
// Since we have states where we add information this happens quite frequently // Since we have states where we add information this happens quite frequently
tracing::trace!( tracing::trace!(
"Same state transition for cfd with order_id {}: {}", "Same state transition for cfd with order_id {}: {}",
cfd.order.id, cfd.id,
current_state current_state
); );
} }
@ -193,7 +111,7 @@ async fn load_cfd_id_by_order_uuid(
select select
id id
from cfds from cfds
where order_uuid = $1; where cfds.uuid = $1;
"#, "#,
order_uuid order_uuid
) )
@ -234,45 +152,13 @@ pub async fn load_cfd_by_order_id(
) -> Result<Cfd> { ) -> Result<Cfd> {
let row = sqlx::query!( let row = sqlx::query!(
r#" r#"
with ord as ( with state as (
select
id as order_id,
uuid,
trading_pair,
position,
initial_price,
min_quantity,
max_quantity,
leverage,
liquidation_price,
creation_timestamp_seconds as ts_secs,
settlement_time_interval_seconds as settlement_time_interval_secs,
origin,
oracle_event_id,
fee_rate
from orders
),
cfd as (
select select
ord.order_id, cfd_id,
id as cfd_id,
quantity_usd,
counterparty
from cfds
inner join ord on ord.order_id = cfds.order_id
),
state as (
select
id as state_id,
cfd.order_id,
cfd.quantity_usd,
cfd.counterparty,
state state
from cfd_states from cfd_states
inner join cfd on cfd.cfd_id = cfd_states.cfd_id inner join cfds on cfds.id = cfd_states.cfd_id
where id in ( where cfd_states.id in (
select select
max(id) as id max(id) as id
from cfd_states from cfd_states
@ -281,54 +167,46 @@ pub async fn load_cfd_by_order_id(
) )
select select
ord.uuid as "uuid: crate::model::cfd::OrderId", cfds.uuid as "uuid: crate::model::cfd::OrderId",
ord.trading_pair as "trading_pair: crate::model::TradingPair", cfds.trading_pair as "trading_pair: crate::model::TradingPair",
ord.position as "position: crate::model::Position", cfds.position as "position: crate::model::Position",
ord.initial_price as "initial_price: crate::model::Price", cfds.initial_price as "initial_price: crate::model::Price",
ord.min_quantity as "min_quantity: crate::model::Usd", cfds.leverage as "leverage: crate::model::Leverage",
ord.max_quantity as "max_quantity: crate::model::Usd", cfds.liquidation_price as "liquidation_price: crate::model::Price",
ord.leverage as "leverage: crate::model::Leverage", cfds.creation_timestamp_seconds as "creation_timestamp_seconds: crate::model::Timestamp",
ord.liquidation_price as "liquidation_price: crate::model::Price", cfds.settlement_time_interval_seconds as "settlement_time_interval_secs: i64",
ord.ts_secs as "ts_secs: crate::model::Timestamp", cfds.origin as "origin: crate::model::cfd::Origin",
ord.settlement_time_interval_secs as "settlement_time_interval_secs: i64", cfds.oracle_event_id as "oracle_event_id: crate::model::BitMexPriceEventId",
ord.origin as "origin: crate::model::cfd::Origin", cfds.fee_rate as "fee_rate: u32",
ord.oracle_event_id as "oracle_event_id: crate::model::BitMexPriceEventId", cfds.quantity_usd as "quantity_usd: crate::model::Usd",
ord.fee_rate as "fee_rate: u32", cfds.counterparty as "counterparty: crate::model::Identity",
state.quantity_usd as "quantity_usd: crate::model::Usd",
state.counterparty as "counterparty: crate::model::Identity",
state.state state.state
from ord from cfds
inner join state on state.order_id = ord.order_id inner join state on state.cfd_id = cfds.id
where ord.uuid = $1 where cfds.uuid = $1
"#, "#,
order_id order_id
) )
.fetch_one(conn) .fetch_one(conn)
.await?; .await?;
let order = Order { // TODO:
// still have the use of serde_json::from_str() here, which will be dealt with
// via https://github.com/comit-network/hermes/issues/290
Ok(Cfd {
id: row.uuid, id: row.uuid,
trading_pair: row.trading_pair, trading_pair: row.trading_pair,
position: row.position, position: row.position,
price: row.initial_price, price: row.initial_price,
min_quantity: row.min_quantity,
max_quantity: row.max_quantity,
leverage: row.leverage, leverage: row.leverage,
liquidation_price: row.liquidation_price, liquidation_price: row.liquidation_price,
creation_timestamp: row.ts_secs, creation_timestamp: row.creation_timestamp_seconds,
settlement_interval: Duration::new(row.settlement_time_interval_secs, 0), settlement_interval: Duration::new(row.settlement_time_interval_secs, 0),
origin: row.origin, origin: row.origin,
oracle_event_id: row.oracle_event_id, oracle_event_id: row.oracle_event_id,
fee_rate: row.fee_rate, fee_rate: row.fee_rate,
};
// TODO:
// still have the use of serde_json::from_str() here, which will be dealt with
// via https://github.com/comit-network/hermes/issues/290
Ok(Cfd {
order,
quantity_usd: row.quantity_usd, quantity_usd: row.quantity_usd,
state: serde_json::from_str(row.state.as_str())?, state: serde_json::from_str(row.state.as_str())?,
counterparty: row.counterparty, counterparty: row.counterparty,
@ -339,45 +217,13 @@ pub async fn load_cfd_by_order_id(
pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<Vec<Cfd>> { pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<Vec<Cfd>> {
let rows = sqlx::query!( let rows = sqlx::query!(
r#" r#"
with ord as ( with state as (
select
id as order_id,
uuid,
trading_pair,
position,
initial_price,
min_quantity,
max_quantity,
leverage,
liquidation_price,
creation_timestamp_seconds as ts_secs,
settlement_time_interval_seconds as settlement_time_interval_secs,
origin,
oracle_event_id,
fee_rate
from orders
),
cfd as (
select select
ord.order_id, cfd_id,
id as cfd_id,
quantity_usd,
counterparty
from cfds
inner join ord on ord.order_id = cfds.order_id
),
state as (
select
id as state_id,
cfd.order_id,
cfd.quantity_usd,
cfd.counterparty,
state state
from cfd_states from cfd_states
inner join cfd on cfd.cfd_id = cfd_states.cfd_id inner join cfds on cfds.id = cfd_states.cfd_id
where id in ( where cfd_states.id in (
select select
max(id) as id max(id) as id
from cfd_states from cfd_states
@ -386,25 +232,23 @@ pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<
) )
select select
ord.uuid as "uuid: crate::model::cfd::OrderId", cfds.uuid as "uuid: crate::model::cfd::OrderId",
ord.trading_pair as "trading_pair: crate::model::TradingPair", cfds.trading_pair as "trading_pair: crate::model::TradingPair",
ord.position as "position: crate::model::Position", cfds.position as "position: crate::model::Position",
ord.initial_price as "initial_price: crate::model::Price", cfds.initial_price as "initial_price: crate::model::Price",
ord.min_quantity as "min_quantity: crate::model::Usd", cfds.leverage as "leverage: crate::model::Leverage",
ord.max_quantity as "max_quantity: crate::model::Usd", cfds.liquidation_price as "liquidation_price: crate::model::Price",
ord.leverage as "leverage: crate::model::Leverage", cfds.creation_timestamp_seconds as "creation_timestamp_seconds: crate::model::Timestamp",
ord.liquidation_price as "liquidation_price: crate::model::Price", cfds.settlement_time_interval_seconds as "settlement_time_interval_secs: i64",
ord.ts_secs as "ts_secs: crate::model::Timestamp", cfds.origin as "origin: crate::model::cfd::Origin",
ord.settlement_time_interval_secs as "settlement_time_interval_secs: i64", cfds.oracle_event_id as "oracle_event_id: crate::model::BitMexPriceEventId",
ord.origin as "origin: crate::model::cfd::Origin", cfds.fee_rate as "fee_rate: u32",
ord.oracle_event_id as "oracle_event_id: crate::model::BitMexPriceEventId", cfds.quantity_usd as "quantity_usd: crate::model::Usd",
ord.fee_rate as "fee_rate: u32", cfds.counterparty as "counterparty: crate::model::Identity",
state.quantity_usd as "quantity_usd: crate::model::Usd",
state.counterparty as "counterparty: crate::model::Identity",
state.state state.state
from ord from cfds
inner join state on state.order_id = ord.order_id inner join state on state.cfd_id = cfds.id
"# "#
) )
.fetch_all(conn) .fetch_all(conn)
@ -413,24 +257,18 @@ pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<
let cfds = rows let cfds = rows
.into_iter() .into_iter()
.map(|row| { .map(|row| {
let order = Order { Ok(Cfd {
id: row.uuid, id: row.uuid,
trading_pair: row.trading_pair, trading_pair: row.trading_pair,
position: row.position, position: row.position,
price: row.initial_price, price: row.initial_price,
min_quantity: row.min_quantity,
max_quantity: row.max_quantity,
leverage: row.leverage, leverage: row.leverage,
liquidation_price: row.liquidation_price, liquidation_price: row.liquidation_price,
creation_timestamp: row.ts_secs, creation_timestamp: row.creation_timestamp_seconds,
settlement_interval: Duration::new(row.settlement_time_interval_secs, 0), settlement_interval: Duration::new(row.settlement_time_interval_secs, 0),
origin: row.origin, origin: row.origin,
oracle_event_id: row.oracle_event_id, oracle_event_id: row.oracle_event_id,
fee_rate: row.fee_rate, fee_rate: row.fee_rate,
};
Ok(Cfd {
order,
quantity_usd: row.quantity_usd, quantity_usd: row.quantity_usd,
state: serde_json::from_str(row.state.as_str())?, state: serde_json::from_str(row.state.as_str())?,
counterparty: row.counterparty, counterparty: row.counterparty,
@ -449,45 +287,13 @@ pub async fn load_cfds_by_oracle_event_id(
let event_id = oracle_event_id.to_string(); let event_id = oracle_event_id.to_string();
let rows = sqlx::query!( let rows = sqlx::query!(
r#" r#"
with ord as ( with state as (
select
id as order_id,
uuid,
trading_pair,
position,
initial_price,
min_quantity,
max_quantity,
leverage,
liquidation_price,
creation_timestamp_seconds as ts_secs,
settlement_time_interval_seconds as settlement_time_interval_secs,
origin,
oracle_event_id,
fee_rate
from orders
),
cfd as (
select select
ord.order_id, cfd_id,
id as cfd_id,
quantity_usd,
counterparty
from cfds
inner join ord on ord.order_id = cfds.order_id
),
state as (
select
id as state_id,
cfd.order_id,
cfd.quantity_usd,
cfd.counterparty,
state state
from cfd_states from cfd_states
inner join cfd on cfd.cfd_id = cfd_states.cfd_id inner join cfds on cfds.id = cfd_states.cfd_id
where id in ( where cfd_states.id in (
select select
max(id) as id max(id) as id
from cfd_states from cfd_states
@ -496,27 +302,25 @@ pub async fn load_cfds_by_oracle_event_id(
) )
select select
ord.uuid as "uuid: crate::model::cfd::OrderId", cfds.uuid as "uuid: crate::model::cfd::OrderId",
ord.trading_pair as "trading_pair: crate::model::TradingPair", cfds.trading_pair as "trading_pair: crate::model::TradingPair",
ord.position as "position: crate::model::Position", cfds.position as "position: crate::model::Position",
ord.initial_price as "initial_price: crate::model::Price", cfds.initial_price as "initial_price: crate::model::Price",
ord.min_quantity as "min_quantity: crate::model::Usd", cfds.leverage as "leverage: crate::model::Leverage",
ord.max_quantity as "max_quantity: crate::model::Usd", cfds.liquidation_price as "liquidation_price: crate::model::Price",
ord.leverage as "leverage: crate::model::Leverage", cfds.creation_timestamp_seconds as "creation_timestamp_seconds: crate::model::Timestamp",
ord.liquidation_price as "liquidation_price: crate::model::Price", cfds.settlement_time_interval_seconds as "settlement_time_interval_secs: i64",
ord.ts_secs as "ts_secs: crate::model::Timestamp", cfds.origin as "origin: crate::model::cfd::Origin",
ord.settlement_time_interval_secs as "settlement_time_interval_secs: i64", cfds.oracle_event_id as "oracle_event_id: crate::model::BitMexPriceEventId",
ord.origin as "origin: crate::model::cfd::Origin", cfds.fee_rate as "fee_rate: u32",
ord.oracle_event_id as "oracle_event_id: crate::model::BitMexPriceEventId", cfds.quantity_usd as "quantity_usd: crate::model::Usd",
ord.fee_rate as "fee_rate: u32", cfds.counterparty as "counterparty: crate::model::Identity",
state.quantity_usd as "quantity_usd: crate::model::Usd",
state.counterparty as "counterparty: crate::model::Identity",
state.state state.state
from ord from cfds
inner join state on state.order_id = ord.order_id inner join state on state.cfd_id = cfds.id
where ord.oracle_event_id = $1 where cfds.oracle_event_id = $1
"#, "#,
event_id event_id
) )
@ -526,24 +330,18 @@ pub async fn load_cfds_by_oracle_event_id(
let cfds = rows let cfds = rows
.into_iter() .into_iter()
.map(|row| { .map(|row| {
let order = Order { Ok(Cfd {
id: row.uuid, id: row.uuid,
trading_pair: row.trading_pair, trading_pair: row.trading_pair,
position: row.position, position: row.position,
price: row.initial_price, price: row.initial_price,
min_quantity: row.min_quantity,
max_quantity: row.max_quantity,
leverage: row.leverage, leverage: row.leverage,
liquidation_price: row.liquidation_price, liquidation_price: row.liquidation_price,
creation_timestamp: row.ts_secs, creation_timestamp: row.creation_timestamp_seconds,
settlement_interval: Duration::new(row.settlement_time_interval_secs, 0), settlement_interval: Duration::new(row.settlement_time_interval_secs, 0),
origin: row.origin, origin: row.origin,
oracle_event_id: row.oracle_event_id, oracle_event_id: row.oracle_event_id,
fee_rate: row.fee_rate, fee_rate: row.fee_rate,
};
Ok(Cfd {
order,
quantity_usd: row.quantity_usd, quantity_usd: row.quantity_usd,
state: serde_json::from_str(row.state.as_str())?, state: serde_json::from_str(row.state.as_str())?,
counterparty: row.counterparty, counterparty: row.counterparty,
@ -557,7 +355,6 @@ pub async fn load_cfds_by_oracle_event_id(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::db::insert_order;
use crate::model::cfd::{Cfd, CfdState, Order, Origin}; use crate::model::cfd::{Cfd, CfdState, Order, Origin};
use crate::model::{Identity, Price, Usd}; use crate::model::{Identity, Price, Usd};
use crate::seed::Seed; use crate::seed::Seed;
@ -568,16 +365,6 @@ mod tests {
use time::macros::datetime; use time::macros::datetime;
use time::OffsetDateTime; use time::OffsetDateTime;
#[tokio::test]
async fn test_insert_and_load_order() {
let mut conn = setup_test_db().await;
let order = Order::dummy().insert(&mut conn).await;
let loaded = load_order_by_id(order.id, &mut conn).await.unwrap();
assert_eq!(order, loaded);
}
#[tokio::test] #[tokio::test]
async fn test_insert_and_load_cfd() { async fn test_insert_and_load_cfd() {
let mut conn = setup_test_db().await; let mut conn = setup_test_db().await;
@ -593,7 +380,7 @@ mod tests {
let mut conn = setup_test_db().await; let mut conn = setup_test_db().await;
let cfd = Cfd::dummy().insert(&mut conn).await; let cfd = Cfd::dummy().insert(&mut conn).await;
let loaded = load_cfd_by_order_id(cfd.order.id, &mut conn).await.unwrap(); let loaded = load_cfd_by_order_id(cfd.id, &mut conn).await.unwrap();
assert_eq!(cfd, loaded) assert_eq!(cfd, loaded)
} }
@ -605,12 +392,8 @@ mod tests {
let cfd1 = Cfd::dummy().insert(&mut conn).await; let cfd1 = Cfd::dummy().insert(&mut conn).await;
let cfd2 = Cfd::dummy().insert(&mut conn).await; let cfd2 = Cfd::dummy().insert(&mut conn).await;
let loaded_1 = load_cfd_by_order_id(cfd1.order.id, &mut conn) let loaded_1 = load_cfd_by_order_id(cfd1.id, &mut conn).await.unwrap();
.await let loaded_2 = load_cfd_by_order_id(cfd2.id, &mut conn).await.unwrap();
.unwrap();
let loaded_2 = load_cfd_by_order_id(cfd2.order.id, &mut conn)
.await
.unwrap();
assert_eq!(cfd1, loaded_1); assert_eq!(cfd1, loaded_1);
assert_eq!(cfd2, loaded_2); assert_eq!(cfd2, loaded_2);
@ -669,21 +452,6 @@ mod tests {
assert_eq!(vec![cfd_1, cfd_2], cfds_from_db); assert_eq!(vec![cfd_1, cfd_2], cfds_from_db);
} }
#[tokio::test]
async fn test_insert_order_without_cfd_associates_with_correct_cfd() {
let mut conn = setup_test_db().await;
// Insert an order without a CFD
let _order_1 = Order::dummy().insert(&mut conn).await;
// Insert a CFD (this also inserts an order)
let cfd_1 = Cfd::dummy().insert(&mut conn).await;
let all_cfds = load_all_cfds(&mut conn).await.unwrap();
assert_eq!(all_cfds, vec![cfd_1]);
}
// test more data; test will add 100 cfds to the database, with each // test more data; test will add 100 cfds to the database, with each
// having a random number of random updates. Final results are deterministic. // having a random number of random updates. Final results are deterministic.
#[tokio::test] #[tokio::test]
@ -704,7 +472,7 @@ mod tests {
} }
// verify current state is correct // verify current state is correct
let loaded_by_order_id = load_cfd_by_order_id(cfd.order.id, &mut conn).await.unwrap(); let loaded_by_order_id = load_cfd_by_order_id(cfd.id, &mut conn).await.unwrap();
assert_eq!(loaded_by_order_id, cfd); assert_eq!(loaded_by_order_id, cfd);
// load_cfds_by_oracle_event_id can return multiple CFDs // load_cfds_by_oracle_event_id can return multiple CFDs
@ -729,10 +497,10 @@ mod tests {
let error = insert_cfd(&cfd, &mut conn).await.err().unwrap(); let error = insert_cfd(&cfd, &mut conn).await.err().unwrap();
assert_eq!( assert_eq!(
error.to_string(), format!("{:#}", error),
format!( format!(
"Cannot insert cfd because there is already a cfd for order id {}", "Failed to insert CFD with id {}: error returned from database: UNIQUE constraint failed: cfds.uuid: UNIQUE constraint failed: cfds.uuid",
cfd.order.id cfd.id,
) )
); );
} }
@ -780,14 +548,13 @@ mod tests {
/// Insert this [`Cfd`] into the database, returning the instance for further chaining. /// Insert this [`Cfd`] into the database, returning the instance for further chaining.
async fn insert(self, conn: &mut PoolConnection<Sqlite>) -> Self { async fn insert(self, conn: &mut PoolConnection<Sqlite>) -> Self {
insert_order(&self.order, conn).await.unwrap();
insert_cfd(&self, conn).await.unwrap(); insert_cfd(&self, conn).await.unwrap();
self self
} }
fn with_event_id(mut self, id: BitMexPriceEventId) -> Self { fn with_event_id(mut self, id: BitMexPriceEventId) -> Self {
self.order.oracle_event_id = id; self.oracle_event_id = id;
self self
} }
} }
@ -805,12 +572,5 @@ mod tests {
) )
.unwrap() .unwrap()
} }
/// Insert this [`Order`] into the database, returning the instance for further chaining.
async fn insert(self, conn: &mut PoolConnection<Sqlite>) -> Self {
insert_order(&self, conn).await.unwrap();
self
}
} }
} }

50
daemon/src/maker_cfd.rs

@ -1,6 +1,6 @@
use crate::address_map::{AddressMap, Stopping}; use crate::address_map::{AddressMap, Stopping};
use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed}; use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::db::load_cfd_by_order_id;
use crate::model::cfd::{ use crate::model::cfd::{
Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId, Origin, Role, RollOverProposal, Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId, Origin, Role, RollOverProposal,
SettlementKind, SettlementProposal, UpdateCfdProposal, SettlementKind, SettlementProposal, UpdateCfdProposal,
@ -84,7 +84,7 @@ pub struct Actor<O, M, T, W> {
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>, projection_actor: Address<projection::Actor>,
takers: Address<T>, takers: Address<T>,
current_order_id: Option<OrderId>, current_order: Option<Order>,
monitor_actor: Address<M>, monitor_actor: Address<M>,
setup_actors: AddressMap<OrderId, setup_maker::Actor>, setup_actors: AddressMap<OrderId, setup_maker::Actor>,
settlement_actors: AddressMap<OrderId, collab_settlement_maker::Actor>, settlement_actors: AddressMap<OrderId, collab_settlement_maker::Actor>,
@ -126,7 +126,7 @@ impl<O, M, T, W> Actor<O, M, T, W> {
oracle_pk, oracle_pk,
projection_actor, projection_actor,
takers, takers,
current_order_id: None, current_order: None,
monitor_actor, monitor_actor,
setup_actors: AddressMap::default(), setup_actors: AddressMap::default(),
roll_over_state: RollOverState::None, roll_over_state: RollOverState::None,
@ -294,20 +294,13 @@ where
T: xtra::Handler<maker_inc_connections::TakerMessage>, T: xtra::Handler<maker_inc_connections::TakerMessage>,
{ {
async fn handle_taker_connected(&mut self, taker_id: Identity) -> Result<()> { async fn handle_taker_connected(&mut self, taker_id: Identity) -> Result<()> {
let mut conn = self.db.acquire().await?;
let current_order = match self.current_order_id {
Some(current_order_id) => Some(load_order_by_id(current_order_id, &mut conn).await?),
None => None,
};
// Need to use `do_send_async` here because we are being invoked from the // Need to use `do_send_async` here because we are being invoked from the
// `maker_inc_connections::Actor`. Using `send` would result in a deadlock. // `maker_inc_connections::Actor`. Using `send` would result in a deadlock.
#[allow(clippy::disallowed_method)] #[allow(clippy::disallowed_method)]
self.takers self.takers
.do_send_async(maker_inc_connections::TakerMessage { .do_send_async(maker_inc_connections::TakerMessage {
taker_id, taker_id,
msg: wire::MakerToTaker::CurrentOrder(current_order), msg: wire::MakerToTaker::CurrentOrder(self.current_order.clone()),
}) })
.await?; .await?;
@ -339,7 +332,7 @@ where
self.takers self.takers
.send(maker_inc_connections::TakerMessage { .send(maker_inc_connections::TakerMessage {
taker_id, taker_id,
msg: wire::MakerToTaker::RejectOrder(cfd.order.id), msg: wire::MakerToTaker::RejectOrder(cfd.id),
}) })
.await??; .await??;
@ -381,10 +374,8 @@ where
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
// 1. Validate if order is still valid // 1. Validate if order is still valid
let current_order = match self.current_order_id { let current_order = match &self.current_order {
Some(current_order_id) if current_order_id == order_id => { Some(current_order) if current_order.id == order_id => current_order.clone(),
load_order_by_id(current_order_id, &mut conn).await?
}
_ => { _ => {
// An outdated order on the taker side does not require any state change on the // An outdated order on the taker side does not require any state change on the
// maker. notifying the taker with a specific message should be sufficient. // maker. notifying the taker with a specific message should be sufficient.
@ -407,7 +398,7 @@ where
// The order is removed before we update the state, because the maker might react on the // The order is removed before we update the state, because the maker might react on the
// state change. Once we know that we go for either an accept/reject scenario we // state change. Once we know that we go for either an accept/reject scenario we
// have to remove the current order. // have to remove the current order.
self.current_order_id = None; self.current_order = None;
// Need to use `do_send_async` here because invoking the // Need to use `do_send_async` here because invoking the
// corresponding handler can result in a deadlock with another // corresponding handler can result in a deadlock with another
@ -437,15 +428,16 @@ where
// state // state
let announcement = self let announcement = self
.oracle_actor .oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id)) .send(oracle::GetAnnouncement(cfd.oracle_event_id))
.await??; .await??;
// 5. Start up contract setup actor // 5. Start up contract setup actor
let this = ctx let this = ctx
.address() .address()
.expect("actor to be able to give address to itself"); .expect("actor to be able to give address to itself");
let (addr, fut) = setup_maker::Actor::new( let (addr, fut) = setup_maker::Actor::new(
(cfd.order, cfd.quantity_usd, self.n_payouts), (cfd, current_order, self.n_payouts),
(self.oracle_pk, announcement), (self.oracle_pk, announcement),
&self.wallet, &self.wallet,
&self.wallet, &self.wallet,
@ -678,7 +670,7 @@ where
let dlc = cfd.open_dlc().context("CFD was in wrong state")?; let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
let oracle_event_id = oracle::next_announcement_after( let oracle_event_id = oracle::next_announcement_after(
time::OffsetDateTime::now_utc() + cfd.order.settlement_interval, time::OffsetDateTime::now_utc() + cfd.settlement_interval,
)?; )?;
let announcement = self let announcement = self
.oracle_actor .oracle_actor
@ -707,11 +699,11 @@ where
receiver, receiver,
(self.oracle_pk, announcement), (self.oracle_pk, announcement),
RolloverParams::new( RolloverParams::new(
cfd.order.price, cfd.price,
cfd.quantity_usd, cfd.quantity_usd,
cfd.order.leverage, cfd.leverage,
cfd.refund_timelock_in_blocks(), cfd.refund_timelock_in_blocks(),
cfd.order.fee_rate, cfd.fee_rate,
), ),
Role::Maker, Role::Maker,
dlc, dlc,
@ -874,19 +866,15 @@ where
fee_rate, fee_rate,
)?; )?;
// 1. Save to DB // 1. Update actor state to current order
let mut conn = self.db.acquire().await?; self.current_order.replace(order.clone());
insert_order(&order, &mut conn).await?;
// 2. Update actor state to current order
self.current_order_id.replace(order.id);
// 3. Notify UI via feed // 2. Notify UI via feed
self.projection_actor self.projection_actor
.send(projection::Update(Some(order.clone()))) .send(projection::Update(Some(order.clone())))
.await?; .await?;
// 4. Inform connected takers // 3. Inform connected takers
self.takers self.takers
.send(maker_inc_connections::BroadcastOrder(Some(order))) .send(maker_inc_connections::BroadcastOrder(Some(order)))
.await?; .await?;

1
daemon/src/model.rs

@ -533,6 +533,7 @@ impl str::FromStr for BitMexPriceEventId {
impl_sqlx_type_display_from_str!(BitMexPriceEventId); impl_sqlx_type_display_from_str!(BitMexPriceEventId);
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, sqlx::Type)] #[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, sqlx::Type)]
#[sqlx(transparent)]
pub struct Timestamp(i64); pub struct Timestamp(i64);
impl Timestamp { impl Timestamp {

75
daemon/src/model/cfd.rs

@ -574,7 +574,30 @@ pub type UpdateCfdProposals = HashMap<OrderId, UpdateCfdProposal>;
/// Represents a cfd (including state) /// Represents a cfd (including state)
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Cfd { pub struct Cfd {
pub order: Order, pub id: OrderId,
pub trading_pair: TradingPair,
pub position: Position,
pub price: Price,
pub leverage: Leverage,
pub liquidation_price: Price,
pub creation_timestamp: Timestamp,
/// The duration that will be used for calculating the settlement timestamp
pub settlement_interval: Duration,
pub origin: Origin,
/// The id of the event to be used for price attestation
///
/// The maker includes this into the Order based on the Oracle announcement to be used.
pub oracle_event_id: BitMexPriceEventId,
pub fee_rate: u32,
pub quantity_usd: Usd, pub quantity_usd: Usd,
pub state: CfdState, pub state: CfdState,
@ -586,19 +609,27 @@ pub struct Cfd {
impl Cfd { impl Cfd {
pub fn new(order: Order, quantity: Usd, state: CfdState, counterparty: Identity) -> Self { pub fn new(order: Order, quantity: Usd, state: CfdState, counterparty: Identity) -> Self {
Cfd { Cfd {
order, id: order.id,
quantity_usd: quantity, quantity_usd: quantity,
state, state,
trading_pair: order.trading_pair,
position: order.position,
price: order.price,
leverage: order.leverage,
liquidation_price: order.liquidation_price,
creation_timestamp: Timestamp::now(),
settlement_interval: order.settlement_interval,
origin: order.origin,
oracle_event_id: order.oracle_event_id,
fee_rate: order.fee_rate,
counterparty, counterparty,
} }
} }
pub fn margin(&self) -> Result<Amount> { pub fn margin(&self) -> Result<Amount> {
let margin = match self.position() { let margin = match self.position() {
Position::Long => { Position::Long => calculate_long_margin(self.price, self.quantity_usd, self.leverage),
calculate_long_margin(self.order.price, self.quantity_usd, self.order.leverage) Position::Short => calculate_short_margin(self.price, self.quantity_usd),
}
Position::Short => calculate_short_margin(self.order.price, self.quantity_usd),
}; };
Ok(margin) Ok(margin)
@ -606,10 +637,8 @@ impl Cfd {
pub fn counterparty_margin(&self) -> Result<Amount> { pub fn counterparty_margin(&self) -> Result<Amount> {
let margin = match self.position() { let margin = match self.position() {
Position::Long => calculate_short_margin(self.order.price, self.quantity_usd), Position::Long => calculate_short_margin(self.price, self.quantity_usd),
Position::Short => { Position::Short => calculate_long_margin(self.price, self.quantity_usd, self.leverage),
calculate_long_margin(self.order.price, self.quantity_usd, self.order.leverage)
}
}; };
Ok(margin) Ok(margin)
@ -624,10 +653,10 @@ impl Cfd {
}; };
let (p_n_l, p_n_l_percent) = calculate_profit( let (p_n_l, p_n_l_percent) = calculate_profit(
self.order.price, self.price,
closing_price, closing_price,
self.quantity_usd, self.quantity_usd,
self.order.leverage, self.leverage,
self.position(), self.position(),
)?; )?;
@ -639,12 +668,8 @@ impl Cfd {
current_price: Price, current_price: Price,
n_payouts: usize, n_payouts: usize,
) -> Result<SettlementProposal> { ) -> Result<SettlementProposal> {
let payout_curve = payout_curve::calculate( let payout_curve =
self.order.price, payout_curve::calculate(self.price, self.quantity_usd, self.leverage, n_payouts)?;
self.quantity_usd,
self.order.leverage,
n_payouts,
)?;
let payout = { let payout = {
let current_price = current_price.try_into_u64()?; let current_price = current_price.try_into_u64()?;
@ -655,7 +680,7 @@ impl Cfd {
}; };
let settlement = SettlementProposal { let settlement = SettlementProposal {
order_id: self.order.id, order_id: self.id,
timestamp: Timestamp::now(), timestamp: Timestamp::now(),
taker: *payout.taker_amount(), taker: *payout.taker_amount(),
maker: *payout.maker_amount(), maker: *payout.maker_amount(),
@ -666,11 +691,11 @@ impl Cfd {
} }
pub fn position(&self) -> Position { pub fn position(&self) -> Position {
match self.order.origin { match self.origin {
Origin::Ours => self.order.position, Origin::Ours => self.position,
// If the order is not our own we take the counter-position in the CFD // If the order is not our own we take the counter-position in the CFD
Origin::Theirs => match self.order.position { Origin::Theirs => match self.position {
Position::Long => Position::Short, Position::Long => Position::Short,
Position::Short => Position::Long, Position::Short => Position::Long,
}, },
@ -678,7 +703,7 @@ impl Cfd {
} }
pub fn refund_timelock_in_blocks(&self) -> u32 { pub fn refund_timelock_in_blocks(&self) -> u32 {
(self.order.settlement_interval * Cfd::REFUND_THRESHOLD) (self.settlement_interval * Cfd::REFUND_THRESHOLD)
.as_blocks() .as_blocks()
.ceil() as u32 .ceil() as u32
} }
@ -708,7 +733,7 @@ impl Cfd {
pub fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<Option<CfdState>> { pub fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<Option<CfdState>> {
use CfdState::*; use CfdState::*;
let order_id = self.order.id; let order_id = self.id;
// early exit if already final // early exit if already final
if let SetupFailed { .. } | Closed { .. } | Refunded { .. } = self.state.clone() { if let SetupFailed { .. } | Closed { .. } | Refunded { .. } = self.state.clone() {
@ -1152,7 +1177,7 @@ impl Cfd {
} }
pub fn role(&self) -> Role { pub fn role(&self) -> Role {
self.order.origin.into() self.origin.into()
} }
pub fn dlc(&self) -> Option<Dlc> { pub fn dlc(&self) -> Option<Dlc> {

56
daemon/src/monitor.rs

@ -79,66 +79,66 @@ impl Actor<bdk::electrum_client::Client> {
// In PendingOpen we know the complete dlc setup and assume that the lock transaction will be published // In PendingOpen we know the complete dlc setup and assume that the lock transaction will be published
CfdState::PendingOpen { dlc, .. } => { CfdState::PendingOpen { dlc, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()); let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone()); actor.cfds.insert(cfd.id, params.clone());
actor.monitor_all(&params, cfd.order.id); actor.monitor_all(&params, cfd.id);
} }
CfdState::Open { dlc, .. } | CfdState::PendingCommit { dlc, .. } => { CfdState::Open { dlc, .. } | CfdState::PendingCommit { dlc, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()); let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone()); actor.cfds.insert(cfd.id, params.clone());
actor.monitor_commit_finality(&params, cfd.order.id); actor.monitor_commit_finality(&params, cfd.id);
actor.monitor_commit_cet_timelock(&params, cfd.order.id); actor.monitor_commit_cet_timelock(&params, cfd.id);
actor.monitor_commit_refund_timelock(&params, cfd.order.id); actor.monitor_commit_refund_timelock(&params, cfd.id);
actor.monitor_refund_finality(&params,cfd.order.id); actor.monitor_refund_finality(&params,cfd.id);
if let Some(model::cfd::CollaborativeSettlement { tx, ..} if let Some(model::cfd::CollaborativeSettlement { tx, ..}
) = cfd.state.get_collaborative_close() { ) = cfd.state.get_collaborative_close() {
let close_params = (tx.txid(), let close_params = (tx.txid(),
tx.output.first().context("transaction has zero outputs")?.script_pubkey.clone()); tx.output.first().context("transaction has zero outputs")?.script_pubkey.clone());
actor.monitor_close_finality(close_params,cfd.order.id); actor.monitor_close_finality(close_params,cfd.id);
} }
} }
CfdState::OpenCommitted { dlc, cet_status, .. } => { CfdState::OpenCommitted { dlc, cet_status, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()); let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone()); actor.cfds.insert(cfd.id, params.clone());
match cet_status { match cet_status {
CetStatus::Unprepared => { CetStatus::Unprepared => {
actor.monitor_commit_cet_timelock(&params, cfd.order.id); actor.monitor_commit_cet_timelock(&params, cfd.id);
actor.monitor_commit_refund_timelock(&params, cfd.order.id); actor.monitor_commit_refund_timelock(&params, cfd.id);
actor.monitor_refund_finality(&params,cfd.order.id); actor.monitor_refund_finality(&params,cfd.id);
} }
CetStatus::OracleSigned(attestation) => { CetStatus::OracleSigned(attestation) => {
actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.order.id)?; actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.id)?;
actor.monitor_commit_cet_timelock(&params, cfd.order.id); actor.monitor_commit_cet_timelock(&params, cfd.id);
actor.monitor_commit_refund_timelock(&params, cfd.order.id); actor.monitor_commit_refund_timelock(&params, cfd.id);
actor.monitor_refund_finality(&params,cfd.order.id); actor.monitor_refund_finality(&params,cfd.id);
} }
CetStatus::TimelockExpired => { CetStatus::TimelockExpired => {
actor.monitor_commit_refund_timelock(&params, cfd.order.id); actor.monitor_commit_refund_timelock(&params, cfd.id);
actor.monitor_refund_finality(&params,cfd.order.id); actor.monitor_refund_finality(&params,cfd.id);
} }
CetStatus::Ready(attestation) => { CetStatus::Ready(attestation) => {
actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.order.id)?; actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.id)?;
actor.monitor_commit_refund_timelock(&params, cfd.order.id); actor.monitor_commit_refund_timelock(&params, cfd.id);
actor.monitor_refund_finality(&params,cfd.order.id); actor.monitor_refund_finality(&params,cfd.id);
} }
} }
} }
CfdState::PendingCet { dlc, attestation, .. } => { CfdState::PendingCet { dlc, attestation, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()); let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone()); actor.cfds.insert(cfd.id, params.clone());
actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.order.id)?; actor.monitor_cet_finality(map_cets(dlc.cets), attestation.into(), cfd.id)?;
actor.monitor_commit_refund_timelock(&params, cfd.order.id); actor.monitor_commit_refund_timelock(&params, cfd.id);
actor.monitor_refund_finality(&params,cfd.order.id); actor.monitor_refund_finality(&params,cfd.id);
} }
CfdState::PendingRefund { dlc, .. } => { CfdState::PendingRefund { dlc, .. } => {
let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()); let params = MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks());
actor.cfds.insert(cfd.order.id, params.clone()); actor.cfds.insert(cfd.id, params.clone());
actor.monitor_commit_refund_timelock(&params, cfd.order.id); actor.monitor_commit_refund_timelock(&params, cfd.id);
actor.monitor_refund_finality(&params,cfd.order.id); actor.monitor_refund_finality(&params,cfd.id);
} }
// too early to monitor // too early to monitor

16
daemon/src/projection.rs

@ -520,21 +520,21 @@ impl From<CfdsWithAuxData> for Vec<Cfd> {
} }
}) })
.unwrap_or_else(|| { .unwrap_or_else(|| {
tracing::debug!(order_id = %cfd.order.id, "Unable to calculate profit/loss without current price"); tracing::debug!(order_id = %cfd.id, "Unable to calculate profit/loss without current price");
(None, None) (None, None)
}); });
let pending_proposal = input.pending_proposals.get(&cfd.order.id); let pending_proposal = input.pending_proposals.get(&cfd.id);
let state = to_cfd_state(&cfd.state, pending_proposal); let state = to_cfd_state(&cfd.state, pending_proposal);
Cfd { Cfd {
order_id: cfd.order.id, order_id: cfd.id,
initial_price: cfd.order.price.into(), initial_price: cfd.price.into(),
leverage: cfd.order.leverage, leverage: cfd.leverage,
trading_pair: cfd.order.trading_pair.clone(), trading_pair: cfd.trading_pair.clone(),
position: cfd.position(), position: cfd.position(),
liquidation_price: cfd.order.liquidation_price.into(), liquidation_price: cfd.liquidation_price.into(),
quantity_usd: cfd.quantity_usd.into(), quantity_usd: cfd.quantity_usd.into(),
profit_btc, profit_btc,
profit_percent, profit_percent,
@ -548,7 +548,7 @@ impl From<CfdsWithAuxData> for Vec<Cfd> {
margin_counterparty: cfd.counterparty_margin().expect("margin to be available"), margin_counterparty: cfd.counterparty_margin().expect("margin to be available"),
details: to_cfd_details(cfd, network), details: to_cfd_details(cfd, network),
expiry_timestamp: match cfd.expiry_timestamp() { expiry_timestamp: match cfd.expiry_timestamp() {
None => cfd.order.oracle_event_id.timestamp(), None => cfd.oracle_event_id.timestamp(),
Some(timestamp) => timestamp, Some(timestamp) => timestamp,
}, },
counterparty: cfd.counterparty.into(), counterparty: cfd.counterparty.into(),

28
daemon/src/rollover_taker.rs

@ -58,7 +58,7 @@ impl Actor {
async fn propose(&self, this: xtra::Address<Self>) -> Result<()> { async fn propose(&self, this: xtra::Address<Self>) -> Result<()> {
self.maker self.maker
.send(connection::ProposeRollOver { .send(connection::ProposeRollOver {
order_id: self.cfd.order.id, order_id: self.cfd.id,
timestamp: self.timestamp, timestamp: self.timestamp,
address: this, address: this,
}) })
@ -66,7 +66,7 @@ impl Actor {
self.update_proposal(Some(( self.update_proposal(Some((
RollOverProposal { RollOverProposal {
order_id: self.cfd.order.id, order_id: self.cfd.id,
timestamp: self.timestamp, timestamp: self.timestamp,
}, },
SettlementKind::Outgoing, SettlementKind::Outgoing,
@ -88,7 +88,7 @@ impl Actor {
.await? .await?
.with_context(|| format!("Announcement {} not found", oracle_event_id))?; .with_context(|| format!("Announcement {} not found", oracle_event_id))?;
let order_id = self.cfd.order.id; let order_id = self.cfd.id;
tracing::info!(%order_id, "Rollover proposal got accepted"); tracing::info!(%order_id, "Rollover proposal got accepted");
self.update_proposal(None).await?; self.update_proposal(None).await?;
@ -104,11 +104,11 @@ impl Actor {
receiver, receiver,
(self.oracle_pk, announcement), (self.oracle_pk, announcement),
RolloverParams::new( RolloverParams::new(
self.cfd.order.price, self.cfd.price,
self.cfd.quantity_usd, self.cfd.quantity_usd,
self.cfd.order.leverage, self.cfd.leverage,
self.cfd.refund_timelock_in_blocks(), self.cfd.refund_timelock_in_blocks(),
self.cfd.order.fee_rate, self.cfd.fee_rate,
), ),
Role::Taker, Role::Taker,
self.cfd.dlc().context("No DLC in CFD")?, self.cfd.dlc().context("No DLC in CFD")?,
@ -129,7 +129,7 @@ impl Actor {
} }
async fn handle_rejected(&self) -> Result<()> { async fn handle_rejected(&self) -> Result<()> {
let order_id = self.cfd.order.id; let order_id = self.cfd.id;
tracing::info!(%order_id, "Rollover proposal got rejected"); tracing::info!(%order_id, "Rollover proposal got rejected");
self.update_proposal(None).await?; self.update_proposal(None).await?;
@ -153,7 +153,7 @@ impl Actor {
) -> Result<()> { ) -> Result<()> {
self.projection self.projection
.send(UpdateRollOverProposal { .send(UpdateRollOverProposal {
order: self.cfd.order.id, order: self.cfd.id,
proposal, proposal,
}) })
.await?; .await?;
@ -175,7 +175,7 @@ impl xtra::Actor for Actor {
if let Err(e) = self.propose(this).await { if let Err(e) = self.propose(this).await {
self.complete( self.complete(
Completed::Failed { Completed::Failed {
order_id: self.cfd.order.id, order_id: self.cfd.id,
error: e, error: e,
}, },
ctx, ctx,
@ -207,7 +207,7 @@ impl Actor {
if let Err(error) = self.handle_confirmed(msg, ctx).await { if let Err(error) = self.handle_confirmed(msg, ctx).await {
self.complete( self.complete(
Completed::Failed { Completed::Failed {
order_id: self.cfd.order.id, order_id: self.cfd.id,
error, error,
}, },
ctx, ctx,
@ -217,7 +217,7 @@ impl Actor {
} }
pub async fn reject_rollover(&mut self, _: RollOverRejected, ctx: &mut xtra::Context<Self>) { pub async fn reject_rollover(&mut self, _: RollOverRejected, ctx: &mut xtra::Context<Self>) {
let order_id = self.cfd.order.id; let order_id = self.cfd.id;
let completed = if let Err(error) = self.handle_rejected().await { let completed = if let Err(error) = self.handle_rejected().await {
Completed::Failed { order_id, error } Completed::Failed { order_id, error }
} else { } else {
@ -234,7 +234,7 @@ impl Actor {
) { ) {
self.complete( self.complete(
Completed::UpdatedContract { Completed::UpdatedContract {
order_id: self.cfd.order.id, order_id: self.cfd.id,
dlc: msg.dlc, dlc: msg.dlc,
}, },
ctx, ctx,
@ -249,7 +249,7 @@ impl Actor {
) { ) {
self.complete( self.complete(
Completed::Failed { Completed::Failed {
order_id: self.cfd.order.id, order_id: self.cfd.id,
error: msg.error, error: msg.error,
}, },
ctx, ctx,
@ -265,7 +265,7 @@ impl Actor {
if let Err(error) = self.forward_protocol_msg(msg).await { if let Err(error) = self.forward_protocol_msg(msg).await {
self.complete( self.complete(
Completed::Failed { Completed::Failed {
order_id: self.cfd.order.id, order_id: self.cfd.id,
error, error,
}, },
ctx, ctx,

50
daemon/src/setup_maker.rs

@ -1,6 +1,6 @@
use crate::address_map::{ActorName, Stopping}; use crate::address_map::{ActorName, Stopping};
use crate::model::cfd::{Cfd, CfdState, Dlc, Order, OrderId, Role}; use crate::model::cfd::{Cfd, Dlc, Order, OrderId, Role};
use crate::model::{Identity, Usd}; use crate::model::Identity;
use crate::oracle::Announcement; use crate::oracle::Announcement;
use crate::setup_contract::{self, SetupParams}; use crate::setup_contract::{self, SetupParams};
use crate::tokio_ext::spawn_fallible; use crate::tokio_ext::spawn_fallible;
@ -15,8 +15,8 @@ use xtra::prelude::MessageChannel;
use xtra_productivity::xtra_productivity; use xtra_productivity::xtra_productivity;
pub struct Actor { pub struct Actor {
cfd: Cfd,
order: Order, order: Order,
quantity: Usd,
n_payouts: usize, n_payouts: usize,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
announcement: Announcement, announcement: Announcement,
@ -32,7 +32,7 @@ pub struct Actor {
impl Actor { impl Actor {
pub fn new( pub fn new(
(order, quantity, n_payouts): (Order, Usd, usize), (cfd, order, n_payouts): (Cfd, Order, usize),
(oracle_pk, announcement): (schnorrsig::PublicKey, Announcement), (oracle_pk, announcement): (schnorrsig::PublicKey, Announcement),
build_party_params: &(impl MessageChannel<wallet::BuildPartyParams> + 'static), build_party_params: &(impl MessageChannel<wallet::BuildPartyParams> + 'static),
sign: &(impl MessageChannel<wallet::Sign> + 'static), sign: &(impl MessageChannel<wallet::Sign> + 'static),
@ -48,8 +48,8 @@ impl Actor {
), ),
) -> Self { ) -> Self {
Self { Self {
cfd,
order, order,
quantity,
n_payouts, n_payouts,
oracle_pk, oracle_pk,
announcement, announcement,
@ -65,13 +65,7 @@ impl Actor {
} }
async fn contract_setup(&mut self, this: xtra::Address<Self>) -> Result<()> { async fn contract_setup(&mut self, this: xtra::Address<Self>) -> Result<()> {
let order_id = self.order.id; let order_id = self.cfd.id;
let cfd = Cfd::new(
self.order.clone(),
self.quantity,
CfdState::contract_setup(),
self.taker_id,
);
let (sender, receiver) = mpsc::unbounded(); let (sender, receiver) = mpsc::unbounded();
// store the writing end to forward messages from the taker to // store the writing end to forward messages from the taker to
@ -89,13 +83,13 @@ impl Actor {
receiver, receiver,
(self.oracle_pk, self.announcement.clone()), (self.oracle_pk, self.announcement.clone()),
SetupParams::new( SetupParams::new(
cfd.margin()?, self.cfd.margin()?,
cfd.counterparty_margin()?, self.cfd.counterparty_margin()?,
cfd.order.price, self.cfd.price,
cfd.quantity_usd, self.cfd.quantity_usd,
cfd.order.leverage, self.cfd.leverage,
cfd.refund_timelock_in_blocks(), self.cfd.refund_timelock_in_blocks(),
cfd.order.fee_rate, self.cfd.fee_rate,
), ),
self.build_party_params.clone_channel(), self.build_party_params.clone_channel(),
self.sign.clone_channel(), self.sign.clone_channel(),
@ -125,7 +119,7 @@ impl Actor {
#[xtra_productivity] #[xtra_productivity]
impl Actor { impl Actor {
fn handle(&mut self, _msg: Accepted, ctx: &mut xtra::Context<Self>) { fn handle(&mut self, _msg: Accepted, ctx: &mut xtra::Context<Self>) {
let order_id = self.order.id; let order_id = self.cfd.id;
tracing::info!(%order_id, "Maker accepts an order"); tracing::info!(%order_id, "Maker accepts an order");
let this = ctx let this = ctx
@ -160,7 +154,7 @@ impl Actor {
} }
fn handle(&mut self, _msg: Rejected, ctx: &mut xtra::Context<Self>) { fn handle(&mut self, _msg: Rejected, ctx: &mut xtra::Context<Self>) {
self.complete(Completed::Rejected(self.order.id), ctx).await; self.complete(Completed::Rejected(self.cfd.id), ctx).await;
} }
fn handle(&mut self, msg: SetupSucceeded, ctx: &mut xtra::Context<Self>) { fn handle(&mut self, msg: SetupSucceeded, ctx: &mut xtra::Context<Self>) {
@ -202,25 +196,25 @@ impl Actor {
#[async_trait] #[async_trait]
impl xtra::Actor for Actor { impl xtra::Actor for Actor {
async fn started(&mut self, ctx: &mut xtra::Context<Self>) { async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
let quantity = self.quantity; let quantity = self.cfd.quantity_usd;
let order = self.order.clone(); let cfd = self.cfd.clone();
if quantity < order.min_quantity || quantity > order.max_quantity { if quantity < self.order.min_quantity || quantity > self.order.max_quantity {
tracing::info!( tracing::info!(
"Order rejected: quantity {} not in range [{}, {}]", "Order rejected: quantity {} not in range [{}, {}]",
quantity, quantity,
order.min_quantity, self.order.min_quantity,
order.max_quantity self.order.max_quantity
); );
let _ = self let _ = self
.taker .taker
.send(maker_inc_connections::TakerMessage { .send(maker_inc_connections::TakerMessage {
taker_id: self.taker_id, taker_id: self.taker_id,
msg: wire::MakerToTaker::RejectOrder(order.id), msg: wire::MakerToTaker::RejectOrder(cfd.id),
}) })
.await; .await;
self.complete(Completed::Rejected(order.id), ctx).await; self.complete(Completed::Rejected(cfd.id), ctx).await;
} }
} }

48
daemon/src/setup_taker.rs

@ -1,5 +1,4 @@
use crate::model::cfd::{Cfd, CfdState, Completed, Dlc, Order, OrderId, Role}; use crate::model::cfd::{Cfd, CfdState, Completed, Dlc, OrderId, Role};
use crate::model::{Identity, Usd};
use crate::oracle::Announcement; use crate::oracle::Announcement;
use crate::setup_contract::{self, SetupParams}; use crate::setup_contract::{self, SetupParams};
use crate::tokio_ext::spawn_fallible; use crate::tokio_ext::spawn_fallible;
@ -14,8 +13,7 @@ use xtra::prelude::*;
use xtra_productivity::xtra_productivity; use xtra_productivity::xtra_productivity;
pub struct Actor { pub struct Actor {
order: Order, cfd: Cfd,
quantity: Usd,
n_payouts: usize, n_payouts: usize,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
announcement: Announcement, announcement: Announcement,
@ -25,24 +23,21 @@ pub struct Actor {
on_accepted: Box<dyn MessageChannel<Started>>, on_accepted: Box<dyn MessageChannel<Started>>,
on_completed: Box<dyn MessageChannel<Completed>>, on_completed: Box<dyn MessageChannel<Completed>>,
setup_msg_sender: Option<UnboundedSender<SetupMsg>>, setup_msg_sender: Option<UnboundedSender<SetupMsg>>,
maker_identity: Identity,
} }
impl Actor { impl Actor {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
(order, quantity, n_payouts): (Order, Usd, usize), (cfd, n_payouts): (Cfd, usize),
(oracle_pk, announcement): (schnorrsig::PublicKey, Announcement), (oracle_pk, announcement): (schnorrsig::PublicKey, Announcement),
build_party_params: &(impl MessageChannel<wallet::BuildPartyParams> + 'static), build_party_params: &(impl MessageChannel<wallet::BuildPartyParams> + 'static),
sign: &(impl MessageChannel<wallet::Sign> + 'static), sign: &(impl MessageChannel<wallet::Sign> + 'static),
maker: xtra::Address<connection::Actor>, maker: xtra::Address<connection::Actor>,
on_accepted: &(impl MessageChannel<Started> + 'static), on_accepted: &(impl MessageChannel<Started> + 'static),
on_completed: &(impl MessageChannel<Completed> + 'static), on_completed: &(impl MessageChannel<Completed> + 'static),
maker_identity: Identity,
) -> Self { ) -> Self {
Self { Self {
order, cfd,
quantity,
n_payouts, n_payouts,
oracle_pk, oracle_pk,
announcement, announcement,
@ -52,7 +47,6 @@ impl Actor {
on_accepted: on_accepted.clone_channel(), on_accepted: on_accepted.clone_channel(),
on_completed: on_completed.clone_channel(), on_completed: on_completed.clone_channel(),
setup_msg_sender: None, setup_msg_sender: None,
maker_identity,
} }
} }
} }
@ -60,20 +54,16 @@ impl Actor {
#[xtra_productivity] #[xtra_productivity]
impl Actor { impl Actor {
fn handle(&mut self, _: Accepted, ctx: &mut xtra::Context<Self>) -> Result<()> { fn handle(&mut self, _: Accepted, ctx: &mut xtra::Context<Self>) -> Result<()> {
let order_id = self.order.id; let order_id = self.cfd.id;
self.cfd.state = CfdState::contract_setup();
tracing::info!(%order_id, "Order got accepted"); tracing::info!(%order_id, "Order got accepted");
// inform the `taker_cfd::Actor` about the start of contract // inform the `taker_cfd::Actor` about the start of contract
// setup, so that the db and UI can be updated accordingly // setup, so that the db and UI can be updated accordingly
self.on_accepted.send(Started(order_id)).await?; self.on_accepted.send(Started(order_id)).await?;
let cfd = Cfd::new(
self.order.clone(),
self.quantity,
CfdState::contract_setup(),
self.maker_identity,
);
let (sender, receiver) = mpsc::unbounded::<SetupMsg>(); let (sender, receiver) = mpsc::unbounded::<SetupMsg>();
// store the writing end to forward messages from the maker to // store the writing end to forward messages from the maker to
// the spawned contract setup task // the spawned contract setup task
@ -85,13 +75,13 @@ impl Actor {
receiver, receiver,
(self.oracle_pk, self.announcement.clone()), (self.oracle_pk, self.announcement.clone()),
SetupParams::new( SetupParams::new(
cfd.margin()?, self.cfd.margin()?,
cfd.counterparty_margin()?, self.cfd.counterparty_margin()?,
cfd.order.price, self.cfd.price,
cfd.quantity_usd, self.cfd.quantity_usd,
cfd.order.leverage, self.cfd.leverage,
cfd.refund_timelock_in_blocks(), self.cfd.refund_timelock_in_blocks(),
cfd.order.fee_rate, self.cfd.fee_rate,
), ),
self.build_party_params.clone_channel(), self.build_party_params.clone_channel(),
self.sign.clone_channel(), self.sign.clone_channel(),
@ -113,7 +103,7 @@ impl Actor {
} }
fn handle(&mut self, msg: Rejected, ctx: &mut xtra::Context<Self>) -> Result<()> { fn handle(&mut self, msg: Rejected, ctx: &mut xtra::Context<Self>) -> Result<()> {
let order_id = self.order.id; let order_id = self.cfd.id;
tracing::info!(%order_id, "Order got rejected"); tracing::info!(%order_id, "Order got rejected");
if msg.is_invalid_order { if msg.is_invalid_order {
@ -175,14 +165,14 @@ impl xtra::Actor for Actor {
let res = self let res = self
.maker .maker
.send(connection::TakeOrder { .send(connection::TakeOrder {
order_id: self.order.id, order_id: self.cfd.id,
quantity: self.quantity, quantity: self.cfd.quantity_usd,
address, address,
}) })
.await; .await;
if let Err(e) = res { if let Err(e) = res {
tracing::warn!(%self.order.id, "Stopping setup_taker actor: {}", e); tracing::warn!(%self.cfd.id, "Stopping setup_taker actor: {}", e);
ctx.stop() ctx.stop()
} }
} }

21
daemon/src/taker_cfd.rs

@ -1,6 +1,6 @@
use crate::address_map::{AddressMap, Stopping}; use crate::address_map::{AddressMap, Stopping};
use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed}; use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::db::load_cfd_by_order_id;
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Completed, Order, OrderId, Origin, Role}; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Completed, Order, OrderId, Origin, Role};
use crate::model::{Identity, Price, Usd}; use crate::model::{Identity, Price, Usd};
use crate::monitor::{self, MonitorParams}; use crate::monitor::{self, MonitorParams};
@ -48,6 +48,7 @@ pub struct Actor<O, M, W> {
oracle_actor: Address<O>, oracle_actor: Address<O>,
n_payouts: usize, n_payouts: usize,
tasks: Tasks, tasks: Tasks,
current_order: Option<Order>,
maker_identity: Identity, maker_identity: Identity,
} }
@ -82,6 +83,7 @@ where
collab_settlement_actors: AddressMap::default(), collab_settlement_actors: AddressMap::default(),
rollover_actors: AddressMap::default(), rollover_actors: AddressMap::default(),
tasks: Tasks::default(), tasks: Tasks::default(),
current_order: None,
maker_identity, maker_identity,
} }
} }
@ -190,10 +192,7 @@ impl<O, M, W> Actor<O, M, W> {
bail!("Received order {} from maker, but already have a cfd in the database for that order. The maker did not properly remove the order.", order.id) bail!("Received order {} from maker, but already have a cfd in the database for that order. The maker did not properly remove the order.", order.id)
} }
if load_order_by_id(order.id, &mut conn).await.is_err() { self.current_order = Some(order.clone());
// only insert the order if we don't know it yet
insert_order(&order, &mut conn).await?;
}
self.projection_actor self.projection_actor
.send(projection::Update(Some(order))) .send(projection::Update(Some(order)))
@ -290,7 +289,10 @@ where
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
let current_order = load_order_by_id(order_id, &mut conn).await?; let current_order = self
.current_order
.clone()
.context("No current order from maker")?;
tracing::info!("Taking current order: {:?}", &current_order); tracing::info!("Taking current order: {:?}", &current_order);
@ -310,22 +312,21 @@ where
let announcement = self let announcement = self
.oracle_actor .oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id)) .send(oracle::GetAnnouncement(cfd.oracle_event_id))
.await? .await?
.with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?; .with_context(|| format!("Announcement {} not found", cfd.oracle_event_id))?;
let this = ctx let this = ctx
.address() .address()
.expect("actor to be able to give address to itself"); .expect("actor to be able to give address to itself");
let (addr, fut) = setup_taker::Actor::new( let (addr, fut) = setup_taker::Actor::new(
(current_order, quantity, self.n_payouts), (cfd, self.n_payouts),
(self.oracle_pk, announcement), (self.oracle_pk, announcement),
&self.wallet, &self.wallet,
&self.wallet, &self.wallet,
self.conn_actor.clone(), self.conn_actor.clone(),
&this, &this,
&this, &this,
self.maker_identity,
) )
.create(None) .create(None)
.run(); .run();

Loading…
Cancel
Save