From 2e0ccd79168ab2f6955c8859d05b87c371a652a3 Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Wed, 24 Nov 2021 13:52:49 +1100 Subject: [PATCH 1/5] Prefer 2 MessageChannels over generic Address --- daemon/src/maker_cfd.rs | 1 + daemon/src/setup_contract.rs | 17 ++++++++--------- daemon/src/taker_cfd.rs | 1 + 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 0271584..33cd71b 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -552,6 +552,7 @@ where cfd.refund_timelock_in_blocks(), ), self.wallet.clone(), + self.wallet.clone(), Role::Maker, self.n_payouts, ); diff --git a/daemon/src/setup_contract.rs b/daemon/src/setup_contract.rs index 853740f..8157fc2 100644 --- a/daemon/src/setup_contract.rs +++ b/daemon/src/setup_contract.rs @@ -23,7 +23,7 @@ use std::collections::HashMap; use std::iter::FromIterator; use std::ops::RangeInclusive; use std::time::Duration; -use xtra::Address; +use xtra::prelude::MessageChannel; pub struct SetupParams { margin: Amount, @@ -56,23 +56,22 @@ impl SetupParams { /// Given an initial set of parameters, sets up the CFD contract with /// the other party. -pub async fn new( +#[allow(clippy::too_many_arguments)] +pub async fn new( mut sink: impl Sink + Unpin, mut stream: impl FusedStream + Unpin, (oracle_pk, announcement): (schnorrsig::PublicKey, oracle::Announcement), setup_params: SetupParams, - wallet: Address, + build_party_params_channel: impl MessageChannel, + sign_channel: impl MessageChannel, role: Role, n_payouts: usize, -) -> Result -where - W: xtra::Handler + xtra::Handler, -{ +) -> Result { let (sk, pk) = crate::keypair::new(&mut rand::thread_rng()); let (rev_sk, rev_pk) = crate::keypair::new(&mut rand::thread_rng()); let (publish_sk, publish_pk) = crate::keypair::new(&mut rand::thread_rng()); - let own_params = wallet + let own_params = build_party_params_channel .send(wallet::BuildPartyParams { amount: setup_params.margin, identity_pk: pk, @@ -211,7 +210,7 @@ where tracing::info!("Verified all signatures"); - let mut signed_lock_tx = wallet + let mut signed_lock_tx = sign_channel .send(wallet::Sign { psbt: lock_tx }) .await .context("Failed to send message to wallet actor")? diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 8d19dfc..426be0b 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -495,6 +495,7 @@ where cfd.refund_timelock_in_blocks(), ), self.wallet.clone(), + self.wallet.clone(), Role::Taker, self.n_payouts, ); From 2f2ab75b758634fd0ad84ba4c2048a5c9972ec9b Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Wed, 24 Nov 2021 13:55:08 +1100 Subject: [PATCH 2/5] Associate contract setup messages with an order ID This is the first step in allowing concurrent execution of contract setup for different CFDs. --- daemon/src/maker_cfd.rs | 4 ++-- daemon/src/taker_cfd.rs | 6 +++--- daemon/src/wire.rs | 14 ++++++++++---- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 33cd71b..ef8cabd 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -538,7 +538,7 @@ where self.takers.clone().into_sink().with(move |msg| { future::ok(maker_inc_connections::TakerMessage { taker_id, - msg: wire::MakerToTaker::Protocol(msg), + msg: wire::MakerToTaker::Protocol { order_id, msg }, }) }), receiver, @@ -1133,7 +1133,7 @@ where } => { log_error!(self.handle_initiate_settlement(taker_id, order_id, sig_taker)) } - wire::TakerToMaker::Protocol(msg) => { + wire::TakerToMaker::Protocol { msg, .. } => { log_error!(self.handle_inc_protocol_msg(taker_id, msg)) } wire::TakerToMaker::ProposeRollOver { diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 426be0b..5b979c8 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -483,7 +483,7 @@ where self.send_to_maker .sink() .clone_message_sink() - .with(|msg| future::ok(wire::TakerToMaker::Protocol(msg))), + .with(move |msg| future::ok(wire::TakerToMaker::Protocol { order_id, msg })), receiver, (self.oracle_pk, offer_announcement), SetupParams::new( @@ -755,8 +755,8 @@ where wire::MakerToTaker::InvalidOrderId(order_id) => { log_error!(self.handle_invalid_order_id(order_id)) } - wire::MakerToTaker::Protocol(setup_msg) => { - log_error!(self.handle_inc_protocol_msg(setup_msg)) + wire::MakerToTaker::Protocol { msg, .. } => { + log_error!(self.handle_inc_protocol_msg(msg)) } wire::MakerToTaker::ConfirmRollOver { order_id, diff --git a/daemon/src/wire.rs b/daemon/src/wire.rs index 2782c1c..8979dd1 100644 --- a/daemon/src/wire.rs +++ b/daemon/src/wire.rs @@ -43,7 +43,10 @@ pub enum TakerToMaker { order_id: OrderId, timestamp: Timestamp, }, - Protocol(SetupMsg), + Protocol { + order_id: OrderId, + msg: SetupMsg, + }, RollOverProtocol(RollOverMsg), } @@ -53,7 +56,7 @@ impl fmt::Display for TakerToMaker { TakerToMaker::TakeOrder { .. } => write!(f, "TakeOrder"), TakerToMaker::ProposeSettlement { .. } => write!(f, "ProposeSettlement"), TakerToMaker::InitiateSettlement { .. } => write!(f, "InitiateSettlement"), - TakerToMaker::Protocol(_) => write!(f, "Protocol"), + TakerToMaker::Protocol { .. } => write!(f, "Protocol"), TakerToMaker::ProposeRollOver { .. } => write!(f, "ProposeRollOver"), TakerToMaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"), } @@ -72,7 +75,10 @@ pub enum MakerToTaker { ConfirmSettlement(OrderId), RejectSettlement(OrderId), InvalidOrderId(OrderId), - Protocol(SetupMsg), + Protocol { + order_id: OrderId, + msg: SetupMsg, + }, RollOverProtocol(RollOverMsg), ConfirmRollOver { order_id: OrderId, @@ -91,7 +97,7 @@ impl fmt::Display for MakerToTaker { MakerToTaker::ConfirmSettlement(_) => write!(f, "ConfirmSettlement"), MakerToTaker::RejectSettlement(_) => write!(f, "RejectSettlement"), MakerToTaker::InvalidOrderId(_) => write!(f, "InvalidOrderId"), - MakerToTaker::Protocol(_) => write!(f, "Protocol"), + MakerToTaker::Protocol { .. } => write!(f, "Protocol"), MakerToTaker::ConfirmRollOver { .. } => write!(f, "ConfirmRollOver"), MakerToTaker::RejectRollOver(_) => write!(f, "RejectRollOver"), MakerToTaker::RollOverProtocol(_) => write!(f, "RollOverProtocol"), From eec8988456a10e6bc2659d3fd0cfcac2586cba5a Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Tue, 23 Nov 2021 20:55:57 +1100 Subject: [PATCH 3/5] Introduce short-lived setup_taker_actor --- daemon/src/connection.rs | 62 +++++- daemon/src/lib.rs | 3 +- daemon/src/maker_cfd.rs | 4 +- daemon/src/setup_contract.rs | 4 +- daemon/src/setup_taker.rs | 231 ++++++++++++++++++++ daemon/src/taker_cfd.rs | 403 ++++++++++++++++------------------- daemon/tests/happy_path.rs | 2 + 7 files changed, 488 insertions(+), 221 deletions(-) create mode 100644 daemon/src/setup_taker.rs diff --git a/daemon/src/connection.rs b/daemon/src/connection.rs index e9ca16f..143675b 100644 --- a/daemon/src/connection.rs +++ b/daemon/src/connection.rs @@ -1,6 +1,9 @@ -use crate::{log_error, noise, send_to_socket, wire, Tasks}; +use crate::model::cfd::OrderId; +use crate::model::Usd; +use crate::{log_error, noise, send_to_socket, setup_taker, wire, Tasks}; use anyhow::Result; use futures::StreamExt; +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime}; @@ -28,6 +31,7 @@ pub struct Actor { /// Max duration since the last heartbeat until we die. timeout: Duration, connected_state: Option, + setup_actors: HashMap>, } pub struct Connect { @@ -48,6 +52,19 @@ pub enum ConnectionStatus { Offline, } +/// Message sent from the `setup_taker::Actor` to the +/// `connection::Actor` so that it can forward it to the maker. +/// +/// Additionally, the address of this instance of the +/// `setup_taker::Actor` is included so that the `connection::Actor` +/// knows where to forward the contract setup messages from the maker +/// about this particular order. +pub struct TakeOrder { + pub order_id: OrderId, + pub quantity: Usd, + pub address: xtra::Address, +} + impl Actor { pub fn new( status_sender: watch::Sender, @@ -65,6 +82,7 @@ impl Actor { maker_to_taker, timeout, connected_state: None, + setup_actors: HashMap::new(), } } } @@ -76,6 +94,22 @@ impl Actor { } } +#[xtra_productivity] +impl Actor { + async fn handle_take_order(&mut self, msg: TakeOrder) -> Result<()> { + self.send_to_maker + .send(wire::TakerToMaker::TakeOrder { + order_id: msg.order_id, + quantity: msg.quantity, + }) + .await?; + + self.setup_actors.insert(msg.order_id, msg.address); + + Ok(()) + } +} + #[xtra_productivity] impl Actor { async fn handle_connect( @@ -147,6 +181,32 @@ impl Actor { .expect("wire messages only to arrive in connected state") .last_heartbeat = SystemTime::now(); } + wire::MakerToTaker::ConfirmOrder(order_id) => match self.setup_actors.get(&order_id) { + Some(addr) => { + let _ = addr.send(setup_taker::Accepted).await; + } + None => { + tracing::warn!(%order_id, "No active contract setup"); + } + }, + wire::MakerToTaker::RejectOrder(order_id) => match self.setup_actors.get(&order_id) { + Some(addr) => { + let _ = addr.send(setup_taker::Rejected).await; + } + None => { + tracing::warn!(%order_id, "No active contract setup"); + } + }, + wire::MakerToTaker::Protocol { order_id, msg } => { + match self.setup_actors.get(&order_id) { + Some(addr) => { + let _ = addr.send(msg).await; + } + None => { + tracing::warn!(%order_id, "No active contract setup"); + } + } + } other => { // this one should go to the taker cfd actor log_error!(self.maker_to_taker.send(other)); diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 236445b..8174ea4 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -43,6 +43,7 @@ pub mod routes; pub mod seed; pub mod send_to_socket; pub mod setup_contract; +pub mod setup_taker; pub mod taker_cfd; pub mod to_sse_event; pub mod tokio_ext; @@ -250,7 +251,7 @@ where wallet_addr, oracle_pk, projection_actor, - Box::new(connection_actor_addr.clone()), + connection_actor_addr.clone(), monitor_addr.clone(), oracle_addr, n_payouts, diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index ef8cabd..e6dd271 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -551,8 +551,8 @@ where cfd.order.leverage, cfd.refund_timelock_in_blocks(), ), - self.wallet.clone(), - self.wallet.clone(), + Box::new(self.wallet.clone()), + Box::new(self.wallet.clone()), Role::Maker, self.n_payouts, ); diff --git a/daemon/src/setup_contract.rs b/daemon/src/setup_contract.rs index 8157fc2..940d565 100644 --- a/daemon/src/setup_contract.rs +++ b/daemon/src/setup_contract.rs @@ -62,8 +62,8 @@ pub async fn new( mut stream: impl FusedStream + Unpin, (oracle_pk, announcement): (schnorrsig::PublicKey, oracle::Announcement), setup_params: SetupParams, - build_party_params_channel: impl MessageChannel, - sign_channel: impl MessageChannel, + build_party_params_channel: Box>, + sign_channel: Box>, role: Role, n_payouts: usize, ) -> Result { diff --git a/daemon/src/setup_taker.rs b/daemon/src/setup_taker.rs new file mode 100644 index 0000000..b6b4973 --- /dev/null +++ b/daemon/src/setup_taker.rs @@ -0,0 +1,231 @@ +use crate::model::cfd::{Cfd, CfdState, Dlc, Order, OrderId, Role}; +use crate::model::Usd; +use crate::oracle::Announcement; +use crate::setup_contract::{self, SetupParams}; +use crate::tokio_ext::spawn_fallible; +use crate::wire::{self, SetupMsg}; +use crate::{connection, wallet}; +use anyhow::{Context, Result}; +use async_trait::async_trait; +use futures::channel::mpsc::{self, UnboundedSender}; +use futures::{future, SinkExt}; +use maia::secp256k1_zkp::schnorrsig; +use xtra::prelude::*; +use xtra_productivity::xtra_productivity; + +pub struct Actor { + order: Order, + quantity: Usd, + n_payouts: usize, + oracle_pk: schnorrsig::PublicKey, + announcement: Announcement, + build_party_params: Box>, + sign: Box>, + maker: xtra::Address, + on_accepted: Box>, + on_completed: Box>, + setup_msg_sender: Option>, +} + +impl Actor { + #[allow(clippy::too_many_arguments)] + pub fn new( + (order, quantity, n_payouts): (Order, Usd, usize), + (oracle_pk, announcement): (schnorrsig::PublicKey, Announcement), + build_party_params: &(impl MessageChannel + 'static), + sign: &(impl MessageChannel + 'static), + maker: xtra::Address, + on_accepted: &(impl MessageChannel + 'static), + on_completed: &(impl MessageChannel + 'static), + ) -> Self { + Self { + order, + quantity, + n_payouts, + oracle_pk, + announcement, + build_party_params: build_party_params.clone_channel(), + sign: sign.clone_channel(), + maker, + on_accepted: on_accepted.clone_channel(), + on_completed: on_completed.clone_channel(), + setup_msg_sender: None, + } + } +} + +#[xtra_productivity] +impl Actor { + fn handle(&mut self, _: Accepted, ctx: &mut xtra::Context) -> Result<()> { + let order_id = self.order.id; + tracing::info!(%order_id, "Order got accepted"); + + // inform the `taker_cfd::Actor` about the start of contract + // setup, so that the db and UI can be updated accordingly + self.on_accepted.send(Started(order_id)).await?; + + let cfd = Cfd::new( + self.order.clone(), + self.quantity, + CfdState::contract_setup(), + ); + + let (sender, receiver) = mpsc::unbounded::(); + // store the writing end to forward messages from the taker to + // the spawned contract setup task + self.setup_msg_sender = Some(sender); + + let contract_future = setup_contract::new( + xtra::message_channel::MessageChannel::sink(&self.maker) + .with(move |msg| future::ok(wire::TakerToMaker::Protocol { order_id, msg })), + receiver, + (self.oracle_pk, self.announcement.clone()), + SetupParams::new( + cfd.margin()?, + cfd.counterparty_margin()?, + cfd.order.price, + cfd.quantity_usd, + cfd.order.leverage, + cfd.refund_timelock_in_blocks(), + ), + self.build_party_params.clone_channel(), + self.sign.clone_channel(), + Role::Taker, + self.n_payouts, + ); + + let this = ctx.address().expect("self to be alive"); + spawn_fallible::<_, anyhow::Error>(async move { + let _ = match contract_future.await { + Ok(dlc) => this.send(SetupSucceeded { order_id, dlc }).await?, + Err(error) => this.send(SetupFailed { order_id, error }).await?, + }; + + Ok(()) + }); + + Ok(()) + } + + fn handle(&mut self, _: Rejected, ctx: &mut xtra::Context) -> Result<()> { + self.on_completed + .send(Completed::Rejected { + order_id: self.order.id, + }) + .await?; + + ctx.stop(); + + Ok(()) + } + + fn handle(&mut self, msg: wire::SetupMsg, _ctx: &mut xtra::Context) -> Result<()> { + let mut sender = self + .setup_msg_sender + .clone() + .context("Cannot forward message to contract setup task")?; + sender.send(msg).await?; + + Ok(()) + } + + fn handle(&mut self, msg: SetupSucceeded, ctx: &mut xtra::Context) -> Result<()> { + self.on_completed + .send(Completed::NewContract { + order_id: msg.order_id, + dlc: msg.dlc, + }) + .await?; + + ctx.stop(); + + Ok(()) + } + + fn handle(&mut self, msg: SetupFailed, ctx: &mut xtra::Context) -> Result<()> { + self.on_completed + .send(Completed::Failed { + order_id: msg.order_id, + error: msg.error, + }) + .await?; + + ctx.stop(); + + Ok(()) + } +} + +#[async_trait] +impl xtra::Actor for Actor { + async fn started(&mut self, ctx: &mut xtra::Context) { + let address = ctx + .address() + .expect("actor to be able to give address to itself"); + let res = self + .maker + .send(connection::TakeOrder { + order_id: self.order.id, + quantity: self.quantity, + address, + }) + .await; + + if let Err(e) = res { + tracing::error!(%self.order.id, "Stopping setup_taker actor: {}", e); + ctx.stop() + } + } +} + +/// Message sent from the `connection::Actor` to the +/// `setup_taker::Actor` to notify that the order taken was accepted +/// by the maker. +pub struct Accepted; + +/// Message sent from the `setup_taker::Actor` to the +/// `taker_cfd::Actor` to notify that the contract setup has started. +pub struct Started(pub OrderId); + +/// Message sent from the `connection::Actor` to the +/// `setup_taker::Actor` to notify that the order taken was rejected +/// by the maker. +pub struct Rejected; + +/// Message sent from the spawned task to `setup_taker::Actor` to +/// notify that the contract setup has finished successfully. +pub struct SetupSucceeded { + order_id: OrderId, + dlc: Dlc, +} + +/// Message sent from the spawned task to `setup_taker::Actor` to +/// notify that the contract setup has failed. +pub struct SetupFailed { + order_id: OrderId, + error: anyhow::Error, +} + +/// Message sent from the `setup_taker::Actor` to the +/// `taker_cfd::Actor` to notify that the contract setup has finished. +pub enum Completed { + NewContract { + order_id: OrderId, + dlc: Dlc, + }, + Rejected { + order_id: OrderId, + }, + Failed { + order_id: OrderId, + error: anyhow::Error, + }, +} + +impl xtra::Message for Started { + type Result = (); +} + +impl xtra::Message for Completed { + type Result = (); +} diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 5b979c8..8f6fda5 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -6,18 +6,22 @@ use crate::model::cfd::{ }; use crate::model::{BitMexPriceEventId, Price, Timestamp, Usd}; use crate::monitor::{self, MonitorParams}; -use crate::setup_contract::{RolloverParams, SetupParams}; +use crate::setup_contract::RolloverParams; use crate::tokio_ext::FutureExt; -use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; -use crate::{log_error, oracle, projection, setup_contract, wallet, wire}; +use crate::wire::RollOverMsg; +use crate::{ + connection, log_error, oracle, projection, setup_contract, setup_taker, wallet, wire, Tasks, +}; use anyhow::{bail, Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; use futures::channel::mpsc; use futures::future::RemoteHandle; use futures::{future, SinkExt}; +use std::collections::hash_map::Entry; use std::collections::HashMap; use xtra::prelude::*; +use xtra::Actor as _; pub struct TakeOffer { pub order_id: OrderId, @@ -37,24 +41,11 @@ pub enum CfdAction { }, } -pub struct CfdSetupCompleted { - pub order_id: OrderId, - pub dlc: Result, -} - pub struct CfdRollOverCompleted { pub order_id: OrderId, pub dlc: Result, } -enum SetupState { - Active { - sender: mpsc::UnboundedSender, - _task: RemoteHandle<()>, - }, - None, -} - enum RollOverState { Active { sender: mpsc::UnboundedSender, @@ -68,13 +59,14 @@ pub struct Actor { wallet: Address, oracle_pk: schnorrsig::PublicKey, projection_actor: Address, - send_to_maker: Box>, + conn_actor: Address, monitor_actor: Address, - setup_state: SetupState, + setup_actors: HashMap>, roll_over_state: RollOverState, oracle_actor: Address, current_pending_proposals: UpdateCfdProposals, n_payouts: usize, + tasks: Tasks, } impl Actor @@ -89,7 +81,7 @@ where wallet: Address, oracle_pk: schnorrsig::PublicKey, projection_actor: Address, - send_to_maker: Box>, + conn_actor: Address, monitor_actor: Address, oracle_actor: Address, n_payouts: usize, @@ -99,13 +91,14 @@ where wallet, oracle_pk, projection_actor, - send_to_maker, + conn_actor, monitor_actor, - setup_state: SetupState::None, roll_over_state: RollOverState::None, oracle_actor, current_pending_proposals: HashMap::new(), n_payouts, + setup_actors: HashMap::new(), + tasks: Tasks::default(), } } } @@ -139,33 +132,6 @@ impl Actor { } } } - - async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> { - let mut conn = self.db.acquire().await?; - - let current_order = load_order_by_id(order_id, &mut conn).await?; - - tracing::info!("Taking current order: {:?}", ¤t_order); - - let cfd = Cfd::new( - current_order.clone(), - quantity, - CfdState::outgoing_order_request(), - ); - - insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?; - - // Cleanup own order feed, after inserting the cfd. - // Due to the 1:1 relationship between order and cfd we can never create another cfd for the - // same order id. - self.projection_actor.send(projection::Update(None)).await?; - - self.send_to_maker - .send(wire::TakerToMaker::TakeOrder { order_id, quantity }) - .await?; - - Ok(()) - } } impl Actor @@ -218,7 +184,7 @@ where ); self.send_pending_update_proposals().await?; - self.send_to_maker + self.conn_actor .send(wire::TakerToMaker::ProposeSettlement { order_id: proposal.order_id, timestamp: proposal.timestamp, @@ -230,12 +196,6 @@ where Ok(()) } - async fn handle_order_rejected(&mut self, order_id: OrderId) -> Result<()> { - self.append_cfd_state_rejected(order_id).await?; - - Ok(()) - } - async fn handle_settlement_rejected(&mut self, order_id: OrderId) -> Result<()> { tracing::info!(%order_id, "Settlement proposal got rejected"); @@ -254,19 +214,6 @@ where Ok(()) } - async fn handle_inc_protocol_msg(&mut self, msg: SetupMsg) -> Result<()> { - match &mut self.setup_state { - SetupState::Active { sender, .. } => { - sender.send(msg).await?; - } - SetupState::None => { - anyhow::bail!("Received setup message without an active contract setup") - } - } - - Ok(()) - } - async fn handle_inc_roll_over_msg(&mut self, msg: RollOverMsg) -> Result<()> { match &mut self.roll_over_state { RollOverState::Active { sender, .. } => { @@ -287,17 +234,6 @@ where Ok(()) } - - async fn append_cfd_state_rejected(&mut self, order_id: OrderId) -> Result<()> { - tracing::debug!(%order_id, "Order rejected"); - - let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - cfd.state = CfdState::rejected(); - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - - Ok(()) - } } impl Actor { @@ -329,6 +265,73 @@ impl Actor { Ok(()) } + async fn handle_order_rejected(&mut self, order_id: OrderId) -> Result<()> { + self.append_cfd_state_rejected(order_id).await?; + + Ok(()) + } + + async fn append_cfd_state_rejected(&mut self, order_id: OrderId) -> Result<()> { + tracing::debug!(%order_id, "Order rejected"); + + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::rejected(); + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + + Ok(()) + } + + async fn handle_contract_setup_failed( + &mut self, + order_id: OrderId, + error: anyhow::Error, + ) -> Result<()> { + tracing::error!(%order_id, "Contract setup failed: {:#?}", error); + + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::setup_failed(error.to_string()); + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + + Ok(()) + } + + /// Set the state of the CFD in the database to `ContractSetup` + /// and update the corresponding projection. + async fn handle_setup_started(&mut self, order_id: OrderId) -> Result<()> { + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::contract_setup(); + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + + Ok(()) + } +} + +impl Actor +where + W: xtra::Handler, +{ + async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor) + .await?; + Ok(()) + } + + async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { + let mut conn = self.db.acquire().await?; + cfd_actors::handle_oracle_attestation( + attestation, + &mut conn, + &self.wallet, + &self.projection_actor, + ) + .await?; + Ok(()) + } + async fn handle_propose_roll_over(&mut self, order_id: OrderId) -> Result<()> { if self.current_pending_proposals.contains_key(&order_id) { anyhow::bail!("An update for order id {} is already in progress", order_id) @@ -348,7 +351,7 @@ impl Actor { ); self.send_pending_update_proposals().await?; - self.send_to_maker + self.conn_actor .send(wire::TakerToMaker::ProposeRollOver { order_id: proposal.order_id, timestamp: proposal.timestamp, @@ -360,24 +363,75 @@ impl Actor { impl Actor where - W: xtra::Handler, + Self: xtra::Handler, + O: xtra::Handler + xtra::Handler, + W: xtra::Handler + xtra::Handler, { - async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> { + async fn handle_take_offer( + &mut self, + order_id: OrderId, + quantity: Usd, + ctx: &mut Context, + ) -> Result<()> { + let entry = self.setup_actors.entry(order_id); + if matches!(entry, Entry::Occupied(ref occupied) if occupied.get().is_connected()) { + bail!( + "A contract setup for order id {} is already in progress", + order_id + ) + } + let mut conn = self.db.acquire().await?; - cfd_actors::handle_oracle_attestation( - attestation, - &mut conn, + + let current_order = load_order_by_id(order_id, &mut conn).await?; + + tracing::info!("Taking current order: {:?}", ¤t_order); + + let cfd = Cfd::new( + current_order.clone(), + quantity, + CfdState::outgoing_order_request(), + ); + + insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?; + + // Cleanup own order feed, after inserting the cfd. + // Due to the 1:1 relationship between order and cfd we can never create another cfd for the + // same order id. + self.projection_actor.send(projection::Update(None)).await?; + + let announcement = self + .oracle_actor + .send(oracle::GetAnnouncement(cfd.order.oracle_event_id)) + .await? + .with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?; + + let this = ctx + .address() + .expect("actor to be able to give address to itself"); + let (addr, fut) = setup_taker::Actor::new( + (current_order, quantity, self.n_payouts), + (self.oracle_pk, announcement), &self.wallet, - &self.projection_actor, + &self.wallet, + self.conn_actor.clone(), + &this, + &this, ) - .await?; - Ok(()) - } + .create(None) + .run(); + + match entry { + Entry::Occupied(mut disconnected) => { + disconnected.insert(addr); + } + Entry::Vacant(vacant) => { + vacant.insert(addr); + } + } + + self.tasks.add(fut); - async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> { - let mut conn = self.db.acquire().await?; - cfd_actors::handle_monitoring_event(event, &mut conn, &self.wallet, &self.projection_actor) - .await?; Ok(()) } } @@ -388,30 +442,10 @@ where M: xtra::Handler, W: xtra::Handler, { - async fn handle_cfd_setup_completed( - &mut self, - order_id: OrderId, - dlc: Result, - ) -> Result<()> { - self.setup_state = SetupState::None; - + async fn handle_new_contract(&mut self, order_id: OrderId, dlc: Dlc) -> Result<()> { let mut conn = self.db.acquire().await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let dlc = match dlc { - Ok(dlc) => dlc, - Err(e) => { - cfd.state = CfdState::SetupFailed { - common: CfdStateCommon::default(), - info: e.to_string(), - }; - - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - - return Err(e); - } - }; - tracing::info!("Setup complete, publishing on chain now"); cfd.state = CfdState::PendingOpen { @@ -448,80 +482,6 @@ where } } -impl Actor -where - Self: xtra::Handler, - O: xtra::Handler + xtra::Handler, - W: xtra::Handler + xtra::Handler, -{ - async fn handle_order_accepted( - &mut self, - order_id: OrderId, - ctx: &mut Context, - ) -> Result<()> { - tracing::info!(%order_id, "Order got accepted"); - - let (sender, receiver) = mpsc::unbounded(); - - if let SetupState::Active { .. } = self.setup_state { - anyhow::bail!("Already setting up a contract!") - } - - let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - cfd.state = CfdState::contract_setup(); - - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - - let offer_announcement = self - .oracle_actor - .send(oracle::GetAnnouncement(cfd.order.oracle_event_id)) - .await? - .with_context(|| format!("Announcement {} not found", cfd.order.oracle_event_id))?; - - let contract_future = setup_contract::new( - self.send_to_maker - .sink() - .clone_message_sink() - .with(move |msg| future::ok(wire::TakerToMaker::Protocol { order_id, msg })), - receiver, - (self.oracle_pk, offer_announcement), - SetupParams::new( - cfd.margin()?, - cfd.counterparty_margin()?, - cfd.order.price, - cfd.quantity_usd, - cfd.order.leverage, - cfd.refund_timelock_in_blocks(), - ), - self.wallet.clone(), - self.wallet.clone(), - Role::Taker, - self.n_payouts, - ); - - let this = ctx - .address() - .expect("actor to be able to give address to itself"); - - let task = async move { - let dlc = contract_future.await; - - this.send(CfdSetupCompleted { order_id, dlc }) - .await - .expect("always connected to ourselves") - } - .spawn_with_handle(); - - self.setup_state = SetupState::Active { - sender, - _task: task, - }; - - Ok(()) - } -} - impl Actor where Self: xtra::Handler, @@ -556,8 +516,7 @@ where .with_context(|| format!("Announcement {} not found", oracle_event_id))?; let contract_future = setup_contract::roll_over( - self.send_to_maker - .sink() + xtra::message_channel::MessageChannel::sink(&self.conn_actor) .with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))), receiver, (self.oracle_pk, announcement), @@ -660,7 +619,7 @@ where let proposal = self.get_settlement_proposal(order_id)?; let (tx, sig_taker) = dlc.close_transaction(proposal)?; - self.send_to_maker + self.conn_actor .send(wire::TakerToMaker::InitiateSettlement { order_id, sig_taker, @@ -688,9 +647,15 @@ where } #[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context) -> Result<()> { - self.handle_take_offer(msg.order_id, msg.quantity).await +impl Handler for Actor +where + Self: xtra::Handler, + O: xtra::Handler + xtra::Handler, + W: xtra::Handler + xtra::Handler, +{ + async fn handle(&mut self, msg: TakeOffer, ctx: &mut Context) -> Result<()> { + self.handle_take_offer(msg.order_id, msg.quantity, ctx) + .await } } @@ -725,7 +690,7 @@ where #[async_trait] impl Handler for Actor where - Self: xtra::Handler + xtra::Handler, + Self: xtra::Handler, O: xtra::Handler + xtra::Handler, M: xtra::Handler, W: xtra::Handler @@ -734,18 +699,9 @@ where { async fn handle(&mut self, msg: wire::MakerToTaker, ctx: &mut Context) { match msg { - wire::MakerToTaker::Heartbeat => { - unreachable!("Heartbeats should be handled somewhere else") - } wire::MakerToTaker::CurrentOrder(current_order) => { log_error!(self.handle_new_order(current_order)) } - wire::MakerToTaker::ConfirmOrder(order_id) => { - log_error!(self.handle_order_accepted(order_id, ctx)) - } - wire::MakerToTaker::RejectOrder(order_id) => { - log_error!(self.handle_order_rejected(order_id)) - } wire::MakerToTaker::ConfirmSettlement(order_id) => { log_error!(self.handle_settlement_accepted(order_id, ctx)) } @@ -755,9 +711,6 @@ where wire::MakerToTaker::InvalidOrderId(order_id) => { log_error!(self.handle_invalid_order_id(order_id)) } - wire::MakerToTaker::Protocol { msg, .. } => { - log_error!(self.handle_inc_protocol_msg(msg)) - } wire::MakerToTaker::ConfirmRollOver { order_id, oracle_event_id, @@ -767,22 +720,39 @@ where wire::MakerToTaker::RejectRollOver(order_id) => { log_error!(self.handle_roll_over_rejected(order_id)) } - MakerToTaker::RollOverProtocol(roll_over_msg) => { + wire::MakerToTaker::RollOverProtocol(roll_over_msg) => { log_error!(self.handle_inc_roll_over_msg(roll_over_msg)) } + wire::MakerToTaker::Heartbeat => { + unreachable!("Heartbeats should be handled somewhere else") + } + wire::MakerToTaker::ConfirmOrder(_) + | wire::MakerToTaker::RejectOrder(_) + | wire::MakerToTaker::Protocol { .. } => { + unreachable!("These messages should be sent to the `setup_taker::Actor`") + } } } } #[async_trait] -impl Handler for Actor +impl Handler for Actor where O: xtra::Handler, M: xtra::Handler, W: xtra::Handler, { - async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context) { - log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc)); + async fn handle(&mut self, msg: setup_taker::Completed, _ctx: &mut Context) { + use setup_taker::Completed::*; + match msg { + NewContract { order_id, dlc } => { + log_error!(self.handle_new_contract(order_id, dlc)) + } + Rejected { order_id } => log_error!(self.handle_order_rejected(order_id)), + Failed { order_id, error } => { + log_error!(self.handle_contract_setup_failed(order_id, error)) + } + } } } @@ -817,6 +787,13 @@ where } } +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: setup_taker::Started, _ctx: &mut Context) { + log_error!(self.handle_setup_started(msg.0)) + } +} + impl Message for TakeOffer { type Result = Result<()>; } @@ -825,10 +802,6 @@ impl Message for CfdAction { type Result = Result<()>; } -impl Message for CfdSetupCompleted { - type Result = (); -} - impl Message for CfdRollOverCompleted { type Result = (); } diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index edf8bdc..d9490c4 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -40,6 +40,7 @@ async fn taker_takes_order_and_maker_rejects() { .await .unwrap(); + taker.mocks.mock_oracle_annoucement().await; taker.take_order(received.clone(), Usd::new(dec!(10))).await; let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); @@ -77,6 +78,7 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() { .await .unwrap(); + taker.mocks.mock_oracle_annoucement().await; taker.take_order(received.clone(), Usd::new(dec!(5))).await; let (_, _) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); From a6b5bfe620b4e9c9d0f0be9b0166a0bb7410b6c8 Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Wed, 24 Nov 2021 22:59:56 +1100 Subject: [PATCH 4/5] Correct typo in actor tests --- daemon/tests/happy_path.rs | 8 ++++---- daemon/tests/harness/mocks/mod.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index d9490c4..fc28d58 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -40,7 +40,7 @@ async fn taker_takes_order_and_maker_rejects() { .await .unwrap(); - taker.mocks.mock_oracle_annoucement().await; + taker.mocks.mock_oracle_announcement().await; taker.take_order(received.clone(), Usd::new(dec!(10))).await; let (taker_cfd, maker_cfd) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); @@ -78,12 +78,12 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() { .await .unwrap(); - taker.mocks.mock_oracle_annoucement().await; + taker.mocks.mock_oracle_announcement().await; taker.take_order(received.clone(), Usd::new(dec!(5))).await; let (_, _) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); - maker.mocks.mock_oracle_annoucement().await; - taker.mocks.mock_oracle_annoucement().await; + maker.mocks.mock_oracle_announcement().await; + taker.mocks.mock_oracle_announcement().await; maker.mocks.mock_party_params().await; taker.mocks.mock_party_params().await; diff --git a/daemon/tests/harness/mocks/mod.rs b/daemon/tests/harness/mocks/mod.rs index 7ab066b..ec42752 100644 --- a/daemon/tests/harness/mocks/mod.rs +++ b/daemon/tests/harness/mocks/mod.rs @@ -54,7 +54,7 @@ impl Mocks { .in_sequence(&mut seq); } - pub async fn mock_oracle_annoucement(&mut self) { + pub async fn mock_oracle_announcement(&mut self) { self.oracle() .await .expect_get_announcement() From d7f7d8f91292e11edea561832aafb3e170816d59 Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Thu, 25 Nov 2021 11:58:47 +1100 Subject: [PATCH 5/5] fixup! Introduce short-lived setup_taker_actor --- daemon/src/taker_cfd.rs | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 8f6fda5..0535933 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -265,12 +265,6 @@ impl Actor { Ok(()) } - async fn handle_order_rejected(&mut self, order_id: OrderId) -> Result<()> { - self.append_cfd_state_rejected(order_id).await?; - - Ok(()) - } - async fn append_cfd_state_rejected(&mut self, order_id: OrderId) -> Result<()> { tracing::debug!(%order_id, "Order rejected"); @@ -282,7 +276,7 @@ impl Actor { Ok(()) } - async fn handle_contract_setup_failed( + async fn append_cfd_state_setup_failed( &mut self, order_id: OrderId, error: anyhow::Error, @@ -442,7 +436,19 @@ where M: xtra::Handler, W: xtra::Handler, { - async fn handle_new_contract(&mut self, order_id: OrderId, dlc: Dlc) -> Result<()> { + async fn handle_setup_completed(&mut self, msg: setup_taker::Completed) -> Result<()> { + let (order_id, dlc) = match msg { + setup_taker::Completed::NewContract { order_id, dlc } => (order_id, dlc), + setup_taker::Completed::Rejected { order_id } => { + self.append_cfd_state_rejected(order_id).await?; + return Ok(()); + } + setup_taker::Completed::Failed { order_id, error } => { + self.append_cfd_state_setup_failed(order_id, error).await?; + return Ok(()); + } + }; + let mut conn = self.db.acquire().await?; let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; @@ -743,16 +749,7 @@ where W: xtra::Handler, { async fn handle(&mut self, msg: setup_taker::Completed, _ctx: &mut Context) { - use setup_taker::Completed::*; - match msg { - NewContract { order_id, dlc } => { - log_error!(self.handle_new_contract(order_id, dlc)) - } - Rejected { order_id } => log_error!(self.handle_order_rejected(order_id)), - Failed { order_id, error } => { - log_error!(self.handle_contract_setup_failed(order_id, error)) - } - } + log_error!(self.handle_setup_completed(msg)) } }