Browse Source

Merge #333

333: Use `MessageChannel` in `maker_routes` r=luckysori a=luckysori

More work towards #231.

Co-authored-by: Lucas Soriano del Pino <l.soriano.del.pino@gmail.com>
refactor/no-log-handler
bors[bot] 3 years ago
committed by GitHub
parent
commit
9423b14a0b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      daemon/src/maker.rs
  2. 117
      daemon/src/maker_cfd.rs
  3. 51
      daemon/src/routes_maker.rs

7
daemon/src/maker.rs

@ -288,8 +288,13 @@ async fn main() -> Result<()> {
tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream)); tokio::spawn(maker_inc_connections_address.attach_stream(listener_stream));
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
let cfd_action_channel =
MessageChannel::<maker_cfd::CfdAction>::clone_channel(&cfd_maker_actor_inbox);
let new_order_channel =
MessageChannel::<maker_cfd::NewOrder>::clone_channel(&cfd_maker_actor_inbox);
Ok(rocket Ok(rocket
.manage(cfd_maker_actor_inbox) .manage(cfd_action_channel)
.manage(new_order_channel)
.manage(cfd_feed_receiver)) .manage(cfd_feed_receiver))
}, },
)) ))

117
daemon/src/maker_cfd.rs

@ -22,32 +22,14 @@ use tokio::sync::watch;
use xtra::prelude::*; use xtra::prelude::*;
use xtra::KeepRunning; use xtra::KeepRunning;
pub struct AcceptOrder { pub enum CfdAction {
pub order_id: OrderId, AcceptOrder { order_id: OrderId },
} RejectOrder { order_id: OrderId },
AcceptSettlement { order_id: OrderId },
pub struct RejectOrder { RejectSettlement { order_id: OrderId },
pub order_id: OrderId, AcceptRollOver { order_id: OrderId },
} RejectRollOver { order_id: OrderId },
Commit { order_id: OrderId },
pub struct Commit {
pub order_id: OrderId,
}
pub struct AcceptSettlement {
pub order_id: OrderId,
}
pub struct RejectSettlement {
pub order_id: OrderId,
}
pub struct AcceptRollOver {
pub order_id: OrderId,
}
pub struct RejectRollOver {
pub order_id: OrderId,
} }
pub struct NewOrder { pub struct NewOrder {
@ -887,51 +869,20 @@ impl Actor {
} }
#[async_trait] #[async_trait]
impl Handler<AcceptOrder> for Actor { impl Handler<CfdAction> for Actor {
async fn handle(&mut self, msg: AcceptOrder, ctx: &mut Context<Self>) { async fn handle(&mut self, msg: CfdAction, ctx: &mut Context<Self>) {
log_error!(self.handle_accept_order(msg.order_id, ctx)) use CfdAction::*;
} if let Err(e) = match msg {
} AcceptOrder { order_id } => self.handle_accept_order(order_id, ctx).await,
RejectOrder { order_id } => self.handle_reject_order(order_id).await,
#[async_trait] AcceptSettlement { order_id } => self.handle_accept_settlement(order_id).await,
impl Handler<RejectOrder> for Actor { RejectSettlement { order_id } => self.handle_reject_settlement(order_id).await,
async fn handle(&mut self, msg: RejectOrder, _ctx: &mut Context<Self>) { AcceptRollOver { order_id } => self.handle_accept_roll_over(order_id, ctx).await,
log_error!(self.handle_reject_order(msg.order_id)) RejectRollOver { order_id } => self.handle_reject_roll_over(order_id).await,
} Commit { order_id } => self.handle_commit(order_id).await,
} } {
tracing::error!("Message handler failed: {:#}", e);
#[async_trait]
impl Handler<AcceptSettlement> for Actor {
async fn handle(&mut self, msg: AcceptSettlement, _ctx: &mut Context<Self>) {
log_error!(self.handle_accept_settlement(msg.order_id))
}
}
#[async_trait]
impl Handler<RejectSettlement> for Actor {
async fn handle(&mut self, msg: RejectSettlement, _ctx: &mut Context<Self>) {
log_error!(self.handle_reject_settlement(msg.order_id))
}
}
#[async_trait]
impl Handler<AcceptRollOver> for Actor {
async fn handle(&mut self, msg: AcceptRollOver, ctx: &mut Context<Self>) {
log_error!(self.handle_accept_roll_over(msg.order_id, ctx))
}
} }
#[async_trait]
impl Handler<RejectRollOver> for Actor {
async fn handle(&mut self, msg: RejectRollOver, _ctx: &mut Context<Self>) {
log_error!(self.handle_reject_roll_over(msg.order_id))
}
}
#[async_trait]
impl Handler<Commit> for Actor {
async fn handle(&mut self, msg: Commit, _ctx: &mut Context<Self>) {
log_error!(self.handle_commit(msg.order_id))
} }
} }
@ -1062,31 +1013,7 @@ impl Message for CfdRollOverCompleted {
type Result = (); type Result = ();
} }
impl Message for AcceptOrder { impl Message for CfdAction {
type Result = ();
}
impl Message for RejectOrder {
type Result = ();
}
impl Message for Commit {
type Result = ();
}
impl Message for AcceptSettlement {
type Result = ();
}
impl Message for RejectSettlement {
type Result = ();
}
impl Message for AcceptRollOver {
type Result = ();
}
impl Message for RejectRollOver {
type Result = (); type Result = ();
} }

