Browse Source

Move indexing logic into index.rs

refactor-mempool
Roman Zeyde 7 years ago
parent
commit
a770804e19
No known key found for this signature in database GPG Key ID: 87CAE5FA46917CBB
  1. 84
      src/daemon.rs
  2. 137
      src/index.rs
  3. 137
      src/main.rs

84
src/daemon.rs

@ -14,46 +14,60 @@ const HEADER_SIZE: usize = 80;
type HeaderMap = HashMap<String, BlockHeader>; type HeaderMap = HashMap<String, BlockHeader>;
fn get(resource: &str) -> reqwest::Response { pub struct Daemon {
let url = format!("http://localhost:8332/rest/{}", resource); url: String,
reqwest::get(&url).unwrap()
} }
pub fn get_bin(resource: &str) -> Bytes { impl Daemon {
let mut buf = Bytes::new(); pub fn new(url: &str) -> Daemon {
let mut resp = get(resource); Daemon {
resp.copy_to(&mut buf).unwrap(); url: url.to_string(),
buf
}
pub fn get_headers() -> (HeaderMap, String) {
let mut headers = HashMap::new();
let mut blockhash =
String::from("000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"); // genesis
loop {
let data = get_bin(&format!("headers/2000/{}.bin", blockhash));
let num_of_headers = data.len() / HEADER_SIZE;
let mut decoder = RawDecoder::new(Cursor::new(data));
for _ in 0..num_of_headers {
let header: BlockHeader = ConsensusDecodable::consensus_decode(&mut decoder).unwrap();
blockhash = header.bitcoin_hash().be_hex_string();
headers.insert(blockhash.to_string(), header);
} }
if num_of_headers == 1 { }
break;
fn request(&self, resource: &str) -> reqwest::Response {
let url = format!("{}/rest/{}", self.url, resource);
reqwest::get(&url).unwrap()
}
pub fn get(&self, resource: &str) -> Bytes {
let mut buf = Bytes::new();
let mut resp = self.request(resource);
resp.copy_to(&mut buf).unwrap();
buf
}
fn get_headers(&self) -> (HeaderMap, String) {
let mut headers = HashMap::new();
let mut blockhash =
String::from("000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"); // genesis
loop {
let data = self.get(&format!("headers/2000/{}.bin", blockhash));
let num_of_headers = data.len() / HEADER_SIZE;
let mut decoder = RawDecoder::new(Cursor::new(data));
for _ in 0..num_of_headers {
let header: BlockHeader =
ConsensusDecodable::consensus_decode(&mut decoder).unwrap();
blockhash = header.bitcoin_hash().be_hex_string();
headers.insert(blockhash.to_string(), header);
}
if num_of_headers == 1 {
break;
}
} }
(headers, blockhash)
} }
(headers, blockhash)
}
pub fn enumerate_headers(headers: &HeaderMap, bestblockhash: &str) -> Vec<(usize, String)> { pub fn enumerate_headers(&self) -> Vec<(usize, String)> {
let null_hash = Sha256dHash::default().be_hex_string(); let (headers, mut blockhash) = self.get_headers();
let mut hashes = VecDeque::<String>::new(); let mut hashes = VecDeque::<String>::new();
let mut blockhash = bestblockhash.to_string();
while blockhash != null_hash { let null_hash = Sha256dHash::default().be_hex_string();
let header: &BlockHeader = headers.get(&blockhash).unwrap(); while blockhash != null_hash {
hashes.push_front(blockhash); let header: &BlockHeader = headers.get(&blockhash).unwrap();
blockhash = header.prev_blockhash.be_hex_string(); hashes.push_front(blockhash);
blockhash = header.prev_blockhash.be_hex_string();
}
enumerate(hashes).collect()
} }
enumerate(hashes).collect()
} }

137
src/index.rs

