Browse Source

Use a single generic enum for protocol completion

Use the same enum across:
- setup taker,
- setup maker,
- rollover taker

Generic enum contains a reason in the Rejection case - if it's not specified, it
defaults to "unknown".

Note: Completed::Succeeded uses a marker type to allow distinguishing the
    protocol in the cfd actor message handlers.
update-blockstream-electrum-server-url
Mariusz Klochowicz 3 years ago
parent
commit
70057f4829
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 37
      daemon/src/auto_rollover.rs
  2. 15
      daemon/src/maker_cfd.rs
  3. 55
      daemon/src/model/cfd.rs
  4. 63
      daemon/src/rollover_taker.rs
  5. 51
      daemon/src/setup_maker.rs
  6. 25
      daemon/src/setup_taker.rs
  7. 19
      daemon/src/taker_cfd.rs

37
daemon/src/auto_rollover.rs

@ -4,10 +4,10 @@ use crate::cfd_actors::append_cfd_state;
use crate::connection;
use crate::db;
use crate::db::load_cfd;
use crate::model::cfd::CannotRollover;
use crate::model::cfd::CfdState;
use crate::model::cfd::CfdStateCommon;
use crate::model::cfd::OrderId;
use crate::model::cfd::RolloverCompleted;
use crate::monitor;
use crate::monitor::MonitorParams;
use crate::oracle;
@ -105,7 +105,7 @@ where
}
}
#[xtra_productivity(message_impl = false)]
#[xtra_productivity]
impl<O, M> Actor<O, M>
where
O: 'static,
@ -113,30 +113,20 @@ where
M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>,
{
async fn handle_rollover_completed(&mut self, msg: rollover_taker::Completed) -> Result<()> {
use rollover_taker::Completed::*;
async fn handle_rollover_completed(&mut self, msg: RolloverCompleted) -> Result<()> {
let (order_id, dlc) = match msg {
UpdatedContract { order_id, dlc } => (order_id, dlc),
Rejected { .. } => {
RolloverCompleted::Succeeded {
order_id,
payload: (dlc, _),
} => (order_id, dlc),
RolloverCompleted::Rejected { order_id, reason } => {
tracing::debug!(%order_id, "Not rolled over: {:#}", reason);
return Ok(());
}
Failed { order_id, error } => {
RolloverCompleted::Failed { order_id, error } => {
tracing::warn!(%order_id, "Rollover failed: {:#}", error);
return Ok(());
}
NoRollover { order_id, reason } => {
match reason {
CannotRollover::NoDlc => {
tracing::warn!(%order_id, "Not rolled over: {:#}", reason);
}
CannotRollover::AlreadyExpired
| CannotRollover::WasJustRolledOver
| CannotRollover::WrongState { .. } => {
tracing::debug!(%order_id, "Not rolled over: {:#}", reason);
}
}
return Ok(());
}
};
let mut conn = self.db.acquire().await?;
@ -165,7 +155,14 @@ where
Ok(())
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M> Actor<O, M>
where
M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>,
{
async fn handle_rollover_actor_stopping(&mut self, msg: Stopping<rollover_taker::Actor>) {
self.rollover_actors.gc(msg);
}

15
daemon/src/maker_cfd.rs

@ -15,6 +15,7 @@ use crate::model::cfd::Origin;
use crate::model::cfd::Role;
use crate::model::cfd::RolloverProposal;
use crate::model::cfd::SettlementProposal;
use crate::model::cfd::SetupCompleted;
use crate::model::Identity;
use crate::model::Price;
use crate::model::Timestamp;
@ -747,22 +748,24 @@ where
}
}
#[xtra_productivity]
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_setup_completed(&mut self, msg: setup_maker::Completed) -> Result<()> {
use setup_maker::Completed::*;
async fn handle_setup_completed(&mut self, msg: SetupCompleted) -> Result<()> {
let (order_id, dlc) = match msg {
NewContract { order_id, dlc } => (order_id, dlc),
Failed { order_id, error } => {
SetupCompleted::Succeeded {
order_id,
payload: (dlc, _),
} => (order_id, dlc),
SetupCompleted::Failed { order_id, error } => {
self.append_cfd_state_setup_failed(order_id, error).await?;
return anyhow::Ok(());
}
Rejected(order_id) => {
SetupCompleted::Rejected { order_id, .. } => {
self.append_cfd_state_rejected(order_id).await?;
return anyhow::Ok(());
}

55
daemon/src/model/cfd.rs

@ -1834,16 +1834,15 @@ impl CollaborativeSettlement {
}
}
/// Message sent from a setup actor to the
/// cfd actor to notify that the contract setup has finished.
#[allow(clippy::large_enum_variant)]
pub enum Completed {
NewContract {
pub enum Completed<P> {
Succeeded {
order_id: OrderId,
dlc: Dlc,
payload: P,
},
Rejected {
order_id: OrderId,
reason: anyhow::Error,
},
Failed {
order_id: OrderId,
@ -1851,6 +1850,52 @@ pub enum Completed {
},
}
impl<P> Completed<P> {
pub fn rejected(order_id: OrderId) -> Self {
Self::Rejected {
order_id,
reason: anyhow::format_err!("unknown"),
}
}
pub fn rejected_due_to(order_id: OrderId, reason: anyhow::Error) -> Self {
Self::Rejected { order_id, reason }
}
}
pub mod marker {
/// Marker type for contract setup completion
pub struct Setup;
/// Marker type for rollover completion
pub struct Rollover;
}
/// Message sent from a setup actor to the
/// cfd actor to notify that the contract setup has finished.
pub type SetupCompleted = Completed<(Dlc, marker::Setup)>;
/// Message sent from a rollover actor to the
/// cfd actor to notify that the rollover has finished (contract got updated).
/// TODO: Roll it out in the maker rollover actor
pub type RolloverCompleted = Completed<(Dlc, marker::Rollover)>;
impl Completed<(Dlc, marker::Setup)> {
pub fn succeeded(order_id: OrderId, dlc: Dlc) -> Self {
Self::Succeeded {
order_id,
payload: (dlc, marker::Setup),
}
}
}
impl Completed<(Dlc, marker::Rollover)> {
pub fn succeeded(order_id: OrderId, dlc: Dlc) -> Self {
Self::Succeeded {
order_id,
payload: (dlc, marker::Rollover),
}
}
}
#[cfg(test)]
mod tests {
use super::*;

63
daemon/src/rollover_taker.rs

@ -4,8 +4,8 @@ use crate::connection;
use crate::model::cfd::CannotRollover;
use crate::model::cfd::Cfd;
use crate::model::cfd::Dlc;
use crate::model::cfd::OrderId;
use crate::model::cfd::Role;
use crate::model::cfd::RolloverCompleted;
use crate::model::cfd::RolloverProposal;
use crate::model::cfd::SettlementKind;
use crate::model::BitMexPriceEventId;
@ -44,7 +44,7 @@ pub struct Actor {
maker: xtra::Address<connection::Actor>,
get_announcement: Box<dyn MessageChannel<GetAnnouncement>>,
projection: xtra::Address<projection::Actor>,
on_completed: Box<dyn MessageChannel<Completed>>,
on_completed: Box<dyn MessageChannel<RolloverCompleted>>,
on_stopping: Vec<Box<dyn MessageChannel<Stopping<Self>>>>,
rollover_msg_sender: Option<UnboundedSender<RollOverMsg>>,
tasks: Tasks,
@ -57,7 +57,7 @@ impl Actor {
maker: xtra::Address<connection::Actor>,
get_announcement: &(impl MessageChannel<GetAnnouncement> + 'static),
projection: xtra::Address<projection::Actor>,
on_completed: &(impl MessageChannel<Completed> + 'static),
on_completed: &(impl MessageChannel<RolloverCompleted> + 'static),
(on_stopping0, on_stopping1): (
&(impl MessageChannel<Stopping<Self>> + 'static),
&(impl MessageChannel<Stopping<Self>> + 'static),
@ -186,7 +186,7 @@ impl Actor {
Ok(())
}
async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context<Self>) {
async fn complete(&mut self, completed: RolloverCompleted, ctx: &mut xtra::Context<Self>) {
let _ = self.on_completed.send(completed).await;
ctx.stop();
@ -198,9 +198,17 @@ impl xtra::Actor for Actor {
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
if let Err(e) = self.cfd.can_roll_over(OffsetDateTime::now_utc()) {
self.complete(
Completed::NoRollover {
match e {
CannotRollover::NoDlc => RolloverCompleted::Failed {
order_id: self.cfd.id(),
reason: e,
error: e.into(),
},
CannotRollover::AlreadyExpired
| CannotRollover::WasJustRolledOver
| CannotRollover::WrongState { .. } => RolloverCompleted::Rejected {
order_id: self.cfd.id(),
reason: e.into(),
},
},
ctx,
)
@ -213,7 +221,7 @@ impl xtra::Actor for Actor {
if let Err(e) = self.propose(this).await {
self.complete(
Completed::Failed {
RolloverCompleted::Failed {
order_id: self.cfd.id(),
error: e,
},
@ -265,7 +273,7 @@ impl Actor {
) {
if let Err(error) = self.handle_confirmed(msg, ctx).await {
self.complete(
Completed::Failed {
RolloverCompleted::Failed {
order_id: self.cfd.id(),
error,
},
@ -278,9 +286,9 @@ impl Actor {
pub async fn reject_rollover(&mut self, _: RollOverRejected, ctx: &mut xtra::Context<Self>) {
let order_id = self.cfd.id();
let completed = if let Err(error) = self.handle_rejected().await {
Completed::Failed { order_id, error }
RolloverCompleted::Failed { order_id, error }
} else {
Completed::Rejected { order_id }
RolloverCompleted::rejected(order_id)
};
self.complete(completed, ctx).await;
@ -291,13 +299,7 @@ impl Actor {
msg: RolloverSucceeded,
ctx: &mut xtra::Context<Self>,
) {
self.complete(
Completed::UpdatedContract {
order_id: self.cfd.id(),
dlc: msg.dlc,
},
ctx,
)
self.complete(RolloverCompleted::succeeded(self.cfd.id(), msg.dlc), ctx)
.await;
}
@ -307,7 +309,7 @@ impl Actor {
ctx: &mut xtra::Context<Self>,
) {
self.complete(
Completed::Failed {
RolloverCompleted::Failed {
order_id: self.cfd.id(),
error: msg.error,
},
@ -323,7 +325,7 @@ impl Actor {
) {
if let Err(error) = self.forward_protocol_msg(msg).await {
self.complete(
Completed::Failed {
RolloverCompleted::Failed {
order_id: self.cfd.id(),
error,
},
@ -358,29 +360,6 @@ pub struct RolloverFailed {
error: anyhow::Error,
}
#[allow(clippy::large_enum_variant)]
pub enum Completed {
UpdatedContract {
order_id: OrderId,
dlc: Dlc,
},
Rejected {
order_id: OrderId,
},
Failed {
order_id: OrderId,
error: anyhow::Error,
},
NoRollover {
order_id: OrderId,
reason: CannotRollover,
},
}
impl xtra::Message for Completed {
type Result = Result<()>;
}
impl ActorName for Actor {
fn actor_name() -> String {
"Taker rollover".to_string()

51
daemon/src/setup_maker.rs

@ -6,6 +6,7 @@ use crate::model::cfd::Dlc;
use crate::model::cfd::Order;
use crate::model::cfd::OrderId;
use crate::model::cfd::Role;
use crate::model::cfd::SetupCompleted;
use crate::model::Identity;
use crate::oracle::Announcement;
use crate::setup_contract;
@ -37,7 +38,7 @@ pub struct Actor {
taker: Box<dyn MessageChannel<maker_inc_connections::TakerMessage>>,
confirm_order: Box<dyn MessageChannel<maker_inc_connections::ConfirmOrder>>,
taker_id: Identity,
on_completed: Box<dyn MessageChannel<Completed>>,
on_completed: Box<dyn MessageChannel<SetupCompleted>>,
on_stopping: Vec<Box<dyn MessageChannel<Stopping<Self>>>>,
setup_msg_sender: Option<UnboundedSender<SetupMsg>>,
}
@ -53,7 +54,7 @@ impl Actor {
&(impl MessageChannel<maker_inc_connections::ConfirmOrder> + 'static),
Identity,
),
on_completed: &(impl MessageChannel<Completed> + 'static),
on_completed: &(impl MessageChannel<SetupCompleted> + 'static),
(on_stopping0, on_stopping1): (
&(impl MessageChannel<Stopping<Self>> + 'static),
&(impl MessageChannel<Stopping<Self>> + 'static),
@ -121,7 +122,7 @@ impl Actor {
Ok(())
}
async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context<Self>) {
async fn complete(&mut self, completed: SetupCompleted, ctx: &mut xtra::Context<Self>) {
let _ = self
.on_completed
.send(completed)
@ -162,7 +163,7 @@ impl Actor {
if let Err(error) = fut.await {
tracing::warn!(%order_id, "Stopping setup_maker actor: {}", error);
self.complete(Completed::Failed { order_id, error }, ctx)
self.complete(SetupCompleted::Failed { order_id, error }, ctx)
.await;
return;
@ -170,23 +171,18 @@ impl Actor {
}
fn handle(&mut self, _msg: Rejected, ctx: &mut xtra::Context<Self>) {
self.complete(Completed::Rejected(self.cfd.id()), ctx).await;
self.complete(SetupCompleted::rejected(self.cfd.id()), ctx)
.await;
}
fn handle(&mut self, msg: SetupSucceeded, ctx: &mut xtra::Context<Self>) {
self.complete(
Completed::NewContract {
order_id: msg.order_id,
dlc: msg.dlc,
},
ctx,
)
self.complete(SetupCompleted::succeeded(msg.order_id, msg.dlc), ctx)
.await
}
fn handle(&mut self, msg: SetupFailed, ctx: &mut xtra::Context<Self>) {
self.complete(
Completed::Failed {
SetupCompleted::Failed {
order_id: msg.order_id,
error: msg.error,
},
@ -215,12 +211,11 @@ impl xtra::Actor for Actor {
let quantity = self.cfd.quantity_usd();
let cfd = self.cfd.clone();
if quantity < self.order.min_quantity || quantity > self.order.max_quantity {
tracing::info!(
let reason = format!(
"Order rejected: quantity {} not in range [{}, {}]",
quantity,
self.order.min_quantity,
self.order.max_quantity
quantity, self.order.min_quantity, self.order.max_quantity
);
tracing::info!("{}", reason.clone());
let _ = self
.taker
@ -230,7 +225,11 @@ impl xtra::Actor for Actor {
})
.await;
self.complete(Completed::Rejected(cfd.id()), ctx).await;
self.complete(
SetupCompleted::rejected_due_to(cfd.id(), anyhow::format_err!(reason)),
ctx,
)
.await;
}
}
@ -275,22 +274,6 @@ pub struct SetupFailed {
error: anyhow::Error,
}
/// Message sent from the `setup_maker::Actor` to the
/// `maker_cfd::Actor` to notify that the contract setup has finished.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Completed {
Rejected(OrderId),
NewContract {
order_id: OrderId,
dlc: Dlc,
},
Failed {
order_id: OrderId,
error: anyhow::Error,
},
}
impl xtra::Message for Started {
type Result = ();
}

25
daemon/src/setup_taker.rs

@ -2,10 +2,10 @@ use crate::address_map;
use crate::connection;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState;
use crate::model::cfd::Completed;
use crate::model::cfd::Dlc;
use crate::model::cfd::OrderId;
use crate::model::cfd::Role;
use crate::model::cfd::SetupCompleted;
use crate::oracle::Announcement;
use crate::setup_contract;
use crate::setup_contract::SetupParams;
@ -34,7 +34,7 @@ pub struct Actor {
sign: Box<dyn MessageChannel<wallet::Sign>>,
maker: xtra::Address<connection::Actor>,
on_accepted: Box<dyn MessageChannel<Started>>,
on_completed: Box<dyn MessageChannel<Completed>>,
on_completed: Box<dyn MessageChannel<SetupCompleted>>,
setup_msg_sender: Option<UnboundedSender<SetupMsg>>,
}
@ -47,7 +47,7 @@ impl Actor {
sign: &(impl MessageChannel<wallet::Sign> + 'static),
maker: xtra::Address<connection::Actor>,
on_accepted: &(impl MessageChannel<Started> + 'static),
on_completed: &(impl MessageChannel<Completed> + 'static),
on_completed: &(impl MessageChannel<SetupCompleted> + 'static),
) -> Self {
Self {
cfd,
@ -122,12 +122,14 @@ impl Actor {
let order_id = self.cfd.id();
tracing::info!(%order_id, "Order got rejected");
if msg.is_invalid_order {
tracing::warn!(%order_id, "Rejection reason: Invalid order ID");
}
let reason = if msg.is_invalid_order {
anyhow::format_err!("Invalid order id: {}", &order_id)
} else {
anyhow::format_err!("Unknown")
};
self.on_completed
.send(Completed::Rejected { order_id })
.send(SetupCompleted::rejected_due_to(order_id, reason))
.log_failure("Failed to inform about contract setup rejection")
.await?;
@ -148,10 +150,7 @@ impl Actor {
fn handle(&mut self, msg: SetupSucceeded, ctx: &mut xtra::Context<Self>) -> Result<()> {
self.on_completed
.send(Completed::NewContract {
order_id: msg.order_id,
dlc: msg.dlc,
})
.send(SetupCompleted::succeeded(msg.order_id, msg.dlc))
.log_failure("Failed to inform about contract setup completion")
.await?;
@ -162,7 +161,7 @@ impl Actor {
fn handle(&mut self, msg: SetupFailed, ctx: &mut xtra::Context<Self>) -> Result<()> {
self.on_completed
.send(Completed::Failed {
.send(SetupCompleted::Failed {
order_id: msg.order_id,
error: msg.error,
})
@ -250,7 +249,7 @@ impl xtra::Message for Started {
type Result = Result<()>;
}
impl xtra::Message for Completed {
impl xtra::Message for SetupCompleted {
type Result = Result<()>;
}

19
daemon/src/taker_cfd.rs

@ -8,11 +8,11 @@ use crate::db::load_cfd;
use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState;
use crate::model::cfd::CfdStateCommon;
use crate::model::cfd::Completed;
use crate::model::cfd::Order;
use crate::model::cfd::OrderId;
use crate::model::cfd::Origin;
use crate::model::cfd::Role;
use crate::model::cfd::SetupCompleted;
use crate::model::Identity;
use crate::model::Price;
use crate::model::Usd;
@ -281,7 +281,7 @@ where
#[xtra_productivity]
impl<O, M, W> Actor<O, M, W>
where
Self: xtra::Handler<Completed>,
Self: xtra::Handler<SetupCompleted>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
W: xtra::Handler<wallet::BuildPartyParams> + xtra::Handler<wallet::Sign>,
{
@ -357,14 +357,17 @@ where
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_setup_completed(&mut self, msg: Completed) -> Result<()> {
async fn handle_setup_completed(&mut self, msg: SetupCompleted) -> Result<()> {
let (order_id, dlc) = match msg {
Completed::NewContract { order_id, dlc } => (order_id, dlc),
Completed::Rejected { order_id } => {
SetupCompleted::Succeeded {
order_id,
payload: (dlc, _),
} => (order_id, dlc),
SetupCompleted::Rejected { order_id, .. } => {
self.append_cfd_state_rejected(order_id).await?;
return Ok(());
}
Completed::Failed { order_id, error } => {
SetupCompleted::Failed { order_id, error } => {
self.append_cfd_state_setup_failed(order_id, error).await?;
return Ok(());
}
@ -417,13 +420,13 @@ impl<O, M, W> Actor<O, M, W> {
}
#[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<Completed> for Actor<O, M, W>
impl<O: 'static, M: 'static, W: 'static> Handler<SetupCompleted> for Actor<O, M, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: Completed, _ctx: &mut Context<Self>) -> Result<()> {
async fn handle(&mut self, msg: SetupCompleted, _ctx: &mut Context<Self>) -> Result<()> {
self.handle_setup_completed(msg).await
}
}

Loading…
Cancel
Save