From 46e9b96fbf4f61ef210934c0cf146f0cabdc47fc Mon Sep 17 00:00:00 2001 From: Mariusz Klochowicz Date: Wed, 22 Sep 2021 10:56:21 +0930 Subject: [PATCH] Roll-out xtra framework usage in maker --- Cargo.lock | 193 ++++++- daemon/Cargo.toml | 2 + daemon/src/maker.rs | 75 ++- daemon/src/maker_cfd_actor.rs | 637 +++++++++++++--------- daemon/src/maker_inc_connections_actor.rs | 236 +++++--- daemon/src/routes_maker.rs | 12 +- 6 files changed, 777 insertions(+), 378 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 533db23..11d3205 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -116,6 +116,18 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "barrage" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "756c265ddc2c445724b688455c42ec724590635fa71b47eaff7b260dc95fa7c9" +dependencies = [ + "concurrent-queue", + "event-listener", + "loom 0.3.6", + "spinny", +] + [[package]] name = "base64" version = "0.10.1" @@ -259,6 +271,21 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +[[package]] +name = "cache-padded" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" + +[[package]] +name = "catty" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d231959e9442d4c614ecc961178c44fce85d494484281d8055167d87993e61b" +dependencies = [ + "spin 0.7.1", +] + [[package]] name = "cc" version = "1.0.70" @@ -280,6 +307,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -338,6 +371,15 @@ dependencies = [ "bitflags", ] +[[package]] +name = "concurrent-queue" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" +dependencies = [ + "cache-padded", +] + [[package]] name = "cookie" version = "0.16.0-rc.1" @@ -379,7 +421,7 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81156fece84ab6a9f2afdb109ce3ae577e42b1228441eded99bd77f627953b1a" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -388,7 +430,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", ] @@ -398,7 +440,7 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", "lazy_static", "memoffset", @@ -411,7 +453,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "crossbeam-utils", ] @@ -421,7 +463,7 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "lazy_static", ] @@ -440,6 +482,7 @@ name = "daemon" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "atty", "bdk", "cfd_protocol", @@ -466,6 +509,7 @@ dependencies = [ "tracing", "tracing-subscriber", "uuid", + "xtra", ] [[package]] @@ -582,9 +626,15 @@ version = "0.8.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80df024fbc5ac80f87dfef0d9f5209a252f2a497f7f42944cff24d8253cac065" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] +[[package]] +name = "event-listener" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" + [[package]] name = "figment" version = "0.10.6" @@ -599,6 +649,18 @@ dependencies = [ "version_check", ] +[[package]] +name = "flume" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24c3fd473b3a903a62609e413ed7538f99e10b665ecb502b5e481a95283f8ab4" +dependencies = [ + "futures-core", + "futures-sink", + "pin-project", + "spin 0.9.2", +] + [[package]] name = "fnv" version = "1.0.7" @@ -715,6 +777,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.17" @@ -745,6 +813,19 @@ dependencies = [ "byteorder", ] +[[package]] +name = "generator" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "061d3be1afec479d56fa3bd182bf966c7999ec175fcfdb87ac14d417241366c6" +dependencies = [ + "cc", + "libc", + "log", + "rustversion", + "winapi 0.3.9", +] + [[package]] name = "generator" version = "0.7.0" @@ -774,7 +855,7 @@ version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "js-sys", "libc", "wasi 0.9.0+wasi-snapshot-preview1", @@ -787,7 +868,7 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "wasi 0.10.2+wasi-snapshot-preview1", ] @@ -977,7 +1058,7 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bee0328b1209d157ef001c94dd85b4f8f64139adb0eac2659f4b08382b2f474d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -1042,7 +1123,19 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", +] + +[[package]] +name = "loom" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0e8460f2f2121162705187214720353c517b97bdfb3494c0b1e33d83ebe4bed" +dependencies = [ + "cfg-if 0.1.10", + "futures-util", + "generator 0.6.25", + "scoped-tls", ] [[package]] @@ -1051,8 +1144,8 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2111607c723d7857e0d8299d5ce7a0bf4b844d3e44f8de136b13da513eaf8fc4" dependencies = [ - "cfg-if", - "generator", + "cfg-if 1.0.0", + "generator 0.7.0", "scoped-tls", "serde", "serde_json", @@ -1241,7 +1334,7 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "instant", "libc", "redox_syscall", @@ -1278,6 +1371,26 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-project" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.7" @@ -1296,6 +1409,12 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" +[[package]] +name = "pollster" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb20dcc30536a1508e75d47dd0e399bb2fe7354dcf35cda9127f2bf1ed92e30e" + [[package]] name = "ppv-lite86" version = "0.2.10" @@ -2012,7 +2131,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b69f9a4c9740d74c5baa3fd2e547f9525fa8088a8a958e0ca2409a514e33f5fa" dependencies = [ "block-buffer", - "cfg-if", + "cfg-if 1.0.0", "cpufeatures", "digest", "opaque-debug", @@ -2092,11 +2211,31 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13287b4da9d1207a4f4929ac390916d64eacfe236a487e9a9f5b3be392be5162" + [[package]] name = "spin" version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5" +dependencies = [ + "lock_api", +] + +[[package]] +name = "spinny" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66f5e2008c6e3864566a0dfa4717946ebdbc7555810b7c0c9266fd41c6d7a2a4" +dependencies = [ + "lock_api", + "loom 0.3.6", + "once_cell", +] [[package]] name = "sqlformat" @@ -2215,7 +2354,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87cf4f5369e6d3044b5e365c9690f451516ac8f0954084622b49ea3fde2f6de5" dependencies = [ - "loom", + "loom 0.5.1", ] [[package]] @@ -2257,7 +2396,7 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "rand 0.8.4", "redox_syscall", @@ -2431,7 +2570,7 @@ version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2670,7 +2809,7 @@ version = "0.2.77" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e68338db6becec24d3c7977b5bf8a48be992c934b5d07177e3931f5dc9b076c" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "wasm-bindgen-macro", ] @@ -2819,6 +2958,24 @@ dependencies = [ "winapi-build", ] +[[package]] +name = "xtra" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd0133cb26accfd34360ab6b8fe9745d8907dcaee0cd7f8191dee4fd884e88d0" +dependencies = [ + "async-trait", + "barrage", + "catty", + "flume", + "futures-core", + "futures-sink", + "futures-timer", + "futures-util", + "pollster", + "tokio", +] + [[package]] name = "yansi" version = "0.5.0" diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index eb86e93..6797aaf 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -5,6 +5,7 @@ edition = "2018" [dependencies] anyhow = "1" +async-trait = "0.1.51" atty = "0.2" bdk = { git = "https://github.com/bitcoindevkit/bdk/" } cfd_protocol = { path = "../cfd_protocol" } @@ -30,6 +31,7 @@ tokio-util = { version = "0.6", features = ["codec"] } tracing = { version = "0.1" } tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log", "json"] } uuid = { version = "0.8", features = ["serde", "v4"] } +xtra = { version = "0.5", features = ["with-tokio-1"] } [[bin]] name = "taker" diff --git a/daemon/src/maker.rs b/daemon/src/maker.rs index 9b06ebf..46a36b2 100644 --- a/daemon/src/maker.rs +++ b/daemon/src/maker.rs @@ -1,5 +1,6 @@ use crate::auth::MAKER_USERNAME; -use crate::maker_cfd_actor::Command; +use crate::maker_inc_connections_actor::in_taker_messages; +use crate::model::TakerId; use crate::seed::Seed; use crate::wallet::Wallet; use anyhow::{Context, Result}; @@ -12,8 +13,10 @@ use rocket::fairing::AdHoc; use rocket_db_pools::Database; use std::path::PathBuf; use std::time::Duration; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; use tracing_subscriber::filter::LevelFilter; +use xtra::prelude::*; +use xtra::spawn::TokioGlobalSpawnExt; mod auth; mod db; @@ -137,23 +140,56 @@ async fn main() -> Result<()> { None => return Err(rocket), }; - let (connections_actor_inbox_sender, connections_actor_inbox_recv) = - mpsc::unbounded_channel(); - - let (cfd_maker_actor, cfd_maker_actor_inbox) = maker_cfd_actor::new( + let cfd_maker_actor_inbox = maker_cfd_actor::MakerCfdActor::new( db, wallet, schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle), - connections_actor_inbox_sender, cfd_feed_sender, order_feed_sender, wallet_feed_sender, - ); - let connections_actor = maker_inc_connections_actor::new( - listener, - cfd_maker_actor_inbox.clone(), - connections_actor_inbox_recv, - ); + ) + .await + .create(None) + .spawn_global(); + + let maker_inc_connections_address = + maker_inc_connections_actor::MakerIncConnectionsActor::new( + cfd_maker_actor_inbox.clone(), + ) + .create(None) + .spawn_global(); + + tokio::spawn({ + let cfd_maker_actor_inbox = cfd_maker_actor_inbox.clone(); + let maker_inc_connections_address = maker_inc_connections_address.clone(); + async move { + loop { + if let Ok((socket, remote_addr)) = listener.accept().await { + tracing::info!("Connected to {}", remote_addr); + let taker_id = TakerId::default(); + let (read, write) = socket.into_split(); + + let in_taker_actor = in_taker_messages( + read, + cfd_maker_actor_inbox.clone(), + taker_id, + ); + let (out_msg_actor, out_msg_actor_inbox) = + send_wire_message_actor::new::(write); + + tokio::spawn(in_taker_actor); + tokio::spawn(out_msg_actor); + + maker_inc_connections_address.do_send_async( + maker_inc_connections_actor::NewTakerOnline { + taker_id, + out_msg_actor_inbox, + }, + ); + }; + } + } + }); // consecutive wallet syncs handled by task that triggers sync let wallet_sync_interval = Duration::from_secs(10); @@ -161,14 +197,21 @@ async fn main() -> Result<()> { let cfd_actor_inbox = cfd_maker_actor_inbox.clone(); async move { loop { - cfd_actor_inbox.send(Command::SyncWallet).unwrap(); + cfd_actor_inbox + .do_send_async(maker_cfd_actor::SyncWallet) + .await + .unwrap(); tokio::time::sleep(wallet_sync_interval).await; } } }); - tokio::spawn(cfd_maker_actor); - tokio::spawn(connections_actor); + cfd_maker_actor_inbox + .do_send_async(maker_cfd_actor::Initialized( + maker_inc_connections_address.clone(), + )) + .await + .expect("not to fail after actors were initialized"); Ok(rocket.manage(cfd_maker_actor_inbox)) }, diff --git a/daemon/src/maker_cfd_actor.rs b/daemon/src/maker_cfd_actor.rs index efa9afb..f7119d8 100644 --- a/daemon/src/maker_cfd_actor.rs +++ b/daemon/src/maker_cfd_actor.rs @@ -2,281 +2,412 @@ use crate::db::{ insert_cfd, insert_new_cfd_state_by_order_id, insert_order, load_all_cfds, load_cfd_by_order_id, load_order_by_id, }; +use crate::maker_inc_connections_actor::{MakerIncConnectionsActor, TakerCommand}; use crate::model::cfd::{Cfd, CfdState, CfdStateCommon, Dlc, Order, OrderId}; use crate::model::{TakerId, Usd, WalletInfo}; use crate::wallet::Wallet; use crate::wire::SetupMsg; -use crate::{maker_cfd_actor, maker_inc_connections_actor, setup_contract_actor}; +use crate::{maker_inc_connections_actor, setup_contract_actor}; +use anyhow::{Context as AnyhowContext, Result}; +use async_trait::async_trait; use bdk::bitcoin::secp256k1::schnorrsig; -use futures::Future; use std::time::SystemTime; use tokio::sync::{mpsc, watch}; +use xtra::prelude::*; -#[allow(clippy::large_enum_variant)] -#[derive(Debug)] -pub enum Command { - SyncWallet, - TakeOrder { - taker_id: TakerId, - order_id: OrderId, - quantity: Usd, - }, - NewOrder(Order), - StartContractSetup { - taker_id: TakerId, - order_id: OrderId, - }, - NewTakerOnline { - id: TakerId, - }, - IncProtocolMsg(SetupMsg), - CfdSetupCompleted { - order_id: OrderId, - dlc: Dlc, - }, +pub struct Initialized(pub Address); + +impl Message for Initialized { + type Result = Result<()>; +} + +pub struct TakeOrder { + pub taker_id: TakerId, + pub order_id: OrderId, + pub quantity: Usd, +} + +impl Message for TakeOrder { + type Result = Result<()>; } -pub fn new( +pub struct NewOrder(pub Order); + +impl Message for NewOrder { + type Result = Result<()>; +} + +pub struct StartContractSetup { + pub taker_id: TakerId, + pub order_id: OrderId, +} + +impl Message for StartContractSetup { + type Result = Result<()>; +} + +pub struct NewTakerOnline { + pub id: TakerId, +} + +impl Message for NewTakerOnline { + type Result = Result<()>; +} + +pub struct IncProtocolMsg(pub SetupMsg); + +impl Message for IncProtocolMsg { + type Result = Result<()>; +} + +pub struct CfdSetupCompleted { + pub order_id: OrderId, + pub dlc: Dlc, +} + +impl Message for CfdSetupCompleted { + type Result = Result<()>; +} + +pub struct SyncWallet; + +impl Message for SyncWallet { + type Result = Result<()>; +} + +pub struct MakerCfdActor { db: sqlx::SqlitePool, wallet: Wallet, oracle_pk: schnorrsig::PublicKey, - takers: mpsc::UnboundedSender, cfd_feed_actor_inbox: watch::Sender>, order_feed_sender: watch::Sender>, wallet_feed_sender: watch::Sender, -) -> ( - impl Future, - mpsc::UnboundedSender, -) { - let (sender, mut receiver) = mpsc::unbounded_channel(); - + takers: Option>, + current_order_id: Option, + current_contract_setup: Option>, // TODO: Move the contract setup into a dedicated actor and send messages to that actor that // manages the state instead of this ugly buffer - let mut current_contract_setup = None; - let mut contract_setup_message_buffer = vec![]; - - let mut current_order_id = None; - - let actor = { - let sender = sender.clone(); - - async move { - // populate the CFD feed with existing CFDs - let mut conn = db.acquire().await.unwrap(); - cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await.unwrap()) - .unwrap(); - - while let Some(message) = receiver.recv().await { - match message { - maker_cfd_actor::Command::SyncWallet => { - let wallet_info = wallet.sync().await.unwrap(); - wallet_feed_sender.send(wallet_info).unwrap(); - } - maker_cfd_actor::Command::TakeOrder { + contract_setup_message_buffer: Vec, +} + +impl xtra::Actor for MakerCfdActor {} + +impl MakerCfdActor { + pub async fn new( + db: sqlx::SqlitePool, + wallet: Wallet, + oracle_pk: schnorrsig::PublicKey, + cfd_feed_actor_inbox: watch::Sender>, + order_feed_sender: watch::Sender>, + wallet_feed_sender: watch::Sender, + ) -> Self { + let mut conn = db.acquire().await.unwrap(); + + // populate the CFD feed with existing CFDs + cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await.unwrap()) + .unwrap(); + + Self { + db, + wallet, + oracle_pk, + cfd_feed_actor_inbox, + order_feed_sender, + wallet_feed_sender, + takers: None, + current_order_id: None, + current_contract_setup: None, + contract_setup_message_buffer: vec![], + } + } + + async fn handle_new_order(&mut self, msg: NewOrder) -> Result<()> { + let order = msg.0; + + // 1. Save to DB + let mut conn = self.db.acquire().await?; + insert_order(&order, &mut conn).await?; + + // 2. Update actor state to current order + self.current_order_id.replace(order.id); + + // 3. Notify UI via feed + self.order_feed_sender.send(Some(order.clone()))?; + + // 4. Inform connected takers + self.takers()? + .do_send_async(maker_inc_connections_actor::BroadcastOrder(Some(order))) + .await?; + Ok(()) + } + + fn takers(&self) -> Result<&Address> { + self.takers + .as_ref() + .context("Maker inc connections actor to be initialised") + } + + async fn handle_start_contract_setup( + &mut self, + msg: StartContractSetup, + ctx: &mut Context, + ) -> Result<()> { + let StartContractSetup { taker_id, order_id } = msg; + + tracing::info!("Starting contract setup"); + + // Kick-off the CFD protocol + let (sk, pk) = crate::keypair::new(&mut rand::thread_rng()); + + let mut conn = self.db.acquire().await?; + + let cfd = load_cfd_by_order_id(order_id, &mut conn).await?; + let margin = cfd.margin()?; + + let maker_params = self.wallet.build_party_params(margin, pk).await?; + + let (actor, inbox) = setup_contract_actor::new( + { + let inbox = self.takers()?.clone(); + move |msg| { + inbox.send(maker_inc_connections_actor::TakerMessage { taker_id, - order_id, - quantity, - } => { - tracing::debug!(%taker_id, %quantity, %order_id, - "Taker wants to take an order" - ); - - let mut conn = db.acquire().await.unwrap(); - - // 1. Validate if order is still valid - let current_order = match current_order_id { - Some(current_order_id) if current_order_id == order_id => { - load_order_by_id(current_order_id, &mut conn).await.unwrap() - } - _ => { - takers - .send(maker_inc_connections_actor::Command::NotifyInvalidOrderId { - id: order_id, - taker_id, - }) - .unwrap(); - continue; - } - }; - - // 2. Insert CFD in DB - // TODO: Don't auto-accept, present to user in UI instead - let cfd = Cfd::new( - current_order.clone(), - quantity, - CfdState::Accepted { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - ); - insert_cfd(cfd, &mut conn).await.unwrap(); - - takers - .send(maker_inc_connections_actor::Command::NotifyOrderAccepted { - id: order_id, - taker_id, - }) - .unwrap(); - cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await.unwrap()) - .unwrap(); - - // 3. Remove current order - current_order_id = None; - takers - .send(maker_inc_connections_actor::Command::BroadcastOrder(None)) - .unwrap(); - order_feed_sender.send(None).unwrap(); - } - maker_cfd_actor::Command::NewOrder(order) => { - // 1. Save to DB - let mut conn = db.acquire().await.unwrap(); - insert_order(&order, &mut conn).await.unwrap(); - - // 2. Update actor state to current order - current_order_id.replace(order.id); - - // 3. Notify UI via feed - order_feed_sender.send(Some(order.clone())).unwrap(); - - // 4. Inform connected takers - takers - .send(maker_inc_connections_actor::Command::BroadcastOrder(Some( - order, - ))) - .unwrap(); - } - maker_cfd_actor::Command::NewTakerOnline { id: taker_id } => { - let mut conn = db.acquire().await.unwrap(); - - let current_order = match current_order_id { - Some(current_order_id) => { - Some(load_order_by_id(current_order_id, &mut conn).await.unwrap()) - } - None => None, - }; - - takers - .send(maker_inc_connections_actor::Command::SendOrder { - order: current_order, - taker_id, - }) - .unwrap(); - } - maker_cfd_actor::Command::StartContractSetup { taker_id, order_id } => { - println!("CONTRACT SETUP"); - - // Kick-off the CFD protocol - let (sk, pk) = crate::keypair::new(&mut rand::thread_rng()); - - let cfd = load_cfd_by_order_id(order_id, &mut conn).await.unwrap(); - let margin = cfd.margin().unwrap(); - - let maker_params = wallet.build_party_params(margin, pk).await.unwrap(); - - let (actor, inbox) = setup_contract_actor::new( - { - let inbox = takers.clone(); - move |msg| { - inbox - .send( - maker_inc_connections_actor::Command::OutProtocolMsg { - taker_id, - msg, - }, - ) - .unwrap() - } - }, - setup_contract_actor::OwnParams::Maker(maker_params), - sk, - oracle_pk, - cfd, - wallet.clone(), - ); - - current_contract_setup = Some(inbox.clone()); - - for msg in contract_setup_message_buffer.drain(..) { - inbox.send(msg).unwrap(); - } - - // TODO: Should we do this here or already earlier or after the spawn? - insert_new_cfd_state_by_order_id( - order_id, - CfdState::ContractSetup { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - }, - &mut conn, - ) - .await - .unwrap(); - cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await.unwrap()) - .unwrap(); - - tokio::spawn({ - let sender = sender.clone(); - - async move { - sender - .send(Command::CfdSetupCompleted { - order_id, - dlc: actor.await, - }) - .unwrap() - } - }); - } - maker_cfd_actor::Command::IncProtocolMsg(msg) => { - let inbox = match ¤t_contract_setup { - None => { - contract_setup_message_buffer.push(msg); - continue; - } - Some(inbox) => inbox, - }; - - inbox.send(msg).unwrap(); - } - maker_cfd_actor::Command::CfdSetupCompleted { order_id, dlc } => { - println!("Setup complete, publishing on chain now..."); - - current_contract_setup = None; - contract_setup_message_buffer = vec![]; - - insert_new_cfd_state_by_order_id( - order_id, - CfdState::PendingOpen { - common: CfdStateCommon { - transition_timestamp: SystemTime::now(), - }, - dlc: dlc.clone(), - }, - &mut conn, - ) - .await - .unwrap(); - - cfd_feed_actor_inbox - .send(load_all_cfds(&mut conn).await.unwrap()) - .unwrap(); - - let txid = wallet.try_broadcast_transaction(dlc.lock).await.unwrap(); - - println!("Lock transaction published with txid {}", txid); - - // TODO: tx monitoring, once confirmed with x blocks transition the Cfd to - // Open - } + command: TakerCommand::OutProtocolMsg { setup_msg: msg }, + }); } + }, + setup_contract_actor::OwnParams::Maker(maker_params), + sk, + self.oracle_pk, + cfd, + self.wallet.clone(), + ); + + self.current_contract_setup.replace(inbox.clone()); + + for msg in self.contract_setup_message_buffer.drain(..) { + inbox.send(msg).unwrap(); + } + + // TODO: Should we do this here or already earlier or after the spawn? + insert_new_cfd_state_by_order_id( + order_id, + CfdState::ContractSetup { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + }, + &mut conn, + ) + .await?; + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; + + let address = ctx + .address() + .expect("actor to be able to give address to itself"); + + tokio::spawn(async move { + address + .do_send_async(CfdSetupCompleted { + order_id, + dlc: actor.await, + }) + .await + }); + + Ok(()) + } + + async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> { + let mut conn = self.db.acquire().await?; + + let current_order = match self.current_order_id { + Some(current_order_id) => Some(load_order_by_id(current_order_id, &mut conn).await?), + None => None, + }; + + self.takers()? + .do_send_async(maker_inc_connections_actor::TakerMessage { + taker_id: msg.id, + command: TakerCommand::SendOrder { + order: current_order, + }, + }) + .await?; + + Ok(()) + } + + async fn handle_inc_protocol_msg(&mut self, msg: IncProtocolMsg) -> Result<()> { + let msg = msg.0; + let inbox = match &self.current_contract_setup { + None => { + self.contract_setup_message_buffer.push(msg); + return Ok(()); } + Some(inbox) => inbox, + }; + inbox.send(msg)?; + Ok(()) + } + + async fn handle_sync_wallet(&mut self) -> Result<()> { + let wallet_info = self.wallet.sync().await?; + self.wallet_feed_sender.send(wallet_info)?; + Ok(()) + } +} + +#[async_trait] +impl Handler for MakerCfdActor { + async fn handle(&mut self, msg: Initialized, _ctx: &mut Context) -> Result<()> { + self.takers.replace(msg.0); + Ok(()) + } +} + +#[async_trait] +impl Handler for MakerCfdActor { + async fn handle(&mut self, msg: TakeOrder, _ctx: &mut Context) -> Result<()> { + tracing::debug!(%msg.taker_id, %msg.quantity, %msg.order_id, + "Taker wants to take an order" + ); + + let mut conn = self.db.acquire().await?; + + // 1. Validate if order is still valid + let current_order = match self.current_order_id { + Some(current_order_id) if current_order_id == msg.order_id => { + load_order_by_id(current_order_id, &mut conn).await.unwrap() + } + _ => { + self.takers()? + .do_send_async(maker_inc_connections_actor::TakerMessage { + taker_id: msg.taker_id, + command: TakerCommand::NotifyInvalidOrderId { id: msg.order_id }, + }) + .await?; + // TODO: Return an error here? + return Ok(()); + } + }; + + // 2. Insert CFD in DB + // TODO: Don't auto-accept, present to user in UI instead + let cfd = Cfd::new( + current_order.clone(), + msg.quantity, + CfdState::Accepted { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + }, + ); + insert_cfd(cfd, &mut conn).await?; + + self.takers()? + .do_send_async(maker_inc_connections_actor::TakerMessage { + taker_id: msg.taker_id, + command: TakerCommand::NotifyOrderAccepted { id: msg.order_id }, + }) + .await?; + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; + + // 3. Remove current order + self.current_order_id = None; + self.takers()? + .do_send_async(maker_inc_connections_actor::BroadcastOrder(None)) + .await?; + self.order_feed_sender.send(None)?; + + Ok(()) + } +} + +macro_rules! log_error { + ($future:expr) => { + if let Err(e) = $future.await { + tracing::error!(%e); } }; +} + +#[async_trait] +impl Handler for MakerCfdActor { + async fn handle(&mut self, msg: NewOrder, _ctx: &mut Context) -> Result<()> { + log_error!(self.handle_new_order(msg)); + Ok(()) + } +} + +#[async_trait] +impl Handler for MakerCfdActor { + async fn handle(&mut self, msg: StartContractSetup, ctx: &mut Context) -> Result<()> { + log_error!(self.handle_start_contract_setup(msg, ctx)); + Ok(()) + } +} + +#[async_trait] +impl Handler for MakerCfdActor { + async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context) -> Result<()> { + log_error!(self.handle_new_taker_online(msg)); + Ok(()) + } +} + +#[async_trait] +impl Handler for MakerCfdActor { + async fn handle(&mut self, msg: IncProtocolMsg, _ctx: &mut Context) -> Result<()> { + log_error!(self.handle_inc_protocol_msg(msg)); + Ok(()) + } +} + +#[async_trait] +impl Handler for MakerCfdActor { + async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context) -> Result<()> { + let mut conn = self.db.acquire().await?; + + self.current_contract_setup = None; + self.contract_setup_message_buffer = vec![]; + + insert_new_cfd_state_by_order_id( + msg.order_id, + CfdState::PendingOpen { + common: CfdStateCommon { + transition_timestamp: SystemTime::now(), + }, + dlc: msg.dlc.clone(), + }, + &mut conn, + ) + .await?; + + self.cfd_feed_actor_inbox + .send(load_all_cfds(&mut conn).await?)?; + + let txid = self + .wallet + .try_broadcast_transaction(msg.dlc.lock) + .await + .unwrap(); + + tracing::info!("Lock transaction published with txid {}", txid); + + // TODO: tx monitoring, once confirmed with x blocks transition the Cfd to + // Open + Ok(()) + } +} - (actor, sender) +#[async_trait] +impl Handler for MakerCfdActor { + async fn handle(&mut self, _msg: SyncWallet, _ctx: &mut Context) -> Result<()> { + log_error!(self.handle_sync_wallet()); + Ok(()) + } } diff --git a/daemon/src/maker_inc_connections_actor.rs b/daemon/src/maker_inc_connections_actor.rs index 2470284..bc19f13 100644 --- a/daemon/src/maker_inc_connections_actor.rs +++ b/daemon/src/maker_inc_connections_actor.rs @@ -1,95 +1,150 @@ +use crate::maker_cfd_actor::MakerCfdActor; use crate::model::cfd::{Order, OrderId}; use crate::model::TakerId; use crate::wire::SetupMsg; -use crate::{maker_cfd_actor, maker_inc_connections_actor, send_wire_message_actor, wire}; +use crate::{maker_cfd_actor, wire}; +use anyhow::{Context as AnyhowContext, Result}; use futures::{Future, StreamExt}; use std::collections::HashMap; use tokio::net::tcp::OwnedReadHalf; -use tokio::net::TcpListener; use tokio::sync::mpsc; use tokio_util::codec::{FramedRead, LengthDelimitedCodec}; +use async_trait::async_trait; +use xtra::prelude::*; + +type MakerToTakerSender = mpsc::UnboundedSender; + +pub struct BroadcastOrder(pub Option); + +impl Message for BroadcastOrder { + type Result = Result<()>; +} + #[allow(clippy::large_enum_variant)] -#[derive(Debug)] -pub enum Command { - BroadcastOrder(Option), - SendOrder { - order: Option, - taker_id: TakerId, - }, - NotifyInvalidOrderId { - id: OrderId, - taker_id: TakerId, - }, - NotifyOrderAccepted { - id: OrderId, - taker_id: TakerId, - }, - OutProtocolMsg { - taker_id: TakerId, - msg: SetupMsg, - }, +pub enum TakerCommand { + SendOrder { order: Option }, + NotifyInvalidOrderId { id: OrderId }, + NotifyOrderAccepted { id: OrderId }, + OutProtocolMsg { setup_msg: SetupMsg }, } -pub fn new( - listener: TcpListener, - cfd_maker_actor_inbox: mpsc::UnboundedSender, - mut our_inbox: mpsc::UnboundedReceiver, -) -> impl Future { - let mut write_connections = - HashMap::>::new(); +pub struct TakerMessage { + pub taker_id: TakerId, + pub command: TakerCommand, +} - async move { - loop { - tokio::select! { - Ok((socket, remote_addr)) = listener.accept() => { - tracing::info!("Connected to {}", remote_addr); - let taker_id = TakerId::default(); - let (read, write) = socket.into_split(); - - let in_taker_actor = in_taker_messages(read, cfd_maker_actor_inbox.clone(), taker_id); - let (out_message_actor, out_message_actor_inbox) = send_wire_message_actor::new(write); - - tokio::spawn(in_taker_actor); - tokio::spawn(out_message_actor); - - cfd_maker_actor_inbox.send(maker_cfd_actor::Command::NewTakerOnline{id : taker_id}).unwrap(); - - write_connections.insert(taker_id, out_message_actor_inbox); - }, - Some(message) = our_inbox.recv() => { - match message { - maker_inc_connections_actor::Command::NotifyInvalidOrderId { id, taker_id } => { - let conn = write_connections.get(&taker_id).expect("no connection to taker_id"); - conn.send(wire::MakerToTaker::InvalidOrderId(id)).unwrap(); - }, - maker_inc_connections_actor::Command::BroadcastOrder(order) => { - for conn in write_connections.values() { - conn.send(wire::MakerToTaker::CurrentOrder(order.clone())).unwrap(); - } - }, - maker_inc_connections_actor::Command::SendOrder {order, taker_id} => { - let conn = write_connections.get(&taker_id).expect("no connection to taker_id"); - conn.send(wire::MakerToTaker::CurrentOrder(order)).unwrap(); - }, - maker_inc_connections_actor::Command::NotifyOrderAccepted { id, taker_id } => { - let conn = write_connections.get(&taker_id).expect("no connection to taker_id"); - conn.send(wire::MakerToTaker::ConfirmTakeOrder(id)).unwrap(); - }, - maker_inc_connections_actor::Command::OutProtocolMsg { taker_id, msg } => { - let conn = write_connections.get(&taker_id).expect("no connection to taker_id"); - conn.send(wire::MakerToTaker::Protocol(msg)).unwrap(); - } - } - } +impl Message for TakerMessage { + type Result = Result<()>; +} + +pub struct NewTakerOnline { + pub taker_id: TakerId, + pub out_msg_actor_inbox: MakerToTakerSender, +} + +impl Message for NewTakerOnline { + type Result = Result<()>; +} + +pub struct MakerIncConnectionsActor { + write_connections: HashMap, + cfd_maker_actor_address: Address, +} + +impl Actor for MakerIncConnectionsActor {} + +impl MakerIncConnectionsActor { + pub fn new(cfd_maker_actor_address: Address) -> Self { + Self { + write_connections: HashMap::::new(), + cfd_maker_actor_address, + } + } + + fn send_to_taker(&self, taker_id: TakerId, msg: wire::MakerToTaker) -> Result<()> { + let conn = self + .write_connections + .get(&taker_id) + .context("no connection to taker_id")?; + conn.send(msg)?; + Ok(()) + } + + async fn handle_broadcast_order(&mut self, msg: BroadcastOrder) -> Result<()> { + let order = msg.0; + self.write_connections + .values() + .try_for_each(|conn| conn.send(wire::MakerToTaker::CurrentOrder(order.clone())))?; + Ok(()) + } + + async fn handle_taker_message(&mut self, msg: TakerMessage) -> Result<()> { + match msg.command { + TakerCommand::SendOrder { order } => { + self.send_to_taker(msg.taker_id, wire::MakerToTaker::CurrentOrder(order))?; + } + TakerCommand::NotifyInvalidOrderId { id } => { + self.send_to_taker(msg.taker_id, wire::MakerToTaker::InvalidOrderId(id))?; + } + TakerCommand::NotifyOrderAccepted { id } => { + self.send_to_taker(msg.taker_id, wire::MakerToTaker::ConfirmTakeOrder(id))?; + } + TakerCommand::OutProtocolMsg { setup_msg } => { + self.send_to_taker(msg.taker_id, wire::MakerToTaker::Protocol(setup_msg))?; } } + Ok(()) + } + + async fn handle_new_taker_online(&mut self, msg: NewTakerOnline) -> Result<()> { + self.cfd_maker_actor_address + .do_send_async(maker_cfd_actor::NewTakerOnline { id: msg.taker_id }) + .await?; + + self.write_connections + .insert(msg.taker_id, msg.out_msg_actor_inbox); + Ok(()) } } -fn in_taker_messages( +macro_rules! log_error { + ($future:expr) => { + if let Err(e) = $future.await { + tracing::error!(%e); + } + }; +} + +#[async_trait] +impl Handler for MakerIncConnectionsActor { + async fn handle(&mut self, msg: BroadcastOrder, _ctx: &mut Context) -> Result<()> { + log_error!(self.handle_broadcast_order(msg)); + Ok(()) + } +} + +#[async_trait] +impl Handler for MakerIncConnectionsActor { + async fn handle(&mut self, msg: TakerMessage, _ctx: &mut Context) -> Result<()> { + log_error!(self.handle_taker_message(msg)); + Ok(()) + } +} + +#[async_trait] +impl Handler for MakerIncConnectionsActor { + async fn handle(&mut self, msg: NewTakerOnline, _ctx: &mut Context) -> Result<()> { + log_error!(self.handle_new_taker_online(msg)); + Ok(()) + } +} + +// + +pub fn in_taker_messages( read: OwnedReadHalf, - cfd_actor_inbox: mpsc::UnboundedSender, + cfd_actor_inbox: Address, taker_id: TakerId, ) -> impl Future { let mut messages = FramedRead::new(read, LengthDelimitedCodec::new()).map(|result| { @@ -100,19 +155,28 @@ fn in_taker_messages( async move { while let Some(message) = messages.next().await { match message { - Ok(wire::TakerToMaker::TakeOrder { order_id, quantity }) => cfd_actor_inbox - .send(maker_cfd_actor::Command::TakeOrder { - taker_id, - order_id, - quantity, - }) - .unwrap(), - Ok(wire::TakerToMaker::StartContractSetup(order_id)) => cfd_actor_inbox - .send(maker_cfd_actor::Command::StartContractSetup { taker_id, order_id }) - .unwrap(), - Ok(wire::TakerToMaker::Protocol(msg)) => cfd_actor_inbox - .send(maker_cfd_actor::Command::IncProtocolMsg(msg)) - .unwrap(), + Ok(wire::TakerToMaker::TakeOrder { order_id, quantity }) => { + cfd_actor_inbox + .do_send_async(maker_cfd_actor::TakeOrder { + taker_id, + order_id, + quantity, + }) + .await + .unwrap(); + } + Ok(wire::TakerToMaker::StartContractSetup(order_id)) => { + cfd_actor_inbox + .do_send_async(maker_cfd_actor::StartContractSetup { taker_id, order_id }) + .await + .unwrap(); + } + Ok(wire::TakerToMaker::Protocol(msg)) => { + cfd_actor_inbox + .do_send_async(maker_cfd_actor::IncProtocolMsg(msg)) + .await + .unwrap(); + } Err(error) => { tracing::error!(%error, "Error in reading message"); } diff --git a/daemon/src/routes_maker.rs b/daemon/src/routes_maker.rs index 4f0a872..c8bc33a 100644 --- a/daemon/src/routes_maker.rs +++ b/daemon/src/routes_maker.rs @@ -1,5 +1,5 @@ use crate::auth::Authenticated; -use crate::maker_cfd_actor; +use crate::maker_cfd_actor::{self, MakerCfdActor}; use crate::model::cfd::{Cfd, Order, Origin}; use crate::model::{Usd, WalletInfo}; use crate::routes::EmbeddedFileExt; @@ -15,7 +15,8 @@ use serde::Deserialize; use std::borrow::Cow; use std::path::PathBuf; use tokio::select; -use tokio::sync::{mpsc, watch}; +use tokio::sync::watch; +use xtra::Address; #[rocket::get("/feed")] pub async fn maker_feed( @@ -71,7 +72,7 @@ pub struct CfdNewOrderRequest { #[rocket::post("/order/sell", data = "")] pub async fn post_sell_order( order: Json, - cfd_actor_inbox: &State>, + cfd_actor_address: &State>, _auth: Authenticated, ) -> Result, status::BadRequest> { let order = Order::from_default_with_price(order.price, Origin::Ours) @@ -79,8 +80,9 @@ pub async fn post_sell_order( .with_min_quantity(order.min_quantity) .with_max_quantity(order.max_quantity); - cfd_actor_inbox - .send(maker_cfd_actor::Command::NewOrder(order)) + cfd_actor_address + .do_send_async(maker_cfd_actor::NewOrder(order)) + .await .expect("actor to always be available"); Ok(status::Accepted(None))