|
|
@ -14,7 +14,7 @@ use std::sync::{ |
|
|
|
use std::thread; |
|
|
|
|
|
|
|
use daemon::Daemon; |
|
|
|
use index::{index_block, last_indexed_block}; |
|
|
|
use index::{index_block, last_indexed_block, read_indexed_blockhashes}; |
|
|
|
use metrics::{CounterVec, Histogram, HistogramOpts, HistogramVec, MetricOpts, Metrics}; |
|
|
|
use store::{DBStore, ReadStore, Row, WriteStore}; |
|
|
|
use util::{spawn_thread, HeaderList, SyncChannel}; |
|
|
@ -34,11 +34,15 @@ struct Parser { |
|
|
|
} |
|
|
|
|
|
|
|
impl Parser { |
|
|
|
fn new(daemon: &Daemon, metrics: &Metrics) -> Result<Arc<Parser>> { |
|
|
|
fn new( |
|
|
|
daemon: &Daemon, |
|
|
|
metrics: &Metrics, |
|
|
|
indexed_blockhashes: HashSet<Sha256dHash>, |
|
|
|
) -> Result<Arc<Parser>> { |
|
|
|
Ok(Arc::new(Parser { |
|
|
|
magic: daemon.magic(), |
|
|
|
current_headers: load_headers(daemon)?, |
|
|
|
indexed_blockhashes: Mutex::new(HashSet::new()), |
|
|
|
indexed_blockhashes: Mutex::new(indexed_blockhashes), |
|
|
|
duration: metrics.histogram_vec( |
|
|
|
HistogramOpts::new("parse_duration", "blk*.dat parsing duration (in seconds)"), |
|
|
|
&["step"], |
|
|
@ -95,7 +99,7 @@ impl Parser { |
|
|
|
self.block_count.with_label_values(&["duplicate"]).inc(); |
|
|
|
} |
|
|
|
} else { |
|
|
|
debug!("skipping block {}", blockhash); // will be indexed later (after bulk load is over)
|
|
|
|
// will be indexed later (after bulk load is over) if not an orphan block
|
|
|
|
self.block_count.with_label_values(&["skipped"]).inc(); |
|
|
|
} |
|
|
|
} |
|
|
@ -213,7 +217,9 @@ pub fn index(daemon: &Daemon, metrics: &Metrics, store: DBStore) -> Result<DBSto |
|
|
|
let result = if store.get(FINISH_MARKER).is_none() { |
|
|
|
let blk_files = daemon.list_blk_files()?; |
|
|
|
info!("indexing {} blk*.dat files", blk_files.len()); |
|
|
|
let parser = Parser::new(daemon, metrics)?; |
|
|
|
let indexed_blockhashes = read_indexed_blockhashes(&store); |
|
|
|
debug!("found {} indexed blocks", indexed_blockhashes.len()); |
|
|
|
let parser = Parser::new(daemon, metrics, indexed_blockhashes)?; |
|
|
|
let (blobs, reader) = start_reader(blk_files, parser.clone()); |
|
|
|
let rows_chan = SyncChannel::new(0); |
|
|
|
let indexers: Vec<JoinHandle> = (0..2) |
|
|
|