From fc2b5fe4823a49b2b07b3be36deed26d6442f520 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 18 Oct 2021 10:35:18 +1100 Subject: [PATCH 1/3] Sync with Olivia every 5 seconds We are only making requests to Olivia that are absolutely necessary, i.e. only fetch attestations when they are likely ready and only fetch attestations that we definitely need. As a result, we can trigger the sync much more frequent. Fixes #349. --- daemon/src/maker.rs | 2 +- daemon/src/taker.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 9ebbb7c..c5bf8d2 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -257,7 +257,7 @@ async fn main() -> Result<()> { tokio::spawn( oracle_actor_context - .notify_interval(Duration::from_secs(60), || oracle::Sync) + .notify_interval(Duration::from_secs(5), || oracle::Sync) .unwrap(), ); let actor = fan_out::Actor::new(&[&cfd_maker_actor_inbox, &monitor_actor_address]) diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 967dc79..bc2acef 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -251,7 +251,7 @@ async fn main() -> Result<()> { tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn( oracle_actor_context - .notify_interval(Duration::from_secs(60), || oracle::Sync) + .notify_interval(Duration::from_secs(5), || oracle::Sync) .unwrap(), ); let actor = fan_out::Actor::new(&[&cfd_actor_inbox, &monitor_actor_address]) From 6d033264e80025e09d9376239157ea01a7d2b43e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 18 Oct 2021 10:46:48 +1100 Subject: [PATCH 2/3] Fetch announcement in case it is not present but requested This will make sure that if a user retries an operation, the announcement is there. --- daemon/src/oracle.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/daemon/src/oracle.rs b/daemon/src/oracle.rs index 153cf8c..3c5f63b 100644 --- a/daemon/src/oracle.rs +++ b/daemon/src/oracle.rs @@ -224,13 +224,20 @@ impl xtra::Handler for Actor { msg: GetAnnouncement, _ctx: &mut xtra::Context, ) -> Option { - self.announcements - .get_key_value(&msg.0) - .map(|(id, (time, nonce_pks))| Announcement { - id: *id, - expected_outcome_time: *time, - nonce_pks: nonce_pks.clone(), - }) + let announcement = + self.announcements + .get_key_value(&msg.0) + .map(|(id, (time, nonce_pks))| Announcement { + id: *id, + expected_outcome_time: *time, + nonce_pks: nonce_pks.clone(), + }); + + if announcement.is_none() { + self.pending_announcements.insert(msg.0); + } + + announcement } } From 6d6c40436d8d5185508769613ad2627f7ff609d8 Mon Sep 17 00:00:00 2001 From: DelicioiusHair Date: Thu, 14 Oct 2021 15:35:23 +1000 Subject: [PATCH 3/3] Fix up SQL queries This PR does a few things: * cleans up the SQL to make the queries clearer in terms of intent, as well as eliminating the use of an extra transaction in some write queries. * adds some additional testing * (mostly) eliminates the use of `serde_json::to_string()`, making the data columns behave in a more sane manner --- Cargo.lock | 1 + daemon/Cargo.toml | 2 +- ...0903050345_create_cfd_and_order_tables.sql | 28 +- daemon/sqlx-data.json | 286 +++---- daemon/src/db.rs | 697 ++++++++++-------- daemon/src/model.rs | 7 +- daemon/src/model/cfd.rs | 5 +- 7 files changed, 552 insertions(+), 474 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d8f25fa..683e2f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2615,6 +2615,7 @@ dependencies = [ "thiserror", "tokio-stream", "url", + "uuid", "webpki", "webpki-roots 0.21.1", "whoami", diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index 000c067..fbd611b 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -32,7 +32,7 @@ serde_json = "1" serde_plain = "1" serde_with = { version = "1", features = ["macros"] } sha2 = "0.9" -sqlx = { version = "0.5", features = ["offline"] } +sqlx = { version = "0.5", features = ["offline", "sqlite", "uuid"] } thiserror = "1" time = { version = "0.3", features = ["serde"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"] } diff --git a/daemon/migrations/20210903050345_create_cfd_and_order_tables.sql b/daemon/migrations/20210903050345_create_cfd_and_order_tables.sql index a4904a6..f9a08f3 100644 --- a/daemon/migrations/20210903050345_create_cfd_and_order_tables.sql +++ b/daemon/migrations/20210903050345_create_cfd_and_order_tables.sql @@ -1,19 +1,21 @@ -- todo: Decimal is had to deserialize as number so we use text create table if not exists orders ( - id integer primary key autoincrement, - uuid text unique not null, - trading_pair text not null, - position text not null, - initial_price text not null, - min_quantity text not null, - max_quantity text not null, - leverage integer not null, - liquidation_price text not null, - creation_timestamp text not null, - term text not null, - origin text not null, - oracle_event_id text not null + id integer primary key autoincrement, + uuid text unique not null, + trading_pair text not null, + position text not null, + initial_price text not null, + min_quantity text not null, + max_quantity text not null, + leverage integer not null, + liquidation_price text not null, + creation_timestamp_seconds integer not null, + creation_timestamp_nanoseconds integer not null, + term_seconds integer not null, + term_nanoseconds integer not null, + origin text not null, + oracle_event_id text not null ); create unique index if not exists orders_uuid diff --git a/daemon/sqlx-data.json b/daemon/sqlx-data.json index 83cf7c2..774e8e0 100644 --- a/daemon/sqlx-data.json +++ b/daemon/sqlx-data.json @@ -1,7 +1,7 @@ { "db": "SQLite", - "50abbb297394739ec9d85917f8c32aa8bcfa0bfe140b24e9eeda4ce8d30d4f8d": { - "query": "\n select\n state\n from cfd_states\n where cfd_id = ?\n order by id desc\n limit 1;\n ", + "221a6283db798bacaba99e7e85130f9a8bbea1299d8cb99d272b1d478dc19775": { + "query": "\n select\n state\n from cfd_states\n where cfd_id = $1\n order by id desc\n limit 1;\n ", "describe": { "columns": [ { @@ -18,78 +18,88 @@ ] } }, - "63c8fd1a1512b55c19feea463dbb6a0fdf98a325801ba8677e2a33fd3ee1ebb9": { - "query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n ", + "2687e30ae7feb70b443b0de40712a0233c47afbd429da580df72c08286ce7008": { + "query": "\n with ord as (\n select\n id as order_id,\n uuid,\n trading_pair,\n position,\n initial_price,\n min_quantity,\n max_quantity,\n leverage,\n liquidation_price,\n creation_timestamp_seconds as ts_secs,\n creation_timestamp_nanoseconds as ts_nanos,\n term_seconds as term_secs,\n term_nanoseconds as term_nanos,\n origin,\n oracle_event_id\n from orders\n ),\n\n cfd as (\n select\n ord.order_id,\n id as cfd_id,\n quantity_usd\n from cfds\n inner join ord on ord.order_id = id\n ),\n\n state as (\n select\n id as state_id,\n cfd.order_id,\n cfd.quantity_usd,\n state\n from cfd_states\n inner join cfd on cfd.cfd_id = cfd_states.cfd_id\n where id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n ord.uuid as \"uuid: crate::model::cfd::OrderId\",\n ord.trading_pair as \"trading_pair: crate::model::TradingPair\",\n ord.position as \"position: crate::model::Position\",\n ord.initial_price,\n ord.min_quantity,\n ord.max_quantity,\n ord.leverage as \"leverage: crate::model::Leverage\",\n ord.liquidation_price,\n ord.ts_secs as \"ts_secs: i64\",\n ord.ts_nanos as \"ts_nanos: i32\",\n ord.term_secs as \"term_secs: i64\",\n ord.term_nanos as \"term_nanos: i32\",\n ord.origin as \"origin: crate::model::cfd::Origin\",\n ord.oracle_event_id,\n state.quantity_usd,\n state.state\n\n from ord\n inner join state on state.order_id = ord.order_id\n ", "describe": { "columns": [ { - "name": "order_id", + "name": "uuid: crate::model::cfd::OrderId", "ordinal": 0, "type_info": "Text" }, { - "name": "price", + "name": "trading_pair: crate::model::TradingPair", "ordinal": 1, "type_info": "Text" }, { - "name": "min_quantity", + "name": "position: crate::model::Position", "ordinal": 2, "type_info": "Text" }, { - "name": "max_quantity", + "name": "initial_price", "ordinal": 3, "type_info": "Text" }, { - "name": "leverage", + "name": "min_quantity", "ordinal": 4, - "type_info": "Int64" + "type_info": "Text" }, { - "name": "trading_pair", + "name": "max_quantity", "ordinal": 5, "type_info": "Text" }, { - "name": "position", + "name": "leverage: crate::model::Leverage", "ordinal": 6, - "type_info": "Text" + "type_info": "Int64" }, { - "name": "origin", + "name": "liquidation_price", "ordinal": 7, "type_info": "Text" }, { - "name": "liquidation_price", + "name": "ts_secs: i64", "ordinal": 8, - "type_info": "Text" + "type_info": "Int64" }, { - "name": "creation_timestamp", + "name": "ts_nanos: i32", "ordinal": 9, - "type_info": "Text" + "type_info": "Int64" }, { - "name": "term", + "name": "term_secs: i64", "ordinal": 10, + "type_info": "Int64" + }, + { + "name": "term_nanos: i32", + "ordinal": 11, + "type_info": "Int64" + }, + { + "name": "origin: crate::model::cfd::Origin", + "ordinal": 12, "type_info": "Text" }, { "name": "oracle_event_id", - "ordinal": 11, + "ordinal": 13, "type_info": "Text" }, { "name": "quantity_usd", - "ordinal": 12, + "ordinal": 14, "type_info": "Text" }, { "name": "state", - "ordinal": 13, + "ordinal": 15, "type_info": "Text" } ], @@ -110,120 +120,94 @@ false, false, false, + false, + false, false ] } }, - "8e7571250da58b12f5884f17656e5966957c7798ea029c701a4fc43fd613f015": { - "query": "\n select\n id\n from cfds\n where order_uuid = ?;\n ", - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Int64" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [ - true - ] - } - }, - "a464a1feb12abadff8bfd5b2b3b7362f3846869c0702944b21737eff8f420be5": { - "query": "\n insert into cfd_states (\n cfd_id,\n state\n ) values (?, ?);\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 2 - }, - "nullable": [] - } - }, - "a59cf0824b5e4191c94c229086ce464d1b8084f65a6fafb165d27ce6df5b815b": { - "query": "\n insert into orders (\n uuid,\n trading_pair,\n position,\n initial_price,\n min_quantity,\n max_quantity,\n leverage,\n liquidation_price,\n creation_timestamp,\n term,\n origin,\n oracle_event_id\n ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 12 - }, - "nullable": [] - } - }, - "e0d3fe192c419be4fa65199d1e024ce8794e19a847be53038b7a8a33afb36bd2": { - "query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n and orders.uuid = ?\n ", + "296b741deaf0219b2f2d2f2a65128839796e68f1edf4d8f6488d7f7dceb63923": { + "query": "\n with ord as (\n select\n id as order_id,\n uuid,\n trading_pair,\n position,\n initial_price,\n min_quantity,\n max_quantity,\n leverage,\n liquidation_price,\n creation_timestamp_seconds as ts_secs,\n creation_timestamp_nanoseconds as ts_nanos,\n term_seconds as term_secs,\n term_nanoseconds as term_nanos,\n origin,\n oracle_event_id\n from orders\n ),\n\n cfd as (\n select\n ord.order_id,\n id as cfd_id,\n quantity_usd\n from cfds\n inner join ord on ord.order_id = id\n ),\n\n state as (\n select\n id as state_id,\n cfd.order_id,\n cfd.quantity_usd,\n state\n from cfd_states\n inner join cfd on cfd.cfd_id = cfd_states.cfd_id\n where id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n ord.uuid as \"uuid: crate::model::cfd::OrderId\",\n ord.trading_pair as \"trading_pair: crate::model::TradingPair\",\n ord.position as \"position: crate::model::Position\",\n ord.initial_price,\n ord.min_quantity,\n ord.max_quantity,\n ord.leverage as \"leverage: crate::model::Leverage\",\n ord.liquidation_price,\n ord.ts_secs as \"ts_secs: i64\",\n ord.ts_nanos as \"ts_nanos: i32\",\n ord.term_secs as \"term_secs: i64\",\n ord.term_nanos as \"term_nanos: i32\",\n ord.origin as \"origin: crate::model::cfd::Origin\",\n ord.oracle_event_id,\n state.quantity_usd,\n state.state\n\n from ord\n inner join state on state.order_id = ord.order_id\n\n where ord.oracle_event_id = $1\n ", "describe": { "columns": [ { - "name": "order_id", + "name": "uuid: crate::model::cfd::OrderId", "ordinal": 0, "type_info": "Text" }, { - "name": "price", + "name": "trading_pair: crate::model::TradingPair", "ordinal": 1, "type_info": "Text" }, { - "name": "min_quantity", + "name": "position: crate::model::Position", "ordinal": 2, "type_info": "Text" }, { - "name": "max_quantity", + "name": "initial_price", "ordinal": 3, "type_info": "Text" }, { - "name": "leverage", + "name": "min_quantity", "ordinal": 4, - "type_info": "Int64" + "type_info": "Text" }, { - "name": "trading_pair", + "name": "max_quantity", "ordinal": 5, "type_info": "Text" }, { - "name": "position", + "name": "leverage: crate::model::Leverage", "ordinal": 6, - "type_info": "Text" + "type_info": "Int64" }, { - "name": "origin", + "name": "liquidation_price", "ordinal": 7, "type_info": "Text" }, { - "name": "liquidation_price", + "name": "ts_secs: i64", "ordinal": 8, - "type_info": "Text" + "type_info": "Int64" }, { - "name": "creation_timestamp", + "name": "ts_nanos: i32", "ordinal": 9, - "type_info": "Text" + "type_info": "Int64" }, { - "name": "term", + "name": "term_secs: i64", "ordinal": 10, + "type_info": "Int64" + }, + { + "name": "term_nanos: i32", + "ordinal": 11, + "type_info": "Int64" + }, + { + "name": "origin: crate::model::cfd::Origin", + "ordinal": 12, "type_info": "Text" }, { "name": "oracle_event_id", - "ordinal": 11, + "ordinal": 13, "type_info": "Text" }, { "name": "quantity_usd", - "ordinal": 12, + "ordinal": 14, "type_info": "Text" }, { "name": "state", - "ordinal": 13, + "ordinal": 15, "type_info": "Text" } ], @@ -244,82 +228,112 @@ false, false, false, + false, + false, false ] } }, - "e45a49eb1fb4cbddc046712d9e2df2db0c18c618f0fae2133e51d82eda6bec36": { - "query": "\n select\n orders.uuid as order_id,\n orders.initial_price as price,\n orders.min_quantity as min_quantity,\n orders.max_quantity as max_quantity,\n orders.leverage as leverage,\n orders.trading_pair as trading_pair,\n orders.position as position,\n orders.origin as origin,\n orders.liquidation_price as liquidation_price,\n orders.creation_timestamp as creation_timestamp,\n orders.term as term,\n orders.oracle_event_id,\n cfds.quantity_usd as quantity_usd,\n cfd_states.state as state\n from cfds as cfds\n inner join orders as orders on cfds.order_id = orders.id\n inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id\n where cfd_states.state in (\n select\n state\n from cfd_states\n where cfd_id = cfds.id\n order by id desc\n limit 1\n )\n and orders.oracle_event_id = ?\n ", + "8cbe349911b35d8e79763d64b4f5813b4bd98f12e0bba5ada84d2cae8b08ef4f": { + "query": "\n select\n id\n from cfds\n where order_uuid = $1;\n ", "describe": { "columns": [ { - "name": "order_id", + "name": "id", + "ordinal": 0, + "type_info": "Int64" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + true + ] + } + }, + "b1499d56ef5b4218d49f7e8179b03f9f9f8551a4826e93850608828faca1eeab": { + "query": "\n with ord as (\n select\n id as order_id,\n uuid,\n trading_pair,\n position,\n initial_price,\n min_quantity,\n max_quantity,\n leverage,\n liquidation_price,\n creation_timestamp_seconds as ts_secs,\n creation_timestamp_nanoseconds as ts_nanos,\n term_seconds as term_secs,\n term_nanoseconds as term_nanos,\n origin,\n oracle_event_id\n from orders\n ),\n\n cfd as (\n select\n ord.order_id,\n id as cfd_id,\n quantity_usd\n from cfds\n inner join ord on ord.order_id = id\n ),\n\n state as (\n select\n id as state_id,\n cfd.order_id,\n cfd.quantity_usd,\n state\n from cfd_states\n inner join cfd on cfd.cfd_id = cfd_states.cfd_id\n where id in (\n select\n max(id) as id\n from cfd_states\n group by (cfd_id)\n )\n )\n\n select\n ord.uuid as \"uuid: crate::model::cfd::OrderId\",\n ord.trading_pair as \"trading_pair: crate::model::TradingPair\",\n ord.position as \"position: crate::model::Position\",\n ord.initial_price,\n ord.min_quantity,\n ord.max_quantity,\n ord.leverage as \"leverage: crate::model::Leverage\",\n ord.liquidation_price,\n ord.ts_secs as \"ts_secs: i64\",\n ord.ts_nanos as \"ts_nanos: i32\",\n ord.term_secs as \"term_secs: i64\",\n ord.term_nanos as \"term_nanos: i32\",\n ord.origin as \"origin: crate::model::cfd::Origin\",\n ord.oracle_event_id,\n state.quantity_usd,\n state.state\n\n from ord\n inner join state on state.order_id = ord.order_id\n\n where ord.uuid = $1\n ", + "describe": { + "columns": [ + { + "name": "uuid: crate::model::cfd::OrderId", "ordinal": 0, "type_info": "Text" }, { - "name": "price", + "name": "trading_pair: crate::model::TradingPair", "ordinal": 1, "type_info": "Text" }, { - "name": "min_quantity", + "name": "position: crate::model::Position", "ordinal": 2, "type_info": "Text" }, { - "name": "max_quantity", + "name": "initial_price", "ordinal": 3, "type_info": "Text" }, { - "name": "leverage", + "name": "min_quantity", "ordinal": 4, - "type_info": "Int64" + "type_info": "Text" }, { - "name": "trading_pair", + "name": "max_quantity", "ordinal": 5, "type_info": "Text" }, { - "name": "position", + "name": "leverage: crate::model::Leverage", "ordinal": 6, - "type_info": "Text" + "type_info": "Int64" }, { - "name": "origin", + "name": "liquidation_price", "ordinal": 7, "type_info": "Text" }, { - "name": "liquidation_price", + "name": "ts_secs: i64", "ordinal": 8, - "type_info": "Text" + "type_info": "Int64" }, { - "name": "creation_timestamp", + "name": "ts_nanos: i32", "ordinal": 9, - "type_info": "Text" + "type_info": "Int64" }, { - "name": "term", + "name": "term_secs: i64", "ordinal": 10, + "type_info": "Int64" + }, + { + "name": "term_nanos: i32", + "ordinal": 11, + "type_info": "Int64" + }, + { + "name": "origin: crate::model::cfd::Origin", + "ordinal": 12, "type_info": "Text" }, { "name": "oracle_event_id", - "ordinal": 11, + "ordinal": 13, "type_info": "Text" }, { "name": "quantity_usd", - "ordinal": 12, + "ordinal": 14, "type_info": "Text" }, { "name": "state", - "ordinal": 13, + "ordinal": 15, "type_info": "Text" } ], @@ -340,77 +354,84 @@ false, false, false, + false, + false, false ] } }, - "f3dce76f316212c91cb3402b0cef00f1c9adbef8519c54e9bdbd373aab946209": { - "query": "\n select * from orders where uuid = ?;\n ", + "b708b7116aa14093e0e21ab8f87e52928a70649a62250902054ef6526f1d6f66": { + "query": "\n select\n uuid as \"uuid: crate::model::cfd::OrderId\",\n trading_pair as \"trading_pair: crate::model::TradingPair\",\n position as \"position: crate::model::Position\",\n initial_price,\n min_quantity,\n max_quantity,\n leverage as \"leverage: crate::model::Leverage\",\n liquidation_price,\n creation_timestamp_seconds as \"ts_secs: i64\",\n creation_timestamp_nanoseconds as \"ts_nanos: i32\",\n term_seconds as \"term_secs: i64\",\n term_nanoseconds as \"term_nanos: i32\",\n origin as \"origin: crate::model::cfd::Origin\",\n oracle_event_id\n\n from orders\n where uuid = $1\n ", "describe": { "columns": [ { - "name": "id", + "name": "uuid: crate::model::cfd::OrderId", "ordinal": 0, - "type_info": "Int64" + "type_info": "Text" }, { - "name": "uuid", + "name": "trading_pair: crate::model::TradingPair", "ordinal": 1, "type_info": "Text" }, { - "name": "trading_pair", + "name": "position: crate::model::Position", "ordinal": 2, "type_info": "Text" }, { - "name": "position", + "name": "initial_price", "ordinal": 3, "type_info": "Text" }, { - "name": "initial_price", + "name": "min_quantity", "ordinal": 4, "type_info": "Text" }, { - "name": "min_quantity", + "name": "max_quantity", "ordinal": 5, "type_info": "Text" }, { - "name": "max_quantity", + "name": "leverage: crate::model::Leverage", "ordinal": 6, - "type_info": "Text" + "type_info": "Int64" }, { - "name": "leverage", + "name": "liquidation_price", "ordinal": 7, - "type_info": "Int64" + "type_info": "Text" }, { - "name": "liquidation_price", + "name": "ts_secs: i64", "ordinal": 8, - "type_info": "Text" + "type_info": "Int64" }, { - "name": "creation_timestamp", + "name": "ts_nanos: i32", "ordinal": 9, - "type_info": "Text" + "type_info": "Int64" }, { - "name": "term", + "name": "term_secs: i64", "ordinal": 10, - "type_info": "Text" + "type_info": "Int64" }, { - "name": "origin", + "name": "term_nanos: i32", "ordinal": 11, + "type_info": "Int64" + }, + { + "name": "origin: crate::model::cfd::Origin", + "ordinal": 12, "type_info": "Text" }, { "name": "oracle_event_id", - "ordinal": 12, + "ordinal": 13, "type_info": "Text" } ], @@ -418,7 +439,8 @@ "Right": 1 }, "nullable": [ - true, + false, + false, false, false, false, @@ -433,25 +455,5 @@ false ] } - }, - "fac3d990211ef9e5fa8bcd6e1d6e0c8a81652b98fb11f2686affd1593fba75fd": { - "query": "\n insert into cfd_states (\n cfd_id,\n state\n ) values (?, ?);\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 2 - }, - "nullable": [] - } - }, - "fad32c267100e26f4fba80e4feb5ff45ee29c3a67bd378f6627b1f13ee45c573": { - "query": "\n insert into cfds (\n order_id,\n order_uuid,\n quantity_usd\n ) values (?, ?, ?);\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 3 - }, - "nullable": [] - } } } \ No newline at end of file diff --git a/daemon/src/db.rs b/daemon/src/db.rs index a024700..27fad5a 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -1,11 +1,14 @@ -use crate::model::cfd::{Cfd, CfdState, Order, OrderId, Origin}; -use crate::model::{BitMexPriceEventId, Leverage, Position}; +use crate::model::cfd::{Cfd, CfdState, Order, OrderId}; +use crate::model::{BitMexPriceEventId, Usd}; use anyhow::{Context, Result}; use rocket_db_pools::sqlx; +use rust_decimal::Decimal; use sqlx::pool::PoolConnection; -use sqlx::{Acquire, Sqlite, SqlitePool}; -use std::convert::TryInto; +use sqlx::{Sqlite, SqlitePool}; use std::mem; +use std::str::FromStr; +use std::time::SystemTime; +use time::{Duration, OffsetDateTime}; pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> { sqlx::migrate!("./migrations").run(pool).await?; @@ -13,49 +16,48 @@ pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> { } pub async fn insert_order(order: &Order, conn: &mut PoolConnection) -> anyhow::Result<()> { - let uuid = serde_json::to_string(&order.id).unwrap(); - let trading_pair = serde_json::to_string(&order.trading_pair).unwrap(); - let position = serde_json::to_string(&order.position).unwrap(); - let initial_price = serde_json::to_string(&order.price).unwrap(); - let min_quantity = serde_json::to_string(&order.min_quantity).unwrap(); - let max_quantity = serde_json::to_string(&order.max_quantity).unwrap(); - let leverage = order.leverage.0; - let liquidation_price = serde_json::to_string(&order.liquidation_price).unwrap(); - let creation_timestamp = serde_json::to_string(&order.creation_timestamp).unwrap(); - let term = serde_json::to_string(&order.term).unwrap(); - let origin = serde_json::to_string(&order.origin).unwrap(); - let oracle_event_id = order.oracle_event_id.to_string(); - - sqlx::query!( - r#" - insert into orders ( - uuid, - trading_pair, - position, - initial_price, - min_quantity, - max_quantity, - leverage, - liquidation_price, - creation_timestamp, - term, - origin, - oracle_event_id - ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); - "#, - uuid, - trading_pair, - position, - initial_price, - min_quantity, - max_quantity, - leverage, - liquidation_price, - creation_timestamp, - term, - origin, - oracle_event_id + sqlx::query( + r#"insert into orders ( + uuid, + trading_pair, + position, + initial_price, + min_quantity, + max_quantity, + leverage, + liquidation_price, + creation_timestamp_seconds, + creation_timestamp_nanoseconds, + term_seconds, + term_nanoseconds, + origin, + oracle_event_id + ) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)"#, + ) + .bind(&order.id) + .bind(&order.trading_pair) + .bind(&order.position) + .bind(&order.price.to_string()) + .bind(&order.min_quantity.to_string()) + .bind(&order.max_quantity.to_string()) + .bind(order.leverage.0) + .bind(&order.liquidation_price.to_string()) + .bind( + order + .creation_timestamp + .duration_since(SystemTime::UNIX_EPOCH)? + .as_secs() as i64, + ) + .bind( + order + .creation_timestamp + .duration_since(SystemTime::UNIX_EPOCH)? + .subsec_nanos() as i32, ) + .bind(&order.term.whole_seconds()) + .bind(&order.term.subsec_nanoseconds()) + .bind(&order.origin) + .bind(&order.oracle_event_id.to_string()) .execute(conn) .await?; @@ -66,97 +68,81 @@ pub async fn load_order_by_id( id: OrderId, conn: &mut PoolConnection, ) -> anyhow::Result { - let uuid = serde_json::to_string(&id).unwrap(); - let row = sqlx::query!( r#" - select * from orders where uuid = ?; + select + uuid as "uuid: crate::model::cfd::OrderId", + trading_pair as "trading_pair: crate::model::TradingPair", + position as "position: crate::model::Position", + initial_price, + min_quantity, + max_quantity, + leverage as "leverage: crate::model::Leverage", + liquidation_price, + creation_timestamp_seconds as "ts_secs: i64", + creation_timestamp_nanoseconds as "ts_nanos: i32", + term_seconds as "term_secs: i64", + term_nanoseconds as "term_nanos: i32", + origin as "origin: crate::model::cfd::Origin", + oracle_event_id + + from orders + where uuid = $1 "#, - uuid + id ) .fetch_one(conn) .await?; - let uuid = serde_json::from_str(row.uuid.as_str()).unwrap(); - let trading_pair = serde_json::from_str(row.trading_pair.as_str()).unwrap(); - let position = serde_json::from_str(row.position.as_str()).unwrap(); - let initial_price = serde_json::from_str(row.initial_price.as_str()).unwrap(); - let min_quantity = serde_json::from_str(row.min_quantity.as_str()).unwrap(); - let max_quantity = serde_json::from_str(row.max_quantity.as_str()).unwrap(); - let leverage = Leverage(row.leverage.try_into().unwrap()); - let liquidation_price = serde_json::from_str(row.liquidation_price.as_str()).unwrap(); - let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap(); - let term = serde_json::from_str(row.term.as_str()).unwrap(); - let origin = serde_json::from_str(row.origin.as_str()).unwrap(); - Ok(Order { - id: uuid, - trading_pair, - position, - price: initial_price, - min_quantity, - max_quantity, - leverage, - liquidation_price, - creation_timestamp, - term, - origin, - oracle_event_id: row.oracle_event_id.parse().unwrap(), + id: row.uuid, + trading_pair: row.trading_pair, + position: row.position, + price: Usd(Decimal::from_str(&row.initial_price)?), + min_quantity: Usd(Decimal::from_str(&row.min_quantity)?), + max_quantity: Usd(Decimal::from_str(&row.max_quantity)?), + leverage: row.leverage, + liquidation_price: Usd(Decimal::from_str(&row.liquidation_price)?), + creation_timestamp: convert_to_system_time(row.ts_secs, row.ts_nanos)?, + term: Duration::new(row.term_secs, row.term_nanos), + origin: row.origin, + oracle_event_id: row.oracle_event_id.parse::()?, }) } pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection) -> anyhow::Result<()> { - let mut tx = conn.begin().await?; - - let order_uuid = serde_json::to_string(&cfd.order.id)?; - let order_row = sqlx::query!( + let state = serde_json::to_string(&cfd.state)?; + sqlx::query( r#" - select * from orders where uuid = ?; - "#, - order_uuid - ) - .fetch_one(&mut tx) - .await?; - - let order_id = order_row.id; - let quantity_usd = serde_json::to_string(&cfd.quantity_usd)?; - - let cfd_state = serde_json::to_string(&cfd.state)?; - - // save cfd + state in a transaction to make sure the state is only inserted if the cfd was - // inserted - - let cfd_id = sqlx::query!( - r#" - insert into cfds ( - order_id, - order_uuid, - quantity_usd - ) values (?, ?, ?); - "#, - order_id, - order_uuid, - quantity_usd, - ) - .execute(&mut tx) - .await? - .last_insert_rowid(); + insert into cfds ( + order_id, + order_uuid, + quantity_usd + ) + select + id as order_id, + uuid as order_uuid, + $2 as quantity_usd + from orders + where uuid = $1; - sqlx::query!( - r#" - insert into cfd_states ( - cfd_id, - state - ) values (?, ?); - "#, - cfd_id, - cfd_state, + insert into cfd_states ( + cfd_id, + state + ) + select + id as cfd_id, + $3 as state + from cfds + order by id desc limit 1; + "#, ) - .execute(&mut tx) + .bind(&cfd.order.id) + .bind(&cfd.quantity_usd.to_string()) + .bind(state) + .execute(conn) .await?; - tx.commit().await?; - Ok(()) } @@ -178,16 +164,16 @@ pub async fn append_cfd_state(cfd: &Cfd, conn: &mut PoolConnection) -> a let cfd_state = serde_json::to_string(new_state)?; - sqlx::query!( + sqlx::query( r#" insert into cfd_states ( cfd_id, state - ) values (?, ?); + ) values ($1, $2); "#, - cfd_id, - cfd_state, ) + .bind(cfd_id) + .bind(cfd_state) .execute(conn) .await?; @@ -199,14 +185,12 @@ async fn load_cfd_id_by_order_uuid( order_uuid: OrderId, conn: &mut PoolConnection, ) -> anyhow::Result { - let order_uuid = serde_json::to_string(&order_uuid)?; - let cfd_id = sqlx::query!( r#" select id from cfds - where order_uuid = ?; + where order_uuid = $1; "#, order_uuid ) @@ -228,7 +212,7 @@ async fn load_latest_cfd_state( select state from cfd_states - where cfd_id = ? + where cfd_id = $1 order by id desc limit 1; "#, @@ -246,159 +230,205 @@ pub async fn load_cfd_by_order_id( order_id: OrderId, conn: &mut PoolConnection, ) -> Result { - let order_uuid = serde_json::to_string(&order_id)?; - let row = sqlx::query!( r#" - select - orders.uuid as order_id, - orders.initial_price as price, - orders.min_quantity as min_quantity, - orders.max_quantity as max_quantity, - orders.leverage as leverage, - orders.trading_pair as trading_pair, - orders.position as position, - orders.origin as origin, - orders.liquidation_price as liquidation_price, - orders.creation_timestamp as creation_timestamp, - orders.term as term, - orders.oracle_event_id, - cfds.quantity_usd as quantity_usd, - cfd_states.state as state - from cfds as cfds - inner join orders as orders on cfds.order_id = orders.id - inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id - where cfd_states.state in ( + with ord as ( + select + id as order_id, + uuid, + trading_pair, + position, + initial_price, + min_quantity, + max_quantity, + leverage, + liquidation_price, + creation_timestamp_seconds as ts_secs, + creation_timestamp_nanoseconds as ts_nanos, + term_seconds as term_secs, + term_nanoseconds as term_nanos, + origin, + oracle_event_id + from orders + ), + + cfd as ( select - state - from cfd_states - where cfd_id = cfds.id - order by id desc - limit 1 + ord.order_id, + id as cfd_id, + quantity_usd + from cfds + inner join ord on ord.order_id = id + ), + + state as ( + select + id as state_id, + cfd.order_id, + cfd.quantity_usd, + state + from cfd_states + inner join cfd on cfd.cfd_id = cfd_states.cfd_id + where id in ( + select + max(id) as id + from cfd_states + group by (cfd_id) + ) ) - and orders.uuid = ? + + select + ord.uuid as "uuid: crate::model::cfd::OrderId", + ord.trading_pair as "trading_pair: crate::model::TradingPair", + ord.position as "position: crate::model::Position", + ord.initial_price, + ord.min_quantity, + ord.max_quantity, + ord.leverage as "leverage: crate::model::Leverage", + ord.liquidation_price, + ord.ts_secs as "ts_secs: i64", + ord.ts_nanos as "ts_nanos: i32", + ord.term_secs as "term_secs: i64", + ord.term_nanos as "term_nanos: i32", + ord.origin as "origin: crate::model::cfd::Origin", + ord.oracle_event_id, + state.quantity_usd, + state.state + + from ord + inner join state on state.order_id = ord.order_id + + where ord.uuid = $1 "#, - order_uuid + order_id ) .fetch_one(conn) .await?; - let order_id = serde_json::from_str(row.order_id.as_str()).unwrap(); - let trading_pair = serde_json::from_str(row.trading_pair.as_str()).unwrap(); - let position: Position = serde_json::from_str(row.position.as_str()).unwrap(); - let price = serde_json::from_str(row.price.as_str()).unwrap(); - let min_quantity = serde_json::from_str(row.min_quantity.as_str()).unwrap(); - let max_quantity = serde_json::from_str(row.max_quantity.as_str()).unwrap(); - let leverage = Leverage(row.leverage.try_into().unwrap()); - let liquidation_price = serde_json::from_str(row.liquidation_price.as_str()).unwrap(); - let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap(); - let term = serde_json::from_str(row.term.as_str()).unwrap(); - let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap(); - let oracle_event_id = row.oracle_event_id.parse().unwrap(); - - let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap(); - let latest_state = serde_json::from_str(row.state.as_str()).unwrap(); - let order = Order { - id: order_id, - trading_pair, - position, - price, - min_quantity, - max_quantity, - leverage, - liquidation_price, - creation_timestamp, - term, - origin, - oracle_event_id, + id: row.uuid, + trading_pair: row.trading_pair, + position: row.position, + price: Usd(Decimal::from_str(&row.initial_price)?), + min_quantity: Usd(Decimal::from_str(&row.min_quantity)?), + max_quantity: Usd(Decimal::from_str(&row.max_quantity)?), + leverage: row.leverage, + liquidation_price: Usd(Decimal::from_str(&row.liquidation_price)?), + creation_timestamp: convert_to_system_time(row.ts_secs, row.ts_nanos)?, + term: Duration::new(row.term_secs, row.term_nanos), + origin: row.origin, + oracle_event_id: row.oracle_event_id.parse::()?, }; + // TODO: + // still have the use of serde_json::from_str() here, which will be dealt with + // via https://github.com/comit-network/hermes/issues/290 Ok(Cfd { order, - quantity_usd: quantity, - state: latest_state, + quantity_usd: Usd(Decimal::from_str(&row.quantity_usd)?), + state: serde_json::from_str(row.state.as_str())?, }) } /// Loads all CFDs with the latest state as the CFD state pub async fn load_all_cfds(conn: &mut PoolConnection) -> anyhow::Result> { - // TODO: Could be optimized with something like but not sure it's worth the complexity: - let rows = sqlx::query!( r#" - select - orders.uuid as order_id, - orders.initial_price as price, - orders.min_quantity as min_quantity, - orders.max_quantity as max_quantity, - orders.leverage as leverage, - orders.trading_pair as trading_pair, - orders.position as position, - orders.origin as origin, - orders.liquidation_price as liquidation_price, - orders.creation_timestamp as creation_timestamp, - orders.term as term, - orders.oracle_event_id, - cfds.quantity_usd as quantity_usd, - cfd_states.state as state - from cfds as cfds - inner join orders as orders on cfds.order_id = orders.id - inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id - where cfd_states.state in ( + with ord as ( + select + id as order_id, + uuid, + trading_pair, + position, + initial_price, + min_quantity, + max_quantity, + leverage, + liquidation_price, + creation_timestamp_seconds as ts_secs, + creation_timestamp_nanoseconds as ts_nanos, + term_seconds as term_secs, + term_nanoseconds as term_nanos, + origin, + oracle_event_id + from orders + ), + + cfd as ( + select + ord.order_id, + id as cfd_id, + quantity_usd + from cfds + inner join ord on ord.order_id = id + ), + + state as ( select - state - from cfd_states - where cfd_id = cfds.id - order by id desc - limit 1 + id as state_id, + cfd.order_id, + cfd.quantity_usd, + state + from cfd_states + inner join cfd on cfd.cfd_id = cfd_states.cfd_id + where id in ( + select + max(id) as id + from cfd_states + group by (cfd_id) + ) ) + + select + ord.uuid as "uuid: crate::model::cfd::OrderId", + ord.trading_pair as "trading_pair: crate::model::TradingPair", + ord.position as "position: crate::model::Position", + ord.initial_price, + ord.min_quantity, + ord.max_quantity, + ord.leverage as "leverage: crate::model::Leverage", + ord.liquidation_price, + ord.ts_secs as "ts_secs: i64", + ord.ts_nanos as "ts_nanos: i32", + ord.term_secs as "term_secs: i64", + ord.term_nanos as "term_nanos: i32", + ord.origin as "origin: crate::model::cfd::Origin", + ord.oracle_event_id, + state.quantity_usd, + state.state + + from ord + inner join state on state.order_id = ord.order_id "# ) .fetch_all(conn) .await?; let cfds = rows - .iter() + .into_iter() .map(|row| { - let order_id = serde_json::from_str(row.order_id.as_str()).unwrap(); - let trading_pair = serde_json::from_str(row.trading_pair.as_str()).unwrap(); - let position: Position = serde_json::from_str(row.position.as_str()).unwrap(); - let price = serde_json::from_str(row.price.as_str()).unwrap(); - let min_quantity = serde_json::from_str(row.min_quantity.as_str()).unwrap(); - let max_quantity = serde_json::from_str(row.max_quantity.as_str()).unwrap(); - let leverage = Leverage(row.leverage.try_into().unwrap()); - let liquidation_price = serde_json::from_str(row.liquidation_price.as_str()).unwrap(); - let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap(); - let term = serde_json::from_str(row.term.as_str()).unwrap(); - let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap(); - let oracle_event_id = row.oracle_event_id.clone().parse().unwrap(); - - let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap(); - let latest_state = serde_json::from_str(row.state.as_str()).unwrap(); - let order = Order { - id: order_id, - trading_pair, - position, - price, - min_quantity, - max_quantity, - leverage, - liquidation_price, - creation_timestamp, - term, - origin, - oracle_event_id, + id: row.uuid, + trading_pair: row.trading_pair, + position: row.position, + price: Usd(Decimal::from_str(&row.initial_price)?), + min_quantity: Usd(Decimal::from_str(&row.min_quantity)?), + max_quantity: Usd(Decimal::from_str(&row.max_quantity)?), + leverage: row.leverage, + liquidation_price: Usd(Decimal::from_str(&row.liquidation_price)?), + creation_timestamp: convert_to_system_time(row.ts_secs, row.ts_nanos)?, + term: Duration::new(row.term_secs, row.term_nanos), + origin: row.origin, + oracle_event_id: row.oracle_event_id.parse::()?, }; - Cfd { + Ok(Cfd { order, - quantity_usd: quantity, - state: latest_state, - } + quantity_usd: Usd(Decimal::from_str(&row.quantity_usd)?), + state: serde_json::from_str(row.state.as_str())?, + }) }) - .collect(); + .collect::>>()?; Ok(cfds) } @@ -408,88 +438,119 @@ pub async fn load_cfds_by_oracle_event_id( oracle_event_id: BitMexPriceEventId, conn: &mut PoolConnection, ) -> anyhow::Result> { - let oracle_event_id = oracle_event_id.to_string(); - + let event_id = oracle_event_id.to_string(); let rows = sqlx::query!( r#" - select - orders.uuid as order_id, - orders.initial_price as price, - orders.min_quantity as min_quantity, - orders.max_quantity as max_quantity, - orders.leverage as leverage, - orders.trading_pair as trading_pair, - orders.position as position, - orders.origin as origin, - orders.liquidation_price as liquidation_price, - orders.creation_timestamp as creation_timestamp, - orders.term as term, - orders.oracle_event_id, - cfds.quantity_usd as quantity_usd, - cfd_states.state as state - from cfds as cfds - inner join orders as orders on cfds.order_id = orders.id - inner join cfd_states as cfd_states on cfd_states.cfd_id = cfds.id - where cfd_states.state in ( + with ord as ( + select + id as order_id, + uuid, + trading_pair, + position, + initial_price, + min_quantity, + max_quantity, + leverage, + liquidation_price, + creation_timestamp_seconds as ts_secs, + creation_timestamp_nanoseconds as ts_nanos, + term_seconds as term_secs, + term_nanoseconds as term_nanos, + origin, + oracle_event_id + from orders + ), + + cfd as ( + select + ord.order_id, + id as cfd_id, + quantity_usd + from cfds + inner join ord on ord.order_id = id + ), + + state as ( select - state - from cfd_states - where cfd_id = cfds.id - order by id desc - limit 1 + id as state_id, + cfd.order_id, + cfd.quantity_usd, + state + from cfd_states + inner join cfd on cfd.cfd_id = cfd_states.cfd_id + where id in ( + select + max(id) as id + from cfd_states + group by (cfd_id) + ) ) - and orders.oracle_event_id = ? + + select + ord.uuid as "uuid: crate::model::cfd::OrderId", + ord.trading_pair as "trading_pair: crate::model::TradingPair", + ord.position as "position: crate::model::Position", + ord.initial_price, + ord.min_quantity, + ord.max_quantity, + ord.leverage as "leverage: crate::model::Leverage", + ord.liquidation_price, + ord.ts_secs as "ts_secs: i64", + ord.ts_nanos as "ts_nanos: i32", + ord.term_secs as "term_secs: i64", + ord.term_nanos as "term_nanos: i32", + ord.origin as "origin: crate::model::cfd::Origin", + ord.oracle_event_id, + state.quantity_usd, + state.state + + from ord + inner join state on state.order_id = ord.order_id + + where ord.oracle_event_id = $1 "#, - oracle_event_id + event_id ) .fetch_all(conn) .await?; let cfds = rows - .iter() + .into_iter() .map(|row| { - let order_id = serde_json::from_str(row.order_id.as_str()).unwrap(); - let trading_pair = serde_json::from_str(row.trading_pair.as_str()).unwrap(); - let position: Position = serde_json::from_str(row.position.as_str()).unwrap(); - let price = serde_json::from_str(row.price.as_str()).unwrap(); - let min_quantity = serde_json::from_str(row.min_quantity.as_str()).unwrap(); - let max_quantity = serde_json::from_str(row.max_quantity.as_str()).unwrap(); - let leverage = Leverage(row.leverage.try_into().unwrap()); - let liquidation_price = serde_json::from_str(row.liquidation_price.as_str()).unwrap(); - let creation_timestamp = serde_json::from_str(row.creation_timestamp.as_str()).unwrap(); - let term = serde_json::from_str(row.term.as_str()).unwrap(); - let origin: Origin = serde_json::from_str(row.origin.as_str()).unwrap(); - let oracle_event_id = row.oracle_event_id.parse().unwrap(); - - let quantity = serde_json::from_str(row.quantity_usd.as_str()).unwrap(); - let latest_state = serde_json::from_str(row.state.as_str()).unwrap(); - let order = Order { - id: order_id, - trading_pair, - position, - price, - min_quantity, - max_quantity, - leverage, - liquidation_price, - creation_timestamp, - term, - origin, - oracle_event_id, + id: row.uuid, + trading_pair: row.trading_pair, + position: row.position, + price: Usd(Decimal::from_str(&row.initial_price)?), + min_quantity: Usd(Decimal::from_str(&row.min_quantity)?), + max_quantity: Usd(Decimal::from_str(&row.max_quantity)?), + leverage: row.leverage, + liquidation_price: Usd(Decimal::from_str(&row.liquidation_price)?), + creation_timestamp: convert_to_system_time(row.ts_secs, row.ts_nanos)?, + term: Duration::new(row.term_secs, row.term_nanos), + origin: row.origin, + oracle_event_id: row.oracle_event_id.parse::()?, }; - Cfd { + Ok(Cfd { order, - quantity_usd: quantity, - state: latest_state, - } + quantity_usd: Usd(Decimal::from_str(&row.quantity_usd)?), + state: serde_json::from_str(row.state.as_str())?, + }) }) - .collect(); + .collect::>>()?; Ok(cfds) } +fn convert_to_system_time(row_secs: i64, row_nanos: i32) -> Result { + let secs = row_secs as i128; + let nanos = row_nanos as i128; + let offset_dt = OffsetDateTime::from_unix_timestamp_nanos(secs * 1_000_000_000 + nanos)?; + + Ok(SystemTime::from(offset_dt)) +} + #[cfg(test)] mod tests { use pretty_assertions::assert_eq; @@ -500,7 +561,7 @@ mod tests { use time::OffsetDateTime; use crate::db::insert_order; - use crate::model::cfd::{Cfd, CfdState, Order}; + use crate::model::cfd::{Cfd, CfdState, Order, Origin}; use crate::model::Usd; use super::*; @@ -612,8 +673,11 @@ mod tests { async fn test_multiple_cfd_updates_per_cfd() { let mut conn = setup_test_db().await; - for _ in 0..100 { - let mut cfd = Cfd::dummy().insert(&mut conn).await; + for i in 0..100 { + let mut cfd = Cfd::dummy() + .with_event_id(BitMexPriceEventId::event1()) + .insert(&mut conn) + .await; let n_updates = rand::thread_rng().gen_range(1, 30); @@ -623,8 +687,15 @@ mod tests { } // verify current state is correct - let loaded = load_cfd_by_order_id(cfd.order.id, &mut conn).await.unwrap(); - assert_eq!(loaded, cfd); + let loaded_by_order_id = load_cfd_by_order_id(cfd.order.id, &mut conn).await.unwrap(); + assert_eq!(loaded_by_order_id, cfd); + + // load_cfds_by_oracle_event_id can return multiple CFDs + let loaded_by_oracle_event_id = + load_cfds_by_oracle_event_id(BitMexPriceEventId::event1(), &mut conn) + .await + .unwrap(); + assert_eq!(loaded_by_oracle_event_id.len(), i + 1); } // verify query returns only one state per CFD diff --git a/daemon/src/model.rs b/daemon/src/model.rs index 9618fda..bb65dc6 100644 --- a/daemon/src/model.rs +++ b/daemon/src/model.rs @@ -72,15 +72,16 @@ impl From for Percent { } } -#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, sqlx::Type)] +#[sqlx(transparent)] pub struct Leverage(pub u8); -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, sqlx::Type)] pub enum TradingPair { BtcUsd, } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, sqlx::Type)] pub enum Position { Buy, Sell, diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index f2f64db..5f6ecf0 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -19,7 +19,8 @@ use std::time::SystemTime; use time::Duration; use uuid::Uuid; -#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, sqlx::Type)] +#[sqlx(transparent)] pub struct OrderId(Uuid); impl Default for OrderId { @@ -45,7 +46,7 @@ impl<'v> FromParam<'v> for OrderId { // TODO: Could potentially remove this and use the Role in the Order instead /// Origin of the order -#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Copy, Clone, Serialize, Deserialize, PartialEq, sqlx::Type)] pub enum Origin { Ours, Theirs,