Browse Source

Merge #806

806: Move maker rollover logic to dedicated actor r=rishflab a=rishflab

The rollover actor is spawned the the maker receives a rollover proposal message from the taker. After the rollover is complete, it sends itself
a completion message triggering a "cleanup" handler. Cleanup involves sending messages to maker_cfd actor and the maker_inc_connections to
trigger removal of stored references to the rollover actor.

potential todo:
- [x] Move projection actor to `maker_rollover::Actor` instead of doing all the ui updates in the "maker_cfd::Actor"

Co-authored-by: rishflab <rishflab@hotmail.com>
test-force-close-without-fake-clock
bors[bot] 3 years ago
committed by GitHub
parent
commit
119831bca4
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      daemon/src/lib.rs
  2. 403
      daemon/src/maker_cfd.rs
  3. 22
      daemon/src/maker_inc_connections.rs
  4. 317
      daemon/src/rollover_maker.rs
  5. 5
      daemon/src/rollover_taker.rs
  6. 7
      daemon/src/wire.rs

5
daemon/src/lib.rs

@ -49,6 +49,7 @@ pub mod olivia;
pub mod oracle; pub mod oracle;
pub mod payout_curve; pub mod payout_curve;
pub mod projection; pub mod projection;
pub mod rollover_maker;
pub mod rollover_taker; pub mod rollover_taker;
pub mod routes; pub mod routes;
pub mod seed; pub mod seed;
@ -122,7 +123,9 @@ where
+ xtra::Handler<maker_inc_connections::ConfirmOrder> + xtra::Handler<maker_inc_connections::ConfirmOrder>
+ xtra::Handler<Stopping<setup_maker::Actor>> + xtra::Handler<Stopping<setup_maker::Actor>>
+ xtra::Handler<maker_inc_connections::settlement::Response> + xtra::Handler<maker_inc_connections::settlement::Response>
+ xtra::Handler<Stopping<collab_settlement_maker::Actor>>, + xtra::Handler<Stopping<collab_settlement_maker::Actor>>
+ xtra::Handler<Stopping<rollover_maker::Actor>>
+ xtra::Handler<maker_cfd::RollOverProposed>,
W: xtra::Handler<wallet::BuildPartyParams> W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sign> + xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::TryBroadcastTransaction>, + xtra::Handler<wallet::TryBroadcastTransaction>,

403
daemon/src/maker_cfd.rs

