Browse Source

Convert Wallet to actor

Include a generic Wallet actor constructor in the actor systems and allow
passing in a generic Wallet actor implementing xtra::Handlers into the cfd
actors.

Rename 'Maker' and 'Taker' to 'MakerActorSystem' and 'TakerActorSystem' for
readability.

Co-authored-by: Mariusz Klochowicz <mariusz@klochowicz.com>
contact-taker-before-changing-cfd-state
Daniel Karzel 3 years ago
committed by Mariusz Klochowicz
parent
commit
40f5b81e4d
No known key found for this signature in database GPG Key ID: 470C865699C8D4D
  1. 58
      daemon/src/cfd_actors.rs
  2. 2
      daemon/src/connection.rs
  3. 33
      daemon/src/housekeeping.rs
  4. 41
      daemon/src/lib.rs
  5. 20
      daemon/src/maker.rs
  6. 153
      daemon/src/maker_cfd.rs
  7. 25
      daemon/src/setup_contract.rs
  8. 19
      daemon/src/taker.rs
  9. 171
      daemon/src/taker_cfd.rs
  10. 57
      daemon/src/wallet.rs
  11. 11
      daemon/src/wallet_sync.rs

58
daemon/src/cfd_actors.rs

@ -1,7 +1,6 @@
use crate::db::load_cfd_by_order_id;
use crate::model::cfd::{Attestation, Cfd, CfdState, CfdStateChangeEvent, OrderId};
use crate::wallet::Wallet;
use crate::{db, monitor, oracle, try_continue};
use crate::{db, monitor, oracle, try_continue, wallet};
use anyhow::{bail, Context, Result};
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
@ -34,15 +33,21 @@ pub async fn append_cfd_state(
Ok(())
}
pub async fn try_cet_publication(
pub async fn try_cet_publication<W>(
cfd: &mut Cfd,
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
match cfd.cet()? {
Ok(cet) => {
let txid = wallet.try_broadcast_transaction(cet).await?;
let txid = wallet
.send(wallet::TryBroadcastTransaction { tx: cet })
.await?
.context("Failed to send transaction")?;
tracing::info!("CET published with txid {}", txid);
if cfd.handle(CfdStateChangeEvent::CetSent)?.is_none() {
@ -60,12 +65,15 @@ pub async fn try_cet_publication(
Ok(())
}
pub async fn handle_monitoring_event(
pub async fn handle_monitoring_event<W>(
event: monitor::Event,
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
let order_id = event.order_id();
let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?;
@ -82,24 +90,37 @@ pub async fn handle_monitoring_event(
try_cet_publication(&mut cfd, conn, wallet, update_sender).await?;
} else if let CfdState::MustRefund { .. } = cfd.state {
let signed_refund_tx = cfd.refund_tx()?;
let txid = wallet.try_broadcast_transaction(signed_refund_tx).await?;
let txid = wallet
.send(wallet::TryBroadcastTransaction {
tx: signed_refund_tx,
})
.await?
.context("Failed to publish CET")?;
tracing::info!("Refund transaction published on chain: {}", txid);
}
Ok(())
}
pub async fn handle_commit(
pub async fn handle_commit<W>(
order_id: OrderId,
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
let mut cfd = db::load_cfd_by_order_id(order_id, conn).await?;
let signed_commit_tx = cfd.commit_tx()?;
let txid = wallet.try_broadcast_transaction(signed_commit_tx).await?;
let txid = wallet
.send(wallet::TryBroadcastTransaction {
tx: signed_commit_tx,
})
.await?
.context("Failed to publish commit tx")?;
if cfd.handle(CfdStateChangeEvent::CommitTxSent)?.is_none() {
bail!("If we can get the commit tx we should be able to transition")
@ -111,12 +132,15 @@ pub async fn handle_commit(
Ok(())
}
pub async fn handle_oracle_attestation(
pub async fn handle_oracle_attestation<W>(
attestation: oracle::Attestation,
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
wallet: &xtra::Address<W>,
update_sender: &watch::Sender<Vec<Cfd>>,
) -> Result<()> {
) -> Result<()>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
tracing::debug!(
"Learnt latest oracle attestation for event: {}",
attestation.id

2
daemon/src/connection.rs

@ -1,4 +1,4 @@
use daemon::{send_to_socket, taker_cfd, wire};
use crate::{send_to_socket, taker_cfd, wire};
use futures::{Stream, StreamExt};
use std::net::SocketAddr;
use std::time::Duration;

33
daemon/src/housekeeping.rs

@ -1,10 +1,11 @@
use crate::db::{append_cfd_state, load_all_cfds};
use crate::model::cfd::{Cfd, CfdState};
use crate::try_continue;
use crate::wallet::Wallet;
use crate::{try_continue, wallet};
use anyhow::Result;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
use xtra::Address;
pub async fn transition_non_continue_cfds_to_setup_failed(
conn: &mut PoolConnection<Sqlite>,
) -> Result<()> {
@ -24,25 +25,40 @@ pub async fn transition_non_continue_cfds_to_setup_failed(
pub async fn rebroadcast_transactions(
conn: &mut PoolConnection<Sqlite>,
wallet: &Wallet,
wallet: &Address<wallet::Actor>,
) -> Result<()> {
let cfds = load_all_cfds(conn).await?;
for dlc in cfds.iter().filter_map(|cfd| Cfd::pending_open_dlc(cfd)) {
let txid = try_continue!(wallet.try_broadcast_transaction(dlc.lock.0.clone()).await);
let txid = try_continue!(wallet
.send(wallet::TryBroadcastTransaction {
tx: dlc.lock.0.clone()
})
.await
.expect("if sending to actor fails here we are screwed anyway"));
tracing::info!("Lock transaction published with txid {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_must_refund(cfd)) {
let signed_refund_tx = cfd.refund_tx()?;
let txid = try_continue!(wallet.try_broadcast_transaction(signed_refund_tx).await);
let txid = try_continue!(wallet
.send(wallet::TryBroadcastTransaction {
tx: signed_refund_tx
})
.await
.expect("if sending to actor fails here we are screwed anyway"));
tracing::info!("Refund transaction published on chain: {}", txid);
}
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_commit(cfd)) {
let signed_commit_tx = cfd.commit_tx()?;
let txid = try_continue!(wallet.try_broadcast_transaction(signed_commit_tx).await);
let txid = try_continue!(wallet
.send(wallet::TryBroadcastTransaction {
tx: signed_commit_tx
})
.await
.expect("if sending to actor fails here we are screwed anyway"));
tracing::info!("Commit transaction published on chain: {}", txid);
}
@ -50,7 +66,10 @@ pub async fn rebroadcast_transactions(
for cfd in cfds.iter().filter(|cfd| Cfd::is_pending_cet(cfd)) {
// Double question mark OK because if we are in PendingCet we must have been Ready before
let signed_cet = cfd.cet()??;
let txid = try_continue!(wallet.try_broadcast_transaction(signed_cet).await);
let txid = try_continue!(wallet
.send(wallet::TryBroadcastTransaction { tx: signed_cet })
.await
.expect("if sending to actor fails here we are screwed anyway"));
tracing::info!("CET published on chain: {}", txid);
}

41
daemon/src/lib.rs

@ -3,7 +3,6 @@ use crate::db::load_all_cfds;
use crate::maker_cfd::{FromTaker, NewTakerOnline};
use crate::model::cfd::{Cfd, Order, UpdateCfdProposals};
use crate::oracle::Attestation;
use crate::wallet::Wallet;
use anyhow::Result;
use cfd_protocol::secp256k1_zkp::schnorrsig;
use futures::Stream;
@ -20,6 +19,7 @@ pub mod actors;
pub mod auth;
pub mod bitmex_price_feed;
pub mod cfd_actors;
pub mod connection;
pub mod db;
pub mod fan_out;
pub mod forward_only_ok;
@ -45,15 +45,15 @@ pub mod wallet;
pub mod wallet_sync;
pub mod wire;
pub struct Maker<O, M, T> {
pub cfd_actor_addr: Address<maker_cfd::Actor<O, M, T>>,
pub struct MakerActorSystem<O, M, T, W> {
pub cfd_actor_addr: Address<maker_cfd::Actor<O, M, T, W>>,
pub cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
pub order_feed_receiver: watch::Receiver<Option<Order>>,
pub update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
pub inc_conn_addr: Address<T>,
}
impl<O, M, T> Maker<O, M, T>
impl<O, M, T, W> MakerActorSystem<O, M, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
@ -65,14 +65,18 @@ where
+ xtra::Handler<oracle::Attestation>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>,
W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sync>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{
pub async fn new<F>(
db: SqlitePool,
wallet: Wallet,
wallet_addr: Address<W>,
oracle_pk: schnorrsig::PublicKey,
oracle_constructor: impl Fn(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl Fn(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
inc_conn_constructor: impl Fn(
oracle_constructor: impl FnOnce(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
inc_conn_constructor: impl FnOnce(
Box<dyn MessageChannel<NewTakerOnline>>,
Box<dyn MessageChannel<FromTaker>>,
) -> T,
@ -96,7 +100,7 @@ where
let cfd_actor_addr = maker_cfd::Actor::new(
db,
wallet,
wallet_addr,
term,
oracle_pk,
cfd_feed_sender,
@ -147,14 +151,14 @@ where
}
}
pub struct Taker<O, M> {
pub cfd_actor_addr: Address<taker_cfd::Actor<O, M>>,
pub struct TakerActorSystem<O, M, W> {
pub cfd_actor_addr: Address<taker_cfd::Actor<O, M, W>>,
pub cfd_feed_receiver: watch::Receiver<Vec<Cfd>>,
pub order_feed_receiver: watch::Receiver<Option<Order>>,
pub update_cfd_feed_receiver: watch::Receiver<UpdateCfdProposals>,
}
impl<O, M> Taker<O, M>
impl<O, M, W> TakerActorSystem<O, M, W>
where
O: xtra::Handler<oracle::MonitorAttestation>
+ xtra::Handler<oracle::GetAnnouncement>
@ -164,15 +168,19 @@ where
+ xtra::Handler<monitor::Sync>
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sync>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{
pub async fn new<F>(
db: SqlitePool,
wallet: Wallet,
wallet_addr: Address<W>,
oracle_pk: schnorrsig::PublicKey,
send_to_maker: Box<dyn MessageChannel<wire::TakerToMaker>>,
read_from_maker: Box<dyn Stream<Item = taker_cfd::MakerStreamMessage> + Unpin + Send>,
oracle_constructor: impl Fn(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl Fn(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
oracle_constructor: impl FnOnce(Vec<Cfd>, Box<dyn StrongMessageChannel<Attestation>>) -> O,
monitor_constructor: impl FnOnce(Box<dyn StrongMessageChannel<monitor::Event>>, Vec<Cfd>) -> F,
) -> Result<Self>
where
F: Future<Output = Result<M>>,
@ -191,7 +199,7 @@ where
let cfd_actor_addr = taker_cfd::Actor::new(
db,
wallet,
wallet_addr,
oracle_pk,
cfd_feed_sender,
order_feed_sender,
@ -220,6 +228,7 @@ where
.notify_interval(Duration::from_secs(5), || oracle::Sync)
.unwrap(),
);
let fan_out_actor = fan_out::Actor::new(&[&cfd_actor_addr, &monitor_addr])
.create(None)
.spawn_global();

20
daemon/src/maker.rs

@ -8,15 +8,13 @@ use daemon::db::{self};
use daemon::model::WalletInfo;
use daemon::seed::Seed;
use daemon::wallet::Wallet;
use daemon::{
bitmex_price_feed, housekeeping, logger, maker_cfd, maker_inc_connections, monitor, oracle,
wallet_sync, Maker,
wallet, wallet_sync, MakerActorSystem,
};
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use xtra::prelude::MessageChannel;
use std::net::SocketAddr;
use std::path::PathBuf;
@ -25,6 +23,9 @@ use std::str::FromStr;
use std::task::Poll;
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::*;
use xtra::spawn::TokioGlobalSpawnExt;
use xtra::Actor;
mod routes_maker;
@ -129,13 +130,16 @@ async fn main() -> Result<()> {
let bitcoin_network = opts.network.bitcoin_network();
let ext_priv_key = seed.derive_extended_priv_key(bitcoin_network)?;
let wallet = Wallet::new(
let wallet = wallet::Actor::new(
opts.network.electrum(),
&data_dir.join("maker_wallet.sqlite"),
ext_priv_key,
)
.await?;
let wallet_info = wallet.sync().await?;
.await?
.create(None)
.spawn_global();
let wallet_info = wallet.send(wallet::Sync).await??;
let auth_password = seed.derive_auth_password::<auth::Password>();
@ -186,13 +190,13 @@ async fn main() -> Result<()> {
housekeeping::transition_non_continue_cfds_to_setup_failed(&mut conn).await?;
housekeeping::rebroadcast_transactions(&mut conn, &wallet).await?;
let Maker {
let MakerActorSystem {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
inc_conn_addr: incoming_connection_addr,
} = Maker::new(
} = MakerActorSystem::new(
db.clone(),
wallet.clone(),
oracle,

153
daemon/src/maker_cfd.rs

@ -8,8 +8,7 @@ use crate::model::cfd::{
};
use crate::model::{TakerId, Usd};
use crate::monitor::MonitorParams;
use crate::wallet::Wallet;
use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wire};
use crate::{log_error, maker_inc_connections, monitor, oracle, setup_contract, wallet, wire};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
@ -59,9 +58,9 @@ pub struct FromTaker {
pub msg: wire::TakerToMaker,
}
pub struct Actor<O, M, T> {
pub struct Actor<O, M, T, W> {
db: sqlx::SqlitePool,
wallet: Wallet,
wallet: Address<W>,
term: Duration,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
@ -94,11 +93,11 @@ enum RollOverState {
None,
}
impl<O, M, T> Actor<O, M, T> {
impl<O, M, T, W> Actor<O, M, T, W> {
#[allow(clippy::too_many_arguments)]
pub fn new(
db: sqlx::SqlitePool,
wallet: Wallet,
wallet: Address<W>,
term: Duration,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
@ -127,19 +126,6 @@ impl<O, M, T> Actor<O, M, T> {
}
}
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_propose_roll_over(
&mut self,
proposal: RollOverProposal,
@ -204,30 +190,6 @@ impl<O, M, T> Actor<O, M, T> {
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_inc_protocol_msg(
&mut self,
taker_id: TakerId,
@ -311,7 +273,49 @@ impl<O, M, T> Actor<O, M, T> {
}
}
impl<O, M, T> Actor<O, M, T>
impl<O, M, T, W> Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
}
impl<O, M, T, W> Actor<O, M, T, W>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
@ -401,7 +405,7 @@ where
}
}
impl<O, M, T> Actor<O, M, T>
impl<O, M, T, W> Actor<O, M, T, W>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>,
@ -530,11 +534,12 @@ where
}
}
impl<O, M, T> Actor<O, M, T>
impl<O, M, T, W> Actor<O, M, T, W>
where
Self: xtra::Handler<CfdSetupCompleted>,
O: xtra::Handler<oracle::GetAnnouncement>,
T: xtra::Handler<maker_inc_connections::TakerMessage>,
W: xtra::Handler<wallet::Sign> + xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle_accept_order(
&mut self,
@ -622,7 +627,7 @@ where
}
}
impl<O, M, T> Actor<O, M, T>
impl<O, M, T, W> Actor<O, M, T, W>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
T: xtra::Handler<maker_inc_connections::BroadcastOrder>,
@ -667,10 +672,11 @@ where
}
}
impl<O, M, T> Actor<O, M, T>
impl<O, M, T, W> Actor<O, M, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_cfd_setup_completed(
&mut self,
@ -694,8 +700,10 @@ where
let txid = self
.wallet
.try_broadcast_transaction(dlc.lock.0.clone())
.await?;
.send(wallet::TryBroadcastTransaction {
tx: dlc.lock.0.clone(),
})
.await??;
tracing::info!("Lock transaction published with txid {}", txid);
@ -716,7 +724,7 @@ where
}
}
impl<O, M, T> Actor<O, M, T>
impl<O, M, T, W> Actor<O, M, T, W>
where
Self: xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>,
@ -809,7 +817,7 @@ where
}
}
impl<O, M, T> Actor<O, M, T>
impl<O, M, T, W> Actor<O, M, T, W>
where
M: xtra::Handler<monitor::StartMonitoring>,
{
@ -842,9 +850,10 @@ where
}
}
impl<O, M, T> Actor<O, M, T>
impl<O, M, T, W> Actor<O, M, T, W>
where
M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_initiate_settlement(
&mut self,
@ -888,8 +897,10 @@ where
let txid = self
.wallet
.try_broadcast_transaction(spend_tx.clone())
.await
.send(wallet::TryBroadcastTransaction {
tx: spend_tx.clone(),
})
.await?
.context("Broadcasting spend transaction")?;
tracing::info!("Close transaction published with txid {}", txid);
@ -909,12 +920,15 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<CfdAction> for Actor<O, M, T>
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<CfdAction> for Actor<O, M, T, W>
where
Self: xtra::Handler<CfdSetupCompleted> + xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::MonitorAttestation> + xtra::Handler<oracle::GetAnnouncement>,
T: xtra::Handler<maker_inc_connections::TakerMessage>
+ xtra::Handler<maker_inc_connections::BroadcastOrder>,
W: xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: CfdAction, ctx: &mut Context<Self>) {
use CfdAction::*;
@ -933,7 +947,7 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<NewOrder> for Actor<O, M, T>
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<NewOrder> for Actor<O, M, T, W>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
T: xtra::Handler<maker_inc_connections::BroadcastOrder>,
@ -944,7 +958,7 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<NewTakerOnline> for Actor<O, M, T>
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<NewTakerOnline> for Actor<O, M, T, W>
where
T: xtra::Handler<maker_inc_connections::TakerMessage>,
{
@ -954,10 +968,12 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<CfdSetupCompleted> for Actor<O, M, T>
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<CfdSetupCompleted>
for Actor<O, M, T, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc));
@ -965,7 +981,8 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<CfdRollOverCompleted> for Actor<O, M, T>
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<CfdRollOverCompleted>
for Actor<O, M, T, W>
where
M: xtra::Handler<monitor::StartMonitoring>,
{
@ -975,18 +992,22 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<monitor::Event> for Actor<O, M, T> {
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<monitor::Event> for Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) {
log_error!(self.handle_monitoring_event(msg))
}
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<FromTaker> for Actor<O, M, T>
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<FromTaker> for Actor<O, M, T, W>
where
T: xtra::Handler<maker_inc_connections::BroadcastOrder>
+ xtra::Handler<maker_inc_connections::TakerMessage>,
M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, FromTaker { taker_id, msg }: FromTaker, _ctx: &mut Context<Self>) {
match msg {
@ -1041,7 +1062,11 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static, T: 'static> Handler<oracle::Attestation> for Actor<O, M, T> {
impl<O: 'static, M: 'static, T: 'static, W: 'static> Handler<oracle::Attestation>
for Actor<O, M, T, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_attestation(msg))
}
@ -1071,4 +1096,4 @@ impl Message for FromTaker {
type Result = ();
}
impl<O: 'static, M: 'static, T: 'static> xtra::Actor for Actor<O, M, T> {}
impl<O: 'static, M: 'static, T: 'static, W: 'static> xtra::Actor for Actor<O, M, T, W> {}

25
daemon/src/setup_contract.rs

@ -1,10 +1,9 @@
use crate::model::cfd::{Cet, Cfd, Dlc, RevokedCommit, Role};
use crate::tokio_ext::FutureExt;
use crate::wallet::Wallet;
use crate::wire::{
Msg0, Msg1, Msg2, RollOverMsg, RollOverMsg0, RollOverMsg1, RollOverMsg2, SetupMsg,
};
use crate::{model, oracle, payout_curve};
use crate::{model, oracle, payout_curve, wallet};
use anyhow::{Context, Result};
use bdk::bitcoin::secp256k1::{schnorrsig, Signature, SECP256K1};
use bdk::bitcoin::util::psbt::PartiallySignedTransaction;
@ -23,25 +22,33 @@ use std::collections::HashMap;
use std::iter::FromIterator;
use std::ops::RangeInclusive;
use std::time::Duration;
use xtra::Address;
/// Given an initial set of parameters, sets up the CFD contract with
/// the other party.
pub async fn new(
pub async fn new<W>(
mut sink: impl Sink<SetupMsg, Error = anyhow::Error> + Unpin,
mut stream: impl FusedStream<Item = SetupMsg> + Unpin,
(oracle_pk, announcement): (schnorrsig::PublicKey, oracle::Announcement),
cfd: Cfd,
wallet: Wallet,
wallet: Address<W>,
role: Role,
) -> Result<Dlc> {
) -> Result<Dlc>
where
W: xtra::Handler<wallet::Sign> + xtra::Handler<wallet::BuildPartyParams>,
{
let (sk, pk) = crate::keypair::new(&mut rand::thread_rng());
let (rev_sk, rev_pk) = crate::keypair::new(&mut rand::thread_rng());
let (publish_sk, publish_pk) = crate::keypair::new(&mut rand::thread_rng());
let margin = cfd.margin().context("Failed to calculate margin")?;
let own_params = wallet
.build_party_params(margin, pk)
.send(wallet::BuildPartyParams {
amount: margin,
identity_pk: pk,
})
.await
.context("Failed to send message to wallet actor")?
.context("Failed to build party params")?;
let own_punish = PunishParams {
@ -171,7 +178,11 @@ pub async fn new(
tracing::info!("Verified all signatures");
let mut signed_lock_tx = wallet.sign(lock_tx).await?;
let mut signed_lock_tx = wallet
.send(wallet::Sign { psbt: lock_tx })
.await
.context("Failed to send message to wallet actor")?
.context("Failed to sign transaction")?;
sink.send(SetupMsg::Msg2(Msg2 {
signed_lock: signed_lock_tx.clone(),
}))

19
daemon/src/taker.rs

@ -7,9 +7,9 @@ use daemon::db::{self};
use daemon::model::WalletInfo;
use daemon::seed::Seed;
use daemon::wallet::Wallet;
use daemon::{
bitmex_price_feed, housekeeping, logger, monitor, oracle, taker_cfd, wallet_sync, Taker,
bitmex_price_feed, connection, housekeeping, logger, monitor, oracle, taker_cfd, wallet,
wallet_sync, TakerActorSystem,
};
use sqlx::sqlite::SqliteConnectOptions;
@ -22,8 +22,9 @@ use std::str::FromStr;
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter;
use xtra::prelude::MessageChannel;
use xtra::spawn::TokioGlobalSpawnExt;
use xtra::Actor;
mod connection;
mod routes_taker;
#[derive(Clap)]
@ -121,13 +122,15 @@ async fn main() -> Result<()> {
let bitcoin_network = opts.network.bitcoin_network();
let ext_priv_key = seed.derive_extended_priv_key(bitcoin_network)?;
let wallet = Wallet::new(
let wallet = wallet::Actor::new(
opts.network.electrum(),
&data_dir.join("taker_wallet.sqlite"),
ext_priv_key,
)
.await?;
let wallet_info = wallet.sync().await.unwrap();
.await?
.create(None)
.spawn_global();
let wallet_info = wallet.send(wallet::Sync).await??;
// TODO: Actually fetch it from Olivia
let oracle = schnorrsig::PublicKey::from_str(
@ -165,12 +168,12 @@ async fn main() -> Result<()> {
read_from_maker,
} = connection::Actor::new(opts.maker).await;
let Taker {
let TakerActorSystem {
cfd_actor_addr,
cfd_feed_receiver,
order_feed_receiver,
update_cfd_feed_receiver,
} = Taker::new(
} = TakerActorSystem::new(
db.clone(),
wallet.clone(),
oracle,

171
daemon/src/taker_cfd.rs

@ -7,9 +7,8 @@ use crate::model::cfd::{
};
use crate::model::{BitMexPriceEventId, Usd};
use crate::monitor::{self, MonitorParams};
use crate::wallet::Wallet;
use crate::wire::{MakerToTaker, RollOverMsg, SetupMsg};
use crate::{log_error, oracle, setup_contract, wire};
use crate::{log_error, oracle, setup_contract, wallet, wire};
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use bdk::bitcoin::secp256k1::schnorrsig;
@ -68,9 +67,9 @@ enum RollOverState {
None,
}
pub struct Actor<O, M> {
pub struct Actor<O, M, W> {
db: sqlx::SqlitePool,
wallet: Wallet,
wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>,
@ -83,11 +82,16 @@ pub struct Actor<O, M> {
current_pending_proposals: UpdateCfdProposals,
}
impl<O, M> Actor<O, M> {
impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
db: sqlx::SqlitePool,
wallet: Wallet,
wallet: Address<W>,
oracle_pk: schnorrsig::PublicKey,
cfd_feed_actor_inbox: watch::Sender<Vec<Cfd>>,
order_feed_actor_inbox: watch::Sender<Option<Order>>,
@ -111,7 +115,9 @@ impl<O, M> Actor<O, M> {
current_pending_proposals: HashMap::new(),
}
}
}
impl<O, M, W> Actor<O, M, W> {
fn send_pending_update_proposals(&self) -> Result<()> {
Ok(self
.update_cfd_feed_sender
@ -165,6 +171,25 @@ impl<O, M> Actor<O, M> {
Ok(())
}
}
impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_propose_settlement(
&mut self,
@ -255,42 +280,6 @@ impl<O, M> Actor<O, M> {
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_commit(&mut self, order_id: OrderId) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_commit(
order_id,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_invalid_order_id(&mut self, order_id: OrderId) -> Result<()> {
tracing::debug!(%order_id, "Invalid order ID");
@ -311,7 +300,7 @@ impl<O, M> Actor<O, M> {
}
}
impl<O, M> Actor<O, M>
impl<O, M, W> Actor<O, M, W>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
{
@ -334,7 +323,42 @@ where
}
Ok(())
}
}
impl<O, M, W> Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_oracle_attestation(&mut self, attestation: oracle::Attestation) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_oracle_attestation(
attestation,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
async fn handle_monitoring_event(&mut self, event: monitor::Event) -> Result<()> {
let mut conn = self.db.acquire().await?;
cfd_actors::handle_monitoring_event(
event,
&mut conn,
&self.wallet,
&self.cfd_feed_actor_inbox,
)
.await?;
Ok(())
}
}
impl<O, M, W> Actor<O, M, W>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_propose_roll_over(&mut self, order_id: OrderId) -> Result<()> {
if self.current_pending_proposals.contains_key(&order_id) {
anyhow::bail!("An update for order id {} is already in progress", order_id)
@ -371,11 +395,20 @@ where
Ok(())
}
}
impl<O, M, W> Actor<O, M, W>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{
}
impl<O, M> Actor<O, M>
impl<O, M, W> Actor<O, M, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle_cfd_setup_completed(
&mut self,
@ -399,8 +432,10 @@ where
let txid = self
.wallet
.try_broadcast_transaction(dlc.lock.0.clone())
.await?;
.send(wallet::TryBroadcastTransaction {
tx: dlc.lock.0.clone(),
})
.await??;
tracing::info!("Lock transaction published with txid {}", txid);
@ -421,10 +456,11 @@ where
}
}
impl<O: 'static, M: 'static> Actor<O, M>
impl<O: 'static, M: 'static, W: 'static> Actor<O, M, W>
where
Self: xtra::Handler<CfdSetupCompleted>,
O: xtra::Handler<oracle::GetAnnouncement> + xtra::Handler<oracle::MonitorAttestation>,
W: xtra::Handler<wallet::Sign> + xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle_order_accepted(
&mut self,
@ -486,10 +522,13 @@ where
}
}
impl<O: 'static, M: 'static> Actor<O, M>
impl<O: 'static, M: 'static, W: 'static> Actor<O, M, W>
where
Self: xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::GetAnnouncement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle_roll_over_accepted(
&mut self,
@ -546,7 +585,7 @@ where
}
}
impl<O: 'static, M: 'static> Actor<O, M>
impl<O: 'static, M: 'static, W: 'static> Actor<O, M, W>
where
M: xtra::Handler<monitor::StartMonitoring>,
{
@ -580,9 +619,12 @@ where
}
}
impl<O: 'static, M: 'static> Actor<O, M>
impl<O: 'static, M: 'static, W: 'static> Actor<O, M, W>
where
M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle_settlement_accepted(
&mut self,
@ -630,16 +672,19 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static> Handler<TakeOffer> for Actor<O, M> {
impl<O: 'static, M: 'static, W: 'static> Handler<TakeOffer> for Actor<O, M, W> {
async fn handle(&mut self, msg: TakeOffer, _ctx: &mut Context<Self>) {
log_error!(self.handle_take_offer(msg.order_id, msg.quantity));
}
}
#[async_trait]
impl<O: 'static, M: 'static> Handler<CfdAction> for Actor<O, M>
impl<O: 'static, M: 'static, W: 'static> Handler<CfdAction> for Actor<O, M, W>
where
O: xtra::Handler<oracle::FetchAnnouncement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle(&mut self, msg: CfdAction, _ctx: &mut Context<Self>) {
use CfdAction::*;
@ -661,13 +706,16 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static> Handler<MakerStreamMessage> for Actor<O, M>
impl<O: 'static, M: 'static, W: 'static> Handler<MakerStreamMessage> for Actor<O, M, W>
where
Self: xtra::Handler<CfdSetupCompleted> + xtra::Handler<CfdRollOverCompleted>,
O: xtra::Handler<oracle::FetchAnnouncement>
+ xtra::Handler<oracle::GetAnnouncement>
+ xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::CollaborativeSettlement>,
W: xtra::Handler<wallet::TryBroadcastTransaction>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::BuildPartyParams>,
{
async fn handle(
&mut self,
@ -723,10 +771,11 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static> Handler<CfdSetupCompleted> for Actor<O, M>
impl<O: 'static, M: 'static, W: 'static> Handler<CfdSetupCompleted> for Actor<O, M, W>
where
O: xtra::Handler<oracle::MonitorAttestation>,
M: xtra::Handler<monitor::StartMonitoring>,
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: CfdSetupCompleted, _ctx: &mut Context<Self>) {
log_error!(self.handle_cfd_setup_completed(msg.order_id, msg.dlc));
@ -734,7 +783,7 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static> Handler<CfdRollOverCompleted> for Actor<O, M>
impl<O: 'static, M: 'static, W: 'static> Handler<CfdRollOverCompleted> for Actor<O, M, W>
where
M: xtra::Handler<monitor::StartMonitoring>,
{
@ -744,14 +793,20 @@ where
}
#[async_trait]
impl<O: 'static, M: 'static> Handler<monitor::Event> for Actor<O, M> {
impl<O: 'static, M: 'static, W: 'static> Handler<monitor::Event> for Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: monitor::Event, _ctx: &mut Context<Self>) {
log_error!(self.handle_monitoring_event(msg))
}
}
#[async_trait]
impl<O: 'static, M: 'static> Handler<oracle::Attestation> for Actor<O, M> {
impl<O: 'static, M: 'static, W: 'static> Handler<oracle::Attestation> for Actor<O, M, W>
where
W: xtra::Handler<wallet::TryBroadcastTransaction>,
{
async fn handle(&mut self, msg: oracle::Attestation, _ctx: &mut Context<Self>) {
log_error!(self.handle_oracle_attestation(msg))
}
@ -778,4 +833,4 @@ impl Message for CfdRollOverCompleted {
type Result = ();
}
impl<O: 'static, M: 'static> xtra::Actor for Actor<O, M> {}
impl<O: 'static, M: 'static, W: 'static> xtra::Actor for Actor<O, M, W> {}

57
daemon/src/wallet.rs

@ -13,9 +13,10 @@ use std::path::Path;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::Mutex;
use xtra_productivity::xtra_productivity;
#[derive(Clone)]
pub struct Wallet {
pub struct Actor {
wallet: Arc<Mutex<bdk::Wallet<ElectrumBlockchain, bdk::database::SqliteDatabase>>>,
}
@ -23,7 +24,7 @@ pub struct Wallet {
#[error("The transaction is already in the blockchain")]
pub struct TransactionAlreadyInBlockchain;
impl Wallet {
impl Actor {
pub async fn new(
electrum_rpc_url: &str,
wallet_dir: &Path,
@ -46,17 +47,11 @@ impl Wallet {
Ok(Self { wallet })
}
pub async fn build_party_params(
&self,
amount: Amount,
identity_pk: PublicKey,
) -> Result<PartyParams> {
let wallet = self.wallet.lock().await;
wallet.build_party_params(amount, identity_pk)
}
pub async fn sync(&self) -> Result<WalletInfo> {
#[xtra_productivity]
impl Actor {
pub async fn handle_sync(&self, _msg: Sync) -> Result<WalletInfo> {
let wallet = self.wallet.lock().await;
wallet
.sync(NoopProgress, None)
@ -75,10 +70,8 @@ impl Wallet {
Ok(wallet_info)
}
pub async fn sign(
&self,
mut psbt: PartiallySignedTransaction,
) -> Result<PartiallySignedTransaction> {
pub async fn handle_sign(&self, msg: Sign) -> Result<PartiallySignedTransaction> {
let mut psbt = msg.psbt;
let wallet = self.wallet.lock().await;
wallet
@ -94,7 +87,22 @@ impl Wallet {
Ok(psbt)
}
pub async fn try_broadcast_transaction(&self, tx: Transaction) -> Result<Txid> {
pub async fn build_party_params(
&self,
BuildPartyParams {
amount,
identity_pk,
}: BuildPartyParams,
) -> Result<PartyParams> {
let wallet = self.wallet.lock().await;
wallet.build_party_params(amount, identity_pk)
}
pub async fn handle_try_broadcast_transaction(
&self,
msg: TryBroadcastTransaction,
) -> Result<Txid> {
let tx = msg.tx;
let wallet = self.wallet.lock().await;
let txid = tx.txid();
@ -128,6 +136,23 @@ impl Wallet {
}
}
impl xtra::Actor for Actor {}
pub struct BuildPartyParams {
pub amount: Amount,
pub identity_pk: PublicKey,
}
pub struct Sync;
pub struct Sign {
pub psbt: PartiallySignedTransaction,
}
pub struct TryBroadcastTransaction {
pub tx: Transaction,
}
fn parse_rpc_protocol_error_code(error_value: &Value) -> Result<i64> {
let json = error_value
.as_str()

11
daemon/src/wallet_sync.rs

@ -1,14 +1,19 @@
use crate::model::WalletInfo;
use crate::wallet::Wallet;
use crate::wallet;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;
use xtra::Address;
pub async fn new(wallet: Wallet, sender: watch::Sender<WalletInfo>) {
pub async fn new(wallet: Address<wallet::Actor>, sender: watch::Sender<WalletInfo>) {
loop {
sleep(Duration::from_secs(10)).await;
let info = match wallet.sync().await {
let info = match wallet
.send(wallet::Sync)
.await
.expect("Wallet actor to be available")
{
Ok(info) => info,
Err(e) => {
tracing::warn!("Failed to sync wallet: {:#}", e);

Loading…
Cancel
Save