From 14ae2a6b71b06b51a4eef9b6bbcf9d80722723ce Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Wed, 30 May 2018 21:27:00 +0300 Subject: [PATCH] Rewrite indexing using new HeaderList API --- src/app.rs | 2 +- src/index.rs | 111 ++++++++++++++++++++++++++++++--------------------- 2 files changed, 66 insertions(+), 47 deletions(-) diff --git a/src/app.rs b/src/app.rs index 93336e5..92cae5b 100644 --- a/src/app.rs +++ b/src/app.rs @@ -99,7 +99,6 @@ impl App { } fn run_server(config: &Config) -> Result<()> { - let index = index::Index::new(); let daemon = daemon::Daemon::new(config.network_type)?; debug!("{:?}", daemon.getblockchaininfo()?); @@ -110,6 +109,7 @@ fn run_server(config: &Config) -> Result<()> { auto_compact: false, }, ); + let index = index::Index::load(&store); let mut tip = index.update(&store, &daemon)?; store.compact_if_needed(); drop(store); // to be re-opened soon diff --git a/src/index.rs b/src/index.rs index 0ad91a3..bfadd4f 100644 --- a/src/index.rs +++ b/src/index.rs @@ -8,7 +8,7 @@ use crypto::digest::Digest; use crypto::sha2::Sha256; use pbr; use std::io::{stderr, Stderr}; -use std::sync::{Arc, RwLock}; +use std::sync::RwLock; use std::time::{Duration, Instant}; use daemon::Daemon; @@ -200,26 +200,51 @@ fn index_block(block: &Block, height: usize) -> Vec { rows } -fn read_indexed_headers(store: &ReadStore) -> HeaderMap { - let mut headers = HeaderMap::new(); +fn read_indexed_headers(store: &ReadStore) -> HeaderList { + let latest_blockhash: Sha256dHash = match store.get(b"L") { + // latest blockheader persisted in the DB. + Some(row) => deserialize(&row).unwrap(), + None => Sha256dHash::default(), + }; + let mut map = HeaderMap::new(); for row in store.scan(b"B") { let key: BlockKey = bincode::deserialize(&row.key).unwrap(); let header: BlockHeader = deserialize(&row.value).unwrap(); - headers.insert(deserialize(&key.hash).unwrap(), header); + map.insert(deserialize(&key.hash).unwrap(), header); } - debug!("read {} block headers from DB", headers.len()); - headers -} - -fn read_last_indexed_blockhash(store: &ReadStore) -> Sha256dHash { - match store.get(b"L") { - Some(row) => deserialize(&row).unwrap(), - None => Sha256dHash::default(), + debug!("read {} block headers from DB", map.len()); + let mut headers = vec![]; + let null_hash = Sha256dHash::default(); + let mut blockhash = latest_blockhash; + while blockhash != null_hash { + let header = map.remove(&blockhash) + .expect(&format!("missing {} header in DB", blockhash)); + blockhash = header.prev_blockhash; + headers.push(header); } + headers.reverse(); + assert_eq!( + headers + .first() + .map(|h| h.prev_blockhash) + .unwrap_or(null_hash), + null_hash + ); + assert_eq!( + headers + .last() + .map(|h| h.bitcoin_hash()) + .unwrap_or(null_hash), + latest_blockhash + ); + let mut result = HeaderList::empty(); + let entries = result.order(headers); + result.apply(entries); + result } struct Indexer<'a> { - headers: Vec<&'a HeaderEntry>, + headers: &'a [HeaderEntry], header_index: usize, daemon: &'a Daemon, @@ -231,16 +256,7 @@ struct Indexer<'a> { } impl<'a> Indexer<'a> { - fn new( - headers: Vec<&'a HeaderEntry>, - daemon: &'a Daemon, - use_progress_bar: bool, - ) -> Indexer<'a> { - let bar = if use_progress_bar { - Some(ProgressBar::on(stderr(), headers.len() as u64)) - } else { - None - }; + fn new(headers: &'a [HeaderEntry], daemon: &'a Daemon) -> Indexer<'a> { Indexer { headers: headers, header_index: 0, @@ -250,7 +266,11 @@ impl<'a> Indexer<'a> { blocks_size: 0, rows_size: 0, num_of_rows: 0, - bar: bar, + bar: if headers.len() > 1 { + Some(ProgressBar::on(stderr(), headers.len() as u64)) + } else { + None + }, } } @@ -267,7 +287,7 @@ impl<'a> Iterator for Indexer<'a> { self.bar.as_mut().map(|b| b.finish()); return None; } - let &entry = &self.headers[self.header_index]; + let entry = &self.headers[self.header_index]; let blockhash = entry.hash(); let block = self.daemon.getblock(&blockhash).unwrap(); @@ -337,39 +357,38 @@ impl<'a> Iterator for Batching<'a> { pub struct Index { // TODO: store also a &HeaderMap. // TODO: store also latest snapshot. - headers: RwLock>, + headers: RwLock, } impl Index { - pub fn new() -> Index { + pub fn load(store: &ReadStore) -> Index { Index { - headers: RwLock::new(Arc::new(HeaderList::empty())), + headers: RwLock::new(read_indexed_headers(store)), } } - pub fn headers_list(&self) -> Arc { - self.headers.read().unwrap().clone() + pub fn best_header(&self) -> Option { + let headers = self.headers.read().unwrap(); + headers.header_by_blockhash(headers.tip()).cloned() + } + + pub fn get_header(&self, height: usize) -> Option { + let entry = self.headers.read().unwrap().headers().get(height)?.clone(); + assert_eq!(entry.height(), height); + Some(entry) } pub fn update(&self, store: &DBStore, daemon: &Daemon) -> Result { - let mut indexed_headers: Arc = self.headers_list(); - let no_indexed_headers = indexed_headers.headers().is_empty(); - if no_indexed_headers { - indexed_headers = Arc::new(HeaderList::build( - read_indexed_headers(store), - read_last_indexed_blockhash(store), - )); - } - let latest_headers = daemon.get_latest_headers(&*indexed_headers)?; - for rows in Batching::new(Indexer::new( - latest_headers.get_missing_headers(&indexed_headers.as_map()), - &daemon, - /*use_progress_bar=*/ no_indexed_headers, - )) { + let tip = daemon.getbestblockhash()?; + let new_headers = daemon.get_new_headers(&self.headers.read().unwrap(), &tip)?; + new_headers.last().map(|tip| { + info!("{:?} ({} left to index)", tip, new_headers.len()); + }); + for rows in Batching::new(Indexer::new(&new_headers, &daemon)) { store.write(rows); } - let tip = latest_headers.tip(); - *(self.headers.write().unwrap()) = Arc::new(latest_headers); + self.headers.write().unwrap().apply(new_headers); + assert_eq!(tip, *self.headers.read().unwrap().tip()); Ok(tip) } }