Browse Source

Introduce perpetual CFDs by rolling them over automatically

Co-authored-by: Daniel Karzel <daniel@comit.network>
remove-long-heartbeat-interval-in-debug-mode
Thomas Eizinger 3 years ago
parent
commit
9cd1c22320
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 10
      CHANGELOG.md
  2. 194
      daemon/src/auto_rollover.rs
  3. 20
      daemon/src/lib.rs
  4. 8
      daemon/src/maker_cfd.rs
  5. 344
      daemon/src/model/cfd.rs
  6. 7
      daemon/src/projection.rs
  7. 6
      daemon/src/rollover_maker.rs
  8. 54
      daemon/src/rollover_taker.rs
  9. 5
      daemon/src/routes_maker.rs
  10. 5
      daemon/src/routes_taker.rs
  11. 107
      daemon/src/taker_cfd.rs

10
CHANGELOG.md

@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- Off-chain perpetual CFD rollover.
Hourly, the taker sends a rollover request to the maker for each open position.
The maker can accept or reject a rollover proposal.
Upon acceptance the taker and maker collaboratively agree on an oracle price event further in the future and generate new payout transactions accordingly.
The previous payout transactions are invalidated.
The new payout transactions spend from the same lock transaction, so the rollover happens off-chain.
In case a maker rejects a rollover request from a taker the old oracle price event and payout transactions stay in place.
## [0.3.0] - 2021-12-09
Initial release for mainnet.

194
daemon/src/auto_rollover.rs

@ -0,0 +1,194 @@
use crate::address_map::AddressMap;
use crate::address_map::Stopping;
use crate::cfd_actors::append_cfd_state;
use crate::connection;
use crate::db;
use crate::db::load_cfd;
use crate::model::cfd::CannotRollover;
use crate::model::cfd::CfdState;
use crate::model::cfd::CfdStateCommon;
use crate::model::cfd::OrderId;
use crate::monitor;
use crate::monitor::MonitorParams;
use crate::oracle;
use crate::projection;
use crate::rollover_taker;
use crate::Tasks;
use anyhow::Result;
use async_trait::async_trait;
use maia::secp256k1_zkp::schnorrsig;
use std::time::Duration;
use xtra::Actor as _;
use xtra::Address;
use xtra_productivity::xtra_productivity;
pub struct Actor<O, M> {
db: sqlx::SqlitePool,
oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>,
conn_actor: Address<connection::Actor>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
n_payouts: usize,
rollover_actors: AddressMap<OrderId, rollover_taker::Actor>,
tasks: Tasks,
}
impl<O, M> Actor<O, M> {
pub fn new(
db: sqlx::SqlitePool,
oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>,
conn_actor: Address<connection::Actor>,
monitor_actor: Address<M>,
oracle_actor: Address<O>,
n_payouts: usize,
) -> Self {
Self {
db,
oracle_pk,
projection_actor,
conn_actor,
monitor_actor,
oracle_actor,
n_payouts,
rollover_actors: AddressMap::default(),
tasks: Tasks::default(),
}
}
}
#[xtra_productivity]
impl<O, M> Actor<O, M>
where
M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>,
{
async fn handle(&mut self, _msg: AutoRollover, ctx: &mut xtra::Context<Self>) -> Result<()> {
tracing::trace!("Checking all CFDs for rollover eligibility");
let mut conn = self.db.acquire().await?;
let cfds = db::load_all_cfds(&mut conn).await?;
let this = ctx
.address()
.expect("actor to be able to give address to itself");
for cfd in cfds {
let disconnected = match self.rollover_actors.get_disconnected(cfd.id()) {
Ok(disconnected) => disconnected,
Err(_) => {
tracing::debug!(order_id=%cfd.id(), "Rollover already in progress");
continue;
}
};
let (addr, fut) = rollover_taker::Actor::new(
(cfd, self.n_payouts),
self.oracle_pk,
self.conn_actor.clone(),
&self.oracle_actor,
self.projection_actor.clone(),
&this,
(&this, &self.conn_actor),
)
.create(None)
.run();
disconnected.insert(addr);
self.tasks.add(fut);
}
Ok(())
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M> Actor<O, M>
where
O: 'static,
M: 'static,
M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>,
{
async fn handle_rollover_completed(&mut self, msg: rollover_taker::Completed) -> Result<()> {
use rollover_taker::Completed::*;
let (order_id, dlc) = match msg {
UpdatedContract { order_id, dlc } => (order_id, dlc),
Rejected { .. } => {
return Ok(());
}
Failed { order_id, error } => {
tracing::warn!(%order_id, "Rollover failed: {:#}", error);
return Ok(());
}
NoRollover { order_id, reason } => {
match reason {
CannotRollover::NoDlc => {
tracing::warn!(%order_id, "Not rolled over: {:#}", reason);
}
CannotRollover::AlreadyExpired
| CannotRollover::WasJustRolledOver
| CannotRollover::WrongState { .. } => {
tracing::debug!(%order_id, "Not rolled over: {:#}", reason);
}
}
return Ok(());
}
};
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::Open {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
collaborative_close: None,
};
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.monitor_actor
.send(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()),
})
.await?;
self.oracle_actor
.send(oracle::MonitorAttestation {
event_id: dlc.settlement_event_id,
})
.await?;
Ok(())
}
async fn handle_rollover_actor_stopping(&mut self, msg: Stopping<rollover_taker::Actor>) {
self.rollover_actors.gc(msg);
}
}
#[async_trait]
impl<O, M> xtra::Actor for Actor<O, M>
where
O: 'static,
M: 'static,
Self: xtra::Handler<AutoRollover>,
{
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
let fut = ctx
.notify_interval(
rollover_taker::MAX_ROLLOVER_DURATION + Duration::from_secs(60),
|| AutoRollover,
)
.expect("we are alive");
self.tasks.add(fut);
}
}
/// Message to trigger roll-over on a regular interval
pub struct AutoRollover;

