Browse Source

Roll-out xtra framework usage in maker

no-contract-setup-message
Mariusz Klochowicz 3 years ago
parent
commit
46e9b96fbf
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 193
      Cargo.lock
  2. 2
      daemon/Cargo.toml
  3. 75
      daemon/src/maker.rs
  4. 637
      daemon/src/maker_cfd_actor.rs
  5. 236
      daemon/src/maker_inc_connections_actor.rs
  6. 12
      daemon/src/routes_maker.rs

193
Cargo.lock

@ -116,6 +116,18 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "barrage"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "756c265ddc2c445724b688455c42ec724590635fa71b47eaff7b260dc95fa7c9"
dependencies = [
"concurrent-queue",
"event-listener",
"loom 0.3.6",
"spinny",
]
[[package]]
name = "base64"
version = "0.10.1"
@ -259,6 +271,21 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "cache-padded"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba"
[[package]]
name = "catty"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d231959e9442d4c614ecc961178c44fce85d494484281d8055167d87993e61b"
dependencies = [
"spin 0.7.1",
]
[[package]]
name = "cc"
version = "1.0.70"
@ -280,6 +307,12 @@ dependencies = [
"thiserror",
]
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -338,6 +371,15 @@ dependencies = [
"bitflags",
]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
dependencies = [
"cache-padded",
]
[[package]]
name = "cookie"
version = "0.16.0-rc.1"
@ -379,7 +421,7 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81156fece84ab6a9f2afdb109ce3ae577e42b1228441eded99bd77f627953b1a"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
]
[[package]]
@ -388,7 +430,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"crossbeam-utils",
]
@ -398,7 +440,7 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"crossbeam-utils",
"lazy_static",
"memoffset",
@ -411,7 +453,7 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"crossbeam-utils",
]
@ -421,7 +463,7 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"lazy_static",
]
@ -440,6 +482,7 @@ name = "daemon"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"atty",
"bdk",
"cfd_protocol",
@ -466,6 +509,7 @@ dependencies = [
"tracing",
"tracing-subscriber",
"uuid",
"xtra",
]
[[package]]
@ -582,9 +626,15 @@ version = "0.8.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80df024fbc5ac80f87dfef0d9f5209a252f2a497f7f42944cff24d8253cac065"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
]
[[package]]
name = "event-listener"
version = "2.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
[[package]]
name = "figment"
version = "0.10.6"
@ -599,6 +649,18 @@ dependencies = [
"version_check",
]
[[package]]
name = "flume"
version = "0.10.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24c3fd473b3a903a62609e413ed7538f99e10b665ecb502b5e481a95283f8ab4"
dependencies = [
"futures-core",
"futures-sink",
"pin-project",
"spin 0.9.2",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -715,6 +777,12 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99"
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.17"
@ -745,6 +813,19 @@ dependencies = [
"byteorder",
]
[[package]]
name = "generator"
version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "061d3be1afec479d56fa3bd182bf966c7999ec175fcfdb87ac14d417241366c6"
dependencies = [
"cc",
"libc",
"log",
"rustversion",
"winapi 0.3.9",
]
[[package]]
name = "generator"
version = "0.7.0"
@ -774,7 +855,7 @@ version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"js-sys",
"libc",
"wasi 0.9.0+wasi-snapshot-preview1",
@ -787,7 +868,7 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"libc",
"wasi 0.10.2+wasi-snapshot-preview1",
]
@ -977,7 +1058,7 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bee0328b1209d157ef001c94dd85b4f8f64139adb0eac2659f4b08382b2f474d"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
]
[[package]]
@ -1042,7 +1123,19 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
]
[[package]]
name = "loom"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0e8460f2f2121162705187214720353c517b97bdfb3494c0b1e33d83ebe4bed"
dependencies = [
"cfg-if 0.1.10",
"futures-util",
"generator 0.6.25",
"scoped-tls",
]
[[package]]
@ -1051,8 +1144,8 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2111607c723d7857e0d8299d5ce7a0bf4b844d3e44f8de136b13da513eaf8fc4"
dependencies = [
"cfg-if",
"generator",
"cfg-if 1.0.0",
"generator 0.7.0",
"scoped-tls",
"serde",
"serde_json",
@ -1241,7 +1334,7 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"instant",
"libc",
"redox_syscall",
@ -1278,6 +1371,26 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.7"
@ -1296,6 +1409,12 @@ version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c"
[[package]]
name = "pollster"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb20dcc30536a1508e75d47dd0e399bb2fe7354dcf35cda9127f2bf1ed92e30e"
[[package]]
name = "ppv-lite86"
version = "0.2.10"
@ -2012,7 +2131,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b69f9a4c9740d74c5baa3fd2e547f9525fa8088a8a958e0ca2409a514e33f5fa"
dependencies = [
"block-buffer",
"cfg-if",
"cfg-if 1.0.0",
"cpufeatures",
"digest",
"opaque-debug",
@ -2092,11 +2211,31 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13287b4da9d1207a4f4929ac390916d64eacfe236a487e9a9f5b3be392be5162"
[[package]]
name = "spin"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5"
dependencies = [
"lock_api",
]
[[package]]
name = "spinny"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66f5e2008c6e3864566a0dfa4717946ebdbc7555810b7c0c9266fd41c6d7a2a4"
dependencies = [
"lock_api",
"loom 0.3.6",
"once_cell",
]
[[package]]
name = "sqlformat"
@ -2215,7 +2354,7 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cf4f5369e6d3044b5e365c9690f451516ac8f0954084622b49ea3fde2f6de5"
dependencies = [
"loom",
"loom 0.5.1",
]
[[package]]
@ -2257,7 +2396,7 @@ version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"libc",
"rand 0.8.4",
"redox_syscall",
@ -2431,7 +2570,7 @@ version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
@ -2670,7 +2809,7 @@ version = "0.2.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e68338db6becec24d3c7977b5bf8a48be992c934b5d07177e3931f5dc9b076c"
dependencies = [
"cfg-if",
"cfg-if 1.0.0",
"wasm-bindgen-macro",
]
@ -2819,6 +2958,24 @@ dependencies = [
"winapi-build",
]
[[package]]
name = "xtra"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd0133cb26accfd34360ab6b8fe9745d8907dcaee0cd7f8191dee4fd884e88d0"
dependencies = [
"async-trait",
"barrage",
"catty",
"flume",
"futures-core",
"futures-sink",
"futures-timer",
"futures-util",
"pollster",
"tokio",
]
[[package]]
name = "yansi"
version = "0.5.0"

2
daemon/Cargo.toml

@ -5,6 +5,7 @@ edition = "2018"
[dependencies]
anyhow = "1"
async-trait = "0.1.51"
atty = "0.2"
bdk = { git = "https://github.com/bitcoindevkit/bdk/" }
cfd_protocol = { path = "../cfd_protocol" }
@ -30,6 +31,7 @@ tokio-util = { version = "0.6", features = ["codec"] }
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log", "json"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
xtra = { version = "0.5", features = ["with-tokio-1"] }
[[bin]]
name = "taker"

75
daemon/src/maker.rs

@ -1,5 +1,6 @@
use crate::auth::MAKER_USERNAME;
use crate::maker_cfd_actor::Command;
use crate::maker_inc_connections_actor::in_taker_messages;
use crate::model::TakerId;
use crate::seed::Seed;
use crate::wallet::Wallet;
use anyhow::{Context, Result};
@ -12,8 +13,10 @@ use rocket::fairing::AdHoc;
use rocket_db_pools::Database;
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::{mpsc, watch};
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*;
use xtra::spawn::TokioGlobalSpawnExt;
mod auth;
mod db;
@ -137,23 +140,56 @@ async fn main() -> Result<()> {
None => return Err(rocket),
};
let (connections_actor_inbox_sender, connections_actor_inbox_recv) =
mpsc::unbounded_channel();
let (cfd_maker_actor, cfd_maker_actor_inbox) = maker_cfd_actor::new(
let cfd_maker_actor_inbox = maker_cfd_actor::MakerCfdActor::new(
db,
wallet,
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
connections_actor_inbox_sender,
cfd_feed_sender,
order_feed_sender,
wallet_feed_sender,
);
let connections_actor = maker_inc_connections_actor::new(
listener,
cfd_maker_actor_inbox.clone(),
connections_actor_inbox_recv,
);
)
.await
.create(None)
.spawn_global();
let maker_inc_connections_address =
maker_inc_connections_actor::MakerIncConnectionsActor::new(
cfd_maker_actor_inbox.clone(),
)
.create(None)
.spawn_global();
tokio::spawn({
let cfd_maker_actor_inbox = cfd_maker_actor_inbox.clone();
let maker_inc_connections_address = maker_inc_connections_address.clone();
async move {
loop {
if let Ok((socket, remote_addr)) = listener.accept().await {
tracing::info!("Connected to {}", remote_addr);
let taker_id = TakerId::default();
let (read, write) = socket.into_split();
let in_taker_actor = in_taker_messages(
read,
cfd_maker_actor_inbox.clone(),
taker_id,
);
let (out_msg_actor, out_msg_actor_inbox) =
send_wire_message_actor::new::<wire::MakerToTaker>(write);
tokio::spawn(in_taker_actor);
tokio::spawn(out_msg_actor);
maker_inc_connections_address.do_send_async(
maker_inc_connections_actor::NewTakerOnline {
taker_id,
out_msg_actor_inbox,
},
);
};
}
}
});
// consecutive wallet syncs handled by task that triggers sync
let wallet_sync_interval = Duration::from_secs(10);
@ -161,14 +197,21 @@ async fn main() -> Result<()> {
let cfd_actor_inbox = cfd_maker_actor_inbox.clone();
async move {
loop {
cfd_actor_inbox.send(Command::SyncWallet).unwrap();
cfd_actor_inbox
.do_send_async(maker_cfd_actor::SyncWallet)
.await
.unwrap();
tokio::time::sleep(wallet_sync_interval).await;
}
}
});
tokio::spawn(cfd_maker_actor);
tokio::spawn(connections_actor);
cfd_maker_actor_inbox
.do_send_async(maker_cfd_actor::Initialized(
maker_inc_connections_address.clone(),
))
.await
.expect("not to fail after actors were initialized");
Ok(rocket.manage(cfd_maker_actor_inbox))
},

