Browse Source

Replace `in_taker_messages` with `Address::attach_stream`

fix-bad-api-calls
Thomas Eizinger 3 years ago
parent
commit
0d1406ec63
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 13
      daemon/src/maker.rs
  2. 94
      daemon/src/maker_cfd.rs
  3. 39
      daemon/src/maker_inc_connections.rs

13
daemon/src/maker.rs

@ -1,5 +1,4 @@
use crate::auth::MAKER_USERNAME; use crate::auth::MAKER_USERNAME;
use crate::maker_inc_connections::in_taker_messages;
use crate::model::TakerId; use crate::model::TakerId;
use crate::seed::Seed; use crate::seed::Seed;
use crate::wallet::Wallet; use crate::wallet::Wallet;
@ -7,6 +6,7 @@ use anyhow::{Context, Result};
use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1}; use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1};
use bdk::bitcoin::Network; use bdk::bitcoin::Network;
use clap::Clap; use clap::Clap;
use futures::StreamExt;
use model::cfd::{Cfd, Order}; use model::cfd::{Cfd, Order};
use model::WalletInfo; use model::WalletInfo;
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
@ -14,6 +14,7 @@ use rocket_db_pools::Database;
use std::collections::HashMap; use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
use tokio::sync::watch; use tokio::sync::watch;
use tokio_util::codec::FramedRead;
use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*; use xtra::prelude::*;
use xtra::spawn::TokioGlobalSpawnExt; use xtra::spawn::TokioGlobalSpawnExt;
@ -201,18 +202,16 @@ async fn main() -> Result<()> {
let taker_id = TakerId::default(); let taker_id = TakerId::default();
let (read, write) = socket.into_split(); let (read, write) = socket.into_split();
let in_taker_actor = in_taker_messages( let read = FramedRead::new(read, wire::JsonCodec::new()).map(
read, move |item| maker_cfd::TakerStreamMessage { taker_id, item },
cfd_maker_actor_inbox.clone(),
taker_id,
); );
tokio::spawn(cfd_maker_actor_inbox.clone().attach_stream(read));
let out_msg_actor = send_to_socket::Actor::new(write) let out_msg_actor = send_to_socket::Actor::new(write)
.create(None) .create(None)
.spawn_global(); .spawn_global();
tokio::spawn(in_taker_actor);
maker_inc_connections_address maker_inc_connections_address
.do_send_async(maker_inc_connections::NewTakerOnline { .do_send_async(maker_inc_connections::NewTakerOnline {
taker_id, taker_id,

94
daemon/src/maker_cfd.rs

@ -8,7 +8,7 @@ use crate::model::cfd::{Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc,
use crate::model::{TakerId, Usd}; use crate::model::{TakerId, Usd};
use crate::monitor::MonitorParams; use crate::monitor::MonitorParams;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::wire::SetupMsg; use crate::wire::{self, SetupMsg};
use crate::{maker_inc_connections, monitor, setup_contract_actor}; use crate::{maker_inc_connections, monitor, setup_contract_actor};
use anyhow::Result; use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
@ -16,12 +16,7 @@ use bdk::bitcoin::secp256k1::schnorrsig;
use std::time::SystemTime; use std::time::SystemTime;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use xtra::prelude::*; use xtra::prelude::*;
use xtra::KeepRunning;
pub struct TakeOrder {
pub taker_id: TakerId,
pub order_id: OrderId,
pub quantity: Usd,
}
pub struct AcceptOrder { pub struct AcceptOrder {
pub order_id: OrderId, pub order_id: OrderId,
@ -37,13 +32,16 @@ pub struct NewTakerOnline {
pub id: TakerId, pub id: TakerId,
} }
pub struct IncProtocolMsg(pub SetupMsg);
pub struct CfdSetupCompleted { pub struct CfdSetupCompleted {
pub order_id: OrderId, pub order_id: OrderId,
pub dlc: Dlc, pub dlc: Dlc,
} }
pub struct TakerStreamMessage {
pub taker_id: TakerId,
pub item: Result<wire::TakerToMaker>,
}
pub struct Actor { pub struct Actor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
@ -128,8 +126,11 @@ impl Actor {
Ok(()) Ok(())
} }
async fn handle_inc_protocol_msg(&mut self, msg: IncProtocolMsg) -> Result<()> { async fn handle_inc_protocol_msg(
let msg = msg.0; &mut self,
_taker_id: TakerId,
msg: wire::SetupMsg,
) -> Result<()> {
let inbox = match &self.current_contract_setup { let inbox = match &self.current_contract_setup {
None => { None => {
self.contract_setup_message_buffer.push(msg); self.contract_setup_message_buffer.push(msg);
@ -138,6 +139,7 @@ impl Actor {
Some(inbox) => inbox, Some(inbox) => inbox,
}; };
inbox.send(msg)?; inbox.send(msg)?;
Ok(()) Ok(())
} }
@ -198,13 +200,12 @@ impl Actor {
Ok(()) Ok(())
} }
async fn handle_take_order(&mut self, msg: TakeOrder) -> Result<()> { async fn handle_take_order(
let TakeOrder { &mut self,
taker_id, taker_id: TakerId,
order_id, order_id: OrderId,
quantity, quantity: Usd,
} = msg; ) -> Result<()> {
tracing::debug!(%taker_id, %quantity, %order_id, "Taker wants to take an order"); tracing::debug!(%taker_id, %quantity, %order_id, "Taker wants to take an order");
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
@ -229,7 +230,7 @@ impl Actor {
// 2. Insert CFD in DB // 2. Insert CFD in DB
let cfd = Cfd::new( let cfd = Cfd::new(
current_order.clone(), current_order.clone(),
msg.quantity, quantity,
CfdState::IncomingOrderRequest { CfdState::IncomingOrderRequest {
common: CfdStateCommon { common: CfdStateCommon {
transition_timestamp: SystemTime::now(), transition_timestamp: SystemTime::now(),
@ -435,13 +436,6 @@ impl Actor {
} }
} }
#[async_trait]
impl Handler<TakeOrder> for Actor {
async fn handle(&mut self, msg: TakeOrder, _ctx: &mut Context<Self>) {
log_error!(self.handle_take_order(msg))
}
}
#[async_trait] #[async_trait]
impl Handler<AcceptOrder> for Actor { impl Handler<AcceptOrder> for Actor {
async fn handle(&mut self, msg: AcceptOrder, ctx: &mut Context<Self>) { async fn handle(&mut self, msg: AcceptOrder, ctx: &mut Context<Self>) {
@ -470,13 +464,6 @@ impl Handler<NewTakerOnline> for Actor {
} }
} }
#[async_trait]
impl Handler<IncProtocolMsg> for Actor {
async fn handle(&mut self, msg: IncProtocolMsg, _ctx: &mut Context<Self>) {
log_error!(self.handle_inc_protocol_msg(msg));
}
}
#[async_trait] #[async_trait]
impl Handler<CfdSetupCompleted> for Actor { impl Handler<CfdSetupCompleted> for Actor {
async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) {
@ -491,8 +478,36 @@ impl Handler<monitor::Event> for Actor {
} }
} }
impl Message for TakeOrder { #[async_trait]
type Result = (); impl Handler<TakerStreamMessage> for Actor {
async fn handle(&mut self, msg: TakerStreamMessage, _ctx: &mut Context<Self>) -> KeepRunning {
let TakerStreamMessage {
taker_id: taker,
item,
} = msg;
let msg = match item {
Ok(msg) => msg,
Err(e) => {
tracing::warn!(
"Error while receiving message from taker {}: {:#}",
taker,
e
);
return KeepRunning::Yes;
}
};
match msg {
wire::TakerToMaker::TakeOrder { order_id, quantity } => {
log_error!(self.handle_take_order(taker, order_id, quantity))
}
wire::TakerToMaker::Protocol(msg) => {
log_error!(self.handle_inc_protocol_msg(taker, msg))
}
}
KeepRunning::Yes
}
} }
impl Message for NewOrder { impl Message for NewOrder {
@ -503,10 +518,6 @@ impl Message for NewTakerOnline {
type Result = (); type Result = ();
} }
impl Message for IncProtocolMsg {
type Result = ();
}
impl Message for CfdSetupCompleted { impl Message for CfdSetupCompleted {
type Result = (); type Result = ();
} }
@ -519,4 +530,9 @@ impl Message for RejectOrder {
type Result = (); type Result = ();
} }
// this signature is a bit different because we use `Address::attach_stream`
impl Message for TakerStreamMessage {
type Result = KeepRunning;
}
impl xtra::Actor for Actor {} impl xtra::Actor for Actor {}

39
daemon/src/maker_inc_connections.rs

@ -5,10 +5,7 @@ use crate::wire::SetupMsg;
use crate::{maker_cfd, send_to_socket, wire}; use crate::{maker_cfd, send_to_socket, wire};
use anyhow::{Context as AnyhowContext, Result}; use anyhow::{Context as AnyhowContext, Result};
use async_trait::async_trait; use async_trait::async_trait;
use futures::{Future, StreamExt};
use std::collections::HashMap; use std::collections::HashMap;
use tokio::net::tcp::OwnedReadHalf;
use tokio_util::codec::FramedRead;
use xtra::prelude::*; use xtra::prelude::*;
pub struct BroadcastOrder(pub Option<Order>); pub struct BroadcastOrder(pub Option<Order>);
@ -148,39 +145,3 @@ impl Handler<NewTakerOnline> for Actor {
Ok(()) Ok(())
} }
} }
//
pub fn in_taker_messages(
read: OwnedReadHalf,
cfd_actor_inbox: Address<maker_cfd::Actor>,
taker_id: TakerId,
) -> impl Future<Output = ()> {
let mut messages = FramedRead::new(read, wire::JsonCodec::new());
async move {
while let Some(message) = messages.next().await {
match message {
Ok(wire::TakerToMaker::TakeOrder { order_id, quantity }) => {
cfd_actor_inbox
.do_send_async(maker_cfd::TakeOrder {
taker_id,
order_id,
quantity,
})
.await
.unwrap();
}
Ok(wire::TakerToMaker::Protocol(msg)) => {
cfd_actor_inbox
.do_send_async(maker_cfd::IncProtocolMsg(msg))
.await
.unwrap();
}
Err(error) => {
tracing::error!(%error, "Error in reading message");
}
}
}
}
}

Loading…
Cancel
Save