Browse Source

Merge #796

796: Introduce short-lived `rollover_taker::Actor` r=luckysori a=luckysori

Very similar to the other ones we've done.

Things to consider:
- Shall we consolidate the usage of "rollover" over "roll over" across the entire codebase? I don't think I have been able to stay consistent even in this PR :(
- I've added an `OrderId` to the `wire::MakerToTaker::ConfirmRollOver` message, because the maker could be talking about more than one rollover.
- The `taker_cfd::Actor` no longer handles the raw `wire::MakerToTaker` enumeration of messages 🎉 After creating all these short-lived actors it only needs to handle a dedicated `CurrentOrder` message originating from the maker.
- Error handling is worth reviewing in detail as I've messed that up in previous related PRs.

Co-authored-by: Lucas Soriano del Pino <l.soriano.del.pino@gmail.com>
feature/force-stop-button
bors[bot] 3 years ago
committed by GitHub
parent
commit
9cb728e60a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 79
      daemon/src/connection.rs
  2. 3
      daemon/src/lib.rs
  3. 2
      daemon/src/maker_cfd.rs
  4. 318
      daemon/src/rollover_taker.rs
  5. 306
      daemon/src/taker_cfd.rs
  6. 7
      daemon/src/wire.rs

79
daemon/src/connection.rs

