Browse Source

Hide some internal actor details from rocket state

There are still details being leaked but this is start.
update-blockstream-electrum-server-url
rishflab 3 years ago
parent
commit
27884db030
  1. 150
      daemon/src/lib.rs
  2. 17
      daemon/src/maker.rs
  3. 62
      daemon/src/routes_maker.rs
  4. 48
      daemon/src/routes_taker.rs
  5. 16
      daemon/src/taker.rs
  6. 7
      daemon/tests/harness/mocks/wallet.rs

150
daemon/src/lib.rs

@ -1,15 +1,22 @@
#![cfg_attr(not(test), warn(clippy::unwrap_used))]
#![warn(clippy::disallowed_method)]
use crate::bitcoin::Txid;
use crate::maker_cfd::FromTaker;
use crate::maker_cfd::TakerConnected;
use crate::model::cfd::Cfd;
use crate::model::cfd::Order;
use crate::model::cfd::OrderId;
use crate::model::cfd::UpdateCfdProposals;
use crate::model::Identity;
use crate::model::Price;
use crate::model::Usd;
use crate::oracle::Attestation;
use crate::tokio_ext::FutureExt;
use address_map::Stopping;
use anyhow::Result;
use bdk::bitcoin;
use bdk::bitcoin::Amount;
use bdk::FeeRate;
use connection::ConnectionStatus;
use futures::future::RemoteHandle;
use maia::secp256k1_zkp::schnorrsig;
@ -106,8 +113,9 @@ impl Tasks {
pub struct MakerActorSystem<O, M, T, W> {
pub cfd_actor_addr: Address<maker_cfd::Actor<O, M, T, W>>,
wallet_actor_addr: Address<W>,
pub inc_conn_addr: Address<T>,
pub tasks: Tasks,
_tasks: Tasks,
}
impl<O, M, T, W> MakerActorSystem<O, M, T, W>
@ -129,7 +137,8 @@ where
+ xtra::Handler<maker_cfd::RollOverProposed>,
W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
+ xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Withdraw>,
{
#[allow(clippy::too_many_arguments)]
pub async fn new<FO, FM>(
@ -159,7 +168,7 @@ where
let (cfd_actor_addr, cfd_actor_fut) = maker_cfd::Actor::new(
db,
wallet_addr,
wallet_addr.clone(),
settlement_interval,
oracle_pk,
projection_actor,
@ -195,17 +204,101 @@ where
Ok(Self {
cfd_actor_addr,
wallet_actor_addr: wallet_addr,
inc_conn_addr,
tasks,
_tasks: tasks,
})
}
pub async fn new_order(
&self,
price: Price,
min_quantity: Usd,
max_quantity: Usd,
fee_rate: Option<u32>,
) -> Result<()> {
self.cfd_actor_addr
.send(maker_cfd::NewOrder {
price,
min_quantity,
max_quantity,
fee_rate: fee_rate.unwrap_or(1),
})
.await??;
Ok(())
}
pub async fn accept_order(&self, order_id: OrderId) -> Result<()> {
self.cfd_actor_addr
.send(maker_cfd::AcceptOrder { order_id })
.await??;
Ok(())
}
pub async fn reject_order(&self, order_id: OrderId) -> Result<()> {
self.cfd_actor_addr
.send(maker_cfd::RejectOrder { order_id })
.await??;
Ok(())
}
pub async fn accept_settlement(&self, order_id: OrderId) -> Result<()> {
self.cfd_actor_addr
.send(maker_cfd::AcceptSettlement { order_id })
.await??;
Ok(())
}
pub async fn reject_settlement(&self, order_id: OrderId) -> Result<()> {
self.cfd_actor_addr
.send(maker_cfd::RejectSettlement { order_id })
.await??;
Ok(())
}
pub async fn accept_rollover(&self, order_id: OrderId) -> Result<()> {
self.cfd_actor_addr
.send(maker_cfd::AcceptRollOver { order_id })
.await??;
Ok(())
}
pub async fn reject_rollover(&self, order_id: OrderId) -> Result<()> {
self.cfd_actor_addr
.send(maker_cfd::RejectRollOver { order_id })
.await??;
Ok(())
}
pub async fn commit(&self, order_id: OrderId) -> Result<()> {
self.cfd_actor_addr
.send(maker_cfd::Commit { order_id })
.await??;
Ok(())
}
pub async fn withdraw(
&self,
amount: Option<Amount>,
address: bitcoin::Address,
fee: f32,
) -> Result<Txid> {
self.wallet_actor_addr
.send(wallet::Withdraw {
amount,
address,
fee: Some(bdk::FeeRate::from_sat_per_vb(fee)),
})
.await?
}
}
pub struct TakerActorSystem<O, M, W> {
pub cfd_actor_addr: Address<taker_cfd::Actor<O, M, W>>,
pub connection_actor_addr: Address<connection::Actor>,
pub maker_online_status_feed_receiver: watch::Receiver<ConnectionStatus>,
pub tasks: Tasks,
wallet_actor_addr: Address<W>,
_tasks: Tasks,
}
impl<O, M, W> TakerActorSystem<O, M, W>
@ -219,12 +312,13 @@ where
+ xtra::Handler<oracle::Attestation>,
W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
+ xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Withdraw>,
{
#[allow(clippy::too_many_arguments)]
pub async fn new<FM, FO>(
db: SqlitePool,
wallet_addr: Address<W>,
wallet_actor_addr: Address<W>,
oracle_pk: schnorrsig::PublicKey,
identity_sk: x25519_dalek::StaticSecret,
oracle_constructor: impl FnOnce(Box<dyn StrongMessageChannel<Attestation>>) -> FO,
@ -250,7 +344,7 @@ where
let (connection_actor_addr, connection_actor_ctx) = xtra::Context::new(None);
let (cfd_actor_addr, cfd_actor_fut) = taker_cfd::Actor::new(
db.clone(),
wallet_addr,
wallet_actor_addr.clone(),
oracle_pk,
projection_actor.clone(),
connection_actor_addr.clone(),
@ -302,7 +396,45 @@ where
cfd_actor_addr,
connection_actor_addr,
maker_online_status_feed_receiver,
tasks,
wallet_actor_addr,
_tasks: tasks,
})
}
pub async fn take_offer(&self, order_id: OrderId, quantity: Usd) -> Result<()> {
self.cfd_actor_addr
.send(taker_cfd::TakeOffer { order_id, quantity })
.await??;
Ok(())
}
pub async fn commit(&self, order_id: OrderId) -> Result<()> {
self.cfd_actor_addr
.send(taker_cfd::Commit { order_id })
.await?
}
pub async fn propose_settlement(&self, order_id: OrderId, current_price: Price) -> Result<()> {
self.cfd_actor_addr
.send(taker_cfd::ProposeSettlement {
order_id,
current_price,
})
.await?
}
pub async fn withdraw(
&self,
amount: Option<Amount>,
address: bitcoin::Address,
fee_rate: FeeRate,
) -> Result<Txid> {
self.wallet_actor_addr
.send(wallet::Withdraw {
amount,
address,
fee: Some(fee_rate),
})
.await?
}
}

17
daemon/src/maker.rs

@ -30,7 +30,6 @@ use sqlx::SqlitePool;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::task::Poll;
use tracing_subscriber::filter::LevelFilter;
use xtra::Actor;
@ -241,11 +240,7 @@ async fn main() -> Result<()> {
let (projection_actor, projection_context) = xtra::Context::new(None);
let MakerActorSystem {
cfd_actor_addr,
inc_conn_addr: incoming_connection_addr,
tasks: _tasks,
} = MakerActorSystem::new(
let mut maker = MakerActorSystem::new(
db.clone(),
wallet.clone(),
oracle,
@ -294,15 +289,19 @@ async fn main() -> Result<()> {
Poll::Ready(Some(message))
});
tasks.add(incoming_connection_addr.attach_stream(listener_stream));
tasks.add(
maker_actor_system
.inc_conn_addr
.clone()
.attach_stream(listener_stream),
);
rocket::custom(figment)
.manage(projection_feeds)
.manage(cfd_actor_addr)
.manage(wallet_feed_receiver)
.manage(maker)
.manage(auth_password)
.manage(bitcoin_network)
.manage(wallet)
.mount(
"/api",
rocket::routes![

62
daemon/src/routes_maker.rs

@ -1,7 +1,6 @@
use anyhow::Result;
use bdk::bitcoin::Network;
use daemon::auth::Authenticated;
use daemon::maker_cfd;
use daemon::maker_inc_connections;
use daemon::model::cfd::OrderId;
use daemon::model::Price;
@ -16,6 +15,7 @@ use daemon::projection::Identity;
use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::ToSseEvent;
use daemon::wallet;
use daemon::MakerActorSystem;
use http_api_problem::HttpApiProblem;
use http_api_problem::StatusCode;
use rocket::http::ContentType;
@ -32,11 +32,9 @@ use std::borrow::Cow;
use std::path::PathBuf;
use tokio::select;
use tokio::sync::watch;
use xtra::prelude::*;
pub type Maker = xtra::Address<
maker_cfd::Actor<oracle::Actor, monitor::Actor, maker_inc_connections::Actor, wallet::Actor>,
>;
pub type Maker =
MakerActorSystem<oracle::Actor, monitor::Actor, maker_inc_connections::Actor, wallet::Actor>;
#[allow(clippy::too_many_arguments)]
#[rocket::get("/feed")]
@ -110,18 +108,17 @@ pub struct CfdNewOrderRequest {
#[rocket::post("/order/sell", data = "<order>")]
pub async fn post_sell_order(
order: Json<CfdNewOrderRequest>,
cfd_actor: &State<Maker>,
maker: &State<Maker>,
_auth: Authenticated,
) -> Result<status::Accepted<()>, HttpApiProblem> {
cfd_actor
.send(maker_cfd::NewOrder {
price: order.price,
min_quantity: order.min_quantity,
max_quantity: order.max_quantity,
fee_rate: order.fee_rate.unwrap_or(1),
})
maker
.new_order(
order.price,
order.min_quantity,
order.max_quantity,
order.fee_rate,
)
.await
.unwrap_or_else(|e| anyhow::bail!(e))
.map_err(|e| {
HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR)
.title("Posting offer failed")
@ -152,19 +149,17 @@ pub struct PromptAuthentication {
pub async fn post_cfd_action(
id: OrderId,
action: CfdAction,
cfd_actor: &State<Maker>,
maker: &State<Maker>,
_auth: Authenticated,
) -> Result<status::Accepted<()>, HttpApiProblem> {
use maker_cfd::*;
let result = match action {
CfdAction::AcceptOrder => cfd_actor.send(AcceptOrder { order_id: id }).await,
CfdAction::RejectOrder => cfd_actor.send(RejectOrder { order_id: id }).await,
CfdAction::AcceptSettlement => cfd_actor.send(AcceptSettlement { order_id: id }).await,
CfdAction::RejectSettlement => cfd_actor.send(RejectSettlement { order_id: id }).await,
CfdAction::AcceptRollOver => cfd_actor.send(AcceptRollOver { order_id: id }).await,
CfdAction::RejectRollOver => cfd_actor.send(RejectRollOver { order_id: id }).await,
CfdAction::Commit => cfd_actor.send(Commit { order_id: id }).await,
CfdAction::AcceptOrder => maker.accept_order(id).await,
CfdAction::RejectOrder => maker.reject_order(id).await,
CfdAction::AcceptSettlement => maker.accept_settlement(id).await,
CfdAction::RejectSettlement => maker.reject_settlement(id).await,
CfdAction::AcceptRollOver => maker.accept_rollover(id).await,
CfdAction::RejectRollOver => maker.reject_rollover(id).await,
CfdAction::Commit => maker.commit(id).await,
CfdAction::Settle => {
let msg = "Collaborative settlement can only be triggered by taker";
tracing::error!(msg);
@ -172,7 +167,7 @@ pub async fn post_cfd_action(
}
};
result.unwrap_or_else(|e| anyhow::bail!(e)).map_err(|e| {
result.map_err(|e| {
HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR)
.title(action.to_string() + " failed")
.detail(e.to_string())
@ -211,29 +206,24 @@ pub struct WithdrawRequest {
#[rocket::post("/withdraw", data = "<withdraw_request>")]
pub async fn post_withdraw_request(
withdraw_request: Json<WithdrawRequest>,
wallet: &State<Address<wallet::Actor>>,
maker: &State<Maker>,
network: &State<Network>,
_auth: Authenticated,
) -> Result<String, HttpApiProblem> {
let amount =
(withdraw_request.amount != bdk::bitcoin::Amount::ZERO).then(|| withdraw_request.amount);
let txid = wallet
.send(wallet::Withdraw {
let txid = maker
.withdraw(
amount,
address: withdraw_request.address.clone(),
fee: Some(bdk::FeeRate::from_sat_per_vb(withdraw_request.fee)),
})
withdraw_request.address.clone(),
withdraw_request.fee,
)
.await
.map_err(|e| {
HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR)
.title("Could not proceed with withdraw request")
.detail(e.to_string())
})?
.map_err(|e| {
HttpApiProblem::new(StatusCode::BAD_REQUEST)
.title("Could not withdraw funds")
.detail(e.to_string())
})?;
let url = match network.inner() {

48
daemon/src/routes_taker.rs

@ -13,10 +13,10 @@ use daemon::oracle;
use daemon::projection::CfdAction;
use daemon::projection::Feeds;
use daemon::routes::EmbeddedFileExt;
use daemon::taker_cfd;
use daemon::to_sse_event::ToSseEvent;
use daemon::tx;
use daemon::wallet;
use daemon::TakerActorSystem;
use http_api_problem::HttpApiProblem;
use http_api_problem::StatusCode;
use rocket::http::ContentType;
@ -33,9 +33,8 @@ use std::borrow::Cow;
use std::path::PathBuf;
use tokio::select;
use tokio::sync::watch;
use xtra::prelude::*;
type Taker = xtra::Address<taker_cfd::Actor<oracle::Actor, monitor::Actor, wallet::Actor>>;
type Taker = TakerActorSystem<oracle::Actor, monitor::Actor, wallet::Actor>;
#[rocket::get("/feed")]
pub async fn feed(
@ -102,15 +101,11 @@ pub struct CfdOrderRequest {
#[rocket::post("/cfd/order", data = "<cfd_order_request>")]
pub async fn post_order_request(
cfd_order_request: Json<CfdOrderRequest>,
cfd_actor: &State<Taker>,
taker: &State<Taker>,
) -> Result<status::Accepted<()>, HttpApiProblem> {
cfd_actor
.send(taker_cfd::TakeOffer {
order_id: cfd_order_request.order_id,
quantity: cfd_order_request.quantity,
})
taker
.take_offer(cfd_order_request.order_id, cfd_order_request.quantity)
.await
.unwrap_or_else(|e| anyhow::bail!(e.to_string()))
.map_err(|e| {
HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR)
.title("Order request failed")
@ -124,7 +119,7 @@ pub async fn post_order_request(
pub async fn post_cfd_action(
id: OrderId,
action: CfdAction,
cfd_actor: &State<Taker>,
taker: &State<Taker>,
feeds: &State<Feeds>,
) -> Result<status::Accepted<()>, HttpApiProblem> {
let result = match action {
@ -137,7 +132,7 @@ pub async fn post_cfd_action(
return Err(HttpApiProblem::new(StatusCode::BAD_REQUEST)
.detail(format!("taker cannot invoke action {}", action)));
}
CfdAction::Commit => cfd_actor.send(taker_cfd::Commit { order_id: id }).await,
CfdAction::Commit => taker.commit(id).await,
CfdAction::Settle => {
let quote: bitmex_price_feed::Quote = match feeds.quote.borrow().as_ref() {
Some(quote) => quote.clone().into(),
@ -149,18 +144,12 @@ pub async fn post_cfd_action(
};
let current_price = quote.for_taker();
cfd_actor
.send(taker_cfd::ProposeSettlement {
order_id: id,
current_price,
})
.await
taker.propose_settlement(id, current_price).await
}
};
result
.unwrap_or_else(|e| anyhow::bail!(e.to_string()))
.map_err(|e| {
result.map_err(|e| {
HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR)
.title(action.to_string() + " failed")
.detail(e.to_string())
@ -228,28 +217,23 @@ pub struct WithdrawRequest {
#[rocket::post("/withdraw", data = "<withdraw_request>")]
pub async fn post_withdraw_request(
withdraw_request: Json<WithdrawRequest>,
wallet: &State<Address<wallet::Actor>>,
taker: &State<Taker>,
network: &State<Network>,
) -> Result<String, HttpApiProblem> {
let amount =
(withdraw_request.amount != bdk::bitcoin::Amount::ZERO).then(|| withdraw_request.amount);
let txid = wallet
.send(wallet::Withdraw {
let txid = taker
.withdraw(
amount,
address: withdraw_request.address.clone(),
fee: Some(bdk::FeeRate::from_sat_per_vb(withdraw_request.fee)),
})
withdraw_request.address.clone(),
bdk::FeeRate::from_sat_per_vb(withdraw_request.fee),
)
.await
.map_err(|e| {
HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR)
.title("Could not proceed with withdraw request")
.detail(e.to_string())
})?
.map_err(|e| {
HttpApiProblem::new(StatusCode::BAD_REQUEST)
.title("Could not withdraw funds")
.detail(e.to_string())
})?;
Ok(tx::to_mempool_url(txid, *network.inner()))

16
daemon/src/taker.rs

@ -233,12 +233,7 @@ async fn main() -> Result<()> {
let (projection_actor, projection_context) = xtra::Context::new(None);
let TakerActorSystem {
cfd_actor_addr,
connection_actor_addr,
maker_online_status_feed_receiver,
tasks: _tasks,
} = TakerActorSystem::new(
let taker = TakerActorSystem::new(
db.clone(),
wallet.clone(),
oracle,
@ -273,19 +268,18 @@ async fn main() -> Result<()> {
let possible_addresses = resolve_maker_addresses(&opts.maker).await?;
tasks.add(connect(
maker_online_status_feed_receiver.clone(),
connection_actor_addr,
taker.maker_online_status_feed_receiver.clone(),
taker.connection_actor_addr.clone(),
maker_identity,
possible_addresses,
));
let rocket = rocket::custom(figment)
.manage(projection_feeds)
.manage(cfd_actor_addr)
.manage(wallet_feed_receiver)
.manage(bitcoin_network)
.manage(wallet)
.manage(maker_online_status_feed_receiver)
.manage(taker.maker_online_status_feed_receiver.clone())
.manage(taker)
.mount(
"/api",
rocket::routes![

7
daemon/tests/harness/mocks/wallet.rs

@ -38,6 +38,9 @@ impl WalletActor {
async fn handle(&mut self, msg: wallet::TryBroadcastTransaction) -> Result<Txid> {
self.mock.lock().await.broadcast(msg)
}
async fn handle(&mut self, msg: wallet::Withdraw) -> Result<Txid> {
self.mock.lock().await.withdraw(msg)
}
}
#[automock]
@ -53,6 +56,10 @@ pub trait Wallet {
fn broadcast(&mut self, _msg: wallet::TryBroadcastTransaction) -> Result<Txid> {
unreachable!("mockall will reimplement this method")
}
fn withdraw(&mut self, _msg: wallet::Withdraw) -> Result<Txid> {
unreachable!("mockall will reimplement this method")
}
}
#[allow(dead_code)]

Loading…
Cancel
Save