Browse Source

Run fetching and indexing in separate threads

Indexing throughput should improve since fetching takes ~half of the time.
refactor-mempool
Roman Zeyde 7 years ago
parent
commit
71dc45074b
No known key found for this signature in database GPG Key ID: 87CAE5FA46917CBB
  1. 4
      src/bin/bench_index.rs
  2. 7
      src/bin/main.rs
  3. 90
      src/index.rs

4
src/bin/bench_index.rs

@ -35,8 +35,8 @@ fn run() -> Result<()> {
let daemon = Daemon::new(config.network_type, &metrics)?;
let fake_store = FakeStore {};
let index = Index::load(&fake_store, &metrics);
index.update(&fake_store, &daemon, &signal)?;
let index = Index::load(&fake_store, &daemon, &metrics)?;
index.update(&fake_store, &signal)?;
Ok(())
}

7
src/bin/main.rs

@ -23,9 +23,9 @@ fn run_server(config: &Config) -> Result<()> {
let daemon = Daemon::new(config.network_type, &metrics)?;
let store = DBStore::open(&config.db_path, StoreOptions { bulk_import: true });
let index = Index::load(&store, &metrics);
let index = Index::load(&store, &daemon, &metrics)?;
let mut tip = index.update(&store, &daemon, &signal)?;
let mut tip = index.update(&store, &signal)?;
store.compact_if_needed();
drop(store); // bulk import is over
@ -39,8 +39,7 @@ fn run_server(config: &Config) -> Result<()> {
while let None = signal.wait(Duration::from_secs(5)) {
query.update_mempool()?;
if tip != app.daemon().getbestblockhash()? {
tip = app.index()
.update(app.write_store(), app.daemon(), &signal)?;
tip = app.index().update(app.write_store(), &signal)?;
}
rpc.notify();
}

90
src/index.rs

@ -14,8 +14,8 @@ use daemon::Daemon;
use metrics::{Counter, Gauge, HistogramOpts, HistogramTimer, HistogramVec, MetricOpts, Metrics};
use signal::Waiter;
use store::{ReadStore, Row, WriteStore};
use util::{full_hash, hash_prefix, Bytes, FullHash, HashPrefix, HeaderEntry, HeaderList,
HeaderMap, HASH_PREFIX_LEN};
use util::{full_hash, hash_prefix, spawn_thread, Bytes, FullHash, HashPrefix, HeaderEntry,
HeaderList, HeaderMap, SyncChannel, HASH_PREFIX_LEN};
use errors::*;
@ -270,13 +270,13 @@ impl Stats {
}
}
fn update(&self, block: &Block, entry: &HeaderEntry) {
fn update(&self, block: &Block, height: usize) {
self.blocks.inc();
self.txns.inc_by(block.txdata.len() as i64);
for tx in &block.txdata {
self.vsize.inc_by(tx.get_weight() as i64 / 4);
}
self.height.set(entry.height() as i64);
self.height.set(height as i64);
}
fn start_timer(&self, step: &str) -> HistogramTimer {
@ -287,15 +287,17 @@ impl Stats {
pub struct Index {
// TODO: store also latest snapshot.
headers: RwLock<HeaderList>,
daemon: Daemon,
stats: Stats,
}
impl Index {
pub fn load(store: &ReadStore, metrics: &Metrics) -> Index {
Index {
pub fn load(store: &ReadStore, daemon: &Daemon, metrics: &Metrics) -> Result<Index> {
Ok(Index {
headers: RwLock::new(read_indexed_headers(store)),
daemon: daemon.reconnect()?,
stats: Stats::new(metrics),
}
})
}
pub fn best_header(&self) -> Option<HeaderEntry> {
@ -311,12 +313,8 @@ impl Index {
.cloned()
}
pub fn update(
&self,
store: &WriteStore,
daemon: &Daemon,
waiter: &Waiter,
) -> Result<Sha256dHash> {
pub fn update(&self, store: &WriteStore, waiter: &Waiter) -> Result<Sha256dHash> {
let daemon = self.daemon.reconnect()?;
let tip = daemon.getbestblockhash()?;
let new_headers: Vec<HeaderEntry> = {
let indexed_headers = self.headers.read().unwrap();
@ -325,38 +323,52 @@ impl Index {
new_headers.last().map(|tip| {
info!("{:?} ({} left to index)", tip, new_headers.len());
});
{
let headers_map: HashMap<Sha256dHash, &HeaderEntry> =
HashMap::from_iter(new_headers.iter().map(|h| (*h.hash(), h)));
let height_map = HashMap::<Sha256dHash, usize>::from_iter(
new_headers.iter().map(|h| (*h.hash(), h.height())),
);
let chan = SyncChannel::new(1);
let sender = chan.sender();
let fetcher = spawn_thread("fetcher", move || {
for chunk in new_headers.chunks(100) {
if let Some(sig) = waiter.poll() {
bail!("indexing interrupted by {:?}", sig);
}
let timer = self.stats.start_timer("fetch");
let hashes: Vec<Sha256dHash> = chunk.into_iter().map(|h| *h.hash()).collect();
let batch = daemon.getblocks(&hashes)?;
sender.send(daemon.getblocks(&hashes)).unwrap();
}
sender.send(Ok(vec![])).unwrap();
new_headers
});
loop {
if let Some(sig) = waiter.poll() {
bail!("indexing interrupted by SIG{:?}", sig);
}
let timer = self.stats.start_timer("fetch");
let batch = chan.receiver().recv().unwrap()?;
timer.observe_duration();
if batch.is_empty() {
break;
}
for block in &batch {
let expected_hash = block.bitcoin_hash();
let height = *height_map
.get(&expected_hash)
.expect(&format!("missing header for block {}", expected_hash));
let timer = self.stats.start_timer("index");
let rows = index_block(block, height);
timer.observe_duration();
for block in &batch {
let expected_hash = block.bitcoin_hash();
let header = headers_map
.get(&expected_hash)
.expect(&format!("missing header for block {}", expected_hash));
let timer = self.stats.start_timer("index");
let rows = index_block(block, header.height());
timer.observe_duration();
let timer = self.stats.start_timer("write");
store.write(rows);
timer.observe_duration();
self.stats.update(block, header);
}
let timer = self.stats.start_timer("write");
store.write(rows);
timer.observe_duration();
self.stats.update(block, height);
}
let timer = self.stats.start_timer("flush");
store.flush(); // make sure no row is left behind
timer.observe_duration();
}
let timer = self.stats.start_timer("flush");
store.flush(); // make sure no row is left behind
timer.observe_duration();
let new_headers = fetcher.join().unwrap();
self.headers.write().unwrap().apply(new_headers);
assert_eq!(tip, *self.headers.read().unwrap().tip());
Ok(tip)

Loading…
Cancel
Save