Browse Source

Merge pull request #98 from comit-network/xtra-maker

Roll-out xtra framework usage in maker
no-contract-setup-message
Mariusz 3 years ago
committed by GitHub
parent
commit
826d5109ca
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 193
      Cargo.lock
  2. 2
      daemon/Cargo.toml
  3. 71
      daemon/src/maker.rs
  4. 477
      daemon/src/maker_cfd_actor.rs
  5. 212
      daemon/src/maker_inc_connections_actor.rs
  6. 12
      daemon/src/routes_maker.rs

193
Cargo.lock

@ -116,6 +116,18 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "barrage"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "756c265ddc2c445724b688455c42ec724590635fa71b47eaff7b260dc95fa7c9"
dependencies = [
"concurrent-queue",
"event-listener",
"loom 0.3.6",
"spinny",
]
[[package]] [[package]]
name = "base64" name = "base64"
version = "0.10.1" version = "0.10.1"
@ -259,6 +271,21 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]]
name = "cache-padded"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba"
[[package]]
name = "catty"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d231959e9442d4c614ecc961178c44fce85d494484281d8055167d87993e61b"
dependencies = [
"spin 0.7.1",
]
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.70" version = "1.0.70"
@ -280,6 +307,12 @@ dependencies = [
"thiserror", "thiserror",
] ]
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
version = "1.0.0" version = "1.0.0"
@ -338,6 +371,15 @@ dependencies = [
"bitflags", "bitflags",
] ]
[[package]]
name = "concurrent-queue"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3"
dependencies = [
"cache-padded",
]
[[package]] [[package]]
name = "cookie" name = "cookie"
version = "0.16.0-rc.1" version = "0.16.0-rc.1"
@ -379,7 +421,7 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81156fece84ab6a9f2afdb109ce3ae577e42b1228441eded99bd77f627953b1a" checksum = "81156fece84ab6a9f2afdb109ce3ae577e42b1228441eded99bd77f627953b1a"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
] ]
[[package]] [[package]]
@ -388,7 +430,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"crossbeam-utils", "crossbeam-utils",
] ]
@ -398,7 +440,7 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"crossbeam-utils", "crossbeam-utils",
"lazy_static", "lazy_static",
"memoffset", "memoffset",
@ -411,7 +453,7 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9" checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"crossbeam-utils", "crossbeam-utils",
] ]
@ -421,7 +463,7 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"lazy_static", "lazy_static",
] ]
@ -440,6 +482,7 @@ name = "daemon"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"atty", "atty",
"bdk", "bdk",
"cfd_protocol", "cfd_protocol",
@ -466,6 +509,7 @@ dependencies = [
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"uuid", "uuid",
"xtra",
] ]
[[package]] [[package]]
@ -582,9 +626,15 @@ version = "0.8.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80df024fbc5ac80f87dfef0d9f5209a252f2a497f7f42944cff24d8253cac065" checksum = "80df024fbc5ac80f87dfef0d9f5209a252f2a497f7f42944cff24d8253cac065"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
] ]
[[package]]
name = "event-listener"
version = "2.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
[[package]] [[package]]
name = "figment" name = "figment"
version = "0.10.6" version = "0.10.6"
@ -599,6 +649,18 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "flume"
version = "0.10.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24c3fd473b3a903a62609e413ed7538f99e10b665ecb502b5e481a95283f8ab4"
dependencies = [
"futures-core",
"futures-sink",
"pin-project",
"spin 0.9.2",
]
[[package]] [[package]]
name = "fnv" name = "fnv"
version = "1.0.7" version = "1.0.7"
@ -715,6 +777,12 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99" checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99"
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.17" version = "0.3.17"
@ -745,6 +813,19 @@ dependencies = [
"byteorder", "byteorder",
] ]
[[package]]
name = "generator"
version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "061d3be1afec479d56fa3bd182bf966c7999ec175fcfdb87ac14d417241366c6"
dependencies = [
"cc",
"libc",
"log",
"rustversion",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "generator" name = "generator"
version = "0.7.0" version = "0.7.0"
@ -774,7 +855,7 @@ version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"js-sys", "js-sys",
"libc", "libc",
"wasi 0.9.0+wasi-snapshot-preview1", "wasi 0.9.0+wasi-snapshot-preview1",
@ -787,7 +868,7 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"libc", "libc",
"wasi 0.10.2+wasi-snapshot-preview1", "wasi 0.10.2+wasi-snapshot-preview1",
] ]
@ -977,7 +1058,7 @@ version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bee0328b1209d157ef001c94dd85b4f8f64139adb0eac2659f4b08382b2f474d" checksum = "bee0328b1209d157ef001c94dd85b4f8f64139adb0eac2659f4b08382b2f474d"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
] ]
[[package]] [[package]]
@ -1042,7 +1123,19 @@ version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
]
[[package]]
name = "loom"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0e8460f2f2121162705187214720353c517b97bdfb3494c0b1e33d83ebe4bed"
dependencies = [
"cfg-if 0.1.10",
"futures-util",
"generator 0.6.25",
"scoped-tls",
] ]
[[package]] [[package]]
@ -1051,8 +1144,8 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2111607c723d7857e0d8299d5ce7a0bf4b844d3e44f8de136b13da513eaf8fc4" checksum = "2111607c723d7857e0d8299d5ce7a0bf4b844d3e44f8de136b13da513eaf8fc4"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"generator", "generator 0.7.0",
"scoped-tls", "scoped-tls",
"serde", "serde",
"serde_json", "serde_json",
@ -1241,7 +1334,7 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"instant", "instant",
"libc", "libc",
"redox_syscall", "redox_syscall",
@ -1278,6 +1371,26 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.7" version = "0.2.7"
@ -1296,6 +1409,12 @@ version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c"
[[package]]
name = "pollster"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb20dcc30536a1508e75d47dd0e399bb2fe7354dcf35cda9127f2bf1ed92e30e"
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.10" version = "0.2.10"
@ -2012,7 +2131,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b69f9a4c9740d74c5baa3fd2e547f9525fa8088a8a958e0ca2409a514e33f5fa" checksum = "b69f9a4c9740d74c5baa3fd2e547f9525fa8088a8a958e0ca2409a514e33f5fa"
dependencies = [ dependencies = [
"block-buffer", "block-buffer",
"cfg-if", "cfg-if 1.0.0",
"cpufeatures", "cpufeatures",
"digest", "digest",
"opaque-debug", "opaque-debug",
@ -2092,11 +2211,31 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13287b4da9d1207a4f4929ac390916d64eacfe236a487e9a9f5b3be392be5162"
[[package]] [[package]]
name = "spin" name = "spin"
version = "0.9.2" version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5" checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5"
dependencies = [
"lock_api",
]
[[package]]
name = "spinny"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66f5e2008c6e3864566a0dfa4717946ebdbc7555810b7c0c9266fd41c6d7a2a4"
dependencies = [
"lock_api",
"loom 0.3.6",
"once_cell",
]
[[package]] [[package]]
name = "sqlformat" name = "sqlformat"
@ -2215,7 +2354,7 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cf4f5369e6d3044b5e365c9690f451516ac8f0954084622b49ea3fde2f6de5" checksum = "87cf4f5369e6d3044b5e365c9690f451516ac8f0954084622b49ea3fde2f6de5"
dependencies = [ dependencies = [
"loom", "loom 0.5.1",
] ]
[[package]] [[package]]
@ -2257,7 +2396,7 @@ version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"libc", "libc",
"rand 0.8.4", "rand 0.8.4",
"redox_syscall", "redox_syscall",
@ -2431,7 +2570,7 @@ version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8" checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"pin-project-lite", "pin-project-lite",
"tracing-attributes", "tracing-attributes",
"tracing-core", "tracing-core",
@ -2670,7 +2809,7 @@ version = "0.2.77"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e68338db6becec24d3c7977b5bf8a48be992c934b5d07177e3931f5dc9b076c" checksum = "5e68338db6becec24d3c7977b5bf8a48be992c934b5d07177e3931f5dc9b076c"
dependencies = [ dependencies = [
"cfg-if", "cfg-if 1.0.0",
"wasm-bindgen-macro", "wasm-bindgen-macro",
] ]
@ -2819,6 +2958,24 @@ dependencies = [
"winapi-build", "winapi-build",
] ]
[[package]]
name = "xtra"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd0133cb26accfd34360ab6b8fe9745d8907dcaee0cd7f8191dee4fd884e88d0"
dependencies = [
"async-trait",
"barrage",
"catty",
"flume",
"futures-core",
"futures-sink",
"futures-timer",
"futures-util",
"pollster",
"tokio",
]
[[package]] [[package]]
name = "yansi" name = "yansi"
version = "0.5.0" version = "0.5.0"

2
daemon/Cargo.toml

@ -5,6 +5,7 @@ edition = "2018"
[dependencies] [dependencies]
anyhow = "1" anyhow = "1"
async-trait = "0.1.51"
atty = "0.2" atty = "0.2"
bdk = { git = "https://github.com/bitcoindevkit/bdk/" } bdk = { git = "https://github.com/bitcoindevkit/bdk/" }
cfd_protocol = { path = "../cfd_protocol" } cfd_protocol = { path = "../cfd_protocol" }
@ -30,6 +31,7 @@ tokio-util = { version = "0.6", features = ["codec"] }
tracing = { version = "0.1" } tracing = { version = "0.1" }
tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log", "json"] } tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log", "json"] }
uuid = { version = "0.8", features = ["serde", "v4"] } uuid = { version = "0.8", features = ["serde", "v4"] }
xtra = { version = "0.5", features = ["with-tokio-1"] }
[[bin]] [[bin]]
name = "taker" name = "taker"

