Browse Source

Integrate CFD protocol into the maker and taker

- Add wallet to maker and taker.

  For now we have different, static descriptors with a hardcoded DB
  path. We also only operate on testnet and use sled as a database.

- Associate each CET with a unique nonce_pk

  Eventually a set of `(message, nonce_pk)`.

- Share transactions with maker

- Add serialisation support to adaptor signature

Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
Co-authored-by: Lucas Soriano del Pino <l.soriano.del.pino@gmail.com>
verify-transactions
Thomas Eizinger 3 years ago
committed by Mariusz Klochowicz
parent
commit
d459af14d1
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 73
      Cargo.lock
  2. 22
      cfd_protocol/src/lib.rs
  3. 26
      cfd_protocol/tests/cfds.rs
  4. 3
      daemon/Cargo.toml
  5. 9
      daemon/src/db.rs
  6. 13
      daemon/src/keypair.rs
  7. 83
      daemon/src/maker.rs
  8. 375
      daemon/src/maker_cfd_actor.rs
  9. 17
      daemon/src/maker_inc_connections_actor.rs
  10. 33
      daemon/src/model/cfd.rs
  11. 4
      daemon/src/routes_maker.rs
  12. 77
      daemon/src/taker.rs
  13. 272
      daemon/src/taker_cfd_actor.rs
  14. 5
      daemon/src/taker_inc_message_actor.rs
  15. 133
      daemon/src/wire.rs
  16. 4
      dprint.json

73
Cargo.lock