51
daemon/src/routes_maker.rs

@ -17,7 +17,7 @@ use std::borrow::Cow;
use std::path::PathBuf; use std::path::PathBuf;
use tokio::select; use tokio::select;
use tokio::sync::watch; use tokio::sync::watch;
use xtra::Address; use xtra::prelude::MessageChannel;
#[rocket::get("/feed")] #[rocket::get("/feed")]
pub async fn maker_feed( pub async fn maker_feed(
@ -109,18 +109,17 @@ pub struct CfdNewOrderRequest {
} }
#[rocket::post("/order/sell", data = "<order>")] #[rocket::post("/order/sell", data = "<order>")]
pub async fn post_sell_order( pub fn post_sell_order(
order: Json<CfdNewOrderRequest>, order: Json<CfdNewOrderRequest>,
cfd_actor_address: &State<Address<maker_cfd::Actor>>, new_order_channel: &State<Box<dyn MessageChannel<maker_cfd::NewOrder>>>,
_auth: Authenticated, _auth: Authenticated,
) -> Result<status::Accepted<()>, Status> { ) -> Result<status::Accepted<()>, Status> {
cfd_actor_address new_order_channel
.do_send_async(maker_cfd::NewOrder { .do_send(maker_cfd::NewOrder {
price: order.price, price: order.price,
min_quantity: order.min_quantity, min_quantity: order.min_quantity,
max_quantity: order.max_quantity, max_quantity: order.max_quantity,
}) })
.await
.map_err(|_| Status::new(500))?; .map_err(|_| Status::new(500))?;
Ok(status::Accepted(None)) Ok(status::Accepted(None))
@ -144,54 +143,48 @@ pub struct PromptAuthentication {
} }
#[rocket::post("/cfd/<id>/<action>")] #[rocket::post("/cfd/<id>/<action>")]
pub async fn post_cfd_action( pub fn post_cfd_action(
id: OrderId, id: OrderId,
action: CfdAction, action: CfdAction,
cfd_actor_address: &State<Address<maker_cfd::Actor>>, cfd_action_channel: &State<Box<dyn MessageChannel<maker_cfd::CfdAction>>>,
_auth: Authenticated, _auth: Authenticated,
) -> Result<status::Accepted<()>, status::BadRequest<String>> { ) -> Result<status::Accepted<()>, status::BadRequest<String>> {
use maker_cfd::CfdAction::*;
match action { match action {
CfdAction::AcceptOrder => { CfdAction::AcceptOrder => {
cfd_actor_address cfd_action_channel
.do_send_async(maker_cfd::AcceptOrder { order_id: id }) .do_send(AcceptOrder { order_id: id })
.await
.expect("actor to always be available"); .expect("actor to always be available");
} }
CfdAction::RejectOrder => { CfdAction::RejectOrder => {
cfd_actor_address cfd_action_channel
.do_send_async(maker_cfd::RejectOrder { order_id: id }) .do_send(RejectOrder { order_id: id })
.await
.expect("actor to always be available"); .expect("actor to always be available");
} }
CfdAction::AcceptSettlement => { CfdAction::AcceptSettlement => {
cfd_actor_address cfd_action_channel
.do_send_async(maker_cfd::AcceptSettlement { order_id: id }) .do_send(AcceptSettlement { order_id: id })
.await
.expect("actor to always be available"); .expect("actor to always be available");
} }
CfdAction::RejectSettlement => { CfdAction::RejectSettlement => {
cfd_actor_address cfd_action_channel
.do_send_async(maker_cfd::RejectSettlement { order_id: id }) .do_send(RejectSettlement { order_id: id })
.await
.expect("actor to always be available"); .expect("actor to always be available");
} }
CfdAction::AcceptRollOver => { CfdAction::AcceptRollOver => {
cfd_actor_address cfd_action_channel
.do_send_async(maker_cfd::AcceptRollOver { order_id: id }) .do_send(AcceptRollOver { order_id: id })
.await
.expect("actor to always be available"); .expect("actor to always be available");
} }
CfdAction::RejectRollOver => { CfdAction::RejectRollOver => {
cfd_actor_address cfd_action_channel
.do_send_async(maker_cfd::RejectRollOver { order_id: id }) .do_send(RejectRollOver { order_id: id })
.await
.expect("actor to always be available"); .expect("actor to always be available");
} }
CfdAction::Commit => { CfdAction::Commit => {
cfd_actor_address cfd_action_channel
.do_send_async(maker_cfd::Commit { order_id: id }) .do_send(Commit { order_id: id })
.await
.expect("actor to always be available"); .expect("actor to always be available");
} }
CfdAction::Settle => { CfdAction::Settle => {

Loading…
Cancel
Save