71
daemon/src/maker.rs

@ -1,5 +1,6 @@
use crate::auth::MAKER_USERNAME; use crate::auth::MAKER_USERNAME;
use crate::maker_cfd_actor::Command; use crate::maker_inc_connections_actor::in_taker_messages;
use crate::model::TakerId;
use crate::seed::Seed; use crate::seed::Seed;
use crate::wallet::Wallet; use crate::wallet::Wallet;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
@ -12,8 +13,10 @@ use rocket::fairing::AdHoc;
use rocket_db_pools::Database; use rocket_db_pools::Database;
use std::path::PathBuf; use std::path::PathBuf;
use std::time::Duration; use std::time::Duration;
use tokio::sync::{mpsc, watch}; use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*;
use xtra::spawn::TokioGlobalSpawnExt;
mod auth; mod auth;
mod db; mod db;
@ -137,23 +140,56 @@ async fn main() -> Result<()> {
None => return Err(rocket), None => return Err(rocket),
}; };
let (connections_actor_inbox_sender, connections_actor_inbox_recv) = let cfd_maker_actor_inbox = maker_cfd_actor::MakerCfdActor::new(
mpsc::unbounded_channel();
let (cfd_maker_actor, cfd_maker_actor_inbox) = maker_cfd_actor::new(
db, db,
wallet, wallet,
schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
connections_actor_inbox_sender,
cfd_feed_sender, cfd_feed_sender,
order_feed_sender, order_feed_sender,
wallet_feed_sender, wallet_feed_sender,
); )
let connections_actor = maker_inc_connections_actor::new( .await
listener, .create(None)
.spawn_global();
let maker_inc_connections_address =
maker_inc_connections_actor::MakerIncConnectionsActor::new(
cfd_maker_actor_inbox.clone(),
)
.create(None)
.spawn_global();
tokio::spawn({
let cfd_maker_actor_inbox = cfd_maker_actor_inbox.clone();
let maker_inc_connections_address = maker_inc_connections_address.clone();
async move {
loop {
if let Ok((socket, remote_addr)) = listener.accept().await {
tracing::info!("Connected to {}", remote_addr);
let taker_id = TakerId::default();
let (read, write) = socket.into_split();
let in_taker_actor = in_taker_messages(
read,
cfd_maker_actor_inbox.clone(), cfd_maker_actor_inbox.clone(),
connections_actor_inbox_recv, taker_id,
);
let (out_msg_actor, out_msg_actor_inbox) =
send_wire_message_actor::new::<wire::MakerToTaker>(write);
tokio::spawn(in_taker_actor);
tokio::spawn(out_msg_actor);
maker_inc_connections_address.do_send_async(
maker_inc_connections_actor::NewTakerOnline {
taker_id,
out_msg_actor_inbox,
},
); );
};
}
}
});
// consecutive wallet syncs handled by task that triggers sync // consecutive wallet syncs handled by task that triggers sync
let wallet_sync_interval = Duration::from_secs(10); let wallet_sync_interval = Duration::from_secs(10);
@ -161,14 +197,21 @@ async fn main() -> Result<()> {
let cfd_actor_inbox = cfd_maker_actor_inbox.clone(); let cfd_actor_inbox = cfd_maker_actor_inbox.clone();
async move { async move {
loop { loop {
cfd_actor_inbox.send(Command::SyncWallet).unwrap(); cfd_actor_inbox
.do_send_async(maker_cfd_actor::SyncWallet)
.await
.unwrap();
tokio::time::sleep(wallet_sync_interval).await; tokio::time::sleep(wallet_sync_interval).await;
} }
} }
}); });
tokio::spawn(cfd_maker_actor); cfd_maker_actor_inbox
tokio::spawn(connections_actor); .do_send_async(maker_cfd_actor::Initialized(
maker_inc_connections_address.clone(),
))
.await
.expect("not to fail after actors were initialized");
Ok(rocket.manage(cfd_maker_actor_inbox)) Ok(rocket.manage(cfd_maker_actor_inbox))
}, },

