diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index 9f0d2e3..d383510 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -4,6 +4,7 @@ use daemon::model::{Leverage, Price, Usd, WalletInfo}; use daemon::routes::EmbeddedFileExt; use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent}; use daemon::{bitmex_price_feed, taker_cfd}; +use http_api_problem::{HttpApiProblem, StatusCode}; use rocket::http::{ContentType, Status}; use rocket::response::stream::EventStream; use rocket::response::{status, Responder}; @@ -15,7 +16,7 @@ use std::borrow::Cow; use std::path::PathBuf; use tokio::select; use tokio::sync::watch; -use xtra::prelude::MessageChannel; +use xtra::prelude::*; #[rocket::get("/feed")] pub async fn feed( @@ -105,13 +106,21 @@ pub struct CfdOrderRequest { pub async fn post_order_request( cfd_order_request: Json, take_offer_channel: &State>>, -) { +) -> Result, HttpApiProblem> { take_offer_channel - .do_send(taker_cfd::TakeOffer { + .send(taker_cfd::TakeOffer { order_id: cfd_order_request.order_id, quantity: cfd_order_request.quantity, }) - .expect("actor to always be available"); + .await + .unwrap_or_else(|e| anyhow::bail!(e.to_string())) + .map_err(|e| { + HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) + .title("Order request failed") + .detail(e.to_string()) + })?; + + Ok(status::Accepted(None)) } #[rocket::post("/cfd//")] @@ -120,37 +129,37 @@ pub async fn post_cfd_action( action: CfdAction, cfd_action_channel: &State>>, quote_updates: &State>, -) -> Result, status::BadRequest> { +) -> Result, HttpApiProblem> { use taker_cfd::CfdAction::*; - match action { + let result = match action { CfdAction::AcceptOrder | CfdAction::RejectOrder | CfdAction::AcceptSettlement | CfdAction::RejectSettlement | CfdAction::AcceptRollOver | CfdAction::RejectRollOver => { - return Err(status::BadRequest(None)); - } - CfdAction::Commit => { - cfd_action_channel - .do_send(Commit { order_id: id }) - .map_err(|e| status::BadRequest(Some(e.to_string())))?; + return Err(HttpApiProblem::new(StatusCode::BAD_REQUEST) + .detail("taker cannot invoke this actions")); } + CfdAction::Commit => cfd_action_channel.send(Commit { order_id: id }), CfdAction::Settle => { let current_price = quote_updates.borrow().for_taker(); - cfd_action_channel - .do_send(ProposeSettlement { - order_id: id, - current_price, - }) - .expect("actor to always be available"); + cfd_action_channel.send(ProposeSettlement { + order_id: id, + current_price, + }) } - CfdAction::RollOver => { - cfd_action_channel - .do_send(ProposeRollOver { order_id: id }) - .expect("actor to always be available"); - } - } + CfdAction::RollOver => cfd_action_channel.send(ProposeRollOver { order_id: id }), + }; + + result + .await + .unwrap_or_else(|e| anyhow::bail!(e.to_string())) + .map_err(|e| { + HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) + .title(action.to_string() + " failed") + .detail(e.to_string()) + })?; Ok(status::Accepted(None)) } @@ -177,7 +186,7 @@ pub struct MarginResponse { #[rocket::post("/calculate/margin", data = "")] pub fn margin_calc( margin_request: Json, -) -> Result>, status::BadRequest> { +) -> Result>, HttpApiProblem> { let margin = calculate_long_margin( margin_request.price, margin_request.quantity, diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 89f94cc..b059963 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -652,8 +652,8 @@ where #[async_trait] impl Handler for Actor { - async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context) { - log_error!(self.handle_take_offer(msg.order_id, msg.quantity)); + async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context) -> Result<()> { + self.handle_take_offer(msg.order_id, msg.quantity).await } } @@ -664,7 +664,7 @@ where + xtra::Handler + xtra::Handler, { - async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context) { + async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context) -> Result<()> { use CfdAction::*; if let Err(e) = match msg { @@ -679,7 +679,9 @@ where ProposeRollOver { order_id } => self.handle_propose_roll_over(order_id).await, } { tracing::error!("Message handler failed: {:#}", e); + anyhow::bail!(e) } + Ok(()) } } @@ -789,11 +791,11 @@ where } impl Message for TakeOffer { - type Result = (); + type Result = Result<()>; } impl Message for CfdAction { - type Result = (); + type Result = Result<()>; } // this signature is a bit different because we use `Address::attach_stream`