diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 80119bf..3c5b7cc 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -2,12 +2,13 @@
 
 * Use `configure_me` instead of `clap` to support config files, environment variables and man pages (@Kixunil)
 * Don't accept `--cookie` via CLI arguments (@Kixunil)
+* Define cache size in MB instead of number of elements (@dagurval)
 * Support Rust >=1.34 (for Debian)
 
 # 0.7.1 (27 July 2019)
 
 * Allow stopping bulk indexing via SIGINT/SIGTERM
-* Cache list of transaction IDs for blocks
+* Cache list of transaction IDs for blocks (@dagurval)
 
 # 0.7.0 (13 June 2019)
 
diff --git a/config_spec.toml b/config_spec.toml
index b877401..c364336 100644
--- a/config_spec.toml
+++ b/config_spec.toml
@@ -76,16 +76,16 @@ doc = "Number of threads used for bulk indexing (default: use the # of CPUs)"
 default = "0"
 
 [[param]]
-name = "tx_cache_size"
-type = "usize"
-doc = "Number of transactions to keep in for query LRU cache"
-default = "10000"
+name = "tx_cache_size_mb"
+type = "f32"
+doc = "Total size of transactions to cache (MB)"
+default = "10.0"
 
 [[param]]
-name = "blocktxids_cache_size"
-type = "usize"
-doc = "Number of blocks to cache transactions IDs in LRU cache"
-default = "100"
+name = "blocktxids_cache_size_mb"
+type = "f32"
+doc = "Total size of block transactions IDs to cache (in MB)"
+default = "10.0"
 
 [[param]]
 name = "txid_limit"
diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs
index cca1c01..6a80a55 100644
--- a/src/bin/electrs.rs
+++ b/src/bin/electrs.rs
@@ -12,13 +12,13 @@ use std::time::Duration;
 use electrs::{
     app::App,
     bulk,
-    cache::BlockTxIDsCache,
+    cache::{BlockTxIDsCache, TransactionCache},
     config::Config,
     daemon::Daemon,
     errors::*,
     index::Index,
     metrics::Metrics,
-    query::{Query, TransactionCache},
+    query::Query,
     rpc::RPC,
     signal::Waiter,
     store::{full_compaction, is_fully_compacted, DBStore},
@@ -58,7 +58,7 @@ fn run_server(config: &Config) -> Result<()> {
         .enable_compaction(); // enable auto compactions before starting incremental index updates.
 
     let app = App::new(store, index, daemon, &config)?;
-    let tx_cache = TransactionCache::new(config.tx_cache_size);
+    let tx_cache = TransactionCache::new(config.tx_cache_size, &metrics);
     let query = Query::new(app.clone(), &metrics, tx_cache, config.txid_limit);
     let mut server = None; // Electrum RPC server
     loop {
diff --git a/src/cache.rs b/src/cache.rs
index 4a64776..5d21da4 100644
--- a/src/cache.rs
+++ b/src/cache.rs
@@ -1,27 +1,76 @@
 use crate::errors::*;
-use crate::metrics::{Counter, MetricOpts, Metrics};
+use crate::metrics::{CounterVec, MetricOpts, Metrics};
+
+use bitcoin::blockdata::transaction::Transaction;
+use bitcoin::consensus::encode::deserialize;
 use bitcoin_hashes::sha256d::Hash as Sha256dHash;
 use lru::LruCache;
+use std::hash::Hash;
 use std::sync::Mutex;
 
+struct SizedLruCache<K, V> {
+    map: LruCache<K, (V, usize)>,
+    bytes_usage: usize,
+    bytes_capacity: usize,
+    lookups: CounterVec,
+}
+
+impl<K: Hash + Eq, V> SizedLruCache<K, V> {
+    fn new(bytes_capacity: usize, lookups: CounterVec) -> SizedLruCache<K, V> {
+        SizedLruCache {
+            map: LruCache::unbounded(),
+            bytes_usage: 0,
+            bytes_capacity,
+            lookups,
+        }
+    }
+
+    fn get(&mut self, key: &K) -> Option<&V> {
+        match self.map.get(key) {
+            None => {
+                self.lookups.with_label_values(&["miss"]).inc();
+                None
+            }
+            Some((value, _)) => {
+                self.lookups.with_label_values(&["hit"]).inc();
+                Some(value)
+            }
+        }
+    }
+
+    fn put(&mut self, key: K, value: V, byte_size: usize) {
+        if byte_size > self.bytes_capacity {
+            return;
+        }
+        if let Some((_, popped_size)) = self.map.put(key, (value, byte_size)) {
+            self.bytes_usage -= popped_size
+        }
+        self.bytes_usage += byte_size;
+
+        while self.bytes_usage > self.bytes_capacity {
+            match self.map.pop_lru() {
+                Some((_, (_, popped_size))) => self.bytes_usage -= popped_size,
+                None => return,
+            }
+        }
+    }
+}
+
 pub struct BlockTxIDsCache {
-    map: Mutex<LruCache<Sha256dHash, Vec<Sha256dHash>>>,
-    hits: Counter,
-    misses: Counter,
+    map: Mutex<SizedLruCache<Sha256dHash, Vec<Sha256dHash>>>,
 }
 
 impl BlockTxIDsCache {
-    pub fn new(capacity: usize, metrics: &Metrics) -> BlockTxIDsCache {
+    pub fn new(bytes_capacity: usize, metrics: &Metrics) -> BlockTxIDsCache {
+        let lookups = metrics.counter_vec(
+            MetricOpts::new(
+                "electrs_blocktxids_cache",
+                "# of cache lookups for list of transactions in a block",
+            ),
+            &["type"],
+        );
         BlockTxIDsCache {
-            map: Mutex::new(LruCache::new(capacity)),
-            hits: metrics.counter(MetricOpts::new(
-                "electrs_blocktxids_cache_hits",
-                "# of cache hits for list of transactions in a block",
-            )),
-            misses: metrics.counter(MetricOpts::new(
-                "electrs_blocktxids_cache_misses",
-                "# of cache misses for list of transactions in a block",
-            )),
+            map: Mutex::new(SizedLruCache::new(bytes_capacity, lookups)),
         }
     }
 
@@ -34,29 +83,111 @@ impl BlockTxIDsCache {
         F: FnOnce() -> Result<Vec<Sha256dHash>>,
     {
         if let Some(txids) = self.map.lock().unwrap().get(blockhash) {
-            self.hits.inc();
             return Ok(txids.clone());
         }
 
-        self.misses.inc();
         let txids = load_txids_func()?;
-        self.map.lock().unwrap().put(*blockhash, txids.clone());
+        let byte_size = 32 /* hash size */ * (1 /* key */ + txids.len() /* values */);
+        self.map
+            .lock()
+            .unwrap()
+            .put(*blockhash, txids.clone(), byte_size);
         Ok(txids)
     }
 }
 
+pub struct TransactionCache {
+    // Store serialized transaction (should use less RAM).
+    map: Mutex<SizedLruCache<Sha256dHash, Vec<u8>>>,
+}
+
+impl TransactionCache {
+    pub fn new(bytes_capacity: usize, metrics: &Metrics) -> TransactionCache {
+        let lookups = metrics.counter_vec(
+            MetricOpts::new(
+                "electrs_transactions_cache",
+                "# of cache lookups for transactions",
+            ),
+            &["type"],
+        );
+        TransactionCache {
+            map: Mutex::new(SizedLruCache::new(bytes_capacity, lookups)),
+        }
+    }
+
+    pub fn get_or_else<F>(&self, txid: &Sha256dHash, load_txn_func: F) -> Result<Transaction>
+    where
+        F: FnOnce() -> Result<Vec<u8>>,
+    {
+        match self.map.lock().unwrap().get(txid) {
+            Some(serialized_txn) => {
+                return Ok(deserialize(&serialized_txn).chain_err(|| "failed to parse cached tx")?);
+            }
+            None => {}
+        }
+        let serialized_txn = load_txn_func()?;
+        let txn = deserialize(&serialized_txn).chain_err(|| "failed to parse serialized tx")?;
+        let byte_size = 32 /* key (hash size) */ + serialized_txn.len();
+        self.map
+            .lock()
+            .unwrap()
+            .put(*txid, serialized_txn, byte_size);
+        Ok(txn)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use bitcoin_hashes::Hash;
 
+    #[test]
+    fn test_sized_lru_cache_hit_and_miss() {
+        let counter = CounterVec::new(prometheus::Opts::new("name", "help"), &["type"]).unwrap();
+        let mut cache = SizedLruCache::<usize, usize>::new(100, counter.clone());
+        assert_eq!(counter.with_label_values(&["miss"]).get(), 0);
+        assert_eq!(counter.with_label_values(&["hit"]).get(), 0);
+
+        assert_eq!(cache.get(&1), None); // no such key
+        assert_eq!(counter.with_label_values(&["miss"]).get(), 1);
+        assert_eq!(counter.with_label_values(&["hit"]).get(), 0);
+
+        cache.put(1, 10, 50); // add new key-value
+        assert_eq!(cache.get(&1), Some(&10));
+        assert_eq!(counter.with_label_values(&["miss"]).get(), 1);
+        assert_eq!(counter.with_label_values(&["hit"]).get(), 1);
+
+        cache.put(3, 30, 50); // drop oldest key (1)
+        cache.put(2, 20, 50);
+        assert_eq!(cache.get(&1), None);
+        assert_eq!(cache.get(&2), Some(&20));
+        assert_eq!(cache.get(&3), Some(&30));
+        assert_eq!(counter.with_label_values(&["miss"]).get(), 2);
+        assert_eq!(counter.with_label_values(&["hit"]).get(), 3);
+
+        cache.put(3, 33, 50); // replace existing value
+        assert_eq!(cache.get(&1), None);
+        assert_eq!(cache.get(&2), Some(&20));
+        assert_eq!(cache.get(&3), Some(&33));
+        assert_eq!(counter.with_label_values(&["miss"]).get(), 3);
+        assert_eq!(counter.with_label_values(&["hit"]).get(), 5);
+
+        cache.put(9, 90, 9999); // larger than cache capacity, don't drop the cache
+        assert_eq!(cache.get(&1), None);
+        assert_eq!(cache.get(&2), Some(&20));
+        assert_eq!(cache.get(&3), Some(&33));
+        assert_eq!(cache.get(&9), None);
+        assert_eq!(counter.with_label_values(&["miss"]).get(), 5);
+        assert_eq!(counter.with_label_values(&["hit"]).get(), 7);
+    }
+
     fn gen_hash(seed: u8) -> Sha256dHash {
         let bytes: Vec<u8> = (seed..seed + 32).collect();
         Sha256dHash::hash(&bytes[..])
     }
 
     #[test]
-    fn test_cache_hit_and_miss() {
+    fn test_blocktxids_cache_hit_and_miss() {
         let block1 = gen_hash(1);
         let block2 = gen_hash(2);
         let block3 = gen_hash(3);
@@ -69,7 +200,8 @@ mod tests {
         };
 
         let dummy_metrics = Metrics::new("127.0.0.1:60000".parse().unwrap());
-        let cache = BlockTxIDsCache::new(2, &dummy_metrics);
+        // 200 bytes ~ 32 (bytes/hash) * (1 key hash + 2 value hashes) * 2 txns
+        let cache = BlockTxIDsCache::new(200, &dummy_metrics);
 
         // cache miss
         let result = cache.get_or_else(&block1, &miss_func).unwrap();
@@ -81,7 +213,7 @@
         assert_eq!(1, *misses.lock().unwrap());
         assert_eq!(txids, result);
 
-        // cache size is 2, test that blockhash1 falls out of cache
+        // cache size is 200, test that blockhash1 falls out of cache
         cache.get_or_else(&block2, &miss_func).unwrap();
         assert_eq!(2, *misses.lock().unwrap());
         cache.get_or_else(&block3, &miss_func).unwrap();
@@ -94,4 +226,36 @@
         cache.get_or_else(&block1, &miss_func).unwrap();
         assert_eq!(4, *misses.lock().unwrap());
     }
+
+    #[test]
+    fn test_txn_cache() {
+        use bitcoin::util::hash::BitcoinHash;
+        use hex;
+
+        let dummy_metrics = Metrics::new("127.0.0.1:60000".parse().unwrap());
+        let cache = TransactionCache::new(1024, &dummy_metrics);
+        let tx_bytes = hex::decode("0100000001a15d57094aa7a21a28cb20b59aab8fc7d1149a3bdbcddba9c622e4f5f6a99ece010000006c493046022100f93bb0e7d8db7bd46e40132d1f8242026e045f03a0efe71bbb8e3f475e970d790221009337cd7f1f929f00cc6ff01f03729b069a7c21b59b1736ddfee5db5946c5da8c0121033b9b137ee87d5a812d6f506efdd37f0affa7ffc310711c06c7f3e097c9447c52ffffffff0100e1f505000000001976a9140389035a9225b3839e2bbf32d826a1e222031fd888ac00000000").unwrap();
+
+        let tx: Transaction = deserialize(&tx_bytes).unwrap();
+        let txid = tx.bitcoin_hash();
+
+        let mut misses = 0;
+        assert_eq!(
+            cache
+                .get_or_else(&txid, || {
+                    misses += 1;
+                    Ok(tx_bytes.clone())
+                })
+                .unwrap(),
+            tx
+        );
+        assert_eq!(misses, 1);
+        assert_eq!(
+            cache
+                .get_or_else(&txid, || panic!("should not be called"))
+                .unwrap(),
+            tx
+        );
+        assert_eq!(misses, 1);
+    }
 }
diff --git a/src/config.rs b/src/config.rs
index 820f139..5010cc4 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -234,6 +234,7 @@ impl Config {
         if config.bulk_index_threads == 0 {
             config.bulk_index_threads = num_cpus::get();
         }
+        const MB: f32 = (1 << 20) as f32;
         let config = Config {
             log,
             network_type: config.network,
@@ -246,8 +247,8 @@
             jsonrpc_import: config.jsonrpc_import,
             index_batch_size: config.index_batch_size,
             bulk_index_threads: config.bulk_index_threads,
-            tx_cache_size: config.tx_cache_size,
-            blocktxids_cache_size: config.blocktxids_cache_size,
+            tx_cache_size: (config.tx_cache_size_mb * MB) as usize,
+            blocktxids_cache_size: (config.blocktxids_cache_size_mb * MB) as usize,
             txid_limit: config.txid_limit,
             server_banner: config.server_banner,
         };
diff --git a/src/query.rs b/src/query.rs
index 063a502..7b40d17 100644
--- a/src/query.rs
+++ b/src/query.rs
@@ -5,12 +5,12 @@ use bitcoin_hashes::sha256d::Hash as Sha256dHash;
 use bitcoin_hashes::Hash;
 use crypto::digest::Digest;
 use crypto::sha2::Sha256;
-use lru::LruCache;
 use serde_json::Value;
 use std::collections::HashMap;
-use std::sync::{Arc, Mutex, RwLock};
+use std::sync::{Arc, RwLock};
 
 use crate::app::App;
+use crate::cache::TransactionCache;
 use crate::errors::*;
 use crate::index::{compute_script_hash, TxInRow, TxOutRow, TxRow};
 use crate::mempool::Tracker;
@@ -177,30 +177,6 @@ fn txids_by_funding_output(
         .collect()
 }
 
-pub struct TransactionCache {
-    map: Mutex<LruCache<Sha256dHash, Transaction>>,
-}
-
-impl TransactionCache {
-    pub fn new(capacity: usize) -> TransactionCache {
-        TransactionCache {
-            map: Mutex::new(LruCache::new(capacity)),
-        }
-    }
-
-    fn get_or_else<F>(&self, txid: &Sha256dHash, load_txn_func: F) -> Result<Transaction>
-    where
-        F: FnOnce() -> Result<Transaction>,
-    {
-        if let Some(txn) = self.map.lock().unwrap().get(txid) {
-            return Ok(txn.clone());
-        }
-        let txn = load_txn_func()?;
-        self.map.lock().unwrap().put(*txid, txn.clone());
-        Ok(txn)
-    }
-}
-
 pub struct Query {
     app: Arc<App>,
     tracker: RwLock<Tracker>,
@@ -378,7 +354,12 @@ impl Query {
     fn load_txn(&self, txid: &Sha256dHash, block_height: Option<u32>) -> Result<Transaction> {
         self.tx_cache.get_or_else(&txid, || {
            let blockhash = self.lookup_confirmed_blockhash(txid, block_height)?;
-            self.app.daemon().gettransaction(txid, blockhash)
self + .app + .daemon() + .gettransaction_raw(txid, blockhash, /*verbose*/ false)?; + let value_hex: &str = value.as_str().chain_err(|| "non-string tx")?; + hex::decode(&value_hex).chain_err(|| "non-hex tx") }) }