diff --git a/daemon/src/taker.rs b/daemon/src/taker.rs index 406cd38..cabec88 100644 --- a/daemon/src/taker.rs +++ b/daemon/src/taker.rs @@ -4,6 +4,7 @@ use anyhow::{Context, Result}; use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1}; use bdk::bitcoin::Network; use clap::Clap; +use futures::StreamExt; use model::cfd::{Cfd, Order}; use rocket::fairing::AdHoc; use rocket_db_pools::Database; @@ -14,6 +15,7 @@ use std::path::PathBuf; use std::thread::sleep; use std::time::Duration; use tokio::sync::watch; +use tokio_util::codec::FramedRead; use tracing_subscriber::filter::LevelFilter; use wire::TakerToMaker; use xtra::spawn::TokioGlobalSpawnExt; @@ -32,7 +34,6 @@ mod seed; mod send_to_socket; mod setup_contract_actor; mod taker_cfd; -mod taker_inc_message_actor; mod to_sse_event; mod wallet; mod wallet_sync; @@ -181,17 +182,16 @@ async fn main() -> Result<()> { .create(None) .spawn_global(); - let inc_maker_messages_actor = - taker_inc_message_actor::new(read, cfd_actor_inbox.clone()); + let read = FramedRead::new(read, wire::JsonCodec::new()) + .map(move |item| taker_cfd::MakerStreamMessage { item }); + tokio::spawn(cfd_actor_inbox.clone().attach_stream(read)); tokio::spawn(monitor_actor_context.run(monitor::Actor::new( &opts.electrum, HashMap::new(), cfd_actor_inbox.clone(), ))); - tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); - tokio::spawn(inc_maker_messages_actor); Ok(rocket.manage(cfd_actor_inbox)) }, diff --git a/daemon/src/taker_cfd.rs b/daemon/src/taker_cfd.rs index 2a7339f..c27cd83 100644 --- a/daemon/src/taker_cfd.rs +++ b/daemon/src/taker_cfd.rs @@ -1,10 +1,11 @@ +use crate::actors::log_error; use crate::db::{ insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, load_cfd_by_order_id, load_order_by_id, }; - -use crate::actors::log_error; -use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId}; +use crate::model::cfd::{ + Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin, +}; use crate::model::Usd; use crate::monitor::MonitorParams; use crate::wallet::Wallet; @@ -16,17 +17,16 @@ use bdk::bitcoin::secp256k1::schnorrsig; use std::time::SystemTime; use tokio::sync::{mpsc, watch}; use xtra::prelude::*; +use xtra::KeepRunning; pub struct TakeOffer { pub order_id: OrderId, pub quantity: Usd, } -pub struct NewOrder(pub Option); - -pub struct OrderAccepted(pub OrderId); -pub struct OrderRejected(pub OrderId); -pub struct IncProtocolMsg(pub SetupMsg); +pub struct MakerStreamMessage { + pub item: Result, +} pub struct CfdSetupCompleted { pub order_id: OrderId, @@ -103,7 +103,9 @@ impl Actor { async fn handle_new_order(&mut self, order: Option) -> Result<()> { match order { - Some(order) => { + Some(mut order) => { + order.origin = Origin::Theirs; + let mut conn = self.db.acquire().await?; insert_order(&order, &mut conn).await?; self.order_feed_actor_inbox.send(Some(order))?; @@ -209,6 +211,7 @@ impl Actor { Some(inbox) => inbox, }; inbox.send(msg)?; + Ok(()) } @@ -305,30 +308,37 @@ impl Handler for Actor { } #[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context) { - log_error!(self.handle_new_order(msg.0)); - } -} - -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: OrderAccepted, ctx: &mut Context) { - log_error!(self.handle_order_accepted(msg.0, ctx)); - } -} +impl Handler for Actor { + async fn handle( + &mut self, + message: MakerStreamMessage, + ctx: &mut Context, + ) -> KeepRunning { + let msg = match message.item { + Ok(msg) => msg, + Err(e) => { + tracing::warn!("Error while receiving message from maker: {:#}", e); + return KeepRunning::Yes; + } + }; -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: OrderRejected, _ctx: &mut Context) { - log_error!(self.handle_order_rejected(msg.0)); - } -} + match msg { + wire::MakerToTaker::CurrentOrder(current_order) => { + log_error!(self.handle_new_order(current_order)) + } + wire::MakerToTaker::ConfirmOrder(order_id) => { + log_error!(self.handle_order_accepted(order_id, ctx)) + } + wire::MakerToTaker::RejectOrder(order_id) => { + log_error!(self.handle_order_rejected(order_id)) + } + wire::MakerToTaker::InvalidOrderId(_) => todo!(), + wire::MakerToTaker::Protocol(setup_msg) => { + log_error!(self.handle_inc_protocol_msg(setup_msg)) + } + } -#[async_trait] -impl Handler for Actor { - async fn handle(&mut self, msg: IncProtocolMsg, _ctx: &mut Context) { - log_error!(self.handle_inc_protocol_msg(msg.0)); + KeepRunning::Yes } } @@ -350,20 +360,9 @@ impl Message for TakeOffer { type Result = (); } -impl Message for NewOrder { - type Result = (); -} - -impl Message for OrderAccepted { - type Result = (); -} - -impl Message for OrderRejected { - type Result = (); -} - -impl Message for IncProtocolMsg { - type Result = (); +// this signature is a bit different because we use `Address::attach_stream` +impl Message for MakerStreamMessage { + type Result = KeepRunning; } impl Message for CfdSetupCompleted { diff --git a/daemon/src/taker_inc_message_actor.rs b/daemon/src/taker_inc_message_actor.rs deleted file mode 100644 index c82ef70..0000000 --- a/daemon/src/taker_inc_message_actor.rs +++ /dev/null @@ -1,52 +0,0 @@ -use crate::model::cfd::Origin; -use crate::wire::JsonCodec; -use crate::{taker_cfd, wire}; -use futures::{Future, StreamExt}; -use tokio::net::tcp::OwnedReadHalf; -use tokio_util::codec::FramedRead; -use xtra::prelude::*; - -pub fn new(read: OwnedReadHalf, cfd_actor: Address) -> impl Future { - let mut messages = FramedRead::new(read, JsonCodec::new()); - - async move { - while let Some(message) = messages.next().await { - match message { - Ok(wire::MakerToTaker::CurrentOrder(mut order)) => { - if let Some(order) = order.as_mut() { - order.origin = Origin::Theirs; - } - cfd_actor - .do_send_async(taker_cfd::NewOrder(order)) - .await - .expect("actor to be always available"); - } - Ok(wire::MakerToTaker::ConfirmOrder(order_id)) => { - // TODO: This naming is not well aligned. - cfd_actor - .do_send_async(taker_cfd::OrderAccepted(order_id)) - .await - .expect("actor to be always available"); - } - Ok(wire::MakerToTaker::RejectOrder(order_id)) => { - cfd_actor - .do_send_async(taker_cfd::OrderRejected(order_id)) - .await - .expect("actor to be always available"); - } - Ok(wire::MakerToTaker::InvalidOrderId(_)) => { - todo!() - } - Ok(wire::MakerToTaker::Protocol(msg)) => { - cfd_actor - .do_send_async(taker_cfd::IncProtocolMsg(msg)) - .await - .expect("actor to be always available"); - } - Err(error) => { - tracing::error!("Error in reading message: {}", error); - } - } - } - } -}