From 3d9601e66578de9b71f41abf0f622dd0b1beef4e Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Fri, 15 Oct 2021 10:32:44 +1100 Subject: [PATCH 1/3] Use MessageChannel in post_sell_order HTTP handler --- daemon/src/maker.rs | 3 +++ daemon/src/routes_maker.rs | 10 +++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 67e67fd..a827706 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -288,8 +288,11 @@ async fn main() -> Result<()> { tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); + let new_order_channel = + MessageChannel::::clone_channel(&cfd_maker_actor_inbox); Ok(rocket .manage(cfd_maker_actor_inbox) + .manage(new_order_channel) .manage(cfd_feed_receiver)) }, )) diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 404b55f..928e9ee 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -17,6 +17,7 @@ use std::borrow::Cow; use std::path::PathBuf; use tokio::select; use tokio::sync::watch; +use xtra::prelude::MessageChannel; use xtra::Address; #[rocket::get("/feed")] @@ -109,18 +110,17 @@ pub struct CfdNewOrderRequest { } #[rocket::post("/order/sell", data = "")] -pub async fn post_sell_order( +pub fn post_sell_order( order: Json, - cfd_actor_address: &State>, + new_order_channel: &State>>, _auth: Authenticated, ) -> Result, Status> { - cfd_actor_address - .do_send_async(maker_cfd::NewOrder { + new_order_channel + .do_send(maker_cfd::NewOrder { price: order.price, min_quantity: order.min_quantity, max_quantity: order.max_quantity, }) - .await .map_err(|_| Status::new(500))?; Ok(status::Accepted(None)) From f393e421133182d0e2f496f73a3a27cd10226612 Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Fri, 15 Oct 2021 11:46:22 +1100 Subject: [PATCH 2/3] Introduce maker_cfd::CfdAction enum To reduce boilerplate and allow us to use an `xtra::MessageChannel` in the (very near) future. --- daemon/src/maker_cfd.rs | 119 +++++++------------------------------ daemon/src/routes_maker.rs | 15 ++--- 2 files changed, 31 insertions(+), 103 deletions(-) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 54ef783..a7a034e 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -24,32 +24,14 @@ use tokio::sync::watch; use xtra::prelude::*; use xtra::KeepRunning; -pub struct AcceptOrder { - pub order_id: OrderId, -} - -pub struct RejectOrder { - pub order_id: OrderId, -} - -pub struct Commit { - pub order_id: OrderId, -} - -pub struct AcceptSettlement { - pub order_id: OrderId, -} - -pub struct RejectSettlement { - pub order_id: OrderId, -} - -pub struct AcceptRollOver { - pub order_id: OrderId, -} - -pub struct RejectRollOver { - pub order_id: OrderId, +pub enum CfdAction { + AcceptOrder { order_id: OrderId }, + RejectOrder { order_id: OrderId }, + AcceptSettlement { order_id: OrderId }, + RejectSettlement { order_id: OrderId }, + AcceptRollOver { order_id: OrderId }, + RejectRollOver { order_id: OrderId }, + Commit { order_id: OrderId }, } pub struct NewOrder { @@ -984,51 +966,20 @@ impl Actor { } #[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: AcceptOrder, ctx: &mut Context) { - log_error!(self.handle_accept_order(msg.order_id, ctx)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: RejectOrder, _ctx: &mut Context) { - log_error!(self.handle_reject_order(msg.order_id)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: AcceptSettlement, _ctx: &mut Context) { - log_error!(self.handle_accept_settlement(msg.order_id)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: RejectSettlement, _ctx: &mut Context) { - log_error!(self.handle_reject_settlement(msg.order_id)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: AcceptRollOver, ctx: &mut Context) { - log_error!(self.handle_accept_roll_over(msg.order_id, ctx)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: RejectRollOver, _ctx: &mut Context) { - log_error!(self.handle_reject_roll_over(msg.order_id)) - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: Commit, _ctx: &mut Context) { - log_error!(self.handle_commit(msg.order_id)) +impl Handler for Actor { + async fn handle(&mut self, msg: CfdAction, ctx: &mut Context) { + use CfdAction::*; + if let Err(e) = match msg { + AcceptOrder { order_id } => self.handle_accept_order(order_id, ctx).await, + RejectOrder { order_id } => self.handle_reject_order(order_id).await, + AcceptSettlement { order_id } => self.handle_accept_settlement(order_id).await, + RejectSettlement { order_id } => self.handle_reject_settlement(order_id).await, + AcceptRollOver { order_id } => self.handle_accept_roll_over(order_id, ctx).await, + RejectRollOver { order_id } => self.handle_reject_roll_over(order_id).await, + Commit { order_id } => self.handle_commit(order_id).await, + } { + tracing::error!("Message handler failed: {:#}", e); + } } } @@ -1159,31 +1110,7 @@ impl Message for CfdRollOverCompleted { type Result = (); } -impl Message for AcceptOrder { - type Result = (); -} - -impl Message for RejectOrder { - type Result = (); -} - -impl Message for Commit { - type Result = (); -} - -impl Message for AcceptSettlement { - type Result = (); -} - -impl Message for RejectSettlement { - type Result = (); -} - -impl Message for AcceptRollOver { - type Result = (); -} - -impl Message for RejectRollOver { +impl Message for CfdAction { type Result = (); } diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 928e9ee..2e8b5ef 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -150,47 +150,48 @@ pub async fn post_cfd_action( cfd_actor_address: &State>, _auth: Authenticated, ) -> Result, status::BadRequest> { + use maker_cfd::CfdAction::*; match action { CfdAction::AcceptOrder => { cfd_actor_address - .do_send_async(maker_cfd::AcceptOrder { order_id: id }) + .do_send_async(AcceptOrder { order_id: id }) .await .expect("actor to always be available"); } CfdAction::RejectOrder => { cfd_actor_address - .do_send_async(maker_cfd::RejectOrder { order_id: id }) + .do_send_async(RejectOrder { order_id: id }) .await .expect("actor to always be available"); } CfdAction::AcceptSettlement => { cfd_actor_address - .do_send_async(maker_cfd::AcceptSettlement { order_id: id }) + .do_send_async(AcceptSettlement { order_id: id }) .await .expect("actor to always be available"); } CfdAction::RejectSettlement => { cfd_actor_address - .do_send_async(maker_cfd::RejectSettlement { order_id: id }) + .do_send_async(RejectSettlement { order_id: id }) .await .expect("actor to always be available"); } CfdAction::AcceptRollOver => { cfd_actor_address - .do_send_async(maker_cfd::AcceptRollOver { order_id: id }) + .do_send_async(AcceptRollOver { order_id: id }) .await .expect("actor to always be available"); } CfdAction::RejectRollOver => { cfd_actor_address - .do_send_async(maker_cfd::RejectRollOver { order_id: id }) + .do_send_async(RejectRollOver { order_id: id }) .await .expect("actor to always be available"); } CfdAction::Commit => { cfd_actor_address - .do_send_async(maker_cfd::Commit { order_id: id }) + .do_send_async(Commit { order_id: id }) .await .expect("actor to always be available"); } From 1342db80b1e1188908e44dae157d30bdb0dd7029 Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Fri, 15 Oct 2021 10:46:32 +1100 Subject: [PATCH 3/3] Use MessageChannel in post_cfd_action HTTP handler --- daemon/src/maker.rs | 4 +++- daemon/src/routes_maker.rs | 40 +++++++++++++++----------------------- 2 files changed, 19 insertions(+), 25 deletions(-) diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index a827706..9ebbb7c 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -288,10 +288,12 @@ async fn main() -> Result<()> { tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); + let cfd_action_channel = + MessageChannel::::clone_channel(&cfd_maker_actor_inbox); let new_order_channel = MessageChannel::::clone_channel(&cfd_maker_actor_inbox); Ok(rocket - .manage(cfd_maker_actor_inbox) + .manage(cfd_action_channel) .manage(new_order_channel) .manage(cfd_feed_receiver)) }, diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 2e8b5ef..2485ca3 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -18,7 +18,6 @@ use std::path::PathBuf; use tokio::select; use tokio::sync::watch; use xtra::prelude::MessageChannel; -use xtra::Address; #[rocket::get("/feed")] pub async fn maker_feed( @@ -144,55 +143,48 @@ pub struct PromptAuthentication { } #[rocket::post("/cfd//")] -pub async fn post_cfd_action( +pub fn post_cfd_action( id: OrderId, action: CfdAction, - cfd_actor_address: &State>, + cfd_action_channel: &State>>, _auth: Authenticated, ) -> Result, status::BadRequest> { use maker_cfd::CfdAction::*; match action { CfdAction::AcceptOrder => { - cfd_actor_address - .do_send_async(AcceptOrder { order_id: id }) - .await + cfd_action_channel + .do_send(AcceptOrder { order_id: id }) .expect("actor to always be available"); } CfdAction::RejectOrder => { - cfd_actor_address - .do_send_async(RejectOrder { order_id: id }) - .await + cfd_action_channel + .do_send(RejectOrder { order_id: id }) .expect("actor to always be available"); } CfdAction::AcceptSettlement => { - cfd_actor_address - .do_send_async(AcceptSettlement { order_id: id }) - .await + cfd_action_channel + .do_send(AcceptSettlement { order_id: id }) .expect("actor to always be available"); } CfdAction::RejectSettlement => { - cfd_actor_address - .do_send_async(RejectSettlement { order_id: id }) - .await + cfd_action_channel + .do_send(RejectSettlement { order_id: id }) .expect("actor to always be available"); } CfdAction::AcceptRollOver => { - cfd_actor_address - .do_send_async(AcceptRollOver { order_id: id }) - .await + cfd_action_channel + .do_send(AcceptRollOver { order_id: id }) .expect("actor to always be available"); } CfdAction::RejectRollOver => { - cfd_actor_address - .do_send_async(RejectRollOver { order_id: id }) - .await + cfd_action_channel + .do_send(RejectRollOver { order_id: id }) .expect("actor to always be available"); } CfdAction::Commit => { - cfd_actor_address - .do_send_async(Commit { order_id: id }) - .await + cfd_action_channel + .do_send(Commit { order_id: id }) .expect("actor to always be available"); } CfdAction::Settle => {