Browse Source

Merge pull request #124 from comit-network/fix-happy-path

Fix happy path
fix-bad-api-calls
Mariusz 3 years ago
committed by GitHub
parent
commit
342fd0c3b8
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      daemon/src/db.rs
  2. 188
      daemon/src/maker_cfd_actor.rs
  3. 6
      daemon/src/maker_inc_connections_actor.rs
  4. 17
      daemon/src/taker_cfd_actor.rs
  5. 3
      daemon/src/wire.rs
  6. 9
      frontend/src/Maker.tsx

3
daemon/src/db.rs

@ -234,8 +234,7 @@ async fn load_latest_cfd_state(
.fetch_one(conn)
.await?;
let latest_cfd_state_in_db: CfdState =
serde_json::from_str(dbg!(latest_cfd_state).state.as_str())?;
let latest_cfd_state_in_db: CfdState = serde_json::from_str(latest_cfd_state.state.as_str())?;
Ok(latest_cfd_state_in_db)
}

188
daemon/src/maker_cfd_actor.rs

@ -25,11 +25,6 @@ pub struct TakeOrder {
pub struct NewOrder(pub Order);
pub struct StartContractSetup {
pub taker_id: TakerId,
pub order_id: OrderId,
}
pub struct NewTakerOnline {
pub id: TakerId,
}
@ -114,86 +109,6 @@ impl MakerCfdActor {
.context("Maker inc connections actor to be initialised")
}
async fn handle_start_contract_setup(
&mut self,
msg: StartContractSetup,
ctx: &mut Context<Self>,
) -> Result<()> {
let StartContractSetup { taker_id, order_id } = msg;
tracing::info!("Starting contract setup");
// Kick-off the CFD protocol
let (sk, pk) = crate::keypair::new(&mut rand::thread_rng());
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let margin = cfd.margin()?;
let maker_params = self.wallet.build_party_params(margin, pk).await?;
let (actor, inbox) = setup_contract_actor::new(
{
let takers = self.takers()?.clone();
move |msg| {
tokio::spawn({
let inbox = takers.clone();
async move {
inbox
.do_send_async(maker_inc_connections_actor::TakerMessage {
taker_id,
command: TakerCommand::OutProtocolMsg { setup_msg: msg },
})
.await
.unwrap();
}
});
}
},
setup_contract_actor::OwnParams::Maker(maker_params),
sk,
self.oracle_pk,
cfd,
self.wallet.clone(),
);
self.current_contract_setup.replace(inbox.clone());
for msg in self.contract_setup_message_buffer.drain(..) {
inbox.send(msg)?;
}
// TODO: Should we do this here or already earlier or after the spawn?
insert_new_cfd_state_by_order_id(
order_id,
CfdState::ContractSetup {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
&mut conn,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let address = ctx
.address()
.expect("actor to be able to give address to itself");
tokio::spawn(async move {
address
.do_send_async(CfdSetupCompleted {
order_id,
dlc: actor.await,
})
.await
});
Ok(())
}
async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> {
let mut conn = self.db.acquire().await?;
@ -263,23 +178,27 @@ impl MakerCfdActor {
Ok(())
}
async fn handle_take_order(&mut self, msg: TakeOrder) -> Result<()> {
tracing::debug!(%msg.taker_id, %msg.quantity, %msg.order_id,
"Taker wants to take an order"
);
async fn handle_take_order(&mut self, msg: TakeOrder, ctx: &mut Context<Self>) -> Result<()> {
let TakeOrder {
taker_id,
order_id,
quantity,
} = msg;
tracing::debug!(%taker_id, %quantity, %order_id, "Taker wants to take an order");
let mut conn = self.db.acquire().await?;
// 1. Validate if order is still valid
let current_order = match self.current_order_id {
Some(current_order_id) if current_order_id == msg.order_id => {
Some(current_order_id) if current_order_id == order_id => {
load_order_by_id(current_order_id, &mut conn).await?
}
_ => {
self.takers()?
.do_send_async(maker_inc_connections_actor::TakerMessage {
taker_id: msg.taker_id,
command: TakerCommand::NotifyInvalidOrderId { id: msg.order_id },
taker_id,
command: TakerCommand::NotifyInvalidOrderId { id: order_id },
})
.await?;
// TODO: Return an error here?
@ -291,7 +210,7 @@ impl MakerCfdActor {
// TODO: Don't auto-accept, present to user in UI instead
let cfd = Cfd::new(
current_order.clone(),
msg.quantity,
quantity,
CfdState::Accepted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
@ -302,8 +221,8 @@ impl MakerCfdActor {
self.takers()?
.do_send_async(maker_inc_connections_actor::TakerMessage {
taker_id: msg.taker_id,
command: TakerCommand::NotifyOrderAccepted { id: msg.order_id },
taker_id,
command: TakerCommand::NotifyOrderAccepted { id: order_id },
})
.await?;
self.cfd_feed_actor_inbox
@ -316,10 +235,72 @@ impl MakerCfdActor {
.await?;
self.order_feed_sender.send(None)?;
// 4. Start contract setup
tracing::info!("Starting contract setup");
// Kick-off the CFD protocol
let (sk, pk) = crate::keypair::new(&mut rand::thread_rng());
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let margin = cfd.margin()?;
let maker_params = self.wallet.build_party_params(margin, pk).await?;
let (actor, inbox) = setup_contract_actor::new(
{
let inbox = self.takers()?.clone();
move |msg| {
tokio::spawn(
inbox.do_send_async(maker_inc_connections_actor::TakerMessage {
taker_id,
command: TakerCommand::OutProtocolMsg { setup_msg: msg },
}),
);
}
},
setup_contract_actor::OwnParams::Maker(maker_params),
sk,
self.oracle_pk,
cfd,
self.wallet.clone(),
);
self.current_contract_setup.replace(inbox.clone());
for msg in self.contract_setup_message_buffer.drain(..) {
inbox.send(msg)?;
}
// TODO: Should we do this here or already earlier or after the spawn?
insert_new_cfd_state_by_order_id(
order_id,
CfdState::ContractSetup {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
&mut conn,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let address = ctx
.address()
.expect("actor to be able to give address to itself");
tokio::spawn(async move {
address
.do_send_async(CfdSetupCompleted {
order_id,
dlc: actor.await,
})
.await
});
Ok(())
}
}
#[async_trait]
impl Handler<Initialized> for MakerCfdActor {
async fn handle(&mut self, msg: Initialized, _ctx: &mut Context<Self>) {
@ -337,8 +318,8 @@ macro_rules! log_error {
#[async_trait]
impl Handler<TakeOrder> for MakerCfdActor {
async fn handle(&mut self, msg: TakeOrder, _ctx: &mut Context<Self>) {
log_error!(self.handle_take_order(msg))
async fn handle(&mut self, msg: TakeOrder, ctx: &mut Context<Self>) {
log_error!(self.handle_take_order(msg, ctx))
}
}
@ -349,13 +330,6 @@ impl Handler<NewOrder> for MakerCfdActor {
}
}
#[async_trait]
impl Handler<StartContractSetup> for MakerCfdActor {
async fn handle(&mut self, msg: StartContractSetup, ctx: &mut Context<Self>) {
log_error!(self.handle_start_contract_setup(msg, ctx));
}
}
#[async_trait]
impl Handler<NewTakerOnline> for MakerCfdActor {
async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context<Self>) {
@ -396,10 +370,6 @@ impl Message for NewOrder {
type Result = ();
}
impl Message for StartContractSetup {
type Result = ();
}
impl Message for NewTakerOnline {
type Result = ();
}

6
daemon/src/maker_inc_connections_actor.rs

@ -165,12 +165,6 @@ pub fn in_taker_messages(
.await
.unwrap();
}
Ok(wire::TakerToMaker::StartContractSetup(order_id)) => {
cfd_actor_inbox
.do_send_async(maker_cfd_actor::StartContractSetup { taker_id, order_id })
.await
.unwrap();
}
Ok(wire::TakerToMaker::Protocol(msg)) => {
cfd_actor_inbox
.do_send_async(maker_cfd_actor::IncProtocolMsg(msg))

17
daemon/src/taker_cfd_actor.rs

@ -8,7 +8,6 @@ use crate::wallet::Wallet;
use crate::wire::SetupMsg;
use crate::{setup_contract_actor, wire};
use bdk::bitcoin::secp256k1::schnorrsig;
use core::panic;
use futures::Future;
use std::time::SystemTime;
use tokio::sync::{mpsc, watch};
@ -35,6 +34,7 @@ pub fn new(
) -> (impl Future<Output = ()>, mpsc::UnboundedSender<Command>) {
let (sender, mut receiver) = mpsc::unbounded_channel();
let mut current_contract_setup = None;
let mut contract_setup_message_buffer = vec![];
let actor = {
let sender = sender.clone();
@ -88,6 +88,8 @@ pub fn new(
order_feed_actor_inbox.send(None).unwrap();
}
Command::OrderAccepted(order_id) => {
tracing::info!(%order_id, "Order got accepted");
let mut conn = db.acquire().await.unwrap();
insert_new_cfd_state_by_order_id(
order_id,
@ -101,10 +103,6 @@ pub fn new(
.await
.unwrap();
out_msg_maker_inbox
.send(wire::TakerToMaker::StartContractSetup(order_id))
.unwrap();
cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await.unwrap())
.unwrap();
@ -128,6 +126,10 @@ pub fn new(
wallet.clone(),
);
for msg in contract_setup_message_buffer.drain(..) {
inbox.send(msg).unwrap();
}
tokio::spawn({
let sender = sender.clone();
@ -144,7 +146,10 @@ pub fn new(
}
Command::IncProtocolMsg(msg) => {
let inbox = match &current_contract_setup {
None => panic!("whoops"),
None => {
contract_setup_message_buffer.push(msg);
continue;
}
Some(inbox) => inbox,
};

3
daemon/src/wire.rs

@ -14,9 +14,6 @@ use std::ops::RangeInclusive;
#[allow(clippy::large_enum_variant)]
pub enum TakerToMaker {
TakeOrder { order_id: OrderId, quantity: Usd },
// TODO: Currently the taker starts, can already send some stuff for signing over in the first
// message.
StartContractSetup(OrderId),
Protocol(SetupMsg),
}

9
frontend/src/Maker.tsx

@ -31,7 +31,14 @@ interface CfdSellOrderPayload {
}
async function postCfdSellOrderRequest(payload: CfdSellOrderPayload) {
let res = await fetch(`/api/order/sell`, { method: "POST", body: JSON.stringify(payload), credentials: "include" });
let res = await fetch(`/api/order/sell`, {
method: "POST",
body: JSON.stringify(payload),
headers: {
"Content-Type": "application/json",
},
credentials: "include",
});
if (!res.status.toString().startsWith("2")) {
console.log("Status: " + res.status + ", " + res.statusText);

Loading…
Cancel
Save