@ -10,15 +10,11 @@ use crate::maker_inc_connections;
use crate::model::cfd::Cfd; use crate::model::cfd::Cfd;
use crate::model::cfd::CfdState; use crate::model::cfd::CfdState;
use crate::model::cfd::CfdStateCommon; use crate::model::cfd::CfdStateCommon;
use crate::model::cfd::Dlc;
use crate::model::cfd::Order; use crate::model::cfd::Order;
use crate::model::cfd::OrderId; use crate::model::cfd::OrderId;
use crate::model::cfd::Origin; use crate::model::cfd::Origin;
use crate::model::cfd::Role;
use crate::model::cfd::RollOverProposal; use crate::model::cfd::RollOverProposal;
use crate::model::cfd::SettlementKind;
use crate::model::cfd::SettlementProposal; use crate::model::cfd::SettlementProposal;
use crate::model::cfd::UpdateCfdProposal;
use crate::model::Identity; use crate::model::Identity;
use crate::model::Price; use crate::model::Price;
use crate::model::Timestamp; use crate::model::Timestamp;
@ -27,14 +23,10 @@ use crate::monitor::MonitorParams;
use crate::monitor::{self}; use crate::monitor::{self};
use crate::oracle; use crate::oracle;
use crate::projection; use crate::projection;
use crate::projection::try_into_update_rollover_proposal;
use crate::projection::Update; use crate::projection::Update;
use crate::projection::UpdateRollOverProposal; use crate::rollover_maker;
use crate::projection::UpdateSettlementProposal; use crate::rollover_maker::Completed;
use crate::setup_contract;
use crate::setup_contract::RolloverParams;
use crate::setup_maker; use crate::setup_maker;
use crate::tokio_ext::FutureExt;
use crate::wallet; use crate::wallet;
use crate::wire; use crate::wire;
use crate::wire::TakerToMaker; use crate::wire::TakerToMaker;
@ -43,13 +35,8 @@ use anyhow::Context as _;
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc;
use futures::future;
use futures::future::RemoteHandle;
use futures::SinkExt;
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
use sqlx::Sqlite; use sqlx::Sqlite;
use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use time::Duration; use time::Duration;
use xtra::prelude::*; use xtra::prelude::*;
@ -74,6 +61,10 @@ pub struct AcceptRollOver {
pub struct RejectRollOver { pub struct RejectRollOver {
pub order_id: OrderId, pub order_id: OrderId,
} }
pub struct RollOverProposed {
pub order_id: OrderId,
pub address: xtra::Address<rollover_maker::Actor>,
}
pub struct Commit { pub struct Commit {
pub order_id: OrderId, pub order_id: OrderId,
} }
@ -92,11 +83,6 @@ pub struct TakerDisconnected {
pub id: Identity, pub id: Identity,
} }
pub struct CfdRollOverCompleted {
pub order_id: OrderId,
pub dlc: Result<Dlc>,
}
pub struct FromTaker { pub struct FromTaker {
pub taker_id: Identity, pub taker_id: Identity,
pub msg: wire::TakerToMaker, pub msg: wire::TakerToMaker,
@ -108,29 +94,18 @@ pub struct Actor<O, M, T, W> {
settlement_interval: Duration, settlement_interval: Duration,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
projection_actor: Address<projection::Actor>, projection_actor: Address<projection::Actor>,
rollover_actors: AddressMap<OrderId, rollover_maker::Actor>,
takers: Address<T>, takers: Address<T>,
current_order: Option<Order>, current_order: Option<Order>,
monitor_actor: Address<M>, monitor_actor: Address<M>,
setup_actors: AddressMap<OrderId, setup_maker::Actor>, setup_actors: AddressMap<OrderId, setup_maker::Actor>,
settlement_actors: AddressMap<OrderId, collab_settlement_maker::Actor>, settlement_actors: AddressMap<OrderId, collab_settlement_maker::Actor>,
roll_over_state: RollOverState,
oracle_actor: Address<O>, oracle_actor: Address<O>,
// Maker needs to also store Identity to be able to send a reply back
current_pending_proposals: HashMap<OrderId, (UpdateCfdProposal, Identity)>,
connected_takers: HashSet<Identity>, connected_takers: HashSet<Identity>,
n_payouts: usize, n_payouts: usize,
tasks: Tasks, tasks: Tasks,
} }
enum RollOverState {
Active {
taker: Identity,
sender: mpsc::UnboundedSender<wire::RollOverMsg>,
_task: RemoteHandle<()>,
},
None,
}
impl<O, M, T, W> Actor<O, M, T, W> { impl<O, M, T, W> Actor<O, M, T, W> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
@ -150,13 +125,12 @@ impl<O, M, T, W> Actor<O, M, T, W> {
settlement_interval, settlement_interval,
oracle_pk, oracle_pk,
projection_actor, projection_actor,
rollover_actors: AddressMap::default(),
takers, takers,
current_order: None, current_order: None,
monitor_actor, monitor_actor,
setup_actors: AddressMap::default(), setup_actors: AddressMap::default(),
roll_over_state: RollOverState::None,
oracle_actor, oracle_actor,
current_pending_proposals: HashMap::new(),
n_payouts, n_payouts,
connected_takers: HashSet::new(), connected_takers: HashSet::new(),
settlement_actors: AddressMap::default(), settlement_actors: AddressMap::default(),
@ -164,95 +138,6 @@ impl<O, M, T, W> Actor<O, M, T, W> {
} }
} }
async fn handle_propose_roll_over(
&mut self,
proposal: RollOverProposal,
taker_id: Identity,
) -> Result<()> {
tracing::info!(
"Received proposal from the taker {}: {:?} to roll over order {}",
taker_id,
proposal,
proposal.order_id
);
// check if CFD is in open state, otherwise we should not proceed
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(proposal.order_id, &mut conn).await?;
match cfd {
Cfd {
state: CfdState::Open { .. },
..
} => (),
_ => {
anyhow::bail!("Order is in invalid state. Cannot propose roll over.")
}
};
let new_proposal = UpdateCfdProposal::RollOverProposal {
proposal: proposal.clone(),
direction: SettlementKind::Incoming,
};
self.current_pending_proposals
.insert(proposal.order_id, (new_proposal.clone(), taker_id));
self.projection_actor
.send(try_into_update_rollover_proposal(new_proposal)?)
.await?;
Ok(())
}
async fn handle_inc_roll_over_protocol_msg(
&mut self,
taker_id: Identity,
msg: wire::RollOverMsg,
) -> Result<()> {
match &mut self.roll_over_state {
RollOverState::Active { taker, sender, .. } if taker_id == *taker => {
sender.send(msg).await?;
}
RollOverState::Active { taker, .. } => {
anyhow::bail!("Currently rolling over with different taker {}", taker)
}
RollOverState::None => {}
}
Ok(())
}
/// Removes a proposal and updates the update cfd proposals' feed
async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> {
let removed_proposal = self.current_pending_proposals.remove(order_id);
// Strip the identity, ID doesn't care about this implementation detail
let removed_proposal = removed_proposal.map(|(proposal, _)| proposal);
if let Some(removed_proposal) = removed_proposal {
match removed_proposal {
UpdateCfdProposal::Settlement { .. } => {
self.projection_actor
.send(UpdateSettlementProposal {
order: *order_id,
proposal: None,
})
.await?
}
UpdateCfdProposal::RollOverProposal { .. } => {
self.projection_actor
.send(UpdateRollOverProposal {
order: *order_id,
proposal: None,
})
.await?
}
}
} else {
anyhow::bail!("Could not find proposal with order id: {}", &order_id);
}
Ok(())
}
async fn update_connected_takers(&mut self) -> Result<()> { async fn update_connected_takers(&mut self) -> Result<()> {
self.projection_actor self.projection_actor
.send(Update( .send(Update(
@ -365,6 +250,111 @@ where
} }
} }
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W> {
async fn handle_accept_rollover(&mut self, msg: AcceptRollOver) -> Result<()> {
if self
.rollover_actors
.send(&msg.order_id, rollover_maker::AcceptRollOver)
.await
.is_err()
{
tracing::warn!(%msg.order_id, "No active rollover");
}
Ok(())
}
async fn handle_reject_rollover(&mut self, msg: RejectRollOver) -> Result<()> {
if self
.rollover_actors
.send(&msg.order_id, rollover_maker::RejectRollOver)
.await
.is_err()
{
tracing::warn!(%msg.order_id, "No active rollover");
}
Ok(())
}
}
impl<O, M, T, W> Actor<O, M, T, W>
where
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<Stopping<rollover_maker::Actor>>
+ xtra::Handler<RollOverProposed>,
W: 'static,
Self: xtra::Handler<Stopping<rollover_maker::Actor>>,
{
async fn handle_propose_roll_over(
&mut self,
proposal: RollOverProposal,
taker_id: Identity,
ctx: &mut Context<Self>,
) -> Result<()> {
tracing::info!(
"Received proposal from the taker {}: {:?} to roll over order {}",
taker_id,
proposal,
proposal.order_id
);
// check if CFD is in open state, otherwise we should not proceed
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(proposal.order_id, &mut conn).await?;
match cfd {
Cfd {
state: CfdState::Open { .. },
..
} => (),
_ => {
anyhow::bail!("Order is in invalid state. Cannot propose roll over.")
}
};
let this = ctx.address().expect("acquired own address");
let (rollover_actor_addr, rollover_actor_future) = rollover_maker::Actor::new(
&self.takers,
cfd,
taker_id,
self.oracle_pk,
&this,
&self.oracle_actor,
(&self.takers, &this),
self.projection_actor.clone(),
proposal.clone(),
self.n_payouts,
)
.create(None)
.run();
self.tasks.add(rollover_actor_future);
self.takers
.send(RollOverProposed {
order_id: proposal.order_id,
address: rollover_actor_addr.clone(),
})
.await?;
self.rollover_actors
.insert(proposal.order_id, rollover_actor_addr);
Ok(())
}
}
#[xtra_productivity(message_impl = false)]
impl<O, M, T, W> Actor<O, M, T, W> {
async fn handle_rollover_actor_stopping(&mut self, msg: Stopping<rollover_maker::Actor>) {
self.rollover_actors.gc(msg);
}
}
impl<O, M, T, W> Actor<O, M, T, W> impl<O, M, T, W> Actor<O, M, T, W>
where where
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>, O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
@ -569,41 +559,6 @@ where
Ok(()) Ok(())
} }
async fn handle_reject_roll_over(&mut self, msg: RejectRollOver) -> Result<()> {
let RejectRollOver { order_id } = msg;
tracing::debug!(%order_id, "Maker rejects a roll_over proposal" );
// Validate if order is actually being requested to be extended
let (_, taker_id) = match self.current_pending_proposals.get(&order_id) {
Some((
UpdateCfdProposal::RollOverProposal {
proposal,
direction: SettlementKind::Incoming,
},
taker_id,
)) => (proposal, *taker_id),
_ => {
anyhow::bail!("Order is in invalid state. Ignoring reject roll over request.")
}
};
// clean-up state ahead of sending to ensure consistency in case we fail to deliver the
// message
self.remove_pending_proposal(&order_id)
.await
.context("rejected roll_over")?;
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::RejectRollOver(order_id),
})
.await??;
Ok(())
}
} }
#[xtra_productivity] #[xtra_productivity]
@ -658,109 +613,6 @@ where
} }
} }
#[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W>
where
Self: xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>,
T: xtra::Handler<maker_inc_connections::TakerMessage>,
W: xtra::Handler<wallet::Sign> + xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle_accept_roll_over(
&mut self,
msg: AcceptRollOver,
ctx: &mut Context<Self>,
) -> Result<()> {
let AcceptRollOver { order_id } = msg;
tracing::debug!(%order_id, "Maker accepts a roll_over proposal" );
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
// Validate if order is actually being requested to be extended
let (proposal, taker_id) = match self.current_pending_proposals.get(&order_id) {
Some((
UpdateCfdProposal::RollOverProposal {
proposal,
direction: SettlementKind::Incoming,
},
taker_id,
)) => (proposal, *taker_id),
_ => {
anyhow::bail!("Order is in invalid state. Ignoring trying to accept the roll over request it.")
}
};
let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
let oracle_event_id = oracle::next_announcement_after(
time::OffsetDateTime::now_utc() + cfd.settlement_interval,
)?;
let announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", oracle_event_id))?;
self.takers
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::ConfirmRollOver {
order_id: proposal.order_id,
oracle_event_id,
},
})
.await??;
let (sender, receiver) = mpsc::unbounded();
let contract_future = setup_contract::roll_over(
self.takers.clone().into_sink().with(move |msg| {
future::ok(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::RollOverProtocol { order_id, msg },
})
}),
receiver,
(self.oracle_pk, announcement),
RolloverParams::new(
cfd.price,
cfd.quantity_usd,
cfd.leverage,
cfd.refund_timelock_in_blocks(),
cfd.fee_rate,
),
Role::Maker,
dlc,
self.n_payouts,
);
let this = ctx
.address()
.expect("actor to be able to give address to itself");
let task = async move {
let dlc = contract_future.await;
this.send(CfdRollOverCompleted { order_id, dlc })
.await
.expect("always connected to ourselves")
}
.spawn_with_handle();
self.roll_over_state = RollOverState::Active {
sender,
taker: taker_id,
_task: task,
};
self.remove_pending_proposal(&order_id)
.await
.context("accepted roll_over")?;
Ok(())
}
}
#[xtra_productivity] #[xtra_productivity]
impl<O, M, T, W> Actor<O, M, T, W> impl<O, M, T, W> Actor<O, M, T, W>
where where
@ -782,13 +634,10 @@ where
M: xtra::Handler<monitor::StartMonitoring>, M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation>, O: xtra::Handler<oracle::MonitorAttestation>,
{ {
async fn handle_roll_over_completed( async fn handle_roll_over_completed(&mut self, msg: Completed) -> Result<()> {
&mut self, // We handle rollover success in the maker_cfd::Actor instead of the rollover_maker::Actor
order_id: OrderId, // because we do not have access to the DB in the rollover_maker::Actor
dlc: Result<Dlc>, let Completed { order_id, dlc } = msg;
) -> Result<()> {
let dlc = dlc.context("Failed to roll over contract with taker")?;
self.roll_over_state = RollOverState::None;
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
@ -990,14 +839,13 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<CfdRollOverCompleted> impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<Completed> for Actor<O, M, T, W>
for Actor<O, M, T, W>
where where
M: xtra::Handler<monitor::StartMonitoring>, M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation>, O: xtra::Handler<oracle::MonitorAttestation>,
{ {
async fn handle(&mut self, msg: CfdRollOverCompleted, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: Completed, _ctx: &mut Context<Self>) {
log_error!(self.handle_roll_over_completed(msg.order_id, msg.dlc)); log_error!(self.handle_roll_over_completed(msg));
} }
} }
@ -1020,8 +868,10 @@ where
+ xtra::Handler<maker_inc_connections::TakerMessage> + xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder> + xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<Stopping<setup_maker::Actor>> + xtra::Handler<Stopping<setup_maker::Actor>>
+ xtra::Handler<Stopping<rollover_maker::Actor>>
+ xtra::Handler<maker_inc_connections::settlement::Response> + xtra::Handler<maker_inc_connections::settlement::Response>
+ xtra::Handler<Stopping<collab_settlement_maker::Actor>>, + xtra::Handler<Stopping<collab_settlement_maker::Actor>>
+ xtra::Handler<RollOverProposed>,
W: xtra::Handler<wallet::Sign> W: xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams> + xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::TryBroadcastTransaction>, + xtra::Handler<wallet::TryBroadcastTransaction>,
@ -1069,10 +919,11 @@ where
timestamp, timestamp,
}, },
taker_id, taker_id,
ctx
)) ))
} }
wire::TakerToMaker::RollOverProtocol(msg) => { wire::TakerToMaker::RollOverProtocol { .. } => {
log_error!(self.handle_inc_roll_over_protocol_msg(taker_id, msg)) unreachable!("This kind of message should be sent to the rollover_maker::Actor`")
} }
wire::TakerToMaker::Protocol { .. } => { wire::TakerToMaker::Protocol { .. } => {
unreachable!("This kind of message should be sent to the `setup_maker::Actor`") unreachable!("This kind of message should be sent to the `setup_maker::Actor`")
@ -1103,7 +954,7 @@ impl Message for TakerDisconnected {
type Result = (); type Result = ();
} }
impl Message for CfdRollOverCompleted { impl Message for Completed {
type Result = (); type Result = ();
} }

22
daemon/src/maker_inc_connections.rs

@ -10,6 +10,7 @@ use crate::model::cfd::OrderId;
use crate::model::Identity; use crate::model::Identity;
use crate::noise; use crate::noise;
use crate::noise::TransportStateExt; use crate::noise::TransportStateExt;
use crate::rollover_maker;
use crate::send_to_socket; use crate::send_to_socket;
use crate::setup_maker; use crate::setup_maker;
use crate::tokio_ext::FutureExt; use crate::tokio_ext::FutureExt;
@ -105,6 +106,7 @@ pub struct Actor {
heartbeat_interval: Duration, heartbeat_interval: Duration,
setup_actors: AddressMap<OrderId, setup_maker::Actor>, setup_actors: AddressMap<OrderId, setup_maker::Actor>,
settlement_actors: AddressMap<OrderId, collab_settlement_maker::Actor>, settlement_actors: AddressMap<OrderId, collab_settlement_maker::Actor>,
rollover_actors: AddressMap<OrderId, rollover_maker::Actor>,
connection_tasks: HashMap<Identity, Tasks>, connection_tasks: HashMap<Identity, Tasks>,
} }
@ -125,6 +127,7 @@ impl Actor {
heartbeat_interval, heartbeat_interval,
setup_actors: AddressMap::default(), setup_actors: AddressMap::default(),
settlement_actors: AddressMap::default(), settlement_actors: AddressMap::default(),
rollover_actors: AddressMap::default(),
connection_tasks: HashMap::new(), connection_tasks: HashMap::new(),
} }
} }
@ -330,6 +333,11 @@ impl Actor {
self.drop_taker_connection(&taker_id).await; self.drop_taker_connection(&taker_id).await;
} }
async fn handle_rollover_proposed(&mut self, message: maker_cfd::RollOverProposed) {
self.rollover_actors
.insert(message.order_id, message.address);
}
} }
#[xtra_productivity(message_impl = false)] #[xtra_productivity(message_impl = false)]
@ -345,6 +353,16 @@ impl Actor {
tracing::error!(%order_id, "No active contract setup"); tracing::error!(%order_id, "No active contract setup");
} }
}, },
RollOverProtocol { order_id, msg } => {
if self
.rollover_actors
.send(&order_id, rollover_maker::ProtocolMsg(msg))
.await
.is_err()
{
tracing::warn!(%order_id, "No active rollover actor")
}
}
Settlement { Settlement {
order_id, order_id,
msg: taker_to_maker::Settlement::Initiate { sig_taker }, msg: taker_to_maker::Settlement::Initiate { sig_taker },
@ -368,6 +386,10 @@ impl Actor {
self.setup_actors.gc(message); self.setup_actors.gc(message);
} }
async fn handle_rollover_actor_stopping(&mut self, message: Stopping<rollover_maker::Actor>) {
self.rollover_actors.gc(message);
}
async fn handle_settlement_actor_stopping( async fn handle_settlement_actor_stopping(
&mut self, &mut self,
message: Stopping<collab_settlement_maker::Actor>, message: Stopping<collab_settlement_maker::Actor>,

317
daemon/src/rollover_maker.rs

@ -0,0 +1,317 @@
use crate::address_map::ActorName;
use crate::maker_inc_connections;
use crate::maker_inc_connections::TakerMessage;
use crate::model::cfd::Dlc;
use crate::model::cfd::OrderId;
use crate::model::cfd::Role;
use crate::model::cfd::RollOverProposal;
use crate::model::cfd::SettlementKind;
use crate::model::cfd::UpdateCfdProposal;
use crate::model::Identity;
use crate::oracle;
use crate::oracle::GetAnnouncement;
use crate::projection;
use crate::projection::try_into_update_rollover_proposal;
use crate::projection::UpdateRollOverProposal;
use crate::schnorrsig;
use crate::setup_contract;
use crate::setup_contract::RolloverParams;
use crate::tokio_ext::spawn_fallible;
use crate::wire;
use crate::wire::MakerToTaker;
use crate::wire::RollOverMsg;
use crate::Cfd;
use crate::Stopping;
use anyhow::Context as _;
use anyhow::Result;
use futures::channel::mpsc;
use futures::channel::mpsc::UnboundedSender;
use futures::future;
use futures::SinkExt;
use xtra::prelude::MessageChannel;
use xtra::Context;
use xtra::KeepRunning;
use xtra_productivity::xtra_productivity;
pub struct AcceptRollOver;
pub struct RejectRollOver;
pub struct ProtocolMsg(pub wire::RollOverMsg);
/// Message sent from the spawned task to `rollover_taker::Actor` to
/// notify that rollover has finished successfully.
pub struct RolloverSucceeded {
dlc: Dlc,
}
/// Message sent from the spawned task to `rollover_taker::Actor` to
/// notify that rollover has failed.
pub struct RolloverFailed {
error: anyhow::Error,
}
#[allow(clippy::large_enum_variant)]
pub struct Completed {
pub order_id: OrderId,
pub dlc: Dlc,
}
pub struct Actor {
send_to_taker_actor: Box<dyn MessageChannel<TakerMessage>>,
cfd: Cfd,
taker_id: Identity,
n_payouts: usize,
oracle_pk: schnorrsig::PublicKey,
sent_from_taker: Option<UnboundedSender<RollOverMsg>>,
maker_cfd_actor: Box<dyn MessageChannel<Completed>>,
oracle_actor: Box<dyn MessageChannel<GetAnnouncement>>,
on_stopping: Vec<Box<dyn MessageChannel<Stopping<Self>>>>,
projection_actor: xtra::Address<projection::Actor>,
proposal: RollOverProposal,
}
#[async_trait::async_trait]
impl xtra::Actor for Actor {
async fn stopping(&mut self, ctx: &mut Context<Self>) -> KeepRunning {
let address = ctx.address().expect("acquired own actor address");
for channel in self.on_stopping.iter() {
let _ = channel
.send(Stopping {
me: address.clone(),
})
.await;
}
KeepRunning::StopAll
}
async fn started(&mut self, _ctx: &mut Context<Self>) {
let new_proposal = UpdateCfdProposal::RollOverProposal {
proposal: self.proposal.clone(),
direction: SettlementKind::Incoming,
};
self.projection_actor
.send(
try_into_update_rollover_proposal(new_proposal)
.expect("update cfd proposal is rollover proposal"),
)
.await
.expect("projection actor is running");
}
}
impl Actor {
#[allow(clippy::too_many_arguments)]
pub fn new(
send_to_taker_actor: &(impl MessageChannel<TakerMessage> + 'static),
cfd: Cfd,
taker_id: Identity,
oracle_pk: schnorrsig::PublicKey,
maker_cfd_actor: &(impl MessageChannel<Completed> + 'static),
oracle_actor: &(impl MessageChannel<GetAnnouncement> + 'static),
(on_stopping0, on_stopping1): (
&(impl MessageChannel<Stopping<Self>> + 'static),
&(impl MessageChannel<Stopping<Self>> + 'static),
),
projection_actor: xtra::Address<projection::Actor>,
proposal: RollOverProposal,
n_payouts: usize,
) -> Self {
Self {
send_to_taker_actor: send_to_taker_actor.clone_channel(),
cfd,
taker_id,
n_payouts,
oracle_pk,
sent_from_taker: None,
maker_cfd_actor: maker_cfd_actor.clone_channel(),
oracle_actor: oracle_actor.clone_channel(),
on_stopping: vec![on_stopping0.clone_channel(), on_stopping1.clone_channel()],
projection_actor,
proposal,
}
}
async fn update_contract(&mut self, dlc: Dlc, ctx: &mut xtra::Context<Self>) -> Result<()> {
let msg = Completed {
order_id: self.cfd.id,
dlc,
};
self.maker_cfd_actor.send(msg).await?;
ctx.stop();
Ok(())
}
async fn fail(&mut self, ctx: &mut xtra::Context<Self>, error: anyhow::Error) {
tracing::info!(%self.cfd.id, %error, "Rollover failed");
if let Err(err) = self
.projection_actor
.send(projection::UpdateRollOverProposal {
order: self.cfd.id,
proposal: None,
})
.await
{
tracing::error!(%err, "projection actor unreachable when attempting to fail rollover");
}
ctx.stop();
}
async fn accept(&mut self, ctx: &mut xtra::Context<Self>) -> Result<()> {
let order_id = self.cfd.id;
let (sender, receiver) = mpsc::unbounded();
self.sent_from_taker = Some(sender);
tracing::debug!(%order_id, "Maker accepts a roll_over proposal" );
let cfd = self.cfd.clone();
let dlc = cfd.open_dlc().expect("CFD was in wrong state");
let oracle_event_id = oracle::next_announcement_after(
time::OffsetDateTime::now_utc() + cfd.settlement_interval,
)?;
let taker_id = self.taker_id;
self.send_to_taker_actor
.send(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::ConfirmRollOver {
order_id,
oracle_event_id,
},
})
.await??;
self.projection_actor
.send(UpdateRollOverProposal {
order: order_id,
proposal: None,
})
.await?;
let announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", oracle_event_id))?;
let rollover_fut = setup_contract::roll_over(
self.send_to_taker_actor.sink().with(move |msg| {
future::ok(maker_inc_connections::TakerMessage {
taker_id,
msg: wire::MakerToTaker::RollOverProtocol { order_id, msg },
})
}),
receiver,
(self.oracle_pk, announcement),
RolloverParams::new(
cfd.price,
cfd.quantity_usd,
cfd.leverage,
cfd.refund_timelock_in_blocks(),
cfd.fee_rate,
),
Role::Maker,
dlc,
self.n_payouts,
);
let this = ctx.address().expect("self to be alive");
spawn_fallible::<_, anyhow::Error>(async move {
let _ = match rollover_fut.await {
Ok(dlc) => this.send(RolloverSucceeded { dlc }).await?,
Err(error) => this.send(RolloverFailed { error }).await?,
};
Ok(())
});
Ok(())
}
async fn reject(&mut self, ctx: &mut xtra::Context<Self>) -> Result<()> {
tracing::info!(%self.cfd.id, "Maker rejects a roll_over proposal" );
self.send_to_taker_actor
.send(TakerMessage {
taker_id: self.taker_id,
msg: MakerToTaker::RejectRollOver(self.cfd.id),
})
.await??;
self.projection_actor
.send(UpdateRollOverProposal {
order: self.cfd.id,
proposal: None,
})
.await?;
ctx.stop();
Ok(())
}
pub async fn forward_protocol_msg(&mut self, msg: ProtocolMsg) -> Result<()> {
let sender = self
.sent_from_taker
.as_mut()
.context("cannot forward message to rollover task")?;
sender.send(msg.0).await?;
Ok(())
}
}
#[xtra_productivity]
impl Actor {
async fn handle_accept_rollover(
&mut self,
_msg: AcceptRollOver,
ctx: &mut xtra::Context<Self>,
) {
if let Err(err) = self.accept(ctx).await {
self.fail(ctx, err).await;
};
}
async fn handle_reject_rollover(
&mut self,
_msg: RejectRollOver,
ctx: &mut xtra::Context<Self>,
) {
if let Err(err) = self.reject(ctx).await {
self.fail(ctx, err).await;
};
}
async fn handle_protocol_msg(&mut self, msg: ProtocolMsg, ctx: &mut xtra::Context<Self>) {
if let Err(err) = self.forward_protocol_msg(msg).await {
self.fail(ctx, err).await;
};
}
async fn handle_rollover_failed(&mut self, msg: RolloverFailed, ctx: &mut xtra::Context<Self>) {
self.fail(ctx, msg.error).await;
}
async fn handle_rollover_succeeded(
&mut self,
msg: RolloverSucceeded,
ctx: &mut xtra::Context<Self>,
) {
if let Err(err) = self.update_contract(msg.dlc.clone(), ctx).await {
self.fail(ctx, err).await;
}
}
}
impl ActorName for Actor {
fn actor_name() -> String {
"Maker rollover".to_string()
}
}

5
daemon/src/rollover_taker.rs

@ -113,8 +113,9 @@ impl Actor {
self.rollover_msg_sender = Some(sender); self.rollover_msg_sender = Some(sender);
let rollover_fut = setup_contract::roll_over( let rollover_fut = setup_contract::roll_over(
xtra::message_channel::MessageChannel::sink(&self.maker) xtra::message_channel::MessageChannel::sink(&self.maker).with(move |msg| {
.with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))), future::ok(wire::TakerToMaker::RollOverProtocol { order_id, msg })
}),
receiver, receiver,
(self.oracle_pk, announcement), (self.oracle_pk, announcement),
RolloverParams::new( RolloverParams::new(

7
daemon/src/wire.rs

@ -86,7 +86,10 @@ pub enum TakerToMaker {
order_id: OrderId, order_id: OrderId,
msg: SetupMsg, msg: SetupMsg,
}, },
RollOverProtocol(RollOverMsg), RollOverProtocol {
order_id: OrderId,
msg: RollOverMsg,
},
Settlement { Settlement {
order_id: OrderId, order_id: OrderId,
msg: taker_to_maker::Settlement, msg: taker_to_maker::Settlement,
@ -99,7 +102,7 @@ impl fmt::Display for TakerToMaker {
TakerToMaker::TakeOrder { .. } => write!(f, "TakeOrder"), TakerToMaker::TakeOrder { .. } => write!(f, "TakeOrder"),
TakerToMaker::Protocol { .. } => write!(f, "Protocol"), TakerToMaker::Protocol { .. } => write!(f, "Protocol"),
TakerToMaker::ProposeRollOver { .. } => write!(f, "ProposeRollOver"), TakerToMaker::ProposeRollOver { .. } => write!(f, "ProposeRollOver"),
TakerToMaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"), TakerToMaker::RollOverProtocol { .. } => write!(f, "RollOverProtocol"),
TakerToMaker::Settlement { .. } => write!(f, "Settlement"), TakerToMaker::Settlement { .. } => write!(f, "Settlement"),
TakerToMaker::Hello(_) => write!(f, "Hello"), TakerToMaker::Hello(_) => write!(f, "Hello"),
} }

Loading…
Cancel
Save