From 20c76f6134188c95edabfd697b6691f44d1b12b0 Mon Sep 17 00:00:00 2001 From: Lucas Soriano del Pino Date: Mon, 18 Oct 2021 10:54:05 +1100 Subject: [PATCH] Use MessageChannel instead of send_to_socket::Actor We depend on `luckysori/xtra` until https://github.com/Restioson/xtra/pull/45 is released. --- Cargo.lock | 2 +- Cargo.toml | 2 +- daemon/src/taker.rs | 2 +- daemon/src/taker_cfd.rs | 31 +++++++++++++------------------ 4 files changed, 16 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 50d34c0..4673bcd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3353,7 +3353,7 @@ dependencies = [ [[package]] name = "xtra" version = "0.6.0" -source = "git+https://github.com/Restioson/xtra#7ae9140dbc30127aaa5f701078b72ac143babeb4" +source = "git+https://github.com/luckysori/xtra?rev=19684c196f71ec762e595906cc88c6142b82092e#19684c196f71ec762e595906cc88c6142b82092e" dependencies = [ "async-trait", "barrage", diff --git a/Cargo.toml b/Cargo.toml index bd63170..991f689 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,4 +4,4 @@ resolver = "2" [patch.crates-io] rocket = { git = "https://github.com/SergioBenitez/Rocket" } # Need to patch rocket dependency of `rocket_basicauth` until there is an official release. -xtra = { git = "https://github.com/Restioson/xtra" } # Need to patch xtra dependency until there is an official release as it contains useful PRs from us. +xtra = { git = "https://github.com/luckysori/xtra", rev = "19684c196f71ec762e595906cc88c6142b82092e" } # Need to patch xtra dependency until there is an official release as it contains useful PRs from us. diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index bc2acef..1771552 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -221,7 +221,7 @@ async fn main() -> Result<()> { cfd_feed_sender, order_feed_sender, update_cfd_feed_sender, - send_to_maker, + Box::new(send_to_maker), monitor_actor_address.clone(), oracle_actor_address.clone(), ) diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 218fac3..5709169 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -9,7 +9,7 @@ use crate::model::{BitMexPriceEventId, Usd}; use crate::monitor::{self, MonitorParams}; use crate::wallet::Wallet; use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; -use crate::{log_error, oracle, send_to_socket, setup_contract, wire}; +use crate::{log_error, oracle, setup_contract, wire}; use anyhow::{Context as _, Result}; use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; @@ -74,7 +74,7 @@ pub struct Actor { cfd_feed_actor_inbox: watch::Sender>, order_feed_actor_inbox: watch::Sender>, update_cfd_feed_sender: watch::Sender, - send_to_maker: Address>, + send_to_maker: Box>, monitor_actor: Address, setup_state: SetupState, roll_over_state: RollOverState, @@ -91,7 +91,7 @@ impl Actor { cfd_feed_actor_inbox: watch::Sender>, order_feed_actor_inbox: watch::Sender>, update_cfd_feed_sender: watch::Sender, - send_to_maker: Address>, + send_to_maker: Box + Send>, monitor_actor: Address, oracle_actor: Address, ) -> Self { @@ -155,8 +155,7 @@ impl Actor { insert_cfd(&cfd, &mut conn, &self.cfd_feed_actor_inbox).await?; self.send_to_maker - .do_send_async(wire::TakerToMaker::TakeOrder { order_id, quantity }) - .await?; + .do_send(wire::TakerToMaker::TakeOrder { order_id, quantity })?; Ok(()) } @@ -191,14 +190,13 @@ impl Actor { self.send_pending_update_proposals()?; self.send_to_maker - .do_send_async(wire::TakerToMaker::ProposeSettlement { + .do_send(wire::TakerToMaker::ProposeSettlement { order_id: proposal.order_id, timestamp: proposal.timestamp, taker: proposal.taker, maker: proposal.maker, price: proposal.price, - }) - .await?; + })?; Ok(()) } @@ -222,11 +220,10 @@ impl Actor { self.send_pending_update_proposals()?; self.send_to_maker - .do_send_async(wire::TakerToMaker::ProposeRollOver { + .do_send(wire::TakerToMaker::ProposeRollOver { order_id: proposal.order_id, timestamp: proposal.timestamp, - }) - .await?; + })?; Ok(()) } @@ -283,8 +280,8 @@ impl Actor { let contract_future = setup_contract::new( self.send_to_maker - .clone() - .into_sink() + .sink() + .clone_message_sink() .with(|msg| future::ok(wire::TakerToMaker::Protocol(msg))), receiver, (self.oracle_pk, offer_announcement), @@ -337,11 +334,10 @@ impl Actor { let (tx, sig_taker) = dlc.close_transaction(proposal)?; self.send_to_maker - .do_send_async(wire::TakerToMaker::InitiateSettlement { + .do_send(wire::TakerToMaker::InitiateSettlement { order_id, sig_taker, - }) - .await?; + })?; cfd.handle(CfdStateChangeEvent::ProposalSigned( CollaborativeSettlement::new( @@ -391,8 +387,7 @@ impl Actor { let contract_future = setup_contract::roll_over( self.send_to_maker - .clone() - .into_sink() + .sink() .with(|msg| future::ok(wire::TakerToMaker::RollOverProtocol(msg))), receiver, (self.oracle_pk, announcement),