Browse Source

DB code rebasing for #88

The primary goal was to remove all of the calls to `serde_json::to_string()`
for the data-handling, thus enabling us to do (more or less):

```rust
Order {
    row.column,
    ...
}
```

as well as clean up the SQL for easier reading. This has mostly been
accomplished, with further refinements easily accomplished once the
upstream issues in `sqlx` are addressed. See #314 for issues we are
tracking.
refactor/no-log-handler
DelicioiusHair 3 years ago
parent
commit
5cf88a2b01
  1. 1
      Cargo.lock
  2. 2
      daemon/Cargo.toml
  3. 6
      daemon/migrations/20210903050345_create_cfd_and_order_tables.sql
  4. 292
      daemon/sqlx-data.json
  5. 636
      daemon/src/db.rs
  6. 19
      daemon/src/model.rs
  7. 5
      daemon/src/model/cfd.rs

1
Cargo.lock

@ -2616,6 +2616,7 @@ dependencies = [
"thiserror", "thiserror",
"tokio-stream", "tokio-stream",
"url", "url",
"uuid",
"webpki", "webpki",
"webpki-roots 0.21.1", "webpki-roots 0.21.1",
"whoami", "whoami",

2
daemon/Cargo.toml

@ -32,7 +32,7 @@ serde_json = "1"
serde_plain = "1" serde_plain = "1"
serde_with = { version = "1", features = ["macros"] } serde_with = { version = "1", features = ["macros"] }
sha2 = "0.9" sha2 = "0.9"
sqlx = { version = "0.5", features = ["offline"] } sqlx = { version = "0.5", features = ["offline", "sqlite", "uuid"] }
thiserror = "1" thiserror = "1"
time = { version = "0.3", features = ["serde"] } time = { version = "0.3", features = ["serde"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"] }

6
daemon/migrations/20210903050345_create_cfd_and_order_tables.sql

@ -10,8 +10,10 @@ create table if not exists orders
max_quantity text not null, max_quantity text not null,
leverage integer not null, leverage integer not null,
liquidation_price text not null, liquidation_price text not null,
creation_timestamp text not null, creation_timestamp_seconds integer not null,
term text not null, creation_timestamp_nanoseconds integer not null,
term_seconds integer not null,
term_nanoseconds integer not null,
origin text not null, origin text not null,
oracle_event_id text not null oracle_event_id text not null
); );

292
daemon/sqlx-data.json

@ -1,100 +1,92 @@
{ {
"db": "SQLite", "db": "SQLite",
"50abbb297394739ec9d85917f8c32aa8bcfa0bfe140b24e9eeda4ce8d30d4f8d": { "0862abd1900c5c5ae4a145635c19aef1a563ea470530d83325839868ffacd7bf": {
"query": "\n select\n state\n from cfd_states\n where cfd_id = ?\n order by id desc\n limit 1;\n ", "query": "\n with ord as (\n select\n id as order_id,\n uuid,\n trading_pair,\n position,\n initial_price,\n min_quantity,\n max_quantity,\n leverage,\n liquidation_price,\n creation_timestamp_seconds as ts_secs,\n creation_timestamp_nanoseconds as ts_nanos,\n term_seconds as term_secs,\n term_nanoseconds as term_nanos,\n origin,\n oracle_event_id\n from orders\n ),\n\n cfd as (\n select\n id as cfd_id,\n order_id,\n quantity_usd\n from cfds\n ),\n\n tmp as (\n select\n id as state_id,\n cfd_id,\n state\n from cfd_states\n ),\n\n state as (\n select\n tmp.state,\n cfd.order_id,\n cfd.quantity_usd\n from tmp\n inner join cfd on tmp.cfd_id = cfd.cfd_id\n where tmp.state_id in (\n select max(state_id)\n from tmp\n group by state_id\n )\n )\n\n select\n ord.uuid as \"uuid: crate::model::cfd::OrderId\",\n ord.trading_pair as \"trading_pair: crate::model::TradingPair\",\n ord.position as \"position: crate::model::Position\",\n ord.initial_price,\n ord.min_quantity,\n ord.max_quantity,\n ord.leverage as \"leverage: crate::model::Leverage\",\n ord.liquidation_price,\n ord.ts_secs as \"ts_secs: i64\",\n ord.ts_nanos as \"ts_nanos: i32\",\n ord.term_secs as \"term_secs: i64\",\n ord.term_nanos as \"term_nanos: i32\",\n ord.origin as \"origin: crate::model::cfd::Origin\",\n ord.oracle_event_id,\n state.quantity_usd,\n state.state\n\n from ord\n inner join state on state.order_id = ord.order_id\n\n where ord.uuid = $1\n ",
"describe": { "describe": {
"columns": [ "columns": [
{ {
"name": "state", "name": "uuid: crate::model::cfd::OrderId",
"ordinal": 0, "ordinal": 0,
"type_info": "Text" "type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false
]
}
}, },
"63c8fd1a1512b55c19feea463dbb6a0fdf98a325801ba8677e2a33fd3ee1ebb9": {
"query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n ",
"describe": {
"columns": [
{ {
"name": "order_id", "name": "trading_pair: crate::model::TradingPair",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "price",
"ordinal": 1, "ordinal": 1,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "min_quantity", "name": "position: crate::model::Position",
"ordinal": 2, "ordinal": 2,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "max_quantity", "name": "initial_price",
"ordinal": 3, "ordinal": 3,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "leverage", "name": "min_quantity",
"ordinal": 4, "ordinal": 4,
"type_info": "Int64" "type_info": "Text"
}, },
{ {
"name": "trading_pair", "name": "max_quantity",
"ordinal": 5, "ordinal": 5,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "position", "name": "leverage: crate::model::Leverage",
"ordinal": 6, "ordinal": 6,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "origin", "name": "liquidation_price",
"ordinal": 7, "ordinal": 7,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "liquidation_price", "name": "ts_secs: i64",
"ordinal": 8, "ordinal": 8,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "creation_timestamp", "name": "ts_nanos: i32",
"ordinal": 9, "ordinal": 9,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "term", "name": "term_secs: i64",
"ordinal": 10, "ordinal": 10,
"type_info": "Int64"
},
{
"name": "term_nanos: i32",
"ordinal": 11,
"type_info": "Int64"
},
{
"name": "origin: crate::model::cfd::Origin",
"ordinal": 12,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "oracle_event_id", "name": "oracle_event_id",
"ordinal": 11, "ordinal": 13,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "quantity_usd", "name": "quantity_usd",
"ordinal": 12, "ordinal": 14,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "state", "name": "state",
"ordinal": 13, "ordinal": 15,
"type_info": "Text" "type_info": "Text"
} }
], ],
"parameters": { "parameters": {
"Right": 0 "Right": 1
}, },
"nullable": [ "nullable": [
false, false,
@ -110,120 +102,112 @@
false, false,
false, false,
false, false,
false,
false,
false false
] ]
} }
}, },
"8e7571250da58b12f5884f17656e5966957c7798ea029c701a4fc43fd613f015": { "221a6283db798bacaba99e7e85130f9a8bbea1299d8cb99d272b1d478dc19775": {
"query": "\n select\n id\n from cfds\n where order_uuid = ?;\n ", "query": "\n select\n state\n from cfd_states\n where cfd_id = $1\n order by id desc\n limit 1;\n ",
"describe": { "describe": {
"columns": [ "columns": [
{ {
"name": "id", "name": "state",
"ordinal": 0, "ordinal": 0,
"type_info": "Int64" "type_info": "Text"
} }
], ],
"parameters": { "parameters": {
"Right": 1 "Right": 1
}, },
"nullable": [ "nullable": [
true false
] ]
} }
}, },
"a464a1feb12abadff8bfd5b2b3b7362f3846869c0702944b21737eff8f420be5": { "62c4b31272cc2c889ebc946f4eaa3160ef2cf8c272412a77e9339249468d14bf": {
"query": "\n insert into cfd_states (\n cfd_id,\n state\n ) values (?, ?);\n ", "query": "\n with ord as (\n select\n id as order_id,\n uuid,\n trading_pair,\n position,\n initial_price,\n min_quantity,\n max_quantity,\n leverage,\n liquidation_price,\n creation_timestamp_seconds as ts_secs,\n creation_timestamp_nanoseconds as ts_nanos,\n term_seconds as term_secs,\n term_nanoseconds as term_nanos,\n origin,\n oracle_event_id\n from orders\n ),\n\n cfd as (\n select\n id as cfd_id,\n order_id,\n quantity_usd as quantity_usd\n from cfds\n ),\n\n tmp as (\n select\n id as state_id,\n cfd_id,\n state\n from cfd_states\n ),\n\n state as (\n select\n tmp.state,\n cfd.order_id,\n cfd.quantity_usd\n from tmp\n inner join cfd on tmp.cfd_id = cfd.cfd_id\n where tmp.state_id in (\n select max(state_id)\n from tmp\n group by state_id\n )\n )\n\n select\n ord.uuid as \"uuid: crate::model::cfd::OrderId\",\n ord.trading_pair as \"trading_pair: crate::model::TradingPair\",\n ord.position as \"position: crate::model::Position\",\n ord.initial_price,\n ord.min_quantity,\n ord.max_quantity,\n ord.leverage as \"leverage: crate::model::Leverage\",\n ord.liquidation_price,\n ord.ts_secs as \"ts_secs: i64\",\n ord.ts_nanos as \"ts_nanos: i32\",\n ord.term_secs as \"term_secs: i64\",\n ord.term_nanos as \"term_nanos: i32\",\n ord.origin as \"origin: crate::model::cfd::Origin\",\n ord.oracle_event_id,\n state.quantity_usd,\n state.state\n\n from ord\n inner join state on state.order_id = ord.order_id\n\n where ord.oracle_event_id = $1\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 2
},
"nullable": []
}
},
"a59cf0824b5e4191c94c229086ce464d1b8084f65a6fafb165d27ce6df5b815b": {
"query": "\n insert into orders (\n uuid,\n trading_pair,\n position,\n initial_price,\n min_quantity,\n max_quantity,\n leverage,\n liquidation_price,\n creation_timestamp,\n term,\n origin,\n oracle_event_id\n ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 12
},
"nullable": []
}
},
"e0d3fe192c419be4fa65199d1e024ce8794e19a847be53038b7a8a33afb36bd2": {
"query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n and orders.uuid = ?\n ",
"describe": { "describe": {
"columns": [ "columns": [
{ {
"name": "order_id", "name": "uuid: crate::model::cfd::OrderId",
"ordinal": 0, "ordinal": 0,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "price", "name": "trading_pair: crate::model::TradingPair",
"ordinal": 1, "ordinal": 1,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "min_quantity", "name": "position: crate::model::Position",
"ordinal": 2, "ordinal": 2,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "max_quantity", "name": "initial_price",
"ordinal": 3, "ordinal": 3,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "leverage", "name": "min_quantity",
"ordinal": 4, "ordinal": 4,
"type_info": "Int64" "type_info": "Text"
}, },
{ {
"name": "trading_pair", "name": "max_quantity",
"ordinal": 5, "ordinal": 5,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "position", "name": "leverage: crate::model::Leverage",
"ordinal": 6, "ordinal": 6,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "origin", "name": "liquidation_price",
"ordinal": 7, "ordinal": 7,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "liquidation_price", "name": "ts_secs: i64",
"ordinal": 8, "ordinal": 8,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "creation_timestamp", "name": "ts_nanos: i32",
"ordinal": 9, "ordinal": 9,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "term", "name": "term_secs: i64",
"ordinal": 10, "ordinal": 10,
"type_info": "Int64"
},
{
"name": "term_nanos: i32",
"ordinal": 11,
"type_info": "Int64"
},
{
"name": "origin: crate::model::cfd::Origin",
"ordinal": 12,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "oracle_event_id", "name": "oracle_event_id",
"ordinal": 11, "ordinal": 13,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "quantity_usd", "name": "quantity_usd",
"ordinal": 12, "ordinal": 14,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "state", "name": "state",
"ordinal": 13, "ordinal": 15,
"type_info": "Text" "type_info": "Text"
} }
], ],
@ -244,81 +228,101 @@
false, false,
false, false,
false, false,
false,
false,
false false
] ]
} }
}, },
"e45a49eb1fb4cbddc046712d9e2df2db0c18c618f0fae2133e51d82eda6bec36": { "8cbe349911b35d8e79763d64b4f5813b4bd98f12e0bba5ada84d2cae8b08ef4f": {
"query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n and orders.oracle_event_id = ?\n ", "query": "\n select\n id\n from cfds\n where order_uuid = $1;\n ",
"describe": { "describe": {
"columns": [ "columns": [
{ {
"name": "order_id", "name": "id",
"ordinal": 0,
"type_info": "Int64"
}
],
"parameters": {
"Right": 1
},
"nullable": [
true
]
}
},
"b708b7116aa14093e0e21ab8f87e52928a70649a62250902054ef6526f1d6f66": {
"query": "\n select\n uuid as \"uuid: crate::model::cfd::OrderId\",\n trading_pair as \"trading_pair: crate::model::TradingPair\",\n position as \"position: crate::model::Position\",\n initial_price,\n min_quantity,\n max_quantity,\n leverage as \"leverage: crate::model::Leverage\",\n liquidation_price,\n creation_timestamp_seconds as \"ts_secs: i64\",\n creation_timestamp_nanoseconds as \"ts_nanos: i32\",\n term_seconds as \"term_secs: i64\",\n term_nanoseconds as \"term_nanos: i32\",\n origin as \"origin: crate::model::cfd::Origin\",\n oracle_event_id\n\n from orders\n where uuid = $1\n ",
"describe": {
"columns": [
{
"name": "uuid: crate::model::cfd::OrderId",
"ordinal": 0, "ordinal": 0,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "price", "name": "trading_pair: crate::model::TradingPair",
"ordinal": 1, "ordinal": 1,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "min_quantity", "name": "position: crate::model::Position",
"ordinal": 2, "ordinal": 2,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "max_quantity", "name": "initial_price",
"ordinal": 3, "ordinal": 3,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "leverage", "name": "min_quantity",
"ordinal": 4, "ordinal": 4,
"type_info": "Int64" "type_info": "Text"
}, },
{ {
"name": "trading_pair", "name": "max_quantity",
"ordinal": 5, "ordinal": 5,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "position", "name": "leverage: crate::model::Leverage",
"ordinal": 6, "ordinal": 6,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "origin", "name": "liquidation_price",
"ordinal": 7, "ordinal": 7,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "liquidation_price", "name": "ts_secs: i64",
"ordinal": 8, "ordinal": 8,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "creation_timestamp", "name": "ts_nanos: i32",
"ordinal": 9, "ordinal": 9,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "term", "name": "term_secs: i64",
"ordinal": 10, "ordinal": 10,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "oracle_event_id", "name": "term_nanos: i32",
"ordinal": 11, "ordinal": 11,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "quantity_usd", "name": "origin: crate::model::cfd::Origin",
"ordinal": 12, "ordinal": 12,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "state", "name": "oracle_event_id",
"ordinal": 13, "ordinal": 13,
"type_info": "Text" "type_info": "Text"
} }
@ -344,81 +348,99 @@
] ]
} }
}, },
"f3dce76f316212c91cb3402b0cef00f1c9adbef8519c54e9bdbd373aab946209": { "d8323d50427833c983088011597081023ac564d65bc459d26aa27549a49b1df8": {
"query": "\n select * from orders where uuid = ?;\n ", "query": "\n with ord as (\n select\n id as order_id,\n uuid,\n trading_pair,\n position,\n initial_price,\n min_quantity,\n max_quantity,\n leverage,\n liquidation_price,\n creation_timestamp_seconds as ts_secs,\n creation_timestamp_nanoseconds as ts_nanos,\n term_seconds as term_secs,\n term_nanoseconds as term_nanos,\n origin,\n oracle_event_id\n from orders\n ),\n\n cfd as (\n select\n id as cfd_id,\n order_id,\n quantity_usd as quantity_usd\n from cfds\n ),\n\n tmp as (\n select\n id as state_id,\n cfd_id,\n state\n from cfd_states\n ),\n\n state as (\n select\n tmp.state,\n cfd.order_id,\n cfd.quantity_usd\n from tmp\n inner join cfd on tmp.cfd_id = cfd.cfd_id\n where tmp.state_id in (\n select max(state_id)\n from tmp\n group by state_id\n )\n )\n\n select\n ord.uuid as \"uuid: crate::model::cfd::OrderId\",\n ord.trading_pair as \"trading_pair: crate::model::TradingPair\",\n ord.position as \"position: crate::model::Position\",\n ord.initial_price,\n ord.min_quantity,\n ord.max_quantity,\n ord.leverage as \"leverage: crate::model::Leverage\",\n ord.liquidation_price,\n ord.ts_secs as \"ts_secs: i64\",\n ord.ts_nanos as \"ts_nanos: i32\",\n ord.term_secs as \"term_secs: i64\",\n ord.term_nanos as \"term_nanos: i32\",\n ord.origin as \"origin: crate::model::cfd::Origin\",\n ord.oracle_event_id,\n state.quantity_usd,\n state.state\n\n from ord\n inner join state on state.order_id = ord.order_id\n ",
"describe": { "describe": {
"columns": [ "columns": [
{ {
"name": "id", "name": "uuid: crate::model::cfd::OrderId",
"ordinal": 0, "ordinal": 0,
"type_info": "Int64" "type_info": "Text"
}, },
{ {
"name": "uuid", "name": "trading_pair: crate::model::TradingPair",
"ordinal": 1, "ordinal": 1,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "trading_pair", "name": "position: crate::model::Position",
"ordinal": 2, "ordinal": 2,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "position", "name": "initial_price",
"ordinal": 3, "ordinal": 3,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "initial_price", "name": "min_quantity",
"ordinal": 4, "ordinal": 4,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "min_quantity", "name": "max_quantity",
"ordinal": 5, "ordinal": 5,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "max_quantity", "name": "leverage: crate::model::Leverage",
"ordinal": 6, "ordinal": 6,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "leverage", "name": "liquidation_price",
"ordinal": 7, "ordinal": 7,
"type_info": "Int64" "type_info": "Text"
}, },
{ {
"name": "liquidation_price", "name": "ts_secs: i64",
"ordinal": 8, "ordinal": 8,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "creation_timestamp", "name": "ts_nanos: i32",
"ordinal": 9, "ordinal": 9,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "term", "name": "term_secs: i64",
"ordinal": 10, "ordinal": 10,
"type_info": "Text" "type_info": "Int64"
}, },
{ {
"name": "origin", "name": "term_nanos: i32",
"ordinal": 11, "ordinal": 11,
"type_info": "Int64"
},
{
"name": "origin: crate::model::cfd::Origin",
"ordinal": 12,
"type_info": "Text" "type_info": "Text"
}, },
{ {
"name": "oracle_event_id", "name": "oracle_event_id",
"ordinal": 12, "ordinal": 13,
"type_info": "Text"
},
{
"name": "quantity_usd",
"ordinal": 14,
"type_info": "Text"
},
{
"name": "state",
"ordinal": 15,
"type_info": "Text" "type_info": "Text"
} }
], ],
"parameters": { "parameters": {
"Right": 1 "Right": 0
}, },
"nullable": [ "nullable": [
true, false,
false,
false,
false,
false, false,
false, false,
false, false,
@ -434,22 +456,22 @@
] ]
} }
}, },
"fac3d990211ef9e5fa8bcd6e1d6e0c8a81652b98fb11f2686affd1593fba75fd": { "dde58d2118537f05941a79896639efb2e64f2bb6cf76f14c6c2eec20ec253160": {
"query": "\n insert into cfd_states (\n cfd_id,\n state\n ) values (?, ?);\n ", "query": "\n insert into cfds (\n order_id,\n order_uuid,\n quantity_usd\n )\n select\n id as order_id,\n uuid as order_uuid,\n $2 as quantity_usd\n from orders\n where uuid = $1;\n\n insert into cfd_states (\n cfd_id,\n state\n )\n select\n id as cfd_id,\n $3 as state\n from cfds\n order by id desc limit 1;\n ",
"describe": { "describe": {
"columns": [], "columns": [],
"parameters": { "parameters": {
"Right": 2 "Right": 3
}, },
"nullable": [] "nullable": []
} }
}, },
"fad32c267100e26f4fba80e4feb5ff45ee29c3a67bd378f6627b1f13ee45c573": { "faaf391b5dca70a87c5f8ac9346fd6d9dd1ae6b83d99a0311df463ecaaf8c301": {
"query": "\n insert into cfds (\n order_id,\n order_uuid,\n quantity_usd\n ) values (?, ?, ?);\n ", "query": "\n insert into cfd_states (\n cfd_id,\n state\n ) values ($1, $2);\n ",
"describe": { "describe": {
"columns": [], "columns": [],
"parameters": { "parameters": {
"Right": 3 "Right": 2
}, },
"nullable": [] "nullable": []
} }

636
daemon/src/db.rs

@ -1,11 +1,14 @@
use crate::model::cfd::{Cfd, CfdState, Order, OrderId, Origin}; use crate::model::cfd::{Cfd, CfdState, Order, OrderId};
use crate::model::{BitMexPriceEventId, Leverage, Position}; use crate::model::{BitMexPriceEventId, Usd};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use rocket_db_pools::sqlx; use rocket_db_pools::sqlx;
use rust_decimal::Decimal;
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
use sqlx::{Acquire, Sqlite, SqlitePool}; use sqlx::{Sqlite, SqlitePool};
use std::convert::TryInto;
use std::mem; use std::mem;
use std::str::FromStr;
use std::time::SystemTime;
use time::{Duration, OffsetDateTime};
pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> { pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> {
sqlx::migrate!("./migrations").run(pool).await?; sqlx::migrate!("./migrations").run(pool).await?;
@ -13,36 +16,8 @@ pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> {
} }
pub async fn insert_order(order: &Order, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> { pub async fn insert_order(order: &Order, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> {
let uuid = serde_json::to_string(&order.id).unwrap(); sqlx::query(
let trading_pair = serde_json::to_string(&order.trading_pair).unwrap(); r#"insert into orders (
let position = serde_json::to_string(&order.position).unwrap();
let initial_price = serde_json::to_string(&order.price).unwrap();
let min_quantity = serde_json::to_string(&order.min_quantity).unwrap();
let max_quantity = serde_json::to_string(&order.max_quantity).unwrap();
let leverage = order.leverage.0;
let liquidation_price = serde_json::to_string(&order.liquidation_price).unwrap();
let creation_timestamp = serde_json::to_string(&order.creation_timestamp).unwrap();
let term = serde_json::to_string(&order.term).unwrap();
let origin = serde_json::to_string(&order.origin).unwrap();
let oracle_event_id = order.oracle_event_id.to_string();
sqlx::query!(
r#"
insert into orders (
uuid,
trading_pair,
position,
initial_price,
min_quantity,
max_quantity,
leverage,
liquidation_price,
creation_timestamp,
term,
origin,
oracle_event_id
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
"#,
uuid, uuid,
trading_pair, trading_pair,
position, position,
@ -51,11 +26,38 @@ pub async fn insert_order(order: &Order, conn: &mut PoolConnection<Sqlite>) -> a
max_quantity, max_quantity,
leverage, leverage,
liquidation_price, liquidation_price,
creation_timestamp, creation_timestamp_seconds,
term, creation_timestamp_nanoseconds,
term_seconds,
term_nanoseconds,
origin, origin,
oracle_event_id oracle_event_id
) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)"#,
) )
.bind(&order.id)
.bind(&order.trading_pair)
.bind(&order.position)
.bind(&order.price.to_string())
.bind(&order.min_quantity.to_string())
.bind(&order.max_quantity.to_string())
.bind(order.leverage.0)
.bind(&order.liquidation_price.to_string())
.bind(
order
.creation_timestamp
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs() as i64,
)
.bind(
order
.creation_timestamp
.duration_since(SystemTime::UNIX_EPOCH)?
.subsec_nanos() as i32,
)
.bind(&order.term.whole_seconds())
.bind(&order.term.subsec_nanoseconds())
.bind(&order.origin)
.bind(&order.oracle_event_id.to_string())
.execute(conn) .execute(conn)
.await?; .await?;
@ -66,97 +68,83 @@ pub async fn load_order_by_id(
id: OrderId, id: OrderId,
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<Order> { ) -> anyhow::Result<Order> {
let uuid = serde_json::to_string(&id).unwrap();
let row = sqlx::query!( let row = sqlx::query!(
r#" r#"
select * from orders where uuid = ?; select
uuid as "uuid: crate::model::cfd::OrderId",
trading_pair as "trading_pair: crate::model::TradingPair",
position as "position: crate::model::Position",
initial_price,
min_quantity,
max_quantity,
leverage as "leverage: crate::model::Leverage",
liquidation_price,
creation_timestamp_seconds as "ts_secs: i64",
creation_timestamp_nanoseconds as "ts_nanos: i32",
term_seconds as "term_secs: i64",
term_nanoseconds as "term_nanos: i32",
origin as "origin: crate::model::cfd::Origin",
oracle_event_id
from orders
where uuid = $1
"#, "#,
uuid id
) )
.fetch_one(conn) .fetch_one(conn)
.await?; .await?;
let uuid = serde_json::from_str(row.uuid.as_str()).unwrap();
let trading_pair = serde_json::from_str(row.trading_pair.as_str()).unwrap();
let position = serde_json::from_str(row.position.as_str()).unwrap();
let initial_price = serde_json::from_str(row.initial_price.as_str()).unwrap();
let min_quantity = serde_json::from_str(row.min_quantity.as_str()).unwrap();
let max_quantity = serde_json::from_str(row.max_quantity.as_str()).unwrap();
let leverage = Leverage(row.leverage.try_into().unwrap());
let liquidation_price = serde_json::from_str(row.liquidation_price.as_str()).unwrap();
let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap();
let term = serde_json::from_str(row.term.as_str()).unwrap();
let origin = serde_json::from_str(row.origin.as_str()).unwrap();
Ok(Order { Ok(Order {
id: uuid, id: row.uuid,
trading_pair, trading_pair: row.trading_pair,
position, position: row.position,
price: initial_price, price: Usd(Decimal::from_str(&row.initial_price)?),
min_quantity, min_quantity: Usd(Decimal::from_str(&row.min_quantity)?),
max_quantity, max_quantity: Usd(Decimal::from_str(&row.max_quantity)?),
leverage, leverage: row.leverage,
liquidation_price, liquidation_price: Usd(Decimal::from_str(&row.liquidation_price)?),
creation_timestamp, creation_timestamp: convert_to_system_time(row.ts_secs, row.ts_nanos)?,
term, term: Duration::new(row.term_secs, row.term_nanos),
origin, origin: row.origin,
oracle_event_id: row.oracle_event_id.parse().unwrap(), oracle_event_id: row.oracle_event_id.parse::<BitMexPriceEventId>()?,
}) })
} }
pub async fn insert_cfd(cfd: Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> { pub async fn insert_cfd(cfd: Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<()> {
let mut tx = conn.begin().await?; let order_uuid = &cfd.order.id;
let quantity_usd = &cfd.quantity_usd.to_string();
let order_uuid = serde_json::to_string(&cfd.order.id)?; let state = serde_json::to_string(&cfd.state)?;
let order_row = sqlx::query!( sqlx::query!(
r#"
select * from orders where uuid = ?;
"#,
order_uuid
)
.fetch_one(&mut tx)
.await?;
let order_id = order_row.id;
let quantity_usd = serde_json::to_string(&cfd.quantity_usd)?;
let cfd_state = serde_json::to_string(&cfd.state)?;
// save cfd + state in a transaction to make sure the state is only inserted if the cfd was
// inserted
let cfd_id = sqlx::query!(
r#" r#"
insert into cfds ( insert into cfds (
order_id, order_id,
order_uuid, order_uuid,
quantity_usd quantity_usd
) values (?, ?, ?);
"#,
order_id,
order_uuid,
quantity_usd,
) )
.execute(&mut tx) select
.await? id as order_id,
.last_insert_rowid(); uuid as order_uuid,
$2 as quantity_usd
from orders
where uuid = $1;
sqlx::query!(
r#"
insert into cfd_states ( insert into cfd_states (
cfd_id, cfd_id,
state state
) values (?, ?); )
select
id as cfd_id,
$3 as state
from cfds
order by id desc limit 1;
"#, "#,
cfd_id, order_uuid,
cfd_state, quantity_usd,
state
) )
.execute(&mut tx) .execute(conn)
.await?; .await?;
tx.commit().await?;
Ok(()) Ok(())
} }
@ -187,7 +175,7 @@ pub async fn insert_new_cfd_state_by_order_id(
insert into cfd_states ( insert into cfd_states (
cfd_id, cfd_id,
state state
) values (?, ?); ) values ($1, $2);
"#, "#,
cfd_id, cfd_id,
cfd_state, cfd_state,
@ -203,14 +191,12 @@ async fn load_cfd_id_by_order_uuid(
order_uuid: OrderId, order_uuid: OrderId,
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<i64> { ) -> anyhow::Result<i64> {
let order_uuid = serde_json::to_string(&order_uuid)?;
let cfd_id = sqlx::query!( let cfd_id = sqlx::query!(
r#" r#"
select select
id id
from cfds from cfds
where order_uuid = ?; where order_uuid = $1;
"#, "#,
order_uuid order_uuid
) )
@ -232,7 +218,7 @@ async fn load_latest_cfd_state(
select select
state state
from cfd_states from cfd_states
where cfd_id = ? where cfd_id = $1
order by id desc order by id desc
limit 1; limit 1;
"#, "#,
@ -250,159 +236,215 @@ pub async fn load_cfd_by_order_id(
order_id: OrderId, order_id: OrderId,
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
) -> Result<Cfd> { ) -> Result<Cfd> {
let order_uuid = serde_json::to_string(&order_id)?;
let row = sqlx::query!( let row = sqlx::query!(
r#" r#"
with ord as (
select select
orders.uuid as order_id, id as order_id,
orders.initial_price as price, uuid,
orders.min_quantity as min_quantity, trading_pair,
orders.max_quantity as max_quantity, position,
orders.leverage as leverage, initial_price,
orders.trading_pair as trading_pair, min_quantity,
orders.position as position, max_quantity,
orders.origin as origin, leverage,
orders.liquidation_price as liquidation_price, liquidation_price,
orders.creation_timestamp as creation_timestamp, creation_timestamp_seconds as ts_secs,
orders.term as term, creation_timestamp_nanoseconds as ts_nanos,
orders.oracle_event_id, term_seconds as term_secs,
cfds.quantity_usd as quantity_usd, term_nanoseconds as term_nanos,
cfd_states.state as state origin,
from cfds as cfds oracle_event_id
inner join orders as orders on cfds.order_id = orders.id from orders
inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id ),
where cfd_states.state in (
cfd as (
select select
id as cfd_id,
order_id,
quantity_usd
from cfds
),
tmp as (
select
id as state_id,
cfd_id,
state state
from cfd_states from cfd_states
where cfd_id = cfds.id ),
order by id desc
limit 1 state as (
select
tmp.state,
cfd.order_id,
cfd.quantity_usd
from tmp
inner join cfd on tmp.cfd_id = cfd.cfd_id
where tmp.state_id in (
select max(state_id)
from tmp
group by state_id
)
) )
and orders.uuid = ?
select
ord.uuid as "uuid: crate::model::cfd::OrderId",
ord.trading_pair as "trading_pair: crate::model::TradingPair",
ord.position as "position: crate::model::Position",
ord.initial_price,
ord.min_quantity,
ord.max_quantity,
ord.leverage as "leverage: crate::model::Leverage",
ord.liquidation_price,
ord.ts_secs as "ts_secs: i64",
ord.ts_nanos as "ts_nanos: i32",
ord.term_secs as "term_secs: i64",
ord.term_nanos as "term_nanos: i32",
ord.origin as "origin: crate::model::cfd::Origin",
ord.oracle_event_id,
state.quantity_usd,
state.state
from ord
inner join state on state.order_id = ord.order_id
where ord.uuid = $1
"#, "#,
order_uuid order_id
) )
.fetch_one(conn) .fetch_one(conn)
.await?; .await?;
let order_id = serde_json::from_str(row.order_id.as_str()).unwrap();
let trading_pair = serde_json::from_str(row.trading_pair.as_str()).unwrap();
let position: Position = serde_json::from_str(row.position.as_str()).unwrap();
let price = serde_json::from_str(row.price.as_str()).unwrap();
let min_quantity = serde_json::from_str(row.min_quantity.as_str()).unwrap();
let max_quantity = serde_json::from_str(row.max_quantity.as_str()).unwrap();
let leverage = Leverage(row.leverage.try_into().unwrap());
let liquidation_price = serde_json::from_str(row.liquidation_price.as_str()).unwrap();
let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap();
let term = serde_json::from_str(row.term.as_str()).unwrap();
let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap();
let oracle_event_id = row.oracle_event_id.parse().unwrap();
let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap();
let latest_state = serde_json::from_str(row.state.as_str()).unwrap();
let order = Order { let order = Order {
id: order_id, id: row.uuid,
trading_pair, trading_pair: row.trading_pair,
position, position: row.position,
price, price: Usd(Decimal::from_str(&row.initial_price)?),
min_quantity, min_quantity: Usd(Decimal::from_str(&row.min_quantity)?),
max_quantity, max_quantity: Usd(Decimal::from_str(&row.max_quantity)?),
leverage, leverage: row.leverage,
liquidation_price, liquidation_price: Usd(Decimal::from_str(&row.liquidation_price)?),
creation_timestamp, creation_timestamp: convert_to_system_time(row.ts_secs, row.ts_nanos)?,
term, term: Duration::new(row.term_secs, row.term_nanos),
origin, origin: row.origin,
oracle_event_id, oracle_event_id: row.oracle_event_id.parse::<BitMexPriceEventId>()?,
}; };
// TODO:
// still have the use of serde_json::from_str() here, which will be dealt with
// via https://github.com/comit-network/hermes/issues/290
Ok(Cfd { Ok(Cfd {
order, order,
quantity_usd: quantity, quantity_usd: Usd(Decimal::from_str(&row.quantity_usd)?),
state: latest_state, state: serde_json::from_str(row.state.as_str())?,
}) })
} }
/// Loads all CFDs with the latest state as the CFD state /// Loads all CFDs with the latest state as the CFD state
pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<Vec<Cfd>> { pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<Vec<Cfd>> {
// TODO: Could be optimized with something like but not sure it's worth the complexity:
let rows = sqlx::query!( let rows = sqlx::query!(
r#" r#"
with ord as (
select
id as order_id,
uuid,
trading_pair,
position,
initial_price,
min_quantity,
max_quantity,
leverage,
liquidation_price,
creation_timestamp_seconds as ts_secs,
creation_timestamp_nanoseconds as ts_nanos,
term_seconds as term_secs,
term_nanoseconds as term_nanos,
origin,
oracle_event_id
from orders
),
cfd as (
select select
orders.uuid as order_id, id as cfd_id,
orders.initial_price as price, order_id,
orders.min_quantity as min_quantity, quantity_usd as quantity_usd
orders.max_quantity as max_quantity, from cfds
orders.leverage as leverage, ),
orders.trading_pair as trading_pair,
orders.position as position, tmp as (
orders.origin as origin,
orders.liquidation_price as liquidation_price,
orders.creation_timestamp as creation_timestamp,
orders.term as term,
orders.oracle_event_id,
cfds.quantity_usd as quantity_usd,
cfd_states.state as state
from cfds as cfds
inner join orders as orders on cfds.order_id = orders.id
inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id
where cfd_states.state in (
select select
id as state_id,
cfd_id,
state state
from cfd_states from cfd_states
where cfd_id = cfds.id ),
order by id desc
limit 1 state as (
select
tmp.state,
cfd.order_id,
cfd.quantity_usd
from tmp
inner join cfd on tmp.cfd_id = cfd.cfd_id
where tmp.state_id in (
select max(state_id)
from tmp
group by state_id
)
) )
select
ord.uuid as "uuid: crate::model::cfd::OrderId",
ord.trading_pair as "trading_pair: crate::model::TradingPair",
ord.position as "position: crate::model::Position",
ord.initial_price,
ord.min_quantity,
ord.max_quantity,
ord.leverage as "leverage: crate::model::Leverage",
ord.liquidation_price,
ord.ts_secs as "ts_secs: i64",
ord.ts_nanos as "ts_nanos: i32",
ord.term_secs as "term_secs: i64",
ord.term_nanos as "term_nanos: i32",
ord.origin as "origin: crate::model::cfd::Origin",
ord.oracle_event_id,
state.quantity_usd,
state.state
from ord
inner join state on state.order_id = ord.order_id
"# "#
) )
.fetch_all(conn) .fetch_all(conn)
.await?; .await?;
let cfds = rows let cfds = rows
.iter() .into_iter()
.map(|row| { .map(|row| {
let order_id = serde_json::from_str(row.order_id.as_str()).unwrap();
let trading_pair = serde_json::from_str(row.trading_pair.as_str()).unwrap();
let position: Position = serde_json::from_str(row.position.as_str()).unwrap();
let price = serde_json::from_str(row.price.as_str()).unwrap();
let min_quantity = serde_json::from_str(row.min_quantity.as_str()).unwrap();
let max_quantity = serde_json::from_str(row.max_quantity.as_str()).unwrap();
let leverage = Leverage(row.leverage.try_into().unwrap());
let liquidation_price = serde_json::from_str(row.liquidation_price.as_str()).unwrap();
let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap();
let term = serde_json::from_str(row.term.as_str()).unwrap();
let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap();
let oracle_event_id = row.oracle_event_id.clone().parse().unwrap();
let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap();
let latest_state = serde_json::from_str(row.state.as_str()).unwrap();
let order = Order { let order = Order {
id: order_id, id: row.uuid,
trading_pair, trading_pair: row.trading_pair,
position, position: row.position,
price, price: Usd(Decimal::from_str(&row.initial_price)?),
min_quantity, min_quantity: Usd(Decimal::from_str(&row.min_quantity)?),
max_quantity, max_quantity: Usd(Decimal::from_str(&row.max_quantity)?),
leverage, leverage: row.leverage,
liquidation_price, liquidation_price: Usd(Decimal::from_str(&row.liquidation_price)?),
creation_timestamp, creation_timestamp: convert_to_system_time(row.ts_secs, row.ts_nanos)?,
term, term: Duration::new(row.term_secs, row.term_nanos),
origin, origin: row.origin,
oracle_event_id, oracle_event_id: row.oracle_event_id.parse::<BitMexPriceEventId>()?,
}; };
Cfd { Ok(Cfd {
order, order,
quantity_usd: quantity, quantity_usd: Usd(Decimal::from_str(&row.quantity_usd)?),
state: latest_state, state: serde_json::from_str(row.state.as_str())?,
} })
}) })
.collect(); .collect::<Result<Vec<_>>>()?;
Ok(cfds) Ok(cfds)
} }
@ -412,88 +454,124 @@ pub async fn load_cfds_by_oracle_event_id(
oracle_event_id: BitMexPriceEventId, oracle_event_id: BitMexPriceEventId,
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
) -> anyhow::Result<Vec<Cfd>> { ) -> anyhow::Result<Vec<Cfd>> {
let oracle_event_id = oracle_event_id.to_string(); let event_id = oracle_event_id.to_string();
let rows = sqlx::query!( let rows = sqlx::query!(
r#" r#"
with ord as (
select select
orders.uuid as order_id, id as order_id,
orders.initial_price as price, uuid,
orders.min_quantity as min_quantity, trading_pair,
orders.max_quantity as max_quantity, position,
orders.leverage as leverage, initial_price,
orders.trading_pair as trading_pair, min_quantity,
orders.position as position, max_quantity,
orders.origin as origin, leverage,
orders.liquidation_price as liquidation_price, liquidation_price,
orders.creation_timestamp as creation_timestamp, creation_timestamp_seconds as ts_secs,
orders.term as term, creation_timestamp_nanoseconds as ts_nanos,
orders.oracle_event_id, term_seconds as term_secs,
cfds.quantity_usd as quantity_usd, term_nanoseconds as term_nanos,
cfd_states.state as state origin,
from cfds as cfds oracle_event_id
inner join orders as orders on cfds.order_id = orders.id from orders
inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id ),
where cfd_states.state in (
cfd as (
select
id as cfd_id,
order_id,
quantity_usd as quantity_usd
from cfds
),
tmp as (
select select
id as state_id,
cfd_id,
state state
from cfd_states from cfd_states
where cfd_id = cfds.id ),
order by id desc
limit 1 state as (
select
tmp.state,
cfd.order_id,
cfd.quantity_usd
from tmp
inner join cfd on tmp.cfd_id = cfd.cfd_id
where tmp.state_id in (
select max(state_id)
from tmp
group by state_id
)
) )
and orders.oracle_event_id = ?
select
ord.uuid as "uuid: crate::model::cfd::OrderId",
ord.trading_pair as "trading_pair: crate::model::TradingPair",
ord.position as "position: crate::model::Position",
ord.initial_price,
ord.min_quantity,
ord.max_quantity,
ord.leverage as "leverage: crate::model::Leverage",
ord.liquidation_price,
ord.ts_secs as "ts_secs: i64",
ord.ts_nanos as "ts_nanos: i32",
ord.term_secs as "term_secs: i64",
ord.term_nanos as "term_nanos: i32",
ord.origin as "origin: crate::model::cfd::Origin",
ord.oracle_event_id,
state.quantity_usd,
state.state
from ord
inner join state on state.order_id = ord.order_id
where ord.oracle_event_id = $1
"#, "#,
oracle_event_id event_id
) )
.fetch_all(conn) .fetch_all(conn)
.await?; .await?;
let cfds = rows let cfds = rows
.iter() .into_iter()
.map(|row| { .map(|row| {
let order_id = serde_json::from_str(row.order_id.as_str()).unwrap();
let trading_pair = serde_json::from_str(row.trading_pair.as_str()).unwrap();
let position: Position = serde_json::from_str(row.position.as_str()).unwrap();
let price = serde_json::from_str(row.price.as_str()).unwrap();
let min_quantity = serde_json::from_str(row.min_quantity.as_str()).unwrap();
let max_quantity = serde_json::from_str(row.max_quantity.as_str()).unwrap();
let leverage = Leverage(row.leverage.try_into().unwrap());
let liquidation_price = serde_json::from_str(row.liquidation_price.as_str()).unwrap();
let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap();
let term = serde_json::from_str(row.term.as_str()).unwrap();
let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap();
let oracle_event_id = row.oracle_event_id.parse().unwrap();
let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap();
let latest_state = serde_json::from_str(row.state.as_str()).unwrap();
let order = Order { let order = Order {
id: order_id, id: row.uuid,
trading_pair, trading_pair: row.trading_pair,
position, position: row.position,
price, price: Usd(Decimal::from_str(&row.initial_price)?),
min_quantity, min_quantity: Usd(Decimal::from_str(&row.min_quantity)?),
max_quantity, max_quantity: Usd(Decimal::from_str(&row.max_quantity)?),
leverage, leverage: row.leverage,
liquidation_price, liquidation_price: Usd(Decimal::from_str(&row.liquidation_price)?),
creation_timestamp, creation_timestamp: convert_to_system_time(row.ts_secs, row.ts_nanos)?,
term, term: Duration::new(row.term_secs, row.term_nanos),
origin, origin: row.origin,
oracle_event_id, oracle_event_id: row.oracle_event_id.parse::<BitMexPriceEventId>()?,
}; };
Cfd { Ok(Cfd {
order, order,
quantity_usd: quantity, quantity_usd: Usd(Decimal::from_str(&row.quantity_usd)?),
state: latest_state, state: serde_json::from_str(row.state.as_str())?,
}
}) })
.collect(); })
.collect::<Result<Vec<_>>>()?;
Ok(cfds) Ok(cfds)
} }
fn convert_to_system_time(row_secs: i64, row_nanos: i32) -> Result<SystemTime> {
let secs = row_secs as i128;
let nanos = row_nanos as i128;
let offset_dt = OffsetDateTime::from_unix_timestamp_nanos(secs * 1_000_000_000 + nanos)?;
Ok(SystemTime::from(offset_dt))
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::fs::File; use std::fs::File;
@ -506,7 +584,7 @@ mod tests {
use time::OffsetDateTime; use time::OffsetDateTime;
use crate::db::insert_order; use crate::db::insert_order;
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Order}; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Order, Origin};
use crate::model::Usd; use crate::model::Usd;
use super::*; use super::*;
@ -642,7 +720,7 @@ mod tests {
.unwrap(); .unwrap();
let cfds_from_db = load_all_cfds(&mut conn).await.unwrap(); let cfds_from_db = load_all_cfds(&mut conn).await.unwrap();
let cfd_from_db = cfds_from_db.first().unwrap().clone(); let cfd_from_db = cfds_from_db.last().unwrap().clone();
assert_eq!(cfd, cfd_from_db) assert_eq!(cfd, cfd_from_db)
} }

19
daemon/src/model.rs

@ -72,15 +72,16 @@ impl From<Decimal> for Percent {
} }
} }
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, sqlx::Type)]
#[sqlx(transparent)]
pub struct Leverage(pub u8); pub struct Leverage(pub u8);
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, sqlx::Type)]
pub enum TradingPair { pub enum TradingPair {
BtcUsd, BtcUsd,
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, sqlx::Type)]
pub enum Position { pub enum Position {
Buy, Buy,
Sell, Sell,
@ -109,7 +110,17 @@ pub struct WalletInfo {
} }
#[derive( #[derive(
Debug, Clone, Copy, SerializeDisplay, DeserializeFromStr, PartialEq, Eq, Hash, PartialOrd, Ord, Debug,
Clone,
Copy,
SerializeDisplay,
DeserializeFromStr,
PartialEq,
Eq,
Hash,
PartialOrd,
Ord,
sqlx::Type,
)] )]
pub struct BitMexPriceEventId { pub struct BitMexPriceEventId {
/// The timestamp this price event refers to. /// The timestamp this price event refers to.

5
daemon/src/model/cfd.rs

@ -19,7 +19,8 @@ use std::time::SystemTime;
use time::Duration; use time::Duration;
use uuid::Uuid; use uuid::Uuid;
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] #[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, sqlx::Type)]
#[sqlx(transparent)]
pub struct OrderId(Uuid); pub struct OrderId(Uuid);
impl Default for OrderId { impl Default for OrderId {
@ -45,7 +46,7 @@ impl<'v> FromParam<'v> for OrderId {
// TODO: Could potentially remove this and use the Role in the Order instead // TODO: Could potentially remove this and use the Role in the Order instead
/// Origin of the order /// Origin of the order
#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, sqlx::Type)]
pub enum Origin { pub enum Origin {
Ours, Ours,
Theirs, Theirs,

Loading…
Cancel
Save