Browse Source

Replace `taker_inc_message_actor` with `Address::attach_stream`

`attach_stream` uses `send` under the hood which will process messages
in order. This will guarantee that the `taker_cfd::Actor` processes the
messages in the order they come in on the wire. That allows us to assume
that we will always get the `Accept` message before the first `Setup` message.
fix-bad-api-calls
Thomas Eizinger 3 years ago
parent
commit
e677ba7a74
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 10
      daemon/src/taker.rs
  2. 83
      daemon/src/taker_cfd.rs
  3. 52
      daemon/src/taker_inc_message_actor.rs

10
daemon/src/taker.rs

@ -4,6 +4,7 @@ use anyhow::{Context, Result};
use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1}; use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1};
use bdk::bitcoin::Network; use bdk::bitcoin::Network;
use clap::Clap; use clap::Clap;
use futures::StreamExt;
use model::cfd::{Cfd, Order}; use model::cfd::{Cfd, Order};
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
@ -14,6 +15,7 @@ use std::path::PathBuf;
use std::thread::sleep; use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
use tokio::sync::watch; use tokio::sync::watch;
use tokio_util::codec::FramedRead;
use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::filter::LevelFilter;
use wire::TakerToMaker; use wire::TakerToMaker;
use xtra::spawn::TokioGlobalSpawnExt; use xtra::spawn::TokioGlobalSpawnExt;
@ -32,7 +34,6 @@ mod seed;
mod send_to_socket; mod send_to_socket;
mod setup_contract_actor; mod setup_contract_actor;
mod taker_cfd; mod taker_cfd;
mod taker_inc_message_actor;
mod to_sse_event; mod to_sse_event;
mod wallet; mod wallet;
mod wallet_sync; mod wallet_sync;
@ -181,17 +182,16 @@ async fn main() -> Result<()> {
.create(None) .create(None)
.spawn_global(); .spawn_global();
let inc_maker_messages_actor = let read = FramedRead::new(read, wire::JsonCodec::new())
taker_inc_message_actor::new(read, cfd_actor_inbox.clone()); .map(move |item| taker_cfd::MakerStreamMessage { item });
tokio::spawn(cfd_actor_inbox.clone().attach_stream(read));
tokio::spawn(monitor_actor_context.run(monitor::Actor::new( tokio::spawn(monitor_actor_context.run(monitor::Actor::new(
&opts.electrum, &opts.electrum,
HashMap::new(), HashMap::new(),
cfd_actor_inbox.clone(), cfd_actor_inbox.clone(),
))); )));
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
tokio::spawn(inc_maker_messages_actor);
Ok(rocket.manage(cfd_actor_inbox)) Ok(rocket.manage(cfd_actor_inbox))
}, },

83
daemon/src/taker_cfd.rs

