Browse Source

Remove `maker_cfd::CfdAction` enum

Originally we thought we need this enum because we would need to use
`MessageChannel`s from the rocket layer. But we can actually fully
qualify the address without issues.

We also introduce `xtra_productivity` to remove some of the indirection
that is happening in this file.
debug-collab-settlement
Thomas Eizinger 3 years ago
parent
commit
07c79b8396
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 10
      daemon/src/maker.rs
  2. 430
      daemon/src/maker_cfd.rs
  3. 42
      daemon/src/routes_maker.rs
  4. 2
      daemon/tests/harness/mocks/wallet.rs
  5. 5
      daemon/tests/harness/mod.rs

10
daemon/src/maker.rs

@ -11,7 +11,7 @@ use daemon::model::{TakerId, WalletInfo};
use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
bitmex_price_feed, db, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle,
bitmex_price_feed, db, housekeeping, logger, maker_inc_connections, monitor, oracle,
projection, wallet, wallet_sync, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
SETTLEMENT_INTERVAL,
};
@ -24,7 +24,6 @@ use std::str::FromStr;
use std::task::Poll;
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*;
use xtra::Actor;
mod routes_maker;
@ -305,17 +304,12 @@ async fn main() -> Result<()> {
});
tasks.add(incoming_connection_addr.attach_stream(listener_stream));
tasks.add(wallet_sync::new(wallet.clone(), wallet_feed_sender));
let cfd_action_channel = MessageChannel::<maker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);
let new_order_channel = MessageChannel::<maker_cfd::NewOrder>::clone_channel(&cfd_actor_addr);
rocket::custom(figment)
.manage(order_feed_receiver)
.manage(update_cfd_feed_receiver)
.manage(cfd_action_channel)
.manage(new_order_channel)
.manage(cfd_actor_addr)
.manage(cfd_feed_receiver)
.manage(connected_takers_feed_receiver)
.manage(wallet_feed_receiver)

430
daemon/src/maker_cfd.rs