@ -371,13 +371,16 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bdk", "bdk",
"cfd_protocol",
"futures", "futures",
"rand 0.6.5",
"rocket", "rocket",
"rocket_db_pools", "rocket_db_pools",
"rust_decimal", "rust_decimal",
"rust_decimal_macros", "rust_decimal_macros",
"serde", "serde",
"serde_json", "serde_json",
"serde_with",
"sqlx", "sqlx",
"tempfile", "tempfile",
"tokio", "tokio",
@ -385,6 +388,41 @@ dependencies = [
"uuid", "uuid",
] ]
[[package]]
name = "darling"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "757c0ded2af11d8e739c4daea1ac623dd1624b06c844cf3f5a39f1bdbd99bb12"
dependencies = [
"darling_core",
"darling_macro",
]
[[package]]
name = "darling_core"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c34d8efb62d0c2d7f60ece80f75e5c63c1588ba68032740494b0b9a996466e3"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim",
"syn",
]
[[package]]
name = "darling_macro"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade7bff147130fe5e6d39f089c6bd49ec0250f35d70b2eebf72afdfc919f15cc"
dependencies = [
"darling_core",
"quote",
"syn",
]
[[package]] [[package]]
name = "devise" name = "devise"
version = "0.3.1" version = "0.3.1"
@ -799,6 +837,12 @@ dependencies = [
"want", "want",
] ]
[[package]]
name = "ident_case"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]] [[package]]
name = "idna" name = "idna"
version = "0.2.3" version = "0.2.3"
@ -1697,6 +1741,29 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "serde_with"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "062b87e45d8f26714eacfaef0ed9a583e2bfd50ebd96bdd3c200733bd5758e2c"
dependencies = [
"rustversion",
"serde",
"serde_with_macros",
]
[[package]]
name = "serde_with_macros"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "98c1fcca18d55d1763e1c16873c4bde0ac3ef75179a28c7b372917e0494625be"
dependencies = [
"darling",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "sha2" name = "sha2"
version = "0.9.8" version = "0.9.8"
@ -1911,6 +1978,12 @@ dependencies = [
"unicode-normalization", "unicode-normalization",
] ]
[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.76" version = "1.0.76"

22
cfd_protocol/src/lib.rs

@ -1,3 +1,5 @@
pub use secp256k1_zkp::EcdsaAdaptorSignature;
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use bdk::bitcoin::hashes::hex::ToHex; use bdk::bitcoin::hashes::hex::ToHex;
use bdk::bitcoin::hashes::*; use bdk::bitcoin::hashes::*;
@ -14,7 +16,7 @@ use bdk::wallet::AddressIndex;
use bdk::FeeRate; use bdk::FeeRate;
use itertools::Itertools; use itertools::Itertools;
use secp256k1_zkp::bitcoin_hashes::sha256; use secp256k1_zkp::bitcoin_hashes::sha256;
use secp256k1_zkp::{self, schnorrsig, EcdsaAdaptorSignature, SecretKey, Signature, SECP256K1}; use secp256k1_zkp::{self, schnorrsig, SecretKey, Signature, SECP256K1};
use std::collections::HashMap; use std::collections::HashMap;
use std::iter::FromIterator; use std::iter::FromIterator;
@ -213,7 +215,7 @@ fn build_cfds(
let encsig = cet.encsign(identity_sk, &oracle_pk, &nonce_pk)?; let encsig = cet.encsign(identity_sk, &oracle_pk, &nonce_pk)?;
Ok((cet.inner, encsig, message)) Ok((cet.inner, encsig, message, nonce_pk))
}) })
.collect::<Result<Vec<_>>>() .collect::<Result<Vec<_>>>()
.context("cannot build and sign all cets")?; .context("cannot build and sign all cets")?;
@ -255,7 +257,9 @@ pub fn commit_descriptor(
let taker_publish_pk_hash = taker_publish_pk.pubkey_hash().as_hash(); let taker_publish_pk_hash = taker_publish_pk.pubkey_hash().as_hash();
let taker_rev_pk_hash = taker_rev_pk.pubkey_hash().as_hash(); let taker_rev_pk_hash = taker_rev_pk.pubkey_hash().as_hash();
// raw script: or(and(pk(maker_own_pk),pk(taker_own_pk)),or(and(pk(maker_own_pk),and(pk(taker_publish_pk),pk(taker_rev_pk))),and(pk(taker_own_pk),and(pk(maker_publish_pk),pk(maker_rev_pk))))) // raw script:
// or(and(pk(maker_own_pk),pk(taker_own_pk)),or(and(pk(maker_own_pk),and(pk(taker_publish_pk),
// pk(taker_rev_pk))),and(pk(taker_own_pk),and(pk(maker_publish_pk),pk(maker_rev_pk)))))
let full_script = format!("wsh(c:andor(pk({maker_own_pk}),pk_k({taker_own_pk}),or_i(and_v(v:pkh({maker_own_pk_hash}),and_v(v:pkh({taker_publish_pk_hash}),pk_h({taker_rev_pk_hash}))),and_v(v:pkh({taker_own_pk_hash}),and_v(v:pkh({maker_publish_pk_hash}),pk_h({maker_rev_pk_hash}))))))", let full_script = format!("wsh(c:andor(pk({maker_own_pk}),pk_k({taker_own_pk}),or_i(and_v(v:pkh({maker_own_pk_hash}),and_v(v:pkh({taker_publish_pk_hash}),pk_h({taker_rev_pk_hash}))),and_v(v:pkh({taker_own_pk_hash}),and_v(v:pkh({maker_publish_pk_hash}),pk_h({maker_rev_pk_hash}))))))",
maker_own_pk = maker_own_pk, maker_own_pk = maker_own_pk,
taker_own_pk = taker_own_pk, taker_own_pk = taker_own_pk,
@ -416,15 +420,22 @@ pub struct PartyParams {
pub address: Address, pub address: Address,
} }
#[derive(Debug, Copy, Clone)]
pub struct PunishParams { pub struct PunishParams {
pub revocation_pk: PublicKey, pub revocation_pk: PublicKey,
pub publish_pk: PublicKey, pub publish_pk: PublicKey,
} }
#[derive(Debug, Clone)]
pub struct CfdTransactions { pub struct CfdTransactions {
pub lock: PartiallySignedTransaction, pub lock: PartiallySignedTransaction,
pub commit: (Transaction, EcdsaAdaptorSignature), pub commit: (Transaction, EcdsaAdaptorSignature),
pub cets: Vec<(Transaction, EcdsaAdaptorSignature, Vec<u8>)>, pub cets: Vec<(
Transaction,
EcdsaAdaptorSignature,
Vec<u8>,
schnorrsig::PublicKey,
)>,
pub refund: (Transaction, Signature), pub refund: (Transaction, Signature),
} }
@ -529,7 +540,8 @@ sha256t_hash_newtype!(
true true
); );
/// Compute a signature point for the given oracle public key, announcement nonce public key and message. /// Compute a signature point for the given oracle public key, announcement nonce public key and
/// message.
pub fn compute_signature_point( pub fn compute_signature_point(
oracle_pk: &schnorrsig::PublicKey, oracle_pk: &schnorrsig::PublicKey,
nonce_pk: &schnorrsig::PublicKey, nonce_pk: &schnorrsig::PublicKey,

26
cfd_protocol/tests/cfds.rs

@ -68,7 +68,7 @@ fn create_cfd() {
verify_cfd_sigs( verify_cfd_sigs(
(&maker_cfd_txs, maker.pk, maker.pub_pk), (&maker_cfd_txs, maker.pk, maker.pub_pk),
(&taker_cfd_txs, taker.pk, taker.pub_pk), (&taker_cfd_txs, taker.pk, taker.pub_pk),
(oracle.public_key(), announcement.nonce_pk()), oracle.public_key(),
(&lock_desc, lock_amount), (&lock_desc, lock_amount),
(&commit_desc, commit_amount), (&commit_desc, commit_amount),
); );
@ -230,7 +230,7 @@ fn renew_cfd() {
verify_cfd_sigs( verify_cfd_sigs(
(&maker_cfd_txs, maker.pk, maker_pub_pk), (&maker_cfd_txs, maker.pk, maker_pub_pk),
(&taker_cfd_txs, taker.pk, taker_pub_pk), (&taker_cfd_txs, taker.pk, taker_pub_pk),
(oracle.public_key(), announcement.nonce_pk()), oracle.public_key(),
(&lock_desc, lock_amount), (&lock_desc, lock_amount),
(&commit_desc, commit_amount), (&commit_desc, commit_amount),
); );
@ -371,7 +371,7 @@ struct CfdKeys {
fn verify_cfd_sigs( fn verify_cfd_sigs(
(maker_cfd_txs, maker_pk, maker_publish_pk): (&CfdTransactions, PublicKey, PublicKey), (maker_cfd_txs, maker_pk, maker_publish_pk): (&CfdTransactions, PublicKey, PublicKey),
(taker_cfd_txs, taker_pk, taker_publish_pk): (&CfdTransactions, PublicKey, PublicKey), (taker_cfd_txs, taker_pk, taker_publish_pk): (&CfdTransactions, PublicKey, PublicKey),
(oracle_pk, nonce_pk): (schnorrsig::PublicKey, schnorrsig::PublicKey), oracle_pk: schnorrsig::PublicKey,
(lock_desc, lock_amount): (&Descriptor<PublicKey>, Amount), (lock_desc, lock_amount): (&Descriptor<PublicKey>, Amount),
(commit_desc, commit_amount): (&Descriptor<PublicKey>, Amount), (commit_desc, commit_amount): (&Descriptor<PublicKey>, Amount),
) { ) {
@ -391,11 +391,11 @@ fn verify_cfd_sigs(
&taker_pk.key, &taker_pk.key,
) )
.expect("valid taker refund sig"); .expect("valid taker refund sig");
for (tx, _, msg) in taker_cfd_txs.cets.iter() { for (tx, _, msg, nonce_pk) in taker_cfd_txs.cets.iter() {
let maker_encsig = maker_cfd_txs let maker_encsig = maker_cfd_txs
.cets .cets
.iter() .iter()
.find_map(|(maker_tx, encsig, _)| (maker_tx.txid() == tx.txid()).then(|| encsig)) .find_map(|(maker_tx, encsig, _, _)| (maker_tx.txid() == tx.txid()).then(|| encsig))
.expect("one encsig per cet, per party"); .expect("one encsig per cet, per party");
verify_cet_encsig( verify_cet_encsig(
@ -403,17 +403,17 @@ fn verify_cfd_sigs(
maker_encsig, maker_encsig,
msg, msg,
&maker_pk.key, &maker_pk.key,
(&oracle_pk, &nonce_pk), (&oracle_pk, nonce_pk),
commit_desc, commit_desc,
commit_amount, commit_amount,
) )
.expect("valid maker cet encsig") .expect("valid maker cet encsig")
} }
for (tx, _, msg) in maker_cfd_txs.cets.iter() { for (tx, _, msg, nonce_pk) in maker_cfd_txs.cets.iter() {
let taker_encsig = taker_cfd_txs let taker_encsig = taker_cfd_txs
.cets .cets
.iter() .iter()
.find_map(|(taker_tx, encsig, _)| (taker_tx.txid() == tx.txid()).then(|| encsig)) .find_map(|(taker_tx, encsig, _, _)| (taker_tx.txid() == tx.txid()).then(|| encsig))
.expect("one encsig per cet, per party"); .expect("one encsig per cet, per party");
verify_cet_encsig( verify_cet_encsig(
@ -421,7 +421,7 @@ fn verify_cfd_sigs(
taker_encsig, taker_encsig,
msg, msg,
&taker_pk.key, &taker_pk.key,
(&oracle_pk, &nonce_pk), (&oracle_pk, nonce_pk),
commit_desc, commit_desc,
commit_amount, commit_amount,
) )
@ -533,28 +533,28 @@ fn check_cfd_txs(
// CETs: // CETs:
for (tx, _, msg) in maker_cfd_txs.cets.clone().into_iter() { for (tx, _, msg, _) in maker_cfd_txs.cets.clone().into_iter() {
build_and_check_cet( build_and_check_cet(
tx, tx,
&oracle.attest(&event, &msg), &oracle.attest(&event, &msg),
taker_cfd_txs taker_cfd_txs
.cets .cets
.iter() .iter()
.map(|(tx, encsig, _)| (tx.txid(), *encsig)), .map(|(tx, encsig, _, _)| (tx.txid(), *encsig)),
(&maker_sk, &maker_pk), (&maker_sk, &maker_pk),
&taker_pk, &taker_pk,
(&signed_commit_tx_maker, &commit_desc, commit_amount), (&signed_commit_tx_maker, &commit_desc, commit_amount),
) )
.expect("valid maker cet"); .expect("valid maker cet");
} }
for (tx, _, msg) in taker_cfd_txs.cets.into_iter() { for (tx, _, msg, _) in taker_cfd_txs.cets.into_iter() {
build_and_check_cet( build_and_check_cet(
tx, tx,
&oracle.attest(&event, &msg), &oracle.attest(&event, &msg),
maker_cfd_txs maker_cfd_txs
.cets .cets
.iter() .iter()
.map(|(tx, encsig, _)| (tx.txid(), *encsig)), .map(|(tx, encsig, _, _)| (tx.txid(), *encsig)),
(&taker_sk, &taker_pk), (&taker_sk, &taker_pk),
&maker_pk, &maker_pk,
(&signed_commit_tx_maker, &commit_desc, commit_amount), (&signed_commit_tx_maker, &commit_desc, commit_amount),

3
daemon/Cargo.toml

@ -6,13 +6,16 @@ edition = "2018"
[dependencies] [dependencies]
anyhow = "1" anyhow = "1"
bdk = { git = "https://github.com/bitcoindevkit/bdk/" } bdk = { git = "https://github.com/bitcoindevkit/bdk/" }
cfd_protocol = { path = "../cfd_protocol" }
futures = { version = "0.3", default-features = false } futures = { version = "0.3", default-features = false }
rand = "0.6"
rocket = { git = "https://github.com/SergioBenitez/Rocket", features = ["json"] } rocket = { git = "https://github.com/SergioBenitez/Rocket", features = ["json"] }
rocket_db_pools = { git = "https://github.com/SergioBenitez/Rocket", features = ["sqlx_sqlite"] } rocket_db_pools = { git = "https://github.com/SergioBenitez/Rocket", features = ["sqlx_sqlite"] }
rust_decimal = { version = "1.15", features = ["serde-float", "serde-arbitrary-precision"] } rust_decimal = { version = "1.15", features = ["serde-float", "serde-arbitrary-precision"] }
rust_decimal_macros = "1.15" rust_decimal_macros = "1.15"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_json = "1" serde_json = "1"
serde_with = { version = "1", features = ["macros"] }
sqlx = { version = "0.5", features = ["offline"] } sqlx = { version = "0.5", features = ["offline"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"] }
tokio-util = { version = "0.6", features = ["codec"] } tokio-util = { version = "0.6", features = ["codec"] }

9
daemon/src/db.rs

@ -118,7 +118,8 @@ pub async fn insert_cfd(cfd: Cfd, conn: &mut PoolConnection<Sqlite>) -> anyhow::
let cfd_state = serde_json::to_string(&cfd.state)?; let cfd_state = serde_json::to_string(&cfd.state)?;
// save cfd + state in a transaction to make sure the state is only inserted if the cfd was inserted // save cfd + state in a transaction to make sure the state is only inserted if the cfd was
// inserted
let cfd_id = sqlx::query!( let cfd_id = sqlx::query!(
r#" r#"
@ -165,7 +166,8 @@ pub async fn insert_new_cfd_state_by_offer_id(
.await .await
.context("loading latest state failed")?; .context("loading latest state failed")?;
// make sure that the new state is different than the current one to avoid that we save the same state twice // make sure that the new state is different than the current one to avoid that we save the same
// state twice
if mem::discriminant(&latest_cfd_state_in_db) == mem::discriminant(&new_state) { if mem::discriminant(&latest_cfd_state_in_db) == mem::discriminant(&new_state) {
anyhow::bail!("Cannot insert new state {} for cfd with order_id {} because it currently already is in state {}", new_state, offer_id, latest_cfd_state_in_db); anyhow::bail!("Cannot insert new state {} for cfd with order_id {} because it currently already is in state {}", new_state, offer_id, latest_cfd_state_in_db);
} }
@ -267,7 +269,8 @@ pub async fn load_all_cfds(conn: &mut PoolConnection<Sqlite>) -> anyhow::Result<
.fetch_all(conn) .fetch_all(conn)
.await?; .await?;
// TODO: We might want to separate the database model from the http model and properly map between them // TODO: We might want to separate the database model from the http model and properly map
// between them
let cfds = rows let cfds = rows
.iter() .iter()

13
daemon/src/keypair.rs

@ -0,0 +1,13 @@
use bdk::bitcoin;
use bdk::bitcoin::secp256k1::{self, SECP256K1};
use rand::{CryptoRng, RngCore};
pub fn new<R>(rng: &mut R) -> (secp256k1::SecretKey, bitcoin::PublicKey)
where
R: RngCore + CryptoRng,
{
let sk = secp256k1::SecretKey::new(rng);
let pk = bitcoin::PublicKey::new(secp256k1::PublicKey::from_secret_key(SECP256K1, &sk));
(sk, pk)
}

83
daemon/src/maker.rs

@ -1,5 +1,7 @@
use anyhow::Result; use anyhow::Result;
use bdk::bitcoin::Amount; use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1};
use bdk::bitcoin::{self, Amount};
use bdk::blockchain::{ElectrumBlockchain, NoopProgress};
use model::cfd::{Cfd, CfdOffer}; use model::cfd::{Cfd, CfdOffer};
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket::figment::util::map; use rocket::figment::util::map;
@ -8,6 +10,7 @@ use rocket_db_pools::Database;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
mod db; mod db;
mod keypair;
mod maker_cfd_actor; mod maker_cfd_actor;
mod maker_inc_connections_actor; mod maker_inc_connections_actor;
mod model; mod model;
@ -22,9 +25,28 @@ pub struct Db(sqlx::SqlitePool);
#[rocket::main] #[rocket::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
let client =
bdk::electrum_client::Client::new("ssl://electrum.blockstream.info:60002").unwrap();
// TODO: Replace with sqlite once https://github.com/bitcoindevkit/bdk/pull/376 is merged.
let db = bdk::sled::open("/tmp/maker.db")?;
let wallet_db = db.open_tree("wallet")?;
let wallet = bdk::Wallet::new(
"wpkh(tprv8ZgxMBicQKsPd95j7aKDzWZw9Z2SiLxpz5J5iFUdqFf1unqtoonSTteF1ZSrrB831BY1eufyHehediNH76DvcDSS2JDDyDXCQKJbyd7ozVf/*)#3vkm30lf",
None,
bitcoin::Network::Testnet,
wallet_db,
ElectrumBlockchain::from(client),
)
.unwrap();
wallet.sync(NoopProgress, None).unwrap(); // TODO: Use LogProgress once we have logging.
let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle.
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]); let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]);
let (offer_feed_sender, offer_feed_receiver) = watch::channel::<Option<CfdOffer>>(None); let (offer_feed_sender, offer_feed_receiver) = watch::channel::<Option<CfdOffer>>(None);
let (_balance_feed_sender, balance_feed_receiver) = watch::channel::<Amount>(Amount::ONE_BTC); let (_balance_feed_sender, balance_feed_receiver) = watch::channel::<Amount>(Amount::ZERO);
let db: Map<_, Value> = map! { let db: Map<_, Value> = map! {
"url" => "./maker.sqlite".into(), "url" => "./maker.sqlite".into(),
@ -56,32 +78,37 @@ async fn main() -> Result<()> {
} }
}, },
)) ))
.attach(AdHoc::try_on_ignite("Create actors", |rocket| async move { .attach(AdHoc::try_on_ignite(
let db = match Db::fetch(&rocket) { "Create actors",
Some(db) => (**db).clone(), move |rocket| async move {
None => return Err(rocket), let db = match Db::fetch(&rocket) {
}; Some(db) => (**db).clone(),
None => return Err(rocket),
let (connections_actor_inbox_sender, connections_actor_inbox_recv) = };
mpsc::unbounded_channel();
let (connections_actor_inbox_sender, connections_actor_inbox_recv) =
let (cfd_maker_actor, cfd_maker_actor_inbox) = maker_cfd_actor::new( mpsc::unbounded_channel();
db,
connections_actor_inbox_sender, let (cfd_maker_actor, cfd_maker_actor_inbox) = maker_cfd_actor::new(
cfd_feed_sender, db,
offer_feed_sender, wallet,
); schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
let connections_actor = maker_inc_connections_actor::new( connections_actor_inbox_sender,
listener, cfd_feed_sender,
cfd_maker_actor_inbox.clone(), offer_feed_sender,
connections_actor_inbox_recv, );
); let connections_actor = maker_inc_connections_actor::new(
listener,
tokio::spawn(cfd_maker_actor); cfd_maker_actor_inbox.clone(),
tokio::spawn(connections_actor); connections_actor_inbox_recv,
);
Ok(rocket.manage(cfd_maker_actor_inbox))
})) tokio::spawn(cfd_maker_actor);
tokio::spawn(connections_actor);
Ok(rocket.manage(cfd_maker_actor_inbox))
},
))
.mount( .mount(
"/", "/",
rocket::routes![ rocket::routes![

375
daemon/src/maker_cfd_actor.rs

@ -1,12 +1,22 @@
use std::collections::HashMap;
use std::time::SystemTime; use std::time::SystemTime;
use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState, CfdStateCommon}; use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState, CfdStateCommon, FinalizedCfd};
use crate::model::{TakerId, Usd}; use crate::model::{TakerId, Usd};
use crate::wire::{Msg0, Msg1, SetupMsg};
use crate::{db, maker_cfd_actor, maker_inc_connections_actor}; use crate::{db, maker_cfd_actor, maker_inc_connections_actor};
use bdk::bitcoin::secp256k1::{schnorrsig, SecretKey};
use bdk::bitcoin::{self, Amount};
use bdk::database::BatchDatabase;
use cfd_protocol::{
commit_descriptor, create_cfd_transactions, lock_descriptor, PartyParams, PunishParams,
WalletExt,
};
use futures::Future; use futures::Future;
use rust_decimal_macros::dec; use rust_decimal_macros::dec;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
#[allow(clippy::large_enum_variant)]
#[derive(Debug)] #[derive(Debug)]
pub enum Command { pub enum Command {
TakeOffer { TakeOffer {
@ -15,133 +25,202 @@ pub enum Command {
quantity: Usd, quantity: Usd,
}, },
NewOffer(CfdOffer), NewOffer(CfdOffer),
StartContractSetup {
taker_id: TakerId,
offer_id: CfdOfferId,
},
NewTakerOnline { NewTakerOnline {
id: TakerId, id: TakerId,
}, },
IncProtocolMsg(SetupMsg),
CfdSetupCompleted(FinalizedCfd),
} }
pub fn new( pub fn new<B, D>(
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: bdk::Wallet<B, D>,
oracle_pk: schnorrsig::PublicKey,
takers: mpsc::UnboundedSender<maker_inc_connections_actor::Command>, takers: mpsc::UnboundedSender<maker_inc_connections_actor::Command>,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
offer_feed_sender: watch::Sender<Option<CfdOffer>>, offer_feed_sender: watch::Sender<Option<CfdOffer>>,
) -> ( ) -> (
impl Future<Output = ()>, impl Future<Output = ()>,
mpsc::UnboundedSender<maker_cfd_actor::Command>, mpsc::UnboundedSender<maker_cfd_actor::Command>,
) { )
where
D: BatchDatabase,
{
let (sender, mut receiver) = mpsc::unbounded_channel(); let (sender, mut receiver) = mpsc::unbounded_channel();
let mut current_contract_setup = None;
let mut current_offer_id = None; let mut current_offer_id = None;
let actor = async move { let actor = {
// populate the CFD feed with existing CFDs let sender = sender.clone();
let mut conn = db.acquire().await.unwrap();
cfd_feed_actor_inbox async move {
.send(db::load_all_cfds(&mut conn).await.unwrap()) // populate the CFD feed with existing CFDs
.unwrap(); let mut conn = db.acquire().await.unwrap();
cfd_feed_actor_inbox
while let Some(message) = receiver.recv().await { .send(db::load_all_cfds(&mut conn).await.unwrap())
match message { .unwrap();
maker_cfd_actor::Command::TakeOffer {
taker_id, while let Some(message) = receiver.recv().await {
offer_id, match message {
quantity, maker_cfd_actor::Command::TakeOffer {
} => { taker_id,
println!( offer_id,
"Taker {} wants to take {} of offer {}", quantity,
taker_id, quantity, offer_id } => {
); println!(
"Taker {} wants to take {} of offer {}",
let mut conn = db.acquire().await.unwrap(); taker_id, quantity, offer_id
);
// 1. Validate if offer is still valid
let current_offer = match current_offer_id { let mut conn = db.acquire().await.unwrap();
Some(current_offer_id) if current_offer_id == offer_id => {
db::load_offer_by_id(current_offer_id, &mut conn) // 1. Validate if offer is still valid
.await let current_offer = match current_offer_id {
.unwrap() Some(current_offer_id) if current_offer_id == offer_id => {
} db::load_offer_by_id(current_offer_id, &mut conn)
_ => { .await
takers .unwrap()
}
_ => {
takers
.send(maker_inc_connections_actor::Command::NotifyInvalidOfferId { .send(maker_inc_connections_actor::Command::NotifyInvalidOfferId {
id: offer_id, id: offer_id,
taker_id, taker_id,
}) })
.unwrap(); .unwrap();
continue; continue;
} }
}; };
// 2. Insert CFD in DB // 2. Insert CFD in DB
// TODO: Don't auto-accept, present to user in UI instead // TODO: Don't auto-accept, present to user in UI instead
let cfd = Cfd::new( let cfd = Cfd::new(
current_offer, current_offer,
quantity, quantity,
CfdState::Accepted { CfdState::Accepted {
common: CfdStateCommon { common: CfdStateCommon {
transition_timestamp: SystemTime::now(), transition_timestamp: SystemTime::now(),
},
}, },
}, Usd(dec!(10001)),
Usd(dec!(10001)), )
)
.unwrap();
db::insert_cfd(cfd, &mut conn).await.unwrap();
takers
.send(maker_inc_connections_actor::Command::NotifyOfferAccepted {
id: offer_id,
taker_id,
})
.unwrap();
cfd_feed_actor_inbox
.send(db::load_all_cfds(&mut conn).await.unwrap())
.unwrap(); .unwrap();
db::insert_cfd(cfd, &mut conn).await.unwrap();
// 3. Remove current offer takers
current_offer_id = None; .send(maker_inc_connections_actor::Command::NotifyOfferAccepted {
takers id: offer_id,
.send(maker_inc_connections_actor::Command::BroadcastCurrentOffer( taker_id,
None, })
)) .unwrap();
.unwrap(); cfd_feed_actor_inbox
offer_feed_sender.send(None).unwrap(); .send(db::load_all_cfds(&mut conn).await.unwrap())
} .unwrap();
maker_cfd_actor::Command::NewOffer(offer) => {
// 1. Save to DB // 3. Remove current offer
let mut conn = db.acquire().await.unwrap(); current_offer_id = None;
db::insert_cfd_offer(&offer, &mut conn).await.unwrap(); takers
.send(maker_inc_connections_actor::Command::BroadcastCurrentOffer(
// 2. Update actor state to current offer None,
current_offer_id.replace(offer.id); ))
.unwrap();
// 3. Notify UI via feed offer_feed_sender.send(None).unwrap();
offer_feed_sender.send(Some(offer.clone())).unwrap(); }
maker_cfd_actor::Command::NewOffer(offer) => {
// 4. Inform connected takers // 1. Save to DB
takers let mut conn = db.acquire().await.unwrap();
.send(maker_inc_connections_actor::Command::BroadcastCurrentOffer( db::insert_cfd_offer(&offer, &mut conn).await.unwrap();
Some(offer),
)) // 2. Update actor state to current offer
.unwrap(); current_offer_id.replace(offer.id);
}
maker_cfd_actor::Command::NewTakerOnline { id: taker_id } => { // 3. Notify UI via feed
let mut conn = db.acquire().await.unwrap(); offer_feed_sender.send(Some(offer.clone())).unwrap();
let current_offer = match current_offer_id { // 4. Inform connected takers
Some(current_offer_id) => Some( takers
db::load_offer_by_id(current_offer_id, &mut conn) .send(maker_inc_connections_actor::Command::BroadcastCurrentOffer(
.await Some(offer),
.unwrap(), ))
), .unwrap();
None => None, }
}; maker_cfd_actor::Command::NewTakerOnline { id: taker_id } => {
let mut conn = db.acquire().await.unwrap();
takers
.send(maker_inc_connections_actor::Command::SendCurrentOffer { let current_offer = match current_offer_id {
offer: current_offer, Some(current_offer_id) => Some(
taker_id, db::load_offer_by_id(current_offer_id, &mut conn)
}) .await
.unwrap(); .unwrap(),
),
None => None,
};
takers
.send(maker_inc_connections_actor::Command::SendCurrentOffer {
offer: current_offer,
taker_id,
})
.unwrap();
}
maker_cfd_actor::Command::StartContractSetup {
taker_id,
offer_id: _offer_id,
} => {
// Kick-off the CFD protocol
let (sk, pk) = crate::keypair::new(&mut rand::thread_rng());
// TODO: Load correct quantity from DB with offer_id
let maker_params = wallet
.build_party_params(bitcoin::Amount::ZERO, pk)
.unwrap();
let (actor, inbox) = setup_contract(
{
let inbox = takers.clone();
move |msg| {
inbox
.send(
maker_inc_connections_actor::Command::OutProtocolMsg {
taker_id,
msg,
},
)
.unwrap()
}
},
maker_params,
sk,
oracle_pk,
);
tokio::spawn({
let sender = sender.clone();
async move {
sender
.send(Command::CfdSetupCompleted(actor.await))
.unwrap()
}
});
current_contract_setup = Some(inbox);
}
maker_cfd_actor::Command::IncProtocolMsg(msg) => {
let inbox = match &current_contract_setup {
None => panic!("whoops"),
Some(inbox) => inbox,
};
inbox.send(msg).unwrap();
}
maker_cfd_actor::Command::CfdSetupCompleted(_finalized_cfd) => {
todo!("but what?")
}
} }
} }
} }
@ -149,3 +228,91 @@ pub fn new(
(actor, sender) (actor, sender)
} }
/// Given an initial set of parameters, sets up the CFD contract with the taker.
///
/// Returns the [`FinalizedCfd`] which contains the lock transaction, ready to be signed and sent to
/// the taker. Signing of the lock transaction is not included in this function because we want the
/// actor above to own the wallet.
fn setup_contract(
send_to_taker: impl Fn(SetupMsg),
maker: PartyParams,
sk: SecretKey,
oracle_pk: schnorrsig::PublicKey,
) -> (
impl Future<Output = FinalizedCfd>,
mpsc::UnboundedSender<SetupMsg>,
) {
let (sender, mut receiver) = mpsc::unbounded_channel::<SetupMsg>();
let actor = async move {
let (rev_sk, rev_pk) = crate::keypair::new(&mut rand::thread_rng());
let (publish_sk, publish_pk) = crate::keypair::new(&mut rand::thread_rng());
let maker_punish = PunishParams {
revocation_pk: rev_pk,
publish_pk,
};
send_to_taker(SetupMsg::Msg0(Msg0::from((maker.clone(), maker_punish))));
let msg0 = receiver.recv().await.unwrap().try_into_msg0().unwrap();
let (taker, taker_punish) = msg0.into();
let maker_cfd_txs = create_cfd_transactions(
(maker.clone(), maker_punish),
(taker.clone(), taker_punish),
oracle_pk,
0, // TODO: Calculate refund timelock based on CFD term
vec![],
sk,
)
.unwrap();
send_to_taker(SetupMsg::Msg1(Msg1::from(maker_cfd_txs.clone())));
let msg1 = receiver.recv().await.unwrap().try_into_msg1().unwrap();
let _lock_desc = lock_descriptor(taker.identity_pk, taker.identity_pk);
// let lock_amount = maker_lock_amount + taker_lock_amount;
let _commit_desc = commit_descriptor(
(
taker.identity_pk,
taker_punish.revocation_pk,
taker_punish.publish_pk,
),
(taker.identity_pk, rev_pk, publish_pk),
);
let commit_tx = maker_cfd_txs.commit.0;
let _commit_amount = Amount::from_sat(commit_tx.output[0].value);
// TODO: Verify all signatures from the taker here
let lock_tx = maker_cfd_txs.lock;
let refund_tx = maker_cfd_txs.refund.0;
let mut cet_by_id = maker_cfd_txs
.cets
.into_iter()
.map(|(tx, _, msg, _)| (tx.txid(), (tx, msg)))
.collect::<HashMap<_, _>>();
FinalizedCfd {
identity: sk,
revocation: rev_sk,
publish: publish_sk,
lock: lock_tx,
commit: (commit_tx, *msg1.commit),
cets: msg1
.cets
.into_iter()
.map(|(txid, sig)| {
let (cet, msg) = cet_by_id.remove(&txid).expect("unknown CET");
(cet, *sig, msg)
})
.collect::<Vec<_>>(),
refund: (refund_tx, msg1.refund),
}
};
(actor, sender)
}

17
daemon/src/maker_inc_connections_actor.rs

@ -1,5 +1,6 @@
use crate::model::cfd::{CfdOffer, CfdOfferId}; use crate::model::cfd::{CfdOffer, CfdOfferId};
use crate::model::TakerId; use crate::model::TakerId;
use crate::wire::SetupMsg;
use crate::{maker_cfd_actor, maker_inc_connections_actor, send_wire_message_actor, wire}; use crate::{maker_cfd_actor, maker_inc_connections_actor, send_wire_message_actor, wire};
use futures::{Future, StreamExt}; use futures::{Future, StreamExt};
use std::collections::HashMap; use std::collections::HashMap;
@ -8,6 +9,7 @@ use tokio::net::TcpListener;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec}; use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
#[allow(clippy::large_enum_variant)]
#[derive(Debug)] #[derive(Debug)]
pub enum Command { pub enum Command {
BroadcastCurrentOffer(Option<CfdOffer>), BroadcastCurrentOffer(Option<CfdOffer>),
@ -23,6 +25,10 @@ pub enum Command {
id: CfdOfferId, id: CfdOfferId,
taker_id: TakerId, taker_id: TakerId,
}, },
OutProtocolMsg {
taker_id: TakerId,
msg: SetupMsg,
},
} }
pub fn new( pub fn new(
@ -70,6 +76,10 @@ pub fn new(
let conn = write_connections.get(&taker_id).expect("no connection to taker_id"); let conn = write_connections.get(&taker_id).expect("no connection to taker_id");
conn.send(wire::MakerToTaker::ConfirmTakeOffer(id)).unwrap(); conn.send(wire::MakerToTaker::ConfirmTakeOffer(id)).unwrap();
}, },
maker_inc_connections_actor::Command::OutProtocolMsg { taker_id, msg } => {
let conn = write_connections.get(&taker_id).expect("no connection to taker_id");
conn.send(wire::MakerToTaker::Protocol(msg)).unwrap();
}
} }
} }
} }
@ -97,7 +107,12 @@ fn in_taker_messages(
quantity, quantity,
}) })
.unwrap(), .unwrap(),
Ok(wire::TakerToMaker::StartContractSetup(_offer_id)) => {} Ok(wire::TakerToMaker::StartContractSetup(offer_id)) => cfd_actor_inbox
.send(maker_cfd_actor::Command::StartContractSetup { taker_id, offer_id })
.unwrap(),
Ok(wire::TakerToMaker::Protocol(msg)) => cfd_actor_inbox
.send(maker_cfd_actor::Command::IncProtocolMsg(msg))
.unwrap(),
Err(error) => { Err(error) => {
eprintln!("Error in reading message: {}", error); eprintln!("Error in reading message: {}", error);
} }

33
daemon/src/model/cfd.rs

@ -1,6 +1,9 @@
use crate::model::{Leverage, Position, TradingPair, Usd}; use crate::model::{Leverage, Position, TradingPair, Usd};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use bdk::bitcoin::Amount; use bdk::bitcoin::secp256k1::{SecretKey, Signature};
use bdk::bitcoin::util::psbt::PartiallySignedTransaction;
use bdk::bitcoin::{Amount, Transaction};
use cfd_protocol::EcdsaAdaptorSignature;
use rust_decimal::Decimal; use rust_decimal::Decimal;
use rust_decimal_macros::dec; use rust_decimal_macros::dec;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -33,7 +36,8 @@ pub struct CfdOffer {
pub price: Usd, pub price: Usd,
// TODO: [post-MVP] Representation of the contract size; at the moment the contract size is always 1 USD // TODO: [post-MVP] Representation of the contract size; at the moment the contract size is
// always 1 USD
pub min_quantity: Usd, pub min_quantity: Usd,
pub max_quantity: Usd, pub max_quantity: Usd,
@ -120,7 +124,8 @@ pub struct CfdTakeRequest {
#[derive(Debug, Clone, Deserialize)] #[derive(Debug, Clone, Deserialize)]
pub struct CfdNewOfferRequest { pub struct CfdNewOfferRequest {
pub price: Usd, pub price: Usd,
// TODO: [post-MVP] Representation of the contract size; at the moment the contract size is always 1 USD // TODO: [post-MVP] Representation of the contract size; at the moment the contract size is
// always 1 USD
pub min_quantity: Usd, pub min_quantity: Usd,
pub max_quantity: Usd, pub max_quantity: Usd,
} }
@ -183,7 +188,8 @@ pub enum CfdState {
/// ///
/// This state applies to taker and maker. /// This state applies to taker and maker.
CloseRequested { common: CfdStateCommon }, CloseRequested { common: CfdStateCommon },
/// The close transaction (CET) was published on the Bitcoin blockchain but we don't have a confirmation yet. /// The close transaction (CET) was published on the Bitcoin blockchain but we don't have a
/// confirmation yet.
/// ///
/// This state applies to taker and maker. /// This state applies to taker and maker.
PendingClose { common: CfdStateCommon }, PendingClose { common: CfdStateCommon },
@ -334,7 +340,8 @@ mod tests {
#[test] #[test]
fn serialize_cfd_state_snapshot() { fn serialize_cfd_state_snapshot() {
// This test is to prevent us from breaking the cfd_state API used by the UI and database! // This test is to prevent us from breaking the cfd_state API used by the UI and database!
// We serialize the state into the database, so changes to the enum result in breaking program version changes. // We serialize the state into the database, so changes to the enum result in breaking
// program version changes.
let fixed_timestamp = UNIX_EPOCH; let fixed_timestamp = UNIX_EPOCH;
@ -439,3 +446,19 @@ mod tests {
); );
} }
} }
/// Contains all data we've assembled about the CFD through the setup protocol.
///
/// All contained signatures are the signatures of THE OTHER PARTY.
/// To use any of these transactions, we need to re-sign them with the correct secret key.
#[derive(Debug)]
pub struct FinalizedCfd {
pub identity: SecretKey,
pub revocation: SecretKey,
pub publish: SecretKey,
pub lock: PartiallySignedTransaction,
pub commit: (Transaction, EcdsaAdaptorSignature),
pub cets: Vec<(Transaction, EcdsaAdaptorSignature, Vec<u8>)>,
pub refund: (Transaction, Signature),
}

4
daemon/src/routes_maker.rs

@ -67,8 +67,8 @@ pub async fn post_sell_offer(
} }
// // TODO: Shall we use a simpler struct for verification? AFAICT quantity is not // // TODO: Shall we use a simpler struct for verification? AFAICT quantity is not
// // needed, no need to send the whole CFD either as the other fields can be generated from the offer // // needed, no need to send the whole CFD either as the other fields can be generated from the
// #[rocket::post("/offer/confirm", data = "<cfd_confirm_offer_request>")] // offer #[rocket::post("/offer/confirm", data = "<cfd_confirm_offer_request>")]
// pub async fn post_confirm_offer( // pub async fn post_confirm_offer(
// cfd_confirm_offer_request: Json<CfdTakeRequest>, // cfd_confirm_offer_request: Json<CfdTakeRequest>,
// queue: &State<mpsc::Sender<CfdOffer>>, // queue: &State<mpsc::Sender<CfdOffer>>,

77
daemon/src/taker.rs

@ -1,5 +1,7 @@
use anyhow::Result; use anyhow::Result;
use bdk::bitcoin::Amount; use bdk::bitcoin::secp256k1::{schnorrsig, SECP256K1};
use bdk::bitcoin::{self, Amount};
use bdk::blockchain::{ElectrumBlockchain, NoopProgress};
use model::cfd::{Cfd, CfdOffer}; use model::cfd::{Cfd, CfdOffer};
use rocket::fairing::AdHoc; use rocket::fairing::AdHoc;
use rocket::figment::util::map; use rocket::figment::util::map;
@ -8,6 +10,7 @@ use rocket_db_pools::Database;
use tokio::sync::watch; use tokio::sync::watch;
mod db; mod db;
mod keypair;
mod model; mod model;
mod routes_taker; mod routes_taker;
mod send_wire_message_actor; mod send_wire_message_actor;
@ -22,9 +25,28 @@ pub struct Db(sqlx::SqlitePool);
#[rocket::main] #[rocket::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
let client =
bdk::electrum_client::Client::new("ssl://electrum.blockstream.info:60002").unwrap();
// TODO: Replace with sqlite once https://github.com/bitcoindevkit/bdk/pull/376 is merged.
let db = bdk::sled::open("/tmp/taker.db")?;
let wallet_db = db.open_tree("wallet")?;
let wallet = bdk::Wallet::new(
"wpkh(tprv8ZgxMBicQKsPfL3BRRo2gK3rMQwsy49vhEHCsaRJSM3gNrwnDwpdzLVQzbsDo738VHyrMK3FJAaxsBkpu8gk77SUQ197RNyF46brV2EVKRZ/*)#29cd5ajg",
None,
bitcoin::Network::Testnet,
wallet_db,
ElectrumBlockchain::from(client),
)
.unwrap();
wallet.sync(NoopProgress, None).unwrap(); // TODO: Use LogProgress once we have logging.
let oracle = schnorrsig::KeyPair::new(SECP256K1, &mut rand::thread_rng()); // TODO: Fetch oracle public key from oracle.
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]); let (cfd_feed_sender, cfd_feed_receiver) = watch::channel::<Vec<Cfd>>(vec![]);
let (offer_feed_sender, offer_feed_receiver) = watch::channel::<Option<CfdOffer>>(None); let (offer_feed_sender, offer_feed_receiver) = watch::channel::<Option<CfdOffer>>(None);
let (_balance_feed_sender, balance_feed_receiver) = watch::channel::<Amount>(Amount::ONE_BTC); let (_balance_feed_sender, balance_feed_receiver) = watch::channel::<Amount>(Amount::ZERO);
let socket = tokio::net::TcpSocket::new_v4().unwrap(); let socket = tokio::net::TcpSocket::new_v4().unwrap();
let connection = socket let connection = socket
@ -57,29 +79,34 @@ async fn main() -> Result<()> {
} }
}, },
)) ))
.attach(AdHoc::try_on_ignite("Create actors", |rocket| async move { .attach(AdHoc::try_on_ignite(
let db = match Db::fetch(&rocket) { "Create actors",
Some(db) => (**db).clone(), move |rocket| async move {
None => return Err(rocket), let db = match Db::fetch(&rocket) {
}; Some(db) => (**db).clone(),
None => return Err(rocket),
let (out_maker_messages_actor, out_maker_actor_inbox) = };
send_wire_message_actor::new(write);
let (cfd_actor, cfd_actor_inbox) = taker_cfd_actor::new( let (out_maker_messages_actor, out_maker_actor_inbox) =
db, send_wire_message_actor::new(write);
cfd_feed_sender, let (cfd_actor, cfd_actor_inbox) = taker_cfd_actor::new(
offer_feed_sender, db,
out_maker_actor_inbox, wallet,
); schnorrsig::PublicKey::from_keypair(SECP256K1, &oracle),
let inc_maker_messages_actor = cfd_feed_sender,
taker_inc_message_actor::new(read, cfd_actor_inbox.clone()); offer_feed_sender,
out_maker_actor_inbox,
tokio::spawn(cfd_actor); );
tokio::spawn(inc_maker_messages_actor); let inc_maker_messages_actor =
tokio::spawn(out_maker_messages_actor); taker_inc_message_actor::new(read, cfd_actor_inbox.clone());
Ok(rocket.manage(cfd_actor_inbox)) tokio::spawn(cfd_actor);
})) tokio::spawn(inc_maker_messages_actor);
tokio::spawn(out_maker_messages_actor);
Ok(rocket.manage(cfd_actor_inbox))
},
))
.mount( .mount(
"/", "/",
rocket::routes![ rocket::routes![

272
daemon/src/taker_cfd_actor.rs

@ -1,91 +1,151 @@
use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState, CfdStateCommon}; use crate::model::cfd::{Cfd, CfdOffer, CfdOfferId, CfdState, CfdStateCommon, FinalizedCfd};
use crate::model::Usd; use crate::model::Usd;
use crate::wire::{Msg0, Msg1, SetupMsg};
use crate::{db, wire}; use crate::{db, wire};
use bdk::bitcoin::secp256k1::{schnorrsig, SecretKey};
use bdk::bitcoin::{self, Amount};
use bdk::database::BatchDatabase;
use cfd_protocol::{
commit_descriptor, create_cfd_transactions, lock_descriptor, PartyParams, PunishParams,
WalletExt,
};
use core::panic;
use futures::Future; use futures::Future;
use std::collections::HashMap;
use std::time::SystemTime; use std::time::SystemTime;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
#[derive(Debug)] #[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum Command { pub enum Command {
TakeOffer { offer_id: CfdOfferId, quantity: Usd }, TakeOffer { offer_id: CfdOfferId, quantity: Usd },
NewOffer(Option<CfdOffer>), NewOffer(Option<CfdOffer>),
OfferAccepted(CfdOfferId), OfferAccepted(CfdOfferId),
IncProtocolMsg(SetupMsg),
CfdSetupCompleted(FinalizedCfd),
} }
pub fn new( pub fn new<B, D>(
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: bdk::Wallet<B, D>,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
offer_feed_actor_inbox: watch::Sender<Option<CfdOffer>>, offer_feed_actor_inbox: watch::Sender<Option<CfdOffer>>,
out_msg_maker_inbox: mpsc::UnboundedSender<wire::TakerToMaker>, out_msg_maker_inbox: mpsc::UnboundedSender<wire::TakerToMaker>,
) -> (impl Future<Output = ()>, mpsc::UnboundedSender<Command>) { ) -> (impl Future<Output = ()>, mpsc::UnboundedSender<Command>)
where
D: BatchDatabase,
{
let (sender, mut receiver) = mpsc::unbounded_channel(); let (sender, mut receiver) = mpsc::unbounded_channel();
let mut current_contract_setup = None;
let actor = async move { let actor = {
// populate the CFD feed with existing CFDs let sender = sender.clone();
let mut conn = db.acquire().await.unwrap();
cfd_feed_actor_inbox
.send(db::load_all_cfds(&mut conn).await.unwrap())
.unwrap();
while let Some(message) = receiver.recv().await {
match message {
Command::TakeOffer { offer_id, quantity } => {
let mut conn = db.acquire().await.unwrap();
let current_offer = db::load_offer_by_id(offer_id, &mut conn).await.unwrap();
println!("Accepting current offer: {:?}", &current_offer);
let cfd = Cfd::new(
current_offer,
quantity,
CfdState::PendingTakeRequest {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
Usd::ZERO,
)
.unwrap();
db::insert_cfd(cfd, &mut conn).await.unwrap(); async move {
// populate the CFD feed with existing CFDs
let mut conn = db.acquire().await.unwrap();
cfd_feed_actor_inbox
.send(db::load_all_cfds(&mut conn).await.unwrap())
.unwrap();
cfd_feed_actor_inbox while let Some(message) = receiver.recv().await {
.send(db::load_all_cfds(&mut conn).await.unwrap()) match message {
.unwrap(); Command::TakeOffer { offer_id, quantity } => {
out_msg_maker_inbox let mut conn = db.acquire().await.unwrap();
.send(wire::TakerToMaker::TakeOffer { offer_id, quantity })
let current_offer =
db::load_offer_by_id(offer_id, &mut conn).await.unwrap();
println!("Accepting current offer: {:?}", &current_offer);
let cfd = Cfd::new(
current_offer,
quantity,
CfdState::PendingTakeRequest {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
},
Usd::ZERO,
)
.unwrap(); .unwrap();
}
Command::NewOffer(Some(offer)) => {
let mut conn = db.acquire().await.unwrap();
db::insert_cfd_offer(&offer, &mut conn).await.unwrap();
offer_feed_actor_inbox.send(Some(offer)).unwrap();
}
Command::NewOffer(None) => { db::insert_cfd(cfd, &mut conn).await.unwrap();
offer_feed_actor_inbox.send(None).unwrap();
}
Command::OfferAccepted(offer_id) => { cfd_feed_actor_inbox
let mut conn = db.acquire().await.unwrap(); .send(db::load_all_cfds(&mut conn).await.unwrap())
db::insert_new_cfd_state_by_offer_id( .unwrap();
offer_id, out_msg_maker_inbox
CfdState::ContractSetup { .send(wire::TakerToMaker::TakeOffer { offer_id, quantity })
common: CfdStateCommon { .unwrap();
transition_timestamp: SystemTime::now(), }
Command::NewOffer(Some(offer)) => {
let mut conn = db.acquire().await.unwrap();
db::insert_cfd_offer(&offer, &mut conn).await.unwrap();
offer_feed_actor_inbox.send(Some(offer)).unwrap();
}
Command::NewOffer(None) => {
offer_feed_actor_inbox.send(None).unwrap();
}
Command::OfferAccepted(offer_id) => {
let mut conn = db.acquire().await.unwrap();
db::insert_new_cfd_state_by_offer_id(
offer_id,
CfdState::ContractSetup {
common: CfdStateCommon {
transition_timestamp: SystemTime::now(),
},
}, },
}, &mut conn,
&mut conn, )
) .await
.await
.unwrap();
cfd_feed_actor_inbox
.send(db::load_all_cfds(&mut conn).await.unwrap())
.unwrap(); .unwrap();
// TODO: Contract signing/setup cfd_feed_actor_inbox
.send(db::load_all_cfds(&mut conn).await.unwrap())
.unwrap();
let (sk, pk) = crate::keypair::new(&mut rand::thread_rng());
let taker_params = wallet
.build_party_params(bitcoin::Amount::ZERO, pk) // TODO: Load correct quantity from DB
.unwrap();
let (actor, inbox) = setup_contract(
{
let inbox = out_msg_maker_inbox.clone();
move |msg| inbox.send(wire::TakerToMaker::Protocol(msg)).unwrap()
},
taker_params,
sk,
oracle_pk,
);
tokio::spawn({
let sender = sender.clone();
async move {
sender
.send(Command::CfdSetupCompleted(actor.await))
.unwrap()
}
});
current_contract_setup = Some(inbox);
}
Command::IncProtocolMsg(msg) => {
let inbox = match &current_contract_setup {
None => panic!("whoops"),
Some(inbox) => inbox,
};
inbox.send(msg).unwrap();
}
Command::CfdSetupCompleted(_finalized_cfd) => {
todo!("but what?")
}
} }
} }
} }
@ -93,3 +153,93 @@ pub fn new(
(actor, sender) (actor, sender)
} }
/// Given an initial set of parameters, sets up the CFD contract with the maker.
///
/// Returns the [`FinalizedCfd`] which contains the lock transaction, ready to be signed and sent to
/// the maker. Signing of the lock transaction is not included in this function because we want the
/// actor above to own the wallet.
fn setup_contract(
send_to_maker: impl Fn(SetupMsg),
taker: PartyParams,
sk: SecretKey,
oracle_pk: schnorrsig::PublicKey,
) -> (
impl Future<Output = FinalizedCfd>,
mpsc::UnboundedSender<SetupMsg>,
) {
let (sender, mut receiver) = mpsc::unbounded_channel::<SetupMsg>();
let actor = async move {
let (rev_sk, rev_pk) = crate::keypair::new(&mut rand::thread_rng());
let (publish_sk, publish_pk) = crate::keypair::new(&mut rand::thread_rng());
let taker_punish = PunishParams {
revocation_pk: rev_pk,
publish_pk,
};
send_to_maker(SetupMsg::Msg0(Msg0::from((taker.clone(), taker_punish))));
let msg0 = receiver.recv().await.unwrap().try_into_msg0().unwrap();
let (maker, maker_punish) = msg0.into();
let taker_cfd_txs = create_cfd_transactions(
(maker.clone(), maker_punish),
(taker.clone(), taker_punish),
oracle_pk,
0, // TODO: Calculate refund timelock based on CFD term
vec![],
sk,
)
.unwrap();
send_to_maker(SetupMsg::Msg1(Msg1::from(taker_cfd_txs.clone())));
let msg1 = receiver.recv().await.unwrap().try_into_msg1().unwrap();
let _lock_desc = lock_descriptor(maker.identity_pk, taker.identity_pk);
// let lock_amount = maker_lock_amount + taker_lock_amount;
let _commit_desc = commit_descriptor(
(
maker.identity_pk,
maker_punish.revocation_pk,
maker_punish.publish_pk,
),
(taker.identity_pk, rev_pk, publish_pk),
);
let commit_tx = taker_cfd_txs.commit.0;
let _commit_amount = Amount::from_sat(commit_tx.output[0].value);
// TODO: Verify all signatures from the maker here
let lock_tx = taker_cfd_txs.lock;
let refund_tx = taker_cfd_txs.refund.0;
let mut cet_by_id = taker_cfd_txs
.cets
.into_iter()
.map(|(tx, _, msg, _)| (tx.txid(), (tx, msg)))
.collect::<HashMap<_, _>>();
FinalizedCfd {
identity: sk,
revocation: rev_sk,
publish: publish_sk,
lock: lock_tx,
commit: (commit_tx, *msg1.commit),
cets: msg1
.cets
.into_iter()
.map(|(txid, sig)| {
let (cet, msg) = cet_by_id.remove(&txid).expect("unknown CET");
(cet, *sig, msg)
})
.collect::<Vec<_>>(),
refund: (refund_tx, msg1.refund),
}
};
(actor, sender)
}

5
daemon/src/taker_inc_message_actor.rs

@ -32,6 +32,11 @@ pub fn new(
Ok(wire::MakerToTaker::InvalidOfferId(_)) => { Ok(wire::MakerToTaker::InvalidOfferId(_)) => {
todo!() todo!()
} }
Ok(wire::MakerToTaker::Protocol(msg)) => {
cfd_actor
.send(taker_cfd_actor::Command::IncProtocolMsg(msg))
.unwrap();
}
Err(error) => { Err(error) => {
eprintln!("Error in reading message: {}", error); eprintln!("Error in reading message: {}", error);
} }

133
daemon/src/wire.rs

@ -1,21 +1,150 @@
use crate::model::cfd::CfdOfferId; use crate::model::cfd::CfdOfferId;
use crate::model::Usd; use crate::model::Usd;
use crate::CfdOffer; use crate::CfdOffer;
use bdk::bitcoin::secp256k1::Signature;
use bdk::bitcoin::util::psbt::PartiallySignedTransaction;
use bdk::bitcoin::{Address, Amount, PublicKey, Txid};
use cfd_protocol::{CfdTransactions, PartyParams, PunishParams};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct AdaptorSignature(#[serde_as(as = "DisplayFromStr")] cfd_protocol::EcdsaAdaptorSignature);
impl std::ops::Deref for AdaptorSignature {
type Target = cfd_protocol::EcdsaAdaptorSignature;
fn deref(&self) -> &Self::Target {
&self.0
}
}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")] #[serde(tag = "type", content = "payload")]
#[allow(clippy::large_enum_variant)]
pub enum TakerToMaker { pub enum TakerToMaker {
TakeOffer { offer_id: CfdOfferId, quantity: Usd }, TakeOffer { offer_id: CfdOfferId, quantity: Usd },
// TODO: Currently the taker starts, can already send some stuff for signing over in the first message. // TODO: Currently the taker starts, can already send some stuff for signing over in the first
// message.
StartContractSetup(CfdOfferId), StartContractSetup(CfdOfferId),
Protocol(SetupMsg),
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")] #[serde(tag = "type", content = "payload")]
#[allow(clippy::large_enum_variant)]
pub enum MakerToTaker { pub enum MakerToTaker {
CurrentOffer(Option<CfdOffer>), CurrentOffer(Option<CfdOffer>),
// TODO: Needs RejectOffer as well // TODO: Needs RejectOffer as well
ConfirmTakeOffer(CfdOfferId), ConfirmTakeOffer(CfdOfferId), // TODO: Include payout curve in "accept" message from maker
InvalidOfferId(CfdOfferId), InvalidOfferId(CfdOfferId),
Protocol(SetupMsg),
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")]
pub enum SetupMsg {
Msg0(Msg0),
Msg1(Msg1),
}
impl SetupMsg {
pub fn try_into_msg0(self) -> Result<Msg0, Self> {
if let Self::Msg0(v) = self {
Ok(v)
} else {
Err(self)
}
}
pub fn try_into_msg1(self) -> Result<Msg1, Self> {
if let Self::Msg1(v) = self {
Ok(v)
} else {
Err(self)
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Msg0 {
pub lock_psbt: PartiallySignedTransaction, // TODO: Use binary representation
pub identity_pk: PublicKey,
#[serde(with = "bdk::bitcoin::util::amount::serde::as_sat")]
pub lock_amount: Amount,
pub address: Address,
pub revocation_pk: PublicKey,
pub publish_pk: PublicKey,
}
impl From<(PartyParams, PunishParams)> for Msg0 {
fn from((party, punish): (PartyParams, PunishParams)) -> Self {
let PartyParams {
lock_psbt,
identity_pk,
lock_amount,
address,
} = party;
let PunishParams {
revocation_pk,
publish_pk,
} = punish;
Self {
lock_psbt,
identity_pk,
lock_amount,
address,
revocation_pk,
publish_pk,
}
}
}
impl From<Msg0> for (PartyParams, PunishParams) {
fn from(msg0: Msg0) -> Self {
let Msg0 {
lock_psbt,
identity_pk,
lock_amount,
address,
revocation_pk,
publish_pk,
} = msg0;
let party = PartyParams {
lock_psbt,
identity_pk,
lock_amount,
address,
};
let punish = PunishParams {
revocation_pk,
publish_pk,
};
(party, punish)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Msg1 {
pub commit: AdaptorSignature,
pub cets: Vec<(Txid, AdaptorSignature)>,
pub refund: Signature,
}
impl From<CfdTransactions> for Msg1 {
fn from(txs: CfdTransactions) -> Self {
Self {
commit: AdaptorSignature(txs.commit.1),
cets: txs
.cets
.into_iter()
.map(|(tx, sig, _, _)| (tx.txid(), AdaptorSignature(sig)))
.collect(),
refund: txs.refund.1,
}
}
} }

4
dprint.json

@ -3,7 +3,9 @@
"projectType": "openSource", "projectType": "openSource",
"incremental": true, "incremental": true,
"rustfmt": { "rustfmt": {
"imports_granularity": "module" "imports_granularity": "module",
"wrap_comments": true,
"comment_width": 120
}, },
"includes": ["**/*.{md,rs,toml}"], "includes": ["**/*.{md,rs,toml}"],
"excludes": ["**/target", "excludes": ["**/target",

Loading…
Cancel
Save