Browse Source

Provide feedback about taker actions in HTTP response

Invoke user actions (cfd actions & taking an order) synchronously in
order to be able to communicate the results.

Use HttpApiProblem to send error details to the frontend in a standard way.
fix/sql-oddness
Mariusz Klochowicz 3 years ago
parent
commit
f8075fe5ca
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 59
      daemon/src/routes_taker.rs
  2. 12
      daemon/src/taker_cfd.rs

59
daemon/src/routes_taker.rs

@ -4,6 +4,7 @@ use daemon::model::{Leverage, Price, Usd, WalletInfo};
use daemon::routes::EmbeddedFileExt; use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent}; use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent};
use daemon::{bitmex_price_feed, taker_cfd}; use daemon::{bitmex_price_feed, taker_cfd};
use http_api_problem::{HttpApiProblem, StatusCode};
use rocket::http::{ContentType, Status}; use rocket::http::{ContentType, Status};
use rocket::response::stream::EventStream; use rocket::response::stream::EventStream;
use rocket::response::{status, Responder}; use rocket::response::{status, Responder};
@ -15,7 +16,7 @@ use std::borrow::Cow;
use std::path::PathBuf; use std::path::PathBuf;
use tokio::select; use tokio::select;
use tokio::sync::watch; use tokio::sync::watch;
use xtra::prelude::MessageChannel; use xtra::prelude::*;
#[rocket::get("/feed")] #[rocket::get("/feed")]
pub async fn feed( pub async fn feed(
@ -105,13 +106,21 @@ pub struct CfdOrderRequest {
pub async fn post_order_request( pub async fn post_order_request(
cfd_order_request: Json<CfdOrderRequest>, cfd_order_request: Json<CfdOrderRequest>,
take_offer_channel: &State<Box<dyn MessageChannel<taker_cfd::TakeOffer>>>, take_offer_channel: &State<Box<dyn MessageChannel<taker_cfd::TakeOffer>>>,
) { ) -> Result<status::Accepted<()>, HttpApiProblem> {
take_offer_channel take_offer_channel
.do_send(taker_cfd::TakeOffer { .send(taker_cfd::TakeOffer {
order_id: cfd_order_request.order_id, order_id: cfd_order_request.order_id,
quantity: cfd_order_request.quantity, quantity: cfd_order_request.quantity,
}) })
.expect("actor to always be available"); .await
.unwrap_or_else(|e| anyhow::bail!(e.to_string()))
.map_err(|e| {
HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR)
.title("Order request failed")
.detail(e.to_string())
})?;
Ok(status::Accepted(None))
} }
#[rocket::post("/cfd/<id>/<action>")] #[rocket::post("/cfd/<id>/<action>")]
@ -120,37 +129,37 @@ pub async fn post_cfd_action(
action: CfdAction, action: CfdAction,
cfd_action_channel: &State<Box<dyn MessageChannel<taker_cfd::CfdAction>>>, cfd_action_channel: &State<Box<dyn MessageChannel<taker_cfd::CfdAction>>>,
quote_updates: &State<watch::Receiver<bitmex_price_feed::Quote>>, quote_updates: &State<watch::Receiver<bitmex_price_feed::Quote>>,
) -> Result<status::Accepted<()>, status::BadRequest<String>> { ) -> Result<status::Accepted<()>, HttpApiProblem> {
use taker_cfd::CfdAction::*; use taker_cfd::CfdAction::*;
match action { let result = match action {
CfdAction::AcceptOrder CfdAction::AcceptOrder
| CfdAction::RejectOrder | CfdAction::RejectOrder
| CfdAction::AcceptSettlement | CfdAction::AcceptSettlement
| CfdAction::RejectSettlement | CfdAction::RejectSettlement
| CfdAction::AcceptRollOver | CfdAction::AcceptRollOver
| CfdAction::RejectRollOver => { | CfdAction::RejectRollOver => {
return Err(status::BadRequest(None)); return Err(HttpApiProblem::new(StatusCode::BAD_REQUEST)
} .detail("taker cannot invoke this actions"));
CfdAction::Commit => {
cfd_action_channel
.do_send(Commit { order_id: id })
.map_err(|e| status::BadRequest(Some(e.to_string())))?;
} }
CfdAction::Commit => cfd_action_channel.send(Commit { order_id: id }),
CfdAction::Settle => { CfdAction::Settle => {
let current_price = quote_updates.borrow().for_taker(); let current_price = quote_updates.borrow().for_taker();
cfd_action_channel cfd_action_channel.send(ProposeSettlement {
.do_send(ProposeSettlement { order_id: id,
order_id: id, current_price,
current_price, })
})
.expect("actor to always be available");
} }
CfdAction::RollOver => { CfdAction::RollOver => cfd_action_channel.send(ProposeRollOver { order_id: id }),
cfd_action_channel };
.do_send(ProposeRollOver { order_id: id })
.expect("actor to always be available"); result
} .await
} .unwrap_or_else(|e| anyhow::bail!(e.to_string()))
.map_err(|e| {
HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR)
.title(action.to_string() + " failed")
.detail(e.to_string())
})?;
Ok(status::Accepted(None)) Ok(status::Accepted(None))
} }
@ -177,7 +186,7 @@ pub struct MarginResponse {
#[rocket::post("/calculate/margin", data = "<margin_request>")] #[rocket::post("/calculate/margin", data = "<margin_request>")]
pub fn margin_calc( pub fn margin_calc(
margin_request: Json<MarginRequest>, margin_request: Json<MarginRequest>,
) -> Result<status::Accepted<Json<MarginResponse>>, status::BadRequest<String>> { ) -> Result<status::Accepted<Json<MarginResponse>>, HttpApiProblem> {
let margin = calculate_long_margin( let margin = calculate_long_margin(
margin_request.price, margin_request.price,
margin_request.quantity, margin_request.quantity,

12
daemon/src/taker_cfd.rs

@ -652,8 +652,8 @@ where
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<TakeOffer> for Actor<O, M, W> { impl<O: 'static, M: 'static, W: 'static> Handler<TakeOffer> for Actor<O, M, W> {
async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_take_offer(msg.order_id, msg.quantity)); self.handle_take_offer(msg.order_id, msg.quantity).await
} }
} }
@ -664,7 +664,7 @@ where
+ xtra::Handler<wallet::Sign> + xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>, + xtra::Handler<wallet::BuildPartyParams>,
{ {
async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context<Self>) -> Result<()> {
use CfdAction::*; use CfdAction::*;
if let Err(e) = match msg { if let Err(e) = match msg {
@ -679,7 +679,9 @@ where
ProposeRollOver { order_id } => self.handle_propose_roll_over(order_id).await, ProposeRollOver { order_id } => self.handle_propose_roll_over(order_id).await,
} { } {
tracing::error!("Message handler failed: {:#}", e); tracing::error!("Message handler failed: {:#}", e);
anyhow::bail!(e)
} }
Ok(())
} }
} }
@ -789,11 +791,11 @@ where
} }
impl Message for TakeOffer { impl Message for TakeOffer {
type Result = (); type Result = Result<()>;
} }
impl Message for CfdAction { impl Message for CfdAction {
type Result = (); type Result = Result<()>;
} }
// this signature is a bit different because we use `Address::attach_stream` // this signature is a bit different because we use `Address::attach_stream`

Loading…
Cancel
Save