|
@ -17,6 +17,7 @@ use crate::daemon::Daemon; |
|
|
use crate::errors::*; |
|
|
use crate::errors::*; |
|
|
use crate::index::{index_block, last_indexed_block, read_indexed_blockhashes}; |
|
|
use crate::index::{index_block, last_indexed_block, read_indexed_blockhashes}; |
|
|
use crate::metrics::{CounterVec, Histogram, HistogramOpts, HistogramVec, MetricOpts, Metrics}; |
|
|
use crate::metrics::{CounterVec, Histogram, HistogramOpts, HistogramVec, MetricOpts, Metrics}; |
|
|
|
|
|
use crate::signal::Waiter; |
|
|
use crate::store::{DBStore, Row, WriteStore}; |
|
|
use crate::store::{DBStore, Row, WriteStore}; |
|
|
use crate::util::{spawn_thread, HeaderList, SyncChannel}; |
|
|
use crate::util::{spawn_thread, HeaderList, SyncChannel}; |
|
|
|
|
|
|
|
@ -225,6 +226,7 @@ pub fn index_blk_files( |
|
|
daemon: &Daemon, |
|
|
daemon: &Daemon, |
|
|
index_threads: usize, |
|
|
index_threads: usize, |
|
|
metrics: &Metrics, |
|
|
metrics: &Metrics, |
|
|
|
|
|
signal: &Waiter, |
|
|
store: DBStore, |
|
|
store: DBStore, |
|
|
) -> Result<DBStore> { |
|
|
) -> Result<DBStore> { |
|
|
set_open_files_limit(2048); // twice the default `ulimit -n` value
|
|
|
set_open_files_limit(2048); // twice the default `ulimit -n` value
|
|
@ -238,10 +240,14 @@ pub fn index_blk_files( |
|
|
let indexers: Vec<JoinHandle> = (0..index_threads) |
|
|
let indexers: Vec<JoinHandle> = (0..index_threads) |
|
|
.map(|_| start_indexer(blobs.clone(), parser.clone(), rows_chan.sender())) |
|
|
.map(|_| start_indexer(blobs.clone(), parser.clone(), rows_chan.sender())) |
|
|
.collect(); |
|
|
.collect(); |
|
|
Ok(spawn_thread("bulk_writer", move || -> DBStore { |
|
|
let signal = signal.clone(); |
|
|
|
|
|
spawn_thread("bulk_writer", move || -> Result<DBStore> { |
|
|
for (rows, path) in rows_chan.into_receiver() { |
|
|
for (rows, path) in rows_chan.into_receiver() { |
|
|
trace!("indexed {:?}: {} rows", path, rows.len()); |
|
|
trace!("indexed {:?}: {} rows", path, rows.len()); |
|
|
store.write(rows); |
|
|
store.write(rows); |
|
|
|
|
|
signal |
|
|
|
|
|
.poll() |
|
|
|
|
|
.chain_err(|| "stopping bulk indexing due to signal")?; |
|
|
} |
|
|
} |
|
|
reader |
|
|
reader |
|
|
.join() |
|
|
.join() |
|
@ -254,10 +260,10 @@ pub fn index_blk_files( |
|
|
.expect("indexing failed") |
|
|
.expect("indexing failed") |
|
|
}); |
|
|
}); |
|
|
store.write(vec![parser.last_indexed_row()]); |
|
|
store.write(vec![parser.last_indexed_row()]); |
|
|
store |
|
|
Ok(store) |
|
|
}) |
|
|
}) |
|
|
.join() |
|
|
.join() |
|
|
.expect("writer panicked")) |
|
|
.expect("writer panicked") |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
#[cfg(test)] |
|
|
#[cfg(test)] |
|
|