@ -25,17 +25,29 @@ use sqlx::Sqlite;
use std::collections::{HashMap, HashSet};
use time::Duration;
use xtra::prelude::*;
use xtra_productivity::xtra_productivity;
pub enum CfdAction {
AcceptOrder { order_id: OrderId },
RejectOrder { order_id: OrderId },
AcceptSettlement { order_id: OrderId },
RejectSettlement { order_id: OrderId },
AcceptRollOver { order_id: OrderId },
RejectRollOver { order_id: OrderId },
Commit { order_id: OrderId },
pub struct AcceptOrder {
pub order_id: OrderId,
}
pub struct RejectOrder {
pub order_id: OrderId,
}
pub struct AcceptSettlement {
pub order_id: OrderId,
}
pub struct RejectSettlement {
pub order_id: OrderId,
}
pub struct AcceptRollOver {
pub order_id: OrderId,
}
pub struct RejectRollOver {
pub order_id: OrderId,
}
pub struct Commit {
pub order_id: OrderId,
}
pub struct NewOrder {
pub price: Price,
pub min_quantity: Usd,
@ -65,7 +77,12 @@ pub struct FromTaker {
pub msg: wire::TakerToMaker,
}
pub struct Actor<O, M, T, W> {
pub struct Actor<
O = oracle::Actor,
M = monitor::Actor,
T = maker_inc_connections::Actor,
W = wallet::Actor,
> {
db: sqlx::SqlitePool,
wallet: Address<W>,
settlement_interval: Duration,
@ -301,14 +318,6 @@ impl<O, M, T, W> Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor)
.await?;
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor)
@ -366,85 +375,20 @@ where
Ok(())
}
async fn handle_accept_settlement(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker accepts a settlement proposal" );
let taker_id = self.get_taker_id_of_proposal(&order_id)?;
match self
.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::ConfirmSettlement(order_id),
})
.await?
{
Ok(_) => {
self.current_agreed_proposals
.insert(order_id, self.get_settlement_proposal(order_id)?);
self.remove_pending_proposal(&order_id)
.await
.context("accepted settlement")?;
}
Err(e) => {
tracing::warn!("Failed to notify taker of accepted settlement: {}", e);
self.remove_pending_proposal(&order_id)
.await
.context("accepted settlement")?;
}
}
Ok(())
}
async fn handle_reject_settlement(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker rejects a settlement proposal" );
let taker_id = self.get_taker_id_of_proposal(&order_id)?;
// clean-up state ahead of sending to ensure consistency in case we fail to deliver the
// message
self.remove_pending_proposal(&order_id)
.await
.context("rejected settlement")?;
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::RejectSettlement(order_id),
})
.await??;
Ok(())
}
async fn handle_reject_roll_over(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker rejects a roll_over proposal" );
// Validate if order is actually being requested to be extended
let (_, taker_id) = match self.current_pending_proposals.get(&order_id) {
Some((
UpdateCfdProposal::RollOverProposal {
proposal,
direction: SettlementKind::Incoming,
},
taker_id,
)) => (proposal, *taker_id),
_ => {
anyhow::bail!("Order is in invalid state. Ignoring reject roll over request.")
}
};
async fn reject_order(
&mut self,
taker_id: TakerId,
mut cfd: Cfd,
mut conn: PoolConnection<Sqlite>,
) -> Result<()> {
cfd.state = CfdState::rejected();
// clean-up state ahead of sending to ensure consistency in case we fail to deliver the
// message
self.remove_pending_proposal(&order_id)
.await
.context("rejected roll_over")?;
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::RejectRollOver(order_id),
msg: wire::MakerToTaker::RejectOrder(cfd.order.id),
})
.await??;
@ -529,49 +473,9 @@ where
Ok(())
}
async fn handle_reject_order(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker rejects an order" );
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let taker_id = match cfd {
Cfd {
state: CfdState::IncomingOrderRequest { taker_id, .. },
..
} => taker_id,
_ => {
anyhow::bail!("Order is in invalid state. Ignoring trying to accept it.")
}
};
self.reject_order(taker_id, cfd, conn).await?;
Ok(())
}
async fn reject_order(
&mut self,
taker_id: TakerId,
mut cfd: Cfd,
mut conn: PoolConnection<Sqlite>,
) -> Result<()> {
cfd.state = CfdState::rejected();
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::RejectOrder(cfd.order.id),
})
.await??;
Ok(())
}
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
where
Self: xtra::Handler<CfdSetupCompleted>,
@ -581,9 +485,11 @@ where
{
async fn handle_accept_order(
&mut self,
order_id: OrderId,
msg: AcceptOrder,
ctx: &mut Context<Self>,
) -> Result<()> {
let AcceptOrder { order_id } = msg;
if let SetupState::Active { .. } = self.setup_state {
anyhow::bail!("Already setting up a contract!")
}
@ -674,81 +580,141 @@ where
}
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
async fn handle_cfd_setup_completed(
&mut self,
order_id: OrderId,
dlc: Result<Dlc>,
) -> Result<()> {
self.setup_state = SetupState::None;
async fn handle_reject_order(&mut self, msg: RejectOrder) -> Result<()> {
let RejectOrder { order_id } = msg;
tracing::debug!(%order_id, "Maker rejects an order" );
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let dlc = match dlc {
Ok(dlc) => dlc,
Err(e) => {
cfd.state = CfdState::SetupFailed {
common: CfdStateCommon::default(),
info: e.to_string(),
};
let taker_id = match cfd {
Cfd {
state: CfdState::IncomingOrderRequest { taker_id, .. },
..
} => taker_id,
_ => {
anyhow::bail!("Order is in invalid state. Ignoring trying to accept it.")
}
};
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.reject_order(taker_id, cfd, conn).await?;
return Err(e);
Ok(())
}
async fn handle_accept_settlement(&mut self, msg: AcceptSettlement) -> Result<()> {
let AcceptSettlement { order_id } = msg;
tracing::debug!(%order_id, "Maker accepts a settlement proposal" );
let taker_id = self.get_taker_id_of_proposal(&order_id)?;
match self
.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::ConfirmSettlement(order_id),
})
.await?
{
Ok(_) => {
self.current_agreed_proposals
.insert(order_id, self.get_settlement_proposal(order_id)?);
self.remove_pending_proposal(&order_id)
.await
.context("accepted settlement")?;
}
};
Err(e) => {
tracing::warn!("Failed to notify taker of accepted settlement: {}", e);
self.remove_pending_proposal(&order_id)
.await
.context("accepted settlement")?;
}
}
cfd.state = CfdState::PendingOpen {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
};
Ok(())
}
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
async fn handle_reject_settlement(&mut self, msg: RejectSettlement) -> Result<()> {
let RejectSettlement { order_id } = msg;
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction {
tx: dlc.lock.0.clone(),
tracing::debug!(%order_id, "Maker rejects a settlement proposal" );
let taker_id = self.get_taker_id_of_proposal(&order_id)?;
// clean-up state ahead of sending to ensure consistency in case we fail to deliver the
// message
self.remove_pending_proposal(&order_id)
.await
.context("rejected settlement")?;
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::RejectSettlement(order_id),
})
.await??;
tracing::info!("Lock transaction published with txid {}", txid);
Ok(())
}
self.monitor_actor
.send(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()),
})
.await?;
async fn handle_reject_roll_over(&mut self, msg: RejectRollOver) -> Result<()> {
let RejectRollOver { order_id } = msg;
self.oracle_actor
.send(oracle::MonitorAttestation {
event_id: dlc.settlement_event_id,
tracing::debug!(%order_id, "Maker rejects a roll_over proposal" );
// Validate if order is actually being requested to be extended
let (_, taker_id) = match self.current_pending_proposals.get(&order_id) {
Some((
UpdateCfdProposal::RollOverProposal {
proposal,
direction: SettlementKind::Incoming,
},
taker_id,
)) => (proposal, *taker_id),
_ => {
anyhow::bail!("Order is in invalid state. Ignoring reject roll over request.")
}
};
// clean-up state ahead of sending to ensure consistency in case we fail to deliver the
// message
self.remove_pending_proposal(&order_id)
.await
.context("rejected roll_over")?;
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::RejectRollOver(order_id),
})
.await?;
.await??;
Ok(())
}
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
where
Self: xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>,
T: xtra::Handler<maker_inc_connections::TakerMessage>,
W: xtra::Handler<wallet::Sign> + xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle_accept_roll_over(
&mut self,
order_id: OrderId,
msg: AcceptRollOver,
ctx: &mut Context<Self>,
) -> Result<()> {
let AcceptRollOver { order_id } = msg;
tracing::debug!(%order_id, "Maker accepts a roll_over proposal" );
let mut conn = self.db.acquire().await?;
@ -836,6 +802,86 @@ where
}
}
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_commit(&mut self, msg: Commit) -> Result<()> {
let Commit { order_id } = msg;
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(order_id, &mut conn, &self.wallet, &self.projection_actor)
.await?;
Ok(())
}
}
impl<O, M, T, W> Actor<O, M, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_cfd_setup_completed(
&mut self,
order_id: OrderId,
dlc: Result<Dlc>,
) -> Result<()> {
self.setup_state = SetupState::None;
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let dlc = match dlc {
Ok(dlc) => dlc,
Err(e) => {
cfd.state = CfdState::SetupFailed {
common: CfdStateCommon::default(),
info: e.to_string(),
};
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
return Err(e);
}
};
cfd.state = CfdState::PendingOpen {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
};
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
let txid = self
.wallet
.send(wallet::TryBroadcastTransaction {
tx: dlc.lock.0.clone(),
})
.await??;
tracing::info!("Lock transaction published with txid {}", txid);
self.monitor_actor
.send(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()),
})
.await?;
self.oracle_actor
.send(oracle::MonitorAttestation {
event_id: dlc.settlement_event_id,
})
.await?;
Ok(())
}
}
impl<O, M, T, W> Actor<O, M, T, W>
where
M: xtra::Handler<monitor::StartMonitoring>,
@ -947,34 +993,6 @@ where
}
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<CfdAction> for Actor<O, M, T, W>
where
Self: xtra::Handler<CfdSetupCompleted> + xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>,
W: xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: CfdAction, ctx: &mut Context<Self>) -> Result<()> {
use CfdAction::*;
if let Err(e) = match msg {
AcceptOrder { order_id } => self.handle_accept_order(order_id, ctx).await,
RejectOrder { order_id } => self.handle_reject_order(order_id).await,
AcceptSettlement { order_id } => self.handle_accept_settlement(order_id).await,
RejectSettlement { order_id } => self.handle_reject_settlement(order_id).await,
AcceptRollOver { order_id } => self.handle_accept_roll_over(order_id, ctx).await,
RejectRollOver { order_id } => self.handle_reject_roll_over(order_id).await,
Commit { order_id } => self.handle_commit(order_id).await,
} {
anyhow::bail!("Message handler failed: {:#}", e);
}
Ok(())
}
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<NewOrder> for Actor<O, M, T, W>
where
@ -1168,10 +1186,6 @@ impl Message for CfdRollOverCompleted {
type Result = ();
}
impl Message for CfdAction {
type Result = Result<()>;
}
impl Message for FromTaker {
type Result = ();
}

