
Reload block header index only after bulk indexing is over

refactor-mempool
Roman Zeyde 6 years ago
parent commit 00da50d1ac
GPG Key ID: 87CAE5FA46917CBB (no known key found for this signature in database)
3 changed files:
  1. src/app.rs (1 changed line)
  2. src/bin/electrs.rs (27 changed lines)
  3. src/bulk.rs (84 changed lines)

src/app.rs (1 changed line)

@@ -18,7 +18,6 @@ impl App {
         index: index::Index,
         daemon: daemon::Daemon,
     ) -> Result<Arc<App>> {
-        index.reload(&store);
         Ok(Arc::new(App {
             store,
             index,

src/bin/electrs.rs (27 changed lines)

@@ -9,8 +9,17 @@ use std::process;
 use std::time::Duration;
 use electrs::{
-    app::App, bulk, config::Config, daemon::Daemon, errors::*, index::Index, metrics::Metrics,
-    query::Query, rpc::RPC, signal::Waiter, store::DBStore,
+    app::App,
+    bulk,
+    config::Config,
+    daemon::Daemon,
+    errors::*,
+    index::Index,
+    metrics::Metrics,
+    query::Query,
+    rpc::RPC,
+    signal::Waiter,
+    store::{full_compaction, is_fully_compacted, DBStore},
 };
 fn run_server(config: &Config) -> Result<()> {
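The reshaped import list above now takes full_compaction and is_fully_compacted from the store module instead of bulk; the store.rs side of that move is not part of this diff. As a rough sketch only, based on the finish-marker logic removed from src/bulk.rs further down, the relocated helpers could look like this (the helper name and exact signatures are assumptions, not taken from the diff):

// Sketch, not shown in this commit's diff: plausible store.rs helpers matching
// the new imports above. The DBStore/Row methods used here (get, write, flush,
// compact) are the same ones used elsewhere in this diff; the marker helper
// name is hypothetical.
fn full_compaction_marker() -> Row {
    Row {
        key: b"F".to_vec(), // same finish-marker key the removed bulk.rs helper used
        value: b"".to_vec(),
    }
}

pub fn is_fully_compacted(store: &DBStore) -> bool {
    store.get(&full_compaction_marker().key).is_some()
}

pub fn full_compaction(store: DBStore) -> DBStore {
    store.flush();
    let store = store.compact();
    store.write(vec![full_compaction_marker()]);
    store // auto-compaction is enabled later by the caller (run_server), not here
}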
@@ -31,10 +40,18 @@ fn run_server(config: &Config) -> Result<()> {
     let index = Index::load(&store, &daemon, &metrics, config.index_batch_size)?;
     let store = if config.skip_bulk_import {
         index.update(&store, &signal)?; // slower: uses JSONRPC for fetching blocks
-        bulk::full_compaction(store)
+        full_compaction(store)
     } else {
-        bulk::index(&daemon, &metrics, store) // faster, but uses more memory
-    }?;
+        // faster, but uses more memory
+        if is_fully_compacted(&store) == false {
+            let store = bulk::index_blk_files(&daemon, &metrics, store)?;
+            let store = full_compaction(store);
+            index.reload(&store); // make sure the block header index is up-to-date
+            store
+        } else {
+            store
+        }
+    }.enable_compaction(); // enable auto compactions before starting incremental index updates.
     let app = App::new(store, index, daemon)?;
     let query = Query::new(app.clone(), &metrics);
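Read together, the added lines give run_server the store setup below; the block header index is now reloaded exactly once, right after bulk indexing and full compaction finish, replacing the reload that was removed from App::new. No index.reload is added in the skip_bulk_import branch, where index.update fetches and indexes blocks over JSONRPC directly. Assembled here for readability only, not an extra change:

let store = if config.skip_bulk_import {
    index.update(&store, &signal)?; // slower: uses JSONRPC for fetching blocks
    full_compaction(store)
} else {
    // faster, but uses more memory
    if is_fully_compacted(&store) == false {
        let store = bulk::index_blk_files(&daemon, &metrics, store)?;
        let store = full_compaction(store);
        index.reload(&store); // make sure the block header index is up-to-date
        store
    } else {
        store
    }
}.enable_compaction(); // enable auto compactions before starting incremental index updates.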

src/bulk.rs (84 changed lines)

@@ -17,18 +17,11 @@ use std::thread;
 use daemon::Daemon;
 use index::{index_block, last_indexed_block, read_indexed_blockhashes};
 use metrics::{CounterVec, Histogram, HistogramOpts, HistogramVec, MetricOpts, Metrics};
-use store::{DBStore, ReadStore, Row, WriteStore};
+use store::{DBStore, Row, WriteStore};
 use util::{spawn_thread, HeaderList, SyncChannel};
 use errors::*;
-fn finish_marker_row() -> Row {
-    Row {
-        key: b"F".to_vec(),
-        value: b"".to_vec(),
-    }
-}
 struct Parser {
     magic: u32,
     current_headers: HeaderList,
@@ -220,50 +213,35 @@ fn start_indexer(
     })
 }
-pub fn index(daemon: &Daemon, metrics: &Metrics, store: DBStore) -> Result<DBStore> {
+pub fn index_blk_files(daemon: &Daemon, metrics: &Metrics, store: DBStore) -> Result<DBStore> {
     set_open_files_limit(2048); // twice the default `ulimit -n` value
-    let marker = store.get(&finish_marker_row().key);
-    debug!("full compaction marker: {:?}", marker);
-    let result = if marker.is_none() {
-        let blk_files = daemon.list_blk_files()?;
-        info!("indexing {} blk*.dat files", blk_files.len());
-        let indexed_blockhashes = read_indexed_blockhashes(&store);
-        debug!("found {} indexed blocks", indexed_blockhashes.len());
-        let parser = Parser::new(daemon, metrics, indexed_blockhashes)?;
-        let (blobs, reader) = start_reader(blk_files, parser.clone());
-        let rows_chan = SyncChannel::new(0);
-        let indexers: Vec<JoinHandle> = (0..2)
-            .map(|_| start_indexer(blobs.clone(), parser.clone(), rows_chan.sender()))
-            .collect();
-        spawn_thread("bulk_writer", move || -> Result<DBStore> {
-            for (rows, path) in rows_chan.into_receiver() {
-                trace!("indexed {:?}: {} rows", path, rows.len());
-                store.write(rows);
-            }
-            reader
-                .join()
-                .expect("reader panicked")
-                .expect("reader failed");
-            indexers.into_iter().for_each(|i| {
-                i.join()
-                    .expect("indexer panicked")
-                    .expect("indexing failed")
-            });
-            store.write(vec![parser.last_indexed_row()]);
-            full_compaction(store)
-        }).join()
-        .expect("writer panicked")
-    } else {
-        Ok(store)
-    };
-    // Enable auto compactions after bulk indexing and full compaction are over.
-    result.map(|store| store.enable_compaction())
-}
-pub fn full_compaction(store: DBStore) -> Result<DBStore> {
-    store.flush();
-    let store = store.compact().enable_compaction();
-    store.write(vec![finish_marker_row()]);
-    Ok(store)
+    let blk_files = daemon.list_blk_files()?;
+    info!("indexing {} blk*.dat files", blk_files.len());
+    let indexed_blockhashes = read_indexed_blockhashes(&store);
+    debug!("found {} indexed blocks", indexed_blockhashes.len());
+    let parser = Parser::new(daemon, metrics, indexed_blockhashes)?;
+    let (blobs, reader) = start_reader(blk_files, parser.clone());
+    let rows_chan = SyncChannel::new(0);
+    let indexers: Vec<JoinHandle> = (0..2)
+        .map(|_| start_indexer(blobs.clone(), parser.clone(), rows_chan.sender()))
+        .collect();
+    Ok(spawn_thread("bulk_writer", move || -> DBStore {
+        for (rows, path) in rows_chan.into_receiver() {
+            trace!("indexed {:?}: {} rows", path, rows.len());
+            store.write(rows);
+        }
+        reader
+            .join()
+            .expect("reader panicked")
+            .expect("reader failed");
+        indexers.into_iter().for_each(|i| {
+            i.join()
+                .expect("indexer panicked")
+                .expect("indexing failed")
+        });
+        store.write(vec![parser.last_indexed_row()]);
+        store
+    }).join()
+    .expect("writer panicked"))
 }
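With compaction and the finish marker moved out of bulk.rs, the bulk_writer closure now returns a plain DBStore instead of Result<DBStore>. Its shape, producer threads feeding a rendezvous channel that a single writer thread drains before handing back the store, can be illustrated with plain std types. spawn_thread and SyncChannel are electrs utilities, so the snippet below is only an illustrative stand-in under that analogy, not code from this commit:

use std::sync::mpsc;
use std::thread;

fn main() {
    // A rendezvous channel (capacity 0), analogous to SyncChannel::new(0) above.
    let (tx, rx) = mpsc::sync_channel::<Vec<String>>(0);

    // Producer thread stands in for the reader/indexer threads: it sends row
    // batches and closes the channel by dropping the sender when done.
    let producer = thread::spawn(move || {
        for batch in 0..3 {
            tx.send(vec![format!("row-{batch}")]).expect("receiver gone");
        }
    });

    // Writer thread drains the channel and returns the accumulated "store";
    // join().expect() propagates a panic, mirroring expect("writer panicked").
    let written: Vec<String> = thread::spawn(move || {
        let mut store = Vec::new();
        for rows in rx {
            store.extend(rows);
        }
        store
    })
    .join()
    .expect("writer panicked");

    producer.join().expect("producer panicked");
    println!("wrote {} rows", written.len());
}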
