diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 2485ca3..d092738 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -148,58 +148,35 @@ pub fn post_cfd_action( action: CfdAction, cfd_action_channel: &State>>, _auth: Authenticated, -) -> Result, status::BadRequest> { +) -> Result, status::Custom<()>> { use maker_cfd::CfdAction::*; - match action { - CfdAction::AcceptOrder => { - cfd_action_channel - .do_send(AcceptOrder { order_id: id }) - .expect("actor to always be available"); - } - CfdAction::RejectOrder => { - cfd_action_channel - .do_send(RejectOrder { order_id: id }) - .expect("actor to always be available"); - } + let result = match action { + CfdAction::AcceptOrder => cfd_action_channel.do_send(AcceptOrder { order_id: id }), + CfdAction::RejectOrder => cfd_action_channel.do_send(RejectOrder { order_id: id }), CfdAction::AcceptSettlement => { - cfd_action_channel - .do_send(AcceptSettlement { order_id: id }) - .expect("actor to always be available"); + cfd_action_channel.do_send(AcceptSettlement { order_id: id }) } CfdAction::RejectSettlement => { - cfd_action_channel - .do_send(RejectSettlement { order_id: id }) - .expect("actor to always be available"); - } - - CfdAction::AcceptRollOver => { - cfd_action_channel - .do_send(AcceptRollOver { order_id: id }) - .expect("actor to always be available"); - } - CfdAction::RejectRollOver => { - cfd_action_channel - .do_send(RejectRollOver { order_id: id }) - .expect("actor to always be available"); - } - CfdAction::Commit => { - cfd_action_channel - .do_send(Commit { order_id: id }) - .expect("actor to always be available"); + cfd_action_channel.do_send(RejectSettlement { order_id: id }) } + CfdAction::AcceptRollOver => cfd_action_channel.do_send(AcceptRollOver { order_id: id }), + CfdAction::RejectRollOver => cfd_action_channel.do_send(RejectRollOver { order_id: id }), + CfdAction::Commit => cfd_action_channel.do_send(Commit { order_id: id }), CfdAction::Settle => { - return Err(status::BadRequest(Some( - "Collaborative settlement can only be triggered by taker".to_string(), - ))); + tracing::error!("Collaborative settlement can only be triggered by taker"); + + return Err(status::Custom(Status::BadRequest, ())); } CfdAction::RollOver => { - return Err(status::BadRequest(Some( - "RollOver proposal can only be triggered by taker".to_string(), - ))); + tracing::error!("RollOver proposal can only be triggered by taker"); + + return Err(status::Custom(Status::BadRequest, ())); } - } + }; - Ok(status::Accepted(None)) + result + .map(|()| status::Accepted(None)) + .map_err(|_| status::Custom(Status::InternalServerError, ())) } #[rocket::get("/alive")] diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index f8c26de..03094e0 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -15,7 +15,7 @@ use std::borrow::Cow; use std::path::PathBuf; use tokio::select; use tokio::sync::watch; -use xtra::Address; +use xtra::prelude::MessageChannel; #[rocket::get("/feed")] pub async fn feed( @@ -104,14 +104,13 @@ pub struct CfdOrderRequest { #[rocket::post("/cfd/order", data = "")] pub async fn post_order_request( cfd_order_request: Json, - cfd_actor_inbox: &State>, + take_offer_channel: &State>>, ) { - cfd_actor_inbox - .do_send_async(taker_cfd::TakeOffer { + take_offer_channel + .do_send(taker_cfd::TakeOffer { order_id: cfd_order_request.order_id, quantity: cfd_order_request.quantity, }) - .await .expect("actor to always be available"); } @@ -119,9 +118,10 @@ pub async fn post_order_request( pub async fn post_cfd_action( id: OrderId, action: CfdAction, - cfd_actor_address: &State>, + cfd_action_channel: &State>>, quote_updates: &State>, ) -> Result, status::BadRequest> { + use taker_cfd::CfdAction::*; match action { CfdAction::AcceptOrder | CfdAction::RejectOrder @@ -132,25 +132,22 @@ pub async fn post_cfd_action( return Err(status::BadRequest(None)); } CfdAction::Commit => { - cfd_actor_address - .do_send_async(taker_cfd::Commit { order_id: id }) - .await + cfd_action_channel + .do_send(Commit { order_id: id }) .map_err(|e| status::BadRequest(Some(e.to_string())))?; } CfdAction::Settle => { let current_price = quote_updates.borrow().for_taker(); - cfd_actor_address - .do_send_async(taker_cfd::ProposeSettlement { + cfd_action_channel + .do_send(ProposeSettlement { order_id: id, current_price, }) - .await .expect("actor to always be available"); } CfdAction::RollOver => { - cfd_actor_address - .do_send_async(taker_cfd::ProposeRollOver { order_id: id }) - .await + cfd_action_channel + .do_send(ProposeRollOver { order_id: id }) .expect("actor to always be available"); } } diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index fa87db4..967dc79 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -23,6 +23,7 @@ use std::time::Duration; use tokio::sync::watch; use tokio_util::codec::FramedRead; use tracing_subscriber::filter::LevelFilter; +use xtra::prelude::MessageChannel; use xtra::spawn::TokioGlobalSpawnExt; use xtra::Actor; @@ -264,7 +265,14 @@ async fn main() -> Result<()> { .await .unwrap(); - Ok(rocket.manage(cfd_actor_inbox).manage(cfd_feed_receiver)) + let take_offer_channel = + MessageChannel::::clone_channel(&cfd_actor_inbox); + let cfd_action_channel = + MessageChannel::::clone_channel(&cfd_actor_inbox); + Ok(rocket + .manage(take_offer_channel) + .manage(cfd_action_channel) + .manage(cfd_feed_receiver)) }, )) .mount( diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 1710b18..49b11b6 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -26,13 +26,17 @@ pub struct TakeOffer { pub quantity: Usd, } -pub struct ProposeSettlement { - pub order_id: OrderId, - pub current_price: Usd, -} - -pub struct ProposeRollOver { - pub order_id: OrderId, +pub enum CfdAction { + ProposeSettlement { + order_id: OrderId, + current_price: Usd, + }, + ProposeRollOver { + order_id: OrderId, + }, + Commit { + order_id: OrderId, + }, } pub struct MakerStreamMessage { @@ -49,10 +53,6 @@ pub struct CfdRollOverCompleted { pub dlc: Result, } -pub struct Commit { - pub order_id: OrderId, -} - enum SetupState { Active { sender: mpsc::UnboundedSender, @@ -569,16 +569,23 @@ impl Handler for Actor { } #[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: ProposeSettlement, _ctx: &mut Context) { - log_error!(self.handle_propose_settlement(msg.order_id, msg.current_price)); - } -} +impl Handler for Actor { + async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context) { + use CfdAction::*; -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: ProposeRollOver, _ctx: &mut Context) { - log_error!(self.handle_propose_roll_over(msg.order_id)); + if let Err(e) = match msg { + Commit { order_id } => self.handle_commit(order_id).await, + ProposeSettlement { + order_id, + current_price, + } => { + self.handle_propose_settlement(order_id, current_price) + .await + } + ProposeRollOver { order_id } => self.handle_propose_roll_over(order_id).await, + } { + tracing::error!("Message handler failed: {:#}", e); + } } } @@ -663,22 +670,11 @@ impl Handler for Actor { } } -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: Commit, _ctx: &mut Context) { - log_error!(self.handle_commit(msg.order_id)) - } -} - impl Message for TakeOffer { type Result = (); } -impl Message for ProposeSettlement { - type Result = (); -} - -impl Message for ProposeRollOver { +impl Message for CfdAction { type Result = (); } @@ -695,8 +691,4 @@ impl Message for CfdRollOverCompleted { type Result = (); } -impl Message for Commit { - type Result = (); -} - impl xtra::Actor for Actor {}