42
daemon/src/routes_maker.rs

@ -5,7 +5,7 @@ use daemon::model::cfd::{Cfd, Order, OrderId, Role, UpdateCfdProposals};
use daemon::model::{Price, TakerId, Usd, WalletInfo};
use daemon::routes::EmbeddedFileExt;
use daemon::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent};
use daemon::{bitmex_price_feed, maker_cfd, wallet};
use daemon::{bitmex_price_feed, maker_cfd, maker_inc_connections, monitor, oracle, wallet};
use http_api_problem::{HttpApiProblem, StatusCode};
use rocket::http::{ContentType, Header, Status};
use rocket::response::stream::EventStream;
@ -20,6 +20,10 @@ use tokio::select;
use tokio::sync::watch;
use xtra::prelude::*;
pub type Maker = xtra::Address<
maker_cfd::Actor<oracle::Actor, monitor::Actor, maker_inc_connections::Actor, wallet::Actor>,
>;
#[allow(clippy::too_many_arguments)]
#[rocket::get("/feed")]
pub async fn maker_feed(
@ -122,10 +126,10 @@ pub struct CfdNewOrderRequest {
#[rocket::post("/order/sell", data = "<order>")]
pub async fn post_sell_order(
order: Json<CfdNewOrderRequest>,
new_order_channel: &State<Box<dyn MessageChannel<maker_cfd::NewOrder>>>,
cfd_actor: &State<Maker>,
_auth: Authenticated,
) -> Result<status::Accepted<()>, HttpApiProblem> {
new_order_channel
cfd_actor
.send(maker_cfd::NewOrder {
price: order.price,
min_quantity: order.min_quantity,
@ -163,18 +167,19 @@ pub struct PromptAuthentication {
pub async fn post_cfd_action(
id: OrderId,
action: CfdAction,
cfd_action_channel: &State<Box<dyn MessageChannel<maker_cfd::CfdAction>>>,
cfd_actor: &State<Maker>,
_auth: Authenticated,
) -> Result<status::Accepted<()>, HttpApiProblem> {
use maker_cfd::CfdAction::*;
use maker_cfd::*;
let result = match action {
CfdAction::AcceptOrder => cfd_action_channel.send(AcceptOrder { order_id: id }),
CfdAction::RejectOrder => cfd_action_channel.send(RejectOrder { order_id: id }),
CfdAction::AcceptSettlement => cfd_action_channel.send(AcceptSettlement { order_id: id }),
CfdAction::RejectSettlement => cfd_action_channel.send(RejectSettlement { order_id: id }),
CfdAction::AcceptRollOver => cfd_action_channel.send(AcceptRollOver { order_id: id }),
CfdAction::RejectRollOver => cfd_action_channel.send(RejectRollOver { order_id: id }),
CfdAction::Commit => cfd_action_channel.send(Commit { order_id: id }),
CfdAction::AcceptOrder => cfd_actor.send(AcceptOrder { order_id: id }).await,
CfdAction::RejectOrder => cfd_actor.send(RejectOrder { order_id: id }).await,
CfdAction::AcceptSettlement => cfd_actor.send(AcceptSettlement { order_id: id }).await,
CfdAction::RejectSettlement => cfd_actor.send(RejectSettlement { order_id: id }).await,
CfdAction::AcceptRollOver => cfd_actor.send(AcceptRollOver { order_id: id }).await,
CfdAction::RejectRollOver => cfd_actor.send(RejectRollOver { order_id: id }).await,
CfdAction::Commit => cfd_actor.send(Commit { order_id: id }).await,
CfdAction::Settle => {
let msg = "Collaborative settlement can only be triggered by taker";
tracing::error!(msg);
@ -187,14 +192,11 @@ pub async fn post_cfd_action(
}
};
result
.await
.unwrap_or_else(|e| anyhow::bail!(e))
.map_err(|e| {
HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR)
.title(action.to_string() + " failed")
.detail(e.to_string())
})?;
result.unwrap_or_else(|e| anyhow::bail!(e)).map_err(|e| {
HttpApiProblem::new(StatusCode::INTERNAL_SERVER_ERROR)
.title(action.to_string() + " failed")
.detail(e.to_string())
})?;
Ok(status::Accepted(None))
}

2
daemon/tests/harness/mocks/wallet.rs

@ -3,7 +3,7 @@ use anyhow::Result;
use bdk::bitcoin::util::psbt::PartiallySignedTransaction;
use bdk::bitcoin::{ecdsa, Amount, Txid};
use daemon::model::{Timestamp, WalletInfo};
use daemon::wallet::{self};
use daemon::wallet;
use maia::secp256k1_zkp::Secp256k1;
use maia::{PartyParams, WalletExt};
use mockall::*;

5
daemon/tests/harness/mod.rs

@ -4,7 +4,6 @@ use crate::harness::mocks::wallet::WalletActor;
use crate::schnorrsig;
use daemon::bitmex_price_feed::Quote;
use daemon::connection::{connect, ConnectionStatus};
use daemon::maker_cfd::CfdAction;
use daemon::model::cfd::{Cfd, Order, Origin, UpdateCfdProposals};
use daemon::model::{Price, TakerId, Timestamp, Usd};
use daemon::seed::Seed;
@ -183,7 +182,7 @@ impl Maker {
pub async fn reject_take_request(&self, order: Order) {
self.system
.cfd_actor_addr
.send(CfdAction::RejectOrder { order_id: order.id })
.send(maker_cfd::RejectOrder { order_id: order.id })
.await
.unwrap()
.unwrap();
@ -192,7 +191,7 @@ impl Maker {
pub async fn accept_take_request(&self, order: Order) {
self.system
.cfd_actor_addr
.send(CfdAction::AcceptOrder { order_id: order.id })
.send(maker_cfd::AcceptOrder { order_id: order.id })
.await
.unwrap()
.unwrap();

Loading…
Cancel
Save