From df402180f2b5c60eb1ec18ed005b5f12c68f5a84 Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Tue, 19 Oct 2021 11:46:08 +1030 Subject: [PATCH 1/2] Validate sqlx insertions Check query results whether the affected rows match our expectations --- daemon/src/db.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/daemon/src/db.rs b/daemon/src/db.rs index 27fad5a..a3e012e 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -16,7 +16,7 @@ pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> { } pub async fn insert_order(order: &Order, conn: &mut PoolConnection) -> anyhow::Result<()> { - sqlx::query( + let query_result = sqlx::query( r#"insert into orders ( uuid, trading_pair, @@ -61,6 +61,10 @@ pub async fn insert_order(order: &Order, conn: &mut PoolConnection) -> a .execute(conn) .await?; + if query_result.rows_affected() != 1 { + anyhow::bail!("failed to insert order"); + } + Ok(()) } @@ -112,7 +116,7 @@ pub async fn load_order_by_id( pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection) -> anyhow::Result<()> { let state = serde_json::to_string(&cfd.state)?; - sqlx::query( + let query_result = sqlx::query( r#" insert into cfds ( order_id, @@ -143,6 +147,11 @@ pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection) -> anyhow: .execute(conn) .await?; + // Should be 2 because we insert into cfds and cfd_states + if query_result.rows_affected() != 2 { + anyhow::bail!("failed to insert cfd"); + } + Ok(()) } From 5b615476a308ef014331844fa5d021dedd395e38 Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Tue, 19 Oct 2021 11:46:56 +1030 Subject: [PATCH 2/2] Test order and cfd insertion in the actor Check whether after inserting order and the cfd the Cfd feed works as expected --- daemon/src/db.rs | 30 +++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/daemon/src/db.rs b/daemon/src/db.rs index a3e012e..8b1874f 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -562,14 +562,16 @@ fn convert_to_system_time(row_secs: i64, row_nanos: i32) -> Result { #[cfg(test)] mod tests { + use crate::cfd_actors; use pretty_assertions::assert_eq; use rand::Rng; use rust_decimal_macros::dec; use sqlx::SqlitePool; use time::macros::datetime; use time::OffsetDateTime; + use tokio::sync::watch; - use crate::db::insert_order; + use crate::db::{self, insert_order}; use crate::model::cfd::{Cfd, CfdState, Order, Origin}; use crate::model::Usd; @@ -595,6 +597,32 @@ mod tests { assert_eq!(vec![cfd], loaded); } + #[tokio::test] + async fn test_insert_like_cfd_actor() { + let mut conn = setup_test_db().await; + + let cfds = load_all_cfds(&mut conn).await.unwrap(); + + let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone()); + + assert_eq!(cfd_feed_receiver.borrow().clone(), vec![]); + + let cfd_1 = Cfd::dummy(); + db::insert_order(&cfd_1.order, &mut conn).await.unwrap(); + cfd_actors::insert_cfd(&cfd_1, &mut conn, &cfd_feed_sender) + .await + .unwrap(); + + assert_eq!(cfd_feed_receiver.borrow().clone(), vec![cfd_1.clone()]); + + let cfd_2 = Cfd::dummy(); + db::insert_order(&cfd_2.order, &mut conn).await.unwrap(); + cfd_actors::insert_cfd(&cfd_2, &mut conn, &cfd_feed_sender) + .await + .unwrap(); + assert_eq!(cfd_feed_receiver.borrow().clone(), vec![cfd_1, cfd_2]); + } + #[tokio::test] async fn test_insert_and_load_cfd_by_order_id() { let mut conn = setup_test_db().await;