637
daemon/src/maker_cfd_actor.rs

@ -2,281 +2,412 @@ use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds,
load_cfd_by_order_id, load_order_by_id,
};
use crate::maker_inc_connections_actor::{MakerIncConnectionsActor, TakerCommand};
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId};
use crate::model::{TakerId, Usd, WalletInfo};
use crate::wallet::Wallet;
use crate::wire::SetupMsg;
use crate::{maker_cfd_actor, maker_inc_connections_actor, setup_contract_actor};
use crate::{maker_inc_connections_actor, setup_contract_actor};
use anyhow::{Context as AnyhowContext, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
use futures::Future;
use std::time::SystemTime;
use tokio::sync::{mpsc, watch};
use xtra::prelude::*;
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Command {
SyncWallet,
TakeOrder {
taker_id: TakerId,
order_id: OrderId,
quantity: Usd,
},
NewOrder(Order),
StartContractSetup {
taker_id: TakerId,
order_id: OrderId,
},
NewTakerOnline {
id: TakerId,
},
IncProtocolMsg(SetupMsg),
CfdSetupCompleted {
order_id: OrderId,
dlc: Dlc,
},
pub struct Initialized(pub Address<MakerIncConnectionsActor>);
impl Message for Initialized {
type Result = Result<()>;
}
pub struct TakeOrder {
pub taker_id: TakerId,
pub order_id: OrderId,
pub quantity: Usd,
}
impl Message for TakeOrder {
type Result = Result<()>;
}
pub fn new(
pub struct NewOrder(pub Order);
impl Message for NewOrder {
type Result = Result<()>;
}
pub struct StartContractSetup {
pub taker_id: TakerId,
pub order_id: OrderId,
}
impl Message for StartContractSetup {
type Result = Result<()>;
}
pub struct NewTakerOnline {
pub id: TakerId,
}
impl Message for NewTakerOnline {
type Result = Result<()>;
}
pub struct IncProtocolMsg(pub SetupMsg);
impl Message for IncProtocolMsg {
type Result = Result<()>;
}
pub struct CfdSetupCompleted {
pub order_id: OrderId,
pub dlc: Dlc,
}
impl Message for CfdSetupCompleted {
type Result = Result<()>;
}
pub struct SyncWallet;
impl Message for SyncWallet {
type Result = Result<()>;
}
pub struct MakerCfdActor {
db: sqlx::SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
takers: mpsc::UnboundedSender<maker_inc_connections_actor::Command>,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>,
wallet_feed_sender: watch::Sender<WalletInfo>,
) -> (
impl Future<Output = ()>,
mpsc::UnboundedSender<maker_cfd_actor::Command>,
) {
let (sender, mut receiver) = mpsc::unbounded_channel();
takers: Option<Address<MakerIncConnectionsActor>>,
current_order_id: Option<OrderId>,
current_contract_setup: Option<mpsc::UnboundedSender<SetupMsg>>,
// TODO: Move the contract setup into a dedicated actor and send messages to that actor that
// manages the state instead of this ugly buffer
let mut current_contract_setup = None;
let mut contract_setup_message_buffer = vec![];
let mut current_order_id = None;
let actor = {
let sender = sender.clone();
async move {
// populate the CFD feed with existing CFDs
let mut conn = db.acquire().await.unwrap();
cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await.unwrap())
.unwrap();
while let Some(message) = receiver.recv().await {
match message {
maker_cfd_actor::Command::SyncWallet => {
let wallet_info = wallet.sync().await.unwrap();
wallet_feed_sender.send(wallet_info).unwrap();
}
maker_cfd_actor::Command::TakeOrder {
contract_setup_message_buffer: Vec<SetupMsg>,
}
impl xtra::Actor for MakerCfdActor {}
impl MakerCfdActor {
pub async fn new(
db: sqlx::SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>,
wallet_feed_sender: watch::Sender<WalletInfo>,
) -> Self {
let mut conn = db.acquire().await.unwrap();
// populate the CFD feed with existing CFDs
cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await.unwrap())
.unwrap();
Self {
db,
wallet,
oracle_pk,
cfd_feed_actor_inbox,
order_feed_sender,
wallet_feed_sender,
takers: None,
current_order_id: None,
current_contract_setup: None,
contract_setup_message_buffer: vec![],
}
}
async fn handle_new_order(&mut self, msg: NewOrder) -> Result<()> {
let order = msg.0;
// 1. Save to DB
let mut conn = self.db.acquire().await?;
insert_order(&order, &mut conn).await?;
// 2. Update actor state to current order
self.current_order_id.replace(order.id);
// 3. Notify UI via feed
self.order_feed_sender.send(Some(order.clone()))?;
// 4. Inform connected takers
self.takers()?
.do_send_async(maker_inc_connections_actor::BroadcastOrder(Some(order)))
.await?;
Ok(())
}
fn takers(&self) -> Result<&Address<MakerIncConnectionsActor>> {
self.takers
.as_ref()
.context("Maker inc connections actor to be initialised")
}
async fn handle_start_contract_setup(
&mut self,
msg: StartContractSetup,
ctx: &mut Context<Self>,
) -> Result<()> {
let StartContractSetup { taker_id, order_id } = msg;
tracing::info!("Starting contract setup");
// Kick-off the CFD protocol
let (sk, pk) = crate::keypair::new(&mut rand::thread_rng());
let mut conn = self.db.acquire().await?;
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let margin = cfd.margin()?;
let maker_params = self.wallet.build_party_params(margin, pk).await?;
let (actor, inbox) = setup_contract_actor::new(
{
let inbox = self.takers()?.clone();
move |msg| {
inbox.send(maker_inc_connections_actor::TakerMessage {
taker_id,
order_id,
quantity,
} => {
tracing::debug!(%taker_id, %quantity, %order_id,
"Taker wants to take an order"
);
let mut conn = db.acquire().await.unwrap();
// 1. Validate if order is still valid
let current_order = match current_order_id {
Some(current_order_id) if current_order_id == order_id => {
load_order_by_id(current_order_id, &mut conn).await.unwrap()
}
_ => {
takers
.send(maker_inc_connections_actor::Command::NotifyInvalidOrderId {
id: order_id,
taker_id,
})
.unwrap();
continue;
}
};
// 2. Insert CFD in DB
// TODO: Don't auto-accept, present to user in UI instead
let cfd = Cfd::new(
current_order.clone(),
quantity,
CfdState::Accepted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
);
insert_cfd(cfd, &mut conn).await.unwrap();
takers
.send(maker_inc_connections_actor::Command::NotifyOrderAccepted {
id: order_id,
taker_id,
})
.unwrap();
cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await.unwrap())
.unwrap();
// 3. Remove current order
current_order_id = None;
takers
.send(maker_inc_connections_actor::Command::BroadcastOrder(None))
.unwrap();
order_feed_sender.send(None).unwrap();
}
maker_cfd_actor::Command::NewOrder(order) => {
// 1. Save to DB
let mut conn = db.acquire().await.unwrap();
insert_order(&order, &mut conn).await.unwrap();
// 2. Update actor state to current order
current_order_id.replace(order.id);
// 3. Notify UI via feed
order_feed_sender.send(Some(order.clone())).unwrap();
// 4. Inform connected takers
takers
.send(maker_inc_connections_actor::Command::BroadcastOrder(Some(
order,
)))
.unwrap();
}
maker_cfd_actor::Command::NewTakerOnline { id: taker_id } => {
let mut conn = db.acquire().await.unwrap();
let current_order = match current_order_id {
Some(current_order_id) => {
Some(load_order_by_id(current_order_id, &mut conn).await.unwrap())
}
None => None,
};
takers
.send(maker_inc_connections_actor::Command::SendOrder {
order: current_order,
taker_id,
})
.unwrap();
}
maker_cfd_actor::Command::StartContractSetup { taker_id, order_id } => {
println!("CONTRACT SETUP");
// Kick-off the CFD protocol
let (sk, pk) = crate::keypair::new(&mut rand::thread_rng());
let cfd = load_cfd_by_order_id(order_id, &mut conn).await.unwrap();
let margin = cfd.margin().unwrap();
let maker_params = wallet.build_party_params(margin, pk).await.unwrap();
let (actor, inbox) = setup_contract_actor::new(
{
let inbox = takers.clone();
move |msg| {
inbox
.send(
maker_inc_connections_actor::Command::OutProtocolMsg {
taker_id,
msg,
},
)
.unwrap()
}
},
setup_contract_actor::OwnParams::Maker(maker_params),
sk,
oracle_pk,
cfd,
wallet.clone(),
);
current_contract_setup = Some(inbox.clone());
for msg in contract_setup_message_buffer.drain(..) {
inbox.send(msg).unwrap();
}
// TODO: Should we do this here or already earlier or after the spawn?
insert_new_cfd_state_by_order_id(
order_id,
CfdState::ContractSetup {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
&mut conn,
)
.await
.unwrap();
cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await.unwrap())
.unwrap();
tokio::spawn({
let sender = sender.clone();
async move {
sender
.send(Command::CfdSetupCompleted {
order_id,
dlc: actor.await,
})
.unwrap()
}
});
}
maker_cfd_actor::Command::IncProtocolMsg(msg) => {
let inbox = match &current_contract_setup {
None => {
contract_setup_message_buffer.push(msg);
continue;
}
Some(inbox) => inbox,
};
inbox.send(msg).unwrap();
}
maker_cfd_actor::Command::CfdSetupCompleted { order_id, dlc } => {
println!("Setup complete, publishing on chain now...");
current_contract_setup = None;
contract_setup_message_buffer = vec![];
insert_new_cfd_state_by_order_id(
order_id,
CfdState::PendingOpen {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc: dlc.clone(),
},
&mut conn,
)
.await
.unwrap();
cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await.unwrap())
.unwrap();
let txid = wallet.try_broadcast_transaction(dlc.lock).await.unwrap();
println!("Lock transaction published with txid {}", txid);
// TODO: tx monitoring, once confirmed with x blocks transition the Cfd to
// Open
}
command: TakerCommand::OutProtocolMsg { setup_msg: msg },
});
}
},
setup_contract_actor::OwnParams::Maker(maker_params),
sk,
self.oracle_pk,
cfd,
self.wallet.clone(),
);
self.current_contract_setup.replace(inbox.clone());
for msg in self.contract_setup_message_buffer.drain(..) {
inbox.send(msg).unwrap();
}
// TODO: Should we do this here or already earlier or after the spawn?
insert_new_cfd_state_by_order_id(
order_id,
CfdState::ContractSetup {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
&mut conn,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let address = ctx
.address()
.expect("actor to be able to give address to itself");
tokio::spawn(async move {
address
.do_send_async(CfdSetupCompleted {
order_id,
dlc: actor.await,
})
.await
});
Ok(())
}
async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> {
let mut conn = self.db.acquire().await?;
let current_order = match self.current_order_id {
Some(current_order_id) => Some(load_order_by_id(current_order_id, &mut conn).await?),
None => None,
};
self.takers()?
.do_send_async(maker_inc_connections_actor::TakerMessage {
taker_id: msg.id,
command: TakerCommand::SendOrder {
order: current_order,
},
})
.await?;
Ok(())
}
async fn handle_inc_protocol_msg(&mut self, msg: IncProtocolMsg) -> Result<()> {
let msg = msg.0;
let inbox = match &self.current_contract_setup {
None => {
self.contract_setup_message_buffer.push(msg);
return Ok(());
}
Some(inbox) => inbox,
};
inbox.send(msg)?;
Ok(())
}
async fn handle_sync_wallet(&mut self) -> Result<()> {
let wallet_info = self.wallet.sync().await?;
self.wallet_feed_sender.send(wallet_info)?;
Ok(())
}
}
#[async_trait]
impl Handler<Initialized> for MakerCfdActor {
async fn handle(&mut self, msg: Initialized, _ctx: &mut Context<Self>) -> Result<()> {
self.takers.replace(msg.0);
Ok(())
}
}
#[async_trait]
impl Handler<TakeOrder> for MakerCfdActor {
async fn handle(&mut self, msg: TakeOrder, _ctx: &mut Context<Self>) -> Result<()> {
tracing::debug!(%msg.taker_id, %msg.quantity, %msg.order_id,
"Taker wants to take an order"
);
let mut conn = self.db.acquire().await?;
// 1. Validate if order is still valid
let current_order = match self.current_order_id {
Some(current_order_id) if current_order_id == msg.order_id => {
load_order_by_id(current_order_id, &mut conn).await.unwrap()
}
_ => {
self.takers()?
.do_send_async(maker_inc_connections_actor::TakerMessage {
taker_id: msg.taker_id,
command: TakerCommand::NotifyInvalidOrderId { id: msg.order_id },
})
.await?;
// TODO: Return an error here?
return Ok(());
}
};
// 2. Insert CFD in DB
// TODO: Don't auto-accept, present to user in UI instead
let cfd = Cfd::new(
current_order.clone(),
msg.quantity,
CfdState::Accepted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
);
insert_cfd(cfd, &mut conn).await?;
self.takers()?
.do_send_async(maker_inc_connections_actor::TakerMessage {
taker_id: msg.taker_id,
command: TakerCommand::NotifyOrderAccepted { id: msg.order_id },
})
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// 3. Remove current order
self.current_order_id = None;
self.takers()?
.do_send_async(maker_inc_connections_actor::BroadcastOrder(None))
.await?;
self.order_feed_sender.send(None)?;
Ok(())
}
}
macro_rules! log_error {
($future:expr) => {
if let Err(e) = $future.await {
tracing::error!(%e);
}
};
}
#[async_trait]
impl Handler<NewOrder> for MakerCfdActor {
async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_new_order(msg));
Ok(())
}
}
#[async_trait]
impl Handler<StartContractSetup> for MakerCfdActor {
async fn handle(&mut self, msg: StartContractSetup, ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_start_contract_setup(msg, ctx));
Ok(())
}
}
#[async_trait]
impl Handler<NewTakerOnline> for MakerCfdActor {
async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_new_taker_online(msg));
Ok(())
}
}
#[async_trait]
impl Handler<IncProtocolMsg> for MakerCfdActor {
async fn handle(&mut self, msg: IncProtocolMsg, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_inc_protocol_msg(msg));
Ok(())
}
}
#[async_trait]
impl Handler<CfdSetupCompleted> for MakerCfdActor {
async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) -> Result<()> {
let mut conn = self.db.acquire().await?;
self.current_contract_setup = None;
self.contract_setup_message_buffer = vec![];
insert_new_cfd_state_by_order_id(
msg.order_id,
CfdState::PendingOpen {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
dlc: msg.dlc.clone(),
},
&mut conn,
)
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
let txid = self
.wallet
.try_broadcast_transaction(msg.dlc.lock)
.await
.unwrap();
tracing::info!("Lock transaction published with txid {}", txid);
// TODO: tx monitoring, once confirmed with x blocks transition the Cfd to
// Open
Ok(())
}
}
(actor, sender)
#[async_trait]
impl Handler<SyncWallet> for MakerCfdActor {
async fn handle(&mut self, _msg: SyncWallet, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_sync_wallet());
Ok(())
}
}

