Browse Source

Allow accepting/rejecting settlement proposals in the maker daemon

Keep track of outgoing/incoming settlement proposals in a hashmap inside the CFD
rocket that is available as a Rocket state in order to derive correct CfdState
for the UI.

Add placeholders for actions for accepting/rejecting settlement offers in the maker.

Also:
    - rename Accept/Reject in few places to reduce ambiguity between interacting
      with an order and a settlement proposal.
    - allow only one in-flight settlement proposal from the taker
upload-correct-windows-binary
Mariusz Klochowicz 3 years ago
parent
commit
30a549c8bb
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 6
      daemon/src/maker.rs
  2. 73
      daemon/src/maker_cfd.rs
  3. 10
      daemon/src/model/cfd.rs
  4. 43
      daemon/src/routes_maker.rs
  5. 33
      daemon/src/routes_taker.rs
  6. 6
      daemon/src/taker.rs
  7. 39
      daemon/src/taker_cfd.rs
  8. 119
      daemon/src/to_sse_event.rs
  9. 4
      frontend/src/components/Types.tsx
  10. 8
      frontend/src/components/cfdtables/CfdTable.tsx

6
daemon/src/maker.rs

@ -1,5 +1,6 @@
use crate::auth::MAKER_USERNAME; use crate::auth::MAKER_USERNAME;
use crate::db::load_all_cfds; use crate::db::load_all_cfds;
use crate::model::cfd::SettlementProposals;
use crate::seed::Seed; use crate::seed::Seed;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
@ -10,6 +11,7 @@ use model::cfd::Order;
use model::WalletInfo; use model::WalletInfo;
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
use std::collections::HashMap;
use std::path::PathBuf; use std::path::PathBuf;
use std::task::Poll; use std::task::Poll;
use std::time::Duration; use std::time::Duration;
@ -111,6 +113,8 @@ async fn main() -> Result<()> {
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None); let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
let (settlement_feed_sender, settlement_feed_receiver) =
watch::channel::<SettlementProposals>(SettlementProposals::Incoming(HashMap::new()));
let figment = rocket::Config::figment() let figment = rocket::Config::figment()
.merge(("databases.maker.url", data_dir.join("maker.sqlite"))) .merge(("databases.maker.url", data_dir.join("maker.sqlite")))
@ -127,6 +131,7 @@ async fn main() -> Result<()> {
rocket::custom(figment) rocket::custom(figment)
.manage(order_feed_receiver) .manage(order_feed_receiver)
.manage(wallet_feed_receiver) .manage(wallet_feed_receiver)
.manage(settlement_feed_receiver)
.manage(auth_password) .manage(auth_password)
.manage(quote_updates) .manage(quote_updates)
.attach(Db::init()) .attach(Db::init())
@ -175,6 +180,7 @@ async fn main() -> Result<()> {
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
cfd_feed_sender, cfd_feed_sender,
order_feed_sender, order_feed_sender,
settlement_feed_sender,
maker_inc_connections_address.clone(), maker_inc_connections_address.clone(),
monitor_actor_address.clone(), monitor_actor_address.clone(),
oracle_actor_address, oracle_actor_address,

73
daemon/src/maker_cfd.rs

@ -6,7 +6,7 @@ use crate::db::{
use crate::maker_inc_connections::TakerCommand; use crate::maker_inc_connections::TakerCommand;
use crate::model::cfd::{ use crate::model::cfd::{
Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Role, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Role,
SettlementProposal, SettlementProposal, SettlementProposals,
}; };
use crate::model::{TakerId, Usd}; use crate::model::{TakerId, Usd};
use crate::monitor::MonitorParams; use crate::monitor::MonitorParams;
@ -17,6 +17,7 @@ use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::{future, SinkExt}; use futures::{future, SinkExt};
use std::collections::HashMap;
use std::time::SystemTime; use std::time::SystemTime;
use tokio::sync::watch; use tokio::sync::watch;
use xtra::prelude::*; use xtra::prelude::*;
@ -34,6 +35,14 @@ pub struct Commit {
pub order_id: OrderId, pub order_id: OrderId,
} }
pub struct AcceptSettlement {
pub order_id: OrderId,
}
pub struct RejectSettlement {
pub order_id: OrderId,
}
pub struct NewOrder(pub Order); pub struct NewOrder(pub Order);
pub struct NewTakerOnline { pub struct NewTakerOnline {
@ -56,12 +65,14 @@ pub struct Actor {
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>, order_feed_sender: watch::Sender<Option<Order>>,
settlements_feed_sender: watch::Sender<SettlementProposals>,
takers: Address<maker_inc_connections::Actor>, takers: Address<maker_inc_connections::Actor>,
current_order_id: Option<OrderId>, current_order_id: Option<OrderId>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
setup_state: SetupState, setup_state: SetupState,
latest_announcement: Option<oracle::Announcement>, latest_announcement: Option<oracle::Announcement>,
_oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>, _oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
current_settlement_proposals: HashMap<OrderId, SettlementProposal>,
} }
enum SetupState { enum SetupState {
@ -80,6 +91,7 @@ impl Actor {
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>, order_feed_sender: watch::Sender<Option<Order>>,
settlements_feed_sender: watch::Sender<SettlementProposals>,
takers: Address<maker_inc_connections::Actor>, takers: Address<maker_inc_connections::Actor>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>, oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
@ -90,15 +102,25 @@ impl Actor {
oracle_pk, oracle_pk,
cfd_feed_actor_inbox, cfd_feed_actor_inbox,
order_feed_sender, order_feed_sender,
settlements_feed_sender,
takers, takers,
current_order_id: None, current_order_id: None,
monitor_actor, monitor_actor,
setup_state: SetupState::None, setup_state: SetupState::None,
latest_announcement: None, latest_announcement: None,
_oracle_actor: oracle_actor, _oracle_actor: oracle_actor,
current_settlement_proposals: HashMap::new(),
} }
} }
fn send_current_settlement_proposals(&self) -> Result<()> {
Ok(self
.settlements_feed_sender
.send(SettlementProposals::Incoming(
self.current_settlement_proposals.clone(),
))?)
}
async fn handle_new_order(&mut self, order: Order) -> Result<()> { async fn handle_new_order(&mut self, order: Order) -> Result<()> {
// 1. Save to DB // 1. Save to DB
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
@ -142,7 +164,10 @@ impl Actor {
"Received settlement proposal from the taker: {:?}", "Received settlement proposal from the taker: {:?}",
proposal proposal
); );
// TODO: Handle the proposal self.current_settlement_proposals
.insert(proposal.order_id, proposal);
self.send_current_settlement_proposals()?;
Ok(()) Ok(())
} }
@ -427,7 +452,29 @@ impl Actor {
self.cfd_feed_actor_inbox self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?; .send(load_all_cfds(&mut conn).await?)?;
Ok(())
}
async fn handle_accept_settlement(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker accepts a settlement proposal" );
// TODO: Initiate the settlement
self.current_settlement_proposals
.remove(&order_id)
.context("Could not find proposal for given order id")?;
self.send_current_settlement_proposals()?;
Ok(())
}
async fn handle_reject_settlement(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Maker rejects a settlement proposal" );
// TODO: Handle rejection offer:
// - notify the taker that the settlement was rejected
self.current_settlement_proposals
.remove(&order_id)
.context("Could not find proposal for given order id")?;
self.send_current_settlement_proposals()?;
Ok(()) Ok(())
} }
@ -494,6 +541,20 @@ impl Handler<RejectOrder> for Actor {
} }
} }
#[async_trait]
impl Handler<AcceptSettlement> for Actor {
async fn handle(&mut self, msg: AcceptSettlement, _ctx: &mut Context<Self>) {
log_error!(self.handle_accept_settlement(msg.order_id))
}
}
#[async_trait]
impl Handler<RejectSettlement> for Actor {
async fn handle(&mut self, msg: RejectSettlement, _ctx: &mut Context<Self>) {
log_error!(self.handle_reject_settlement(msg.order_id))
}
}
#[async_trait] #[async_trait]
impl Handler<Commit> for Actor { impl Handler<Commit> for Actor {
async fn handle(&mut self, msg: Commit, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: Commit, _ctx: &mut Context<Self>) {
@ -612,6 +673,14 @@ impl Message for Commit {
type Result = (); type Result = ();
} }
impl Message for AcceptSettlement {
type Result = ();
}
impl Message for RejectSettlement {
type Result = ();
}
// this signature is a bit different because we use `Address::attach_stream` // this signature is a bit different because we use `Address::attach_stream`
impl Message for TakerStreamMessage { impl Message for TakerStreamMessage {
type Result = KeepRunning; type Result = KeepRunning;

10
daemon/src/model/cfd.rs

@ -11,6 +11,7 @@ use rust_decimal::prelude::{FromPrimitive, ToPrimitive};
use rust_decimal::Decimal; use rust_decimal::Decimal;
use rust_decimal_macros::dec; use rust_decimal_macros::dec;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::ops::{Neg, RangeInclusive}; use std::ops::{Neg, RangeInclusive};
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
@ -49,7 +50,7 @@ pub enum Origin {
} }
/// Role in the Cfd /// Role in the Cfd
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone, PartialEq)]
pub enum Role { pub enum Role {
Maker, Maker,
Taker, Taker,
@ -350,6 +351,13 @@ pub struct SettlementProposal {
pub maker: Amount, pub maker: Amount,
} }
#[derive(Debug, Clone)]
#[allow(dead_code)] // Variants used by different binaries
pub enum SettlementProposals {
Incoming(HashMap<OrderId, SettlementProposal>),
Outgoing(HashMap<OrderId, SettlementProposal>),
}
/// Represents a cfd (including state) /// Represents a cfd (including state)
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Cfd { pub struct Cfd {

43
daemon/src/routes_maker.rs

@ -1,8 +1,8 @@
use crate::auth::Authenticated; use crate::auth::Authenticated;
use crate::model::cfd::{Cfd, Order, OrderId, Origin}; use crate::model::cfd::{Cfd, Order, OrderId, Origin, Role, SettlementProposals};
use crate::model::{Usd, WalletInfo}; use crate::model::{Usd, WalletInfo};
use crate::routes::EmbeddedFileExt; use crate::routes::EmbeddedFileExt;
use crate::to_sse_event::{CfdAction, CfdsWithCurrentPrice, ToSseEvent}; use crate::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent};
use crate::{bitmex_price_feed, maker_cfd}; use crate::{bitmex_price_feed, maker_cfd};
use anyhow::Result; use anyhow::Result;
use rocket::http::{ContentType, Header, Status}; use rocket::http::{ContentType, Header, Status};
@ -24,12 +24,14 @@ pub async fn maker_feed(
rx_order: &State<watch::Receiver<Option<Order>>>, rx_order: &State<watch::Receiver<Option<Order>>>,
rx_wallet: &State<watch::Receiver<WalletInfo>>, rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>, rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>,
rx_settlements: &State<watch::Receiver<SettlementProposals>>,
_auth: Authenticated, _auth: Authenticated,
) -> EventStream![] { ) -> EventStream![] {
let mut rx_cfds = rx_cfds.inner().clone(); let mut rx_cfds = rx_cfds.inner().clone();
let mut rx_order = rx_order.inner().clone(); let mut rx_order = rx_order.inner().clone();
let mut rx_wallet = rx_wallet.inner().clone(); let mut rx_wallet = rx_wallet.inner().clone();
let mut rx_quote = rx_quote.inner().clone(); let mut rx_quote = rx_quote.inner().clone();
let mut rx_settlements = rx_settlements.inner().clone();
EventStream! { EventStream! {
let wallet_info = rx_wallet.borrow().clone(); let wallet_info = rx_wallet.borrow().clone();
@ -41,11 +43,7 @@ pub async fn maker_feed(
let quote = rx_quote.borrow().clone(); let quote = rx_quote.borrow().clone();
yield quote.to_sse_event(); yield quote.to_sse_event();
let cfds_with_price = CfdsWithCurrentPrice { yield CfdsWithAuxData::new(&rx_cfds, &rx_quote, &rx_settlements, Role::Maker).to_sse_event();
cfds: rx_cfds.borrow().clone(),
current_price: quote.for_maker(),
};
yield cfds_with_price.to_sse_event();
loop{ loop{
select! { select! {
@ -58,20 +56,15 @@ pub async fn maker_feed(
yield order.to_sse_event(); yield order.to_sse_event();
} }
Ok(()) = rx_cfds.changed() => { Ok(()) = rx_cfds.changed() => {
let cfds_with_price = CfdsWithCurrentPrice { yield CfdsWithAuxData::new(&rx_cfds, &rx_quote, &rx_settlements, Role::Maker).to_sse_event();
cfds: rx_cfds.borrow().clone(), }
current_price: quote.for_maker(), Ok(()) = rx_settlements.changed() => {
}; yield CfdsWithAuxData::new(&rx_cfds, &rx_quote, &rx_settlements, Role::Maker).to_sse_event();
yield cfds_with_price.to_sse_event();
} }
Ok(()) = rx_quote.changed() => { Ok(()) = rx_quote.changed() => {
let quote = rx_quote.borrow().clone(); let quote = rx_quote.borrow().clone();
yield quote.to_sse_event(); yield quote.to_sse_event();
let cfds_with_price = CfdsWithCurrentPrice { yield CfdsWithAuxData::new(&rx_cfds, &rx_quote, &rx_settlements, Role::Maker).to_sse_event();
cfds: rx_cfds.borrow().clone(),
current_price: quote.for_maker(),
};
yield cfds_with_price.to_sse_event();
} }
} }
} }
@ -133,18 +126,30 @@ pub async fn post_cfd_action(
_auth: Authenticated, _auth: Authenticated,
) -> Result<status::Accepted<()>, status::BadRequest<String>> { ) -> Result<status::Accepted<()>, status::BadRequest<String>> {
match action { match action {
CfdAction::Accept => { CfdAction::AcceptOrder => {
cfd_actor_address cfd_actor_address
.do_send_async(maker_cfd::AcceptOrder { order_id: id }) .do_send_async(maker_cfd::AcceptOrder { order_id: id })
.await .await
.expect("actor to always be available"); .expect("actor to always be available");
} }
CfdAction::Reject => { CfdAction::RejectOrder => {
cfd_actor_address cfd_actor_address
.do_send_async(maker_cfd::RejectOrder { order_id: id }) .do_send_async(maker_cfd::RejectOrder { order_id: id })
.await .await
.expect("actor to always be available"); .expect("actor to always be available");
} }
CfdAction::AcceptSettlement => {
cfd_actor_address
.do_send_async(maker_cfd::AcceptSettlement { order_id: id })
.await
.expect("actor to always be available");
}
CfdAction::RejectSettlement => {
cfd_actor_address
.do_send_async(maker_cfd::RejectSettlement { order_id: id })
.await
.expect("actor to always be available");
}
CfdAction::Commit => { CfdAction::Commit => {
cfd_actor_address cfd_actor_address
.do_send_async(maker_cfd::Commit { order_id: id }) .do_send_async(maker_cfd::Commit { order_id: id })

33
daemon/src/routes_taker.rs

@ -1,7 +1,7 @@
use crate::model::cfd::{calculate_buy_margin, Cfd, Order, OrderId}; use crate::model::cfd::{calculate_buy_margin, Cfd, Order, OrderId, Role, SettlementProposals};
use crate::model::{Leverage, Usd, WalletInfo}; use crate::model::{Leverage, Usd, WalletInfo};
use crate::routes::EmbeddedFileExt; use crate::routes::EmbeddedFileExt;
use crate::to_sse_event::{CfdAction, CfdsWithCurrentPrice, ToSseEvent}; use crate::to_sse_event::{CfdAction, CfdsWithAuxData, ToSseEvent};
use crate::{bitmex_price_feed, taker_cfd}; use crate::{bitmex_price_feed, taker_cfd};
use bdk::bitcoin::Amount; use bdk::bitcoin::Amount;
use rocket::http::{ContentType, Status}; use rocket::http::{ContentType, Status};
@ -23,11 +23,13 @@ pub async fn feed(
rx_order: &State<watch::Receiver<Option<Order>>>, rx_order: &State<watch::Receiver<Option<Order>>>,
rx_wallet: &State<watch::Receiver<WalletInfo>>, rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>, rx_quote: &State<watch::Receiver<bitmex_price_feed::Quote>>,
rx_settlements: &State<watch::Receiver<SettlementProposals>>,
) -> EventStream![] { ) -> EventStream![] {
let mut rx_cfds = rx_cfds.inner().clone(); let mut rx_cfds = rx_cfds.inner().clone();
let mut rx_order = rx_order.inner().clone(); let mut rx_order = rx_order.inner().clone();
let mut rx_wallet = rx_wallet.inner().clone(); let mut rx_wallet = rx_wallet.inner().clone();
let mut rx_quote = rx_quote.inner().clone(); let mut rx_quote = rx_quote.inner().clone();
let mut rx_settlements = rx_settlements.inner().clone();
EventStream! { EventStream! {
let wallet_info = rx_wallet.borrow().clone(); let wallet_info = rx_wallet.borrow().clone();
@ -39,11 +41,7 @@ pub async fn feed(
let quote = rx_quote.borrow().clone(); let quote = rx_quote.borrow().clone();
yield quote.to_sse_event(); yield quote.to_sse_event();
let cfds_with_price = CfdsWithCurrentPrice { yield CfdsWithAuxData::new(&rx_cfds, &rx_quote, &rx_settlements, Role::Taker).to_sse_event();
cfds: rx_cfds.borrow().clone(),
current_price: quote.for_maker(),
};
yield cfds_with_price.to_sse_event();
loop{ loop{
select! { select! {
@ -56,20 +54,15 @@ pub async fn feed(
yield order.to_sse_event(); yield order.to_sse_event();
} }
Ok(()) = rx_cfds.changed() => { Ok(()) = rx_cfds.changed() => {
let cfds_with_price = CfdsWithCurrentPrice { yield CfdsWithAuxData::new(&rx_cfds, &rx_quote, &rx_settlements, Role::Taker).to_sse_event();
cfds: rx_cfds.borrow().clone(), }
current_price: quote.for_maker(), Ok(()) = rx_settlements.changed() => {
}; yield CfdsWithAuxData::new(&rx_cfds, &rx_quote, &rx_settlements, Role::Taker).to_sse_event();
yield cfds_with_price.to_sse_event();
} }
Ok(()) = rx_quote.changed() => { Ok(()) = rx_quote.changed() => {
let quote = rx_quote.borrow().clone(); let quote = rx_quote.borrow().clone();
yield quote.to_sse_event(); yield quote.to_sse_event();
let cfds_with_price = CfdsWithCurrentPrice { yield CfdsWithAuxData::new(&rx_cfds, &rx_quote, &rx_settlements, Role::Taker).to_sse_event();
cfds: rx_cfds.borrow().clone(),
current_price: quote.for_maker(),
};
yield cfds_with_price.to_sse_event();
} }
} }
} }
@ -104,10 +97,12 @@ pub async fn post_cfd_action(
quote_updates: &State<watch::Receiver<bitmex_price_feed::Quote>>, quote_updates: &State<watch::Receiver<bitmex_price_feed::Quote>>,
) -> Result<status::Accepted<()>, status::BadRequest<String>> { ) -> Result<status::Accepted<()>, status::BadRequest<String>> {
match action { match action {
CfdAction::Accept | CfdAction::Reject => { CfdAction::AcceptOrder
| CfdAction::RejectOrder
| CfdAction::AcceptSettlement
| CfdAction::RejectSettlement => {
return Err(status::BadRequest(None)); return Err(status::BadRequest(None));
} }
CfdAction::Commit => { CfdAction::Commit => {
cfd_actor_address cfd_actor_address
.do_send_async(taker_cfd::Commit { order_id: id }) .do_send_async(taker_cfd::Commit { order_id: id })

6
daemon/src/taker.rs

@ -1,4 +1,5 @@
use crate::db::load_all_cfds; use crate::db::load_all_cfds;
use crate::model::cfd::SettlementProposals;
use crate::model::WalletInfo; use crate::model::WalletInfo;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
@ -10,6 +11,7 @@ use model::cfd::Order;
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
use seed::Seed; use seed::Seed;
use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::thread::sleep; use std::thread::sleep;
@ -106,6 +108,8 @@ async fn main() -> Result<()> {
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None); let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info); let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
let (settlement_feed_sender, settlement_feed_receiver) =
watch::channel::<SettlementProposals>(SettlementProposals::Outgoing(HashMap::new()));
let (read, write) = loop { let (read, write) = loop {
let socket = tokio::net::TcpSocket::new_v4()?; let socket = tokio::net::TcpSocket::new_v4()?;
@ -130,6 +134,7 @@ async fn main() -> Result<()> {
rocket::custom(figment) rocket::custom(figment)
.manage(order_feed_receiver) .manage(order_feed_receiver)
.manage(wallet_feed_receiver) .manage(wallet_feed_receiver)
.manage(settlement_feed_receiver)
.manage(quote_updates) .manage(quote_updates)
.attach(Db::init()) .attach(Db::init())
.attach(AdHoc::try_on_ignite( .attach(AdHoc::try_on_ignite(
@ -177,6 +182,7 @@ async fn main() -> Result<()> {
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
cfd_feed_sender, cfd_feed_sender,
order_feed_sender, order_feed_sender,
settlement_feed_sender,
send_to_maker, send_to_maker,
monitor_actor_address.clone(), monitor_actor_address.clone(),
oracle_actor_address, oracle_actor_address,

39
daemon/src/taker_cfd.rs

@ -5,6 +5,7 @@ use crate::db::{
}; };
use crate::model::cfd::{ use crate::model::cfd::{
Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin, Role, Cfd, CfdState, CfdStateChangeEvent, CfdStateCommon, Dlc, Order, OrderId, Origin, Role,
SettlementProposal, SettlementProposals,
}; };
use crate::model::Usd; use crate::model::Usd;
use crate::monitor::{self, MonitorParams}; use crate::monitor::{self, MonitorParams};
@ -16,6 +17,7 @@ use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::{future, SinkExt}; use futures::{future, SinkExt};
use std::collections::HashMap;
use std::time::SystemTime; use std::time::SystemTime;
use tokio::sync::watch; use tokio::sync::watch;
use xtra::prelude::*; use xtra::prelude::*;
@ -57,11 +59,13 @@ pub struct Actor {
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
settlements_feed_sender: watch::Sender<SettlementProposals>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
setup_state: SetupState, setup_state: SetupState,
latest_announcement: Option<oracle::Announcement>, latest_announcement: Option<oracle::Announcement>,
_oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>, _oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
current_settlement_proposals: HashMap<OrderId, SettlementProposal>,
} }
impl Actor { impl Actor {
@ -72,6 +76,7 @@ impl Actor {
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
settlements_feed_sender: watch::Sender<SettlementProposals>,
send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>, send_to_maker: Address<send_to_socket::Actor<wire::TakerToMaker>>,
monitor_actor: Address<monitor::Actor<Actor>>, monitor_actor: Address<monitor::Actor<Actor>>,
oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>, oracle_actor: Address<oracle::Actor<Actor, monitor::Actor<Actor>>>,
@ -82,14 +87,24 @@ impl Actor {
oracle_pk, oracle_pk,
cfd_feed_actor_inbox, cfd_feed_actor_inbox,
order_feed_actor_inbox, order_feed_actor_inbox,
settlements_feed_sender,
send_to_maker, send_to_maker,
monitor_actor, monitor_actor,
setup_state: SetupState::None, setup_state: SetupState::None,
latest_announcement: None, latest_announcement: None,
_oracle_actor: oracle_actor, _oracle_actor: oracle_actor,
current_settlement_proposals: HashMap::new(),
} }
} }
fn send_current_settlement_proposals(&self) -> Result<()> {
Ok(self
.settlements_feed_sender
.send(SettlementProposals::Outgoing(
self.current_settlement_proposals.clone(),
))?)
}
async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> { async fn handle_take_offer(&mut self, order_id: OrderId, quantity: Usd) -> Result<()> {
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
@ -126,14 +141,28 @@ impl Actor {
let mut conn = self.db.acquire().await?; let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let settlement = cfd.calculate_settlement(current_price)?; let proposal = cfd.calculate_settlement(current_price)?;
if self
.current_settlement_proposals
.contains_key(&proposal.order_id)
{
anyhow::bail!(
"Settlement proposal for order id {} already present",
order_id
)
}
self.current_settlement_proposals
.insert(proposal.order_id, proposal.clone());
self.send_current_settlement_proposals()?;
self.send_to_maker self.send_to_maker
.do_send_async(wire::TakerToMaker::ProposeSettlement { .do_send_async(wire::TakerToMaker::ProposeSettlement {
order_id: settlement.order_id, order_id: proposal.order_id,
timestamp: settlement.timestamp, timestamp: proposal.timestamp,
taker: settlement.taker, taker: proposal.taker,
maker: settlement.maker, maker: proposal.maker,
}) })
.await?; .await?;
Ok(()) Ok(())

119
daemon/src/to_sse_event.rs

@ -1,4 +1,4 @@
use crate::model::cfd::{OrderId, Role}; use crate::model::cfd::{OrderId, Role, SettlementProposals};
use crate::model::{Leverage, Position, TradingPair, Usd}; use crate::model::{Leverage, Position, TradingPair, Usd};
use crate::{bitmex_price_feed, model}; use crate::{bitmex_price_feed, model};
use bdk::bitcoin::{Amount, SignedAmount}; use bdk::bitcoin::{Amount, SignedAmount};
@ -7,6 +7,7 @@ use rocket::response::stream::Event;
use rust_decimal::Decimal; use rust_decimal::Decimal;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::watch;
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct Cfd { pub struct Cfd {
@ -33,12 +34,14 @@ pub struct Cfd {
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "camelCase")]
pub enum CfdAction { pub enum CfdAction {
Accept, AcceptOrder,
Reject, RejectOrder,
Commit, Commit,
Settle, Settle,
AcceptSettlement,
RejectSettlement,
} }
impl<'v> FromParam<'v> for CfdAction { impl<'v> FromParam<'v> for CfdAction {
@ -61,6 +64,8 @@ pub enum CfdState {
Open, Open,
PendingCommit, PendingCommit,
OpenCommitted, OpenCommitted,
IncomingSettlementProposal,
OutgoingSettlementProposal,
MustRefund, MustRefund,
Refunded, Refunded,
SetupFailed, SetupFailed,
@ -89,13 +94,76 @@ pub trait ToSseEvent {
fn to_sse_event(&self) -> Event; fn to_sse_event(&self) -> Event;
} }
/// Intermediate struct to able to piggy-back current price along with cfds /// Intermediate struct to able to piggy-back additional information along with
pub struct CfdsWithCurrentPrice { /// cfds, so we can avoid a 1:1 mapping between the states in the model and seen
/// by UI
pub struct CfdsWithAuxData {
pub cfds: Vec<model::cfd::Cfd>, pub cfds: Vec<model::cfd::Cfd>,
pub current_price: Usd, pub current_price: Usd,
pub settlement_proposals: SettlementProposals,
} }
impl ToSseEvent for CfdsWithCurrentPrice { enum SettlementProposalStatus {
Incoming,
Outgoing,
None,
}
impl CfdsWithAuxData {
pub fn new(
rx_cfds: &watch::Receiver<Vec<model::cfd::Cfd>>,
rx_quote: &watch::Receiver<bitmex_price_feed::Quote>,
rx_settlement: &watch::Receiver<SettlementProposals>,
role: Role,
) -> Self {
let quote = rx_quote.borrow().clone();
let current_price = match role {
Role::Maker => quote.for_maker(),
Role::Taker => quote.for_taker(),
};
let settlement_proposals = rx_settlement.borrow().clone();
// Test whether the correct settlement proposals were sent
match settlement_proposals {
SettlementProposals::Incoming(_) => {
if role == Role::Taker {
panic!("Taker should never receive incoming settlement proposals");
}
}
SettlementProposals::Outgoing(_) => {
if role == Role::Maker {
panic!("Maker should never receive outgoing settlement proposals");
}
}
}
CfdsWithAuxData {
cfds: rx_cfds.borrow().clone(),
current_price,
settlement_proposals,
}
}
/// Check whether given CFD has any active settlement proposals
fn settlement_proposal_status(&self, cfd: &model::cfd::Cfd) -> SettlementProposalStatus {
match &self.settlement_proposals {
SettlementProposals::Incoming(proposals) => {
if proposals.contains_key(&cfd.order.id) {
return SettlementProposalStatus::Incoming;
}
}
SettlementProposals::Outgoing(proposals) => {
if proposals.contains_key(&cfd.order.id) {
return SettlementProposalStatus::Outgoing;
}
}
}
SettlementProposalStatus::None
}
}
impl ToSseEvent for CfdsWithAuxData {
// TODO: This conversion can fail, we might want to change the API // TODO: This conversion can fail, we might want to change the API
fn to_sse_event(&self) -> Event { fn to_sse_event(&self) -> Event {
let current_price = self.current_price; let current_price = self.current_price;
@ -113,6 +181,8 @@ impl ToSseEvent for CfdsWithCurrentPrice {
(SignedAmount::ZERO, Decimal::ZERO.into()) (SignedAmount::ZERO, Decimal::ZERO.into())
}); });
let state = to_cfd_state(&cfd.state, self.settlement_proposal_status(cfd));
Cfd { Cfd {
order_id: cfd.order.id, order_id: cfd.order.id,
initial_price: cfd.order.price, initial_price: cfd.order.price,
@ -123,8 +193,8 @@ impl ToSseEvent for CfdsWithCurrentPrice {
quantity_usd: cfd.quantity_usd, quantity_usd: cfd.quantity_usd,
profit_btc, profit_btc,
profit_in_percent: profit_in_percent.to_string(), profit_in_percent: profit_in_percent.to_string(),
state: cfd.state.clone().into(), state: state.clone(),
actions: actions_for_state(cfd.state.clone(), cfd.role()), actions: available_actions(state, cfd.role()),
state_transition_timestamp: cfd state_transition_timestamp: cfd
.state .state
.get_transition_timestamp() .get_transition_timestamp()
@ -186,9 +256,14 @@ impl ToSseEvent for model::WalletInfo {
} }
} }
impl From<model::cfd::CfdState> for CfdState { fn to_cfd_state(
fn from(cfd_state: model::cfd::CfdState) -> Self { cfd_state: &model::cfd::CfdState,
match cfd_state { proposal_status: SettlementProposalStatus,
) -> CfdState {
match proposal_status {
SettlementProposalStatus::Incoming => CfdState::IncomingSettlementProposal,
SettlementProposalStatus::Outgoing => CfdState::OutgoingSettlementProposal,
SettlementProposalStatus::None => match cfd_state {
model::cfd::CfdState::OutgoingOrderRequest { .. } => CfdState::OutgoingOrderRequest, model::cfd::CfdState::OutgoingOrderRequest { .. } => CfdState::OutgoingOrderRequest,
model::cfd::CfdState::IncomingOrderRequest { .. } => CfdState::IncomingOrderRequest, model::cfd::CfdState::IncomingOrderRequest { .. } => CfdState::IncomingOrderRequest,
model::cfd::CfdState::Accepted { .. } => CfdState::Accepted, model::cfd::CfdState::Accepted { .. } => CfdState::Accepted,
@ -201,7 +276,7 @@ impl From<model::cfd::CfdState> for CfdState {
model::cfd::CfdState::Refunded { .. } => CfdState::Refunded, model::cfd::CfdState::Refunded { .. } => CfdState::Refunded,
model::cfd::CfdState::SetupFailed { .. } => CfdState::SetupFailed, model::cfd::CfdState::SetupFailed { .. } => CfdState::SetupFailed,
model::cfd::CfdState::PendingCommit { .. } => CfdState::PendingCommit, model::cfd::CfdState::PendingCommit { .. } => CfdState::PendingCommit,
} },
} }
} }
@ -230,15 +305,23 @@ fn into_unix_secs(time: SystemTime) -> u64 {
.as_secs() .as_secs()
} }
fn actions_for_state(state: model::cfd::CfdState, role: Role) -> Vec<CfdAction> { fn available_actions(state: CfdState, role: Role) -> Vec<CfdAction> {
match (state, role) { match (state, role) {
(model::cfd::CfdState::IncomingOrderRequest { .. }, Role::Maker) => { (CfdState::IncomingOrderRequest { .. }, Role::Maker) => {
vec![CfdAction::Accept, CfdAction::Reject] vec![CfdAction::AcceptOrder, CfdAction::RejectOrder]
}
(CfdState::IncomingSettlementProposal { .. }, Role::Maker) => {
vec![CfdAction::AcceptSettlement, CfdAction::RejectSettlement]
}
// If there is an outgoing settlement proposal already, user can't
// initiate new one
(CfdState::OutgoingSettlementProposal { .. }, Role::Maker) => {
vec![CfdAction::Commit]
} }
(model::cfd::CfdState::Open { .. }, Role::Taker) => { (CfdState::Open { .. }, Role::Taker) => {
vec![CfdAction::Commit, CfdAction::Settle] vec![CfdAction::Commit, CfdAction::Settle]
} }
(model::cfd::CfdState::Open { .. }, Role::Maker) => vec![CfdAction::Commit], (CfdState::Open { .. }, Role::Maker) => vec![CfdAction::Commit],
_ => vec![], _ => vec![],
} }
} }

4
frontend/src/components/Types.tsx

@ -137,8 +137,8 @@ export class State {
} }
export enum Action { export enum Action {
ACCEPT = "accept", ACCEPT_ORDER = "acceptOrder",
REJECT = "reject", REJECT_ORDER = "rejectOrder",
COMMIT = "commit", COMMIT = "commit",
SETTLE = "settle", SETTLE = "settle",
} }

8
frontend/src/components/cfdtables/CfdTable.tsx

@ -187,9 +187,9 @@ export function CfdTable(
function iconForAction(action: Action): any { function iconForAction(action: Action): any {
switch (action) { switch (action) {
case Action.ACCEPT: case Action.ACCEPT_ORDER:
return <CheckIcon />; return <CheckIcon />;
case Action.REJECT: case Action.REJECT_ORDER:
return <CloseIcon />; return <CloseIcon />;
case Action.COMMIT: case Action.COMMIT:
return <WarningIcon />; return <WarningIcon />;
@ -200,9 +200,9 @@ function iconForAction(action: Action): any {
function colorSchemaForAction(action: Action): string { function colorSchemaForAction(action: Action): string {
switch (action) { switch (action) {
case Action.ACCEPT: case Action.ACCEPT_ORDER:
return "green"; return "green";
case Action.REJECT: case Action.REJECT_ORDER:
return "red"; return "red";
case Action.COMMIT: case Action.COMMIT:
return "red"; return "red";

Loading…
Cancel
Save