Browse Source

Persist full compaction marker after bulk indexing is over

refactor-mempool
Roman Zeyde 7 years ago
parent
commit
d8e53246c4
No known key found for this signature in database GPG Key ID: 87CAE5FA46917CBB
  1. 2
      src/bin/electrs.rs
  2. 21
      src/bulk.rs
  3. 4
      src/store.rs

2
src/bin/electrs.rs

@ -31,7 +31,7 @@ fn run_server(config: &Config) -> Result<()> {
let index = Index::load(&store, &daemon, &metrics, config.index_batch_size)?;
let store = if config.skip_bulk_import {
index.update(&store, &signal)?; // slower: uses JSONRPC for fetching blocks
bulk::compact(store)
bulk::full_compaction(store)
} else {
bulk::index(&daemon, &metrics, store) // faster, but uses more memory
}?;

21
src/bulk.rs

@ -22,7 +22,12 @@ use util::{spawn_thread, HeaderList, SyncChannel};
use errors::*;
const FINISH_MARKER: &'static [u8] = b"F";
fn finish_marker_row() -> Row {
Row {
key: b"F".to_vec(),
value: b"".to_vec(),
}
}
struct Parser {
magic: u32,
@ -217,7 +222,9 @@ fn start_indexer(
pub fn index(daemon: &Daemon, metrics: &Metrics, store: DBStore) -> Result<DBStore> {
set_open_files_limit(2048); // twice the default `ulimit -n` value
let result = if store.get(FINISH_MARKER).is_none() {
let marker = store.get(&finish_marker_row().key);
debug!("full compaction marker: {:?}", marker);
let result = if marker.is_none() {
let blk_files = daemon.list_blk_files()?;
info!("indexing {} blk*.dat files", blk_files.len());
let indexed_blockhashes = read_indexed_blockhashes(&store);
@ -244,7 +251,7 @@ pub fn index(daemon: &Daemon, metrics: &Metrics, store: DBStore) -> Result<DBSto
.expect("indexing failed")
});
store.write(vec![parser.last_indexed_row()]);
Ok(store.compact())
full_compaction(store)
}).join()
.expect("writer panicked")
} else {
@ -254,9 +261,9 @@ pub fn index(daemon: &Daemon, metrics: &Metrics, store: DBStore) -> Result<DBSto
result.map(|store| store.enable_compaction())
}
pub fn compact(store: DBStore) -> Result<DBStore> {
pub fn full_compaction(store: DBStore) -> Result<DBStore> {
store.flush();
let store = store.compact(); // will take a while.
store.put(FINISH_MARKER, b"");
Ok(store.enable_compaction())
let store = store.compact().enable_compaction();
store.write(vec![finish_marker_row()]);
Ok(store)
}

4
src/store.rs

@ -79,10 +79,6 @@ impl DBStore {
}
}
pub fn put(&self, key: &[u8], value: &[u8]) {
self.db.put(key, value).unwrap();
}
pub fn compact(self) -> Self {
let opts = self.opts.clone();
drop(self); // DB must be closed before being re-opened

Loading…
Cancel
Save