From c6e59abcfc774b407c5c7a6c0fa8a155d30edacc Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Fri, 26 Nov 2021 17:03:19 +1100 Subject: [PATCH] Introduce short-lived setup_maker::Actor --- daemon/src/address_map.rs | 7 + daemon/src/lib.rs | 6 +- daemon/src/maker_cfd.rs | 407 ++++++++++++---------------- daemon/src/maker_inc_connections.rs | 60 +++- daemon/src/setup_maker.rs | 284 +++++++++++++++++++ daemon/src/setup_taker.rs | 2 +- daemon/tests/happy_path.rs | 5 +- 7 files changed, 526 insertions(+), 245 deletions(-) create mode 100644 daemon/src/setup_maker.rs diff --git a/daemon/src/address_map.rs b/daemon/src/address_map.rs index 7723da0..8b9dd9b 100644 --- a/daemon/src/address_map.rs +++ b/daemon/src/address_map.rs @@ -29,6 +29,13 @@ where Ok(Disconnected { entry }) } + pub fn get_connected(&self, key: &K) -> Option<&xtra::Address> { + match self.inner.get(key) { + Some(addr) if addr.is_connected() => Some(addr), + _ => None, + } + } + /// Garbage-collect an address that is no longer active. pub fn gc(&mut self, stopping: Stopping) { self.inner.retain(|_, candidate| stopping.me != *candidate); diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 5d8bdbd..93ff913 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -5,6 +5,7 @@ use crate::maker_cfd::{FromTaker, TakerConnected}; use crate::model::cfd::{Cfd, Order, UpdateCfdProposals}; use crate::oracle::Attestation; use crate::tokio_ext::FutureExt; +use address_map::Stopping; use anyhow::Result; use connection::ConnectionStatus; use futures::future::RemoteHandle; @@ -44,6 +45,7 @@ pub mod routes; pub mod seed; pub mod send_to_socket; pub mod setup_contract; +pub mod setup_maker; pub mod setup_taker; pub mod taker_cfd; pub mod to_sse_event; @@ -112,7 +114,9 @@ where + xtra::Handler + xtra::Handler, T: xtra::Handler - + xtra::Handler, + + xtra::Handler + + xtra::Handler + + xtra::Handler>, W: xtra::Handler + xtra::Handler + xtra::Handler diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index 09ef881..156614f 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -1,3 +1,4 @@ +use crate::address_map::{AddressMap, Stopping}; use crate::cfd_actors::{self, append_cfd_state, insert_cfd_and_send_to_feed}; use crate::db::{insert_order, load_cfd_by_order_id, load_order_by_id}; use crate::model::cfd::{ @@ -10,10 +11,11 @@ use crate::projection::{ try_into_update_rollover_proposal, try_into_update_settlement_proposal, Update, UpdateRollOverProposal, UpdateSettlementProposal, }; -use crate::setup_contract::{RolloverParams, SetupParams}; +use crate::setup_contract::RolloverParams; use crate::tokio_ext::FutureExt; use crate::{ - log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, wallet, wire, + log_error, maker_inc_connections, monitor, oracle, projection, setup_contract, setup_maker, + wallet, wire, Tasks, }; use anyhow::{Context as _, Result}; use async_trait::async_trait; @@ -27,6 +29,7 @@ use sqlx::Sqlite; use std::collections::{HashMap, HashSet}; use time::Duration; use xtra::prelude::*; +use xtra::Actor as _; use xtra_productivity::xtra_productivity; pub struct AcceptOrder { @@ -64,11 +67,6 @@ pub struct TakerDisconnected { pub id: Identity, } -pub struct CfdSetupCompleted { - pub order_id: OrderId, - pub dlc: Result, -} - pub struct CfdRollOverCompleted { pub order_id: OrderId, pub dlc: Result, @@ -93,7 +91,7 @@ pub struct Actor< takers: Address, current_order_id: Option, monitor_actor: Address, - setup_state: SetupState, + setup_actors: AddressMap, roll_over_state: RollOverState, oracle_actor: Address, // Maker needs to also store Identity to be able to send a reply back @@ -101,15 +99,7 @@ pub struct Actor< current_agreed_proposals: HashMap, connected_takers: HashSet, n_payouts: usize, -} - -enum SetupState { - Active { - taker: Identity, - sender: mpsc::UnboundedSender, - _task: RemoteHandle<()>, - }, - None, + tasks: Tasks, } enum RollOverState { @@ -143,13 +133,14 @@ impl Actor { takers, current_order_id: None, monitor_actor, - setup_state: SetupState::None, + setup_actors: AddressMap::default(), roll_over_state: RollOverState::None, oracle_actor, current_pending_proposals: HashMap::new(), current_agreed_proposals: HashMap::new(), n_payouts, connected_takers: HashSet::new(), + tasks: Tasks::default(), } } @@ -216,26 +207,6 @@ impl Actor { Ok(()) } - async fn handle_inc_protocol_msg( - &mut self, - taker_id: Identity, - msg: wire::SetupMsg, - ) -> Result<()> { - match &mut self.setup_state { - SetupState::Active { taker, sender, .. } if taker_id == *taker => { - sender.send(msg).await?; - } - SetupState::Active { taker, .. } => { - anyhow::bail!("Currently setting up contract with taker {}", taker) - } - SetupState::None => { - anyhow::bail!("Received setup message without an active contract setup"); - } - } - - Ok(()) - } - async fn handle_inc_roll_over_protocol_msg( &mut self, taker_id: Identity, @@ -320,6 +291,30 @@ impl Actor { .await?; Ok(()) } + + async fn append_cfd_state_setup_failed( + &mut self, + order_id: OrderId, + error: anyhow::Error, + ) -> Result<()> { + tracing::error!(%order_id, "Contract setup failed: {:#?}", error); + + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::setup_failed(error.to_string()); + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + + Ok(()) + } + + async fn append_cfd_state_rejected(&mut self, order_id: OrderId) -> Result<()> { + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + cfd.state = CfdState::rejected(); + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + + Ok(()) + } } impl Actor @@ -406,17 +401,35 @@ where impl Actor where - T: xtra::Handler - + xtra::Handler, + O: xtra::Handler + xtra::Handler, + M: xtra::Handler, + T: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler>, + W: xtra::Handler + + xtra::Handler + + xtra::Handler, { async fn handle_take_order( &mut self, taker_id: Identity, order_id: OrderId, quantity: Usd, + ctx: &mut Context, ) -> Result<()> { tracing::debug!(%taker_id, %quantity, %order_id, "Taker wants to take an order"); + let disconnected = self + .setup_actors + .get_disconnected(order_id) + .with_context(|| { + format!( + "Contract setup for order {} is already in progress", + order_id + ) + })?; + let mut conn = self.db.acquire().await?; // 1. Validate if order is still valid @@ -471,130 +484,69 @@ where ); insert_cfd_and_send_to_feed(&cfd, &mut conn, &self.projection_actor).await?; - // 4. check if order has acceptable amounts and if not reject the cfd - // Since rejection is tied to the cfd state at the moment we can only do this after creating - // a cfd. - if quantity < current_order.min_quantity || quantity > current_order.max_quantity { - tracing::warn!( - "Order rejected because quantity {} was out of bounds. It was either <{} or >{}", - quantity, - current_order.min_quantity, - current_order.max_quantity - ); + // 4. Try to get the oracle announcement, if that fails we should exit prior to changing any + // state + let announcement = self + .oracle_actor + .send(oracle::GetAnnouncement(cfd.order.oracle_event_id)) + .await??; - self.reject_order(taker_id, cfd, conn).await?; - } + // 5. Start up contract setup actor + let this = ctx + .address() + .expect("actor to be able to give address to itself"); + let (addr, fut) = setup_maker::Actor::new( + (cfd.order, cfd.quantity_usd, self.n_payouts), + (self.oracle_pk, announcement), + &self.wallet, + &self.wallet, + (&self.takers, &self.takers, taker_id), + &this, + (&self.takers, &this), + ) + .create(None) + .run(); + + disconnected.insert(addr); + + self.tasks.add(fut); Ok(()) } } #[xtra_productivity] -impl Actor -where - Self: xtra::Handler, - O: xtra::Handler, - T: xtra::Handler, - W: xtra::Handler + xtra::Handler, -{ - async fn handle_accept_order( - &mut self, - msg: AcceptOrder, - ctx: &mut Context, - ) -> Result<()> { +impl Actor { + async fn handle_accept_order(&mut self, msg: AcceptOrder) -> Result<()> { let AcceptOrder { order_id } = msg; - if let SetupState::Active { .. } = self.setup_state { - anyhow::bail!("Already setting up a contract!") - } - - tracing::debug!(%order_id, "Maker accepts an order" ); + tracing::debug!(%order_id, "Maker accepts order"); let mut conn = self.db.acquire().await?; - - // 1. Validate if order is still valid let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let taker_id = match cfd { - Cfd { - state: CfdState::IncomingOrderRequest { taker_id, .. }, - .. - } => taker_id, - _ => { - anyhow::bail!("Order is in invalid state. Ignoring trying to accept it.") - } - }; - - // 2. Try to get the oracle announcement, if that fails we should exit prior to changing any - // state - let offer_announcement = self - .oracle_actor - .send(oracle::GetAnnouncement(cfd.order.oracle_event_id)) - .await??; - // 3. Notify the taker that we are ready for contract setup - // Use `.send` here to ensure we only continue once the message has been sent - // Nothing done after this call should be able to fail, otherwise we notified the taker, but - // might not transition to `Active` ourselves! - self.takers - .send(maker_inc_connections::TakerMessage { - taker_id, - msg: wire::MakerToTaker::ConfirmOrder(cfd.order.id), - }) - .await??; + if !self + .setup_actors + .send(&order_id, setup_maker::Accepted) + .await + { + anyhow::bail!("No active contract setup for order {}", order_id); + } - // 4. Insert that we are in contract setup and refresh our own feed cfd.state = CfdState::contract_setup(); append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - // 5. Spawn away the contract setup - let (sender, receiver) = mpsc::unbounded(); - let contract_future = setup_contract::new( - self.takers.clone().into_sink().with(move |msg| { - future::ok(maker_inc_connections::TakerMessage { - taker_id, - msg: wire::MakerToTaker::Protocol { order_id, msg }, - }) - }), - receiver, - (self.oracle_pk, offer_announcement), - SetupParams::new( - cfd.margin()?, - cfd.counterparty_margin()?, - cfd.order.price, - cfd.quantity_usd, - cfd.order.leverage, - cfd.refund_timelock_in_blocks(), - ), - Box::new(self.wallet.clone()), - Box::new(self.wallet.clone()), - Role::Maker, - self.n_payouts, - ); - - let this = ctx - .address() - .expect("actor to be able to give address to itself"); - - let task = async move { - let dlc = contract_future.await; - - this.send(CfdSetupCompleted { order_id, dlc }) - .await - .expect("always connected to ourselves"); - } - .spawn_with_handle(); - - // 6. Record that we are in an active contract setup - self.setup_state = SetupState::Active { - sender, - taker: taker_id, - _task: task, - }; - Ok(()) } } +#[xtra_productivity(message_impl = false)] +impl Actor { + async fn handle_setup_actor_stopping(&mut self, message: Stopping) { + self.setup_actors.gc(message); + } +} + #[xtra_productivity] impl Actor where @@ -603,7 +555,7 @@ where async fn handle_reject_order(&mut self, msg: RejectOrder) -> Result<()> { let RejectOrder { order_id } = msg; - tracing::debug!(%order_id, "Maker rejects an order" ); + tracing::debug!(%order_id, "Maker rejects order"); let mut conn = self.db.acquire().await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; @@ -614,7 +566,7 @@ where .. } => taker_id, _ => { - anyhow::bail!("Order is in invalid state. Ignoring trying to accept it.") + anyhow::bail!("Order is in invalid state. Ignoring trying to reject it.") } }; @@ -839,70 +791,6 @@ where } } -impl Actor -where - O: xtra::Handler, - M: xtra::Handler, - W: xtra::Handler, -{ - async fn handle_cfd_setup_completed( - &mut self, - order_id: OrderId, - dlc: Result, - ) -> Result<()> { - self.setup_state = SetupState::None; - - let mut conn = self.db.acquire().await?; - let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - - let dlc = match dlc { - Ok(dlc) => dlc, - Err(e) => { - cfd.state = CfdState::SetupFailed { - common: CfdStateCommon::default(), - info: e.to_string(), - }; - - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - - return Err(e); - } - }; - - cfd.state = CfdState::PendingOpen { - common: CfdStateCommon::default(), - dlc: dlc.clone(), - attestation: None, - }; - - append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; - - let txid = self - .wallet - .send(wallet::TryBroadcastTransaction { - tx: dlc.lock.0.clone(), - }) - .await??; - - tracing::info!("Lock transaction published with txid {}", txid); - - self.monitor_actor - .send(monitor::StartMonitoring { - id: order_id, - params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()), - }) - .await?; - - self.oracle_actor - .send(oracle::MonitorAttestation { - event_id: dlc.settlement_event_id, - }) - .await?; - - Ok(()) - } -} - impl Actor where M: xtra::Handler, @@ -1060,6 +948,66 @@ where } } +#[xtra_productivity] +impl Actor +where + O: xtra::Handler, + M: xtra::Handler, + W: xtra::Handler, +{ + async fn handle_setup_completed(&mut self, msg: setup_maker::Completed) { + log_error!(async { + use setup_maker::Completed::*; + let (order_id, dlc) = match msg { + NewContract { order_id, dlc } => (order_id, dlc), + Failed { order_id, error } => { + self.append_cfd_state_setup_failed(order_id, error).await?; + return anyhow::Ok(()); + } + Rejected(order_id) => { + self.append_cfd_state_rejected(order_id).await?; + return anyhow::Ok(()); + } + }; + + let mut conn = self.db.acquire().await?; + let mut cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + + cfd.state = CfdState::PendingOpen { + common: CfdStateCommon::default(), + dlc: dlc.clone(), + attestation: None, + }; + + append_cfd_state(&cfd, &mut conn, &self.projection_actor).await?; + + let txid = self + .wallet + .send(wallet::TryBroadcastTransaction { + tx: dlc.lock.0.clone(), + }) + .await??; + + tracing::info!("Lock transaction published with txid {}", txid); + + self.monitor_actor + .send(monitor::StartMonitoring { + id: order_id, + params: MonitorParams::new(dlc.clone(), cfd.refund_timelock_in_blocks()), + }) + .await?; + + self.oracle_actor + .send(oracle::MonitorAttestation { + event_id: dlc.settlement_event_id, + }) + .await?; + + Ok(()) + }); + } +} + #[async_trait] impl Handler for Actor where @@ -1081,19 +1029,6 @@ where } } -#[async_trait] -impl Handler - for Actor -where - O: xtra::Handler, - M: xtra::Handler, - W: xtra::Handler, -{ - async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context) { - log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc)); - } -} - #[async_trait] impl Handler for Actor @@ -1119,15 +1054,20 @@ where #[async_trait] impl Handler for Actor where - T: xtra::Handler - + xtra::Handler, - M: xtra::Handler, - W: xtra::Handler, + O: xtra::Handler + xtra::Handler, + M: xtra::Handler + xtra::Handler, + T: xtra::Handler + + xtra::Handler + + xtra::Handler + + xtra::Handler>, + W: xtra::Handler + + xtra::Handler + + xtra::Handler, { - async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, _ctx: &mut Context) { + async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, ctx: &mut Context) { match msg { wire::TakerToMaker::TakeOrder { order_id, quantity } => { - log_error!(self.handle_take_order(taker_id, order_id, quantity)) + log_error!(self.handle_take_order(taker_id, order_id, quantity, ctx)) } wire::TakerToMaker::Settlement { order_id, @@ -1156,9 +1096,6 @@ where } => { log_error!(self.handle_initiate_settlement(taker_id, order_id, sig_taker)) } - wire::TakerToMaker::Protocol { msg, .. } => { - log_error!(self.handle_inc_protocol_msg(taker_id, msg)) - } wire::TakerToMaker::ProposeRollOver { order_id, timestamp, @@ -1171,10 +1108,12 @@ where taker_id, )) } - wire::TakerToMaker::RollOverProtocol(msg) => { log_error!(self.handle_inc_roll_over_protocol_msg(taker_id, msg)) } + wire::TakerToMaker::Protocol { .. } => { + unreachable!("This kind of message should be sent to the `setup_maker::Actor`") + } } } } @@ -1198,10 +1137,6 @@ impl Message for TakerDisconnected { type Result = (); } -impl Message for CfdSetupCompleted { - type Result = (); -} - impl Message for CfdRollOverCompleted { type Result = (); } diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 1899277..c26baf3 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -1,8 +1,9 @@ +use crate::address_map::{AddressMap, Stopping}; use crate::maker_cfd::{FromTaker, TakerConnected, TakerDisconnected}; -use crate::model::cfd::Order; +use crate::model::cfd::{Order, OrderId}; use crate::model::Identity; use crate::noise::TransportStateExt; -use crate::{maker_cfd, noise, send_to_socket, wire, Tasks}; +use crate::{maker_cfd, noise, send_to_socket, setup_maker, wire, Tasks}; use anyhow::Result; use futures::TryStreamExt; use std::collections::HashMap; @@ -18,6 +19,20 @@ use xtra_productivity::xtra_productivity; pub struct BroadcastOrder(pub Option); +/// Message sent from the `setup_maker::Actor` to the +/// `maker_inc_connections::Actor` so that it can forward it to the +/// taker. +/// +/// Additionally, the address of this instance of the +/// `setup_maker::Actor` is included so that the +/// `maker_inc_connections::Actor` knows where to forward the contract +/// setup messages from the taker about this particular order. +pub struct ConfirmOrder { + pub taker_id: Identity, + pub order_id: OrderId, + pub address: xtra::Address, +} + #[derive(Debug)] pub struct TakerMessage { pub taker_id: Identity, @@ -41,6 +56,7 @@ pub struct Actor { taker_msg_channel: Box>, noise_priv_key: x25519_dalek::StaticSecret, heartbeat_interval: Duration, + setup_actors: AddressMap, connection_tasks: HashMap, } @@ -59,6 +75,7 @@ impl Actor { taker_msg_channel: taker_msg_channel.clone_channel(), noise_priv_key, heartbeat_interval, + setup_actors: AddressMap::default(), connection_tasks: HashMap::new(), } } @@ -113,10 +130,9 @@ impl Actor { FramedRead::new(read, wire::EncryptedJsonCodec::new(transport_state.clone())); let this = ctx.address().expect("self to be alive"); - let taker_msg_channel = self.taker_msg_channel.clone_channel(); let read_fut = async move { while let Ok(Some(msg)) = read.try_next().await { - let res = taker_msg_channel.send(FromTaker { taker_id, msg }).await; + let res = this.send(FromTaker { taker_id, msg }).await; if res.is_err() { break; @@ -166,6 +182,18 @@ impl Actor { } } + async fn handle_confirm_order(&mut self, msg: ConfirmOrder) -> Result<()> { + self.send_to_taker( + &msg.taker_id, + wire::MakerToTaker::ConfirmOrder(msg.order_id), + ) + .await?; + + self.setup_actors.insert(msg.order_id, msg.address); + + Ok(()) + } + async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<(), NoConnection> { self.send_to_taker(&msg.taker_id, msg.msg).await?; @@ -198,6 +226,30 @@ impl Actor { } } +#[xtra_productivity(message_impl = false)] +impl Actor { + async fn handle_msg_from_taker(&mut self, msg: FromTaker) { + use wire::TakerToMaker::*; + match msg.msg { + Protocol { order_id, msg } => match self.setup_actors.get_connected(&order_id) { + Some(addr) => { + let _ = addr.send(msg).await; + } + None => { + tracing::error!(%order_id, "No active contract setup"); + } + }, + _ => { + let _ = self.taker_msg_channel.send(msg); + } + } + } + + async fn handle_setup_actor_stopping(&mut self, message: Stopping) { + self.setup_actors.gc(message); + } +} + struct ReadFail(Identity); impl xtra::Actor for Actor {} diff --git a/daemon/src/setup_maker.rs b/daemon/src/setup_maker.rs new file mode 100644 index 0000000..b66a70e --- /dev/null +++ b/daemon/src/setup_maker.rs @@ -0,0 +1,284 @@ +use crate::address_map::Stopping; +use crate::model::cfd::{Cfd, CfdState, Dlc, Order, OrderId, Role}; +use crate::model::{Identity, Usd}; +use crate::oracle::Announcement; +use crate::setup_contract::{self, SetupParams}; +use crate::tokio_ext::spawn_fallible; +use crate::wire::{self, SetupMsg}; +use crate::{maker_inc_connections, wallet}; +use anyhow::{Context, Result}; +use async_trait::async_trait; +use futures::channel::mpsc::{self, UnboundedSender}; +use futures::{future, SinkExt}; +use maia::secp256k1_zkp::schnorrsig; +use xtra::prelude::MessageChannel; +use xtra_productivity::xtra_productivity; + +pub struct Actor { + order: Order, + quantity: Usd, + n_payouts: usize, + oracle_pk: schnorrsig::PublicKey, + announcement: Announcement, + build_party_params: Box>, + sign: Box>, + taker: Box>, + confirm_order: Box>, + taker_id: Identity, + on_completed: Box>, + on_stopping: Vec>>>, + setup_msg_sender: Option>, +} + +impl Actor { + pub fn new( + (order, quantity, n_payouts): (Order, Usd, usize), + (oracle_pk, announcement): (schnorrsig::PublicKey, Announcement), + build_party_params: &(impl MessageChannel + 'static), + sign: &(impl MessageChannel + 'static), + (taker, confirm_order, taker_id): ( + &(impl MessageChannel + 'static), + &(impl MessageChannel + 'static), + Identity, + ), + on_completed: &(impl MessageChannel + 'static), + (on_stopping0, on_stopping1): ( + &(impl MessageChannel> + 'static), + &(impl MessageChannel> + 'static), + ), + ) -> Self { + Self { + order, + quantity, + n_payouts, + oracle_pk, + announcement, + build_party_params: build_party_params.clone_channel(), + sign: sign.clone_channel(), + taker: taker.clone_channel(), + confirm_order: confirm_order.clone_channel(), + taker_id, + on_completed: on_completed.clone_channel(), + on_stopping: vec![on_stopping0.clone_channel(), on_stopping1.clone_channel()], + setup_msg_sender: None, + } + } + + async fn contract_setup(&mut self, this: xtra::Address) -> Result<()> { + let order_id = self.order.id; + let cfd = Cfd::new( + self.order.clone(), + self.quantity, + CfdState::contract_setup(), + ); + + let (sender, receiver) = mpsc::unbounded(); + // store the writing end to forward messages from the taker to + // the spawned contract setup task + self.setup_msg_sender = Some(sender); + + let taker_id = self.taker_id; + let contract_future = setup_contract::new( + self.taker.sink().with(move |msg| { + future::ok(maker_inc_connections::TakerMessage { + taker_id, + msg: wire::MakerToTaker::Protocol { order_id, msg }, + }) + }), + receiver, + (self.oracle_pk, self.announcement.clone()), + SetupParams::new( + cfd.margin()?, + cfd.counterparty_margin()?, + cfd.order.price, + cfd.quantity_usd, + cfd.order.leverage, + cfd.refund_timelock_in_blocks(), + ), + self.build_party_params.clone_channel(), + self.sign.clone_channel(), + Role::Maker, + self.n_payouts, + ); + + spawn_fallible::<_, anyhow::Error>(async move { + let _ = match contract_future.await { + Ok(dlc) => this.send(SetupSucceeded { order_id, dlc }).await?, + Err(error) => this.send(SetupFailed { order_id, error }).await?, + }; + + Ok(()) + }); + + Ok(()) + } + + async fn complete(&mut self, completed: Completed, ctx: &mut xtra::Context) { + let _ = self.on_completed.send(completed).await; + + ctx.stop(); + } +} + +#[xtra_productivity] +impl Actor { + fn handle(&mut self, _msg: Accepted, ctx: &mut xtra::Context) { + let order_id = self.order.id; + tracing::info!(%order_id, "Maker accepts an order"); + + let this = ctx + .address() + .expect("actor to be able to give address to itself"); + + let fut = async { + self.confirm_order + .send(maker_inc_connections::ConfirmOrder { + taker_id: self.taker_id, + order_id, + address: this.clone(), + }) + .await + .context("Failed to deliver order confirmation")??; + + self.contract_setup(this) + .await + .context("Failed to start contract setup")?; + + Ok(()) + }; + + if let Err(error) = fut.await { + tracing::error!(%order_id, "Stopping setup_maker actor: {}", error); + + self.complete(Completed::Failed { order_id, error }, ctx) + .await; + + return; + } + } + + fn handle(&mut self, _msg: Rejected, ctx: &mut xtra::Context) { + self.complete(Completed::Rejected(self.order.id), ctx).await; + } + + fn handle(&mut self, msg: SetupSucceeded, ctx: &mut xtra::Context) { + self.complete( + Completed::NewContract { + order_id: msg.order_id, + dlc: msg.dlc, + }, + ctx, + ) + .await + } + + fn handle(&mut self, msg: SetupFailed, ctx: &mut xtra::Context) { + self.complete( + Completed::Failed { + order_id: msg.order_id, + error: msg.error, + }, + ctx, + ) + .await + } +} + +#[xtra_productivity(message_impl = false)] +impl Actor { + fn handle(&mut self, msg: wire::SetupMsg, _ctx: &mut xtra::Context) -> Result<()> { + let mut sender = self + .setup_msg_sender + .clone() + .context("Cannot forward message to contract setup task")?; + sender.send(msg).await?; + + Ok(()) + } +} + +#[async_trait] +impl xtra::Actor for Actor { + async fn started(&mut self, ctx: &mut xtra::Context) { + let quantity = self.quantity; + let order = self.order.clone(); + if quantity < order.min_quantity || quantity > order.max_quantity { + tracing::info!( + "Order rejected: quantity {} not in range [{}, {}]", + quantity, + order.min_quantity, + order.max_quantity + ); + + let _ = self + .taker + .send(maker_inc_connections::TakerMessage { + taker_id: self.taker_id, + msg: wire::MakerToTaker::RejectOrder(order.id), + }) + .await; + + self.complete(Completed::Rejected(order.id), ctx).await; + } + } + + async fn stopping(&mut self, ctx: &mut xtra::Context) -> xtra::KeepRunning { + // inform other actors that we are stopping so that our + // address can be GCd from their AddressMaps + let me = ctx.address().expect("we are still alive"); + + for channel in self.on_stopping.iter() { + let _ = channel.send(Stopping { me: me.clone() }).await; + } + + xtra::KeepRunning::StopAll + } +} + +/// Message sent from the `maker_cfd::Actor` to the +/// `setup_maker::Actor` to inform that the maker user has accepted +/// the taker order request from the taker. +pub struct Accepted; + +/// Message sent from the `maker_cfd::Actor` to the +/// `setup_maker::Actor` to inform that the maker user has rejected +/// the taker order request from the taker. +pub struct Rejected; + +/// Message sent from the `setup_maker::Actor` to the +/// `maker_cfd::Actor` to notify that the contract setup has started. +pub struct Started(pub OrderId); + +/// Message sent from the spawned task to `setup_maker::Actor` to +/// notify that the contract setup has finished successfully. +pub struct SetupSucceeded { + order_id: OrderId, + dlc: Dlc, +} + +/// Message sent from the spawned task to `setup_maker::Actor` to +/// notify that the contract setup has failed. +pub struct SetupFailed { + order_id: OrderId, + error: anyhow::Error, +} + +/// Message sent from the `setup_maker::Actor` to the +/// `maker_cfd::Actor` to notify that the contract setup has finished. +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +pub enum Completed { + Rejected(OrderId), + NewContract { + order_id: OrderId, + dlc: Dlc, + }, + Failed { + order_id: OrderId, + error: anyhow::Error, + }, +} + +impl xtra::Message for Started { + type Result = (); +} diff --git a/daemon/src/setup_taker.rs b/daemon/src/setup_taker.rs index f919c2c..0065563 100644 --- a/daemon/src/setup_taker.rs +++ b/daemon/src/setup_taker.rs @@ -70,7 +70,7 @@ impl Actor { ); let (sender, receiver) = mpsc::unbounded::(); - // store the writing end to forward messages from the taker to + // store the writing end to forward messages from the maker to // the spawned contract setup task self.setup_msg_sender = Some(sender); diff --git a/daemon/tests/happy_path.rs b/daemon/tests/happy_path.rs index 4cfebdf..7829c8b 100644 --- a/daemon/tests/happy_path.rs +++ b/daemon/tests/happy_path.rs @@ -81,12 +81,11 @@ async fn taker_takes_order_and_maker_accepts_and_contract_setup() { .unwrap(); taker.mocks.mock_oracle_announcement().await; + maker.mocks.mock_oracle_announcement().await; + taker.take_order(received.clone(), Usd::new(dec!(5))).await; let (_, _) = next_cfd(taker.cfd_feed(), maker.cfd_feed()).await.unwrap(); - maker.mocks.mock_oracle_announcement().await; - taker.mocks.mock_oracle_announcement().await; - maker.mocks.mock_party_params().await; taker.mocks.mock_party_params().await;