@ -1,9 +1,13 @@
use crate::address_map::{AddressMap, Stopping}; use crate::address_map::{AddressMap, Stopping};
use crate::model::cfd::OrderId; use crate::model::cfd::OrderId;
use crate::model::{Identity, Price, Timestamp, Usd}; use crate::model::{Identity, Price, Timestamp, Usd};
use crate::taker_cfd::CurrentOrder;
use crate::tokio_ext::FutureExt; use crate::tokio_ext::FutureExt;
use crate::wire::{EncryptedJsonCodec, TakerToMaker, Version}; use crate::wire::{EncryptedJsonCodec, TakerToMaker, Version};
use crate::{collab_settlement_taker, log_error, noise, send_to_socket, setup_taker, wire, Tasks}; use crate::{
collab_settlement_taker, log_error, noise, rollover_taker, send_to_socket, setup_taker, wire,
Tasks,
};
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use bdk::bitcoin::Amount; use bdk::bitcoin::Amount;
use futures::{SinkExt, StreamExt, TryStreamExt}; use futures::{SinkExt, StreamExt, TryStreamExt};
@ -31,13 +35,14 @@ pub struct Actor {
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>, send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
send_to_maker_ctx: xtra::Context<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker_ctx: xtra::Context<send_to_socket::Actor<wire::TakerToMaker>>,
identity_sk: x25519_dalek::StaticSecret, identity_sk: x25519_dalek::StaticSecret,
maker_to_taker: Box<dyn MessageChannel<wire::MakerToTaker>>, current_order: Box<dyn MessageChannel<CurrentOrder>>,
/// Max duration since the last heartbeat until we die. /// Max duration since the last heartbeat until we die.
heartbeat_timeout: Duration, heartbeat_timeout: Duration,
connect_timeout: Duration, connect_timeout: Duration,
connected_state: Option<ConnectedState>, connected_state: Option<ConnectedState>,
setup_actors: HashMap<OrderId, xtra::Address<setup_taker::Actor>>, setup_actors: HashMap<OrderId, xtra::Address<setup_taker::Actor>>,
collab_settlement_actors: AddressMap<OrderId, collab_settlement_taker::Actor>, collab_settlement_actors: AddressMap<OrderId, collab_settlement_taker::Actor>,
rollover_actors: AddressMap<OrderId, rollover_taker::Actor>,
} }
pub struct Connect { pub struct Connect {
@ -90,10 +95,16 @@ pub struct ProposeSettlement {
pub address: xtra::Address<collab_settlement_taker::Actor>, pub address: xtra::Address<collab_settlement_taker::Actor>,
} }
pub struct ProposeRollOver {
pub order_id: OrderId,
pub timestamp: Timestamp,
pub address: xtra::Address<rollover_taker::Actor>,
}
impl Actor { impl Actor {
pub fn new( pub fn new(
status_sender: watch::Sender<ConnectionStatus>, status_sender: watch::Sender<ConnectionStatus>,
maker_to_taker: Box<dyn MessageChannel<wire::MakerToTaker>>, current_order: &(impl MessageChannel<CurrentOrder> + 'static),
identity_sk: x25519_dalek::StaticSecret, identity_sk: x25519_dalek::StaticSecret,
hearthbeat_timeout: Duration, hearthbeat_timeout: Duration,
connect_timeout: Duration, connect_timeout: Duration,
@ -105,12 +116,13 @@ impl Actor {
send_to_maker: Box::new(send_to_maker_addr), send_to_maker: Box::new(send_to_maker_addr),
send_to_maker_ctx, send_to_maker_ctx,
identity_sk, identity_sk,
maker_to_taker, current_order: current_order.clone_channel(),
heartbeat_timeout: hearthbeat_timeout, heartbeat_timeout: hearthbeat_timeout,
connected_state: None, connected_state: None,
setup_actors: HashMap::new(), setup_actors: HashMap::new(),
connect_timeout, connect_timeout,
collab_settlement_actors: AddressMap::default(), collab_settlement_actors: AddressMap::default(),
rollover_actors: AddressMap::default(),
} }
} }
} }
@ -127,6 +139,10 @@ impl Actor {
) { ) {
self.collab_settlement_actors.gc(message); self.collab_settlement_actors.gc(message);
} }
async fn handle_rollover_actor_stopping(&mut self, message: Stopping<rollover_taker::Actor>) {
self.rollover_actors.gc(message);
}
} }
#[xtra_productivity] #[xtra_productivity]
@ -170,6 +186,25 @@ impl Actor {
Ok(()) Ok(())
} }
async fn handle_propose_roll_over(&mut self, msg: ProposeRollOver) -> Result<()> {
let ProposeRollOver {
order_id,
timestamp,
address,
} = msg;
self.send_to_maker
.send(wire::TakerToMaker::ProposeRollOver {
order_id,
timestamp,
})
.await?;
self.rollover_actors.insert(order_id, address);
Ok(())
}
} }
#[xtra_productivity] #[xtra_productivity]
@ -338,13 +373,41 @@ impl Actor {
tracing::warn!(%order_id, "No active collaborative settlement"); tracing::warn!(%order_id, "No active collaborative settlement");
} }
} }
wire::MakerToTaker::ConfirmRollOver {
order_id,
oracle_event_id,
} => {
if !self
.rollover_actors
.send(
&order_id,
rollover_taker::RollOverAccepted { oracle_event_id },
)
.await
{
tracing::warn!(%order_id, "No active rollover");
}
}
wire::MakerToTaker::RejectRollOver(order_id) => {
if !self
.rollover_actors
.send(&order_id, rollover_taker::RollOverRejected)
.await
{
tracing::warn!(%order_id, "No active rollover");
}
}
wire::MakerToTaker::RollOverProtocol { order_id, msg } => {
if !self.rollover_actors.send(&order_id, msg).await {
tracing::warn!(%order_id, "No active rollover");
}
}
wire::MakerToTaker::CurrentOrder(msg) => {
log_error!(self.current_order.send(CurrentOrder(msg)));
}
wire::MakerToTaker::Hello(_) => { wire::MakerToTaker::Hello(_) => {
tracing::warn!("Ignoring unexpected Hello message from maker. Hello is only expected when opening a new connection.") tracing::warn!("Ignoring unexpected Hello message from maker. Hello is only expected when opening a new connection.")
} }
other => {
// this one should go to the taker cfd actor
log_error!(self.maker_to_taker.send(other));
}
} }
KeepRunning::Yes KeepRunning::Yes
} }

3
daemon/src/lib.rs