@ -0,0 +1,137 @@
use bitcoin::blockdata::block::Block;
use bitcoin::network::serialize::BitcoinHash;
use bitcoin::network::serialize::{deserialize, serialize};
use bitcoin::util::hash::Sha256dHash;
use byteorder::{LittleEndian, WriteBytesExt};
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use daemon::Daemon;
use store::{Row, Store};
use timer::Timer;
use pbr;
use Bytes;
const HASH_LEN: usize = 8;
fn index_block(block: &Block, height: usize) -> Vec<Row> {
let null_hash = Sha256dHash::default();
let mut rows = Vec::new();
for tx in &block.txdata {
let txid: Sha256dHash = tx.txid();
for input in &tx.input {
if input.prev_hash == null_hash {
continue;
}
let mut key = Vec::<u8>::new(); // ???
key.push(b'I');
key.extend_from_slice(&input.prev_hash[..HASH_LEN]);
key.write_u16::<LittleEndian>(input.prev_index as u16)
.unwrap();
rows.push(Row {
key: key,
value: txid[..HASH_LEN].to_vec(),
});
}
for output in &tx.output {
let mut script_hash = [0u8; 32];
let mut sha2 = Sha256::new();
sha2.input(&output.script_pubkey[..]);
sha2.result(&mut script_hash);
let mut key = Vec::<u8>::new(); // ???
key.push(b'O');
key.extend_from_slice(&script_hash);
key.extend_from_slice(&txid[..HASH_LEN]);
rows.push(Row {
key: key,
value: vec![],
});
}
// Persist transaction ID and confirmed height
{
let mut key = Vec::<u8>::new();
key.push(b'T');
key.extend_from_slice(&txid[..]);
let mut value = Vec::<u8>::new();
value.write_u32::<LittleEndian>(height as u32).unwrap();
rows.push(Row {
key: key,
value: value,
})
}
}
// Persist block hash and header
{
let mut key = Vec::<u8>::new();
key.push(b'B');
key.extend_from_slice(&block.bitcoin_hash()[..]);
rows.push(Row {
key: key,
value: serialize(&block.header).unwrap(),
})
}
rows
}
fn get_missing_hashes(store: &mut Store, daemon: &Daemon) -> Vec<(usize, String)> {
let indexed_headers = store.read_headers();
let mut hashes: Vec<(usize, String)> = daemon.enumerate_headers();
info!(
"got {} headers (indexed {})",
hashes.len(),
indexed_headers.len(),
);
hashes.retain(|item| !indexed_headers.contains_key(&item.1));
hashes
}
pub fn update(store: &mut Store, daemon: &Daemon) {
let hashes = get_missing_hashes(store, daemon);
if hashes.is_empty() {
return;
}
let mut timer = Timer::new();
let mut blocks_size = 0usize;
let mut rows_size = 0usize;
let mut num_of_rows = 0usize;
let mut pb = pbr::ProgressBar::new(hashes.len() as u64);
for (height, blockhash) in hashes {
timer.start("get");
let buf: Bytes = daemon.get(&format!("block/{}.bin", &blockhash));
timer.start("parse");
let block: Block = deserialize(&buf).unwrap();
assert_eq!(&block.bitcoin_hash().be_hex_string(), &blockhash);
timer.start("index");
let rows = index_block(&block, height);
for row in &rows {
rows_size += row.key.len() + row.value.len();
}
num_of_rows += rows.len();
timer.start("store");
store.persist(rows);
timer.stop();
blocks_size += buf.len();
pb.inc();
debug!(
"{} @ {}: {:.3}/{:.3} MB, {} rows, {}",
blockhash,
height,
rows_size as f64 / 1e6_f64,
blocks_size as f64 / 1e6_f64,
num_of_rows,
timer.stats()
);
}
store.flush();
pb.finish();
}

137
src/main.rs