20
daemon/src/lib.rs

@ -29,6 +29,7 @@ pub mod sqlx_ext; // Must come first because it is a macro.
pub mod actors;
pub mod address_map;
pub mod auth;
pub mod auto_rollover;
pub mod bdk_ext;
pub mod bitmex_price_feed;
pub mod cfd_actors;
@ -86,7 +87,7 @@ pub const N_PAYOUTS: usize = 200;
/// - How the oracle event id is chosen when creating an order (maker)
/// - The sliding window of cached oracle announcements (maker, taker)
/// - The auto-rollover time-window (taker)
pub const SETTLEMENT_INTERVAL: time::Duration = time::Duration::days(7);
pub const SETTLEMENT_INTERVAL: time::Duration = time::Duration::hours(24);
/// Struct controlling the lifetime of the async tasks,
/// such as running actors and periodic notifications.
@ -268,20 +269,33 @@ where
let (connection_actor_addr, connection_actor_ctx) = xtra::Context::new(None);
let (cfd_actor_addr, cfd_actor_fut) = taker_cfd::Actor::new(
db,
db.clone(),
wallet_addr,
oracle_pk,
projection_actor.clone(),
connection_actor_addr.clone(),
monitor_addr.clone(),
oracle_addr.clone(),
n_payouts,
maker_identity,
)
.create(None)
.run();
let (_auto_rollover_address, auto_rollover_fut) = auto_rollover::Actor::new(
db,
oracle_pk,
projection_actor,
connection_actor_addr.clone(),
monitor_addr.clone(),
oracle_addr,
n_payouts,
maker_identity,
)
.create(None)
.run();
tasks.add(cfd_actor_fut);
tasks.add(auto_rollover_fut);
tasks.add(connection_actor_ctx.run(connection::Actor::new(
maker_online_status_feed_sender,

8
daemon/src/maker_cfd.rs

@ -14,14 +14,14 @@ use crate::model::cfd::Order;
use crate::model::cfd::OrderId;
use crate::model::cfd::Origin;
use crate::model::cfd::Role;
use crate::model::cfd::RollOverProposal;
use crate::model::cfd::RolloverProposal;
use crate::model::cfd::SettlementProposal;
use crate::model::Identity;
use crate::model::Price;
use crate::model::Timestamp;
use crate::model::Usd;
use crate::monitor;
use crate::monitor::MonitorParams;
use crate::monitor::{self};
use crate::oracle;
use crate::projection;
use crate::projection::Update;
@ -290,7 +290,7 @@ where
{
async fn handle_propose_roll_over(
&mut self,
proposal: RollOverProposal,
proposal: RolloverProposal,
taker_id: Identity,
ctx: &mut Context<Self>,
) -> Result<()> {
@ -904,7 +904,7 @@ where
timestamp,
} => {
log_error!(self.handle_propose_roll_over(
RollOverProposal {
RolloverProposal {
order_id,
timestamp,
},

344
daemon/src/model/cfd.rs

@ -11,6 +11,7 @@ use crate::model::Usd;
use crate::monitor;
use crate::oracle;
use crate::payout_curve;
use crate::SETTLEMENT_INTERVAL;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
@ -562,7 +563,7 @@ pub enum UpdateCfdProposal {
direction: SettlementKind,
},
RollOverProposal {
proposal: RollOverProposal,
proposal: RolloverProposal,
direction: SettlementKind,
},
}
@ -579,7 +580,7 @@ pub struct SettlementProposal {
/// Proposed collaborative settlement
#[derive(Debug, Clone)]
pub struct RollOverProposal {
pub struct RolloverProposal {
pub order_id: OrderId,
pub timestamp: Timestamp,
}
@ -1235,6 +1236,7 @@ impl Cfd {
| CfdState::Open { dlc, .. }
| CfdState::PendingCommit { dlc, .. }
| CfdState::OpenCommitted { dlc, .. }
| CfdState::PendingRefund { dlc, .. }
| CfdState::PendingCet { dlc, .. } => Some(dlc),
CfdState::OutgoingOrderRequest { .. }
@ -1243,7 +1245,6 @@ impl Cfd {
| CfdState::Rejected { .. }
| CfdState::ContractSetup { .. }
| CfdState::Closed { .. }
| CfdState::PendingRefund { .. }
| CfdState::Refunded { .. }
| CfdState::SetupFailed { .. } => None,
}
@ -1339,6 +1340,37 @@ impl Cfd {
}
}
/// Only cfds in state `Open` that have not received an attestation and are within 23 hours
/// until expiry are eligible for rollover
pub fn can_roll_over(&self, now: OffsetDateTime) -> Result<(), CannotRollover> {
let expiry_timestamp = self.expiry_timestamp().ok_or(CannotRollover::NoDlc)?;
if now > expiry_timestamp {
return Err(CannotRollover::AlreadyExpired);
}
let time_until_expiry = expiry_timestamp - now;
if time_until_expiry > SETTLEMENT_INTERVAL - Duration::HOUR {
return Err(CannotRollover::WasJustRolledOver);
}
// only state open with no attestation is acceptable for rollover
if !matches!(
self.state.clone(),
CfdState::Open {
attestation: None,
..
}
) {
return Err(CannotRollover::WrongState {
state: self.state.to_string(),
});
}
Ok(())
}
pub fn id(&self) -> OrderId {
self.id
}
@ -1388,6 +1420,18 @@ impl Cfd {
}
}
#[derive(thiserror::Error, Debug, PartialEq)]
pub enum CannotRollover {
#[error("Cfd does not have a dlc")]
NoDlc,
#[error("The Cfd is already expired")]
AlreadyExpired,
#[error("The Cfd was just rolled over")]
WasJustRolledOver,
#[error("Cannot roll over in state {state}")]
WrongState { state: String },
}
#[derive(thiserror::Error, Debug, Clone)]
#[error("The cfd is not ready for CET publication yet: {cet_status}")]
pub struct NotReadyYet {
@ -1810,7 +1854,13 @@ pub enum Completed {
#[cfg(test)]
mod tests {
use super::*;
use crate::seed::Seed;
use bdk::bitcoin::util::psbt::Global;
use bdk::bitcoin::util::psbt::PartiallySignedTransaction;
use rust_decimal_macros::dec;
use std::collections::BTreeMap;
use std::str::FromStr;
use time::macros::datetime;
#[test]
fn given_default_values_then_expected_liquidation_price() {
@ -2057,4 +2107,292 @@ mod tests {
assert_eq!(id, deserialized);
}
#[tokio::test]
async fn given_cfd_expires_now_then_rollover() {
// --|----|-------------------------------------------------|--> time
// ct 1h 24h
// --|----|<--------------------rollover------------------->|--
// now
let cfd = Cfd::dummy_open(BitMexPriceEventId::with_20_digits(
datetime!(2021-11-19 10:00:00).assume_utc(),
));
let result = cfd.can_roll_over(datetime!(2021-11-19 10:00:00).assume_utc());
assert!(result.is_ok());
}
#[tokio::test]
async fn given_cfd_expires_within_23hours_then_rollover() {
// --|----|-------------------------------------------------|--> time
// ct 1h 24h
// --|----|<--------------------rollover------------------->|--
// now
let cfd = Cfd::dummy_open(BitMexPriceEventId::with_20_digits(
datetime!(2021-11-19 10:00:00).assume_utc(),
));
let result = cfd.can_roll_over(datetime!(2021-11-18 11:00:00).assume_utc());
assert!(result.is_ok());
}
#[tokio::test]
async fn given_cfd_past_expiry_time_then_no_rollover() {
// --|----|-------------------------------------------------|--> time
// ct 1h 24h
// --|----|<--------------------rollover------------------->|--
// now
let cfd = Cfd::dummy_open(BitMexPriceEventId::with_20_digits(
datetime!(2021-11-19 10:00:00).assume_utc(),
));
let cannot_roll_over = cfd
.can_roll_over(datetime!(2021-11-19 10:00:01).assume_utc())
.unwrap_err();
assert_eq!(cannot_roll_over, CannotRollover::AlreadyExpired)
}
#[tokio::test]
async fn given_cfd_was_just_rolled_over_then_no_rollover() {
// --|----|-------------------------------------------------|--> time
// ct 1h 24h
// --|----|<--------------------rollover------------------->|--
// now
let cfd = Cfd::dummy_open(BitMexPriceEventId::with_20_digits(
datetime!(2021-11-19 10:00:00).assume_utc(),
));
let cannot_roll_over = cfd
.can_roll_over(datetime!(2021-11-18 10:00:01).assume_utc())
.unwrap_err();
assert_eq!(cannot_roll_over, CannotRollover::WasJustRolledOver)
}
#[tokio::test]
async fn given_cfd_out_of_bounds_expiry_then_no_rollover() {
// --|----|-------------------------------------------------|--> time
// ct 1h 24h
// --|----|<--------------------rollover------------------->|--
// now
let cfd = Cfd::dummy_open(BitMexPriceEventId::with_20_digits(
datetime!(2021-11-19 10:00:00).assume_utc(),
));
let cannot_roll_over = cfd
.can_roll_over(datetime!(2021-11-18 09:59:59).assume_utc())
.unwrap_err();
assert_eq!(cannot_roll_over, CannotRollover::WasJustRolledOver)
}
#[tokio::test]
async fn given_cfd_was_renewed_less_than_1h_ago_then_no_rollover() {
// --|----|-------------------------------------------------|--> time
// ct 1h 24h
// --|----|<--------------------rollover------------------->|--
// now
let cfd = Cfd::dummy_open(BitMexPriceEventId::with_20_digits(
datetime!(2021-11-19 10:00:00).assume_utc(),
));
let cannot_roll_over = cfd
.can_roll_over(datetime!(2021-11-18 10:59:59).assume_utc())
.unwrap_err();
assert_eq!(cannot_roll_over, CannotRollover::WasJustRolledOver)
}
#[tokio::test]
async fn given_cfd_has_attestation_then_no_rollover() {
let cfd = Cfd::dummy_open_with_attestation(BitMexPriceEventId::with_20_digits(
datetime!(2021-11-19 10:00:00).assume_utc(),
));
let cannot_roll_over = cfd
.can_roll_over(datetime!(2021-11-19 10:00:00).assume_utc())
.unwrap_err();
assert!(matches!(
cannot_roll_over,
CannotRollover::WrongState { .. }
))
}
#[tokio::test]
async fn given_cfd_not_in_open_then_no_rollover() {
let cfd = Cfd::dummy_not_open_but_dlc(BitMexPriceEventId::with_20_digits(
datetime!(2021-11-19 10:00:00).assume_utc(),
));
let cannot_roll_over = cfd
.can_roll_over(datetime!(2021-11-19 10:00:00).assume_utc())
.unwrap_err();
assert!(matches!(
cannot_roll_over,
CannotRollover::WrongState { .. }
))
}
impl Cfd {
fn dummy_open(event_id: BitMexPriceEventId) -> Self {
Cfd::from_order(
Order::dummy_model(),
Usd::new(dec!(1000)),
CfdState::Open {
common: Default::default(),
dlc: Dlc::dummy(Some(event_id)),
attestation: None,
collaborative_close: None,
},
dummy_identity(),
Role::Taker,
)
}
fn dummy_open_with_attestation(event_id: BitMexPriceEventId) -> Self {
Cfd::from_order(
Order::dummy_model(),
Usd::new(dec!(1000)),
CfdState::Open {
common: Default::default(),
dlc: Dlc::dummy(Some(event_id)),
// the dummy_dlc contains a dummy range [0, 1]
attestation: Some(
Attestation::new(
BitMexPriceEventId::with_20_digits(OffsetDateTime::now_utc()),
0,
vec![],
Dlc::dummy(None),
Role::Taker,
)
.unwrap(),
),
collaborative_close: None,
},
dummy_identity(),
Role::Taker,
)
}
fn dummy_not_open_but_dlc(event_id: BitMexPriceEventId) -> Self {
Cfd::from_order(
Order::dummy_model(),
Usd::new(dec!(1000)),
CfdState::PendingRefund {
common: Default::default(),
dlc: Dlc::dummy(Some(event_id)),
},
dummy_identity(),
Role::Taker,
)
}
}
impl Order {
fn dummy_model() -> Self {
Order::new_short(
Price::new(dec!(1000)).unwrap(),
Usd::new(dec!(100)),
Usd::new(dec!(1000)),
Origin::Theirs,
dummy_event_id(),
time::Duration::hours(24),
1,
)
.unwrap()
}
}
impl Dlc {
fn dummy(event_id: Option<BitMexPriceEventId>) -> Self {
let dummy_sk = SecretKey::from_slice(&[1; 32]).unwrap();
let dummy_pk = PublicKey::from_slice(&[
3, 23, 183, 225, 206, 31, 159, 148, 195, 42, 67, 115, 146, 41, 248, 140, 11, 3, 51,
41, 111, 180, 110, 143, 114, 134, 88, 73, 198, 174, 52, 184, 78,
])
.unwrap();
let dummy_addr = Address::from_str("132F25rTsvBdp9JzLLBHP5mvGY66i1xdiM").unwrap();
let dummy_tx = dummy_partially_signed_transaction().extract_tx();
let dummy_adapter_sig = "03424d14a5471c048ab87b3b83f6085d125d5864249ae4297a57c84e74710bb6730223f325042fce535d040fee52ec13231bf709ccd84233c6944b90317e62528b2527dff9d659a96db4c99f9750168308633c1867b70f3a18fb0f4539a1aecedcd1fc0148fc22f36b6303083ece3f872b18e35d368b3958efe5fb081f7716736ccb598d269aa3084d57e1855e1ea9a45efc10463bbf32ae378029f5763ceb40173f"
.parse()
.unwrap();
let dummy_sig = Signature::from_str("3046022100839c1fbc5304de944f697c9f4b1d01d1faeba32d751c0f7acb21ac8a0f436a72022100e89bd46bb3a5a62adc679f659b7ce876d83ee297c7a5587b2011c4fcc72eab45").unwrap();
let mut dummy_cet_with_zero_price_range = HashMap::new();
dummy_cet_with_zero_price_range.insert(
BitMexPriceEventId::with_20_digits(OffsetDateTime::now_utc()),
vec![Cet {
tx: dummy_tx.clone(),
adaptor_sig: dummy_adapter_sig,
range: RangeInclusive::new(0, 1),
n_bits: 0,
}],
);
Dlc {
identity: dummy_sk,
identity_counterparty: dummy_pk,
revocation: dummy_sk,
revocation_pk_counterparty: dummy_pk,
publish: dummy_sk,
publish_pk_counterparty: dummy_pk,
maker_address: dummy_addr.clone(),
taker_address: dummy_addr,
lock: (dummy_tx.clone(), Descriptor::new_pk(dummy_pk)),
commit: (
dummy_tx.clone(),
dummy_adapter_sig,
Descriptor::new_pk(dummy_pk),
),
cets: dummy_cet_with_zero_price_range,
refund: (dummy_tx, dummy_sig),
maker_lock_amount: Default::default(),
taker_lock_amount: Default::default(),
revoked_commit: vec![],
settlement_event_id: match event_id {
Some(event_id) => event_id,
None => dummy_event_id(),
},
}
}
}
pub fn dummy_partially_signed_transaction() -> PartiallySignedTransaction {
// very simple dummy psbt that does not contain anything
// pulled in from github.com-1ecc6299db9ec823/bitcoin-0.27.1/src/util/psbt/mod.rs:238
PartiallySignedTransaction {
global: Global {
unsigned_tx: Transaction {
version: 2,
lock_time: 0,
input: vec![],
output: vec![],
},
xpub: Default::default(),
version: 0,
proprietary: BTreeMap::new(),
unknown: BTreeMap::new(),
},
inputs: vec![],
outputs: vec![],
}
}
pub fn dummy_identity() -> Identity {
Identity::new(Seed::default().derive_identity().0)
}
pub fn dummy_event_id() -> BitMexPriceEventId {
BitMexPriceEventId::with_20_digits(OffsetDateTime::now_utc())
}
}

7
daemon/src/projection.rs

@ -4,7 +4,7 @@ use crate::model;
use crate::model::cfd::Cfd as ModelCfd;
use crate::model::cfd::OrderId;
use crate::model::cfd::Role;
use crate::model::cfd::RollOverProposal;
use crate::model::cfd::RolloverProposal;
use crate::model::cfd::SettlementKind;
use crate::model::cfd::SettlementProposal;
use crate::model::cfd::UpdateCfdProposal;
@ -37,7 +37,7 @@ pub struct UpdateSettlementProposal {
/// Amend a given rollover proposal (if `proposal.is_none()`, it should be removed)
pub struct UpdateRollOverProposal {
pub order: OrderId,
pub proposal: Option<(RollOverProposal, SettlementKind)>,
pub proposal: Option<(RolloverProposal, SettlementKind)>,
}
/// Store the latest state of `T` for display purposes
@ -446,7 +446,6 @@ pub enum CfdAction {
Settle,
AcceptSettlement,
RejectSettlement,
RollOver,
AcceptRollOver,
RejectRollOver,
}
@ -472,7 +471,7 @@ fn available_actions(state: CfdState, role: Role) -> Vec<CfdAction> {
vec![CfdAction::Commit]
}
(CfdState::Open { .. }, Role::Taker) => {
vec![CfdAction::RollOver, CfdAction::Commit, CfdAction::Settle]
vec![CfdAction::Commit, CfdAction::Settle]
}
(CfdState::Open { .. }, Role::Maker) => vec![CfdAction::Commit],
_ => vec![],

6
daemon/src/rollover_maker.rs

@ -4,7 +4,7 @@ use crate::maker_inc_connections::TakerMessage;
use crate::model::cfd::Dlc;
use crate::model::cfd::OrderId;
use crate::model::cfd::Role;
use crate::model::cfd::RollOverProposal;
use crate::model::cfd::RolloverProposal;
use crate::model::cfd::SettlementKind;
use crate::model::cfd::UpdateCfdProposal;
use crate::model::Identity;
@ -68,7 +68,7 @@ pub struct Actor {
oracle_actor: Box<dyn MessageChannel<GetAnnouncement>>,
on_stopping: Vec<Box<dyn MessageChannel<Stopping<Self>>>>,
projection_actor: xtra::Address<projection::Actor>,
proposal: RollOverProposal,
proposal: RolloverProposal,
}
#[async_trait::async_trait]
@ -117,7 +117,7 @@ impl Actor {
&(impl MessageChannel<Stopping<Self>> + 'static),
),
projection_actor: xtra::Address<projection::Actor>,
proposal: RollOverProposal,
proposal: RolloverProposal,
n_payouts: usize,
) -> Self {
Self {

54
daemon/src/rollover_taker.rs

@ -1,11 +1,12 @@
use crate::address_map::ActorName;
use crate::address_map::Stopping;
use crate::connection;
use crate::model::cfd::CannotRollover;
use crate::model::cfd::Cfd;
use crate::model::cfd::Dlc;
use crate::model::cfd::OrderId;
use crate::model::cfd::Role;
use crate::model::cfd::RollOverProposal;
use crate::model::cfd::RolloverProposal;
use crate::model::cfd::SettlementKind;
use crate::model::BitMexPriceEventId;
use crate::model::Timestamp;
@ -18,6 +19,8 @@ use crate::setup_contract::RolloverParams;
use crate::tokio_ext::spawn_fallible;
use crate::wire;
use crate::wire::RollOverMsg;
use crate::Tasks;
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Result;
use async_trait::async_trait;
@ -26,9 +29,13 @@ use futures::channel::mpsc::UnboundedSender;
use futures::future;
use futures::SinkExt;
use maia::secp256k1_zkp::schnorrsig;
use std::time::Duration;
use time::OffsetDateTime;
use xtra::prelude::MessageChannel;
use xtra_productivity::xtra_productivity;
pub const MAX_ROLLOVER_DURATION: Duration = Duration::from_secs(4 * 60);
pub struct Actor {
cfd: Cfd,
n_payouts: usize,
@ -40,6 +47,7 @@ pub struct Actor {
on_completed: Box<dyn MessageChannel<Completed>>,
on_stopping: Vec<Box<dyn MessageChannel<Stopping<Self>>>>,
rollover_msg_sender: Option<UnboundedSender<RollOverMsg>>,
tasks: Tasks,
}
impl Actor {
@ -66,10 +74,12 @@ impl Actor {
on_completed: on_completed.clone_channel(),
on_stopping: vec![on_stopping0.clone_channel(), on_stopping1.clone_channel()],
rollover_msg_sender: None,
tasks: Tasks::default(),
}
}
async fn propose(&self, this: xtra::Address<Self>) -> Result<()> {
tracing::trace!(order_id=%self.cfd.id(), "Proposing rollover");
self.maker
.send(connection::ProposeRollOver {
order_id: self.cfd.id(),
@ -79,7 +89,7 @@ impl Actor {
.await??;
self.update_proposal(Some((
RollOverProposal {
RolloverProposal {
order_id: self.cfd.id(),
timestamp: self.timestamp,
},
@ -164,7 +174,7 @@ impl Actor {
async fn update_proposal(
&self,
proposal: Option<(RollOverProposal, SettlementKind)>,
proposal: Option<(RolloverProposal, SettlementKind)>,
) -> Result<()> {
self.projection
.send(UpdateRollOverProposal {
@ -186,7 +196,21 @@ impl Actor {
#[async_trait]
impl xtra::Actor for Actor {
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
if let Err(e) = self.cfd.can_roll_over(OffsetDateTime::now_utc()) {
self.complete(
Completed::NoRollover {
order_id: self.cfd.id(),
reason: e,
},
ctx,
)
.await;
return;
}
let this = ctx.address().expect("self to be alive");
if let Err(e) = self.propose(this).await {
self.complete(
Completed::Failed {
@ -196,7 +220,27 @@ impl xtra::Actor for Actor {
ctx,
)
.await;
return;
}
let cleanup = {
let this = ctx.address().expect("self to be alive");
async move {
tokio::time::sleep(MAX_ROLLOVER_DURATION).await;
this.send(RolloverFailed {
error: anyhow!(
"Maker did not react within {} seconds, cleaning up",
MAX_ROLLOVER_DURATION.as_secs()
),
})
.await
.expect("can send to ourselves");
}
};
self.tasks.add(cleanup);
}
async fn stopping(&mut self, ctx: &mut xtra::Context<Self>) -> xtra::KeepRunning {
@ -327,6 +371,10 @@ pub enum Completed {
order_id: OrderId,
error: anyhow::Error,
},
NoRollover {
order_id: OrderId,
reason: CannotRollover,
},
}
impl xtra::Message for Completed {

5
daemon/src/routes_maker.rs

@ -170,11 +170,6 @@ pub async fn post_cfd_action(
tracing::error!(msg);
return Err(HttpApiProblem::new(StatusCode::BAD_REQUEST).detail(msg));
}
CfdAction::RollOver => {
let msg = "RollOver proposal can only be triggered by taker";
tracing::error!(msg);
return Err(HttpApiProblem::new(StatusCode::BAD_REQUEST).detail(msg));
}
};
result.unwrap_or_else(|e| anyhow::bail!(e)).map_err(|e| {

5
daemon/src/routes_taker.rs

@ -156,11 +156,6 @@ pub async fn post_cfd_action(
})
.await
}
CfdAction::RollOver => {
cfd_actor
.send(taker_cfd::ProposeRollOver { order_id: id })
.await
}
};
result

107
daemon/src/taker_cfd.rs

@ -1,5 +1,4 @@
use crate::address_map::AddressMap;
use crate::address_map::Stopping;
use crate::cfd_actors::append_cfd_state;
use crate::cfd_actors::insert_cfd_and_update_feed;
use crate::cfd_actors::{self};
@ -18,11 +17,10 @@ use crate::model::cfd::Role;
use crate::model::Identity;
use crate::model::Price;
use crate::model::Usd;
use crate::monitor;
use crate::monitor::MonitorParams;
use crate::monitor::{self};
use crate::oracle;
use crate::projection;
use crate::rollover_taker;
use crate::setup_taker;
use crate::wallet;
use crate::Tasks;
@ -47,10 +45,6 @@ pub struct ProposeSettlement {
pub current_price: Price,
}
pub struct ProposeRollOver {
pub order_id: OrderId,
}
pub struct Commit {
pub order_id: OrderId,
}
@ -64,7 +58,6 @@ pub struct Actor<O, M, W> {
monitor_actor: Address<M>,
setup_actors: AddressMap<OrderId, setup_taker::Actor>,
collab_settlement_actors: AddressMap<OrderId, collab_settlement_taker::Actor>,
rollover_actors: AddressMap<OrderId, rollover_taker::Actor>,
oracle_actor: Address<O>,
n_payouts: usize,
tasks: Tasks,
@ -101,7 +94,6 @@ where
n_payouts,
setup_actors: AddressMap::default(),
collab_settlement_actors: AddressMap::default(),
rollover_actors: AddressMap::default(),
tasks: Tasks::default(),
current_order: None,
maker_identity,
@ -418,103 +410,6 @@ where
}
}
#[xtra_productivity]
impl<O, M, W> Actor<O, M, W>
where
M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
{
async fn handle_propose_rollover(
&mut self,
msg: ProposeRollOver,
ctx: &mut Context<Self>,
) -> Result<()> {
let ProposeRollOver { order_id } = msg;
let disconnected = self
.rollover_actors
.get_disconnected(order_id)
.with_context(|| format!("Rollover for order {} is already in progress", order_id))?;
let mut conn = self.db.acquire().await?;
let cfd = load_cfd(order_id, &mut conn).await?;
let this = ctx
.address()
.expect("actor to be able to give address to itself");
let (addr, fut) = rollover_taker::Actor::new(
(cfd, self.n_payouts),
self.oracle_pk,
self.conn_actor.clone(),
&self.oracle_actor,
self.projection_actor.clone(),
&this,
(&this, &self.conn_actor),
)
.create(None)
.run();
disconnected.insert(addr);
self.tasks.add(fut);
Ok(())
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, W> Actor<O, M, W>
where
M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation>,
{
async fn handle_rollover_completed(&mut self, msg: rollover_taker::Completed) -> Result<()> {
use rollover_taker::Completed::*;
let (order_id, dlc) = match msg {
UpdatedContract { order_id, dlc } => (order_id, dlc),
Rejected { .. } => {
return Ok(());
}
Failed { order_id, error } => {
tracing::warn!(%order_id, "Rollover failed: {:#}", error);
return Ok(());
}
};
let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd(order_id, &mut conn).await?;
*cfd.state_mut() = CfdState::Open {
common: CfdStateCommon::default(),
dlc: dlc.clone(),
attestation: None,
collaborative_close: None,
};
append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?;
self.monitor_actor
.send(monitor::StartMonitoring {
id: order_id,
params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()),
})
.await?;
self.oracle_actor
.send(oracle::MonitorAttestation {
event_id: dlc.settlement_event_id,
})
.await?;
Ok(())
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, W> Actor<O, M, W> {
async fn handle_rollover_actor_stopping(&mut self, msg: Stopping<rollover_taker::Actor>) {
self.rollover_actors.gc(msg);
}
}
#[xtra_productivity]
impl<O, M, W> Actor<O, M, W> {
async fn handle_current_order(&mut self, msg: CurrentOrder) {

Loading…
Cancel
Save