From 1dbe127db9103c8f14347959c2ae53cc502c9376 Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Fri, 15 Oct 2021 14:27:00 +1100 Subject: [PATCH 1/5] Use MessageChannel in post_order_request HTTP handler --- daemon/src/routes_taker.rs | 8 ++++---- daemon/src/taker.rs | 8 +++++++- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index f8c26de..1537942 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -15,6 +15,7 @@ use std::borrow::Cow; use std::path::PathBuf; use tokio::select; use tokio::sync::watch; +use xtra::prelude::MessageChannel; use xtra::Address; #[rocket::get("/feed")] @@ -104,14 +105,13 @@ pub struct CfdOrderRequest { #[rocket::post("/cfd/order", data = "")] pub async fn post_order_request( cfd_order_request: Json, - cfd_actor_inbox: &State>, + take_offer_channel: &State>>, ) { - cfd_actor_inbox - .do_send_async(taker_cfd::TakeOffer { + take_offer_channel + .do_send(taker_cfd::TakeOffer { order_id: cfd_order_request.order_id, quantity: cfd_order_request.quantity, }) - .await .expect("actor to always be available"); } diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index fa87db4..3bbdbe9 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -23,6 +23,7 @@ use std::time::Duration; use tokio::sync::watch; use tokio_util::codec::FramedRead; use tracing_subscriber::filter::LevelFilter; +use xtra::prelude::MessageChannel; use xtra::spawn::TokioGlobalSpawnExt; use xtra::Actor; @@ -264,7 +265,12 @@ async fn main() -> Result<()> { .await .unwrap(); - Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver)) + let take_offer_channel = + MessageChannel::::clone_channel(&cfd_actor_inbox); + Ok(rocket + .manage(take_offer_channel) + .manage(cfd_actor_inbox) + .manage(cfd_feed_receiver)) }, )) .mount( From d00ac5bfddd264c5d2bc7a746ece714ef6684bf6 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 15 Oct 2021 14:36:51 +1100 Subject: [PATCH 2/5] Don't return plain-text errors to the user --- daemon/src/routes_maker.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 2485ca3..a2d2d7f 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -148,7 +148,7 @@ pub fn post_cfd_action( action: CfdAction, cfd_action_channel: &State>>, _auth: Authenticated, -) -> Result, status::BadRequest> { +) -> Result, status::BadRequest<()>> { use maker_cfd::CfdAction::*; match action { CfdAction::AcceptOrder => { @@ -188,14 +188,14 @@ pub fn post_cfd_action( .expect("actor to always be available"); } CfdAction::Settle => { - return Err(status::BadRequest(Some( - "Collaborative settlement can only be triggered by taker".to_string(), - ))); + tracing::error!("Collaborative settlement can only be triggered by taker"); + + return Err(status::BadRequest(None)); } CfdAction::RollOver => { - return Err(status::BadRequest(Some( - "RollOver proposal can only be triggered by taker".to_string(), - ))); + tracing::error!("RollOver proposal can only be triggered by taker"); + + return Err(status::BadRequest(None)); } } From e2f926388a5f1a691b9d9402f0e187f56725c639 Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Fri, 15 Oct 2021 14:38:28 +1100 Subject: [PATCH 3/5] Introduce taker_cfd::CfdAction enum To reduce boilerplate and allow us to use an `xtra::MessageChannel` in the (very near) future. --- daemon/src/routes_taker.rs | 7 +++-- daemon/src/taker_cfd.rs | 64 +++++++++++++++++--------------------- 2 files changed, 32 insertions(+), 39 deletions(-) diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index 1537942..556846b 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -122,6 +122,7 @@ pub async fn post_cfd_action( cfd_actor_address: &State>, quote_updates: &State>, ) -> Result, status::BadRequest> { + use taker_cfd::CfdAction::*; match action { CfdAction::AcceptOrder | CfdAction::RejectOrder @@ -133,14 +134,14 @@ pub async fn post_cfd_action( } CfdAction::Commit => { cfd_actor_address - .do_send_async(taker_cfd::Commit { order_id: id }) + .do_send_async(Commit { order_id: id }) .await .map_err(|e| status::BadRequest(Some(e.to_string())))?; } CfdAction::Settle => { let current_price = quote_updates.borrow().for_taker(); cfd_actor_address - .do_send_async(taker_cfd::ProposeSettlement { + .do_send_async(ProposeSettlement { order_id: id, current_price, }) @@ -149,7 +150,7 @@ pub async fn post_cfd_action( } CfdAction::RollOver => { cfd_actor_address - .do_send_async(taker_cfd::ProposeRollOver { order_id: id }) + .do_send_async(ProposeRollOver { order_id: id }) .await .expect("actor to always be available"); } diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 4c3c69c..f1c5ce4 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -26,13 +26,17 @@ pub struct TakeOffer { pub quantity: Usd, } -pub struct ProposeSettlement { - pub order_id: OrderId, - pub current_price: Usd, -} - -pub struct ProposeRollOver { - pub order_id: OrderId, +pub enum CfdAction { + ProposeSettlement { + order_id: OrderId, + current_price: Usd, + }, + ProposeRollOver { + order_id: OrderId, + }, + Commit { + order_id: OrderId, + }, } pub struct MakerStreamMessage { @@ -49,10 +53,6 @@ pub struct CfdRollOverCompleted { pub dlc: Result, } -pub struct Commit { - pub order_id: OrderId, -} - enum SetupState { Active { sender: mpsc::UnboundedSender, @@ -610,16 +610,23 @@ impl Handler for Actor { } #[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: ProposeSettlement, _ctx: &mut Context) { - log_error!(self.handle_propose_settlement(msg.order_id, msg.current_price)); - } -} +impl Handler for Actor { + async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context) { + use CfdAction::*; -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: ProposeRollOver, _ctx: &mut Context) { - log_error!(self.handle_propose_roll_over(msg.order_id)); + if let Err(e) = match msg { + Commit { order_id } => self.handle_commit(order_id).await, + ProposeSettlement { + order_id, + current_price, + } => { + self.handle_propose_settlement(order_id, current_price) + .await + } + ProposeRollOver { order_id } => self.handle_propose_roll_over(order_id).await, + } { + tracing::error!("Message handler failed: {:#}", e); + } } } @@ -704,22 +711,11 @@ impl Handler for Actor { } } -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: Commit, _ctx: &mut Context) { - log_error!(self.handle_commit(msg.order_id)) - } -} - impl Message for TakeOffer { type Result = (); } -impl Message for ProposeSettlement { - type Result = (); -} - -impl Message for ProposeRollOver { +impl Message for CfdAction { type Result = (); } @@ -736,8 +732,4 @@ impl Message for CfdRollOverCompleted { type Result = (); } -impl Message for Commit { - type Result = (); -} - impl xtra::Actor for Actor {} From 7c8d5e110bc94e41069ec6c18f8d6f4c572089d3 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 15 Oct 2021 14:41:23 +1100 Subject: [PATCH 4/5] Don't expose panicking code paths to the user An actor not being reachable is a bad internal state but it should never lead to an actual panic. --- daemon/src/routes_maker.rs | 53 +++++++++++--------------------------- 1 file changed, 15 insertions(+), 38 deletions(-) diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index a2d2d7f..d092738 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -148,58 +148,35 @@ pub fn post_cfd_action( action: CfdAction, cfd_action_channel: &State>>, _auth: Authenticated, -) -> Result, status::BadRequest<()>> { +) -> Result, status::Custom<()>> { use maker_cfd::CfdAction::*; - match action { - CfdAction::AcceptOrder => { - cfd_action_channel - .do_send(AcceptOrder { order_id: id }) - .expect("actor to always be available"); - } - CfdAction::RejectOrder => { - cfd_action_channel - .do_send(RejectOrder { order_id: id }) - .expect("actor to always be available"); - } + let result = match action { + CfdAction::AcceptOrder => cfd_action_channel.do_send(AcceptOrder { order_id: id }), + CfdAction::RejectOrder => cfd_action_channel.do_send(RejectOrder { order_id: id }), CfdAction::AcceptSettlement => { - cfd_action_channel - .do_send(AcceptSettlement { order_id: id }) - .expect("actor to always be available"); + cfd_action_channel.do_send(AcceptSettlement { order_id: id }) } CfdAction::RejectSettlement => { - cfd_action_channel - .do_send(RejectSettlement { order_id: id }) - .expect("actor to always be available"); - } - - CfdAction::AcceptRollOver => { - cfd_action_channel - .do_send(AcceptRollOver { order_id: id }) - .expect("actor to always be available"); - } - CfdAction::RejectRollOver => { - cfd_action_channel - .do_send(RejectRollOver { order_id: id }) - .expect("actor to always be available"); - } - CfdAction::Commit => { - cfd_action_channel - .do_send(Commit { order_id: id }) - .expect("actor to always be available"); + cfd_action_channel.do_send(RejectSettlement { order_id: id }) } + CfdAction::AcceptRollOver => cfd_action_channel.do_send(AcceptRollOver { order_id: id }), + CfdAction::RejectRollOver => cfd_action_channel.do_send(RejectRollOver { order_id: id }), + CfdAction::Commit => cfd_action_channel.do_send(Commit { order_id: id }), CfdAction::Settle => { tracing::error!("Collaborative settlement can only be triggered by taker"); - return Err(status::BadRequest(None)); + return Err(status::Custom(Status::BadRequest, ())); } CfdAction::RollOver => { tracing::error!("RollOver proposal can only be triggered by taker"); - return Err(status::BadRequest(None)); + return Err(status::Custom(Status::BadRequest, ())); } - } + }; - Ok(status::Accepted(None)) + result + .map(|()| status::Accepted(None)) + .map_err(|_| status::Custom(Status::InternalServerError, ())) } #[rocket::get("/alive")] From 05c1674ae3ba43b9779fbb961fd68fe7294ace5a Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Fri, 15 Oct 2021 14:45:26 +1100 Subject: [PATCH 5/5] Use MessageChannel in taker's post_cfd_action HTTP handler --- daemon/src/routes_taker.rs | 18 +++++++----------- daemon/src/taker.rs | 4 +++- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index 556846b..03094e0 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -16,7 +16,6 @@ use std::path::PathBuf; use tokio::select; use tokio::sync::watch; use xtra::prelude::MessageChannel; -use xtra::Address; #[rocket::get("/feed")] pub async fn feed( @@ -119,7 +118,7 @@ pub async fn post_order_request( pub async fn post_cfd_action( id: OrderId, action: CfdAction, - cfd_actor_address: &State>, + cfd_action_channel: &State>>, quote_updates: &State>, ) -> Result, status::BadRequest> { use taker_cfd::CfdAction::*; @@ -133,25 +132,22 @@ pub async fn post_cfd_action( return Err(status::BadRequest(None)); } CfdAction::Commit => { - cfd_actor_address - .do_send_async(Commit { order_id: id }) - .await + cfd_action_channel + .do_send(Commit { order_id: id }) .map_err(|e| status::BadRequest(Some(e.to_string())))?; } CfdAction::Settle => { let current_price = quote_updates.borrow().for_taker(); - cfd_actor_address - .do_send_async(ProposeSettlement { + cfd_action_channel + .do_send(ProposeSettlement { order_id: id, current_price, }) - .await .expect("actor to always be available"); } CfdAction::RollOver => { - cfd_actor_address - .do_send_async(ProposeRollOver { order_id: id }) - .await + cfd_action_channel + .do_send(ProposeRollOver { order_id: id }) .expect("actor to always be available"); } } diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 3bbdbe9..967dc79 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -267,9 +267,11 @@ async fn main() -> Result<()> { let take_offer_channel = MessageChannel::::clone_channel(&cfd_actor_inbox); + let cfd_action_channel = + MessageChannel::::clone_channel(&cfd_actor_inbox); Ok(rocket .manage(take_offer_channel) - .manage(cfd_actor_inbox) + .manage(cfd_action_channel) .manage(cfd_feed_receiver)) }, ))