@ -12,147 +12,20 @@ extern crate zmq;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
mod index;
mod daemon; mod daemon;
mod store; mod store;
mod timer; mod timer;
mod waiter; mod waiter;
use bitcoin::blockdata::block::Block; use store::{Store, StoreOptions};
use bitcoin::network::serialize::BitcoinHash;
use bitcoin::network::serialize::{deserialize, serialize};
use bitcoin::util::hash::Sha256dHash;
use byteorder::{LittleEndian, WriteBytesExt};
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use store::{Row, Store, StoreOptions};
use timer::Timer;
const HASH_LEN: usize = 8;
type Bytes = Vec<u8>; type Bytes = Vec<u8>;
fn index_block(block: &Block, height: usize) -> Vec<Row> {
let null_hash = Sha256dHash::default();
let mut rows = Vec::new();
for tx in &block.txdata {
let txid: Sha256dHash = tx.txid();
for input in &tx.input {
if input.prev_hash == null_hash {
continue;
}
let mut key = Vec::<u8>::new(); // ???
key.push(b'I');
key.extend_from_slice(&input.prev_hash[..HASH_LEN]);
key.write_u16::<LittleEndian>(input.prev_index as u16)
.unwrap();
rows.push(Row {
key: key,
value: txid[..HASH_LEN].to_vec(),
});
}
for output in &tx.output {
let mut script_hash = [0u8; 32];
let mut sha2 = Sha256::new();
sha2.input(&output.script_pubkey[..]);
sha2.result(&mut script_hash);
let mut key = Vec::<u8>::new(); // ???
key.push(b'O');
key.extend_from_slice(&script_hash);
key.extend_from_slice(&txid[..HASH_LEN]);
rows.push(Row {
key: key,
value: vec![],
});
}
// Persist transaction ID and confirmed height
{
let mut key = Vec::<u8>::new();
key.push(b'T');
key.extend_from_slice(&txid[..]);
let mut value = Vec::<u8>::new();
value.write_u32::<LittleEndian>(height as u32).unwrap();
rows.push(Row {
key: key,
value: value,
})
}
}
// Persist block hash and header
{
let mut key = Vec::<u8>::new();
key.push(b'B');
key.extend_from_slice(&block.bitcoin_hash()[..]);
rows.push(Row {
key: key,
value: serialize(&block.header).unwrap(),
})
}
rows
}
fn index_blocks(store: &mut Store) {
let indexed_headers = store.read_headers();
let (headers, blockhash) = daemon::get_headers();
let best_block_header = headers.get(&blockhash).unwrap();
info!(
"got {} headers (indexed {}), best {} @ {}",
headers.len(),
indexed_headers.len(),
best_block_header.bitcoin_hash().be_hex_string(),
time::at_utc(time::Timespec::new(best_block_header.time as i64, 0)).rfc3339(),
);
let mut hashes: Vec<(usize, String)> = daemon::enumerate_headers(&headers, &blockhash);
hashes.retain(|item| !indexed_headers.contains_key(&item.1));
if hashes.is_empty() {
return;
}
let mut timer = Timer::new();
let mut blocks_size = 0usize;
let mut rows_size = 0usize;
let mut num_of_rows = 0usize;
let mut pb = pbr::ProgressBar::new(hashes.len() as u64);
for (height, blockhash) in hashes {
timer.start("get");
let buf: Bytes = daemon::get_bin(&format!("block/{}.bin", &blockhash));
timer.start("parse");
let block: Block = deserialize(&buf).unwrap();
assert_eq!(&block.bitcoin_hash().be_hex_string(), &blockhash);
timer.start("index");
let rows = index_block(&block, height);
for row in &rows {
rows_size += row.key.len() + row.value.len();
}
num_of_rows += rows.len();
timer.start("store");
store.persist(rows);
timer.stop();
blocks_size += buf.len();
pb.inc();
debug!(
"{} @ {}: {:.3}/{:.3} MB, {} rows, {}",
blockhash,
height,
rows_size as f64 / 1e6_f64,
blocks_size as f64 / 1e6_f64,
num_of_rows,
timer.stats()
);
}
store.flush();
pb.finish();
}
fn main() { fn main() {
simple_logger::init_with_level(log::Level::Info).unwrap(); simple_logger::init_with_level(log::Level::Info).unwrap();
let waiter = waiter::Waiter::new("tcp://localhost:28332"); let waiter = waiter::Waiter::new("tcp://localhost:28332");
let daemon = daemon::Daemon::new("http://localhost:8332");
{ {
let mut store = Store::open( let mut store = Store::open(
"db/mainnet", "db/mainnet",
@ -160,7 +33,7 @@ fn main() {
auto_compact: false, auto_compact: false,
}, },
); );
index_blocks(&mut store); index::update(&mut store, &daemon);
store.compact_if_needed(); store.compact_if_needed();
} }
@ -169,6 +42,6 @@ fn main() {
if store.has_block(&waiter.wait()) { if store.has_block(&waiter.wait()) {
continue; continue;
} }
index_blocks(&mut store); index::update(&mut store, &daemon);
} }
} }

Loading…
Cancel
Save