
Rewrite indexing using new HeaderList API

Branch: refactor-mempool
Roman Zeyde, 6 years ago
parent commit 14ae2a6b71
Changed files:
  1. src/app.rs (2 lines changed)
  2. src/index.rs (111 lines changed)

src/app.rs (2 lines changed)

@@ -99,7 +99,6 @@ impl App {
 }
 fn run_server(config: &Config) -> Result<()> {
-    let index = index::Index::new();
     let daemon = daemon::Daemon::new(config.network_type)?;
     debug!("{:?}", daemon.getblockchaininfo()?);
@@ -110,6 +109,7 @@ fn run_server(config: &Config) -> Result<()> {
             auto_compact: false,
         },
     );
+    let index = index::Index::load(&store);
     let mut tip = index.update(&store, &daemon)?;
     store.compact_if_needed();
     drop(store); // to be re-opened soon
src/index.rs (111 lines changed)

@@ -8,7 +8,7 @@ use crypto::digest::Digest;
 use crypto::sha2::Sha256;
 use pbr;
 use std::io::{stderr, Stderr};
-use std::sync::{Arc, RwLock};
+use std::sync::RwLock;
 use std::time::{Duration, Instant};
 use daemon::Daemon;
@@ -200,26 +200,51 @@ fn index_block(block: &Block, height: usize) -> Vec<Row> {
     rows
 }
-fn read_indexed_headers(store: &ReadStore) -> HeaderMap {
-    let mut headers = HeaderMap::new();
+fn read_indexed_headers(store: &ReadStore) -> HeaderList {
+    let latest_blockhash: Sha256dHash = match store.get(b"L") {
+        // latest blockheader persisted in the DB.
+        Some(row) => deserialize(&row).unwrap(),
+        None => Sha256dHash::default(),
+    };
+    let mut map = HeaderMap::new();
     for row in store.scan(b"B") {
         let key: BlockKey = bincode::deserialize(&row.key).unwrap();
         let header: BlockHeader = deserialize(&row.value).unwrap();
-        headers.insert(deserialize(&key.hash).unwrap(), header);
+        map.insert(deserialize(&key.hash).unwrap(), header);
     }
-    debug!("read {} block headers from DB", headers.len());
-    headers
-}
-fn read_last_indexed_blockhash(store: &ReadStore) -> Sha256dHash {
-    match store.get(b"L") {
-        Some(row) => deserialize(&row).unwrap(),
-        None => Sha256dHash::default(),
+    debug!("read {} block headers from DB", map.len());
+    let mut headers = vec![];
+    let null_hash = Sha256dHash::default();
+    let mut blockhash = latest_blockhash;
+    while blockhash != null_hash {
+        let header = map.remove(&blockhash)
+            .expect(&format!("missing {} header in DB", blockhash));
+        blockhash = header.prev_blockhash;
+        headers.push(header);
     }
+    headers.reverse();
+    assert_eq!(
+        headers
+            .first()
+            .map(|h| h.prev_blockhash)
+            .unwrap_or(null_hash),
+        null_hash
+    );
+    assert_eq!(
+        headers
+            .last()
+            .map(|h| h.bitcoin_hash())
+            .unwrap_or(null_hash),
+        latest_blockhash
+    );
+    let mut result = HeaderList::empty();
+    let entries = result.order(headers);
+    result.apply(entries);
+    result
 }
 struct Indexer<'a> {
-    headers: Vec<&'a HeaderEntry>,
+    headers: &'a [HeaderEntry],
     header_index: usize,
     daemon: &'a Daemon,
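read_indexed_headers now returns an ordered HeaderList instead of an unordered HeaderMap: it reads the latest persisted blockhash from the "L" row, walks back through prev_blockhash until the null hash, reverses the result, and sanity-checks both ends of the chain. A minimal self-contained sketch of that back-pointer walk, with simplified stand-ins (u64 hashes, 0 as the "null" hash) instead of Sha256dHash/BlockHeader:

    use std::collections::HashMap;

    #[derive(Clone, Debug)]
    struct Header {
        hash: u64, // stand-in for the block hash
        prev: u64, // stand-in for prev_blockhash (0 means "null", i.e. before genesis)
    }

    /// Rebuild the chain, oldest first, from an unordered hash -> header map.
    fn chain_from_map(mut map: HashMap<u64, Header>, tip: u64) -> Vec<Header> {
        let mut chain = Vec::new();
        let mut hash = tip;
        while hash != 0 {
            // A missing entry means the DB is inconsistent, mirroring the .expect() above.
            let header = map.remove(&hash).expect("missing header");
            hash = header.prev;
            chain.push(header);
        }
        chain.reverse(); // genesis first, tip last
        assert_eq!(chain.first().map(|h| h.prev).unwrap_or(0), 0);
        assert_eq!(chain.last().map(|h| h.hash).unwrap_or(0), tip);
        chain
    }

    fn main() {
        let headers = vec![
            Header { hash: 1, prev: 0 },
            Header { hash: 2, prev: 1 },
            Header { hash: 3, prev: 2 },
        ];
        let map: HashMap<u64, Header> = headers.into_iter().map(|h| (h.hash, h)).collect();
        let chain = chain_from_map(map, 3);
        assert_eq!(chain.iter().map(|h| h.hash).collect::<Vec<_>>(), vec![1, 2, 3]);
    }

One side effect of walking back from the persisted tip is that stale header rows from abandoned forks are simply ignored, since they are never reached from the latest blockhash.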
@@ -231,16 +256,7 @@ struct Indexer<'a> {
 }
 impl<'a> Indexer<'a> {
-    fn new(
-        headers: Vec<&'a HeaderEntry>,
-        daemon: &'a Daemon,
-        use_progress_bar: bool,
-    ) -> Indexer<'a> {
-        let bar = if use_progress_bar {
-            Some(ProgressBar::on(stderr(), headers.len() as u64))
-        } else {
-            None
-        };
+    fn new(headers: &'a [HeaderEntry], daemon: &'a Daemon) -> Indexer<'a> {
         Indexer {
             headers: headers,
             header_index: 0,
@@ -250,7 +266,11 @@ impl<'a> Indexer<'a> {
             blocks_size: 0,
             rows_size: 0,
             num_of_rows: 0,
-            bar: bar,
+            bar: if headers.len() > 1 {
+                Some(ProgressBar::on(stderr(), headers.len() as u64))
+            } else {
+                None
+            },
         }
     }
@@ -267,7 +287,7 @@ impl<'a> Iterator for Indexer<'a> {
             self.bar.as_mut().map(|b| b.finish());
             return None;
         }
-        let &entry = &self.headers[self.header_index];
+        let entry = &self.headers[self.header_index];
         let blockhash = entry.hash();
         let block = self.daemon.getblock(&blockhash).unwrap();
@@ -337,39 +357,38 @@ impl<'a> Iterator for Batching<'a> {
 pub struct Index {
     // TODO: store also a &HeaderMap.
     // TODO: store also latest snapshot.
-    headers: RwLock<Arc<HeaderList>>,
+    headers: RwLock<HeaderList>,
 }
 impl Index {
-    pub fn new() -> Index {
+    pub fn load(store: &ReadStore) -> Index {
         Index {
-            headers: RwLock::new(Arc::new(HeaderList::empty())),
+            headers: RwLock::new(read_indexed_headers(store)),
         }
     }
-    pub fn headers_list(&self) -> Arc<HeaderList> {
-        self.headers.read().unwrap().clone()
+    pub fn best_header(&self) -> Option<HeaderEntry> {
+        let headers = self.headers.read().unwrap();
+        headers.header_by_blockhash(headers.tip()).cloned()
     }
     pub fn get_header(&self, height: usize) -> Option<HeaderEntry> {
         let entry = self.headers.read().unwrap().headers().get(height)?.clone();
         assert_eq!(entry.height(), height);
         Some(entry)
     }
     pub fn update(&self, store: &DBStore, daemon: &Daemon) -> Result<Sha256dHash> {
-        let mut indexed_headers: Arc<HeaderList> = self.headers_list();
-        let no_indexed_headers = indexed_headers.headers().is_empty();
-        if no_indexed_headers {
-            indexed_headers = Arc::new(HeaderList::build(
-                read_indexed_headers(store),
-                read_last_indexed_blockhash(store),
-            ));
-        }
-        let latest_headers = daemon.get_latest_headers(&*indexed_headers)?;
-        for rows in Batching::new(Indexer::new(
-            latest_headers.get_missing_headers(&indexed_headers.as_map()),
-            &daemon,
-            /*use_progress_bar=*/ no_indexed_headers,
-        )) {
+        let tip = daemon.getbestblockhash()?;
+        let new_headers = daemon.get_new_headers(&self.headers.read().unwrap(), &tip)?;
+        new_headers.last().map(|tip| {
+            info!("{:?} ({} left to index)", tip, new_headers.len());
+        });
+        for rows in Batching::new(Indexer::new(&new_headers, &daemon)) {
             store.write(rows);
         }
-        let tip = latest_headers.tip();
-        *(self.headers.write().unwrap()) = Arc::new(latest_headers);
+        self.headers.write().unwrap().apply(new_headers);
+        assert_eq!(tip, *self.headers.read().unwrap().tip());
         Ok(tip)
     }
 }
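With the HeaderList owned directly by the RwLock, update no longer rebuilds the whole list: it asks the daemon for the best blockhash, fetches only the headers missing above the current tip, indexes those blocks, and then applies the new entries under the write lock, asserting that the in-memory tip matches what the daemon reported. A rough self-contained sketch of that incremental pattern (Entry, the vectors and the hard-coded "daemon" values below are hypothetical stand-ins, not the crate's HeaderEntry/Daemon API):

    use std::sync::RwLock;

    // Hypothetical stand-in for HeaderEntry.
    #[derive(Clone, Debug, PartialEq)]
    struct Entry {
        height: usize,
        hash: u64,
    }

    struct Index {
        headers: RwLock<Vec<Entry>>,
    }

    impl Index {
        // Append newly indexed headers and check the resulting tip,
        // mirroring the apply() + assert_eq!(tip, ...) sequence above.
        fn apply_new(&self, new_headers: Vec<Entry>, expected_tip: u64) {
            let mut headers = self.headers.write().unwrap();
            headers.extend(new_headers);
            assert_eq!(headers.last().map(|e| e.hash), Some(expected_tip));
        }
    }

    fn main() {
        let index = Index {
            headers: RwLock::new(vec![Entry { height: 0, hash: 1 }]),
        };
        // Pretend the daemon reported tip = 3 and returned the two missing headers.
        let new_headers = vec![
            Entry { height: 1, hash: 2 },
            Entry { height: 2, hash: 3 },
        ];
        index.apply_new(new_headers, 3);
        assert_eq!(index.headers.read().unwrap().len(), 3);
    }

The final assert documents the invariant that applying the fetched headers must leave the in-memory list at the tip the daemon reported when the update started.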
