Browse Source

Merge #628

628: Misc cleanup r=da-kami a=thomaseizinger



Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
reconnect-to-maker
bors[bot] 3 years ago
committed by GitHub
parent
commit
faad0cc816
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      daemon/src/cfd_actors.rs
  2. 41
      daemon/src/maker_cfd.rs
  3. 92
      daemon/src/maker_inc_connections.rs
  4. 508
      daemon/src/model/cfd.rs
  5. 7
      daemon/src/oracle.rs
  6. 17
      daemon/src/taker_cfd.rs
  7. 2
      daemon/tests/harness/mocks/mod.rs
  8. 10
      daemon/tests/harness/mocks/oracle.rs

11
daemon/src/cfd_actors.rs

@ -1,5 +1,5 @@
use crate::db::load_cfd_by_order_id;
use crate::model::cfd::{Attestation, Cfd, CfdState, CfdStateChangeEvent, OrderId};
use crate::model::cfd::{Attestation, Cfd, CfdState, OrderId};
use crate::{db, monitor, oracle, try_continue, wallet};
use anyhow::{bail, Context, Result};
use sqlx::pool::PoolConnection;
@ -50,7 +50,7 @@ where
.context("Failed to send transaction")?;
tracing::info!("CET published with txid {}", txid);
if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() {
if cfd.handle_cet_sent()?.is_none() {
bail!("If we can get the CET we should be able to transition")
}
@ -78,7 +78,7 @@ where
let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?;
if cfd.handle(CfdStateChangeEvent::Monitor(event))?.is_none() {
if cfd.handle_monitoring_event(event)?.is_none() {
// early exit if there was not state change
// this is for cases where we are already in a final state
return Ok(());
@ -122,7 +122,7 @@ where
.await?
.context("Failed to publish commit tx")?;
if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() {
if cfd.handle_commit_tx_sent()?.is_none() {
bail!("If we can get the commit tx we should be able to transition")
}
@ -160,8 +160,7 @@ where
cfd.role(),
));
let new_state =
try_continue!(cfd.handle(CfdStateChangeEvent::OracleAttestation(attestation)));
let new_state = try_continue!(cfd.handle_oracle_attestation(attestation));
if new_state.is_none() {
// if we don't transition to a new state after oracle attestation we ignore the cfd

41
daemon/src/maker_cfd.rs

@ -1,10 +1,8 @@
use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::maker_inc_connections::TakerCommand;
use crate::model::cfd::{
Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, Order,
OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal,
UpdateCfdProposals,
Cfd, CfdState, CfdStateCommon, CollaborativeSettlement, Dlc, Order, OrderId, Origin, Role,
RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, UpdateCfdProposals,
};
use crate::model::{Price, TakerId, Timestamp, Usd};
use crate::monitor::MonitorParams;
@ -339,9 +337,7 @@ where
self.takers
.do_send_async(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::SendOrder {
order: current_order,
},
msg: wire::MakerToTaker::CurrentOrder(current_order),
})
.await?;
@ -357,7 +353,7 @@ where
.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifySettlementAccepted { id: order_id },
msg: wire::MakerToTaker::ConfirmSettlement(order_id),
})
.await?
{
@ -390,7 +386,7 @@ where
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifySettlementRejected { id: order_id },
msg: wire::MakerToTaker::RejectSettlement(order_id),
})
.await??;
@ -422,7 +418,7 @@ where
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyRollOverRejected { id: order_id },
msg: wire::MakerToTaker::RejectRollOver(order_id),
})
.await??;
@ -460,7 +456,7 @@ where
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyInvalidOrderId { id: order_id },
msg: wire::MakerToTaker::InvalidOrderId(order_id),
})
.await??;
@ -542,7 +538,7 @@ where
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyOrderRejected { id: cfd.order.id },
msg: wire::MakerToTaker::RejectOrder(cfd.order.id),
})
.await??;
@ -587,8 +583,7 @@ where
let offer_announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(cfd.order.oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?;
.await??;
// 3. Notify the taker that we are ready for contract setup
// Use `.send` here to ensure we only continue once the message has been sent
@ -597,7 +592,7 @@ where
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyOrderAccepted { id: order_id },
msg: wire::MakerToTaker::ConfirmOrder(cfd.order.id),
})
.await??;
@ -611,7 +606,7 @@ where
self.takers.clone().into_sink().with(move |msg| {
future::ok(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::Protocol(msg),
msg: wire::MakerToTaker::Protocol(msg),
})
}),
receiver,
@ -758,8 +753,8 @@ where
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::NotifyRollOverAccepted {
id: proposal.order_id,
msg: wire::MakerToTaker::ConfirmRollOver {
order_id: proposal.order_id,
oracle_event_id,
},
})
@ -776,7 +771,7 @@ where
self.takers.clone().into_sink().with(move |msg| {
future::ok(maker_inc_connections::TakerMessage {
taker_id,
command: TakerCommand::RollOverProtocol(msg),
msg: wire::MakerToTaker::RollOverProtocol(msg),
})
}),
receiver,
@ -887,9 +882,11 @@ where
let (tx, sig_maker) = dlc.close_transaction(proposal)?;
let own_script_pubkey = dlc.script_pubkey_for(cfd.role());
cfd.handle(CfdStateChangeEvent::ProposalSigned(
CollaborativeSettlement::new(tx.clone(), own_script_pubkey.clone(), proposal.price)?,
))?;
cfd.handle_proposal_signed(CollaborativeSettlement::new(
tx.clone(),
own_script_pubkey.clone(),
proposal.price,
)?)?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
let spend_tx = dlc.finalize_spend_transaction((tx, sig_maker), sig_taker)?;