236
daemon/src/maker_inc_connections_actor.rs

@ -1,95 +1,150 @@
use crate::maker_cfd_actor::MakerCfdActor;
use crate::model::cfd::{Order, OrderId};
use crate::model::TakerId;
use crate::wire::SetupMsg;
use crate::{maker_cfd_actor, maker_inc_connections_actor, send_wire_message_actor, wire};
use crate::{maker_cfd_actor, wire};
use anyhow::{Context as AnyhowContext, Result};
use futures::{Future, StreamExt};
use std::collections::HashMap;
use tokio::net::tcp::OwnedReadHalf;
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use async_trait::async_trait;
use xtra::prelude::*;
type MakerToTakerSender = mpsc::UnboundedSender<wire::MakerToTaker>;
pub struct BroadcastOrder(pub Option<Order>);
impl Message for BroadcastOrder {
type Result = Result<()>;
}
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum Command {
BroadcastOrder(Option<Order>),
SendOrder {
order: Option<Order>,
taker_id: TakerId,
},
NotifyInvalidOrderId {
id: OrderId,
taker_id: TakerId,
},
NotifyOrderAccepted {
id: OrderId,
taker_id: TakerId,
},
OutProtocolMsg {
taker_id: TakerId,
msg: SetupMsg,
},
pub enum TakerCommand {
SendOrder { order: Option<Order> },
NotifyInvalidOrderId { id: OrderId },
NotifyOrderAccepted { id: OrderId },
OutProtocolMsg { setup_msg: SetupMsg },
}
pub fn new(
listener: TcpListener,
cfd_maker_actor_inbox: mpsc::UnboundedSender<maker_cfd_actor::Command>,
mut our_inbox: mpsc::UnboundedReceiver<maker_inc_connections_actor::Command>,
) -> impl Future<Output = ()> {
let mut write_connections =
HashMap::<TakerId, mpsc::UnboundedSender<wire::MakerToTaker>>::new();
pub struct TakerMessage {
pub taker_id: TakerId,
pub command: TakerCommand,
}
async move {
loop {
tokio::select! {
Ok((socket, remote_addr)) = listener.accept() => {
tracing::info!("Connected to {}", remote_addr);
let taker_id = TakerId::default();
let (read, write) = socket.into_split();
let in_taker_actor = in_taker_messages(read, cfd_maker_actor_inbox.clone(), taker_id);
let (out_message_actor, out_message_actor_inbox) = send_wire_message_actor::new(write);
tokio::spawn(in_taker_actor);
tokio::spawn(out_message_actor);
cfd_maker_actor_inbox.send(maker_cfd_actor::Command::NewTakerOnline{id : taker_id}).unwrap();
write_connections.insert(taker_id, out_message_actor_inbox);
},
Some(message) = our_inbox.recv() => {
match message {
maker_inc_connections_actor::Command::NotifyInvalidOrderId { id, taker_id } => {
let conn = write_connections.get(&taker_id).expect("no connection to taker_id");
conn.send(wire::MakerToTaker::InvalidOrderId(id)).unwrap();
},
maker_inc_connections_actor::Command::BroadcastOrder(order) => {
for conn in write_connections.values() {
conn.send(wire::MakerToTaker::CurrentOrder(order.clone())).unwrap();
}
},
maker_inc_connections_actor::Command::SendOrder {order, taker_id} => {
let conn = write_connections.get(&taker_id).expect("no connection to taker_id");
conn.send(wire::MakerToTaker::CurrentOrder(order)).unwrap();
},
maker_inc_connections_actor::Command::NotifyOrderAccepted { id, taker_id } => {
let conn = write_connections.get(&taker_id).expect("no connection to taker_id");
conn.send(wire::MakerToTaker::ConfirmTakeOrder(id)).unwrap();
},
maker_inc_connections_actor::Command::OutProtocolMsg { taker_id, msg } => {
let conn = write_connections.get(&taker_id).expect("no connection to taker_id");
conn.send(wire::MakerToTaker::Protocol(msg)).unwrap();
}
}
}
impl Message for TakerMessage {
type Result = Result<()>;
}
pub struct NewTakerOnline {
pub taker_id: TakerId,
pub out_msg_actor_inbox: MakerToTakerSender,
}
impl Message for NewTakerOnline {
type Result = Result<()>;
}
pub struct MakerIncConnectionsActor {
write_connections: HashMap<TakerId, MakerToTakerSender>,
cfd_maker_actor_address: Address<MakerCfdActor>,
}
impl Actor for MakerIncConnectionsActor {}
impl MakerIncConnectionsActor {
pub fn new(cfd_maker_actor_address: Address<MakerCfdActor>) -> Self {
Self {
write_connections: HashMap::<TakerId, MakerToTakerSender>::new(),
cfd_maker_actor_address,
}
}
fn send_to_taker(&self, taker_id: TakerId, msg: wire::MakerToTaker) -> Result<()> {
let conn = self
.write_connections
.get(&taker_id)
.context("no connection to taker_id")?;
conn.send(msg)?;
Ok(())
}
async fn handle_broadcast_order(&mut self, msg: BroadcastOrder) -> Result<()> {
let order = msg.0;
self.write_connections
.values()
.try_for_each(|conn| conn.send(wire::MakerToTaker::CurrentOrder(order.clone())))?;
Ok(())
}
async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<()> {
match msg.command {
TakerCommand::SendOrder { order } => {
self.send_to_taker(msg.taker_id, wire::MakerToTaker::CurrentOrder(order))?;
}
TakerCommand::NotifyInvalidOrderId { id } => {
self.send_to_taker(msg.taker_id, wire::MakerToTaker::InvalidOrderId(id))?;
}
TakerCommand::NotifyOrderAccepted { id } => {
self.send_to_taker(msg.taker_id, wire::MakerToTaker::ConfirmTakeOrder(id))?;
}
TakerCommand::OutProtocolMsg { setup_msg } => {
self.send_to_taker(msg.taker_id, wire::MakerToTaker::Protocol(setup_msg))?;
}
}
Ok(())
}
async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> {
self.cfd_maker_actor_address
.do_send_async(maker_cfd_actor::NewTakerOnline { id: msg.taker_id })
.await?;
self.write_connections
.insert(msg.taker_id, msg.out_msg_actor_inbox);
Ok(())
}
}
fn in_taker_messages(
macro_rules! log_error {
($future:expr) => {
if let Err(e) = $future.await {
tracing::error!(%e);
}
};
}
#[async_trait]
impl Handler<BroadcastOrder> for MakerIncConnectionsActor {
async fn handle(&mut self, msg: BroadcastOrder, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_broadcast_order(msg));
Ok(())
}
}
#[async_trait]
impl Handler<TakerMessage> for MakerIncConnectionsActor {
async fn handle(&mut self, msg: TakerMessage, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_taker_message(msg));
Ok(())
}
}
#[async_trait]
impl Handler<NewTakerOnline> for MakerIncConnectionsActor {
async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_new_taker_online(msg));
Ok(())
}
}
//
pub fn in_taker_messages(
read: OwnedReadHalf,
cfd_actor_inbox: mpsc::UnboundedSender<maker_cfd_actor::Command>,
cfd_actor_inbox: Address<MakerCfdActor>,
taker_id: TakerId,
) -> impl Future<Output = ()> {
let mut messages = FramedRead::new(read, LengthDelimitedCodec::new()).map(|result| {
@ -100,19 +155,28 @@ fn in_taker_messages(
async move {
while let Some(message) = messages.next().await {
match message {
Ok(wire::TakerToMaker::TakeOrder { order_id, quantity }) => cfd_actor_inbox
.send(maker_cfd_actor::Command::TakeOrder {
taker_id,
order_id,
quantity,
})
.unwrap(),
Ok(wire::TakerToMaker::StartContractSetup(order_id)) => cfd_actor_inbox
.send(maker_cfd_actor::Command::StartContractSetup { taker_id, order_id })
.unwrap(),
Ok(wire::TakerToMaker::Protocol(msg)) => cfd_actor_inbox
.send(maker_cfd_actor::Command::IncProtocolMsg(msg))
.unwrap(),
Ok(wire::TakerToMaker::TakeOrder { order_id, quantity }) => {
cfd_actor_inbox
.do_send_async(maker_cfd_actor::TakeOrder {
taker_id,
order_id,
quantity,
})
.await
.unwrap();
}
Ok(wire::TakerToMaker::StartContractSetup(order_id)) => {
cfd_actor_inbox
.do_send_async(maker_cfd_actor::StartContractSetup { taker_id, order_id })
.await
.unwrap();
}
Ok(wire::TakerToMaker::Protocol(msg)) => {
cfd_actor_inbox
.do_send_async(maker_cfd_actor::IncProtocolMsg(msg))
.await
.unwrap();
}
Err(error) => {
tracing::error!(%error, "Error in reading message");
}

12
daemon/src/routes_maker.rs

@ -1,5 +1,5 @@
use crate::auth::Authenticated;
use crate::maker_cfd_actor;
use crate::maker_cfd_actor::{self, MakerCfdActor};
use crate::model::cfd::{Cfd, Order, Origin};
use crate::model::{Usd, WalletInfo};
use crate::routes::EmbeddedFileExt;
@ -15,7 +15,8 @@ use serde::Deserialize;
use std::borrow::Cow;
use std::path::PathBuf;
use tokio::select;
use tokio::sync::{mpsc, watch};
use tokio::sync::watch;
use xtra::Address;
#[rocket::get("/feed")]
pub async fn maker_feed(
@ -71,7 +72,7 @@ pub struct CfdNewOrderRequest {
#[rocket::post("/order/sell", data = "<order>")]
pub async fn post_sell_order(
order: Json<CfdNewOrderRequest>,
cfd_actor_inbox: &State<mpsc::UnboundedSender<maker_cfd_actor::Command>>,
cfd_actor_address: &State<Address<MakerCfdActor>>,
_auth: Authenticated,
) -> Result<status::Accepted<()>, status::BadRequest<String>> {
let order = Order::from_default_with_price(order.price, Origin::Ours)
@ -79,8 +80,9 @@ pub async fn post_sell_order(
.with_min_quantity(order.min_quantity)
.with_max_quantity(order.max_quantity);
cfd_actor_inbox
.send(maker_cfd_actor::Command::NewOrder(order))
cfd_actor_address
.do_send_async(maker_cfd_actor::NewOrder(order))
.await
.expect("actor to always be available");
Ok(status::Accepted(None))

Loading…
Cancel
Save