@ -41,6 +41,7 @@ pub mod olivia;
pub mod oracle; pub mod oracle;
pub mod payout_curve; pub mod payout_curve;
pub mod projection; pub mod projection;
pub mod rollover_taker;
pub mod routes; pub mod routes;
pub mod seed; pub mod seed;
pub mod send_to_socket; pub mod send_to_socket;
@ -270,7 +271,7 @@ where
tasks.add(connection_actor_ctx.run(connection::Actor::new( tasks.add(connection_actor_ctx.run(connection::Actor::new(
maker_online_status_feed_sender, maker_online_status_feed_sender,
Box::new(cfd_actor_addr.clone()), &cfd_actor_addr,
identity_sk, identity_sk,
maker_heartbeat_interval, maker_heartbeat_interval,
connect_timeout, connect_timeout,

2
daemon/src/maker_cfd.rs

@ -729,7 +729,7 @@ where
self.takers.clone().into_sink().with(move |msg| { self.takers.clone().into_sink().with(move |msg| {
future::ok(maker_inc_connections::TakerMessage { future::ok(maker_inc_connections::TakerMessage {
taker_id, taker_id,
msg: wire::MakerToTaker::RollOverProtocol(msg), msg: wire::MakerToTaker::RollOverProtocol { order_id, msg },
}) })
}), }),
receiver, receiver,

318
daemon/src/rollover_taker.rs

@ -0,0 +1,318 @@
use crate::address_map::Stopping;
use crate::connection;
use crate::model::cfd::{Cfd, Dlc, OrderId, Role, RollOverProposal, SettlementKind};
use crate::model::{BitMexPriceEventId, Timestamp};
use crate::oracle::{self, GetAnnouncement};
use crate::projection::{self, UpdateRollOverProposal};
use crate::setup_contract::{self, RolloverParams};
use crate::tokio_ext::spawn_fallible;
use crate::wire::{self, RollOverMsg};
use anyhow::{Context, Result};
use async_trait::async_trait;
use futures::channel::mpsc::{self, UnboundedSender};
use futures::{future, SinkExt};
use maia::secp256k1_zkp::schnorrsig;
use xtra::prelude::MessageChannel;
use xtra_productivity::xtra_productivity;
pub struct Actor {
cfd: Cfd,
n_payouts: usize,
oracle_pk: schnorrsig::PublicKey,
timestamp: Timestamp,
maker: xtra::Address<connection::Actor>,
get_announcement: Box<dyn MessageChannel<GetAnnouncement>>,
projection: xtra::Address<projection::Actor>,
on_completed: Box<dyn MessageChannel<Completed>>,
on_stopping: Vec<Box<dyn MessageChannel<Stopping<Self>>>>,
rollover_msg_sender: Option<UnboundedSender<RollOverMsg>>,
}
impl Actor {
pub fn new(
(cfd, n_payouts): (Cfd, usize),
oracle_pk: schnorrsig::PublicKey,
maker: xtra::Address<connection::Actor>,
get_announcement: &(impl MessageChannel<GetAnnouncement> + 'static),
projection: xtra::Address<projection::Actor>,
on_completed: &(impl MessageChannel<Completed> + 'static),
(on_stopping0, on_stopping1): (
&(impl MessageChannel<Stopping<Self>> + 'static),
&(impl MessageChannel<Stopping<Self>> + 'static),
),
) -> Self {
Self {
cfd,
n_payouts,
oracle_pk,
timestamp: Timestamp::now(),
maker,
get_announcement: get_announcement.clone_channel(),
projection,
on_completed: on_completed.clone_channel(),
on_stopping: vec![on_stopping0.clone_channel(), on_stopping1.clone_channel()],
rollover_msg_sender: None,
}
}
async fn propose(&self, this: xtra::Address<Self>) -> Result<()> {
self.maker
.send(connection::ProposeRollOver {
order_id: self.cfd.order.id,
timestamp: self.timestamp,
address: this,
})
.await??;
self.update_proposal(Some((
RollOverProposal {
order_id: self.cfd.order.id,
timestamp: self.timestamp,
},
SettlementKind::Outgoing,
)))
.await?;
Ok(())
}
async fn handle_confirmed(
&mut self,
msg: RollOverAccepted,
ctx: &mut xtra::Context<Self>,
) -> Result<()> {
let RollOverAccepted { oracle_event_id } = msg;
let announcement = self
.get_announcement
.send(oracle::GetAnnouncement(oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", oracle_event_id))?;
let order_id = self.cfd.order.id;
tracing::info!(%order_id, "Rollover proposal got accepted");
self.update_proposal(None).await?;
let (sender, receiver) = mpsc::unbounded::<RollOverMsg>();
// store the writing end to forward messages from the maker to
// the spawned rollover task
self.rollover_msg_sender = Some(sender);
let rollover_fut = setup_contract::roll_over(
xtra::message_channel::MessageChannel::sink(&self.maker)
.with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))),
receiver,
(self.oracle_pk, announcement),
RolloverParams::new(
self.cfd.order.price,
self.cfd.quantity_usd,
self.cfd.order.leverage,
self.cfd.refund_timelock_in_blocks(),
),
Role::Taker,
self.cfd.dlc().context("No DLC in CFD")?,
self.n_payouts,
);
let this = ctx.address().expect("self to be alive");
spawn_fallible::<_, anyhow::Error>(async move {
let _ = match rollover_fut.await {
Ok(dlc) => this.send(RolloverSucceeded { dlc }).await?,
Err(error) => this.send(RolloverFailed { error }).await?,
};
Ok(())
});
Ok(())
}
async fn handle_rejected(&self) -> Result<()> {
let order_id = self.cfd.order.id;
tracing::info!(%order_id, "Rollover proposal got rejected");
self.update_proposal(None).await?;
Ok(())
}
pub async fn forward_protocol_msg(&mut self, msg: wire::RollOverMsg) -> Result<()> {
let sender = self
.rollover_msg_sender
.as_mut()
.context("Cannot forward message to rollover task")?;
sender.send(msg).await?;
Ok(())
}
async fn update_proposal(
&self,
proposal: Option<(RollOverProposal, SettlementKind)>,
) -> Result<()> {
self.projection
.send(UpdateRollOverProposal {
order: self.cfd.order.id,
proposal,
})
.await?;
Ok(())
}
async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context<Self>) {
let _ = self.on_completed.send(completed).await;
ctx.stop();
}
}
#[async_trait]
impl xtra::Actor for Actor {
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
let this = ctx.address().expect("self to be alive");
if let Err(e) = self.propose(this).await {
self.complete(
Completed::Failed {
order_id: self.cfd.order.id,
error: e,
},
ctx,
)
.await;
}
}
async fn stopping(&mut self, ctx: &mut xtra::Context<Self>) -> xtra::KeepRunning {
// inform other actors that we are stopping so that our
// address can be GCd from their AddressMaps
let me = ctx.address().expect("we are still alive");
for channel in self.on_stopping.iter() {
let _ = channel.send(Stopping { me: me.clone() }).await;
}
xtra::KeepRunning::StopAll
}
}
#[xtra_productivity]
impl Actor {
pub async fn handle_confirm_rollover(
&mut self,
msg: RollOverAccepted,
ctx: &mut xtra::Context<Self>,
) {
if let Err(error) = self.handle_confirmed(msg, ctx).await {
self.complete(
Completed::Failed {
order_id: self.cfd.order.id,
error,
},
ctx,
)
.await;
}
}
pub async fn reject_rollover(&mut self, _: RollOverRejected, ctx: &mut xtra::Context<Self>) {
let order_id = self.cfd.order.id;
let completed = if let Err(error) = self.handle_rejected().await {
Completed::Failed { order_id, error }
} else {
Completed::Rejected { order_id }
};
self.complete(completed, ctx).await;
}
pub async fn handle_rollover_succeeded(
&mut self,
msg: RolloverSucceeded,
ctx: &mut xtra::Context<Self>,
) {
self.complete(
Completed::UpdatedContract {
order_id: self.cfd.order.id,
dlc: msg.dlc,
},
ctx,
)
.await;
}
pub async fn handle_rollover_failed(
&mut self,
msg: RolloverFailed,
ctx: &mut xtra::Context<Self>,
) {
self.complete(
Completed::Failed {
order_id: self.cfd.order.id,
error: msg.error,
},
ctx,
)
.await;
}
pub async fn handle_protocol_msg(
&mut self,
msg: wire::RollOverMsg,
ctx: &mut xtra::Context<Self>,
) {
if let Err(error) = self.forward_protocol_msg(msg).await {
self.complete(
Completed::Failed {
order_id: self.cfd.order.id,
error,
},
ctx,
)
.await;
}
}
}
/// Message sent from the `connection::Actor` to the
/// `rollover_taker::Actor` to notify that the maker has accepted the
/// rollover proposal.
pub struct RollOverAccepted {
pub oracle_event_id: BitMexPriceEventId,
}
/// Message sent from the `connection::Actor` to the
/// `rollover_taker::Actor` to notify that the maker has rejected the
/// rollover proposal.
pub struct RollOverRejected;
/// Message sent from the spawned task to `rollover_taker::Actor` to
/// notify that rollover has finished successfully.
pub struct RolloverSucceeded {
dlc: Dlc,
}
/// Message sent from the spawned task to `rollover_taker::Actor` to
/// notify that rollover has failed.
pub struct RolloverFailed {
error: anyhow::Error,
}
#[allow(clippy::large_enum_variant)]
pub enum Completed {
UpdatedContract {
order_id: OrderId,
dlc: Dlc,
},
Rejected {
order_id: OrderId,
},
Failed {
order_id: OrderId,
error: anyhow::Error,
},
}
impl xtra::Message for Completed {
type Result = Result<()>;
}

306
daemon/src/taker_cfd.rs

@ -1,33 +1,22 @@
use crate::address_map::AddressMap; use crate::address_map::{AddressMap, Stopping};
use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed}; use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_update_feed};
use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id};
use crate::model::cfd::{ use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Completed, Order, OrderId, Origin, Role};
Cfd, CfdState, CfdStateCommon, Completed, Dlc, Order, OrderId, Origin, Role, RollOverProposal, use crate::model::{Price, Usd};
SettlementKind, UpdateCfdProposal, UpdateCfdProposals,
};
use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd};
use crate::monitor::{self, MonitorParams}; use crate::monitor::{self, MonitorParams};
use crate::projection::{
try_into_update_rollover_proposal, UpdateRollOverProposal, UpdateSettlementProposal,
};
use crate::setup_contract::RolloverParams;
use crate::tokio_ext::FutureExt;
use crate::wire::RollOverMsg;
use crate::{ use crate::{
collab_settlement_taker, connection, log_error, oracle, projection, setup_contract, collab_settlement_taker, connection, log_error, oracle, projection, rollover_taker,
setup_taker, wallet, wire, Tasks, setup_taker, wallet, Tasks,
}; };
use anyhow::{bail, Context as _, Result}; use anyhow::{bail, Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc;
use futures::future::RemoteHandle;
use futures::{future, SinkExt};
use std::collections::HashMap;
use xtra::prelude::*; use xtra::prelude::*;
use xtra::Actor as _; use xtra::Actor as _;
use xtra_productivity::xtra_productivity; use xtra_productivity::xtra_productivity;
pub struct CurrentOrder(pub Option<Order>);
pub struct TakeOffer { pub struct TakeOffer {
pub order_id: OrderId, pub order_id: OrderId,
pub quantity: Usd, pub quantity: Usd,
@ -46,19 +35,6 @@ pub struct Commit {
pub order_id: OrderId, pub order_id: OrderId,
} }
pub struct CfdRollOverCompleted {
pub order_id: OrderId,
pub dlc: Result<Dlc>,
}
enum RollOverState {
Active {
sender: mpsc::UnboundedSender<RollOverMsg>,
_task: RemoteHandle<()>,
},
None,
}
pub struct Actor<O, M, W> { pub struct Actor<O, M, W> {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Address<W>, wallet: Address<W>,
@ -68,9 +44,8 @@ pub struct Actor<O, M, W> {
monitor_actor: Address<M>, monitor_actor: Address<M>,
setup_actors: AddressMap<OrderId, setup_taker::Actor>, setup_actors: AddressMap<OrderId, setup_taker::Actor>,
collab_settlement_actors: AddressMap<OrderId, collab_settlement_taker::Actor>, collab_settlement_actors: AddressMap<OrderId, collab_settlement_taker::Actor>,
roll_over_state: RollOverState, rollover_actors: AddressMap<OrderId, rollover_taker::Actor>,
oracle_actor: Address<O>, oracle_actor: Address<O>,
current_pending_proposals: UpdateCfdProposals,
n_payouts: usize, n_payouts: usize,
tasks: Tasks, tasks: Tasks,
} }
@ -99,48 +74,16 @@ where
projection_actor, projection_actor,
conn_actor, conn_actor,
monitor_actor, monitor_actor,
roll_over_state: RollOverState::None,
oracle_actor, oracle_actor,
current_pending_proposals: HashMap::new(),
n_payouts, n_payouts,
setup_actors: AddressMap::default(), setup_actors: AddressMap::default(),
collab_settlement_actors: AddressMap::default(), collab_settlement_actors: AddressMap::default(),
rollover_actors: AddressMap::default(),
tasks: Tasks::default(), tasks: Tasks::default(),
} }
} }
} }
impl<O, M, W> Actor<O, M, W> {
/// Removes a proposal and updates the update cfd proposals' feed
async fn remove_pending_proposal(&mut self, order_id: &OrderId) -> Result<()> {
let removed_proposal = self.current_pending_proposals.remove(order_id);
if let Some(removed_proposal) = removed_proposal {
match removed_proposal {
UpdateCfdProposal::Settlement { .. } => {
self.projection_actor
.send(UpdateSettlementProposal {
order: *order_id,
proposal: None,
})
.await?
}
UpdateCfdProposal::RollOverProposal { .. } => {
self.projection_actor
.send(UpdateRollOverProposal {
order: *order_id,
proposal: None,
})
.await?
}
}
} else {
anyhow::bail!("Could not find proposal with order id: {}", &order_id);
}
Ok(())
}
}
#[xtra_productivity] #[xtra_productivity]
impl<O, M, W> Actor<O, M, W> impl<O, M, W> Actor<O, M, W>
where where
@ -156,38 +99,6 @@ where
Ok(()) Ok(())
} }
async fn handle_propose_roll_over(&mut self, msg: ProposeRollOver) -> Result<()> {
let ProposeRollOver { order_id } = msg;
if self.current_pending_proposals.contains_key(&order_id) {
anyhow::bail!("An update for order id {} is already in progress", order_id)
}
let proposal = RollOverProposal {
order_id,
timestamp: Timestamp::now(),
};
let new_proposal = UpdateCfdProposal::RollOverProposal {
proposal: proposal.clone(),
direction: SettlementKind::Outgoing,
};
self.current_pending_proposals
.insert(proposal.order_id, new_proposal.clone());
self.projection_actor
.send(try_into_update_rollover_proposal(new_proposal)?)
.await?;
self.conn_actor
.send(wire::TakerToMaker::ProposeRollOver {
order_id: proposal.order_id,
timestamp: proposal.timestamp,
})
.await?;
Ok(())
}
async fn handle_propose_settlement( async fn handle_propose_settlement(
&mut self, &mut self,
msg: ProposeSettlement, msg: ProposeSettlement,
@ -263,36 +174,6 @@ where
} }
} }
impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle_roll_over_rejected(&mut self, order_id: OrderId) -> Result<()> {
tracing::info!(%order_id, "Roll over proposal got rejected");
self.remove_pending_proposal(&order_id)
.await
.context("rejected settlement")?;
Ok(())
}
async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> {
match &mut self.roll_over_state {
RollOverState::Active { sender, .. } => {
sender.send(msg).await?;
}
RollOverState::None => {
anyhow::bail!("Received message without an active roll_over setup")
}
}
Ok(())
}
}
impl<O, M, W> Actor<O, M, W> { impl<O, M, W> Actor<O, M, W> {
async fn handle_new_order(&mut self, order: Option<Order>) -> Result<()> { async fn handle_new_order(&mut self, order: Option<Order>) -> Result<()> {
tracing::trace!("new order {:?}", order); tracing::trace!("new order {:?}", order);
@ -510,92 +391,67 @@ where
} }
} }
impl<O: 'static, M: 'static, W: 'static> Actor<O, M, W> #[xtra_productivity]
impl<O, M, W> Actor<O, M, W>
where where
Self: xtra::Handler<CfdRollOverCompleted>, M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::GetAnnouncement>, O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{ {
async fn handle_roll_over_accepted( async fn handle_propose_rollover(
&mut self, &mut self,
order_id: OrderId, msg: ProposeRollOver,
oracle_event_id: BitMexPriceEventId,
ctx: &mut Context<Self>, ctx: &mut Context<Self>,
) -> Result<()> { ) -> Result<()> {
tracing::info!(%order_id, "Roll; over request got accepted"); let ProposeRollOver { order_id } = msg;
let (sender, receiver) = mpsc::unbounded();
if let RollOverState::Active { .. } = self.roll_over_state { let disconnected = self
anyhow::bail!("Already rolling over a contract!") .rollover_actors
} .get_disconnected(order_id)
.with_context(|| format!("Rollover for order {} is already in progress", order_id))?;
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let dlc = cfd.open_dlc().context("CFD was in wrong state")?;
let announcement = self
.oracle_actor
.send(oracle::GetAnnouncement(oracle_event_id))
.await?
.with_context(|| format!("Announcement {} not found", oracle_event_id))?;
let contract_future = setup_contract::roll_over(
xtra::message_channel::MessageChannel::sink(&self.conn_actor)
.with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))),
receiver,
(self.oracle_pk, announcement),
RolloverParams::new(
cfd.order.price,
cfd.quantity_usd,
cfd.order.leverage,
cfd.refund_timelock_in_blocks(),
),
Role::Taker,
dlc,
self.n_payouts,
);
let this = ctx let this = ctx
.address() .address()
.expect("actor to be able to give address to itself"); .expect("actor to be able to give address to itself");
let (addr, fut) = rollover_taker::Actor::new(
(cfd, self.n_payouts),
self.oracle_pk,
self.conn_actor.clone(),
&self.oracle_actor,
self.projection_actor.clone(),
&this,
(&this, &self.conn_actor),
)
.create(None)
.run();
let task = async move { disconnected.insert(addr);
let dlc = contract_future.await; self.tasks.add(fut);
this.send(CfdRollOverCompleted { order_id, dlc })
.await
.expect("always connected to ourselves")
}
.spawn_with_handle();
self.roll_over_state = RollOverState::Active {
sender,
_task: task,
};
self.remove_pending_proposal(&order_id)
.await
.context("Could not remove accepted roll over")?;
Ok(()) Ok(())
} }
} }
impl<O: 'static, M: 'static, W: 'static> Actor<O, M, W> #[xtra_productivity(message_impl = false)]
impl<O, M, W> Actor<O, M, W>
where where
M: xtra::Handler<monitor::StartMonitoring>, M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation>, O: xtra::Handler<oracle::MonitorAttestation>,
{ {
async fn handle_roll_over_completed( async fn handle_rollover_completed(&mut self, msg: rollover_taker::Completed) -> Result<()> {
&mut self, use rollover_taker::Completed::*;
order_id: OrderId, let (order_id, dlc) = match msg {
dlc: Result<Dlc>, UpdatedContract { order_id, dlc } => (order_id, dlc),
) -> Result<()> { Rejected { .. } => {
let dlc = dlc.context("Failed to roll over contract with maker")?; return Ok(());
self.roll_over_state = RollOverState::None; }
Failed { order_id, error } => {
tracing::warn!(%order_id, "Rollover failed: {:#}", error);
return Ok(());
}
};
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
@ -625,48 +481,17 @@ where
} }
} }
#[async_trait] #[xtra_productivity(message_impl = false)]
impl<O: 'static, M: 'static, W: 'static> Handler<wire::MakerToTaker> for Actor<O, M, W> impl<O, M, W> Actor<O, M, W> {
where async fn handle_rollover_actor_stopping(&mut self, msg: Stopping<rollover_taker::Actor>) {
Self: xtra::Handler<CfdRollOverCompleted>, self.rollover_actors.gc(msg);
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>, }
W: xtra::Handler<wallet::TryBroadcastTransaction> }
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>, #[xtra_productivity]
{ impl<O, M, W> Actor<O, M, W> {
async fn handle(&mut self, msg: wire::MakerToTaker, ctx: &mut Context<Self>) { async fn handle_current_order(&mut self, msg: CurrentOrder) {
match msg { log_error!(self.handle_new_order(msg.0));
wire::MakerToTaker::CurrentOrder(current_order) => {
log_error!(self.handle_new_order(current_order))
}
wire::MakerToTaker::ConfirmRollOver {
order_id,
oracle_event_id,
} => {
log_error!(self.handle_roll_over_accepted(order_id, oracle_event_id, ctx))
}
wire::MakerToTaker::RejectRollOver(order_id) => {
log_error!(self.handle_roll_over_rejected(order_id))
}
wire::MakerToTaker::RollOverProtocol(roll_over_msg) => {
log_error!(self.handle_inc_roll_over_msg(roll_over_msg))
}
wire::MakerToTaker::Heartbeat => {
unreachable!("Heartbeats should be handled somewhere else")
}
wire::MakerToTaker::ConfirmOrder(_)
| wire::MakerToTaker::RejectOrder(_)
| wire::MakerToTaker::Protocol { .. }
| wire::MakerToTaker::InvalidOrderId(_) => {
unreachable!("These messages should be sent to the `setup_taker::Actor`")
}
wire::MakerToTaker::Settlement { .. } => {
unreachable!("These messages should be sent to the `collab_settlement::Actor`")
}
wire::MakerToTaker::Hello(_) => {
unreachable!("Connection related messages are handled in the connection actor")
}
}
} }
} }
@ -682,17 +507,6 @@ where
} }
} }
#[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<CfdRollOverCompleted> for Actor<O, M, W>
where
M: xtra::Handler<monitor::StartMonitoring>,
O: xtra::Handler<oracle::MonitorAttestation>,
{
async fn handle(&mut self, msg: CfdRollOverCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_roll_over_completed(msg.order_id, msg.dlc));
}
}
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, W: 'static> Handler<monitor::Event> for Actor<O, M, W> impl<O: 'static, M: 'static, W: 'static> Handler<monitor::Event> for Actor<O, M, W>
where where
@ -720,8 +534,4 @@ impl<O: 'static, M: 'static, W: 'static> Handler<setup_taker::Started> for Actor
} }
} }
impl Message for CfdRollOverCompleted {
type Result = ();
}
impl<O: 'static, M: 'static, W: 'static> xtra::Actor for Actor<O, M, W> {} impl<O: 'static, M: 'static, W: 'static> xtra::Actor for Actor<O, M, W> {}

