Browse Source

Merge #428

428: Actor tests r=da-kami a=da-kami

TODOs:

- [x] Test code does not clippy, there are two annoying warnings
- [x] Rebase on `master` 😬

Once that is done I'd say we get this work in and then add more tests and extract the test framework into a test module as needed. 

Co-authored-by: Daniel Karzel <daniel@comit.network>
Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
contact-taker-before-changing-cfd-state
bors[bot] 3 years ago
committed by GitHub
parent
commit
c22dcc9739
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 58
      daemon/src/cfd_actors.rs
  2. 2
      daemon/src/connection.rs
  3. 33
      daemon/src/housekeeping.rs
  4. 215
      daemon/src/lib.rs
  5. 164
      daemon/src/maker.rs
  6. 153
      daemon/src/maker_cfd.rs
  7. 25
      daemon/src/setup_contract.rs
  8. 121
      daemon/src/taker.rs
  9. 171
      daemon/src/taker_cfd.rs
  10. 57
      daemon/src/wallet.rs
  11. 11
      daemon/src/wallet_sync.rs
  12. 273
      daemon/tests/happy_path.rs
  13. 28
      xtra_productivity/src/lib.rs
  14. 20
      xtra_productivity/tests/pass/can_handle_message.rs

58
daemon/src/cfd_actors.rs

