From 27884db0304e057d5b7b8910811f5ca8842229f3 Mon Sep 17 00:00:00 2001 From: rishflab Date: Thu, 16 Dec 2021 15:06:18 +1100 Subject: [PATCH 1/2] Hide some internal actor details from rocket state There are still details being leaked but this is start. --- daemon/src/lib.rs | 150 +++++++++++++++++++++++++-- daemon/src/maker.rs | 17 ++- daemon/src/routes_maker.rs | 62 +++++------ daemon/src/routes_taker.rs | 56 ++++------ daemon/src/taker.rs | 16 +-- daemon/tests/harness/mocks/wallet.rs | 7 ++ 6 files changed, 207 insertions(+), 101 deletions(-) diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 0cf6212..867d49a 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -1,15 +1,22 @@ #![cfg_attr(not(test), warn(clippy::unwrap_used))] #![warn(clippy::disallowed_method)] +use crate::bitcoin::Txid; use crate::maker_cfd::FromTaker; use crate::maker_cfd::TakerConnected; use crate::model::cfd::Cfd; use crate::model::cfd::Order; +use crate::model::cfd::OrderId; use crate::model::cfd::UpdateCfdProposals; use crate::model::Identity; +use crate::model::Price; +use crate::model::Usd; use crate::oracle::Attestation; use crate::tokio_ext::FutureExt; use address_map::Stopping; use anyhow::Result; +use bdk::bitcoin; +use bdk::bitcoin::Amount; +use bdk::FeeRate; use connection::ConnectionStatus; use futures::future::RemoteHandle; use maia::secp256k1_zkp::schnorrsig; @@ -106,8 +113,9 @@ impl Tasks { pub struct MakerActorSystem { pub cfd_actor_addr: Address>, + wallet_actor_addr: Address, pub inc_conn_addr: Address, - pub tasks: Tasks, + _tasks: Tasks, } impl MakerActorSystem @@ -129,7 +137,8 @@ where + xtra::Handler, W: xtra::Handler + xtra::Handler - + xtra::Handler, + + xtra::Handler + + xtra::Handler, { #[allow(clippy::too_many_arguments)] pub async fn new( @@ -159,7 +168,7 @@ where let (cfd_actor_addr, cfd_actor_fut) = maker_cfd::Actor::new( db, - wallet_addr, + wallet_addr.clone(), settlement_interval, oracle_pk, projection_actor, @@ -195,17 +204,101 @@ where Ok(Self { cfd_actor_addr, + wallet_actor_addr: wallet_addr, inc_conn_addr, - tasks, + _tasks: tasks, }) } + + pub async fn new_order( + &self, + price: Price, + min_quantity: Usd, + max_quantity: Usd, + fee_rate: Option, + ) -> Result<()> { + self.cfd_actor_addr + .send(maker_cfd::NewOrder { + price, + min_quantity, + max_quantity, + fee_rate: fee_rate.unwrap_or(1), + }) + .await??; + + Ok(()) + } + + pub async fn accept_order(&self, order_id: OrderId) -> Result<()> { + self.cfd_actor_addr + .send(maker_cfd::AcceptOrder { order_id }) + .await??; + Ok(()) + } + + pub async fn reject_order(&self, order_id: OrderId) -> Result<()> { + self.cfd_actor_addr + .send(maker_cfd::RejectOrder { order_id }) + .await??; + Ok(()) + } + + pub async fn accept_settlement(&self, order_id: OrderId) -> Result<()> { + self.cfd_actor_addr + .send(maker_cfd::AcceptSettlement { order_id }) + .await??; + Ok(()) + } + + pub async fn reject_settlement(&self, order_id: OrderId) -> Result<()> { + self.cfd_actor_addr + .send(maker_cfd::RejectSettlement { order_id }) + .await??; + Ok(()) + } + + pub async fn accept_rollover(&self, order_id: OrderId) -> Result<()> { + self.cfd_actor_addr + .send(maker_cfd::AcceptRollOver { order_id }) + .await??; + Ok(()) + } + + pub async fn reject_rollover(&self, order_id: OrderId) -> Result<()> { + self.cfd_actor_addr + .send(maker_cfd::RejectRollOver { order_id }) + .await??; + Ok(()) + } + pub async fn commit(&self, order_id: OrderId) -> Result<()> { + self.cfd_actor_addr + .send(maker_cfd::Commit { order_id }) + .await??; + Ok(()) + } + + pub async fn withdraw( + &self, + amount: Option, + address: bitcoin::Address, + fee: f32, + ) -> Result { + self.wallet_actor_addr + .send(wallet::Withdraw { + amount, + address, + fee: Some(bdk::FeeRate::from_sat_per_vb(fee)), + }) + .await? + } } pub struct TakerActorSystem { pub cfd_actor_addr: Address>, pub connection_actor_addr: Address, pub maker_online_status_feed_receiver: watch::Receiver, - pub tasks: Tasks, + wallet_actor_addr: Address, + _tasks: Tasks, } impl TakerActorSystem @@ -219,12 +312,13 @@ where + xtra::Handler, W: xtra::Handler + xtra::Handler - + xtra::Handler, + + xtra::Handler + + xtra::Handler, { #[allow(clippy::too_many_arguments)] pub async fn new( db: SqlitePool, - wallet_addr: Address, + wallet_actor_addr: Address, oracle_pk: schnorrsig::PublicKey, identity_sk: x25519_dalek::StaticSecret, oracle_constructor: impl FnOnce(Box>) -> FO, @@ -250,7 +344,7 @@ where let (connection_actor_addr, connection_actor_ctx) = xtra::Context::new(None); let (cfd_actor_addr, cfd_actor_fut) = taker_cfd::Actor::new( db.clone(), - wallet_addr, + wallet_actor_addr.clone(), oracle_pk, projection_actor.clone(), connection_actor_addr.clone(), @@ -302,7 +396,45 @@ where cfd_actor_addr, connection_actor_addr, maker_online_status_feed_receiver, - tasks, + wallet_actor_addr, + _tasks: tasks, }) } + + pub async fn take_offer(&self, order_id: OrderId, quantity: Usd) -> Result<()> { + self.cfd_actor_addr + .send(taker_cfd::TakeOffer { order_id, quantity }) + .await??; + Ok(()) + } + + pub async fn commit(&self, order_id: OrderId) -> Result<()> { + self.cfd_actor_addr + .send(taker_cfd::Commit { order_id }) + .await? + } + + pub async fn propose_settlement(&self, order_id: OrderId, current_price: Price) -> Result<()> { + self.cfd_actor_addr + .send(taker_cfd::ProposeSettlement { + order_id, + current_price, + }) + .await? + } + + pub async fn withdraw( + &self, + amount: Option, + address: bitcoin::Address, + fee_rate: FeeRate, + ) -> Result { + self.wallet_actor_addr + .send(wallet::Withdraw { + amount, + address, + fee: Some(fee_rate), + }) + .await? + } } diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 2954fab..235e59c 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -30,7 +30,6 @@ use sqlx::SqlitePool; use std::net::SocketAddr; use std::path::PathBuf; use std::str::FromStr; -use std::task::Poll; use tracing_subscriber::filter::LevelFilter; use xtra::Actor; @@ -241,11 +240,7 @@ async fn main() -> Result<()> { let (projection_actor, projection_context) = xtra::Context::new(None); - let MakerActorSystem { - cfd_actor_addr, - inc_conn_addr: incoming_connection_addr, - tasks: _tasks, - } = MakerActorSystem::new( + let mut maker = MakerActorSystem::new( db.clone(), wallet.clone(), oracle, @@ -294,15 +289,19 @@ async fn main() -> Result<()> { Poll::Ready(Some(message)) }); - tasks.add(incoming_connection_addr.attach_stream(listener_stream)); + tasks.add( + maker_actor_system + .inc_conn_addr + .clone() + .attach_stream(listener_stream), + ); rocket::custom(figment) .manage(projection_feeds) - .manage(cfd_actor_addr) .manage(wallet_feed_receiver) + .manage(maker) .manage(auth_password) .manage(bitcoin_network) - .manage(wallet) .mount( "/api", rocket::routes![ diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 7565cf4..3039596 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -1,7 +1,6 @@ use anyhow::Result; use bdk::bitcoin::Network; use daemon::auth::Authenticated; -use daemon::maker_cfd; use daemon::maker_inc_connections; use daemon::model::cfd::OrderId; use daemon::model::Price; @@ -16,6 +15,7 @@ use daemon::projection::Identity; use daemon::routes::EmbeddedFileExt; use daemon::to_sse_event::ToSseEvent; use daemon::wallet; +use daemon::MakerActorSystem; use http_api_problem::HttpApiProblem; use http_api_problem::StatusCode; use rocket::http::ContentType; @@ -32,11 +32,9 @@ use std::borrow::Cow; use std::path::PathBuf; use tokio::select; use tokio::sync::watch; -use xtra::prelude::*; -pub type Maker = xtra::Address< - maker_cfd::Actor, ->; +pub type Maker = + MakerActorSystem; #[allow(clippy::too_many_arguments)] #[rocket::get("/feed")] @@ -110,18 +108,17 @@ pub struct CfdNewOrderRequest { #[rocket::post("/order/sell", data = "")] pub async fn post_sell_order( order: Json, - cfd_actor: &State, + maker: &State, _auth: Authenticated, ) -> Result, HttpApiProblem> { - cfd_actor - .send(maker_cfd::NewOrder { - price: order.price, - min_quantity: order.min_quantity, - max_quantity: order.max_quantity, - fee_rate: order.fee_rate.unwrap_or(1), - }) + maker + .new_order( + order.price, + order.min_quantity, + order.max_quantity, + order.fee_rate, + ) .await - .unwrap_or_else(|e| anyhow::bail!(e)) .map_err(|e| { HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) .title("Posting offer failed") @@ -152,19 +149,17 @@ pub struct PromptAuthentication { pub async fn post_cfd_action( id: OrderId, action: CfdAction, - cfd_actor: &State, + maker: &State, _auth: Authenticated, ) -> Result, HttpApiProblem> { - use maker_cfd::*; - let result = match action { - CfdAction::AcceptOrder => cfd_actor.send(AcceptOrder { order_id: id }).await, - CfdAction::RejectOrder => cfd_actor.send(RejectOrder { order_id: id }).await, - CfdAction::AcceptSettlement => cfd_actor.send(AcceptSettlement { order_id: id }).await, - CfdAction::RejectSettlement => cfd_actor.send(RejectSettlement { order_id: id }).await, - CfdAction::AcceptRollOver => cfd_actor.send(AcceptRollOver { order_id: id }).await, - CfdAction::RejectRollOver => cfd_actor.send(RejectRollOver { order_id: id }).await, - CfdAction::Commit => cfd_actor.send(Commit { order_id: id }).await, + CfdAction::AcceptOrder => maker.accept_order(id).await, + CfdAction::RejectOrder => maker.reject_order(id).await, + CfdAction::AcceptSettlement => maker.accept_settlement(id).await, + CfdAction::RejectSettlement => maker.reject_settlement(id).await, + CfdAction::AcceptRollOver => maker.accept_rollover(id).await, + CfdAction::RejectRollOver => maker.reject_rollover(id).await, + CfdAction::Commit => maker.commit(id).await, CfdAction::Settle => { let msg = "Collaborative settlement can only be triggered by taker"; tracing::error!(msg); @@ -172,7 +167,7 @@ pub async fn post_cfd_action( } }; - result.unwrap_or_else(|e| anyhow::bail!(e)).map_err(|e| { + result.map_err(|e| { HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) .title(action.to_string() + " failed") .detail(e.to_string()) @@ -211,29 +206,24 @@ pub struct WithdrawRequest { #[rocket::post("/withdraw", data = "")] pub async fn post_withdraw_request( withdraw_request: Json, - wallet: &State>, + maker: &State, network: &State, _auth: Authenticated, ) -> Result { let amount = (withdraw_request.amount != bdk::bitcoin::Amount::ZERO).then(|| withdraw_request.amount); - let txid = wallet - .send(wallet::Withdraw { + let txid = maker + .withdraw( amount, - address: withdraw_request.address.clone(), - fee: Some(bdk::FeeRate::from_sat_per_vb(withdraw_request.fee)), - }) + withdraw_request.address.clone(), + withdraw_request.fee, + ) .await .map_err(|e| { HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) .title("Could not proceed with withdraw request") .detail(e.to_string()) - })? - .map_err(|e| { - HttpApiProblem::new(StatusCode::BAD_REQUEST) - .title("Could not withdraw funds") - .detail(e.to_string()) })?; let url = match network.inner() { diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index 910b9f9..6a167ec 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -13,10 +13,10 @@ use daemon::oracle; use daemon::projection::CfdAction; use daemon::projection::Feeds; use daemon::routes::EmbeddedFileExt; -use daemon::taker_cfd; use daemon::to_sse_event::ToSseEvent; use daemon::tx; use daemon::wallet; +use daemon::TakerActorSystem; use http_api_problem::HttpApiProblem; use http_api_problem::StatusCode; use rocket::http::ContentType; @@ -33,9 +33,8 @@ use std::borrow::Cow; use std::path::PathBuf; use tokio::select; use tokio::sync::watch; -use xtra::prelude::*; -type Taker = xtra::Address>; +type Taker = TakerActorSystem; #[rocket::get("/feed")] pub async fn feed( @@ -102,15 +101,11 @@ pub struct CfdOrderRequest { #[rocket::post("/cfd/order", data = "")] pub async fn post_order_request( cfd_order_request: Json, - cfd_actor: &State, + taker: &State, ) -> Result, HttpApiProblem> { - cfd_actor - .send(taker_cfd::TakeOffer { - order_id: cfd_order_request.order_id, - quantity: cfd_order_request.quantity, - }) + taker + .take_offer(cfd_order_request.order_id, cfd_order_request.quantity) .await - .unwrap_or_else(|e| anyhow::bail!(e.to_string())) .map_err(|e| { HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) .title("Order request failed") @@ -124,7 +119,7 @@ pub async fn post_order_request( pub async fn post_cfd_action( id: OrderId, action: CfdAction, - cfd_actor: &State, + taker: &State, feeds: &State, ) -> Result, HttpApiProblem> { let result = match action { @@ -137,7 +132,7 @@ pub async fn post_cfd_action( return Err(HttpApiProblem::new(StatusCode::BAD_REQUEST) .detail(format!("taker cannot invoke action {}", action))); } - CfdAction::Commit => cfd_actor.send(taker_cfd::Commit { order_id: id }).await, + CfdAction::Commit => taker.commit(id).await, CfdAction::Settle => { let quote: bitmex_price_feed::Quote = match feeds.quote.borrow().as_ref() { Some(quote) => quote.clone().into(), @@ -149,22 +144,16 @@ pub async fn post_cfd_action( }; let current_price = quote.for_taker(); - cfd_actor - .send(taker_cfd::ProposeSettlement { - order_id: id, - current_price, - }) - .await + + taker.propose_settlement(id, current_price).await } }; - result - .unwrap_or_else(|e| anyhow::bail!(e.to_string())) - .map_err(|e| { - HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) - .title(action.to_string() + " failed") - .detail(e.to_string()) - })?; + result.map_err(|e| { + HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) + .title(action.to_string() + " failed") + .detail(e.to_string()) + })?; Ok(status::Accepted(None)) } @@ -228,28 +217,23 @@ pub struct WithdrawRequest { #[rocket::post("/withdraw", data = "")] pub async fn post_withdraw_request( withdraw_request: Json, - wallet: &State>, + taker: &State, network: &State, ) -> Result { let amount = (withdraw_request.amount != bdk::bitcoin::Amount::ZERO).then(|| withdraw_request.amount); - let txid = wallet - .send(wallet::Withdraw { + let txid = taker + .withdraw( amount, - address: withdraw_request.address.clone(), - fee: Some(bdk::FeeRate::from_sat_per_vb(withdraw_request.fee)), - }) + withdraw_request.address.clone(), + bdk::FeeRate::from_sat_per_vb(withdraw_request.fee), + ) .await .map_err(|e| { HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR) .title("Could not proceed with withdraw request") .detail(e.to_string()) - })? - .map_err(|e| { - HttpApiProblem::new(StatusCode::BAD_REQUEST) - .title("Could not withdraw funds") - .detail(e.to_string()) })?; Ok(tx::to_mempool_url(txid, *network.inner())) diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 966c51b..2ae66e5 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -233,12 +233,7 @@ async fn main() -> Result<()> { let (projection_actor, projection_context) = xtra::Context::new(None); - let TakerActorSystem { - cfd_actor_addr, - connection_actor_addr, - maker_online_status_feed_receiver, - tasks: _tasks, - } = TakerActorSystem::new( + let taker = TakerActorSystem::new( db.clone(), wallet.clone(), oracle, @@ -273,19 +268,18 @@ async fn main() -> Result<()> { let possible_addresses = resolve_maker_addresses(&opts.maker).await?; tasks.add(connect( - maker_online_status_feed_receiver.clone(), - connection_actor_addr, + taker.maker_online_status_feed_receiver.clone(), + taker.connection_actor_addr.clone(), maker_identity, possible_addresses, )); let rocket = rocket::custom(figment) .manage(projection_feeds) - .manage(cfd_actor_addr) .manage(wallet_feed_receiver) .manage(bitcoin_network) - .manage(wallet) - .manage(maker_online_status_feed_receiver) + .manage(taker.maker_online_status_feed_receiver.clone()) + .manage(taker) .mount( "/api", rocket::routes![ diff --git a/daemon/tests/harness/mocks/wallet.rs b/daemon/tests/harness/mocks/wallet.rs index b2c9671..8dbbb7f 100644 --- a/daemon/tests/harness/mocks/wallet.rs +++ b/daemon/tests/harness/mocks/wallet.rs @@ -38,6 +38,9 @@ impl WalletActor { async fn handle(&mut self, msg: wallet::TryBroadcastTransaction) -> Result { self.mock.lock().await.broadcast(msg) } + async fn handle(&mut self, msg: wallet::Withdraw) -> Result { + self.mock.lock().await.withdraw(msg) + } } #[automock] @@ -53,6 +56,10 @@ pub trait Wallet { fn broadcast(&mut self, _msg: wallet::TryBroadcastTransaction) -> Result { unreachable!("mockall will reimplement this method") } + + fn withdraw(&mut self, _msg: wallet::Withdraw) -> Result { + unreachable!("mockall will reimplement this method") + } } #[allow(dead_code)] From b4a27cdb4a8383de4c07ef2a02999d35b25b39b6 Mon Sep 17 00:00:00 2001 From: rishflab Date: Fri, 17 Dec 2021 16:02:08 +1100 Subject: [PATCH 2/2] Move maker tcp connection listener task to maker actor system --- daemon/src/lib.rs | 23 +++++++++++++++++++++-- daemon/src/maker.rs | 18 +----------------- daemon/tests/harness/mod.rs | 16 ++-------------- 3 files changed, 24 insertions(+), 33 deletions(-) diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 867d49a..483f65c 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -23,7 +23,9 @@ use maia::secp256k1_zkp::schnorrsig; use maker_cfd::TakerDisconnected; use sqlx::SqlitePool; use std::future::Future; +use std::task::Poll; use std::time::Duration; +use tokio::net::TcpListener; use tokio::sync::watch; use xtra::message_channel::MessageChannel; use xtra::message_channel::StrongMessageChannel; @@ -114,7 +116,7 @@ impl Tasks { pub struct MakerActorSystem { pub cfd_actor_addr: Address>, wallet_actor_addr: Address, - pub inc_conn_addr: Address, + inc_conn_addr: Address, _tasks: Tasks, } @@ -134,7 +136,8 @@ where + xtra::Handler + xtra::Handler> + xtra::Handler> - + xtra::Handler, + + xtra::Handler + + xtra::Handler, W: xtra::Handler + xtra::Handler + xtra::Handler @@ -210,6 +213,22 @@ where }) } + pub fn listen_on(&mut self, listener: TcpListener) { + let listener_stream = futures::stream::poll_fn(move |ctx| { + let message = match futures::ready!(listener.poll_accept(ctx)) { + Ok((stream, address)) => { + maker_inc_connections::ListenerMessage::NewConnection { stream, address } + } + Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, + }; + + Poll::Ready(Some(message)) + }); + + self._tasks + .add(self.inc_conn_addr.clone().attach_stream(listener_stream)); + } + pub async fn new_order( &self, price: Price, diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 235e59c..2007b2d 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -278,23 +278,7 @@ async fn main() -> Result<()> { projection::Actor::new(db.clone(), Role::Maker, bitcoin_network).await?; tasks.add(projection_context.run(proj_actor)); - let listener_stream = futures::stream::poll_fn(move |ctx| { - let message = match futures::ready!(listener.poll_accept(ctx)) { - Ok((stream, address)) => { - maker_inc_connections::ListenerMessage::NewConnection { stream, address } - } - Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, - }; - - Poll::Ready(Some(message)) - }); - - tasks.add( - maker_actor_system - .inc_conn_addr - .clone() - .attach_stream(listener_stream), - ); + maker.listen_on(listener); rocket::custom(figment) .manage(projection_feeds) diff --git a/daemon/tests/harness/mod.rs b/daemon/tests/harness/mod.rs index d490121..cea251c 100644 --- a/daemon/tests/harness/mod.rs +++ b/daemon/tests/harness/mod.rs @@ -29,7 +29,6 @@ use rust_decimal_macros::dec; use sqlx::SqlitePool; use std::net::SocketAddr; use std::str::FromStr; -use std::task::Poll; use std::time::Duration; use tokio::net::TcpListener; use tokio::sync::watch; @@ -158,7 +157,7 @@ impl Maker { // system startup sends sync messages, mock them mocks.mock_sync_handlers().await; - let maker = daemon::MakerActorSystem::new( + let mut maker = daemon::MakerActorSystem::new( db.clone(), wallet_addr, config.oracle_pk, @@ -187,18 +186,7 @@ impl Maker { let address = listener.local_addr().unwrap(); - let listener_stream = futures::stream::poll_fn(move |ctx| { - let message = match futures::ready!(listener.poll_accept(ctx)) { - Ok((stream, address)) => { - maker_inc_connections::ListenerMessage::NewConnection { stream, address } - } - Err(e) => maker_inc_connections::ListenerMessage::Error { source: e }, - }; - - Poll::Ready(Some(message)) - }); - - tasks.add(maker.inc_conn_addr.clone().attach_stream(listener_stream)); + maker.listen_on(listener); Self { system: maker,