477
daemon/src/maker_cfd_actor.rs

@ -2,204 +2,191 @@ use crate::db::{
insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds,
load_cfd_by_order_id, load_order_by_id, load_cfd_by_order_id, load_order_by_id,
}; };
use crate::maker_inc_connections_actor::{MakerIncConnectionsActor, TakerCommand};
use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId};
use crate::model::{TakerId, Usd, WalletInfo}; use crate::model::{TakerId, Usd, WalletInfo};
use crate::wallet::Wallet; use crate::wallet::Wallet;
use crate::wire::SetupMsg; use crate::wire::SetupMsg;
use crate::{maker_cfd_actor, maker_inc_connections_actor, setup_contract_actor}; use crate::{maker_inc_connections_actor, setup_contract_actor};
use anyhow::{Context as AnyhowContext, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
use futures::Future;
use std::time::SystemTime; use std::time::SystemTime;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use xtra::prelude::*;
#[allow(clippy::large_enum_variant)] pub struct Initialized(pub Address<MakerIncConnectionsActor>);
#[derive(Debug)]
pub enum Command { impl Message for Initialized {
SyncWallet, type Result = Result<()>;
TakeOrder { }
taker_id: TakerId,
order_id: OrderId, pub struct TakeOrder {
quantity: Usd, pub taker_id: TakerId,
}, pub order_id: OrderId,
NewOrder(Order), pub quantity: Usd,
StartContractSetup { }
taker_id: TakerId,
order_id: OrderId, impl Message for TakeOrder {
}, type Result = Result<()>;
NewTakerOnline { }
id: TakerId,
}, pub struct NewOrder(pub Order);
IncProtocolMsg(SetupMsg),
CfdSetupCompleted { impl Message for NewOrder {
order_id: OrderId, type Result = Result<()>;
dlc: Dlc, }
},
pub struct StartContractSetup {
pub taker_id: TakerId,
pub order_id: OrderId,
}
impl Message for StartContractSetup {
type Result = Result<()>;
}
pub struct NewTakerOnline {
pub id: TakerId,
}
impl Message for NewTakerOnline {
type Result = Result<()>;
}
pub struct IncProtocolMsg(pub SetupMsg);
impl Message for IncProtocolMsg {
type Result = Result<()>;
}
pub struct CfdSetupCompleted {
pub order_id: OrderId,
pub dlc: Dlc,
} }
pub fn new( impl Message for CfdSetupCompleted {
type Result = Result<()>;
}
pub struct SyncWallet;
impl Message for SyncWallet {
type Result = Result<()>;
}
pub struct MakerCfdActor {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Wallet,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
takers: mpsc::UnboundedSender<maker_inc_connections_actor::Command>,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>, order_feed_sender: watch::Sender<Option<Order>>,
wallet_feed_sender: watch::Sender<WalletInfo>, wallet_feed_sender: watch::Sender<WalletInfo>,
) -> ( takers: Option<Address<MakerIncConnectionsActor>>,
impl Future<Output = ()>, current_order_id: Option<OrderId>,
mpsc::UnboundedSender<maker_cfd_actor::Command>, current_contract_setup: Option<mpsc::UnboundedSender<SetupMsg>>,
) {
let (sender, mut receiver) = mpsc::unbounded_channel();
// TODO: Move the contract setup into a dedicated actor and send messages to that actor that // TODO: Move the contract setup into a dedicated actor and send messages to that actor that
// manages the state instead of this ugly buffer // manages the state instead of this ugly buffer
let mut current_contract_setup = None; contract_setup_message_buffer: Vec<SetupMsg>,
let mut contract_setup_message_buffer = vec![]; }
let mut current_order_id = None; impl xtra::Actor for MakerCfdActor {}
let actor = { impl MakerCfdActor {
let sender = sender.clone(); pub async fn new(
db: sqlx::SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_sender: watch::Sender<Option<Order>>,
wallet_feed_sender: watch::Sender<WalletInfo>,
) -> Self {
let mut conn = db.acquire().await.unwrap();
async move {
// populate the CFD feed with existing CFDs // populate the CFD feed with existing CFDs
let mut conn = db.acquire().await.unwrap();
cfd_feed_actor_inbox cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await.unwrap()) .send(load_all_cfds(&mut conn).await.unwrap())
.unwrap(); .unwrap();
while let Some(message) = receiver.recv().await { Self {
match message { db,
maker_cfd_actor::Command::SyncWallet => { wallet,
let wallet_info = wallet.sync().await.unwrap(); oracle_pk,
wallet_feed_sender.send(wallet_info).unwrap(); cfd_feed_actor_inbox,
} order_feed_sender,
maker_cfd_actor::Command::TakeOrder { wallet_feed_sender,
taker_id, takers: None,
order_id, current_order_id: None,
quantity, current_contract_setup: None,
} => { contract_setup_message_buffer: vec![],
tracing::debug!(%taker_id, %quantity, %order_id,
"Taker wants to take an order"
);
let mut conn = db.acquire().await.unwrap();
// 1. Validate if order is still valid
let current_order = match current_order_id {
Some(current_order_id) if current_order_id == order_id => {
load_order_by_id(current_order_id, &mut conn).await.unwrap()
} }
_ => {
takers
.send(maker_inc_connections_actor::Command::NotifyInvalidOrderId {
id: order_id,
taker_id,
})
.unwrap();
continue;
} }
};
// 2. Insert CFD in DB
// TODO: Don't auto-accept, present to user in UI instead
let cfd = Cfd::new(
current_order.clone(),
quantity,
CfdState::Accepted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
);
insert_cfd(cfd, &mut conn).await.unwrap();
takers async fn handle_new_order(&mut self, msg: NewOrder) -> Result<()> {
.send(maker_inc_connections_actor::Command::NotifyOrderAccepted { let order = msg.0;
id: order_id,
taker_id,
})
.unwrap();
cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await.unwrap())
.unwrap();
// 3. Remove current order
current_order_id = None;
takers
.send(maker_inc_connections_actor::Command::BroadcastOrder(None))
.unwrap();
order_feed_sender.send(None).unwrap();
}
maker_cfd_actor::Command::NewOrder(order) => {
// 1. Save to DB // 1. Save to DB
let mut conn = db.acquire().await.unwrap(); let mut conn = self.db.acquire().await?;
insert_order(&order, &mut conn).await.unwrap(); insert_order(&order, &mut conn).await?;
// 2. Update actor state to current order // 2. Update actor state to current order
current_order_id.replace(order.id); self.current_order_id.replace(order.id);
// 3. Notify UI via feed // 3. Notify UI via feed
order_feed_sender.send(Some(order.clone())).unwrap(); self.order_feed_sender.send(Some(order.clone()))?;
// 4. Inform connected takers // 4. Inform connected takers
takers self.takers()?
.send(maker_inc_connections_actor::Command::BroadcastOrder(Some( .do_send_async(maker_inc_connections_actor::BroadcastOrder(Some(order)))
order, .await?;
))) Ok(())
.unwrap();
} }
maker_cfd_actor::Command::NewTakerOnline { id: taker_id } => {
let mut conn = db.acquire().await.unwrap();
let current_order = match current_order_id { fn takers(&self) -> Result<&Address<MakerIncConnectionsActor>> {
Some(current_order_id) => { self.takers
Some(load_order_by_id(current_order_id, &mut conn).await.unwrap()) .as_ref()
.context("Maker inc connections actor to be initialised")
} }
None => None,
};
takers async fn handle_start_contract_setup(
.send(maker_inc_connections_actor::Command::SendOrder { &mut self,
order: current_order, msg: StartContractSetup,
taker_id, ctx: &mut Context<Self>,
}) ) -> Result<()> {
.unwrap(); let StartContractSetup { taker_id, order_id } = msg;
}
maker_cfd_actor::Command::StartContractSetup { taker_id, order_id } => { tracing::info!("Starting contract setup");
println!("CONTRACT SETUP");
// Kick-off the CFD protocol // Kick-off the CFD protocol
let (sk, pk) = crate::keypair::new(&mut rand::thread_rng()); let (sk, pk) = crate::keypair::new(&mut rand::thread_rng());
let cfd = load_cfd_by_order_id(order_id, &mut conn).await.unwrap(); let mut conn = self.db.acquire().await?;
let margin = cfd.margin().unwrap();
let cfd = load_cfd_by_order_id(order_id, &mut conn).await?;
let margin = cfd.margin()?;
let maker_params = wallet.build_party_params(margin, pk).await.unwrap(); let maker_params = self.wallet.build_party_params(margin, pk).await?;
let (actor, inbox) = setup_contract_actor::new( let (actor, inbox) = setup_contract_actor::new(
{ {
let inbox = takers.clone(); let inbox = self.takers()?.clone();
move |msg| { move |msg| {
inbox inbox.send(maker_inc_connections_actor::TakerMessage {
.send(
maker_inc_connections_actor::Command::OutProtocolMsg {
taker_id, taker_id,
msg, command: TakerCommand::OutProtocolMsg { setup_msg: msg },
}, });
)
.unwrap()
} }
}, },
setup_contract_actor::OwnParams::Maker(maker_params), setup_contract_actor::OwnParams::Maker(maker_params),
sk, sk,
oracle_pk, self.oracle_pk,
cfd, cfd,
wallet.clone(), self.wallet.clone(),
); );
current_contract_setup = Some(inbox.clone()); self.current_contract_setup.replace(inbox.clone());
for msg in contract_setup_message_buffer.drain(..) { for msg in self.contract_setup_message_buffer.drain(..) {
inbox.send(msg).unwrap(); inbox.send(msg).unwrap();
} }
@ -213,70 +200,214 @@ pub fn new(
}, },
&mut conn, &mut conn,
) )
.await .await?;
.unwrap(); self.cfd_feed_actor_inbox
cfd_feed_actor_inbox .send(load_all_cfds(&mut conn).await?)?;
.send(load_all_cfds(&mut conn).await.unwrap())
.unwrap();
tokio::spawn({ let address = ctx
let sender = sender.clone(); .address()
.expect("actor to be able to give address to itself");
async move { tokio::spawn(async move {
sender address
.send(Command::CfdSetupCompleted { .do_send_async(CfdSetupCompleted {
order_id, order_id,
dlc: actor.await, dlc: actor.await,
}) })
.unwrap() .await
}
}); });
Ok(())
}
async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> {
let mut conn = self.db.acquire().await?;
let current_order = match self.current_order_id {
Some(current_order_id) => Some(load_order_by_id(current_order_id, &mut conn).await?),
None => None,
};
self.takers()?
.do_send_async(maker_inc_connections_actor::TakerMessage {
taker_id: msg.id,
command: TakerCommand::SendOrder {
order: current_order,
},
})
.await?;
Ok(())
} }
maker_cfd_actor::Command::IncProtocolMsg(msg) => {
let inbox = match &current_contract_setup { async fn handle_inc_protocol_msg(&mut self, msg: IncProtocolMsg) -> Result<()> {
let msg = msg.0;
let inbox = match &self.current_contract_setup {
None => { None => {
contract_setup_message_buffer.push(msg); self.contract_setup_message_buffer.push(msg);
continue; return Ok(());
} }
Some(inbox) => inbox, Some(inbox) => inbox,
}; };
inbox.send(msg)?;
Ok(())
}
inbox.send(msg).unwrap(); async fn handle_sync_wallet(&mut self) -> Result<()> {
let wallet_info = self.wallet.sync().await?;
self.wallet_feed_sender.send(wallet_info)?;
Ok(())
}
}
#[async_trait]
impl Handler<Initialized> for MakerCfdActor {
async fn handle(&mut self, msg: Initialized, _ctx: &mut Context<Self>) -> Result<()> {
self.takers.replace(msg.0);
Ok(())
}
}
#[async_trait]
impl Handler<TakeOrder> for MakerCfdActor {
async fn handle(&mut self, msg: TakeOrder, _ctx: &mut Context<Self>) -> Result<()> {
tracing::debug!(%msg.taker_id, %msg.quantity, %msg.order_id,
"Taker wants to take an order"
);
let mut conn = self.db.acquire().await?;
// 1. Validate if order is still valid
let current_order = match self.current_order_id {
Some(current_order_id) if current_order_id == msg.order_id => {
load_order_by_id(current_order_id, &mut conn).await.unwrap()
}
_ => {
self.takers()?
.do_send_async(maker_inc_connections_actor::TakerMessage {
taker_id: msg.taker_id,
command: TakerCommand::NotifyInvalidOrderId { id: msg.order_id },
})
.await?;
// TODO: Return an error here?
return Ok(());
}
};
// 2. Insert CFD in DB
// TODO: Don't auto-accept, present to user in UI instead
let cfd = Cfd::new(
current_order.clone(),
msg.quantity,
CfdState::Accepted {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
);
insert_cfd(cfd, &mut conn).await?;
self.takers()?
.do_send_async(maker_inc_connections_actor::TakerMessage {
taker_id: msg.taker_id,
command: TakerCommand::NotifyOrderAccepted { id: msg.order_id },
})
.await?;
self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await?)?;
// 3. Remove current order
self.current_order_id = None;
self.takers()?
.do_send_async(maker_inc_connections_actor::BroadcastOrder(None))
.await?;
self.order_feed_sender.send(None)?;
Ok(())
}
}
macro_rules! log_error {
($future:expr) => {
if let Err(e) = $future.await {
tracing::error!(%e);
}
};
}
#[async_trait]
impl Handler<NewOrder> for MakerCfdActor {
async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_new_order(msg));
Ok(())
}
} }
maker_cfd_actor::Command::CfdSetupCompleted { order_id, dlc } => {
println!("Setup complete, publishing on chain now...");
current_contract_setup = None; #[async_trait]
contract_setup_message_buffer = vec![]; impl Handler<StartContractSetup> for MakerCfdActor {
async fn handle(&mut self, msg: StartContractSetup, ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_start_contract_setup(msg, ctx));
Ok(())
}
}
#[async_trait]
impl Handler<NewTakerOnline> for MakerCfdActor {
async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_new_taker_online(msg));
Ok(())
}
}
#[async_trait]
impl Handler<IncProtocolMsg> for MakerCfdActor {
async fn handle(&mut self, msg: IncProtocolMsg, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_inc_protocol_msg(msg));
Ok(())
}
}
#[async_trait]
impl Handler<CfdSetupCompleted> for MakerCfdActor {
async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) -> Result<()> {
let mut conn = self.db.acquire().await?;
self.current_contract_setup = None;
self.contract_setup_message_buffer = vec![];
insert_new_cfd_state_by_order_id( insert_new_cfd_state_by_order_id(
order_id, msg.order_id,
CfdState::PendingOpen { CfdState::PendingOpen {
common: CfdStateCommon { common: CfdStateCommon {
transition_timestamp: SystemTime::now(), transition_timestamp: SystemTime::now(),
}, },
dlc: dlc.clone(), dlc: msg.dlc.clone(),
}, },
&mut conn, &mut conn,
) )
.await .await?;
.unwrap();
cfd_feed_actor_inbox self.cfd_feed_actor_inbox
.send(load_all_cfds(&mut conn).await.unwrap()) .send(load_all_cfds(&mut conn).await?)?;
.unwrap();
let txid = wallet.try_broadcast_transaction(dlc.lock).await.unwrap(); let txid = self
.wallet
.try_broadcast_transaction(msg.dlc.lock)
.await
.unwrap();
println!("Lock transaction published with txid {}", txid); tracing::info!("Lock transaction published with txid {}", txid);
// TODO: tx monitoring, once confirmed with x blocks transition the Cfd to // TODO: tx monitoring, once confirmed with x blocks transition the Cfd to
// Open // Open
Ok(())
} }
} }
}
}
};
(actor, sender) #[async_trait]
impl Handler<SyncWallet> for MakerCfdActor {
async fn handle(&mut self, _msg: SyncWallet, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_sync_wallet());
Ok(())
}
} }

