Browse Source

Remove taker_cfd::CfdAction enum and use `xtra-productivity`

chore/leaner-release-process
Thomas Eizinger 3 years ago
parent
commit
673555b1b0
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 30
      daemon/src/routes_taker.rs
  2. 10
      daemon/src/taker.rs
  3. 143
      daemon/src/taker_cfd.rs

30
daemon/src/routes_taker.rs

@ -5,7 +5,7 @@ use daemon::model::{Leverage, Price, Usd, WalletInfo};
use daemon::projection::Feeds; use daemon::projection::Feeds;
use daemon::routes::EmbeddedFileExt; use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent}; use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent};
use daemon::{bitmex_price_feed, taker_cfd, wallet}; use daemon::{bitmex_price_feed, monitor, oracle, taker_cfd, wallet};
use http_api_problem::{HttpApiProblem, StatusCode}; use http_api_problem::{HttpApiProblem, StatusCode};
use rocket::http::{ContentType, Status}; use rocket::http::{ContentType, Status};
use rocket::response::stream::EventStream; use rocket::response::stream::EventStream;
@ -20,6 +20,8 @@ use tokio::select;
use tokio::sync::watch; use tokio::sync::watch;
use xtra::prelude::*; use xtra::prelude::*;
type Taker = xtra::Address<taker_cfd::Actor<oracle::Actor, monitor::Actor, wallet::Actor>>;
#[rocket::get("/feed")] #[rocket::get("/feed")]
pub async fn feed( pub async fn feed(
rx: &State<Feeds>, rx: &State<Feeds>,
@ -114,9 +116,9 @@ pub struct CfdOrderRequest {
#[rocket::post("/cfd/order", data = "<cfd_order_request>")] #[rocket::post("/cfd/order", data = "<cfd_order_request>")]
pub async fn post_order_request( pub async fn post_order_request(
cfd_order_request: Json<CfdOrderRequest>, cfd_order_request: Json<CfdOrderRequest>,
take_offer_channel: &State<Box<dyn MessageChannel<taker_cfd::TakeOffer>>>, cfd_actor: &State<Taker>,
) -> Result<status::Accepted<()>, HttpApiProblem> { ) -> Result<status::Accepted<()>, HttpApiProblem> {
take_offer_channel cfd_actor
.send(taker_cfd::TakeOffer { .send(taker_cfd::TakeOffer {
order_id: cfd_order_request.order_id, order_id: cfd_order_request.order_id,
quantity: cfd_order_request.quantity, quantity: cfd_order_request.quantity,
@ -136,10 +138,9 @@ pub async fn post_order_request(
pub async fn post_cfd_action( pub async fn post_cfd_action(
id: OrderId, id: OrderId,
action: CfdAction, action: CfdAction,
cfd_action_channel: &State<Box<dyn MessageChannel<taker_cfd::CfdAction>>>, cfd_actor: &State<Taker>,
feeds: &State<Feeds>, feeds: &State<Feeds>,
) -> Result<status::Accepted<()>, HttpApiProblem> { ) -> Result<status::Accepted<()>, HttpApiProblem> {
use taker_cfd::CfdAction::*;
let result = match action { let result = match action {
CfdAction::AcceptOrder CfdAction::AcceptOrder
| CfdAction::RejectOrder | CfdAction::RejectOrder
@ -150,20 +151,25 @@ pub async fn post_cfd_action(
return Err(HttpApiProblem::new(StatusCode::BAD_REQUEST) return Err(HttpApiProblem::new(StatusCode::BAD_REQUEST)
.detail(format!("taker cannot invoke action {}", action))); .detail(format!("taker cannot invoke action {}", action)));
} }
CfdAction::Commit => cfd_action_channel.send(Commit { order_id: id }), CfdAction::Commit => cfd_actor.send(taker_cfd::Commit { order_id: id }).await,
CfdAction::Settle => { CfdAction::Settle => {
let quote: bitmex_price_feed::Quote = feeds.quote.borrow().clone().into(); let quote: bitmex_price_feed::Quote = feeds.quote.borrow().clone().into();
let current_price = quote.for_taker(); let current_price = quote.for_taker();
cfd_action_channel.send(ProposeSettlement { cfd_actor
order_id: id, .send(taker_cfd::ProposeSettlement {
current_price, order_id: id,
}) current_price,
})
.await
}
CfdAction::RollOver => {
cfd_actor
.send(taker_cfd::ProposeRollOver { order_id: id })
.await
} }
CfdAction::RollOver => cfd_action_channel.send(ProposeRollOver { order_id: id }),
}; };
result result
.await
.unwrap_or_else(|e| anyhow::bail!(e.to_string())) .unwrap_or_else(|e| anyhow::bail!(e.to_string()))
.map_err(|e| { .map_err(|e| {
HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR)

10
daemon/src/taker.rs

@ -9,8 +9,8 @@ use daemon::model::WalletInfo;
use daemon::seed::Seed; use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt; use daemon::tokio_ext::FutureExt;
use daemon::{ use daemon::{
bitmex_price_feed, db, housekeeping, logger, monitor, oracle, projection, taker_cfd, wallet, bitmex_price_feed, db, housekeeping, logger, monitor, oracle, projection, wallet, wallet_sync,
wallet_sync, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, SETTLEMENT_INTERVAL, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, SETTLEMENT_INTERVAL,
}; };
use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool; use sqlx::SqlitePool;
@ -19,7 +19,6 @@ use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use tokio::sync::watch; use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::MessageChannel;
use xtra::Actor; use xtra::Actor;
mod routes_taker; mod routes_taker;
@ -269,13 +268,10 @@ async fn main() -> Result<()> {
)); ));
tasks.add(wallet_sync::new(wallet.clone(), wallet_feed_sender)); tasks.add(wallet_sync::new(wallet.clone(), wallet_feed_sender));
let take_offer_channel = MessageChannel::<taker_cfd::TakeOffer>::clone_channel(&cfd_actor_addr);
let cfd_action_channel = MessageChannel::<taker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);
let rocket = rocket::custom(figment) let rocket = rocket::custom(figment)
.manage(projection_feeds) .manage(projection_feeds)
.manage(take_offer_channel) .manage(cfd_actor_addr)
.manage(cfd_action_channel)
.manage(wallet_feed_receiver) .manage(wallet_feed_receiver)
.manage(bitcoin_network) .manage(bitcoin_network)
.manage(wallet) .manage(wallet)

143
daemon/src/taker_cfd.rs

@ -23,23 +23,24 @@ use std::collections::hash_map::Entry;
use std::collections::HashMap; use std::collections::HashMap;
use xtra::prelude::*; use xtra::prelude::*;
use xtra::Actor as _; use xtra::Actor as _;
use xtra_productivity::xtra_productivity;
pub struct TakeOffer { pub struct TakeOffer {
pub order_id: OrderId, pub order_id: OrderId,
pub quantity: Usd, pub quantity: Usd,
} }
pub enum CfdAction { pub struct ProposeSettlement {
ProposeSettlement { pub order_id: OrderId,
order_id: OrderId, pub current_price: Price,
current_price: Price, }
},
ProposeRollOver { pub struct ProposeRollOver {
order_id: OrderId, pub order_id: OrderId,
}, }
Commit {
order_id: OrderId, pub struct Commit {
}, pub order_id: OrderId,
} }
pub struct CfdRollOverCompleted { pub struct CfdRollOverCompleted {
@ -135,24 +136,59 @@ impl<O, M, W> Actor<O, M, W> {
} }
} }
#[xtra_productivity]
impl<O, M, W> Actor<O, M, W> impl<O, M, W> Actor<O, M, W>
where where
W: xtra::Handler<wallet::TryBroadcastTransaction> W: xtra::Handler<wallet::TryBroadcastTransaction>,
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{ {
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> { async fn handle_commit(&mut self, msg: Commit) -> Result<()> {
let Commit { order_id } = msg;
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor) cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor)
.await?; .await?;
Ok(()) Ok(())
} }
async fn handle_propose_settlement( async fn handle_propose_roll_over(&mut self, msg: ProposeRollOver) -> Result<()> {
&mut self, let ProposeRollOver { order_id } = msg;
order_id: OrderId,
current_price: Price, if self.current_pending_proposals.contains_key(&order_id) {
) -> Result<()> { anyhow::bail!("An update for order id {} is already in progress", order_id)
}
let proposal = RollOverProposal {
order_id,
timestamp: Timestamp::now(),
};
self.current_pending_proposals.insert(
proposal.order_id,
UpdateCfdProposal::RollOverProposal {
proposal: proposal.clone(),
direction: SettlementKind::Outgoing,
},
);
self.send_pending_update_proposals().await?;
self.conn_actor
.send(wire::TakerToMaker::ProposeRollOver {
order_id: proposal.order_id,
timestamp: proposal.timestamp,
})
.await?;
Ok(())
}
}
#[xtra_productivity]
impl<O, M, W> Actor<O, M, W> {
async fn handle_propose_settlement(&mut self, msg: ProposeSettlement) -> Result<()> {
let ProposeSettlement {
order_id,
current_price,
} = msg;
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
@ -196,7 +232,14 @@ where
.await?; .await?;
Ok(()) Ok(())
} }
}
impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> { async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::info!(%order_id, "Settlement proposal got rejected"); tracing::info!(%order_id, "Settlement proposal got rejected");
@ -318,34 +361,6 @@ where
.await?; .await?;
Ok(()) Ok(())
} }
async fn handle_propose_roll_over(&mut self, order_id: OrderId) -> Result<()> {
if self.current_pending_proposals.contains_key(&order_id) {
anyhow::bail!("An update for order id {} is already in progress", order_id)
}
let proposal = RollOverProposal {
order_id,
timestamp: Timestamp::now(),
};
self.current_pending_proposals.insert(
proposal.order_id,
UpdateCfdProposal::RollOverProposal {
proposal: proposal.clone(),
direction: SettlementKind::Outgoing,
},
);
self.send_pending_update_proposals().await?;
self.conn_actor
.send(wire::TakerToMaker::ProposeRollOver {
order_id: proposal.order_id,
timestamp: proposal.timestamp,
})
.await?;
Ok(())
}
} }
impl<O, M, W> Actor<O, M, W> impl<O, M, W> Actor<O, M, W>
@ -662,34 +677,6 @@ where
} }
} }
#[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<CfdAction> for Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context<Self>) -> Result<()> {
use CfdAction::*;
if let Err(e) = match msg {
Commit { order_id } => self.handle_commit(order_id).await,
ProposeSettlement {
order_id,
current_price,
} => {
self.handle_propose_settlement(order_id, current_price)
.await
}
ProposeRollOver { order_id } => self.handle_propose_roll_over(order_id).await,
} {
tracing::error!("Message handler failed: {:#}", e);
anyhow::bail!(e)
}
Ok(())
}
}
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<wire::MakerToTaker> for Actor<O, M, W> impl<O: 'static, M: 'static, W: 'static> Handler<wire::MakerToTaker> for Actor<O, M, W>
where where
@ -790,10 +777,6 @@ impl Message for TakeOffer {
type Result = Result<()>; type Result = Result<()>;
} }
impl Message for CfdAction {
type Result = Result<()>;
}
impl Message for CfdRollOverCompleted { impl Message for CfdRollOverCompleted {
type Result = (); type Result = ();
} }

Loading…
Cancel
Save