From 7dd527ac72526d86e404c036bb5b0379fdf21717 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 23 Sep 2021 12:36:45 +1000 Subject: [PATCH 1/5] Let rocket know what we are sending is JSON --- frontend/src/Maker.tsx | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/frontend/src/Maker.tsx b/frontend/src/Maker.tsx index f737215..0c3527f 100644 --- a/frontend/src/Maker.tsx +++ b/frontend/src/Maker.tsx @@ -31,7 +31,14 @@ interface CfdSellOrderPayload { } async function postCfdSellOrderRequest(payload: CfdSellOrderPayload) { - let res = await fetch(`/api/order/sell`, { method: "POST", body: JSON.stringify(payload), credentials: "include" }); + let res = await fetch(`/api/order/sell`, { + method: "POST", + body: JSON.stringify(payload), + headers: { + "Content-Type": "application/json", + }, + credentials: "include", + }); if (!res.status.toString().startsWith("2")) { console.log("Status: " + res.status + ", " + res.statusText); From 49bff80b93890299a621e67fdc315f4252bb1d3d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 22 Sep 2021 18:11:15 +1000 Subject: [PATCH 2/5] Destruct msg to access variables directly --- daemon/src/maker_cfd_actor.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/daemon/src/maker_cfd_actor.rs b/daemon/src/maker_cfd_actor.rs index f36ef62..92f2b08 100644 --- a/daemon/src/maker_cfd_actor.rs +++ b/daemon/src/maker_cfd_actor.rs @@ -264,22 +264,26 @@ impl MakerCfdActor { } async fn handle_take_order(&mut self, msg: TakeOrder) -> Result<()> { - tracing::debug!(%msg.taker_id, %msg.quantity, %msg.order_id, - "Taker wants to take an order" - ); + let TakeOrder { + taker_id, + order_id, + quantity, + } = msg; + + tracing::debug!(%taker_id, %quantity, %order_id, "Taker wants to take an order"); let mut conn = self.db.acquire().await?; // 1. Validate if order is still valid let current_order = match self.current_order_id { - Some(current_order_id) if current_order_id == msg.order_id => { + Some(current_order_id) if current_order_id == order_id => { load_order_by_id(current_order_id, &mut conn).await? } _ => { self.takers()? .do_send_async(maker_inc_connections_actor::TakerMessage { - taker_id: msg.taker_id, - command: TakerCommand::NotifyInvalidOrderId { id: msg.order_id }, + taker_id, + command: TakerCommand::NotifyInvalidOrderId { id: order_id }, }) .await?; // TODO: Return an error here? @@ -291,7 +295,7 @@ impl MakerCfdActor { // TODO: Don't auto-accept, present to user in UI instead let cfd = Cfd::new( current_order.clone(), - msg.quantity, + quantity, CfdState::Accepted { common: CfdStateCommon { transition_timestamp: SystemTime::now(), @@ -302,8 +306,8 @@ impl MakerCfdActor { self.takers()? .do_send_async(maker_inc_connections_actor::TakerMessage { - taker_id: msg.taker_id, - command: TakerCommand::NotifyOrderAccepted { id: msg.order_id }, + taker_id, + command: TakerCommand::NotifyOrderAccepted { id: order_id }, }) .await?; self.cfd_feed_actor_inbox From 816f9b9791e9f11c7177b545297bfe2dc809d7f1 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 23 Sep 2021 13:21:30 +1000 Subject: [PATCH 3/5] Buffer incoming messages in case the maker is faster in reaching setup --- daemon/src/taker_cfd_actor.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/daemon/src/taker_cfd_actor.rs b/daemon/src/taker_cfd_actor.rs index 2e39f31..c4c24c4 100644 --- a/daemon/src/taker_cfd_actor.rs +++ b/daemon/src/taker_cfd_actor.rs @@ -8,7 +8,6 @@ use crate::wallet::Wallet; use crate::wire::SetupMsg; use crate::{setup_contract_actor, wire}; use bdk::bitcoin::secp256k1::schnorrsig; -use core::panic; use futures::Future; use std::time::SystemTime; use tokio::sync::{mpsc, watch}; @@ -35,6 +34,7 @@ pub fn new( ) -> (impl Future, mpsc::UnboundedSender) { let (sender, mut receiver) = mpsc::unbounded_channel(); let mut current_contract_setup = None; + let mut contract_setup_message_buffer = vec![]; let actor = { let sender = sender.clone(); @@ -88,6 +88,8 @@ pub fn new( order_feed_actor_inbox.send(None).unwrap(); } Command::OrderAccepted(order_id) => { + tracing::info!(%order_id, "Order got accepted"); + let mut conn = db.acquire().await.unwrap(); insert_new_cfd_state_by_order_id( order_id, @@ -128,6 +130,10 @@ pub fn new( wallet.clone(), ); + for msg in contract_setup_message_buffer.drain(..) { + inbox.send(msg).unwrap(); + } + tokio::spawn({ let sender = sender.clone(); @@ -144,7 +150,10 @@ pub fn new( } Command::IncProtocolMsg(msg) => { let inbox = match ¤t_contract_setup { - None => panic!("whoops"), + None => { + contract_setup_message_buffer.push(msg); + continue; + } Some(inbox) => inbox, }; From e7f9d24e94c180c9b8a7022dd2fec8056580f2fc Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 23 Sep 2021 10:50:14 +1000 Subject: [PATCH 4/5] Start contract setup directly after sending the accept The taker does this unconditionally, we don't need to wait for another message on the maker side. --- daemon/src/maker_cfd_actor.rs | 166 +++++++++------------- daemon/src/maker_inc_connections_actor.rs | 6 - daemon/src/taker_cfd_actor.rs | 4 - daemon/src/wire.rs | 3 - 4 files changed, 66 insertions(+), 113 deletions(-) diff --git a/daemon/src/maker_cfd_actor.rs b/daemon/src/maker_cfd_actor.rs index 92f2b08..dae8322 100644 --- a/daemon/src/maker_cfd_actor.rs +++ b/daemon/src/maker_cfd_actor.rs @@ -25,11 +25,6 @@ pub struct TakeOrder { pub struct NewOrder(pub Order); -pub struct StartContractSetup { - pub taker_id: TakerId, - pub order_id: OrderId, -} - pub struct NewTakerOnline { pub id: TakerId, } @@ -114,86 +109,6 @@ impl MakerCfdActor { .context("Maker inc connections actor to be initialised") } - async fn handle_start_contract_setup( - &mut self, - msg: StartContractSetup, - ctx: &mut Context, - ) -> Result<()> { - let StartContractSetup { taker_id, order_id } = msg; - - tracing::info!("Starting contract setup"); - - // Kick-off the CFD protocol - let (sk, pk) = crate::keypair::new(&mut rand::thread_rng()); - - let mut conn = self.db.acquire().await?; - - let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; - let margin = cfd.margin()?; - - let maker_params = self.wallet.build_party_params(margin, pk).await?; - - let (actor, inbox) = setup_contract_actor::new( - { - let takers = self.takers()?.clone(); - move |msg| { - tokio::spawn({ - let inbox = takers.clone(); - async move { - inbox - .do_send_async(maker_inc_connections_actor::TakerMessage { - taker_id, - command: TakerCommand::OutProtocolMsg { setup_msg: msg }, - }) - .await - .unwrap(); - } - }); - } - }, - setup_contract_actor::OwnParams::Maker(maker_params), - sk, - self.oracle_pk, - cfd, - self.wallet.clone(), - ); - - self.current_contract_setup.replace(inbox.clone()); - - for msg in self.contract_setup_message_buffer.drain(..) { - inbox.send(msg)?; - } - - // TODO: Should we do this here or already earlier or after the spawn? - insert_new_cfd_state_by_order_id( - order_id, - CfdState::ContractSetup { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - &mut conn, - ) - .await?; - self.cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await?)?; - - let address = ctx - .address() - .expect("actor to be able to give address to itself"); - - tokio::spawn(async move { - address - .do_send_async(CfdSetupCompleted { - order_id, - dlc: actor.await, - }) - .await - }); - - Ok(()) - } - async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> { let mut conn = self.db.acquire().await?; @@ -263,7 +178,7 @@ impl MakerCfdActor { Ok(()) } - async fn handle_take_order(&mut self, msg: TakeOrder) -> Result<()> { + async fn handle_take_order(&mut self, msg: TakeOrder, ctx: &mut Context) -> Result<()> { let TakeOrder { taker_id, order_id, @@ -320,10 +235,72 @@ impl MakerCfdActor { .await?; self.order_feed_sender.send(None)?; + // 4. Start contract setup + tracing::info!("Starting contract setup"); + + // Kick-off the CFD protocol + let (sk, pk) = crate::keypair::new(&mut rand::thread_rng()); + + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + let margin = cfd.margin()?; + + let maker_params = self.wallet.build_party_params(margin, pk).await?; + + let (actor, inbox) = setup_contract_actor::new( + { + let inbox = self.takers()?.clone(); + move |msg| { + tokio::spawn( + inbox.do_send_async(maker_inc_connections_actor::TakerMessage { + taker_id, + command: TakerCommand::OutProtocolMsg { setup_msg: msg }, + }), + ); + } + }, + setup_contract_actor::OwnParams::Maker(maker_params), + sk, + self.oracle_pk, + cfd, + self.wallet.clone(), + ); + + self.current_contract_setup.replace(inbox.clone()); + + for msg in self.contract_setup_message_buffer.drain(..) { + inbox.send(msg)?; + } + + // TODO: Should we do this here or already earlier or after the spawn? + insert_new_cfd_state_by_order_id( + order_id, + CfdState::ContractSetup { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + }, + &mut conn, + ) + .await?; + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; + + let address = ctx + .address() + .expect("actor to be able to give address to itself"); + + tokio::spawn(async move { + address + .do_send_async(CfdSetupCompleted { + order_id, + dlc: actor.await, + }) + .await + }); + Ok(()) } } - #[async_trait] impl Handler for MakerCfdActor { async fn handle(&mut self, msg: Initialized, _ctx: &mut Context) { @@ -341,8 +318,8 @@ macro_rules! log_error { #[async_trait] impl Handler for MakerCfdActor { - async fn handle(&mut self, msg: TakeOrder, _ctx: &mut Context) { - log_error!(self.handle_take_order(msg)) + async fn handle(&mut self, msg: TakeOrder, ctx: &mut Context) { + log_error!(self.handle_take_order(msg, ctx)) } } @@ -353,13 +330,6 @@ impl Handler for MakerCfdActor { } } -#[async_trait] -impl Handler for MakerCfdActor { - async fn handle(&mut self, msg: StartContractSetup, ctx: &mut Context) { - log_error!(self.handle_start_contract_setup(msg, ctx)); - } -} - #[async_trait] impl Handler for MakerCfdActor { async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context) { @@ -400,10 +370,6 @@ impl Message for NewOrder { type Result = (); } -impl Message for StartContractSetup { - type Result = (); -} - impl Message for NewTakerOnline { type Result = (); } diff --git a/daemon/src/maker_inc_connections_actor.rs b/daemon/src/maker_inc_connections_actor.rs index bc19f13..3887b46 100644 --- a/daemon/src/maker_inc_connections_actor.rs +++ b/daemon/src/maker_inc_connections_actor.rs @@ -165,12 +165,6 @@ pub fn in_taker_messages( .await .unwrap(); } - Ok(wire::TakerToMaker::StartContractSetup(order_id)) => { - cfd_actor_inbox - .do_send_async(maker_cfd_actor::StartContractSetup { taker_id, order_id }) - .await - .unwrap(); - } Ok(wire::TakerToMaker::Protocol(msg)) => { cfd_actor_inbox .do_send_async(maker_cfd_actor::IncProtocolMsg(msg)) diff --git a/daemon/src/taker_cfd_actor.rs b/daemon/src/taker_cfd_actor.rs index c4c24c4..ccd9721 100644 --- a/daemon/src/taker_cfd_actor.rs +++ b/daemon/src/taker_cfd_actor.rs @@ -103,10 +103,6 @@ pub fn new( .await .unwrap(); - out_msg_maker_inbox - .send(wire::TakerToMaker::StartContractSetup(order_id)) - .unwrap(); - cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await.unwrap()) .unwrap(); diff --git a/daemon/src/wire.rs b/daemon/src/wire.rs index 39cfdee..7fb0813 100644 --- a/daemon/src/wire.rs +++ b/daemon/src/wire.rs @@ -14,9 +14,6 @@ use std::ops::RangeInclusive; #[allow(clippy::large_enum_variant)] pub enum TakerToMaker { TakeOrder { order_id: OrderId, quantity: Usd }, - // TODO: Currently the taker starts, can already send some stuff for signing over in the first - // message. - StartContractSetup(OrderId), Protocol(SetupMsg), } From 33a27d4d9288bf7b5cb030def4b1aeaccafc640a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 23 Sep 2021 13:20:34 +1000 Subject: [PATCH 5/5] Don't dbg log the latest state --- daemon/src/db.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/daemon/src/db.rs b/daemon/src/db.rs index 0747253..b374d8c 100644 --- a/daemon/src/db.rs +++ b/daemon/src/db.rs @@ -234,8 +234,7 @@ async fn load_latest_cfd_state( .fetch_one(conn) .await?; - let latest_cfd_state_in_db: CfdState = - serde_json::from_str(dbg!(latest_cfd_state).state.as_str())?; + let latest_cfd_state_in_db: CfdState = serde_json::from_str(latest_cfd_state.state.as_str())?; Ok(latest_cfd_state_in_db) }