diff --git a/examples/load.rs b/examples/load.rs index cad3463..0ffd9a8 100644 --- a/examples/load.rs +++ b/examples/load.rs @@ -27,7 +27,7 @@ fn run(config: Config) -> Result<()> { config.network_type, &metrics, )?; - let store = DBStore::open(&config.db_path, StoreOptions { bulk_import: true }); + let store = DBStore::open(&config.db_path); bulk::index(&daemon, &metrics, store) } diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index c46a036..e9801d0 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -9,7 +9,7 @@ use std::time::Duration; use electrs::{ app::App, bulk, config::Config, daemon::Daemon, errors::*, index::Index, metrics::Metrics, - query::Query, rpc::RPC, signal::Waiter, store::{DBStore, StoreOptions}, + query::Query, rpc::RPC, signal::Waiter, store::DBStore, }; fn run_server(config: &Config) -> Result<()> { @@ -24,13 +24,8 @@ fn run_server(config: &Config) -> Result<()> { &metrics, )?; // Perform initial indexing from local blk*.dat block files. - bulk::index( - &daemon, - &metrics, - DBStore::open(&config.db_path, StoreOptions { bulk_import: true }), - )?; + let store = bulk::index(&daemon, &metrics, DBStore::open(&config.db_path))?; let daemon = daemon.reconnect()?; - let store = DBStore::open(&config.db_path, StoreOptions { bulk_import: false }); let index = Index::load(&store, &daemon, &metrics)?; let app = App::new(store, index, daemon); let mut tip = app.index().update(app.write_store(), &signal)?; diff --git a/src/bulk.rs b/src/bulk.rs index c7eaece..54bc3b2 100644 --- a/src/bulk.rs +++ b/src/bulk.rs @@ -209,39 +209,42 @@ fn start_indexer( }) } -pub fn index(daemon: &Daemon, metrics: &Metrics, store: DBStore) -> Result<()> { +pub fn index(daemon: &Daemon, metrics: &Metrics, store: DBStore) -> Result { set_open_files_limit(2048); // twice the default `ulimit -n` value - if store.get(FINISH_MARKER).is_some() { - return Ok(()); - } - let blk_files = daemon.list_blk_files()?; - info!("indexing {} blk*.dat files", blk_files.len()); - let parser = Parser::new(daemon, metrics)?; - let (blobs, reader) = start_reader(blk_files, parser.clone()); - let rows_chan = SyncChannel::new(0); - let indexers: Vec = (0..2) - .map(|_| start_indexer(blobs.clone(), parser.clone(), rows_chan.sender())) - .collect(); - spawn_thread("bulk_writer", move || { - for (rows, path) in rows_chan.into_receiver() { - trace!("indexed {:?}: {} rows", path, rows.len()); - store.write(rows); - } - reader - .join() - .expect("reader panicked") - .expect("reader failed"); - - indexers.into_iter().for_each(|i| { - i.join() - .expect("indexer panicked") - .expect("indexing failed") - }); - store.write(vec![parser.last_indexed_row()]); - store.flush(); - store.compact(); // will take a while. - store.put(FINISH_MARKER, b""); - }).join() - .expect("writer panicked"); - Ok(()) + let result = if store.get(FINISH_MARKER).is_none() { + let blk_files = daemon.list_blk_files()?; + info!("indexing {} blk*.dat files", blk_files.len()); + let parser = Parser::new(daemon, metrics)?; + let (blobs, reader) = start_reader(blk_files, parser.clone()); + let rows_chan = SyncChannel::new(0); + let indexers: Vec = (0..2) + .map(|_| start_indexer(blobs.clone(), parser.clone(), rows_chan.sender())) + .collect(); + spawn_thread("bulk_writer", move || -> Result { + for (rows, path) in rows_chan.into_receiver() { + trace!("indexed {:?}: {} rows", path, rows.len()); + store.write(rows); + } + reader + .join() + .expect("reader panicked") + .expect("reader failed"); + + indexers.into_iter().for_each(|i| { + i.join() + .expect("indexer panicked") + .expect("indexing failed") + }); + store.write(vec![parser.last_indexed_row()]); + store.flush(); + store.compact(); // will take a while. + store.put(FINISH_MARKER, b""); + Ok(store) + }).join() + .expect("writer panicked") + } else { + Ok(store) + }; + // Enable auto compactions after bulk indexing is over. + result.map(|store| store.enable_compaction()) } diff --git a/src/store.rs b/src/store.rs index 9166c92..16a9f5d 100644 --- a/src/store.rs +++ b/src/store.rs @@ -29,19 +29,14 @@ pub trait WriteStore: Sync { pub struct DBStore { db: rocksdb::DB, - opts: StoreOptions, -} - -#[derive(Debug)] -pub struct StoreOptions { - pub bulk_import: bool, + bulk_import: bool, } impl DBStore { /// Opens a new RocksDB at the specified location. - pub fn open(path: &Path, opts: StoreOptions) -> DBStore { + pub fn open(path: &Path) -> Self { let path = path.to_str().unwrap(); - debug!("opening {:?} with {:?}", path, &opts); + debug!("opening DB at {:?}", path); let mut db_opts = rocksdb::DBOptions::default(); db_opts.create_if_missing(true); db_opts.set_keep_log_file_num(10); @@ -54,14 +49,25 @@ impl DBStore { cf_opts.set_write_buffer_size(64 << 20); cf_opts.set_min_write_buffer_number(2); cf_opts.set_max_write_buffer_number(3); - cf_opts.set_disable_auto_compactions(opts.bulk_import); + cf_opts.set_disable_auto_compactions(true); // for initial bulk load let mut block_opts = rocksdb::BlockBasedOptions::default(); block_opts.set_block_size(1 << 20); DBStore { db: rocksdb::DB::open_cf(db_opts, path, vec![("default", cf_opts)]).unwrap(), - opts: opts, + bulk_import: true, + } + } + + pub fn enable_compaction(mut self) -> Self { + self.bulk_import = false; + { + let cf = self.db.cf_handle("default").expect("no default CF"); + self.db + .set_options_cf(cf, &vec![("disable_auto_compactions", "false")]) + .expect("failed to enable auto compactions"); } + self } pub fn sstable(&self) -> SSTableWriter { @@ -116,8 +122,8 @@ impl WriteStore for DBStore { batch.put(row.key.as_slice(), row.value.as_slice()).unwrap(); } let mut opts = rocksdb::WriteOptions::new(); - opts.set_sync(!self.opts.bulk_import); - opts.disable_wal(self.opts.bulk_import); + opts.set_sync(!self.bulk_import); + opts.disable_wal(self.bulk_import); self.db.write_opt(batch, &opts).unwrap(); }