Browse Source

Start wallet sync as part of wallet actor

chore/bump-maia
Thomas Eizinger 3 years ago
parent
commit
928f4f5e2f
No known key found for this signature in database GPG Key ID: 651AC83A6C6C8B96
  1. 3
      daemon/src/lib.rs
  2. 14
      daemon/src/maker.rs
  3. 15
      daemon/src/taker.rs
  4. 61
      daemon/src/wallet.rs
  5. 29
      daemon/src/wallet_sync.rs
  6. 8
      daemon/tests/harness/mocks/wallet.rs

3
daemon/src/lib.rs

@ -55,7 +55,6 @@ pub mod tokio_ext;
pub mod try_continue; pub mod try_continue;
pub mod tx; pub mod tx;
pub mod wallet; pub mod wallet;
pub mod wallet_sync;
pub mod wire; pub mod wire;
// Certain operations (e.g. contract setup) take long time in debug mode, // Certain operations (e.g. contract setup) take long time in debug mode,
@ -115,7 +114,6 @@ where
+ xtra::Handler<maker_inc_connections::ConfirmOrder> + xtra::Handler<maker_inc_connections::ConfirmOrder>
+ xtra::Handler<Stopping<setup_maker::Actor>>, + xtra::Handler<Stopping<setup_maker::Actor>>,
W: xtra::Handler<wallet::BuildPartyParams> W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sync>
+ xtra::Handler<wallet::Sign> + xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::TryBroadcastTransaction>, + xtra::Handler<wallet::TryBroadcastTransaction>,
{ {
@ -222,7 +220,6 @@ where
+ xtra::Handler<monitor::CollaborativeSettlement> + xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>, + xtra::Handler<oracle::Attestation>,
W: xtra::Handler<wallet::BuildPartyParams> W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sync>
+ xtra::Handler<wallet::Sign> + xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::TryBroadcastTransaction>, + xtra::Handler<wallet::TryBroadcastTransaction>,
{ {

14
daemon/src/maker.rs

@ -8,7 +8,7 @@ use daemon::model::cfd::Role;
use daemon::seed::Seed; use daemon::seed::Seed;
use daemon::{ use daemon::{
bitmex_price_feed, db, housekeeping, logger, maker_inc_connections, monitor, oracle, bitmex_price_feed, db, housekeeping, logger, maker_inc_connections, monitor, oracle,
projection, wallet, wallet_sync, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, projection, wallet, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
SETTLEMENT_INTERVAL, SETTLEMENT_INTERVAL,
}; };
use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteConnectOptions;
@ -17,7 +17,6 @@ use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::task::Poll; use std::task::Poll;
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::filter::LevelFilter;
use xtra::Actor; use xtra::Actor;
@ -160,13 +159,13 @@ async fn main() -> Result<()> {
let mut tasks = Tasks::default(); let mut tasks = Tasks::default();
let (wallet, wallet_fut) = wallet::Actor::new( let (wallet, wallet_feed_receiver) = wallet::Actor::new(
opts.network.electrum(), opts.network.electrum(),
&data_dir.join("maker_wallet.sqlite"), &data_dir.join("maker_wallet.sqlite"),
ext_priv_key, ext_priv_key,
)? )?;
.create(None)
.run(); let (wallet, wallet_fut) = wallet.create(None).run();
tasks.add(wallet_fut); tasks.add(wallet_fut);
if let Some(Withdraw::Withdraw { if let Some(Withdraw::Withdraw {
@ -202,8 +201,6 @@ async fn main() -> Result<()> {
"ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7", "ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7",
)?; )?;
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel(None);
let figment = rocket::Config::figment() let figment = rocket::Config::figment()
.merge(("address", opts.http_address.ip())) .merge(("address", opts.http_address.ip()))
.merge(("port", opts.http_address.port())); .merge(("port", opts.http_address.port()));
@ -289,7 +286,6 @@ async fn main() -> Result<()> {
}); });
tasks.add(incoming_connection_addr.attach_stream(listener_stream)); tasks.add(incoming_connection_addr.attach_stream(listener_stream));
tasks.add(wallet_sync::new(wallet.clone(), wallet_feed_sender));
rocket::custom(figment) rocket::custom(figment)
.manage(projection_feeds) .manage(projection_feeds)

15
daemon/src/taker.rs

@ -8,7 +8,7 @@ use daemon::model::cfd::Role;
use daemon::model::Identity; use daemon::model::Identity;
use daemon::seed::Seed; use daemon::seed::Seed;
use daemon::{ use daemon::{
bitmex_price_feed, db, housekeeping, logger, monitor, oracle, projection, wallet, wallet_sync, bitmex_price_feed, db, housekeeping, logger, monitor, oracle, projection, wallet,
TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, SETTLEMENT_INTERVAL, TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, SETTLEMENT_INTERVAL,
}; };
use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteConnectOptions;
@ -17,7 +17,6 @@ use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::filter::LevelFilter;
use xtra::Actor; use xtra::Actor;
@ -171,13 +170,13 @@ async fn main() -> Result<()> {
let mut tasks = Tasks::default(); let mut tasks = Tasks::default();
let (wallet, wallet_fut) = wallet::Actor::new( let (wallet, wallet_feed_receiver) = wallet::Actor::new(
opts.network.electrum(), opts.network.electrum(),
&data_dir.join("taker_wallet.sqlite"), &data_dir.join("taker_wallet.sqlite"),
ext_priv_key, ext_priv_key,
)? )?;
.create(None)
.run(); let (wallet, wallet_fut) = wallet.create(None).run();
tasks.add(wallet_fut); tasks.add(wallet_fut);
if let Some(Withdraw::Withdraw { if let Some(Withdraw::Withdraw {
@ -202,8 +201,6 @@ async fn main() -> Result<()> {
"ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7", "ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7",
)?; )?;
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel(None);
let figment = rocket::Config::figment() let figment = rocket::Config::figment()
.merge(("address", opts.http_address.ip())) .merge(("address", opts.http_address.ip()))
.merge(("port", opts.http_address.port())); .merge(("port", opts.http_address.port()));
@ -270,8 +267,6 @@ async fn main() -> Result<()> {
possible_addresses, possible_addresses,
)); ));
tasks.add(wallet_sync::new(wallet.clone(), wallet_feed_sender));
let rocket = rocket::custom(figment) let rocket = rocket::custom(figment)
.manage(projection_feeds) .manage(projection_feeds)
.manage(cfd_actor_addr) .manage(cfd_actor_addr)

61
daemon/src/wallet.rs

@ -1,5 +1,7 @@
use crate::model::{Timestamp, WalletInfo}; use crate::model::{Timestamp, WalletInfo};
use crate::Tasks;
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use async_trait::async_trait;
use bdk::bitcoin::consensus::encode::serialize_hex; use bdk::bitcoin::consensus::encode::serialize_hex;
use bdk::bitcoin::util::bip32::ExtendedPrivKey; use bdk::bitcoin::util::bip32::ExtendedPrivKey;
use bdk::bitcoin::util::psbt::PartiallySignedTransaction; use bdk::bitcoin::util::psbt::PartiallySignedTransaction;
@ -11,9 +13,10 @@ use bdk::wallet::AddressIndex;
use bdk::{electrum_client, FeeRate, KeychainKind, SignOptions}; use bdk::{electrum_client, FeeRate, KeychainKind, SignOptions};
use maia::{PartyParams, TxBuilderExt}; use maia::{PartyParams, TxBuilderExt};
use rocket::serde::json::Value; use rocket::serde::json::Value;
use std::collections::HashSet; use std::collections::HashSet;
use std::path::Path; use std::path::Path;
use std::time::Duration;
use tokio::sync::watch;
use xtra_productivity::xtra_productivity; use xtra_productivity::xtra_productivity;
const DUST_AMOUNT: u64 = 546; const DUST_AMOUNT: u64 = 546;
@ -21,6 +24,8 @@ const DUST_AMOUNT: u64 = 546;
pub struct Actor { pub struct Actor {
wallet: bdk::Wallet<ElectrumBlockchain, bdk::database::SqliteDatabase>, wallet: bdk::Wallet<ElectrumBlockchain, bdk::database::SqliteDatabase>,
used_utxos: HashSet<OutPoint>, used_utxos: HashSet<OutPoint>,
tasks: Tasks,
sender: watch::Sender<Option<WalletInfo>>,
} }
#[derive(thiserror::Error, Debug, Clone, Copy)] #[derive(thiserror::Error, Debug, Clone, Copy)]
@ -32,7 +37,7 @@ impl Actor {
electrum_rpc_url: &str, electrum_rpc_url: &str,
wallet_dir: &Path, wallet_dir: &Path,
ext_priv_key: ExtendedPrivKey, ext_priv_key: ExtendedPrivKey,
) -> Result<Self> { ) -> Result<(Self, watch::Receiver<Option<WalletInfo>>)> {
let client = bdk::electrum_client::Client::new(electrum_rpc_url) let client = bdk::electrum_client::Client::new(electrum_rpc_url)
.context("Failed to initialize Electrum RPC client")?; .context("Failed to initialize Electrum RPC client")?;
@ -46,10 +51,15 @@ impl Actor {
ElectrumBlockchain::from(client), ElectrumBlockchain::from(client),
)?; )?;
Ok(Self { let (sender, receiver) = watch::channel(None);
let actor = Self {
wallet, wallet,
tasks: Tasks::default(),
sender,
used_utxos: HashSet::default(), used_utxos: HashSet::default(),
}) };
Ok((actor, receiver))
} }
/// Calculates the maximum "giveable" amount of this wallet. /// Calculates the maximum "giveable" amount of this wallet.
@ -86,11 +96,8 @@ impl Actor {
Err(e) => bail!("Failed to build transaction. {:#}", e), Err(e) => bail!("Failed to build transaction. {:#}", e),
} }
} }
}
#[xtra_productivity] fn sync_internal(&mut self) -> Result<WalletInfo> {
impl Actor {
pub fn handle_sync(&mut self, _msg: Sync) -> Result<WalletInfo> {
self.wallet self.wallet
.sync(NoopProgress, None) .sync(NoopProgress, None)
.context("Failed to sync wallet")?; .context("Failed to sync wallet")?;
@ -107,6 +114,24 @@ impl Actor {
Ok(wallet_info) Ok(wallet_info)
} }
}
#[xtra_productivity]
impl Actor {
pub fn handle_sync(&mut self, _msg: Sync) -> Result<()> {
let wallet_info_update = match self.sync_internal() {
Ok(wallet_info) => Some(wallet_info),
Err(e) => {
tracing::debug!("{:#}", e);
None
}
};
let _ = self.sender.send(wallet_info_update);
Ok(())
}
pub fn handle_sign(&mut self, msg: Sign) -> Result<PartiallySignedTransaction> { pub fn handle_sign(&mut self, msg: Sign) -> Result<PartiallySignedTransaction> {
let mut psbt = msg.psbt; let mut psbt = msg.psbt;
@ -222,14 +247,30 @@ impl Actor {
} }
} }
impl xtra::Actor for Actor {} #[async_trait]
impl xtra::Actor for Actor {
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
let this = ctx.address().expect("self to be alive");
self.tasks.add(async move {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
if this.send(Sync).await.is_err() {
return; // we are disconnected, meaning actor stopped, just exit the loop.
}
}
});
}
}
pub struct BuildPartyParams { pub struct BuildPartyParams {
pub amount: Amount, pub amount: Amount,
pub identity_pk: PublicKey, pub identity_pk: PublicKey,
} }
pub struct Sync; /// Private message to trigger a sync.
struct Sync;
pub struct Sign { pub struct Sign {
pub psbt: PartiallySignedTransaction, pub psbt: PartiallySignedTransaction,

29
daemon/src/wallet_sync.rs

@ -1,29 +0,0 @@
use crate::model::WalletInfo;
use crate::wallet;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;
use xtra::Address;
pub async fn new(wallet: Address<wallet::Actor>, sender: watch::Sender<Option<WalletInfo>>) {
loop {
sleep(Duration::from_secs(10)).await;
let info = match wallet
.send(wallet::Sync)
.await
.expect("Wallet actor to be available")
{
Ok(info) => Some(info),
Err(e) => {
tracing::warn!("Failed to sync wallet: {:#}", e);
None
}
};
if sender.send(info).is_err() {
tracing::warn!("Wallet feed receiver not available, stopping wallet sync");
break;
}
}
}

8
daemon/tests/harness/mocks/wallet.rs

@ -28,10 +28,6 @@ impl WalletActor {
async fn handle(&mut self, msg: wallet::BuildPartyParams) -> Result<PartyParams> { async fn handle(&mut self, msg: wallet::BuildPartyParams) -> Result<PartyParams> {
self.mock.lock().await.build_party_params(msg) self.mock.lock().await.build_party_params(msg)
} }
async fn handle(&mut self, msg: wallet::Sync) -> Result<WalletInfo> {
tracing::info!("We are handling the wallet sync msg");
self.mock.lock().await.sync(msg)
}
async fn handle(&mut self, msg: wallet::Sign) -> Result<PartiallySignedTransaction> { async fn handle(&mut self, msg: wallet::Sign) -> Result<PartiallySignedTransaction> {
self.mock.lock().await.sign(msg) self.mock.lock().await.sign(msg)
} }
@ -53,10 +49,6 @@ pub trait Wallet {
fn broadcast(&mut self, _msg: wallet::TryBroadcastTransaction) -> Result<Txid> { fn broadcast(&mut self, _msg: wallet::TryBroadcastTransaction) -> Result<Txid> {
unreachable!("mockall will reimplement this method") unreachable!("mockall will reimplement this method")
} }
fn sync(&mut self, _msg: wallet::Sync) -> Result<WalletInfo> {
unreachable!("mockall will reimplement this method")
}
} }
#[allow(dead_code)] #[allow(dead_code)]

Loading…
Cancel
Save