212
daemon/src/maker_inc_connections_actor.rs

@ -1,95 +1,150 @@
use crate::maker_cfd_actor::MakerCfdActor;
use crate::model::cfd::{Order, OrderId}; use crate::model::cfd::{Order, OrderId};
use crate::model::TakerId; use crate::model::TakerId;
use crate::wire::SetupMsg; use crate::wire::SetupMsg;
use crate::{maker_cfd_actor, maker_inc_connections_actor, send_wire_message_actor, wire}; use crate::{maker_cfd_actor, wire};
use anyhow::{Context as AnyhowContext, Result};
use futures::{Future, StreamExt}; use futures::{Future, StreamExt};
use std::collections::HashMap; use std::collections::HashMap;
use tokio::net::tcp::OwnedReadHalf; use tokio::net::tcp::OwnedReadHalf;
use tokio::net::TcpListener;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec}; use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use async_trait::async_trait;
use xtra::prelude::*;
type MakerToTakerSender = mpsc::UnboundedSender<wire::MakerToTaker>;
pub struct BroadcastOrder(pub Option<Order>);
impl Message for BroadcastOrder {
type Result = Result<()>;
}
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
#[derive(Debug)] pub enum TakerCommand {
pub enum Command { SendOrder { order: Option<Order> },
BroadcastOrder(Option<Order>), NotifyInvalidOrderId { id: OrderId },
SendOrder { NotifyOrderAccepted { id: OrderId },
order: Option<Order>, OutProtocolMsg { setup_msg: SetupMsg },
taker_id: TakerId,
},
NotifyInvalidOrderId {
id: OrderId,
taker_id: TakerId,
},
NotifyOrderAccepted {
id: OrderId,
taker_id: TakerId,
},
OutProtocolMsg {
taker_id: TakerId,
msg: SetupMsg,
},
} }
pub fn new( pub struct TakerMessage {
listener: TcpListener, pub taker_id: TakerId,
cfd_maker_actor_inbox: mpsc::UnboundedSender<maker_cfd_actor::Command>, pub command: TakerCommand,
mut our_inbox: mpsc::UnboundedReceiver<maker_inc_connections_actor::Command>, }
) -> impl Future<Output = ()> {
let mut write_connections =
HashMap::<TakerId, mpsc::UnboundedSender<wire::MakerToTaker>>::new();
async move { impl Message for TakerMessage {
loop { type Result = Result<()>;
tokio::select! { }
Ok((socket, remote_addr)) = listener.accept() => {
tracing::info!("Connected to {}", remote_addr);
let taker_id = TakerId::default();
let (read, write) = socket.into_split();
let in_taker_actor = in_taker_messages(read, cfd_maker_actor_inbox.clone(), taker_id); pub struct NewTakerOnline {
let (out_message_actor, out_message_actor_inbox) = send_wire_message_actor::new(write); pub taker_id: TakerId,
pub out_msg_actor_inbox: MakerToTakerSender,
}
tokio::spawn(in_taker_actor); impl Message for NewTakerOnline {
tokio::spawn(out_message_actor); type Result = Result<()>;
}
cfd_maker_actor_inbox.send(maker_cfd_actor::Command::NewTakerOnline{id : taker_id}).unwrap(); pub struct MakerIncConnectionsActor {
write_connections: HashMap<TakerId, MakerToTakerSender>,
cfd_maker_actor_address: Address<MakerCfdActor>,
}
write_connections.insert(taker_id, out_message_actor_inbox); impl Actor for MakerIncConnectionsActor {}
},
Some(message) = our_inbox.recv() => { impl MakerIncConnectionsActor {
match message { pub fn new(cfd_maker_actor_address: Address<MakerCfdActor>) -> Self {
maker_inc_connections_actor::Command::NotifyInvalidOrderId { id, taker_id } => { Self {
let conn = write_connections.get(&taker_id).expect("no connection to taker_id"); write_connections: HashMap::<TakerId, MakerToTakerSender>::new(),
conn.send(wire::MakerToTaker::InvalidOrderId(id)).unwrap(); cfd_maker_actor_address,
},
maker_inc_connections_actor::Command::BroadcastOrder(order) => {
for conn in write_connections.values() {
conn.send(wire::MakerToTaker::CurrentOrder(order.clone())).unwrap();
} }
},
maker_inc_connections_actor::Command::SendOrder {order, taker_id} => {
let conn = write_connections.get(&taker_id).expect("no connection to taker_id");
conn.send(wire::MakerToTaker::CurrentOrder(order)).unwrap();
},
maker_inc_connections_actor::Command::NotifyOrderAccepted { id, taker_id } => {
let conn = write_connections.get(&taker_id).expect("no connection to taker_id");
conn.send(wire::MakerToTaker::ConfirmTakeOrder(id)).unwrap();
},
maker_inc_connections_actor::Command::OutProtocolMsg { taker_id, msg } => {
let conn = write_connections.get(&taker_id).expect("no connection to taker_id");
conn.send(wire::MakerToTaker::Protocol(msg)).unwrap();
} }
fn send_to_taker(&self, taker_id: TakerId, msg: wire::MakerToTaker) -> Result<()> {
let conn = self
.write_connections
.get(&taker_id)
.context("no connection to taker_id")?;
conn.send(msg)?;
Ok(())
} }
async fn handle_broadcast_order(&mut self, msg: BroadcastOrder) -> Result<()> {
let order = msg.0;
self.write_connections
.values()
.try_for_each(|conn| conn.send(wire::MakerToTaker::CurrentOrder(order.clone())))?;
Ok(())
}
async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<()> {
match msg.command {
TakerCommand::SendOrder { order } => {
self.send_to_taker(msg.taker_id, wire::MakerToTaker::CurrentOrder(order))?;
} }
TakerCommand::NotifyInvalidOrderId { id } => {
self.send_to_taker(msg.taker_id, wire::MakerToTaker::InvalidOrderId(id))?;
} }
TakerCommand::NotifyOrderAccepted { id } => {
self.send_to_taker(msg.taker_id, wire::MakerToTaker::ConfirmTakeOrder(id))?;
} }
TakerCommand::OutProtocolMsg { setup_msg } => {
self.send_to_taker(msg.taker_id, wire::MakerToTaker::Protocol(setup_msg))?;
}
}
Ok(())
}
async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> {
self.cfd_maker_actor_address
.do_send_async(maker_cfd_actor::NewTakerOnline { id: msg.taker_id })
.await?;
self.write_connections
.insert(msg.taker_id, msg.out_msg_actor_inbox);
Ok(())
}
}
macro_rules! log_error {
($future:expr) => {
if let Err(e) = $future.await {
tracing::error!(%e);
}
};
}
#[async_trait]
impl Handler<BroadcastOrder> for MakerIncConnectionsActor {
async fn handle(&mut self, msg: BroadcastOrder, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_broadcast_order(msg));
Ok(())
} }
} }
fn in_taker_messages( #[async_trait]
impl Handler<TakerMessage> for MakerIncConnectionsActor {
async fn handle(&mut self, msg: TakerMessage, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_taker_message(msg));
Ok(())
}
}
#[async_trait]
impl Handler<NewTakerOnline> for MakerIncConnectionsActor {
async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context<Self>) -> Result<()> {
log_error!(self.handle_new_taker_online(msg));
Ok(())
}
}
//
pub fn in_taker_messages(
read: OwnedReadHalf, read: OwnedReadHalf,
cfd_actor_inbox: mpsc::UnboundedSender<maker_cfd_actor::Command>, cfd_actor_inbox: Address<MakerCfdActor>,
taker_id: TakerId, taker_id: TakerId,
) -> impl Future<Output = ()> { ) -> impl Future<Output = ()> {
let mut messages = FramedRead::new(read, LengthDelimitedCodec::new()).map(|result| { let mut messages = FramedRead::new(read, LengthDelimitedCodec::new()).map(|result| {
@ -100,19 +155,28 @@ fn in_taker_messages(
async move { async move {
while let Some(message) = messages.next().await { while let Some(message) = messages.next().await {
match message { match message {
Ok(wire::TakerToMaker::TakeOrder { order_id, quantity }) => cfd_actor_inbox Ok(wire::TakerToMaker::TakeOrder { order_id, quantity }) => {
.send(maker_cfd_actor::Command::TakeOrder { cfd_actor_inbox
.do_send_async(maker_cfd_actor::TakeOrder {
taker_id, taker_id,
order_id, order_id,
quantity, quantity,
}) })
.unwrap(), .await
Ok(wire::TakerToMaker::StartContractSetup(order_id)) => cfd_actor_inbox .unwrap();
.send(maker_cfd_actor::Command::StartContractSetup { taker_id, order_id }) }
.unwrap(), Ok(wire::TakerToMaker::StartContractSetup(order_id)) => {
Ok(wire::TakerToMaker::Protocol(msg)) => cfd_actor_inbox cfd_actor_inbox
.send(maker_cfd_actor::Command::IncProtocolMsg(msg)) .do_send_async(maker_cfd_actor::StartContractSetup { taker_id, order_id })
.unwrap(), .await
.unwrap();
}
Ok(wire::TakerToMaker::Protocol(msg)) => {
cfd_actor_inbox
.do_send_async(maker_cfd_actor::IncProtocolMsg(msg))
.await
.unwrap();
}
Err(error) => { Err(error) => {
tracing::error!(%error, "Error in reading message"); tracing::error!(%error, "Error in reading message");
} }

12
daemon/src/routes_maker.rs

@ -1,5 +1,5 @@
use crate::auth::Authenticated; use crate::auth::Authenticated;
use crate::maker_cfd_actor; use crate::maker_cfd_actor::{self, MakerCfdActor};
use crate::model::cfd::{Cfd, Order, Origin}; use crate::model::cfd::{Cfd, Order, Origin};
use crate::model::{Usd, WalletInfo}; use crate::model::{Usd, WalletInfo};
use crate::routes::EmbeddedFileExt; use crate::routes::EmbeddedFileExt;
@ -15,7 +15,8 @@ use serde::Deserialize;
use std::borrow::Cow; use std::borrow::Cow;
use std::path::PathBuf; use std::path::PathBuf;
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, watch}; use tokio::sync::watch;
use xtra::Address;
#[rocket::get("/feed")] #[rocket::get("/feed")]
pub async fn maker_feed( pub async fn maker_feed(
@ -71,7 +72,7 @@ pub struct CfdNewOrderRequest {
#[rocket::post("/order/sell", data = "<order>")] #[rocket::post("/order/sell", data = "<order>")]
pub async fn post_sell_order( pub async fn post_sell_order(
order: Json<CfdNewOrderRequest>, order: Json<CfdNewOrderRequest>,
cfd_actor_inbox: &State<mpsc::UnboundedSender<maker_cfd_actor::Command>>, cfd_actor_address: &State<Address<MakerCfdActor>>,
_auth: Authenticated, _auth: Authenticated,
) -> Result<status::Accepted<()>, status::BadRequest<String>> { ) -> Result<status::Accepted<()>, status::BadRequest<String>> {
let order = Order::from_default_with_price(order.price, Origin::Ours) let order = Order::from_default_with_price(order.price, Origin::Ours)
@ -79,8 +80,9 @@ pub async fn post_sell_order(
.with_min_quantity(order.min_quantity) .with_min_quantity(order.min_quantity)
.with_max_quantity(order.max_quantity); .with_max_quantity(order.max_quantity);
cfd_actor_inbox cfd_actor_address
.send(maker_cfd_actor::Command::NewOrder(order)) .do_send_async(maker_cfd_actor::NewOrder(order))
.await
.expect("actor to always be available"); .expect("actor to always be available");
Ok(status::Accepted(None)) Ok(status::Accepted(None))

Loading…
Cancel
Save