7
daemon/src/wire.rs

@ -106,7 +106,10 @@ pub enum MakerToTaker {
order_id: OrderId, order_id: OrderId,
msg: SetupMsg, msg: SetupMsg,
}, },
RollOverProtocol(RollOverMsg), RollOverProtocol {
order_id: OrderId,
msg: RollOverMsg,
},
ConfirmRollOver { ConfirmRollOver {
order_id: OrderId, order_id: OrderId,
oracle_event_id: BitMexPriceEventId, oracle_event_id: BitMexPriceEventId,
@ -141,7 +144,7 @@ impl fmt::Display for MakerToTaker {
MakerToTaker::Protocol { .. } => write!(f, "Protocol"), MakerToTaker::Protocol { .. } => write!(f, "Protocol"),
MakerToTaker::ConfirmRollOver { .. } => write!(f, "ConfirmRollOver"), MakerToTaker::ConfirmRollOver { .. } => write!(f, "ConfirmRollOver"),
MakerToTaker::RejectRollOver(_) => write!(f, "RejectRollOver"), MakerToTaker::RejectRollOver(_) => write!(f, "RejectRollOver"),
MakerToTaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"), MakerToTaker::RollOverProtocol { .. } => write!(f, "RollOverProtocol"),
MakerToTaker::Settlement { .. } => write!(f, "Settlement"), MakerToTaker::Settlement { .. } => write!(f, "Settlement"),
} }
} }

Loading…
Cancel
Save