@ -1,7 +1,6 @@
use crate::db::load_cfd_by_order_id; use crate::db::load_cfd_by_order_id;
use crate::model::cfd::{Attestation, Cfd, CfdState, CfdStateChangeEvent, OrderId}; use crate::model::cfd::{Attestation, Cfd, CfdState, CfdStateChangeEvent, OrderId};
use crate::wallet::Wallet; use crate::{db, monitor, oracle, try_continue, wallet};
use crate::{db, monitor, oracle, try_continue};
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
use sqlx::Sqlite; use sqlx::Sqlite;
@ -34,15 +33,21 @@ pub async fn append_cfd_state(
Ok(()) Ok(())
} }
pub async fn try_cet_publication( pub async fn try_cet_publication<W>(
cfd: &mut Cfd, cfd: &mut Cfd,
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet, wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>, update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> { ) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
match cfd.cet()? { match cfd.cet()? {
Ok(cet) => { Ok(cet) => {
let txid = wallet.try_broadcast_transaction(cet).await?; let txid = wallet
.send(wallet::TryBroadcastTransaction { tx: cet })
.await?
.context("Failed to send transaction")?;
tracing::info!("CET published with txid {}", txid); tracing::info!("CET published with txid {}", txid);
if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() { if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() {
@ -60,12 +65,15 @@ pub async fn try_cet_publication(
Ok(()) Ok(())
} }
pub async fn handle_monitoring_event( pub async fn handle_monitoring_event<W>(
event: monitor::Event, event: monitor::Event,
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet, wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>, update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> { ) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
let order_id = event.order_id(); let order_id = event.order_id();
let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?; let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?;
@ -82,24 +90,37 @@ pub async fn handle_monitoring_event(
try_cet_publication(&mut cfd, conn, wallet, update_sender).await?; try_cet_publication(&mut cfd, conn, wallet, update_sender).await?;
} else if let CfdState::MustRefund { .. } = cfd.state { } else if let CfdState::MustRefund { .. } = cfd.state {
let signed_refund_tx = cfd.refund_tx()?; let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?; let txid = wallet
.send(wallet::TryBroadcastTransaction {
tx: signed_refund_tx,
})
.await?
.context("Failed to publish CET")?;
tracing::info!("Refund transaction published on chain: {}", txid); tracing::info!("Refund transaction published on chain: {}", txid);
} }
Ok(()) Ok(())
} }
pub async fn handle_commit( pub async fn handle_commit<W>(
order_id: OrderId, order_id: OrderId,
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet, wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>, update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> { ) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?; let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?;
let signed_commit_tx = cfd.commit_tx()?; let signed_commit_tx = cfd.commit_tx()?;
let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?; let txid = wallet
.send(wallet::TryBroadcastTransaction {
tx: signed_commit_tx,
})
.await?
.context("Failed to publish commit tx")?;
if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() { if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() {
bail!("If we can get the commit tx we should be able to transition") bail!("If we can get the commit tx we should be able to transition")
@ -111,12 +132,15 @@ pub async fn handle_commit(
Ok(()) Ok(())
} }
pub async fn handle_oracle_attestation( pub async fn handle_oracle_attestation<W>(
attestation: oracle::Attestation, attestation: oracle::Attestation,
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet, wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>, update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> { ) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
tracing::debug!( tracing::debug!(
"Learnt latest oracle attestation for event: {}", "Learnt latest oracle attestation for event: {}",
attestation.id attestation.id

2
daemon/src/connection.rs

@ -1,4 +1,4 @@
use daemon::{send_to_socket, taker_cfd, wire}; use crate::{send_to_socket, taker_cfd, wire};
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Duration; use std::time::Duration;

33
daemon/src/housekeeping.rs

@ -1,10 +1,11 @@
use crate::db::{append_cfd_state, load_all_cfds}; use crate::db::{append_cfd_state, load_all_cfds};
use crate::model::cfd::{Cfd, CfdState}; use crate::model::cfd::{Cfd, CfdState};
use crate::try_continue; use crate::{try_continue, wallet};
use crate::wallet::Wallet;
use anyhow::Result; use anyhow::Result;
use sqlx::pool::PoolConnection; use sqlx::pool::PoolConnection;
use sqlx::Sqlite; use sqlx::Sqlite;
use xtra::Address;
pub async fn transition_non_continue_cfds_to_setup_failed( pub async fn transition_non_continue_cfds_to_setup_failed(
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
) -> Result<()> { ) -> Result<()> {
@ -24,25 +25,40 @@ pub async fn transition_non_continue_cfds_to_setup_failed(
pub async fn rebroadcast_transactions( pub async fn rebroadcast_transactions(
conn: &mut PoolConnection<Sqlite>, conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet, wallet: &Address<wallet::Actor>,
) -> Result<()> { ) -> Result<()> {
let cfds = load_all_cfds(conn).await?; let cfds = load_all_cfds(conn).await?;
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) { for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) {
let txid = try_continue!(wallet.try_broadcast_transaction(dlc.lock.0.clone()).await); let txid = try_continue!(wallet
.send(wallet::TryBroadcastTransaction {
tx: dlc.lock.0.clone()
})
.await
.expect("if sending to actor fails here we are screwed anyway"));
tracing::info!("Lock transaction published with txid {}", txid); tracing::info!("Lock transaction published with txid {}", txid);
} }
for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) { for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) {
let signed_refund_tx = cfd.refund_tx()?; let signed_refund_tx = cfd.refund_tx()?;
let txid = try_continue!(wallet.try_broadcast_transaction(signed_refund_tx).await); let txid = try_continue!(wallet
.send(wallet::TryBroadcastTransaction {
tx: signed_refund_tx
})
.await
.expect("if sending to actor fails here we are screwed anyway"));
tracing::info!("Refund transaction published on chain: {}", txid); tracing::info!("Refund transaction published on chain: {}", txid);
} }
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) { for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) {
let signed_commit_tx = cfd.commit_tx()?; let signed_commit_tx = cfd.commit_tx()?;
let txid = try_continue!(wallet.try_broadcast_transaction(signed_commit_tx).await); let txid = try_continue!(wallet
.send(wallet::TryBroadcastTransaction {
tx: signed_commit_tx
})
.await
.expect("if sending to actor fails here we are screwed anyway"));
tracing::info!("Commit transaction published on chain: {}", txid); tracing::info!("Commit transaction published on chain: {}", txid);
} }
@ -50,7 +66,10 @@ pub async fn rebroadcast_transactions(
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_cet(cfd)) { for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_cet(cfd)) {
// Double question mark OK because if we are in PendingCet we must have been Ready before // Double question mark OK because if we are in PendingCet we must have been Ready before
let signed_cet = cfd.cet()??; let signed_cet = cfd.cet()??;
let txid = try_continue!(wallet.try_broadcast_transaction(signed_cet).await); let txid = try_continue!(wallet
.send(wallet::TryBroadcastTransaction { tx: signed_cet })
.await
.expect("if sending to actor fails here we are screwed anyway"));
tracing::info!("CET published on chain: {}", txid); tracing::info!("CET published on chain: {}", txid);
} }

215
daemon/src/lib.rs

@ -1,9 +1,25 @@
#![cfg_attr(not(test), warn(clippy::unwrap_used))] #![cfg_attr(not(test), warn(clippy::unwrap_used))]
use crate::db::load_all_cfds;
use crate::maker_cfd::{FromTaker, NewTakerOnline};
use crate::model::cfd::{Cfd, Order, UpdateCfdProposals};
use crate::oracle::Attestation;
use anyhow::Result;
use cfd_protocol::secp256k1_zkp::schnorrsig;
use futures::Stream;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::future::Future;
use std::time::Duration;
use tokio::sync::watch;
use xtra::message_channel::{MessageChannel, StrongMessageChannel};
use xtra::spawn::TokioGlobalSpawnExt;
use xtra::{Actor, Address};
pub mod actors; pub mod actors;
pub mod auth; pub mod auth;
pub mod bitmex_price_feed; pub mod bitmex_price_feed;
pub mod cfd_actors; pub mod cfd_actors;
pub mod connection;
pub mod db; pub mod db;
pub mod fan_out; pub mod fan_out;
pub mod forward_only_ok; pub mod forward_only_ok;
@ -28,3 +44,202 @@ pub mod try_continue;
pub mod wallet; pub mod wallet;
pub mod wallet_sync; pub mod wallet_sync;
pub mod wire; pub mod wire;
pub struct MakerActorSystem<O, M, T, W> {
pub cfd_actor_addr: Address<maker_cfd::Actor<O, M, T, W>>,
pub cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
pub order_feed_receiver: watch::Receiver<Option<Order>>,
pub update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
pub inc_conn_addr: Address<T>,
}
impl<O, M, T, W> MakerActorSystem<O, M, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>,
W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sync>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{
pub async fn new<F>(
db: SqlitePool,
wallet_addr: Address<W>,
oracle_pk: schnorrsig::PublicKey,
oracle_constructor: impl FnOnce(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
inc_conn_constructor: impl FnOnce(
Box<dyn MessageChannel<NewTakerOnline>>,
Box<dyn MessageChannel<FromTaker>>,
) -> T,
term: time::Duration,
) -> Result<Self>
where
F: Future<Output = Result<M>>,
{
let mut conn = db.acquire().await?;
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None);
let cfd_actor_addr = maker_cfd::Actor::new(
db,
wallet_addr,
term,
oracle_pk,
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
inc_conn_addr.clone(),
monitor_addr.clone(),
oracle_addr.clone(),
)
.create(None)
.spawn_global();
tokio::spawn(inc_conn_ctx.run(inc_conn_constructor(
Box::new(cfd_actor_addr.clone()),
Box::new(cfd_actor_addr.clone()),
)));
tokio::spawn(
monitor_ctx
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.map_err(|e| anyhow::anyhow!(e))?,
);
tokio::spawn(
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
tokio::spawn(
oracle_ctx
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.map_err(|e| anyhow::anyhow!(e))?,
);
let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
.create(None)
.spawn_global();
tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
oracle_addr.do_send_async(oracle::Sync).await?;
Ok(Self {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
inc_conn_addr,
})
}
}
pub struct TakerActorSystem<O, M, W> {
pub cfd_actor_addr: Address<taker_cfd::Actor<O, M, W>>,
pub cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
pub order_feed_receiver: watch::Receiver<Option<Order>>,
pub update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
}
impl<O, M, W> TakerActorSystem<O, M, W>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sync>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{
pub async fn new<F>(
db: SqlitePool,
wallet_addr: Address<W>,
oracle_pk: schnorrsig::PublicKey,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
read_from_maker: Box<dyn Stream<Item = taker_cfd::MakerStreamMessage> + Unpin + Send>,
oracle_constructor: impl FnOnce(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
) -> Result<Self>
where
F: Future<Output = Result<M>>,
{
let mut conn = db.acquire().await?;
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let cfd_actor_addr = taker_cfd::Actor::new(
db,
wallet_addr,
oracle_pk,
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
send_to_maker,
monitor_addr.clone(),
oracle_addr,
)
.create(None)
.spawn_global();
tokio::spawn(cfd_actor_addr.clone().attach_stream(read_from_maker));
tokio::spawn(
monitor_ctx
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.map_err(|e| anyhow::anyhow!(e))?,
);
tokio::spawn(
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
tokio::spawn(
oracle_ctx
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.map_err(|e| anyhow::anyhow!(e))?,
);
let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
.create(None)
.spawn_global();
tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
Ok(Self {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
})
}
}

164
daemon/src/maker.rs

@ -3,31 +3,29 @@ use bdk::bitcoin;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
use clap::Clap; use clap::Clap;
use daemon::auth::{self, MAKER_USERNAME}; use daemon::auth::{self, MAKER_USERNAME};
use daemon::db::{self, load_all_cfds}; use daemon::db::{self};
use daemon::maker_cfd::{FromTaker, NewTakerOnline};
use daemon::model::cfd::{Cfd, Order, UpdateCfdProposals};
use daemon::model::WalletInfo; use daemon::model::WalletInfo;
use daemon::oracle::Attestation;
use daemon::seed::Seed; use daemon::seed::Seed;
use daemon::wallet::Wallet;
use daemon::{ use daemon::{
bitmex_price_feed, fan_out, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, bitmex_price_feed, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle,
oracle, wallet_sync, wallet, wallet_sync, MakerActorSystem,
}; };
use futures::Future;
use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool; use sqlx::SqlitePool;
use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::task::Poll; use std::task::Poll;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::watch; use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*; use xtra::prelude::*;
use xtra::spawn::TokioGlobalSpawnExt; use xtra::spawn::TokioGlobalSpawnExt;
use xtra::Actor;
mod routes_maker; mod routes_maker;
@ -132,13 +130,16 @@ async fn main() -> Result<()> {
let bitcoin_network = opts.network.bitcoin_network(); let bitcoin_network = opts.network.bitcoin_network();
let ext_priv_key = seed.derive_extended_priv_key(bitcoin_network)?; let ext_priv_key = seed.derive_extended_priv_key(bitcoin_network)?;
let wallet = Wallet::new( let wallet = wallet::Actor::new(
opts.network.electrum(), opts.network.electrum(),
&data_dir.join("maker_wallet.sqlite"), &data_dir.join("maker_wallet.sqlite"),
ext_priv_key, ext_priv_key,
) )
.await?; .await?
let wallet_info = wallet.sync().await?; .create(None)
.spawn_global();
let wallet_info = wallet.send(wallet::Sync).await??;
let auth_password = seed.derive_auth_password::<auth::Password>(); let auth_password = seed.derive_auth_password::<auth::Password>();
@ -189,12 +190,13 @@ async fn main() -> Result<()> {
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?; housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?;
housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?; housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?;
let ActorSystem { let MakerActorSystem {
cfd_actor_addr, cfd_actor_addr,
cfd_feed_receiver, cfd_feed_receiver,
order_feed_receiver, order_feed_receiver,
update_cfd_feed_receiver, update_cfd_feed_receiver,
} = ActorSystem::new( inc_conn_addr: incoming_connection_addr,
} = MakerActorSystem::new(
db.clone(), db.clone(),
wallet.clone(), wallet.clone(),
oracle, oracle,
@ -206,11 +208,23 @@ async fn main() -> Result<()> {
} }
}, },
|channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1), |channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1),
listener,
time::Duration::hours(opts.term as i64), time::Duration::hours(opts.term as i64),
) )
.await?; .await?;
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => {
maker_inc_connections::ListenerMessage::NewConnection { stream, address }
}
Err(e) => maker_inc_connections::ListenerMessage::Error { source: e },
};
Poll::Ready(Some(message))
});
tokio::spawn(incoming_connection_addr.attach_stream(listener_stream));
tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender)); tokio::spawn(wallet_sync::new(wallet, wallet_feed_sender));
let cfd_action_channel = MessageChannel::<maker_cfd::CfdAction>::clone_channel(&cfd_actor_addr); let cfd_action_channel = MessageChannel::<maker_cfd::CfdAction>::clone_channel(&cfd_actor_addr);
@ -248,119 +262,3 @@ async fn main() -> Result<()> {
Ok(()) Ok(())
} }
pub struct ActorSystem<O, M, T> {
cfd_actor_addr: Address<maker_cfd::Actor<O, M, T>>,
cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
order_feed_receiver: watch::Receiver<Option<Order>>,
update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
}
impl<O, M, T> ActorSystem<O, M, T>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<maker_inc_connections::ListenerMessage>,
{
#[allow(clippy::too_many_arguments)]
pub async fn new<F>(
db: SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
oracle_constructor: impl Fn(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl Fn(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
inc_conn_constructor: impl Fn(
Box<dyn MessageChannel<NewTakerOnline>>,
Box<dyn MessageChannel<FromTaker>>,
) -> T,
listener: TcpListener,
term: time::Duration,
) -> Result<Self>
where
F: Future<Output = Result<M>>,
{
let mut conn = db.acquire().await?;
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let (inc_conn_addr, inc_conn_ctx) = xtra::Context::new(None);
let cfd_actor_addr = maker_cfd::Actor::new(
db,
wallet,
term,
oracle_pk,
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
inc_conn_addr.clone(),
monitor_addr.clone(),
oracle_addr.clone(),
)
.create(None)
.spawn_global();
tokio::spawn(inc_conn_ctx.run(inc_conn_constructor(
Box::new(cfd_actor_addr.clone()),
Box::new(cfd_actor_addr.clone()),
)));
tokio::spawn(
monitor_ctx
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.map_err(|e| anyhow::anyhow!(e))?,
);
tokio::spawn(
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
tokio::spawn(
oracle_ctx
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.map_err(|e| anyhow::anyhow!(e))?,
);
let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
.create(None)
.spawn_global();
tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
oracle_addr.do_send_async(oracle::Sync).await?;
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => {
maker_inc_connections::ListenerMessage::NewConnection { stream, address }
}
Err(e) => maker_inc_connections::ListenerMessage::Error { source: e },
};
Poll::Ready(Some(message))
});
tokio::spawn(inc_conn_addr.attach_stream(listener_stream));
Ok(Self {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
})
}
}

153
daemon/src/maker_cfd.rs

@ -8,8 +8,7 @@ use crate::model::cfd::{
}; };
use crate::model::{TakerId, Usd}; use crate::model::{TakerId, Usd};
use crate::monitor::MonitorParams; use crate::monitor::MonitorParams;
use crate::wallet::Wallet; use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wallet, wire};
use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wire};
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
@ -59,9 +58,9 @@ pub struct FromTaker {
pub msg: wire::TakerToMaker, pub msg: wire::TakerToMaker,
} }
pub struct Actor<O, M, T> { pub struct Actor<O, M, T, W> {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Address<W>,
term: Duration, term: Duration,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
@ -94,11 +93,11 @@ enum RollOverState {
None, None,
} }
impl<O, M, T> Actor<O, M, T> { impl<O, M, T, W> Actor<O, M, T, W> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Address<W>,
term: Duration, term: Duration,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
@ -127,19 +126,6 @@ impl<O, M, T> Actor<O, M, T> {
} }
} }
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_propose_roll_over( async fn handle_propose_roll_over(
&mut self, &mut self,
proposal: RollOverProposal, proposal: RollOverProposal,
@ -204,30 +190,6 @@ impl<O, M, T> Actor<O, M, T> {
Ok(()) Ok(())
} }
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_inc_protocol_msg( async fn handle_inc_protocol_msg(
&mut self, &mut self,
taker_id: TakerId, taker_id: TakerId,
@ -311,7 +273,49 @@ impl<O, M, T> Actor<O, M, T> {
} }
} }
impl<O, M, T> Actor<O, M, T> impl<O, M, T, W> Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
}
impl<O, M, T, W> Actor<O, M, T, W>
where where
T: xtra::Handler<maker_inc_connections::TakerMessage>, T: xtra::Handler<maker_inc_connections::TakerMessage>,
{ {
@ -401,7 +405,7 @@ where
} }
} }
impl<O, M, T> Actor<O, M, T> impl<O, M, T, W> Actor<O, M, T, W>
where where
T: xtra::Handler<maker_inc_connections::TakerMessage> T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>, + xtra::Handler<maker_inc_connections::BroadcastOrder>,
@ -530,11 +534,12 @@ where
} }
} }
impl<O, M, T> Actor<O, M, T> impl<O, M, T, W> Actor<O, M, T, W>
where where
Self: xtra::Handler<CfdSetupCompleted>, Self: xtra::Handler<CfdSetupCompleted>,
O: xtra::Handler<oracle::GetAnnouncement>, O: xtra::Handler<oracle::GetAnnouncement>,
T: xtra::Handler<maker_inc_connections::TakerMessage>, T: xtra::Handler<maker_inc_connections::TakerMessage>,
W: xtra::Handler<wallet::Sign> + xtra::Handler<wallet::BuildPartyParams>,
{ {
async fn handle_accept_order( async fn handle_accept_order(
&mut self, &mut self,
@ -622,7 +627,7 @@ where
} }
} }
impl<O, M, T> Actor<O, M, T> impl<O, M, T, W> Actor<O, M, T, W>
where where
O: xtra::Handler<oracle::FetchAnnouncement>, O: xtra::Handler<oracle::FetchAnnouncement>,
T: xtra::Handler<maker_inc_connections::BroadcastOrder>, T: xtra::Handler<maker_inc_connections::BroadcastOrder>,
@ -667,10 +672,11 @@ where
} }
} }
impl<O, M, T> Actor<O, M, T> impl<O, M, T, W> Actor<O, M, T, W>
where where
O: xtra::Handler<oracle::MonitorAttestation>, O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>, M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{ {
async fn handle_cfd_setup_completed( async fn handle_cfd_setup_completed(
&mut self, &mut self,
@ -694,8 +700,10 @@ where
let txid = self let txid = self
.wallet .wallet
.try_broadcast_transaction(dlc.lock.0.clone()) .send(wallet::TryBroadcastTransaction {
.await?; tx: dlc.lock.0.clone(),
})
.await??;
tracing::info!("Lock transaction published with txid {}", txid); tracing::info!("Lock transaction published with txid {}", txid);
@ -716,7 +724,7 @@ where
} }
} }
impl<O, M, T> Actor<O, M, T> impl<O, M, T, W> Actor<O, M, T, W>
where where
Self: xtra::Handler<CfdRollOverCompleted>, Self: xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>, O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>,
@ -809,7 +817,7 @@ where
} }
} }
impl<O, M, T> Actor<O, M, T> impl<O, M, T, W> Actor<O, M, T, W>
where where
M: xtra::Handler<monitor::StartMonitoring>, M: xtra::Handler<monitor::StartMonitoring>,
{ {
@ -842,9 +850,10 @@ where
} }
} }
impl<O, M, T> Actor<O, M, T> impl<O, M, T, W> Actor<O, M, T, W>
where where
M: xtra::Handler<monitor::CollaborativeSettlement>, M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{ {
async fn handle_initiate_settlement( async fn handle_initiate_settlement(
&mut self, &mut self,
@ -888,8 +897,10 @@ where
let txid = self let txid = self
.wallet .wallet
.try_broadcast_transaction(spend_tx.clone()) .send(wallet::TryBroadcastTransaction {
.await tx: spend_tx.clone(),
})
.await?
.context("Broadcasting spend transaction")?; .context("Broadcasting spend transaction")?;
tracing::info!("Close transaction published with txid {}", txid); tracing::info!("Close transaction published with txid {}", txid);
@ -909,12 +920,15 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<CfdAction> for Actor<O, M, T> impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<CfdAction> for Actor<O, M, T, W>
where where
Self: xtra::Handler<CfdSetupCompleted> + xtra::Handler<CfdRollOverCompleted>, Self: xtra::Handler<CfdSetupCompleted> + xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>, O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>,
T: xtra::Handler<maker_inc_connections::TakerMessage> T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>, + xtra::Handler<maker_inc_connections::BroadcastOrder>,
W: xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{ {
async fn handle(&mut self, msg: CfdAction, ctx: &mut Context<Self>) { async fn handle(&mut self, msg: CfdAction, ctx: &mut Context<Self>) {
use CfdAction::*; use CfdAction::*;
@ -933,7 +947,7 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<NewOrder> for Actor<O, M, T> impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<NewOrder> for Actor<O, M, T, W>
where where
O: xtra::Handler<oracle::FetchAnnouncement>, O: xtra::Handler<oracle::FetchAnnouncement>,
T: xtra::Handler<maker_inc_connections::BroadcastOrder>, T: xtra::Handler<maker_inc_connections::BroadcastOrder>,
@ -944,7 +958,7 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<NewTakerOnline> for Actor<O, M, T> impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<NewTakerOnline> for Actor<O, M, T, W>
where where
T: xtra::Handler<maker_inc_connections::TakerMessage>, T: xtra::Handler<maker_inc_connections::TakerMessage>,
{ {
@ -954,10 +968,12 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<CfdSetupCompleted> for Actor<O, M, T> impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<CfdSetupCompleted>
for Actor<O, M, T, W>
where where
O: xtra::Handler<oracle::MonitorAttestation>, O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>, M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{ {
async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc)); log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc));
@ -965,7 +981,8 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<CfdRollOverCompleted> for Actor<O, M, T> impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<CfdRollOverCompleted>
for Actor<O, M, T, W>
where where
M: xtra::Handler<monitor::StartMonitoring>, M: xtra::Handler<monitor::StartMonitoring>,
{ {
@ -975,18 +992,22 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<monitor::Event> for Actor<O, M, T> { impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<monitor::Event> for Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) {
log_error!(self.handle_monitoring_event(msg)) log_error!(self.handle_monitoring_event(msg))
} }
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<FromTaker> for Actor<O, M, T> impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<FromTaker> for Actor<O, M, T, W>
where where
T: xtra::Handler<maker_inc_connections::BroadcastOrder> T: xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<maker_inc_connections::TakerMessage>, + xtra::Handler<maker_inc_connections::TakerMessage>,
M: xtra::Handler<monitor::CollaborativeSettlement>, M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{ {
async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, _ctx: &mut Context<Self>) { async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, _ctx: &mut Context<Self>) {
match msg { match msg {
@ -1041,7 +1062,11 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<oracle::Attestation> for Actor<O, M, T> { impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<oracle::Attestation>
for Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_attestation(msg)) log_error!(self.handle_oracle_attestation(msg))
} }
@ -1071,4 +1096,4 @@ impl Message for FromTaker {
type Result = (); type Result = ();
} }
impl<O: 'static, M: 'static, T: 'static> xtra::Actor for Actor<O, M, T> {} impl<O: 'static, M: 'static, T: 'static, W: 'static> xtra::Actor for Actor<O, M, T, W> {}

25
daemon/src/setup_contract.rs

@ -1,10 +1,9 @@
use crate::model::cfd::{Cet, Cfd, Dlc, RevokedCommit, Role}; use crate::model::cfd::{Cet, Cfd, Dlc, RevokedCommit, Role};
use crate::tokio_ext::FutureExt; use crate::tokio_ext::FutureExt;
use crate::wallet::Wallet;
use crate::wire::{ use crate::wire::{
Msg0, Msg1, Msg2, RollOverMsg, RollOverMsg0, RollOverMsg1, RollOverMsg2, SetupMsg, Msg0, Msg1, Msg2, RollOverMsg, RollOverMsg0, RollOverMsg1, RollOverMsg2, SetupMsg,
}; };
use crate::{model, oracle, payout_curve}; use crate::{model, oracle, payout_curve, wallet};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use bdk::bitcoin::secp256k1::{schnorrsig, Signature, SECP256K1}; use bdk::bitcoin::secp256k1::{schnorrsig, Signature, SECP256K1};
use bdk::bitcoin::util::psbt::PartiallySignedTransaction; use bdk::bitcoin::util::psbt::PartiallySignedTransaction;
@ -23,25 +22,33 @@ use std::collections::HashMap;
use std::iter::FromIterator; use std::iter::FromIterator;
use std::ops::RangeInclusive; use std::ops::RangeInclusive;
use std::time::Duration; use std::time::Duration;
use xtra::Address;
/// Given an initial set of parameters, sets up the CFD contract with /// Given an initial set of parameters, sets up the CFD contract with
/// the other party. /// the other party.
pub async fn new( pub async fn new<W>(
mut sink: impl Sink<SetupMsg, Error = anyhow::Error> + Unpin, mut sink: impl Sink<SetupMsg, Error = anyhow::Error> + Unpin,
mut stream: impl FusedStream<Item = SetupMsg> + Unpin, mut stream: impl FusedStream<Item = SetupMsg> + Unpin,
(oracle_pk, announcement): (schnorrsig::PublicKey, oracle::Announcement), (oracle_pk, announcement): (schnorrsig::PublicKey, oracle::Announcement),
cfd: Cfd, cfd: Cfd,
wallet: Wallet, wallet: Address<W>,
role: Role, role: Role,
) -> Result<Dlc> { ) -> Result<Dlc>
where
W: xtra::Handler<wallet::Sign> + xtra::Handler<wallet::BuildPartyParams>,
{
let (sk, pk) = crate::keypair::new(&mut rand::thread_rng()); let (sk, pk) = crate::keypair::new(&mut rand::thread_rng());
let (rev_sk, rev_pk) = crate::keypair::new(&mut rand::thread_rng()); let (rev_sk, rev_pk) = crate::keypair::new(&mut rand::thread_rng());
let (publish_sk, publish_pk) = crate::keypair::new(&mut rand::thread_rng()); let (publish_sk, publish_pk) = crate::keypair::new(&mut rand::thread_rng());
let margin = cfd.margin().context("Failed to calculate margin")?; let margin = cfd.margin().context("Failed to calculate margin")?;
let own_params = wallet let own_params = wallet
.build_party_params(margin, pk) .send(wallet::BuildPartyParams {
amount: margin,
identity_pk: pk,
})
.await .await
.context("Failed to send message to wallet actor")?
.context("Failed to build party params")?; .context("Failed to build party params")?;
let own_punish = PunishParams { let own_punish = PunishParams {
@ -171,7 +178,11 @@ pub async fn new(
tracing::info!("Verified all signatures"); tracing::info!("Verified all signatures");
let mut signed_lock_tx = wallet.sign(lock_tx).await?; let mut signed_lock_tx = wallet
.send(wallet::Sign { psbt: lock_tx })
.await
.context("Failed to send message to wallet actor")?
.context("Failed to sign transaction")?;
sink.send(SetupMsg::Msg2(Msg2 { sink.send(SetupMsg::Msg2(Msg2 {
signed_lock: signed_lock_tx.clone(), signed_lock: signed_lock_tx.clone(),
})) }))

121
daemon/src/taker.rs

@ -2,30 +2,29 @@ use anyhow::{Context, Result};
use bdk::bitcoin; use bdk::bitcoin;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
use clap::Clap; use clap::Clap;
use daemon::db::{self, load_all_cfds}; use daemon::db::{self};
use daemon::model::cfd::{Cfd, Order, UpdateCfdProposals};
use daemon::model::WalletInfo; use daemon::model::WalletInfo;
use daemon::oracle::Attestation;
use daemon::seed::Seed; use daemon::seed::Seed;
use daemon::wallet::Wallet;
use daemon::{ use daemon::{
bitmex_price_feed, fan_out, housekeeping, logger, monitor, oracle, taker_cfd, wallet_sync, wire, bitmex_price_feed, connection, housekeeping, logger, monitor, oracle, taker_cfd, wallet,
wallet_sync, TakerActorSystem,
}; };
use futures::{Future, Stream};
use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool; use sqlx::SqlitePool;
use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration;
use tokio::sync::watch; use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::{MessageChannel, StrongMessageChannel}; use xtra::prelude::MessageChannel;
use xtra::spawn::TokioGlobalSpawnExt; use xtra::spawn::TokioGlobalSpawnExt;
use xtra::{Actor, Address}; use xtra::Actor;
mod connection;
mod routes_taker; mod routes_taker;
#[derive(Clap)] #[derive(Clap)]
@ -123,13 +122,15 @@ async fn main() -> Result<()> {
let bitcoin_network = opts.network.bitcoin_network(); let bitcoin_network = opts.network.bitcoin_network();
let ext_priv_key = seed.derive_extended_priv_key(bitcoin_network)?; let ext_priv_key = seed.derive_extended_priv_key(bitcoin_network)?;
let wallet = Wallet::new( let wallet = wallet::Actor::new(
opts.network.electrum(), opts.network.electrum(),
&data_dir.join("taker_wallet.sqlite"), &data_dir.join("taker_wallet.sqlite"),
ext_priv_key, ext_priv_key,
) )
.await?; .await?
let wallet_info = wallet.sync().await.unwrap(); .create(None)
.spawn_global();
let wallet_info = wallet.send(wallet::Sync).await??;
// TODO: Actually fetch it from Olivia // TODO: Actually fetch it from Olivia
let oracle = schnorrsig::PublicKey::from_str( let oracle = schnorrsig::PublicKey::from_str(
@ -167,12 +168,12 @@ async fn main() -> Result<()> {
read_from_maker, read_from_maker,
} = connection::Actor::new(opts.maker).await; } = connection::Actor::new(opts.maker).await;
let ActorSystem { let TakerActorSystem {
cfd_actor_addr, cfd_actor_addr,
cfd_feed_receiver, cfd_feed_receiver,
order_feed_receiver, order_feed_receiver,
update_cfd_feed_receiver, update_cfd_feed_receiver,
} = ActorSystem::new( } = TakerActorSystem::new(
db.clone(), db.clone(),
wallet.clone(), wallet.clone(),
oracle, oracle,
@ -222,91 +223,3 @@ async fn main() -> Result<()> {
Ok(()) Ok(())
} }
pub struct ActorSystem<O, M> {
cfd_actor_addr: Address<taker_cfd::Actor<O, M>>,
cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
order_feed_receiver: watch::Receiver<Option<Order>>,
update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
}
impl<O, M> ActorSystem<O, M>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::Sync>,
M: xtra::Handler<monitor::StartMonitoring>
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
{
pub async fn new<F>(
db: SqlitePool,
wallet: Wallet,
oracle_pk: schnorrsig::PublicKey,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
read_from_maker: Box<dyn Stream<Item = taker_cfd::MakerStreamMessage> + Unpin + Send>,
oracle_constructor: impl Fn(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl Fn(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
) -> Result<Self>
where
F: Future<Output = Result<M>>,
{
let mut conn = db.acquire().await?;
let cfds = load_all_cfds(&mut conn).await?;
let (cfd_feed_sender, cfd_feed_receiver) = watch::channel(cfds.clone());
let (order_feed_sender, order_feed_receiver) = watch::channel::<Option<Order>>(None);
let (update_cfd_feed_sender, update_cfd_feed_receiver) =
watch::channel::<UpdateCfdProposals>(HashMap::new());
let (monitor_addr, mut monitor_ctx) = xtra::Context::new(None);
let (oracle_addr, mut oracle_ctx) = xtra::Context::new(None);
let cfd_actor_addr = taker_cfd::Actor::new(
db,
wallet,
oracle_pk,
cfd_feed_sender,
order_feed_sender,
update_cfd_feed_sender,
send_to_maker,
monitor_addr.clone(),
oracle_addr,
)
.create(None)
.spawn_global();
tokio::spawn(cfd_actor_addr.clone().attach_stream(read_from_maker));
tokio::spawn(
monitor_ctx
.notify_interval(Duration::from_secs(20), || monitor::Sync)
.map_err(|e| anyhow::anyhow!(e))?,
);
tokio::spawn(
monitor_ctx
.run(monitor_constructor(Box::new(cfd_actor_addr.clone()), cfds.clone()).await?),
);
tokio::spawn(
oracle_ctx
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.unwrap(),
);
let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
.create(None)
.spawn_global();
tokio::spawn(oracle_ctx.run(oracle_constructor(cfds, Box::new(fan_out_actor))));
Ok(Self {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
})
}
}

171
daemon/src/taker_cfd.rs

@ -7,9 +7,8 @@ use crate::model::cfd::{
}; };
use crate::model::{BitMexPriceEventId, Usd}; use crate::model::{BitMexPriceEventId, Usd};
use crate::monitor::{self, MonitorParams}; use crate::monitor::{self, MonitorParams};
use crate::wallet::Wallet;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg}; use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg};
use crate::{log_error, oracle, setup_contract, wire}; use crate::{log_error, oracle, setup_contract, wallet, wire};
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use async_trait::async_trait; use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig; use bdk::bitcoin::secp256k1::schnorrsig;
@ -68,9 +67,9 @@ enum RollOverState {
None, None,
} }
pub struct Actor<O, M> { pub struct Actor<O, M, W> {
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
@ -83,11 +82,16 @@ pub struct Actor<O, M> {
current_pending_proposals: UpdateCfdProposals, current_pending_proposals: UpdateCfdProposals,
} }
impl<O, M> Actor<O, M> { impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
db: sqlx::SqlitePool, db: sqlx::SqlitePool,
wallet: Wallet, wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey, oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>, cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>, order_feed_actor_inbox: watch::Sender<Option<Order>>,
@ -111,7 +115,9 @@ impl<O, M> Actor<O, M> {
current_pending_proposals: HashMap::new(), current_pending_proposals: HashMap::new(),
} }
} }
}
impl<O, M, W> Actor<O, M, W> {
fn send_pending_update_proposals(&self) -> Result<()> { fn send_pending_update_proposals(&self) -> Result<()> {
Ok(self Ok(self
.update_cfd_feed_sender .update_cfd_feed_sender
@ -165,6 +171,25 @@ impl<O, M> Actor<O, M> {
Ok(()) Ok(())
} }
}
impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_propose_settlement( async fn handle_propose_settlement(
&mut self, &mut self,
@ -255,42 +280,6 @@ impl<O, M> Actor<O, M> {
Ok(()) Ok(())
} }
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_invalid_order_id(&mut self, order_id: OrderId) -> Result<()> { async fn handle_invalid_order_id(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Invalid order ID"); tracing::debug!(%order_id, "Invalid order ID");
@ -311,7 +300,7 @@ impl<O, M> Actor<O, M> {
} }
} }
impl<O, M> Actor<O, M> impl<O, M, W> Actor<O, M, W>
where where
O: xtra::Handler<oracle::FetchAnnouncement>, O: xtra::Handler<oracle::FetchAnnouncement>,
{ {
@ -334,7 +323,42 @@ where
} }
Ok(()) Ok(())
} }
}
impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
}
impl<O, M, W> Actor<O, M, W>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_propose_roll_over(&mut self, order_id: OrderId) -> Result<()> { async fn handle_propose_roll_over(&mut self, order_id: OrderId) -> Result<()> {
if self.current_pending_proposals.contains_key(&order_id) { if self.current_pending_proposals.contains_key(&order_id) {
anyhow::bail!("An update for order id {} is already in progress", order_id) anyhow::bail!("An update for order id {} is already in progress", order_id)
@ -371,11 +395,20 @@ where
Ok(()) Ok(())
} }
} }
impl<O, M, W> Actor<O, M, W>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{
}
impl<O, M> Actor<O, M> impl<O, M, W> Actor<O, M, W>
where where
O: xtra::Handler<oracle::MonitorAttestation>, O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>, M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{ {
async fn handle_cfd_setup_completed( async fn handle_cfd_setup_completed(
&mut self, &mut self,
@ -399,8 +432,10 @@ where
let txid = self let txid = self
.wallet .wallet
.try_broadcast_transaction(dlc.lock.0.clone()) .send(wallet::TryBroadcastTransaction {
.await?; tx: dlc.lock.0.clone(),
})
.await??;
tracing::info!("Lock transaction published with txid {}", txid); tracing::info!("Lock transaction published with txid {}", txid);
@ -421,10 +456,11 @@ where
} }
} }
impl<O: 'static, M: 'static> Actor<O, M> impl<O: 'static, M: 'static, W: 'static> Actor<O, M, W>
where where
Self: xtra::Handler<CfdSetupCompleted>, Self: xtra::Handler<CfdSetupCompleted>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>, O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
W: xtra::Handler<wallet::Sign> + xtra::Handler<wallet::BuildPartyParams>,
{ {
async fn handle_order_accepted( async fn handle_order_accepted(
&mut self, &mut self,
@ -486,10 +522,13 @@ where
} }
} }
impl<O: 'static, M: 'static> Actor<O, M> impl<O: 'static, M: 'static, W: 'static> Actor<O, M, W>
where where
Self: xtra::Handler<CfdRollOverCompleted>, Self: xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::GetAnnouncement>, O: xtra::Handler<oracle::GetAnnouncement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{ {
async fn handle_roll_over_accepted( async fn handle_roll_over_accepted(
&mut self, &mut self,
@ -546,7 +585,7 @@ where
} }
} }
impl<O: 'static, M: 'static> Actor<O, M> impl<O: 'static, M: 'static, W: 'static> Actor<O, M, W>
where where
M: xtra::Handler<monitor::StartMonitoring>, M: xtra::Handler<monitor::StartMonitoring>,
{ {
@ -580,9 +619,12 @@ where
} }
} }
impl<O: 'static, M: 'static> Actor<O, M> impl<O: 'static, M: 'static, W: 'static> Actor<O, M, W>
where where
M: xtra::Handler<monitor::CollaborativeSettlement>, M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{ {
async fn handle_settlement_accepted( async fn handle_settlement_accepted(
&mut self, &mut self,
@ -630,16 +672,19 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static> Handler<TakeOffer> for Actor<O, M> { impl<O: 'static, M: 'static, W: 'static> Handler<TakeOffer> for Actor<O, M, W> {
async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context<Self>) {
log_error!(self.handle_take_offer(msg.order_id, msg.quantity)); log_error!(self.handle_take_offer(msg.order_id, msg.quantity));
} }
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static> Handler<CfdAction> for Actor<O, M> impl<O: 'static, M: 'static, W: 'static> Handler<CfdAction> for Actor<O, M, W>
where where
O: xtra::Handler<oracle::FetchAnnouncement>, O: xtra::Handler<oracle::FetchAnnouncement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{ {
async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context<Self>) {
use CfdAction::*; use CfdAction::*;
@ -661,13 +706,16 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static> Handler<MakerStreamMessage> for Actor<O, M> impl<O: 'static, M: 'static, W: 'static> Handler<MakerStreamMessage> for Actor<O, M, W>
where where
Self: xtra::Handler<CfdSetupCompleted> + xtra::Handler<CfdRollOverCompleted>, Self: xtra::Handler<CfdSetupCompleted> + xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::FetchAnnouncement> O: xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::MonitorAttestation>, + xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::CollaborativeSettlement>, M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{ {
async fn handle( async fn handle(
&mut self, &mut self,
@ -723,10 +771,11 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static> Handler<CfdSetupCompleted> for Actor<O, M> impl<O: 'static, M: 'static, W: 'static> Handler<CfdSetupCompleted> for Actor<O, M, W>
where where
O: xtra::Handler<oracle::MonitorAttestation>, O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>, M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{ {
async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc)); log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc));
@ -734,7 +783,7 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static> Handler<CfdRollOverCompleted> for Actor<O, M> impl<O: 'static, M: 'static, W: 'static> Handler<CfdRollOverCompleted> for Actor<O, M, W>
where where
M: xtra::Handler<monitor::StartMonitoring>, M: xtra::Handler<monitor::StartMonitoring>,
{ {
@ -744,14 +793,20 @@ where
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static> Handler<monitor::Event> for Actor<O, M> { impl<O: 'static, M: 'static, W: 'static> Handler<monitor::Event> for Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) {
log_error!(self.handle_monitoring_event(msg)) log_error!(self.handle_monitoring_event(msg))
} }
} }
#[async_trait] #[async_trait]
impl<O: 'static, M: 'static> Handler<oracle::Attestation> for Actor<O, M> { impl<O: 'static, M: 'static, W: 'static> Handler<oracle::Attestation> for Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) { async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_attestation(msg)) log_error!(self.handle_oracle_attestation(msg))
} }
@ -778,4 +833,4 @@ impl Message for CfdRollOverCompleted {
type Result = (); type Result = ();
} }
impl<O: 'static, M: 'static> xtra::Actor for Actor<O, M> {} impl<O: 'static, M: 'static, W: 'static> xtra::Actor for Actor<O, M, W> {}

57
daemon/src/wallet.rs

@ -13,9 +13,10 @@ use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::time::SystemTime; use std::time::SystemTime;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use xtra_productivity::xtra_productivity;
#[derive(Clone)] #[derive(Clone)]
pub struct Wallet { pub struct Actor {
wallet: Arc<Mutex<bdk::Wallet<ElectrumBlockchain, bdk::database::SqliteDatabase>>>, wallet: Arc<Mutex<bdk::Wallet<ElectrumBlockchain, bdk::database::SqliteDatabase>>>,
} }
@ -23,7 +24,7 @@ pub struct Wallet {
#[error("The transaction is already in the blockchain")] #[error("The transaction is already in the blockchain")]
pub struct TransactionAlreadyInBlockchain; pub struct TransactionAlreadyInBlockchain;
impl Wallet { impl Actor {
pub async fn new( pub async fn new(
electrum_rpc_url: &str, electrum_rpc_url: &str,
wallet_dir: &Path, wallet_dir: &Path,
@ -46,17 +47,11 @@ impl Wallet {
Ok(Self { wallet }) Ok(Self { wallet })
} }
pub async fn build_party_params(
&self,
amount: Amount,
identity_pk: PublicKey,
) -> Result<PartyParams> {
let wallet = self.wallet.lock().await;
wallet.build_party_params(amount, identity_pk)
} }
pub async fn sync(&self) -> Result<WalletInfo> { #[xtra_productivity]
impl Actor {
pub async fn handle_sync(&self, _msg: Sync) -> Result<WalletInfo> {
let wallet = self.wallet.lock().await; let wallet = self.wallet.lock().await;
wallet wallet
.sync(NoopProgress, None) .sync(NoopProgress, None)
@ -75,10 +70,8 @@ impl Wallet {
Ok(wallet_info) Ok(wallet_info)
} }
pub async fn sign( pub async fn handle_sign(&self, msg: Sign) -> Result<PartiallySignedTransaction> {
&self, let mut psbt = msg.psbt;
mut psbt: PartiallySignedTransaction,
) -> Result<PartiallySignedTransaction> {
let wallet = self.wallet.lock().await; let wallet = self.wallet.lock().await;
wallet wallet
@ -94,7 +87,22 @@ impl Wallet {
Ok(psbt) Ok(psbt)
} }
pub async fn try_broadcast_transaction(&self, tx: Transaction) -> Result<Txid> { pub async fn build_party_params(
&self,
BuildPartyParams {
amount,
identity_pk,
}: BuildPartyParams,
) -> Result<PartyParams> {
let wallet = self.wallet.lock().await;
wallet.build_party_params(amount, identity_pk)
}
pub async fn handle_try_broadcast_transaction(
&self,
msg: TryBroadcastTransaction,
) -> Result<Txid> {
let tx = msg.tx;
let wallet = self.wallet.lock().await; let wallet = self.wallet.lock().await;
let txid = tx.txid(); let txid = tx.txid();
@ -128,6 +136,23 @@ impl Wallet {
} }
} }
impl xtra::Actor for Actor {}
pub struct BuildPartyParams {
pub amount: Amount,
pub identity_pk: PublicKey,
}
pub struct Sync;
pub struct Sign {
pub psbt: PartiallySignedTransaction,
}
pub struct TryBroadcastTransaction {
pub tx: Transaction,
}
fn parse_rpc_protocol_error_code(error_value: &Value) -> Result<i64> { fn parse_rpc_protocol_error_code(error_value: &Value) -> Result<i64> {
let json = error_value let json = error_value
.as_str() .as_str()

11
daemon/src/wallet_sync.rs

@ -1,14 +1,19 @@
use crate::model::WalletInfo; use crate::model::WalletInfo;
use crate::wallet::Wallet; use crate::wallet;
use std::time::Duration; use std::time::Duration;
use tokio::sync::watch; use tokio::sync::watch;
use tokio::time::sleep; use tokio::time::sleep;
use xtra::Address;
pub async fn new(wallet: Wallet, sender: watch::Sender<WalletInfo>) { pub async fn new(wallet: Address<wallet::Actor>, sender: watch::Sender<WalletInfo>) {
loop { loop {
sleep(Duration::from_secs(10)).await; sleep(Duration::from_secs(10)).await;
let info = match wallet.sync().await { let info = match wallet
.send(wallet::Sync)
.await
.expect("Wallet actor to be available")
{
Ok(info) => info, Ok(info) => info,
Err(e) => { Err(e) => {
tracing::warn!("Failed to sync wallet: {:#}", e); tracing::warn!("Failed to sync wallet: {:#}", e);

273
daemon/tests/happy_path.rs

@ -0,0 +1,273 @@
use anyhow::Result;
use bdk::bitcoin::util::psbt::PartiallySignedTransaction;
use bdk::bitcoin::{ecdsa, Txid};
use cfd_protocol::secp256k1_zkp::{schnorrsig, Secp256k1};
use cfd_protocol::PartyParams;
use daemon::model::cfd::Order;
use daemon::model::{Usd, WalletInfo};
use daemon::{connection, db, logger, maker_cfd, maker_inc_connections, monitor, oracle, wallet};
use rand::thread_rng;
use rust_decimal_macros::dec;
use sqlx::SqlitePool;
use std::net::SocketAddr;
use std::str::FromStr;
use std::task::Poll;
use std::time::SystemTime;
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter;
use xtra::spawn::TokioGlobalSpawnExt;
use xtra::Actor;
use xtra_productivity::xtra_productivity;
#[tokio::test]
async fn taker_receives_order_from_maker_on_publication() {
let (mut maker, mut taker) = start_both().await;
assert!(is_next_none(&mut taker.order_feed).await);
let (published, received) = tokio::join!(
maker.publish_order(new_dummy_order()),
next_some(&mut taker.order_feed)
);
// TODO: Add assertion function so we can assert on the other order values
assert_eq!(published.id, received.id);
}
fn new_dummy_order() -> maker_cfd::NewOrder {
maker_cfd::NewOrder {
price: Usd::new(dec!(50_000)),
min_quantity: Usd::new(dec!(10)),
max_quantity: Usd::new(dec!(100)),
}
}
/// Returns the value if the next Option received on the stream is Some
///
/// Panics if None is received on the stream.
async fn next_some<T>(rx: &mut watch::Receiver<Option<T>>) -> T
where
T: Clone,
{
if let Some(value) = next(rx).await {
value
} else {
panic!("Received None when Some was expected")
}
}
/// Returns true if the next Option received on the stream is None
///
/// Returns false if Some is received.
async fn is_next_none<T>(rx: &mut watch::Receiver<Option<T>>) -> bool
where
T: Clone,
{
next(rx).await.is_none()
}
/// Returns watch channel value upon change
async fn next<T>(rx: &mut watch::Receiver<T>) -> T
where
T: Clone,
{
rx.changed().await.unwrap();
rx.borrow().clone()
}
fn init_tracing() {
logger::init(LevelFilter::DEBUG, false).unwrap();
tracing::info!("Running version: {}", env!("VERGEN_GIT_SEMVER_LIGHTWEIGHT"));
}
/// Test Stub simulating the Oracle actor
struct Oracle;
impl xtra::Actor for Oracle {}
#[xtra_productivity(message_impl = false)]
impl Oracle {
async fn handle_fetch_announcement(&mut self, _msg: oracle::FetchAnnouncement) {}
async fn handle_get_announcement(
&mut self,
_msg: oracle::GetAnnouncement,
) -> Option<oracle::Announcement> {
todo!("stub this if needed")
}
async fn handle(&mut self, _msg: oracle::MonitorAttestation) {
todo!("stub this if needed")
}
async fn handle(&mut self, _msg: oracle::Sync) {}
}
/// Test Stub simulating the Monitor actor
struct Monitor;
impl xtra::Actor for Monitor {}
#[xtra_productivity(message_impl = false)]
impl Monitor {
async fn handle(&mut self, _msg: monitor::Sync) {}
async fn handle(&mut self, _msg: monitor::StartMonitoring) {
todo!("stub this if needed")
}
async fn handle(&mut self, _msg: monitor::CollaborativeSettlement) {
todo!("stub this if needed")
}
async fn handle(&mut self, _msg: oracle::Attestation) {
todo!("stub this if needed")
}
}
/// Test Stub simulating the Wallet actor
struct Wallet;
impl xtra::Actor for Wallet {}
#[xtra_productivity(message_impl = false)]
impl Wallet {
async fn handle(&mut self, _msg: wallet::BuildPartyParams) -> Result<PartyParams> {
todo!("stub this if needed")
}
async fn handle(&mut self, _msg: wallet::Sync) -> Result<WalletInfo> {
let s = Secp256k1::new();
let public_key = ecdsa::PublicKey::new(s.generate_keypair(&mut thread_rng()).1);
let address = bdk::bitcoin::Address::p2pkh(&public_key, bdk::bitcoin::Network::Testnet);
Ok(WalletInfo {
balance: bdk::bitcoin::Amount::ONE_BTC,
address,
last_updated_at: SystemTime::now(),
})
}
async fn handle(&mut self, _msg: wallet::Sign) -> Result<PartiallySignedTransaction> {
todo!("stub this if needed")
}
async fn handle(&mut self, _msg: wallet::TryBroadcastTransaction) -> Result<Txid> {
todo!("stub this if needed")
}
}
/// Maker Test Setup
struct Maker {
cfd_actor_addr:
xtra::Address<maker_cfd::Actor<Oracle, Monitor, maker_inc_connections::Actor, Wallet>>,
order_feed_receiver: watch::Receiver<Option<Order>>,
#[allow(dead_code)] // we need to keep the xtra::Address for refcounting
inc_conn_addr: xtra::Address<maker_inc_connections::Actor>,
address: SocketAddr,
}
impl Maker {
async fn start(oracle_pk: schnorrsig::PublicKey) -> Self {
let db = in_memory_db().await;
let wallet_addr = Wallet {}.create(None).spawn_global();
let term = time::Duration::hours(24);
let maker = daemon::MakerActorSystem::new(
db,
wallet_addr,
oracle_pk,
|_, _| Oracle,
|_, _| async { Ok(Monitor) },
|channel0, channel1| maker_inc_connections::Actor::new(channel0, channel1),
term,
)
.await
.unwrap();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let address = listener.local_addr().unwrap();
let listener_stream = futures::stream::poll_fn(move |ctx| {
let message = match futures::ready!(listener.poll_accept(ctx)) {
Ok((stream, address)) => {
dbg!("new connection");
maker_inc_connections::ListenerMessage::NewConnection { stream, address }
}
Err(e) => maker_inc_connections::ListenerMessage::Error { source: e },
};
Poll::Ready(Some(message))
});
tokio::spawn(maker.inc_conn_addr.clone().attach_stream(listener_stream));
Self {
cfd_actor_addr: maker.cfd_actor_addr,
order_feed_receiver: maker.order_feed_receiver,
inc_conn_addr: maker.inc_conn_addr,
address,
}
}
async fn publish_order(&mut self, new_order_params: maker_cfd::NewOrder) -> Order {
self.cfd_actor_addr.send(new_order_params).await.unwrap();
let next_order = self.order_feed_receiver.borrow().clone().unwrap();
next_order
}
}
/// Taker Test Setup
struct Taker {
order_feed: watch::Receiver<Option<Order>>,
}
impl Taker {
async fn start(oracle_pk: schnorrsig::PublicKey, maker_address: SocketAddr) -> Self {
let connection::Actor {
send_to_maker,
read_from_maker,
} = connection::Actor::new(maker_address).await;
let db = in_memory_db().await;
let wallet_addr = Wallet {}.create(None).spawn_global();
let taker = daemon::TakerActorSystem::new(
db,
wallet_addr,
oracle_pk,
send_to_maker,
read_from_maker,
|_, _| Oracle,
|_, _| async { Ok(Monitor) },
)
.await
.unwrap();
Self {
order_feed: taker.order_feed_receiver,
}
}
}
async fn start_both() -> (Maker, Taker) {
init_tracing();
let oracle_pk: schnorrsig::PublicKey = schnorrsig::PublicKey::from_str(
"ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7",
)
.unwrap();
let maker = Maker::start(oracle_pk).await;
let taker = Taker::start(oracle_pk, maker.address).await;
(maker, taker)
}
async fn in_memory_db() -> SqlitePool {
// Note: Every :memory: database is distinct from every other. So, opening two database
// connections each with the filename ":memory:" will create two independent in-memory
// databases. see: https://www.sqlite.org/inmemorydb.html
let pool = SqlitePool::connect(":memory:").await.unwrap();
db::run_migrations(&pool).await.unwrap();
pool
}

28
xtra_productivity/src/lib.rs

@ -1,10 +1,26 @@
use proc_macro::TokenStream; use proc_macro::TokenStream;
use quote::quote; use quote::quote;
use syn::{FnArg, GenericParam, ImplItem, ItemImpl, ReturnType}; use syn::{FnArg, GenericParam, ImplItem, ItemImpl, MetaNameValue, ReturnType};
#[proc_macro_attribute] #[proc_macro_attribute]
pub fn xtra_productivity(_attribute: TokenStream, item: TokenStream) -> TokenStream { pub fn xtra_productivity(attribute: TokenStream, item: TokenStream) -> TokenStream {
let block = syn::parse::<ItemImpl>(item).unwrap(); let block = syn::parse::<ItemImpl>(item).unwrap();
let want_message_impl = if attribute.is_empty() {
true
} else {
let attribute = syn::parse::<MetaNameValue>(attribute).unwrap();
if !attribute.path.is_ident("message_impl") {
panic!(
"Unexpected attribute {:?}",
attribute.path.get_ident().unwrap()
)
}
matches!(
attribute.lit,
syn::Lit::Bool(syn::LitBool { value: true, .. })
)
};
let actor = block.self_ty; let actor = block.self_ty;
@ -60,10 +76,18 @@ pub fn xtra_productivity(_attribute: TokenStream, item: TokenStream) -> TokenStr
ReturnType::Type(_, ref t) => quote! { #t } ReturnType::Type(_, ref t) => quote! { #t }
}; };
let message_impl = if want_message_impl {
quote! { quote! {
impl xtra::Message for #message_type { impl xtra::Message for #message_type {
type Result = #result_type; type Result = #result_type;
} }
}
} else {
quote! {}
};
quote! {
#message_impl
#[async_trait::async_trait] #[async_trait::async_trait]
impl<#generic_params> xtra::Handler<#message_type> for #actor impl<#generic_params> xtra::Handler<#message_type> for #actor

20
xtra_productivity/tests/pass/can_handle_message.rs

@ -30,6 +30,21 @@ impl DummyActor {
fn is_i32(_: i32) {} fn is_i32(_: i32) {}
struct DummyMessageWithoutMessageImpl;
#[xtra_productivity(message_impl = false)]
impl DummyActor {
pub fn handle_dummy_message_without_message_impl(
&mut self,
_message: DummyMessageWithoutMessageImpl,
) {
}
}
impl xtra::Message for DummyMessageWithoutMessageImpl {
type Result = ();
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
// Create dummy actor // Create dummy actor
@ -39,4 +54,9 @@ async fn main() {
let i32 = dummy_actor.send(DummyMessage).await.unwrap(); let i32 = dummy_actor.send(DummyMessage).await.unwrap();
is_i32(i32); is_i32(i32);
dummy_actor.send(DummyMessageWithContext).await.unwrap(); dummy_actor.send(DummyMessageWithContext).await.unwrap();
dummy_actor
.send(DummyMessageWithoutMessageImpl)
.await
.unwrap();
} }

Loading…
Cancel
Save