From 9cd1c2232002fbc5a6c552e3d06fd59ff3378b47 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 10 Dec 2021 09:48:09 +1100 Subject: [PATCH] Introduce perpetual CFDs by rolling them over automatically Co-authored-by: Daniel Karzel --- CHANGELOG.md | 10 + daemon/src/auto_rollover.rs | 194 ++++++++++++++++++++ daemon/src/lib.rs | 20 +- daemon/src/maker_cfd.rs | 8 +- daemon/src/model/cfd.rs | 344 ++++++++++++++++++++++++++++++++++- daemon/src/projection.rs | 7 +- daemon/src/rollover_maker.rs | 6 +- daemon/src/rollover_taker.rs | 54 +++++- daemon/src/routes_maker.rs | 5 - daemon/src/routes_taker.rs | 5 - daemon/src/taker_cfd.rs | 107 +---------- 11 files changed, 624 insertions(+), 136 deletions(-) create mode 100644 daemon/src/auto_rollover.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 8d541a5..96cdc68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Off-chain perpetual CFD rollover. + Hourly, the taker sends a rollover request to the maker for each open position. + The maker can accept or reject a rollover proposal. + Upon acceptance the taker and maker collaboratively agree on an oracle price event further in the future and generate new payout transactions accordingly. + The previous payout transactions are invalidated. + The new payout transactions spend from the same lock transaction, so the rollover happens off-chain. + In case a maker rejects a rollover request from a taker the old oracle price event and payout transactions stay in place. + ## [0.3.0] - 2021-12-09 Initial release for mainnet. diff --git a/daemon/src/auto_rollover.rs b/daemon/src/auto_rollover.rs new file mode 100644 index 0000000..ae6f9a2 --- /dev/null +++ b/daemon/src/auto_rollover.rs @@ -0,0 +1,194 @@ +use crate::address_map::AddressMap; +use crate::address_map::Stopping; +use crate::cfd_actors::append_cfd_state; +use crate::connection; +use crate::db; +use crate::db::load_cfd; +use crate::model::cfd::CannotRollover; +use crate::model::cfd::CfdState; +use crate::model::cfd::CfdStateCommon; +use crate::model::cfd::OrderId; +use crate::monitor; +use crate::monitor::MonitorParams; +use crate::oracle; +use crate::projection; +use crate::rollover_taker; +use crate::Tasks; +use anyhow::Result; +use async_trait::async_trait; +use maia::secp256k1_zkp::schnorrsig; +use std::time::Duration; +use xtra::Actor as _; +use xtra::Address; +use xtra_productivity::xtra_productivity; + +pub struct Actor { + db: sqlx::SqlitePool, + oracle_pk: schnorrsig::PublicKey, + projection_actor: Address, + conn_actor: Address, + monitor_actor: Address, + oracle_actor: Address, + n_payouts: usize, + + rollover_actors: AddressMap, + + tasks: Tasks, +} + +impl Actor { + pub fn new( + db: sqlx::SqlitePool, + oracle_pk: schnorrsig::PublicKey, + projection_actor: Address, + conn_actor: Address, + monitor_actor: Address, + oracle_actor: Address, + n_payouts: usize, + ) -> Self { + Self { + db, + oracle_pk, + projection_actor, + conn_actor, + monitor_actor, + oracle_actor, + n_payouts, + rollover_actors: AddressMap::default(), + tasks: Tasks::default(), + } + } +} + +#[xtra_productivity] +impl Actor +where + M: xtra::Handler, + O: xtra::Handler + xtra::Handler, +{ + async fn handle(&mut self, _msg: AutoRollover, ctx: &mut xtra::Context) -> Result<()> { + tracing::trace!("Checking all CFDs for rollover eligibility"); + + let mut conn = self.db.acquire().await?; + let cfds = db::load_all_cfds(&mut conn).await?; + + let this = ctx + .address() + .expect("actor to be able to give address to itself"); + + for cfd in cfds { + let disconnected = match self.rollover_actors.get_disconnected(cfd.id()) { + Ok(disconnected) => disconnected, + Err(_) => { + tracing::debug!(order_id=%cfd.id(), "Rollover already in progress"); + continue; + } + }; + + let (addr, fut) = rollover_taker::Actor::new( + (cfd, self.n_payouts), + self.oracle_pk, + self.conn_actor.clone(), + &self.oracle_actor, + self.projection_actor.clone(), + &this, + (&this, &self.conn_actor), + ) + .create(None) + .run(); + + disconnected.insert(addr); + self.tasks.add(fut); + } + + Ok(()) + } +} + +#[xtra_productivity(message_impl = false)] +impl Actor +where + O: 'static, + M: 'static, + M: xtra::Handler, + O: xtra::Handler + xtra::Handler, +{ + async fn handle_rollover_completed(&mut self, msg: rollover_taker::Completed) -> Result<()> { + use rollover_taker::Completed::*; + let (order_id, dlc) = match msg { + UpdatedContract { order_id, dlc } => (order_id, dlc), + Rejected { .. } => { + return Ok(()); + } + Failed { order_id, error } => { + tracing::warn!(%order_id, "Rollover failed: {:#}", error); + return Ok(()); + } + NoRollover { order_id, reason } => { + match reason { + CannotRollover::NoDlc => { + tracing::warn!(%order_id, "Not rolled over: {:#}", reason); + } + CannotRollover::AlreadyExpired + | CannotRollover::WasJustRolledOver + | CannotRollover::WrongState { .. } => { + tracing::debug!(%order_id, "Not rolled over: {:#}", reason); + } + } + return Ok(()); + } + }; + + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd(order_id, &mut conn).await?; + *cfd.state_mut() = CfdState::Open { + common: CfdStateCommon::default(), + dlc: dlc.clone(), + attestation: None, + collaborative_close: None, + }; + + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + + self.monitor_actor + .send(monitor::StartMonitoring { + id: order_id, + params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()), + }) + .await?; + + self.oracle_actor + .send(oracle::MonitorAttestation { + event_id: dlc.settlement_event_id, + }) + .await?; + + Ok(()) + } + + async fn handle_rollover_actor_stopping(&mut self, msg: Stopping) { + self.rollover_actors.gc(msg); + } +} + +#[async_trait] +impl xtra::Actor for Actor +where + O: 'static, + M: 'static, + Self: xtra::Handler, +{ + async fn started(&mut self, ctx: &mut xtra::Context) { + let fut = ctx + .notify_interval( + rollover_taker::MAX_ROLLOVER_DURATION + Duration::from_secs(60), + || AutoRollover, + ) + .expect("we are alive"); + + self.tasks.add(fut); + } +} + +/// Message to trigger roll-over on a regular interval +pub struct AutoRollover; diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index d81d8b6..b415051 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -29,6 +29,7 @@ pub mod sqlx_ext; // Must come first because it is a macro. pub mod actors; pub mod address_map; pub mod auth; +pub mod auto_rollover; pub mod bdk_ext; pub mod bitmex_price_feed; pub mod cfd_actors; @@ -86,7 +87,7 @@ pub const N_PAYOUTS: usize = 200; /// - How the oracle event id is chosen when creating an order (maker) /// - The sliding window of cached oracle announcements (maker, taker) /// - The auto-rollover time-window (taker) -pub const SETTLEMENT_INTERVAL: time::Duration = time::Duration::days(7); +pub const SETTLEMENT_INTERVAL: time::Duration = time::Duration::hours(24); /// Struct controlling the lifetime of the async tasks, /// such as running actors and periodic notifications. @@ -268,20 +269,33 @@ where let (connection_actor_addr, connection_actor_ctx) = xtra::Context::new(None); let (cfd_actor_addr, cfd_actor_fut) = taker_cfd::Actor::new( - db, + db.clone(), wallet_addr, oracle_pk, + projection_actor.clone(), + connection_actor_addr.clone(), + monitor_addr.clone(), + oracle_addr.clone(), + n_payouts, + maker_identity, + ) + .create(None) + .run(); + + let (_auto_rollover_address, auto_rollover_fut) = auto_rollover::Actor::new( + db, + oracle_pk, projection_actor, connection_actor_addr.clone(), monitor_addr.clone(), oracle_addr, n_payouts, - maker_identity, ) .create(None) .run(); tasks.add(cfd_actor_fut); + tasks.add(auto_rollover_fut); tasks.add(connection_actor_ctx.run(connection::Actor::new( maker_online_status_feed_sender, diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 2a74a9e..ecb56b9 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -14,14 +14,14 @@ use crate::model::cfd::Order; use crate::model::cfd::OrderId; use crate::model::cfd::Origin; use crate::model::cfd::Role; -use crate::model::cfd::RollOverProposal; +use crate::model::cfd::RolloverProposal; use crate::model::cfd::SettlementProposal; use crate::model::Identity; use crate::model::Price; use crate::model::Timestamp; use crate::model::Usd; +use crate::monitor; use crate::monitor::MonitorParams; -use crate::monitor::{self}; use crate::oracle; use crate::projection; use crate::projection::Update; @@ -290,7 +290,7 @@ where { async fn handle_propose_roll_over( &mut self, - proposal: RollOverProposal, + proposal: RolloverProposal, taker_id: Identity, ctx: &mut Context, ) -> Result<()> { @@ -904,7 +904,7 @@ where timestamp, } => { log_error!(self.handle_propose_roll_over( - RollOverProposal { + RolloverProposal { order_id, timestamp, }, diff --git a/daemon/src/model/cfd.rs b/daemon/src/model/cfd.rs index eec6fbf..dd7a1ab 100644 --- a/daemon/src/model/cfd.rs +++ b/daemon/src/model/cfd.rs @@ -11,6 +11,7 @@ use crate::model::Usd; use crate::monitor; use crate::oracle; use crate::payout_curve; +use crate::SETTLEMENT_INTERVAL; use anyhow::bail; use anyhow::Context; use anyhow::Result; @@ -562,7 +563,7 @@ pub enum UpdateCfdProposal { direction: SettlementKind, }, RollOverProposal { - proposal: RollOverProposal, + proposal: RolloverProposal, direction: SettlementKind, }, } @@ -579,7 +580,7 @@ pub struct SettlementProposal { /// Proposed collaborative settlement #[derive(Debug, Clone)] -pub struct RollOverProposal { +pub struct RolloverProposal { pub order_id: OrderId, pub timestamp: Timestamp, } @@ -1235,6 +1236,7 @@ impl Cfd { | CfdState::Open { dlc, .. } | CfdState::PendingCommit { dlc, .. } | CfdState::OpenCommitted { dlc, .. } + | CfdState::PendingRefund { dlc, .. } | CfdState::PendingCet { dlc, .. } => Some(dlc), CfdState::OutgoingOrderRequest { .. } @@ -1243,7 +1245,6 @@ impl Cfd { | CfdState::Rejected { .. } | CfdState::ContractSetup { .. } | CfdState::Closed { .. } - | CfdState::PendingRefund { .. } | CfdState::Refunded { .. } | CfdState::SetupFailed { .. } => None, } @@ -1339,6 +1340,37 @@ impl Cfd { } } + /// Only cfds in state `Open` that have not received an attestation and are within 23 hours + /// until expiry are eligible for rollover + pub fn can_roll_over(&self, now: OffsetDateTime) -> Result<(), CannotRollover> { + let expiry_timestamp = self.expiry_timestamp().ok_or(CannotRollover::NoDlc)?; + + if now > expiry_timestamp { + return Err(CannotRollover::AlreadyExpired); + } + + let time_until_expiry = expiry_timestamp - now; + + if time_until_expiry > SETTLEMENT_INTERVAL - Duration::HOUR { + return Err(CannotRollover::WasJustRolledOver); + } + + // only state open with no attestation is acceptable for rollover + if !matches!( + self.state.clone(), + CfdState::Open { + attestation: None, + .. + } + ) { + return Err(CannotRollover::WrongState { + state: self.state.to_string(), + }); + } + + Ok(()) + } + pub fn id(&self) -> OrderId { self.id } @@ -1388,6 +1420,18 @@ impl Cfd { } } +#[derive(thiserror::Error, Debug, PartialEq)] +pub enum CannotRollover { + #[error("Cfd does not have a dlc")] + NoDlc, + #[error("The Cfd is already expired")] + AlreadyExpired, + #[error("The Cfd was just rolled over")] + WasJustRolledOver, + #[error("Cannot roll over in state {state}")] + WrongState { state: String }, +} + #[derive(thiserror::Error, Debug, Clone)] #[error("The cfd is not ready for CET publication yet: {cet_status}")] pub struct NotReadyYet { @@ -1810,7 +1854,13 @@ pub enum Completed { #[cfg(test)] mod tests { use super::*; + use crate::seed::Seed; + use bdk::bitcoin::util::psbt::Global; + use bdk::bitcoin::util::psbt::PartiallySignedTransaction; use rust_decimal_macros::dec; + use std::collections::BTreeMap; + use std::str::FromStr; + use time::macros::datetime; #[test] fn given_default_values_then_expected_liquidation_price() { @@ -2057,4 +2107,292 @@ mod tests { assert_eq!(id, deserialized); } + + #[tokio::test] + async fn given_cfd_expires_now_then_rollover() { + // --|----|-------------------------------------------------|--> time + // ct 1h 24h + // --|----|<--------------------rollover------------------->|-- + // now + + let cfd = Cfd::dummy_open(BitMexPriceEventId::with_20_digits( + datetime!(2021-11-19 10:00:00).assume_utc(), + )); + let result = cfd.can_roll_over(datetime!(2021-11-19 10:00:00).assume_utc()); + + assert!(result.is_ok()); + } + + #[tokio::test] + async fn given_cfd_expires_within_23hours_then_rollover() { + // --|----|-------------------------------------------------|--> time + // ct 1h 24h + // --|----|<--------------------rollover------------------->|-- + // now + + let cfd = Cfd::dummy_open(BitMexPriceEventId::with_20_digits( + datetime!(2021-11-19 10:00:00).assume_utc(), + )); + + let result = cfd.can_roll_over(datetime!(2021-11-18 11:00:00).assume_utc()); + + assert!(result.is_ok()); + } + + #[tokio::test] + async fn given_cfd_past_expiry_time_then_no_rollover() { + // --|----|-------------------------------------------------|--> time + // ct 1h 24h + // --|----|<--------------------rollover------------------->|-- + // now + + let cfd = Cfd::dummy_open(BitMexPriceEventId::with_20_digits( + datetime!(2021-11-19 10:00:00).assume_utc(), + )); + let cannot_roll_over = cfd + .can_roll_over(datetime!(2021-11-19 10:00:01).assume_utc()) + .unwrap_err(); + + assert_eq!(cannot_roll_over, CannotRollover::AlreadyExpired) + } + + #[tokio::test] + async fn given_cfd_was_just_rolled_over_then_no_rollover() { + // --|----|-------------------------------------------------|--> time + // ct 1h 24h + // --|----|<--------------------rollover------------------->|-- + // now + + let cfd = Cfd::dummy_open(BitMexPriceEventId::with_20_digits( + datetime!(2021-11-19 10:00:00).assume_utc(), + )); + let cannot_roll_over = cfd + .can_roll_over(datetime!(2021-11-18 10:00:01).assume_utc()) + .unwrap_err(); + + assert_eq!(cannot_roll_over, CannotRollover::WasJustRolledOver) + } + + #[tokio::test] + async fn given_cfd_out_of_bounds_expiry_then_no_rollover() { + // --|----|-------------------------------------------------|--> time + // ct 1h 24h + // --|----|<--------------------rollover------------------->|-- + // now + + let cfd = Cfd::dummy_open(BitMexPriceEventId::with_20_digits( + datetime!(2021-11-19 10:00:00).assume_utc(), + )); + let cannot_roll_over = cfd + .can_roll_over(datetime!(2021-11-18 09:59:59).assume_utc()) + .unwrap_err(); + + assert_eq!(cannot_roll_over, CannotRollover::WasJustRolledOver) + } + + #[tokio::test] + async fn given_cfd_was_renewed_less_than_1h_ago_then_no_rollover() { + // --|----|-------------------------------------------------|--> time + // ct 1h 24h + // --|----|<--------------------rollover------------------->|-- + // now + + let cfd = Cfd::dummy_open(BitMexPriceEventId::with_20_digits( + datetime!(2021-11-19 10:00:00).assume_utc(), + )); + let cannot_roll_over = cfd + .can_roll_over(datetime!(2021-11-18 10:59:59).assume_utc()) + .unwrap_err(); + + assert_eq!(cannot_roll_over, CannotRollover::WasJustRolledOver) + } + + #[tokio::test] + async fn given_cfd_has_attestation_then_no_rollover() { + let cfd = Cfd::dummy_open_with_attestation(BitMexPriceEventId::with_20_digits( + datetime!(2021-11-19 10:00:00).assume_utc(), + )); + + let cannot_roll_over = cfd + .can_roll_over(datetime!(2021-11-19 10:00:00).assume_utc()) + .unwrap_err(); + + assert!(matches!( + cannot_roll_over, + CannotRollover::WrongState { .. } + )) + } + + #[tokio::test] + async fn given_cfd_not_in_open_then_no_rollover() { + let cfd = Cfd::dummy_not_open_but_dlc(BitMexPriceEventId::with_20_digits( + datetime!(2021-11-19 10:00:00).assume_utc(), + )); + + let cannot_roll_over = cfd + .can_roll_over(datetime!(2021-11-19 10:00:00).assume_utc()) + .unwrap_err(); + + assert!(matches!( + cannot_roll_over, + CannotRollover::WrongState { .. } + )) + } + + impl Cfd { + fn dummy_open(event_id: BitMexPriceEventId) -> Self { + Cfd::from_order( + Order::dummy_model(), + Usd::new(dec!(1000)), + CfdState::Open { + common: Default::default(), + dlc: Dlc::dummy(Some(event_id)), + attestation: None, + collaborative_close: None, + }, + dummy_identity(), + Role::Taker, + ) + } + + fn dummy_open_with_attestation(event_id: BitMexPriceEventId) -> Self { + Cfd::from_order( + Order::dummy_model(), + Usd::new(dec!(1000)), + CfdState::Open { + common: Default::default(), + dlc: Dlc::dummy(Some(event_id)), + // the dummy_dlc contains a dummy range [0, 1] + attestation: Some( + Attestation::new( + BitMexPriceEventId::with_20_digits(OffsetDateTime::now_utc()), + 0, + vec![], + Dlc::dummy(None), + Role::Taker, + ) + .unwrap(), + ), + collaborative_close: None, + }, + dummy_identity(), + Role::Taker, + ) + } + + fn dummy_not_open_but_dlc(event_id: BitMexPriceEventId) -> Self { + Cfd::from_order( + Order::dummy_model(), + Usd::new(dec!(1000)), + CfdState::PendingRefund { + common: Default::default(), + dlc: Dlc::dummy(Some(event_id)), + }, + dummy_identity(), + Role::Taker, + ) + } + } + + impl Order { + fn dummy_model() -> Self { + Order::new_short( + Price::new(dec!(1000)).unwrap(), + Usd::new(dec!(100)), + Usd::new(dec!(1000)), + Origin::Theirs, + dummy_event_id(), + time::Duration::hours(24), + 1, + ) + .unwrap() + } + } + + impl Dlc { + fn dummy(event_id: Option) -> Self { + let dummy_sk = SecretKey::from_slice(&[1; 32]).unwrap(); + let dummy_pk = PublicKey::from_slice(&[ + 3, 23, 183, 225, 206, 31, 159, 148, 195, 42, 67, 115, 146, 41, 248, 140, 11, 3, 51, + 41, 111, 180, 110, 143, 114, 134, 88, 73, 198, 174, 52, 184, 78, + ]) + .unwrap(); + + let dummy_addr = Address::from_str("132F25rTsvBdp9JzLLBHP5mvGY66i1xdiM").unwrap(); + + let dummy_tx = dummy_partially_signed_transaction().extract_tx(); + let dummy_adapter_sig = "03424d14a5471c048ab87b3b83f6085d125d5864249ae4297a57c84e74710bb6730223f325042fce535d040fee52ec13231bf709ccd84233c6944b90317e62528b2527dff9d659a96db4c99f9750168308633c1867b70f3a18fb0f4539a1aecedcd1fc0148fc22f36b6303083ece3f872b18e35d368b3958efe5fb081f7716736ccb598d269aa3084d57e1855e1ea9a45efc10463bbf32ae378029f5763ceb40173f" + .parse() + .unwrap(); + + let dummy_sig = Signature::from_str("3046022100839c1fbc5304de944f697c9f4b1d01d1faeba32d751c0f7acb21ac8a0f436a72022100e89bd46bb3a5a62adc679f659b7ce876d83ee297c7a5587b2011c4fcc72eab45").unwrap(); + + let mut dummy_cet_with_zero_price_range = HashMap::new(); + dummy_cet_with_zero_price_range.insert( + BitMexPriceEventId::with_20_digits(OffsetDateTime::now_utc()), + vec![Cet { + tx: dummy_tx.clone(), + adaptor_sig: dummy_adapter_sig, + range: RangeInclusive::new(0, 1), + n_bits: 0, + }], + ); + + Dlc { + identity: dummy_sk, + identity_counterparty: dummy_pk, + revocation: dummy_sk, + revocation_pk_counterparty: dummy_pk, + publish: dummy_sk, + publish_pk_counterparty: dummy_pk, + maker_address: dummy_addr.clone(), + taker_address: dummy_addr, + lock: (dummy_tx.clone(), Descriptor::new_pk(dummy_pk)), + commit: ( + dummy_tx.clone(), + dummy_adapter_sig, + Descriptor::new_pk(dummy_pk), + ), + cets: dummy_cet_with_zero_price_range, + refund: (dummy_tx, dummy_sig), + maker_lock_amount: Default::default(), + taker_lock_amount: Default::default(), + revoked_commit: vec![], + settlement_event_id: match event_id { + Some(event_id) => event_id, + None => dummy_event_id(), + }, + } + } + } + + pub fn dummy_partially_signed_transaction() -> PartiallySignedTransaction { + // very simple dummy psbt that does not contain anything + // pulled in from github.com-1ecc6299db9ec823/bitcoin-0.27.1/src/util/psbt/mod.rs:238 + + PartiallySignedTransaction { + global: Global { + unsigned_tx: Transaction { + version: 2, + lock_time: 0, + input: vec![], + output: vec![], + }, + xpub: Default::default(), + version: 0, + proprietary: BTreeMap::new(), + unknown: BTreeMap::new(), + }, + inputs: vec![], + outputs: vec![], + } + } + + pub fn dummy_identity() -> Identity { + Identity::new(Seed::default().derive_identity().0) + } + + pub fn dummy_event_id() -> BitMexPriceEventId { + BitMexPriceEventId::with_20_digits(OffsetDateTime::now_utc()) + } } diff --git a/daemon/src/projection.rs b/daemon/src/projection.rs index c198d02..7b7b82d 100644 --- a/daemon/src/projection.rs +++ b/daemon/src/projection.rs @@ -4,7 +4,7 @@ use crate::model; use crate::model::cfd::Cfd as ModelCfd; use crate::model::cfd::OrderId; use crate::model::cfd::Role; -use crate::model::cfd::RollOverProposal; +use crate::model::cfd::RolloverProposal; use crate::model::cfd::SettlementKind; use crate::model::cfd::SettlementProposal; use crate::model::cfd::UpdateCfdProposal; @@ -37,7 +37,7 @@ pub struct UpdateSettlementProposal { /// Amend a given rollover proposal (if `proposal.is_none()`, it should be removed) pub struct UpdateRollOverProposal { pub order: OrderId, - pub proposal: Option<(RollOverProposal, SettlementKind)>, + pub proposal: Option<(RolloverProposal, SettlementKind)>, } /// Store the latest state of `T` for display purposes @@ -446,7 +446,6 @@ pub enum CfdAction { Settle, AcceptSettlement, RejectSettlement, - RollOver, AcceptRollOver, RejectRollOver, } @@ -472,7 +471,7 @@ fn available_actions(state: CfdState, role: Role) -> Vec { vec![CfdAction::Commit] } (CfdState::Open { .. }, Role::Taker) => { - vec![CfdAction::RollOver, CfdAction::Commit, CfdAction::Settle] + vec![CfdAction::Commit, CfdAction::Settle] } (CfdState::Open { .. }, Role::Maker) => vec![CfdAction::Commit], _ => vec![], diff --git a/daemon/src/rollover_maker.rs b/daemon/src/rollover_maker.rs index b85db8c..4e07d8a 100644 --- a/daemon/src/rollover_maker.rs +++ b/daemon/src/rollover_maker.rs @@ -4,7 +4,7 @@ use crate::maker_inc_connections::TakerMessage; use crate::model::cfd::Dlc; use crate::model::cfd::OrderId; use crate::model::cfd::Role; -use crate::model::cfd::RollOverProposal; +use crate::model::cfd::RolloverProposal; use crate::model::cfd::SettlementKind; use crate::model::cfd::UpdateCfdProposal; use crate::model::Identity; @@ -68,7 +68,7 @@ pub struct Actor { oracle_actor: Box>, on_stopping: Vec>>>, projection_actor: xtra::Address, - proposal: RollOverProposal, + proposal: RolloverProposal, } #[async_trait::async_trait] @@ -117,7 +117,7 @@ impl Actor { &(impl MessageChannel> + 'static), ), projection_actor: xtra::Address, - proposal: RollOverProposal, + proposal: RolloverProposal, n_payouts: usize, ) -> Self { Self { diff --git a/daemon/src/rollover_taker.rs b/daemon/src/rollover_taker.rs index 77adc14..418fd54 100644 --- a/daemon/src/rollover_taker.rs +++ b/daemon/src/rollover_taker.rs @@ -1,11 +1,12 @@ use crate::address_map::ActorName; use crate::address_map::Stopping; use crate::connection; +use crate::model::cfd::CannotRollover; use crate::model::cfd::Cfd; use crate::model::cfd::Dlc; use crate::model::cfd::OrderId; use crate::model::cfd::Role; -use crate::model::cfd::RollOverProposal; +use crate::model::cfd::RolloverProposal; use crate::model::cfd::SettlementKind; use crate::model::BitMexPriceEventId; use crate::model::Timestamp; @@ -18,6 +19,8 @@ use crate::setup_contract::RolloverParams; use crate::tokio_ext::spawn_fallible; use crate::wire; use crate::wire::RollOverMsg; +use crate::Tasks; +use anyhow::anyhow; use anyhow::Context; use anyhow::Result; use async_trait::async_trait; @@ -26,9 +29,13 @@ use futures::channel::mpsc::UnboundedSender; use futures::future; use futures::SinkExt; use maia::secp256k1_zkp::schnorrsig; +use std::time::Duration; +use time::OffsetDateTime; use xtra::prelude::MessageChannel; use xtra_productivity::xtra_productivity; +pub const MAX_ROLLOVER_DURATION: Duration = Duration::from_secs(4 * 60); + pub struct Actor { cfd: Cfd, n_payouts: usize, @@ -40,6 +47,7 @@ pub struct Actor { on_completed: Box>, on_stopping: Vec>>>, rollover_msg_sender: Option>, + tasks: Tasks, } impl Actor { @@ -66,10 +74,12 @@ impl Actor { on_completed: on_completed.clone_channel(), on_stopping: vec![on_stopping0.clone_channel(), on_stopping1.clone_channel()], rollover_msg_sender: None, + tasks: Tasks::default(), } } async fn propose(&self, this: xtra::Address) -> Result<()> { + tracing::trace!(order_id=%self.cfd.id(), "Proposing rollover"); self.maker .send(connection::ProposeRollOver { order_id: self.cfd.id(), @@ -79,7 +89,7 @@ impl Actor { .await??; self.update_proposal(Some(( - RollOverProposal { + RolloverProposal { order_id: self.cfd.id(), timestamp: self.timestamp, }, @@ -164,7 +174,7 @@ impl Actor { async fn update_proposal( &self, - proposal: Option<(RollOverProposal, SettlementKind)>, + proposal: Option<(RolloverProposal, SettlementKind)>, ) -> Result<()> { self.projection .send(UpdateRollOverProposal { @@ -186,7 +196,21 @@ impl Actor { #[async_trait] impl xtra::Actor for Actor { async fn started(&mut self, ctx: &mut xtra::Context) { + if let Err(e) = self.cfd.can_roll_over(OffsetDateTime::now_utc()) { + self.complete( + Completed::NoRollover { + order_id: self.cfd.id(), + reason: e, + }, + ctx, + ) + .await; + + return; + } + let this = ctx.address().expect("self to be alive"); + if let Err(e) = self.propose(this).await { self.complete( Completed::Failed { @@ -196,7 +220,27 @@ impl xtra::Actor for Actor { ctx, ) .await; + + return; } + + let cleanup = { + let this = ctx.address().expect("self to be alive"); + async move { + tokio::time::sleep(MAX_ROLLOVER_DURATION).await; + + this.send(RolloverFailed { + error: anyhow!( + "Maker did not react within {} seconds, cleaning up", + MAX_ROLLOVER_DURATION.as_secs() + ), + }) + .await + .expect("can send to ourselves"); + } + }; + + self.tasks.add(cleanup); } async fn stopping(&mut self, ctx: &mut xtra::Context) -> xtra::KeepRunning { @@ -327,6 +371,10 @@ pub enum Completed { order_id: OrderId, error: anyhow::Error, }, + NoRollover { + order_id: OrderId, + reason: CannotRollover, + }, } impl xtra::Message for Completed { diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 4bb8072..7565cf4 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -170,11 +170,6 @@ pub async fn post_cfd_action( tracing::error!(msg); return Err(HttpApiProblem::new(StatusCode::BAD_REQUEST).detail(msg)); } - CfdAction::RollOver => { - let msg = "RollOver proposal can only be triggered by taker"; - tracing::error!(msg); - return Err(HttpApiProblem::new(StatusCode::BAD_REQUEST).detail(msg)); - } }; result.unwrap_or_else(|e| anyhow::bail!(e)).map_err(|e| { diff --git a/daemon/src/routes_taker.rs b/daemon/src/routes_taker.rs index 347d266..910b9f9 100644 --- a/daemon/src/routes_taker.rs +++ b/daemon/src/routes_taker.rs @@ -156,11 +156,6 @@ pub async fn post_cfd_action( }) .await } - CfdAction::RollOver => { - cfd_actor - .send(taker_cfd::ProposeRollOver { order_id: id }) - .await - } }; result diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 8b42b67..27cbea3 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -1,5 +1,4 @@ use crate::address_map::AddressMap; -use crate::address_map::Stopping; use crate::cfd_actors::append_cfd_state; use crate::cfd_actors::insert_cfd_and_update_feed; use crate::cfd_actors::{self}; @@ -18,11 +17,10 @@ use crate::model::cfd::Role; use crate::model::Identity; use crate::model::Price; use crate::model::Usd; +use crate::monitor; use crate::monitor::MonitorParams; -use crate::monitor::{self}; use crate::oracle; use crate::projection; -use crate::rollover_taker; use crate::setup_taker; use crate::wallet; use crate::Tasks; @@ -47,10 +45,6 @@ pub struct ProposeSettlement { pub current_price: Price, } -pub struct ProposeRollOver { - pub order_id: OrderId, -} - pub struct Commit { pub order_id: OrderId, } @@ -64,7 +58,6 @@ pub struct Actor { monitor_actor: Address, setup_actors: AddressMap, collab_settlement_actors: AddressMap, - rollover_actors: AddressMap, oracle_actor: Address, n_payouts: usize, tasks: Tasks, @@ -101,7 +94,6 @@ where n_payouts, setup_actors: AddressMap::default(), collab_settlement_actors: AddressMap::default(), - rollover_actors: AddressMap::default(), tasks: Tasks::default(), current_order: None, maker_identity, @@ -418,103 +410,6 @@ where } } -#[xtra_productivity] -impl Actor -where - M: xtra::Handler, - O: xtra::Handler + xtra::Handler, -{ - async fn handle_propose_rollover( - &mut self, - msg: ProposeRollOver, - ctx: &mut Context, - ) -> Result<()> { - let ProposeRollOver { order_id } = msg; - - let disconnected = self - .rollover_actors - .get_disconnected(order_id) - .with_context(|| format!("Rollover for order {} is already in progress", order_id))?; - - let mut conn = self.db.acquire().await?; - let cfd = load_cfd(order_id, &mut conn).await?; - - let this = ctx - .address() - .expect("actor to be able to give address to itself"); - let (addr, fut) = rollover_taker::Actor::new( - (cfd, self.n_payouts), - self.oracle_pk, - self.conn_actor.clone(), - &self.oracle_actor, - self.projection_actor.clone(), - &this, - (&this, &self.conn_actor), - ) - .create(None) - .run(); - - disconnected.insert(addr); - self.tasks.add(fut); - - Ok(()) - } -} - -#[xtra_productivity(message_impl = false)] -impl Actor -where - M: xtra::Handler, - O: xtra::Handler, -{ - async fn handle_rollover_completed(&mut self, msg: rollover_taker::Completed) -> Result<()> { - use rollover_taker::Completed::*; - let (order_id, dlc) = match msg { - UpdatedContract { order_id, dlc } => (order_id, dlc), - Rejected { .. } => { - return Ok(()); - } - Failed { order_id, error } => { - tracing::warn!(%order_id, "Rollover failed: {:#}", error); - return Ok(()); - } - }; - - let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd(order_id, &mut conn).await?; - *cfd.state_mut() = CfdState::Open { - common: CfdStateCommon::default(), - dlc: dlc.clone(), - attestation: None, - collaborative_close: None, - }; - - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - - self.monitor_actor - .send(monitor::StartMonitoring { - id: order_id, - params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()), - }) - .await?; - - self.oracle_actor - .send(oracle::MonitorAttestation { - event_id: dlc.settlement_event_id, - }) - .await?; - - Ok(()) - } -} - -#[xtra_productivity(message_impl = false)] -impl Actor { - async fn handle_rollover_actor_stopping(&mut self, msg: Stopping) { - self.rollover_actors.gc(msg); - } -} - #[xtra_productivity] impl Actor { async fn handle_current_order(&mut self, msg: CurrentOrder) {