92
daemon/src/maker_inc_connections.rs

@ -1,6 +1,6 @@
use crate::maker_cfd::{FromTaker, NewTakerOnline};
use crate::model::cfd::{Order, OrderId};
use crate::model::{BitMexPriceEventId, TakerId};
use crate::model::cfd::Order;
use crate::model::TakerId;
use crate::noise::TransportStateExt;
use crate::tokio_ext::FutureExt;
use crate::{forward_only_ok, maker_cfd, noise, send_to_socket, wire, Tasks};
@ -19,40 +19,9 @@ use xtra_productivity::xtra_productivity;
pub struct BroadcastOrder(pub Option<Order>);
#[allow(clippy::large_enum_variant)]
pub enum TakerCommand {
SendOrder {
order: Option<Order>,
},
NotifyInvalidOrderId {
id: OrderId,
},
NotifyOrderAccepted {
id: OrderId,
},
NotifyOrderRejected {
id: OrderId,
},
NotifySettlementAccepted {
id: OrderId,
},
NotifySettlementRejected {
id: OrderId,
},
NotifyRollOverAccepted {
id: OrderId,
oracle_event_id: BitMexPriceEventId,
},
NotifyRollOverRejected {
id: OrderId,
},
Protocol(wire::SetupMsg),
RollOverProtocol(wire::RollOverMsg),
}
pub struct TakerMessage {
pub taker_id: TakerId,
pub command: TakerCommand,
pub msg: wire::MakerToTaker,
}
pub enum ListenerMessage {
@ -183,60 +152,7 @@ impl Actor {
}
async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<(), NoConnection> {
match msg.command {
TakerCommand::SendOrder { order } => {
self.send_to_taker(&msg.taker_id, wire::MakerToTaker::CurrentOrder(order))
.await?;
}
TakerCommand::NotifyInvalidOrderId { id } => {
self.send_to_taker(&msg.taker_id, wire::MakerToTaker::InvalidOrderId(id))
.await?;
}
TakerCommand::NotifyOrderAccepted { id } => {
self.send_to_taker(&msg.taker_id, wire::MakerToTaker::ConfirmOrder(id))
.await?;
}
TakerCommand::NotifyOrderRejected { id } => {
self.send_to_taker(&msg.taker_id, wire::MakerToTaker::RejectOrder(id))
.await?;
}
TakerCommand::NotifySettlementAccepted { id } => {
self.send_to_taker(&msg.taker_id, wire::MakerToTaker::ConfirmSettlement(id))
.await?;
}
TakerCommand::NotifySettlementRejected { id } => {
self.send_to_taker(&msg.taker_id, wire::MakerToTaker::RejectSettlement(id))
.await?;
}
TakerCommand::Protocol(setup_msg) => {
self.send_to_taker(&msg.taker_id, wire::MakerToTaker::Protocol(setup_msg))
.await?;
}
TakerCommand::NotifyRollOverAccepted {
id,
oracle_event_id,
} => {
self.send_to_taker(
&msg.taker_id,
wire::MakerToTaker::ConfirmRollOver {
order_id: id,
oracle_event_id,
},
)
.await?;
}
TakerCommand::NotifyRollOverRejected { id } => {
self.send_to_taker(&msg.taker_id, wire::MakerToTaker::RejectRollOver(id))
.await?;
}
TakerCommand::RollOverProtocol(roll_over_msg) => {
self.send_to_taker(
&msg.taker_id,
wire::MakerToTaker::RollOverProtocol(roll_over_msg),
)
.await?;
}
}
self.send_to_taker(&msg.taker_id, msg.msg).await?;
Ok(())
}

508
daemon/src/model/cfd.rs

@ -698,18 +698,15 @@ impl Cfd {
pub const CET_TIMELOCK: u32 = 12;
pub fn handle(&mut self, event: CfdStateChangeEvent) -> Result<Option<CfdState>> {
pub fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<Option<CfdState>> {
use CfdState::*;
// TODO: Display impl
tracing::info!("Cfd state change event {:?}", event);
let order_id = self.order.id;
// early exit if already final
if let SetupFailed { .. } | Closed { .. } | Refunded { .. } = self.state.clone() {
tracing::trace!(
"Ignoring event {:?} because cfd already in state {}",
"Ignoring monitoring event {:?} because cfd already in state {}",
event,
self.state
);
@ -717,213 +714,80 @@ impl Cfd {
}
let new_state = match event {
CfdStateChangeEvent::Monitor(event) => match event {
monitor::Event::LockFinality(_) => {
if let PendingOpen { dlc, .. } = self.state.clone() {
CfdState::Open {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
attestation: None,
collaborative_close: None,
}
} else if let Open {
dlc,
attestation,
collaborative_close,
..
} = self.state.clone()
{
CfdState::Open {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
attestation,
collaborative_close,
}
} else {
bail!(
"Cannot transition to Open because of unexpected state {}",
self.state
)
}
}
monitor::Event::CommitFinality(_) => {
let (dlc, attestation) = if let PendingCommit {
dlc, attestation, ..
} = self.state.clone()
{
(dlc, attestation)
} else if let PendingOpen {
dlc, attestation, ..
}
| Open {
dlc, attestation, ..
} = self.state.clone()
{
tracing::debug!(%order_id, "Was in unexpected state {}, jumping ahead to OpenCommitted", self.state);
(dlc, attestation)
} else {
bail!(
"Cannot transition to OpenCommitted because of unexpected state {}",
self.state
)
};
OpenCommitted {
monitor::Event::LockFinality(_) => {
if let PendingOpen { dlc, .. } = self.state.clone() {
CfdState::Open {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
cet_status: if let Some(attestation) = attestation {
CetStatus::OracleSigned(attestation)
} else {
CetStatus::Unprepared
},
attestation: None,
collaborative_close: None,
}
}
monitor::Event::CloseFinality(_) => {
let collaborative_close = self.collaborative_close().context("No collaborative close after reaching collaborative close finality")?;
CfdState::closed(Payout::CollaborativeClose(collaborative_close))
},
monitor::Event::CetTimelockExpired(_) => match self.state.clone() {
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::Unprepared,
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
cet_status: CetStatus::TimelockExpired,
},
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::OracleSigned(attestation),
..
} => CfdState::OpenCommitted {
} else if let Open {
dlc,
attestation,
collaborative_close,
..
} = self.state.clone()
{
CfdState::Open {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
cet_status: CetStatus::Ready(attestation),
},
PendingOpen {
dlc, attestation, ..
}
| Open {
dlc, attestation, ..
}
| PendingCommit {
dlc, attestation, ..
} => {
tracing::debug!(%order_id, "Was in unexpected state {}, jumping ahead to OpenCommitted", self.state);
CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
cet_status: match attestation {
None => CetStatus::TimelockExpired,
Some(attestation) => CetStatus::Ready(attestation),
},
}
attestation,
collaborative_close,
}
_ => bail!(
"Cannot transition to OpenCommitted because of unexpected state {}",
} else {
bail!(
"Cannot transition to Open because of unexpected state {}",
self.state
),
},
monitor::Event::RefundTimelockExpired(_) => {
let dlc = if let OpenCommitted { dlc, .. } = self.state.clone() {
dlc
} else if let Open { dlc, .. } | PendingOpen { dlc, .. } = self.state.clone() {
tracing::debug!(%order_id, "Was in unexpected state {}, jumping ahead to PendingRefund", self.state);
dlc
} else {
bail!(
"Cannot transition to PendingRefund because of unexpected state {}",
self.state
)
};
CfdState::must_refund(dlc)
}
monitor::Event::RefundFinality(_) => {
let dlc = self
.dlc()
.context("No dlc available when reaching refund finality")?;
CfdState::refunded(dlc)
)
}
monitor::Event::CetFinality(_) => {
let attestation = self
.attestation()
.context("No attestation available when reaching CET finality")?;
CfdState::closed(Payout::Cet(attestation))
}
monitor::Event::RevokedTransactionFound(_) => {
todo!("Punish bad counterparty")
}
monitor::Event::CommitFinality(_) => {
let (dlc, attestation) = if let PendingCommit {
dlc, attestation, ..
} = self.state.clone()
{
(dlc, attestation)
} else if let PendingOpen {
dlc, attestation, ..
}
},
CfdStateChangeEvent::CommitTxSent => {
let (dlc, attestation ) = match self.state.clone() {
PendingOpen {
dlc, attestation, ..
} => (dlc, attestation),
Open {
dlc,
attestation,
..
} => (dlc, attestation),
_ => {
bail!(
"Cannot transition to PendingCommit because of unexpected state {}",
self.state
)
}
| Open {
dlc, attestation, ..
} = self.state.clone()
{
tracing::debug!(%order_id, "Was in unexpected state {}, jumping ahead to OpenCommitted", self.state);
(dlc, attestation)
} else {
bail!(
"Cannot transition to OpenCommitted because of unexpected state {}",
self.state
)
};
PendingCommit {
OpenCommitted {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
attestation,
cet_status: if let Some(attestation) = attestation {
CetStatus::OracleSigned(attestation)
} else {
CetStatus::Unprepared
},
}
}
CfdStateChangeEvent::OracleAttestation(attestation) => match self.state.clone() {
CfdState::PendingOpen { dlc, .. } => CfdState::PendingOpen {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
attestation: Some(attestation),
},
CfdState::Open { dlc, .. } => CfdState::Open {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
attestation: Some(attestation),
collaborative_close: None,
},
CfdState::PendingCommit {
dlc,
..
} => CfdState::PendingCommit {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
attestation: Some(attestation),
},
monitor::Event::CloseFinality(_) => {
let collaborative_close = self.collaborative_close().context(
"No collaborative close after reaching collaborative close finality",
)?;
CfdState::closed(Payout::CollaborativeClose(collaborative_close))
}
monitor::Event::CetTimelockExpired(_) => match self.state.clone() {
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::Unprepared,
@ -933,11 +797,11 @@ impl Cfd {
transition_timestamp: Timestamp::now()?,
},
dlc,
cet_status: CetStatus::OracleSigned(attestation),
cet_status: CetStatus::TimelockExpired,
},
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::TimelockExpired,
cet_status: CetStatus::OracleSigned(attestation),
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
@ -946,43 +810,239 @@ impl Cfd {
dlc,
cet_status: CetStatus::Ready(attestation),
},
PendingOpen {
dlc, attestation, ..
}
| Open {
dlc, attestation, ..
}
| PendingCommit {
dlc, attestation, ..
} => {
tracing::debug!(%order_id, "Was in unexpected state {}, jumping ahead to OpenCommitted", self.state);
CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
cet_status: match attestation {
None => CetStatus::TimelockExpired,
Some(attestation) => CetStatus::Ready(attestation),
},
}
}
_ => bail!(
"Cannot transition to OpenCommitted because of unexpected state {}",
self.state
),
},
CfdStateChangeEvent::CetSent => {
let dlc = self.dlc().context("No DLC available after CET was sent")?;
monitor::Event::RefundTimelockExpired(_) => {
let dlc = if let OpenCommitted { dlc, .. } = self.state.clone() {
dlc
} else if let Open { dlc, .. } | PendingOpen { dlc, .. } = self.state.clone() {
tracing::debug!(%order_id, "Was in unexpected state {}, jumping ahead to PendingRefund", self.state);
dlc
} else {
bail!(
"Cannot transition to PendingRefund because of unexpected state {}",
self.state
)
};
CfdState::must_refund(dlc)
}
monitor::Event::RefundFinality(_) => {
let dlc = self
.dlc()
.context("No dlc available when reaching refund finality")?;
CfdState::refunded(dlc)
}
monitor::Event::CetFinality(_) => {
let attestation = self
.attestation()
.context("No attestation available after CET was sent")?;
.context("No attestation available when reaching CET finality")?;
CfdState::PendingCet {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
attestation,
}
CfdState::closed(Payout::Cet(attestation))
}
monitor::Event::RevokedTransactionFound(_) => {
todo!("Punish bad counterparty")
}
};
self.state = new_state.clone();
Ok(Some(new_state))
}
pub fn handle_commit_tx_sent(&mut self) -> Result<Option<CfdState>> {
use CfdState::*;
// early exit if already final
if let SetupFailed { .. } | Closed { .. } | Refunded { .. } = self.state.clone() {
tracing::trace!(
"Ignoring sent commit transaction because cfd already in state {}",
self.state
);
return Ok(None);
}
let (dlc, attestation) = match self.state.clone() {
PendingOpen {
dlc, attestation, ..
} => (dlc, attestation),
Open {
dlc, attestation, ..
} => (dlc, attestation),
_ => {
bail!(
"Cannot transition to PendingCommit because of unexpected state {}",
self.state
)
}
};
self.state = PendingCommit {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
CfdStateChangeEvent::ProposalSigned(collaborative_close) => match self.state.clone() {
CfdState::Open {
common,
dlc,
attestation,
..
} => CfdState::Open {
common,
dlc,
attestation,
collaborative_close: Some(collaborative_close),
dlc,
attestation,
};
Ok(Some(self.state.clone()))
}
pub fn handle_oracle_attestation(
&mut self,
attestation: Attestation,
) -> Result<Option<CfdState>> {
use CfdState::*;
// early exit if already final
if let SetupFailed { .. } | Closed { .. } | Refunded { .. } = self.state.clone() {
tracing::trace!(
"Ignoring oracle attestation because cfd already in state {}",
self.state
);
return Ok(None);
}
let new_state = match self.state.clone() {
CfdState::PendingOpen { dlc, .. } => CfdState::PendingOpen {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
_ => bail!(
"Cannot add proposed settlement details to state because of unexpected state {}",
self.state
),
dlc,
attestation: Some(attestation),
},
CfdState::Open { dlc, .. } => CfdState::Open {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
attestation: Some(attestation),
collaborative_close: None,
},
CfdState::PendingCommit { dlc, .. } => CfdState::PendingCommit {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
attestation: Some(attestation),
},
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::Unprepared,
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
cet_status: CetStatus::OracleSigned(attestation),
},
CfdState::OpenCommitted {
dlc,
cet_status: CetStatus::TimelockExpired,
..
} => CfdState::OpenCommitted {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
cet_status: CetStatus::Ready(attestation),
},
_ => bail!(
"Cannot transition to OpenCommitted because of unexpected state {}",
self.state
),
};
self.state = new_state.clone();
Ok(Some(new_state))
}
pub fn handle_cet_sent(&mut self) -> Result<Option<CfdState>> {
use CfdState::*;
// early exit if already final
if let SetupFailed { .. } | Closed { .. } | Refunded { .. } = self.state.clone() {
tracing::trace!(
"Ignoring pending CET because cfd already in state {}",
self.state
);
return Ok(None);
}
let dlc = self.dlc().context("No DLC available after CET was sent")?;
let attestation = self
.attestation()
.context("No attestation available after CET was sent")?;
self.state = CfdState::PendingCet {
common: CfdStateCommon {
transition_timestamp: Timestamp::now()?,
},
dlc,
attestation,
};
Ok(Some(self.state.clone()))
}
pub fn handle_proposal_signed(
&mut self,
collaborative_close: CollaborativeSettlement,
) -> Result<Option<CfdState>> {
use CfdState::*;
// early exit if already final
if let SetupFailed { .. } | Closed { .. } | Refunded { .. } = self.state.clone() {
tracing::trace!(
"Ignoring collaborative settlement because cfd already in state {}",
self.state
);
return Ok(None);
}
let new_state = match self.state.clone() {
CfdState::Open {
common,
dlc,
attestation,
..
} => CfdState::Open {
common,
dlc,
attestation,
collaborative_close: Some(collaborative_close),
},
_ => bail!(
"Cannot add proposed settlement details to state because of unexpected state {}",
self.state
),
};
self.state = new_state.clone();
@ -1282,18 +1342,6 @@ pub struct NotReadyYet {
cet_status: CetStatus,
}
#[derive(Debug, Clone)]
pub enum CfdStateChangeEvent {
// TODO: group other events by actors into enums and add them here so we can bundle all
// transitions into cfd.transition_to(...)
Monitor(monitor::Event),
CommitTxSent,
OracleAttestation(Attestation),
CetSent,
/// Settlement proposal was signed by the taker and sent to the maker
ProposalSigned(CollaborativeSettlement),
}
pub trait AsBlocks {
/// Calculates the duration in Bitcoin blocks.
///

7
daemon/src/oracle.rs

@ -218,7 +218,7 @@ impl Actor {
&mut self,
msg: GetAnnouncement,
_ctx: &mut xtra::Context<Self>,
) -> Option<Announcement> {
) -> Result<Announcement, NoAnnouncement> {
self.announcements
.get_key_value(&msg.0)
.map(|(id, (time, nonce_pks))| Announcement {
@ -226,6 +226,7 @@ impl Actor {
expected_outcome_time: *time,
nonce_pks: nonce_pks.clone(),
})
.ok_or(NoAnnouncement(msg.0))
}
fn handle_new_announcement_fetched(
@ -243,6 +244,10 @@ impl Actor {
}
}
#[derive(Debug, Clone, thiserror::Error)]
#[error("Announcement {0} not found")]
pub struct NoAnnouncement(pub BitMexPriceEventId);
#[async_trait]
impl xtra::Handler<NewAttestationFetched> for Actor {
async fn handle(&mut self, msg: NewAttestationFetched, _ctx: &mut xtra::Context<Self>) {

17
daemon/src/taker_cfd.rs

@ -1,9 +1,8 @@
use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::model::cfd::{
Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, CollaborativeSettlement, Dlc, Order,
OrderId, Origin, Role, RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal,
UpdateCfdProposals,
Cfd, CfdState, CfdStateCommon, CollaborativeSettlement, Dlc, Order, OrderId, Origin, Role,
RollOverProposal, SettlementKind, SettlementProposal, UpdateCfdProposal, UpdateCfdProposals,
};
use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd};
use crate::monitor::{self, MonitorParams};
@ -676,13 +675,11 @@ where
})
.await?;
cfd.handle(CfdStateChangeEvent::ProposalSigned(
CollaborativeSettlement::new(
tx.clone(),
dlc.script_pubkey_for(cfd.role()),
proposal.price,
)?,
))?;
cfd.handle_proposal_signed(CollaborativeSettlement::new(
tx.clone(),
dlc.script_pubkey_for(cfd.role()),
proposal.price,
)?)?;
append_cfd_state(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?;
self.remove_pending_proposal(&order_id)?;

2
daemon/tests/harness/mocks/mod.rs

@ -58,7 +58,7 @@ impl Mocks {
self.oracle()
.await
.expect_get_announcement()
.return_const(Some(oracle::dummy_announcement()));
.return_const(Ok(oracle::dummy_announcement()));
}
pub async fn mock_oracle_monitor_attestation(&mut self) {

10
daemon/tests/harness/mocks/oracle.rs

@ -18,7 +18,10 @@ impl Oracle for OracleActor {}
#[xtra_productivity(message_impl = false)]
impl OracleActor {
async fn handle(&mut self, msg: oracle::GetAnnouncement) -> Option<oracle::Announcement> {
async fn handle(
&mut self,
msg: oracle::GetAnnouncement,
) -> Result<oracle::Announcement, oracle::NoAnnouncement> {
self.mock.lock().await.get_announcement(msg)
}
@ -33,7 +36,10 @@ impl OracleActor {
#[automock]
pub trait Oracle {
fn get_announcement(&mut self, _msg: oracle::GetAnnouncement) -> Option<oracle::Announcement> {
fn get_announcement(
&mut self,
_msg: oracle::GetAnnouncement,
) -> Result<oracle::Announcement, oracle::NoAnnouncement> {
unreachable!("mockall will reimplement this method")
}

Loading…
Cancel
Save