@ -1,10 +1,11 @@
use crate::actors::log_error;
use crate::db::{ use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds,
load_cfd_by_order_id, load_order_by_id, load_cfd_by_order_id, load_order_by_id,
}; };
use crate::model::cfd::{
use crate::actors::log_error; Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin,
use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId}; };
use crate::model::Usd; use crate::model::Usd;
use crate::monitor::MonitorParams; use crate::monitor::MonitorParams;
use crate::wallet::Wallet; use crate::wallet::Wallet;
@ -16,17 +17,16 @@ use bdk::bitcoin::secp256k1::schnorrsig;
use std::time::SystemTime; use std::time::SystemTime;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use xtra::prelude::*; use xtra::prelude::*;
use xtra::KeepRunning;
pub struct TakeOffer { pub struct TakeOffer {
pub order_id: OrderId, pub order_id: OrderId,
pub quantity: Usd, pub quantity: Usd,
} }
pub struct NewOrder(pub Option<Order>); pub struct MakerStreamMessage {
pub item: Result<wire::MakerToTaker>,
pub struct OrderAccepted(pub OrderId); }
pub struct OrderRejected(pub OrderId);
pub struct IncProtocolMsg(pub SetupMsg);
pub struct CfdSetupCompleted { pub struct CfdSetupCompleted {
pub order_id: OrderId, pub order_id: OrderId,
@ -103,7 +103,9 @@ impl Actor {
async fn handle_new_order(&mut self, order: Option<Order>) -> Result<()> { async fn handle_new_order(&mut self, order: Option<Order>) -> Result<()> {
match order { match order {
Some(order) => { Some(mut order) => {
order.origin = Origin::Theirs;
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
insert_order(&order, &mut conn).await?; insert_order(&order, &mut conn).await?;
self.order_feed_actor_inbox.send(Some(order))?; self.order_feed_actor_inbox.send(Some(order))?;
@ -209,6 +211,7 @@ impl Actor {
Some(inbox) => inbox, Some(inbox) => inbox,
}; };
inbox.send(msg)?; inbox.send(msg)?;
Ok(()) Ok(())
} }
@ -305,30 +308,37 @@ impl Handler<TakeOffer> for Actor {
} }
#[async_trait] #[async_trait]
impl Handler<NewOrder> for Actor { impl Handler<MakerStreamMessage> for Actor {
async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context<Self>) { async fn handle(
log_error!(self.handle_new_order(msg.0)); &mut self,
message: MakerStreamMessage,
ctx: &mut Context<Self>,
) -> KeepRunning {
let msg = match message.item {
Ok(msg) => msg,
Err(e) => {
tracing::warn!("Error while receiving message from maker: {:#}", e);
return KeepRunning::Yes;
} }
} };
#[async_trait] match msg {
impl Handler<OrderAccepted> for Actor { wire::MakerToTaker::CurrentOrder(current_order) => {
async fn handle(&mut self, msg: OrderAccepted, ctx: &mut Context<Self>) { log_error!(self.handle_new_order(current_order))
log_error!(self.handle_order_accepted(msg.0, ctx)); }
wire::MakerToTaker::ConfirmOrder(order_id) => {
log_error!(self.handle_order_accepted(order_id, ctx))
}
wire::MakerToTaker::RejectOrder(order_id) => {
log_error!(self.handle_order_rejected(order_id))
}
wire::MakerToTaker::InvalidOrderId(_) => todo!(),
wire::MakerToTaker::Protocol(setup_msg) => {
log_error!(self.handle_inc_protocol_msg(setup_msg))
} }
}
#[async_trait]
impl Handler<OrderRejected> for Actor {
async fn handle(&mut self, msg: OrderRejected, _ctx: &mut Context<Self>) {
log_error!(self.handle_order_rejected(msg.0));
} }
}
#[async_trait] KeepRunning::Yes
impl Handler<IncProtocolMsg> for Actor {
async fn handle(&mut self, msg: IncProtocolMsg, _ctx: &mut Context<Self>) {
log_error!(self.handle_inc_protocol_msg(msg.0));
} }
} }
@ -350,20 +360,9 @@ impl Message for TakeOffer {
type Result = (); type Result = ();
} }
impl Message for NewOrder { // this signature is a bit different because we use `Address::attach_stream`
type Result = (); impl Message for MakerStreamMessage {
} type Result = KeepRunning;
impl Message for OrderAccepted {
type Result = ();
}
impl Message for OrderRejected {
type Result = ();
}
impl Message for IncProtocolMsg {
type Result = ();
} }
impl Message for CfdSetupCompleted { impl Message for CfdSetupCompleted {

52
daemon/src/taker_inc_message_actor.rs

@ -1,52 +0,0 @@
use crate::model::cfd::Origin;
use crate::wire::JsonCodec;
use crate::{taker_cfd, wire};
use futures::{Future, StreamExt};
use tokio::net::tcp::OwnedReadHalf;
use tokio_util::codec::FramedRead;
use xtra::prelude::*;
pub fn new(read: OwnedReadHalf, cfd_actor: Address<taker_cfd::Actor>) -> impl Future<Output = ()> {
let mut messages = FramedRead::new(read, JsonCodec::new());
async move {
while let Some(message) = messages.next().await {
match message {
Ok(wire::MakerToTaker::CurrentOrder(mut order)) => {
if let Some(order) = order.as_mut() {
order.origin = Origin::Theirs;
}
cfd_actor
.do_send_async(taker_cfd::NewOrder(order))
.await
.expect("actor to be always available");
}
Ok(wire::MakerToTaker::ConfirmOrder(order_id)) => {
// TODO: This naming is not well aligned.
cfd_actor
.do_send_async(taker_cfd::OrderAccepted(order_id))
.await
.expect("actor to be always available");
}
Ok(wire::MakerToTaker::RejectOrder(order_id)) => {
cfd_actor
.do_send_async(taker_cfd::OrderRejected(order_id))
.await
.expect("actor to be always available");
}
Ok(wire::MakerToTaker::InvalidOrderId(_)) => {
todo!()
}
Ok(wire::MakerToTaker::Protocol(msg)) => {
cfd_actor
.do_send_async(taker_cfd::IncProtocolMsg(msg))
.await
.expect("actor to be always available");
}
Err(error) => {
tracing::error!("Error in reading message: {}", error);
}
}
}
}
}
Loading…
Cancel
Save