Browse Source

Merge #812

812: Make wallet sync more resilient r=thomaseizinger a=thomaseizinger



Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
chore/bump-maia
bors[bot] 3 years ago
committed by GitHub
parent
commit
4596174b2e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 36
      Cargo.lock
  2. 3
      Cargo.toml
  3. 2
      daemon/Cargo.toml
  4. 3
      daemon/src/lib.rs
  5. 25
      daemon/src/maker.rs
  6. 2
      daemon/src/routes_maker.rs
  7. 2
      daemon/src/routes_taker.rs
  8. 27
      daemon/src/taker.rs
  9. 12
      daemon/src/to_sse_event.rs
  10. 71
      daemon/src/wallet.rs
  11. 29
      daemon/src/wallet_sync.rs
  12. 8
      daemon/tests/harness/mocks/wallet.rs

36
Cargo.lock

@ -207,11 +207,9 @@ dependencies = [
[[package]]
name = "bdk"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6ec4da3dbaa41bb6d6cffe40b113ea8651566da7f96beebe9c0e87fc92b9094"
version = "0.14.1-dev"
source = "git+https://github.com/bitcoindevkit/bdk#64e88f0e006c68315142d53dc35b633327dde4b5"
dependencies = [
"ahash",
"async-trait",
"bdk-macros",
"bitcoin",
@ -220,7 +218,6 @@ dependencies = [
"log",
"miniscript",
"rand 0.7.3",
"rusqlite",
"serde",
"serde_json",
"tokio",
@ -832,18 +829,6 @@ version = "2.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59"
[[package]]
name = "fallible-iterator"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "figment"
version = "0.10.6"
@ -1441,7 +1426,7 @@ dependencies = [
[[package]]
name = "maia"
version = "0.1.0"
source = "git+https://github.com/comit-network/maia#5e6d44882ced923c19f45c55a825b961c9b1e396"
source = "git+https://github.com/comit-network/maia?branch=temp-bdk-dependency#45e886ef9e04ce37c26b628abe143fd98e967fdd"
dependencies = [
"anyhow",
"bdk",
@ -2406,21 +2391,6 @@ dependencies = [
"uncased",
]
[[package]]
name = "rusqlite"
version = "0.25.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57adcf67c8faaf96f3248c2a7b419a0dbc52ebe36ba83dd57fe83827c1ea4eb3"
dependencies = [
"bitflags",
"fallible-iterator",
"fallible-streaming-iterator",
"hashlink",
"libsqlite3-sys",
"memchr",
"smallvec",
]
[[package]]
name = "rust-embed"
version = "6.3.0"

3
Cargo.toml

@ -6,5 +6,6 @@ resolver = "2"
rocket = { git = "https://github.com/SergioBenitez/Rocket" } # Need to patch rocket dependency of `rocket_basicauth` until there is an official release.
xtra = { git = "https://github.com/Restioson/xtra" } # We need to use unreleased patches.
secp256k1-zkp = { git = "https://github.com/ElementsProject/rust-secp256k1-zkp" } # We need to use unreleased patches.
maia = { git = "https://github.com/comit-network/maia" } # Unreleased
maia = { git = "https://github.com/comit-network/maia", branch = "temp-bdk-dependency" } # Unreleased
xtra_productivity = { git = "https://github.com/comit-network/xtra-productivity" } # Unreleased
bdk = { git = "https://github.com/bitcoindevkit/bdk" } # Unreleased

2
daemon/Cargo.toml

@ -7,7 +7,7 @@ edition = "2021"
anyhow = "1"
async-trait = "0.1.51"
atty = "0.2"
bdk = { version = "0.14", default-features = false, features = ["sqlite", "electrum"] }
bdk = { version = "0.14.1-dev", default-features = false, features = ["electrum"] }
bytes = "1"
chrono = { version = "0.4", features = ["serde"] }
clap = "3.0.0-beta.5"

3
daemon/src/lib.rs

@ -55,7 +55,6 @@ pub mod tokio_ext;
pub mod try_continue;
pub mod tx;
pub mod wallet;
pub mod wallet_sync;
pub mod wire;
// Certain operations (e.g. contract setup) take long time in debug mode,
@ -115,7 +114,6 @@ where
+ xtra::Handler<maker_inc_connections::ConfirmOrder>
+ xtra::Handler<Stopping<setup_maker::Actor>>,
W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sync>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{
@ -222,7 +220,6 @@ where
+ xtra::Handler<monitor::CollaborativeSettlement>
+ xtra::Handler<oracle::Attestation>,
W: xtra::Handler<wallet::BuildPartyParams>
+ xtra::Handler<wallet::Sync>
+ xtra::Handler<wallet::Sign>
+ xtra::Handler<wallet::TryBroadcastTransaction>,
{

25
daemon/src/maker.rs

@ -5,12 +5,10 @@ use bdk::{bitcoin, FeeRate};
use clap::{Parser, Subcommand};
use daemon::auth::{self, MAKER_USERNAME};
use daemon::model::cfd::Role;
use daemon::model::WalletInfo;
use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
bitmex_price_feed, db, housekeeping, logger, maker_inc_connections, monitor, oracle,
projection, wallet, wallet_sync, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
projection, wallet, MakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS,
SETTLEMENT_INTERVAL,
};
use sqlx::sqlite::SqliteConnectOptions;
@ -19,7 +17,6 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::task::Poll;
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter;
use xtra::Actor;
@ -160,17 +157,12 @@ async fn main() -> Result<()> {
let bitcoin_network = opts.network.bitcoin_network();
let ext_priv_key = seed.derive_extended_priv_key(bitcoin_network)?;
let (wallet, wallet_fut) = wallet::Actor::new(
opts.network.electrum(),
&data_dir.join("maker_wallet.sqlite"),
ext_priv_key,
)?
.create(None)
.run();
let _wallet_handle = wallet_fut.spawn_with_handle();
let mut tasks = Tasks::default();
let (wallet, wallet_feed_receiver) = wallet::Actor::new(opts.network.electrum(), ext_priv_key)?;
// do this before withdraw to ensure the wallet is synced
let wallet_info = wallet.send(wallet::Sync).await??;
let (wallet, wallet_fut) = wallet.create(None).run();
tasks.add(wallet_fut);
if let Some(Withdraw::Withdraw {
amount,
@ -205,8 +197,6 @@ async fn main() -> Result<()> {
"ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7",
)?;
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
let figment = rocket::Config::figment()
.merge(("address", opts.http_address.ip()))
.merge(("port", opts.http_address.port()));
@ -221,8 +211,6 @@ async fn main() -> Result<()> {
tracing::info!("Listening on {}", local_addr);
let mut tasks = Tasks::default();
let db = SqlitePool::connect_with(
SqliteConnectOptions::new()
.create_if_missing(true)
@ -294,7 +282,6 @@ async fn main() -> Result<()> {
});
tasks.add(incoming_connection_addr.attach_stream(listener_stream));
tasks.add(wallet_sync::new(wallet.clone(), wallet_feed_sender));
rocket::custom(figment)
.manage(projection_feeds)

2
daemon/src/routes_maker.rs

@ -29,7 +29,7 @@ pub type Maker = xtra::Address<
#[rocket::get("/feed")]
pub async fn maker_feed(
rx: &State<Feeds>,
rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_wallet: &State<watch::Receiver<Option<WalletInfo>>>,
_auth: Authenticated,
) -> EventStream![] {
let rx = rx.inner();

2
daemon/src/routes_taker.rs

@ -25,7 +25,7 @@ type Taker = xtra::Address<taker_cfd::Actor<oracle::Actor, monitor::Actor, walle
#[rocket::get("/feed")]
pub async fn feed(
rx: &State<Feeds>,
rx_wallet: &State<watch::Receiver<WalletInfo>>,
rx_wallet: &State<watch::Receiver<Option<WalletInfo>>>,
rx_maker_status: &State<watch::Receiver<ConnectionStatus>>,
) -> EventStream![] {
let rx = rx.inner();

27
daemon/src/taker.rs

@ -5,11 +5,10 @@ use bdk::{bitcoin, FeeRate};
use clap::{Parser, Subcommand};
use daemon::connection::connect;
use daemon::model::cfd::Role;
use daemon::model::{Identity, WalletInfo};
use daemon::model::Identity;
use daemon::seed::Seed;
use daemon::tokio_ext::FutureExt;
use daemon::{
bitmex_price_feed, db, housekeeping, logger, monitor, oracle, projection, wallet, wallet_sync,
bitmex_price_feed, db, housekeeping, logger, monitor, oracle, projection, wallet,
TakerActorSystem, Tasks, HEARTBEAT_INTERVAL, N_PAYOUTS, SETTLEMENT_INTERVAL,
};
use sqlx::sqlite::SqliteConnectOptions;
@ -18,7 +17,6 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use tokio::sync::watch;
use tracing_subscriber::filter::LevelFilter;
use xtra::Actor;
@ -170,17 +168,12 @@ async fn main() -> Result<()> {
let ext_priv_key = seed.derive_extended_priv_key(bitcoin_network)?;
let (_, identity_sk) = seed.derive_identity();
let (wallet, wallet_fut) = wallet::Actor::new(
opts.network.electrum(),
&data_dir.join("taker_wallet.sqlite"),
ext_priv_key,
)?
.create(None)
.run();
let _wallet_handle = wallet_fut.spawn_with_handle();
let mut tasks = Tasks::default();
let (wallet, wallet_feed_receiver) = wallet::Actor::new(opts.network.electrum(), ext_priv_key)?;
// do this before withdraw to ensure the wallet is synced
let wallet_info = wallet.send(wallet::Sync).await??;
let (wallet, wallet_fut) = wallet.create(None).run();
tasks.add(wallet_fut);
if let Some(Withdraw::Withdraw {
amount,
@ -204,10 +197,6 @@ async fn main() -> Result<()> {
"ddd4636845a90185991826be5a494cde9f4a6947b1727217afedc6292fa4caf7",
)?;
let (wallet_feed_sender, wallet_feed_receiver) = watch::channel::<WalletInfo>(wallet_info);
let mut tasks = Tasks::default();
let figment = rocket::Config::figment()
.merge(("address", opts.http_address.ip()))
.merge(("port", opts.http_address.port()));
@ -274,8 +263,6 @@ async fn main() -> Result<()> {
possible_addresses,
));
tasks.add(wallet_sync::new(wallet.clone(), wallet_feed_sender));
let rocket = rocket::custom(figment)
.manage(projection_feeds)
.manage(cfd_actor_addr)

12
daemon/src/to_sse_event.rs

@ -46,13 +46,13 @@ pub struct WalletInfo {
last_updated_at: Timestamp,
}
impl ToSseEvent for model::WalletInfo {
impl ToSseEvent for Option<model::WalletInfo> {
fn to_sse_event(&self) -> Event {
let wallet_info = WalletInfo {
balance: self.balance,
address: self.address.to_string(),
last_updated_at: self.last_updated_at,
};
let wallet_info = self.as_ref().map(|wallet_info| WalletInfo {
balance: wallet_info.balance,
address: wallet_info.address.to_string(),
last_updated_at: wallet_info.last_updated_at,
});
Event::json(&wallet_info).event("wallet")
}

71
daemon/src/wallet.rs

@ -1,5 +1,7 @@
use crate::model::{Timestamp, WalletInfo};
use crate::Tasks;
use anyhow::{bail, Context, Result};
use async_trait::async_trait;
use bdk::bitcoin::consensus::encode::serialize_hex;
use bdk::bitcoin::util::bip32::ExtendedPrivKey;
use bdk::bitcoin::util::psbt::PartiallySignedTransaction;
@ -11,16 +13,18 @@ use bdk::wallet::AddressIndex;
use bdk::{electrum_client, FeeRate, KeychainKind, SignOptions};
use maia::{PartyParams, TxBuilderExt};
use rocket::serde::json::Value;
use std::collections::HashSet;
use std::path::Path;
use std::time::Duration;
use tokio::sync::watch;
use xtra_productivity::xtra_productivity;
const DUST_AMOUNT: u64 = 546;
pub struct Actor {
wallet: bdk::Wallet<ElectrumBlockchain, bdk::database::SqliteDatabase>,
wallet: bdk::Wallet<ElectrumBlockchain, bdk::database::MemoryDatabase>,
used_utxos: HashSet<OutPoint>,
tasks: Tasks,
sender: watch::Sender<Option<WalletInfo>>,
}
#[derive(thiserror::Error, Debug, Clone, Copy)]
@ -30,13 +34,12 @@ pub struct TransactionAlreadyInBlockchain;
impl Actor {
pub fn new(
electrum_rpc_url: &str,
wallet_dir: &Path,
ext_priv_key: ExtendedPrivKey,
) -> Result<Self> {
) -> Result<(Self, watch::Receiver<Option<WalletInfo>>)> {
let client = bdk::electrum_client::Client::new(electrum_rpc_url)
.context("Failed to initialize Electrum RPC client")?;
let db = bdk::database::SqliteDatabase::new(wallet_dir.display().to_string());
let db = bdk::database::MemoryDatabase::new();
let wallet = bdk::Wallet::new(
bdk::template::Bip84(ext_priv_key, KeychainKind::External),
@ -46,10 +49,15 @@ impl Actor {
ElectrumBlockchain::from(client),
)?;
Ok(Self {
let (sender, receiver) = watch::channel(None);
let actor = Self {
wallet,
tasks: Tasks::default(),
sender,
used_utxos: HashSet::default(),
})
};
Ok((actor, receiver))
}
/// Calculates the maximum "giveable" amount of this wallet.
@ -86,11 +94,8 @@ impl Actor {
Err(e) => bail!("Failed to build transaction. {:#}", e),
}
}
}
#[xtra_productivity]
impl Actor {
pub fn handle_sync(&mut self, _msg: Sync) -> Result<WalletInfo> {
fn sync_internal(&mut self) -> Result<WalletInfo> {
self.wallet
.sync(NoopProgress, None)
.context("Failed to sync wallet")?;
@ -107,6 +112,24 @@ impl Actor {
Ok(wallet_info)
}
}
#[xtra_productivity]
impl Actor {
pub fn handle_sync(&mut self, _msg: Sync) -> Result<()> {
let wallet_info_update = match self.sync_internal() {
Ok(wallet_info) => Some(wallet_info),
Err(e) => {
tracing::debug!("{:#}", e);
None
}
};
let _ = self.sender.send(wallet_info_update);
Ok(())
}
pub fn handle_sign(&mut self, msg: Sign) -> Result<PartiallySignedTransaction> {
let mut psbt = msg.psbt;
@ -178,6 +201,10 @@ impl Actor {
}
pub fn handle_withdraw(&mut self, msg: Withdraw) -> Result<Txid> {
self.wallet
.sync(NoopProgress, None)
.context("Failed to sync wallet")?;
if msg.address.network != self.wallet.network() {
bail!(
"Address has invalid network. It was {} but the wallet is connected to {}",
@ -218,14 +245,30 @@ impl Actor {
}
}
impl xtra::Actor for Actor {}
#[async_trait]
impl xtra::Actor for Actor {
async fn started(&mut self, ctx: &mut xtra::Context<Self>) {
let this = ctx.address().expect("self to be alive");
self.tasks.add(async move {
loop {
tokio::time::sleep(Duration::from_secs(10)).await;
if this.send(Sync).await.is_err() {
return; // we are disconnected, meaning actor stopped, just exit the loop.
}
}
});
}
}
pub struct BuildPartyParams {
pub amount: Amount,
pub identity_pk: PublicKey,
}
pub struct Sync;
/// Private message to trigger a sync.
struct Sync;
pub struct Sign {
pub psbt: PartiallySignedTransaction,

29
daemon/src/wallet_sync.rs

@ -1,29 +0,0 @@
use crate::model::WalletInfo;
use crate::wallet;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::sleep;
use xtra::Address;
pub async fn new(wallet: Address<wallet::Actor>, sender: watch::Sender<WalletInfo>) {
loop {
sleep(Duration::from_secs(10)).await;
let info = match wallet
.send(wallet::Sync)
.await
.expect("Wallet actor to be available")
{
Ok(info) => info,
Err(e) => {
tracing::warn!("Failed to sync wallet: {:#}", e);
continue;
}
};
if sender.send(info).is_err() {
tracing::warn!("Wallet feed receiver not available, stopping wallet sync");
break;
}
}
}

8
daemon/tests/harness/mocks/wallet.rs

@ -28,10 +28,6 @@ impl WalletActor {
async fn handle(&mut self, msg: wallet::BuildPartyParams) -> Result<PartyParams> {
self.mock.lock().await.build_party_params(msg)
}
async fn handle(&mut self, msg: wallet::Sync) -> Result<WalletInfo> {
tracing::info!("We are handling the wallet sync msg");
self.mock.lock().await.sync(msg)
}
async fn handle(&mut self, msg: wallet::Sign) -> Result<PartiallySignedTransaction> {
self.mock.lock().await.sign(msg)
}
@ -53,10 +49,6 @@ pub trait Wallet {
fn broadcast(&mut self, _msg: wallet::TryBroadcastTransaction) -> Result<Txid> {
unreachable!("mockall will reimplement this method")
}
fn sync(&mut self, _msg: wallet::Sync) -> Result<WalletInfo> {
unreachable!("mockall will reimplement this method")
}
}
#[allow(dead_code)]

Loading…
Cancel
Save