
Replace indexing timer logging by proper monitoring

Branch: refactor-mempool
Author: Roman Zeyde, 7 years ago
Commit: 05fd268b51
3 changed files:
1. src/bin/bench_index.rs (7 lines changed)
2. src/bin/main.rs (6 lines changed)
3. src/index.rs (53 lines changed)

src/bin/bench_index.rs (7 lines changed)

@@ -5,6 +5,7 @@ use electrs::{config::Config,
               daemon::Daemon,
               errors::*,
               index::Index,
+              metrics::Metrics,
               signal::Waiter,
               store::{ReadStore, Row, WriteStore},
               util::Bytes};
@@ -27,11 +28,13 @@ impl WriteStore for FakeStore {
 }

 fn run() -> Result<()> {
+    let signal = Waiter::new();
     let config = Config::from_args();
+    let metrics = Metrics::new(config.monitoring_addr);
     let daemon = Daemon::new(config.network_type)?;
     let fake_store = FakeStore {};
-    let index = Index::load(&fake_store);
-    index.update(&fake_store, &daemon, &Waiter::new())?;
+    let index = Index::load(&fake_store, &metrics);
+    index.update(&fake_store, &daemon, &signal)?;
     Ok(())
 }
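
Note: the Metrics type these call sites rely on is not part of this diff. A minimal sketch of the wiring pattern it implies, assuming a Prometheus-style registry underneath (the Metrics and Index shapes below are illustrative, not electrs' actual definitions):

use prometheus::{IntCounter, Opts, Registry};

// Hypothetical stand-in for electrs' Metrics: components receive &Metrics
// at construction time and register their own counters with it, instead of
// keeping ad-hoc timers on the hot path.
struct Metrics {
    registry: Registry,
}

impl Metrics {
    fn counter(&self, opts: Opts) -> IntCounter {
        let c = IntCounter::with_opts(opts).unwrap();
        self.registry.register(Box::new(c.clone())).unwrap();
        c
    }
}

struct Index {
    blocks: IntCounter,
}

impl Index {
    fn load(metrics: &Metrics) -> Index {
        Index {
            blocks: metrics.counter(Opts::new("index_blocks", "# of indexed blocks")),
        }
    }
}

fn main() {
    let metrics = Metrics { registry: Registry::new() };
    let index = Index::load(&metrics);
    index.blocks.inc();
    assert_eq!(index.blocks.get(), 1);
}

Registering counters at construction time means each component owns cheap clonable handles backed by a shared atomic, so incrementing them on the indexing path needs no locking and no manual timer bookkeeping.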

src/bin/main.rs (6 lines changed)

@@ -10,6 +10,7 @@ use electrs::{app::App,
               daemon::Daemon,
               errors::*,
               index::Index,
+              metrics::Metrics,
               query::Query,
               rpc::RPC,
               signal::Waiter,
@@ -17,6 +18,7 @@ use electrs::{app::App,

 fn run_server(config: &Config) -> Result<()> {
     let signal = Waiter::new();
+    let metrics = Metrics::new(config.monitoring_addr);
     let daemon = Daemon::new(config.network_type)?;
     let store = DBStore::open(
         &config.db_path,
@@ -25,7 +27,9 @@ fn run_server(config: &Config) -> Result<()> {
             auto_compact: false,
         },
     );
-    let index = Index::load(&store);
+    let index = Index::load(&store, &metrics);
+    thread::spawn(move || metrics.serve());
+
     let mut tip = index.update(&store, &daemon, &signal)?;
     store.compact_if_needed();
     drop(store); // to be re-opened soon
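
Note: metrics.serve() is moved into a background thread, while the counters registered before the move keep working from the indexing thread via their cloned handles. A self-contained sketch of what such a monitoring endpoint could look like, again assuming the prometheus crate; electrs' real serve loop, HTTP handling, and bind address (taken from config.monitoring_addr in the diff) are not shown here, so all names and the port below are illustrative:

use std::io::Write;
use std::net::TcpListener;
use std::thread;

use prometheus::{Encoder, IntCounter, Opts, Registry, TextEncoder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let registry = Registry::new();
    let blocks = IntCounter::with_opts(Opts::new("index_blocks", "# of indexed blocks"))?;
    registry.register(Box::new(blocks.clone()))?;

    // The indexer side keeps a cloned handle; clones share one atomic value.
    let worker = blocks.clone();
    thread::spawn(move || worker.inc());

    // Answer one scrape request with the Prometheus text exposition format.
    let listener = TcpListener::bind("127.0.0.1:4224")?; // illustrative port
    if let Some(stream) = listener.incoming().next() {
        let mut stream = stream?;
        let mut body = Vec::new();
        TextEncoder::new().encode(&registry.gather(), &mut body)?;
        write!(stream, "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", body.len())?;
        stream.write_all(&body)?;
    }
    Ok(())
}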

src/index.rs (53 lines changed)

@@ -11,10 +11,11 @@ use std::iter::FromIterator;
 use std::sync::RwLock;

 use daemon::Daemon;
+use metrics::{Counter, MetricOpts, Metrics};
 use signal::Waiter;
 use store::{ReadStore, Row, WriteStore};
-use util::{self, full_hash, hash_prefix, Bytes, FullHash, HashPrefix, HeaderEntry, HeaderList,
-           HeaderMap, Timer, HASH_PREFIX_LEN};
+use util::{full_hash, hash_prefix, Bytes, FullHash, HashPrefix, HeaderEntry, HeaderList,
+           HeaderMap, HASH_PREFIX_LEN};

 use errors::*;
@@ -203,7 +204,6 @@ fn index_block(block: &Block, height: usize) -> Vec<Row> {
 }

 fn read_indexed_headers(store: &ReadStore) -> HeaderList {
-    let mut timer = Timer::new();
     let latest_blockhash: Sha256dHash = match store.get(b"L") {
         // latest blockheader persisted in the DB.
         Some(row) => deserialize(&row).unwrap(),
@@ -215,7 +215,6 @@ fn read_indexed_headers(store: &ReadStore) -> HeaderList {
         let header: BlockHeader = deserialize(&row.value).unwrap();
         map.insert(deserialize(&key.hash).unwrap(), header);
     }
-    timer.tick("load");
     let mut headers = vec![];
     let null_hash = Sha256dHash::default();
     let mut blockhash = latest_blockhash;
@@ -240,37 +239,32 @@ fn read_indexed_headers(store: &ReadStore) -> HeaderList {
             .unwrap_or(null_hash),
         latest_blockhash
     );
-    timer.tick("verify");
     let mut result = HeaderList::empty();
-    let headers_len = headers.len();
     let entries = result.order(headers);
     result.apply(entries);
-    timer.tick("apply");
-    debug!("{} headers' verification {:?}", headers_len, timer);
     result
 }

-#[derive(Debug)]
 struct Stats {
-    blocks: usize,
-    txns: usize,
-    vsize: usize,
+    blocks: Counter,
+    txns: Counter,
+    vsize: Counter,
 }

 impl Stats {
-    fn new() -> Stats {
+    fn new(metrics: &Metrics) -> Stats {
         Stats {
-            blocks: 0,
-            txns: 0,
-            vsize: 0,
+            blocks: metrics.counter(MetricOpts::new("index_blocks", "# of indexed blocks")),
+            txns: metrics.counter(MetricOpts::new("index_txns", "# of indexed transactions")),
+            vsize: metrics.counter(MetricOpts::new("index_vsize", "# of indexed vbytes")),
         }
     }

-    fn update(&mut self, block: &Block) {
-        self.blocks += 1;
-        self.txns += block.txdata.len();
+    fn update(&self, block: &Block) {
+        self.blocks.inc();
+        self.txns.inc_by(block.txdata.len() as i64);
         for tx in &block.txdata {
-            self.vsize += tx.get_weight() as usize / 4;
+            self.vsize.inc_by(tx.get_weight() as i64 / 4);
         }
     }
 }
@@ -278,12 +272,14 @@ impl Stats {
 pub struct Index {
     // TODO: store also latest snapshot.
     headers: RwLock<HeaderList>,
+    stats: Stats,
 }

 impl Index {
-    pub fn load(store: &ReadStore) -> Index {
+    pub fn load(store: &ReadStore, metrics: &Metrics) -> Index {
         Index {
             headers: RwLock::new(read_indexed_headers(store)),
+            stats: (Stats::new(metrics)),
         }
     }
@@ -315,10 +311,6 @@ impl Index {
             info!("{:?} ({} left to index)", tip, new_headers.len());
         });
         {
-            let mut timer = Timer::new();
-            let mut stats = Stats::new();
-            let mut bar = util::new_progress_bar(new_headers.len());
-            bar.message("Blocks: ");
             let headers_map: HashMap<Sha256dHash, &HeaderEntry> =
                 HashMap::from_iter(new_headers.iter().map(|h| (*h.hash(), h)));
             for chunk in new_headers.chunks(100) {
@@ -328,7 +320,6 @@
                 // Download new blocks
                 let hashes: Vec<Sha256dHash> = chunk.into_iter().map(|h| *h.hash()).collect();
                 let batch = daemon.getblocks(&hashes)?;
-                timer.tick("get");
                 for block in &batch {
                     let expected_hash = block.bitcoin_hash();
                     let header = headers_map
@@ -337,21 +328,13 @@
                     // Index it
                     let rows = index_block(block, header.height());
-                    timer.tick("index");
                     // Write to DB
                     store.write(rows);
-                    timer.tick("write");
-                    stats.update(block);
-                }
-                if bar.add(batch.len() as u64) % 10000 == 0 {
-                    debug!("index update {:?} {:?}", stats, timer);
+                    self.stats.update(block);
                 }
             }
             store.flush(); // make sure no row is left behind
-            timer.tick("write");
-            bar.finish();
-            debug!("index update {:?} {:?}", stats, timer);
         }
         self.headers.write().unwrap().apply(new_headers);
         assert_eq!(tip, *self.headers.read().unwrap().tip());
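
Note on the new counters: inc_by here takes i64 per the diff's Counter wrapper; the upstream prometheus crate's IntCounter::inc_by takes u64 instead. A runnable sketch of what the three counters amount to, including the vsize arithmetic from the diff (virtual size = weight / 4) and the text exposition a scraper would see; the transaction weights are made-up sample values:

use prometheus::{Encoder, IntCounter, Opts, Registry, TextEncoder};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let registry = Registry::new();
    let blocks = IntCounter::with_opts(Opts::new("index_blocks", "# of indexed blocks"))?;
    let txns = IntCounter::with_opts(Opts::new("index_txns", "# of indexed transactions"))?;
    let vsize = IntCounter::with_opts(Opts::new("index_vsize", "# of indexed vbytes"))?;
    for c in [&blocks, &txns, &vsize] {
        registry.register(Box::new(c.clone()))?;
    }

    // Simulate indexing one block containing two transactions (sample weights).
    let tx_weights = [800u64, 1200];
    blocks.inc();
    txns.inc_by(tx_weights.len() as u64);
    for w in tx_weights {
        vsize.inc_by(w / 4); // 200 + 300 = 500 vbytes total
    }

    // Encode what a Prometheus scrape of the monitoring port would return.
    let mut buf = Vec::new();
    TextEncoder::new().encode(&registry.gather(), &mut buf)?;
    print!("{}", String::from_utf8(buf)?); // index_blocks 1, index_txns 2, index_vsize 500
    Ok(())
}

Unlike the removed Timer/debug! approach, these totals persist for the life of the process and can be graphed over time, which is the point of replacing log-based timing with monitoring.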
