@ -16,7 +16,7 @@ pub async fn run_migrations(pool: &SqlitePool) -> anyhow::Result<()> {
}
}
pub async fn insert_order ( order : & Order , conn : & mut PoolConnection < Sqlite > ) -> anyhow ::Result < ( ) > {
pub async fn insert_order ( order : & Order , conn : & mut PoolConnection < Sqlite > ) -> anyhow ::Result < ( ) > {
sqlx ::query (
let query_result = sqlx ::query (
r # " insert into orders (
r # " insert into orders (
uuid ,
uuid ,
trading_pair ,
trading_pair ,
@ -61,6 +61,10 @@ pub async fn insert_order(order: &Order, conn: &mut PoolConnection<Sqlite>) -> a
. execute ( conn )
. execute ( conn )
. await ? ;
. await ? ;
if query_result . rows_affected ( ) ! = 1 {
anyhow ::bail ! ( "failed to insert order" ) ;
}
Ok ( ( ) )
Ok ( ( ) )
}
}
@ -112,7 +116,7 @@ pub async fn load_order_by_id(
pub async fn insert_cfd ( cfd : & Cfd , conn : & mut PoolConnection < Sqlite > ) -> anyhow ::Result < ( ) > {
pub async fn insert_cfd ( cfd : & Cfd , conn : & mut PoolConnection < Sqlite > ) -> anyhow ::Result < ( ) > {
let state = serde_json ::to_string ( & cfd . state ) ? ;
let state = serde_json ::to_string ( & cfd . state ) ? ;
sqlx ::query (
let query_result = sqlx ::query (
r # "
r # "
insert into cfds (
insert into cfds (
order_id ,
order_id ,
@ -143,6 +147,11 @@ pub async fn insert_cfd(cfd: &Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow:
. execute ( conn )
. execute ( conn )
. await ? ;
. await ? ;
// Should be 2 because we insert into cfds and cfd_states
if query_result . rows_affected ( ) ! = 2 {
anyhow ::bail ! ( "failed to insert cfd" ) ;
}
Ok ( ( ) )
Ok ( ( ) )
}
}
@ -553,14 +562,16 @@ fn convert_to_system_time(row_secs: i64, row_nanos: i32) -> Result<SystemTime> {
#[ cfg(test) ]
#[ cfg(test) ]
mod tests {
mod tests {
use crate ::cfd_actors ;
use pretty_assertions ::assert_eq ;
use pretty_assertions ::assert_eq ;
use rand ::Rng ;
use rand ::Rng ;
use rust_decimal_macros ::dec ;
use rust_decimal_macros ::dec ;
use sqlx ::SqlitePool ;
use sqlx ::SqlitePool ;
use time ::macros ::datetime ;
use time ::macros ::datetime ;
use time ::OffsetDateTime ;
use time ::OffsetDateTime ;
use tokio ::sync ::watch ;
use crate ::db ::insert_order ;
use crate ::db ::{ self , insert_order } ;
use crate ::model ::cfd ::{ Cfd , CfdState , Order , Origin } ;
use crate ::model ::cfd ::{ Cfd , CfdState , Order , Origin } ;
use crate ::model ::Usd ;
use crate ::model ::Usd ;
@ -586,6 +597,32 @@ mod tests {
assert_eq ! ( vec ! [ cfd ] , loaded ) ;
assert_eq ! ( vec ! [ cfd ] , loaded ) ;
}
}
#[ tokio::test ]
async fn test_insert_like_cfd_actor ( ) {
let mut conn = setup_test_db ( ) . await ;
let cfds = load_all_cfds ( & mut conn ) . await . unwrap ( ) ;
let ( cfd_feed_sender , cfd_feed_receiver ) = watch ::channel ( cfds . clone ( ) ) ;
assert_eq ! ( cfd_feed_receiver . borrow ( ) . clone ( ) , vec ! [ ] ) ;
let cfd_1 = Cfd ::dummy ( ) ;
db ::insert_order ( & cfd_1 . order , & mut conn ) . await . unwrap ( ) ;
cfd_actors ::insert_cfd ( & cfd_1 , & mut conn , & cfd_feed_sender )
. await
. unwrap ( ) ;
assert_eq ! ( cfd_feed_receiver . borrow ( ) . clone ( ) , vec ! [ cfd_1 . clone ( ) ] ) ;
let cfd_2 = Cfd ::dummy ( ) ;
db ::insert_order ( & cfd_2 . order , & mut conn ) . await . unwrap ( ) ;
cfd_actors ::insert_cfd ( & cfd_2 , & mut conn , & cfd_feed_sender )
. await
. unwrap ( ) ;
assert_eq ! ( cfd_feed_receiver . borrow ( ) . clone ( ) , vec ! [ cfd_1 , cfd_2 ] ) ;
}
#[ tokio::test ]
#[ tokio::test ]
async fn test_insert_and_load_cfd_by_order_id ( ) {
async fn test_insert_and_load_cfd_by_order_id ( ) {
let mut conn = setup_test_db ( ) . await ;
let mut conn = setup_test_db ( ) . await ;