Browse Source

Introduce a dedicated codec

fix-bad-api-calls
Thomas Eizinger 3 years ago
parent
commit
eda1b88b0a
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 1
      Cargo.lock
  2. 1
      daemon/Cargo.toml
  3. 11
      daemon/src/maker_inc_connections.rs
  4. 23
      daemon/src/send_to_socket.rs
  5. 4
      daemon/src/taker_cfd.rs
  6. 10
      daemon/src/taker_inc_message_actor.rs
  7. 52
      daemon/src/wire.rs

1
Cargo.lock

@ -492,6 +492,7 @@ dependencies = [
"async-trait",
"atty",
"bdk",
"bytes",
"cfd_protocol",
"clap",
"futures",

1
daemon/Cargo.toml

@ -8,6 +8,7 @@ anyhow = "1"
async-trait = "0.1.51"
atty = "0.2"
bdk = { git = "https://github.com/bitcoindevkit/bdk/" }
bytes = "1"
cfd_protocol = { path = "../cfd_protocol" }
clap = "3.0.0-beta.4"
futures = { version = "0.3", default-features = false }

11
daemon/src/maker_inc_connections.rs

@ -8,7 +8,7 @@ use async_trait::async_trait;
use futures::{Future, StreamExt};
use std::collections::HashMap;
use tokio::net::tcp::OwnedReadHalf;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use tokio_util::codec::FramedRead;
use xtra::prelude::*;
pub struct BroadcastOrder(pub Option<Order>);
@ -37,7 +37,7 @@ impl Message for TakerMessage {
pub struct NewTakerOnline {
pub taker_id: TakerId,
pub out_msg_actor: Address<send_to_socket::Actor>,
pub out_msg_actor: Address<send_to_socket::Actor<wire::MakerToTaker>>,
}
impl Message for NewTakerOnline {
@ -45,7 +45,7 @@ impl Message for NewTakerOnline {
}
pub struct Actor {
write_connections: HashMap<TakerId, Address<send_to_socket::Actor>>,
write_connections: HashMap<TakerId, Address<send_to_socket::Actor<wire::MakerToTaker>>>,
cfd_actor: Address<maker_cfd::Actor>,
}
@ -156,10 +156,7 @@ pub fn in_taker_messages(
cfd_actor_inbox: Address<maker_cfd::Actor>,
taker_id: TakerId,
) -> impl Future<Output = ()> {
let mut messages = FramedRead::new(read, LengthDelimitedCodec::new()).map(|result| {
let message = serde_json::from_slice::<wire::TakerToMaker>(&result?)?;
anyhow::Result::<_>::Ok(message)
});
let mut messages = FramedRead::new(read, wire::JsonCodec::new());
async move {
while let Some(message) = messages.next().await {

23
daemon/src/send_to_socket.rs

@ -1,35 +1,36 @@
use crate::wire::JsonCodec;
use futures::SinkExt;
use serde::Serialize;
use std::fmt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio_util::codec::{FramedWrite, LengthDelimitedCodec};
use tokio_util::codec::FramedWrite;
use xtra::{Handler, Message};
pub struct Actor {
write: FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>,
pub struct Actor<T> {
write: FramedWrite<OwnedWriteHalf, JsonCodec<T>>,
}
impl Actor {
impl<T> Actor<T> {
pub fn new(write: OwnedWriteHalf) -> Self {
Self {
write: FramedWrite::new(write, LengthDelimitedCodec::new()),
write: FramedWrite::new(write, JsonCodec::new()),
}
}
}
#[async_trait::async_trait]
impl<T> Handler<T> for Actor
impl<T> Handler<T> for Actor<T>
where
T: Message<Result = ()> + Serialize + fmt::Display,
T: Message<Result = ()> + Serialize + fmt::Display + Sync,
{
async fn handle(&mut self, message: T, ctx: &mut xtra::Context<Self>) {
let bytes = serde_json::to_vec(&message).expect("serialization should never fail");
let message_name = message.to_string(); // send consumes the message, avoid a clone just in case it errors by getting the name here
if let Err(e) = self.write.send(bytes.into()).await {
tracing::error!("Failed to write message {} to socket: {}", message, e);
if let Err(e) = self.write.send(message).await {
tracing::error!("Failed to write message {} to socket: {}", message_name, e);
ctx.stop();
}
}
}
impl xtra::Actor for Actor {}
impl<T: 'static + Send> xtra::Actor for Actor<T> {}

4
daemon/src/taker_cfd.rs

@ -39,7 +39,7 @@ pub struct Actor {
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>,
send_to_maker: Address<send_to_socket::Actor>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
current_contract_setup: Option<mpsc::UnboundedSender<SetupMsg>>,
// TODO: Move the contract setup into a dedicated actor and send messages to that actor that
// manages the state instead of this ugly buffer
@ -54,7 +54,7 @@ impl Actor {
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>,
send_to_maker: Address<send_to_socket::Actor>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>,
) -> Result<Self> {
let mut conn = db.acquire().await?;

10
daemon/src/taker_inc_message_actor.rs

@ -1,17 +1,13 @@
use crate::model::cfd::Origin;
use crate::wire::JsonCodec;
use crate::{taker_cfd, wire};
use futures::{Future, StreamExt};
use tokio::net::tcp::OwnedReadHalf;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use tokio_util::codec::FramedRead;
use xtra::prelude::*;
pub fn new(read: OwnedReadHalf, cfd_actor: Address<taker_cfd::Actor>) -> impl Future<Output = ()> {
let frame_read = FramedRead::new(read, LengthDelimitedCodec::new());
let mut messages = frame_read.map(|result| {
let message = serde_json::from_slice::<wire::MakerToTaker>(&result?)?;
anyhow::Result::<_>::Ok(message)
});
let mut messages = FramedRead::new(read, JsonCodec::new());
async move {
while let Some(message) = messages.next().await {

52
daemon/src/wire.rs

@ -4,11 +4,15 @@ use crate::Order;
use bdk::bitcoin::secp256k1::Signature;
use bdk::bitcoin::util::psbt::PartiallySignedTransaction;
use bdk::bitcoin::{Address, Amount, PublicKey};
use bytes::BytesMut;
use cfd_protocol::secp256k1_zkp::EcdsaAdaptorSignature;
use cfd_protocol::{CfdTransactions, PartyParams, PunishParams};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::marker::PhantomData;
use std::ops::RangeInclusive;
use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")]
@ -50,6 +54,54 @@ impl fmt::Display for MakerToTaker {
}
}
pub struct JsonCodec<T> {
_type: PhantomData<T>,
inner: LengthDelimitedCodec,
}
impl<T> JsonCodec<T> {
pub fn new() -> Self {
Self {
_type: PhantomData,
inner: LengthDelimitedCodec::new(),
}
}
}
impl<T> Decoder for JsonCodec<T>
where
T: DeserializeOwned,
{
type Item = T;
type Error = anyhow::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let bytes = match self.inner.decode(src)? {
None => return Ok(None),
Some(bytes) => bytes,
};
let item = serde_json::from_slice(&bytes)?;
Ok(Some(item))
}
}
impl<T> Encoder<T> for JsonCodec<T>
where
T: Serialize,
{
type Error = anyhow::Error;
fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = serde_json::to_vec(&item)?;
self.inner.encode(bytes.into(), dst)?;
Ok(())
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")]
pub enum SetupMsg {

Loading…
Cancel
Save