From 0d1406ec6301cbb10ae3c992ada99f9f3115841f Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 27 Sep 2021 18:02:22 +1000 Subject: [PATCH] Replace `in_taker_messages` with `Address::attach_stream` --- daemon/src/maker.rs | 13 ++-- daemon/src/maker_cfd.rs | 94 +++++++++++++++++------------ daemon/src/maker_inc_connections.rs | 39 ------------ 3 files changed, 61 insertions(+), 85 deletions(-) diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 54d687a..2d27b6e 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -1,5 +1,4 @@ use crate::auth::MAKER_USERNAME; -use crate::maker_inc_connections::in_taker_messages; use crate::model::TakerId; use crate::seed::Seed; use crate::wallet::Wallet; @@ -7,6 +6,7 @@ use anyhow::{Context, Result}; use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1}; use bdk::bitcoin::Network; use clap::Clap; +use futures::StreamExt; use model::cfd::{Cfd, Order}; use model::WalletInfo; use rocket::fairing::AdHoc; @@ -14,6 +14,7 @@ use rocket_db_pools::Database; use std::collections::HashMap; use std::path::PathBuf; use tokio::sync::watch; +use tokio_util::codec::FramedRead; use tracing_subscriber::filter::LevelFilter; use xtra::prelude::*; use xtra::spawn::TokioGlobalSpawnExt; @@ -201,18 +202,16 @@ async fn main() -> Result<()> { let taker_id = TakerId::default(); let (read, write) = socket.into_split(); - let in_taker_actor = in_taker_messages( - read, - cfd_maker_actor_inbox.clone(), - taker_id, + let read = FramedRead::new(read, wire::JsonCodec::new()).map( + move |item| maker_cfd::TakerStreamMessage { taker_id, item }, ); + tokio::spawn(cfd_maker_actor_inbox.clone().attach_stream(read)); + let out_msg_actor = send_to_socket::Actor::new(write) .create(None) .spawn_global(); - tokio::spawn(in_taker_actor); - maker_inc_connections_address .do_send_async(maker_inc_connections::NewTakerOnline { taker_id, diff --git a/daemon/src/maker_cfd.rs b/daemon/src/maker_cfd.rs index d0eca64..1afe2a4 100644 --- a/daemon/src/maker_cfd.rs +++ b/daemon/src/maker_cfd.rs @@ -8,7 +8,7 @@ use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, use crate::model::{TakerId, Usd}; use crate::monitor::MonitorParams; use crate::wallet::Wallet; -use crate::wire::SetupMsg; +use crate::wire::{self, SetupMsg}; use crate::{maker_inc_connections, monitor, setup_contract_actor}; use anyhow::Result; use async_trait::async_trait; @@ -16,12 +16,7 @@ use bdk::bitcoin::secp256k1::schnorrsig; use std::time::SystemTime; use tokio::sync::{mpsc, watch}; use xtra::prelude::*; - -pub struct TakeOrder { - pub taker_id: TakerId, - pub order_id: OrderId, - pub quantity: Usd, -} +use xtra::KeepRunning; pub struct AcceptOrder { pub order_id: OrderId, @@ -37,13 +32,16 @@ pub struct NewTakerOnline { pub id: TakerId, } -pub struct IncProtocolMsg(pub SetupMsg); - pub struct CfdSetupCompleted { pub order_id: OrderId, pub dlc: Dlc, } +pub struct TakerStreamMessage { + pub taker_id: TakerId, + pub item: Result, +} + pub struct Actor { db: sqlx::SqlitePool, wallet: Wallet, @@ -128,8 +126,11 @@ impl Actor { Ok(()) } - async fn handle_inc_protocol_msg(&mut self, msg: IncProtocolMsg) -> Result<()> { - let msg = msg.0; + async fn handle_inc_protocol_msg( + &mut self, + _taker_id: TakerId, + msg: wire::SetupMsg, + ) -> Result<()> { let inbox = match &self.current_contract_setup { None => { self.contract_setup_message_buffer.push(msg); @@ -138,6 +139,7 @@ impl Actor { Some(inbox) => inbox, }; inbox.send(msg)?; + Ok(()) } @@ -198,13 +200,12 @@ impl Actor { Ok(()) } - async fn handle_take_order(&mut self, msg: TakeOrder) -> Result<()> { - let TakeOrder { - taker_id, - order_id, - quantity, - } = msg; - + async fn handle_take_order( + &mut self, + taker_id: TakerId, + order_id: OrderId, + quantity: Usd, + ) -> Result<()> { tracing::debug!(%taker_id, %quantity, %order_id, "Taker wants to take an order"); let mut conn = self.db.acquire().await?; @@ -229,7 +230,7 @@ impl Actor { // 2. Insert CFD in DB let cfd = Cfd::new( current_order.clone(), - msg.quantity, + quantity, CfdState::IncomingOrderRequest { common: CfdStateCommon { transition_timestamp: SystemTime::now(), @@ -435,13 +436,6 @@ impl Actor { } } -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: TakeOrder, _ctx: &mut Context) { - log_error!(self.handle_take_order(msg)) - } -} - #[async_trait] impl Handler for Actor { async fn handle(&mut self, msg: AcceptOrder, ctx: &mut Context) { @@ -470,13 +464,6 @@ impl Handler for Actor { } } -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: IncProtocolMsg, _ctx: &mut Context) { - log_error!(self.handle_inc_protocol_msg(msg)); - } -} - #[async_trait] impl Handler for Actor { async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context) { @@ -491,8 +478,36 @@ impl Handler for Actor { } } -impl Message for TakeOrder { - type Result = (); +#[async_trait] +impl Handler for Actor { + async fn handle(&mut self, msg: TakerStreamMessage, _ctx: &mut Context) -> KeepRunning { + let TakerStreamMessage { + taker_id: taker, + item, + } = msg; + let msg = match item { + Ok(msg) => msg, + Err(e) => { + tracing::warn!( + "Error while receiving message from taker {}: {:#}", + taker, + e + ); + return KeepRunning::Yes; + } + }; + + match msg { + wire::TakerToMaker::TakeOrder { order_id, quantity } => { + log_error!(self.handle_take_order(taker, order_id, quantity)) + } + wire::TakerToMaker::Protocol(msg) => { + log_error!(self.handle_inc_protocol_msg(taker, msg)) + } + } + + KeepRunning::Yes + } } impl Message for NewOrder { @@ -503,10 +518,6 @@ impl Message for NewTakerOnline { type Result = (); } -impl Message for IncProtocolMsg { - type Result = (); -} - impl Message for CfdSetupCompleted { type Result = (); } @@ -519,4 +530,9 @@ impl Message for RejectOrder { type Result = (); } +// this signature is a bit different because we use `Address::attach_stream` +impl Message for TakerStreamMessage { + type Result = KeepRunning; +} + impl xtra::Actor for Actor {} diff --git a/daemon/src/maker_inc_connections.rs b/daemon/src/maker_inc_connections.rs index 18f32ee..beaa8eb 100644 --- a/daemon/src/maker_inc_connections.rs +++ b/daemon/src/maker_inc_connections.rs @@ -5,10 +5,7 @@ use crate::wire::SetupMsg; use crate::{maker_cfd, send_to_socket, wire}; use anyhow::{Context as AnyhowContext, Result}; use async_trait::async_trait; -use futures::{Future, StreamExt}; use std::collections::HashMap; -use tokio::net::tcp::OwnedReadHalf; -use tokio_util::codec::FramedRead; use xtra::prelude::*; pub struct BroadcastOrder(pub Option); @@ -148,39 +145,3 @@ impl Handler for Actor { Ok(()) } } - -// - -pub fn in_taker_messages( - read: OwnedReadHalf, - cfd_actor_inbox: Address, - taker_id: TakerId, -) -> impl Future { - let mut messages = FramedRead::new(read, wire::JsonCodec::new()); - - async move { - while let Some(message) = messages.next().await { - match message { - Ok(wire::TakerToMaker::TakeOrder { order_id, quantity }) => { - cfd_actor_inbox - .do_send_async(maker_cfd::TakeOrder { - taker_id, - order_id, - quantity, - }) - .await - .unwrap(); - } - Ok(wire::TakerToMaker::Protocol(msg)) => { - cfd_actor_inbox - .do_send_async(maker_cfd::IncProtocolMsg(msg)) - .await - .unwrap(); - } - Err(error) => { - tracing::error!(%error, "Error in reading message"); - } - } - } - } -}