diff --git a/Cargo.lock b/Cargo.lock index d9ce6d7..a66b25d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -637,9 +637,11 @@ dependencies = [ "cfd_protocol", "chrono", "clap", + "derive_more", "futures", "hex", "hkdf", + "http-api-problem", "itertools", "mockall", "mockall_derive", @@ -712,6 +714,17 @@ dependencies = [ "syn", ] +[[package]] +name = "derive_more" +version = "0.99.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40eebddd2156ce1bb37b20bbe5151340a31828b1f2d22ba4141f3531710e38df" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "devise" version = "0.3.1" @@ -1211,6 +1224,18 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-api-problem" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43e8970113f6e4a9138e6cd57b20de3ed99585cab427eb99d27a440827dbe2" +dependencies = [ + "http", + "rocket", + "serde", + "serde_json", +] + [[package]] name = "http-body" version = "0.4.3" diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index af4922d..ff3be62 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -12,9 +12,11 @@ bytes = "1" cfd_protocol = { path = "../cfd_protocol" } chrono = { version = "0.4", features = ["serde"] } clap = "3.0.0-beta.5" +derive_more = { version = "0.99.16", default-features = false, features = ["display"] } futures = { version = "0.3", default-features = false } hex = "0.4" hkdf = "0.11" +http-api-problem = { version = "0.51.0", features = ["rocket"] } itertools = "0.10" nalgebra = { version = "0.29", default-features = false, features = ["std"] } ndarray = "0.15.3" diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 767718b..62d0929 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -925,7 +925,7 @@ where + xtra::Handler + xtra::Handler, { - async fn handle(&mut self, msg: CfdAction, ctx: &mut Context) { + async fn handle(&mut self, msg: CfdAction, ctx: &mut Context) -> Result<()> { use CfdAction::*; if let Err(e) = match msg { AcceptOrder { order_id } => self.handle_accept_order(order_id, ctx).await, @@ -936,8 +936,9 @@ where RejectRollOver { order_id } => self.handle_reject_roll_over(order_id).await, Commit { order_id } => self.handle_commit(order_id).await, } { - tracing::error!("Message handler failed: {:#}", e); + anyhow::bail!("Message handler failed: {:#}", e); } + Ok(()) } } @@ -946,8 +947,9 @@ impl Handler for Actor where T: xtra::Handler, { - async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context) { - log_error!(self.handle_new_order(msg.price, msg.min_quantity, msg.max_quantity)); + async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context) -> Result<()> { + self.handle_new_order(msg.price, msg.min_quantity, msg.max_quantity) + .await } } @@ -1067,7 +1069,7 @@ where } impl Message for NewOrder { - type Result = (); + type Result = Result<()>; } impl Message for NewTakerOnline { @@ -1083,7 +1085,7 @@ impl Message for CfdRollOverCompleted { } impl Message for CfdAction { - type Result = (); + type Result = Result<()>; } impl Message for FromTaker { diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 761dcc4..ed68ae3 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -6,6 +6,7 @@ use daemon::model::{Price, Usd, WalletInfo}; use daemon::routes::EmbeddedFileExt; use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent}; use daemon::{bitmex_price_feed, maker_cfd}; +use http_api_problem::{HttpApiProblem, StatusCode}; use rocket::http::{ContentType, Header, Status}; use rocket::response::stream::EventStream; use rocket::response::{status, Responder}; @@ -17,7 +18,7 @@ use std::borrow::Cow; use std::path::PathBuf; use tokio::select; use tokio::sync::watch; -use xtra::prelude::MessageChannel; +use xtra::prelude::*; #[rocket::get("/feed")] pub async fn maker_feed( @@ -109,18 +110,24 @@ pub struct CfdNewOrderRequest { } #[rocket::post("/order/sell", data = "")] -pub fn post_sell_order( +pub async fn post_sell_order( order: Json, new_order_channel: &State>>, _auth: Authenticated, -) -> Result, Status> { +) -> Result, HttpApiProblem> { new_order_channel - .do_send(maker_cfd::NewOrder { + .send(maker_cfd::NewOrder { price: order.price, min_quantity: order.min_quantity, max_quantity: order.max_quantity, }) - .map_err(|_| Status::new(500))?; + .await + .unwrap_or_else(|_| anyhow::bail!("actor disconnected")) // TODO: is there a better way? + .map_err(|_| { + HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) + .title("Action failed") + .detail("failed to post a sell order") + })?; Ok(status::Accepted(None)) } @@ -143,40 +150,42 @@ pub struct PromptAuthentication { } #[rocket::post("/cfd//")] -pub fn post_cfd_action( +pub async fn post_cfd_action( id: OrderId, action: CfdAction, cfd_action_channel: &State>>, _auth: Authenticated, -) -> Result, status::Custom<()>> { +) -> Result, HttpApiProblem> { use maker_cfd::CfdAction::*; let result = match action { - CfdAction::AcceptOrder => cfd_action_channel.do_send(AcceptOrder { order_id: id }), - CfdAction::RejectOrder => cfd_action_channel.do_send(RejectOrder { order_id: id }), - CfdAction::AcceptSettlement => { - cfd_action_channel.do_send(AcceptSettlement { order_id: id }) - } - CfdAction::RejectSettlement => { - cfd_action_channel.do_send(RejectSettlement { order_id: id }) - } - CfdAction::AcceptRollOver => cfd_action_channel.do_send(AcceptRollOver { order_id: id }), - CfdAction::RejectRollOver => cfd_action_channel.do_send(RejectRollOver { order_id: id }), - CfdAction::Commit => cfd_action_channel.do_send(Commit { order_id: id }), + CfdAction::AcceptOrder => cfd_action_channel.send(AcceptOrder { order_id: id }), + CfdAction::RejectOrder => cfd_action_channel.send(RejectOrder { order_id: id }), + CfdAction::AcceptSettlement => cfd_action_channel.send(AcceptSettlement { order_id: id }), + CfdAction::RejectSettlement => cfd_action_channel.send(RejectSettlement { order_id: id }), + CfdAction::AcceptRollOver => cfd_action_channel.send(AcceptRollOver { order_id: id }), + CfdAction::RejectRollOver => cfd_action_channel.send(RejectRollOver { order_id: id }), + CfdAction::Commit => cfd_action_channel.send(Commit { order_id: id }), CfdAction::Settle => { - tracing::error!("Collaborative settlement can only be triggered by taker"); - - return Err(status::Custom(Status::BadRequest, ())); + let msg = "Collaborative settlement can only be triggered by taker"; + tracing::error!(msg); + return Err(HttpApiProblem::new(StatusCode::BAD_REQUEST).detail(msg)); } CfdAction::RollOver => { - tracing::error!("RollOver proposal can only be triggered by taker"); - - return Err(status::Custom(Status::BadRequest, ())); + let msg = "RollOver proposal can only be triggered by taker"; + tracing::error!(msg); + return Err(HttpApiProblem::new(StatusCode::BAD_REQUEST).detail(msg)); } }; result - .map(|()| status::Accepted(None)) - .map_err(|_| status::Custom(Status::InternalServerError, ())) + .await + .unwrap_or_else(|_| anyhow::bail!("actor disconnected")) // TODO: is there a better way? + .map_err(|_| { + HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) + .title(action.to_string() + " failed") + })?; + + Ok(status::Accepted(None)) } #[rocket::get("/alive")] diff --git a/daemon/src/to_sse_event.rs b/daemon/src/to_sse_event.rs index 049e7e8..0acd0ed 100644 --- a/daemon/src/to_sse_event.rs +++ b/daemon/src/to_sse_event.rs @@ -167,7 +167,7 @@ pub enum TxLabel { Collaborative, } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, derive_more::Display, Clone, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "camelCase")] pub enum CfdAction { AcceptOrder, diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index 2829f3d..7295bd4 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -25,7 +25,7 @@ async fn taker_receives_order_from_maker_on_publication() { assert!(is_next_none(&mut taker.order_feed).await); - maker.publish_order(dummy_new_order()); + maker.publish_order(dummy_new_order()).await; let (published, received) = tokio::join!( next_some(&mut maker.order_feed), @@ -43,7 +43,7 @@ async fn taker_takes_order_and_maker_rejects() { // TODO: Why is this needed? For the cfd stream it is not needed is_next_none(&mut taker.order_feed).await; - maker.publish_order(dummy_new_order()); + maker.publish_order(dummy_new_order()).await; let (_, received) = next_order(&mut maker.order_feed, &mut taker.order_feed).await; @@ -94,7 +94,7 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() { is_next_none(&mut taker.order_feed).await; - maker.publish_order(dummy_new_order()); + maker.publish_order(dummy_new_order()).await; let (_, received) = next_order(&mut maker.order_feed, &mut taker.order_feed).await; diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index 5225cd3..3f81d09 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -109,8 +109,12 @@ impl Maker { } } - pub fn publish_order(&mut self, new_order_params: maker_cfd::NewOrder) { - self.cfd_actor_addr.do_send(new_order_params).unwrap(); + pub async fn publish_order(&mut self, new_order_params: maker_cfd::NewOrder) { + self.cfd_actor_addr + .send(new_order_params) + .await + .unwrap() + .unwrap(); } pub fn reject_take_request(&self, order: Order) {