Browse Source

Integrate bulk parser into main server binary

refactor-mempool
Roman Zeyde 6 years ago
parent
commit
70743c9a10
No known key found for this signature in database GPG Key ID: 87CAE5FA46917CBB
  1. 9
      examples/bench_parse.rs
  2. 25
      src/bin/electrs.rs
  3. 15
      src/store.rs

9
examples/bench_parse.rs

@ -25,14 +25,7 @@ fn run(config: Config) -> Result<()> {
let store = DBStore::open("./test-db", StoreOptions { bulk_import: true }); let store = DBStore::open("./test-db", StoreOptions { bulk_import: true });
let chan = Parser::new(&daemon, &store, &metrics)?.start(); let chan = Parser::new(&daemon, &store, &metrics)?.start();
for rows in chan.iter() { store.load(chan, &signal)
if let Some(sig) = signal.poll() {
bail!("indexing interrupted by SIG{:?}", sig);
}
store.write(rows?);
}
store.compact_if_needed();
Ok(())
} }
fn main() { fn main() {

25
src/bin/electrs.rs

@ -1,6 +1,6 @@
extern crate electrs; extern crate electrs;
extern crate error_chain;
extern crate error_chain;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
@ -8,6 +8,7 @@ use error_chain::ChainedError;
use std::time::Duration; use std::time::Duration;
use electrs::{app::App, use electrs::{app::App,
bulk::Parser,
config::Config, config::Config,
daemon::Daemon, daemon::Daemon,
errors::*, errors::*,
@ -21,28 +22,30 @@ use electrs::{app::App,
fn run_server(config: &Config) -> Result<()> { fn run_server(config: &Config) -> Result<()> {
let signal = Waiter::new(); let signal = Waiter::new();
let metrics = Metrics::new(config.monitoring_addr); let metrics = Metrics::new(config.monitoring_addr);
metrics.start();
let daemon = Daemon::new(config.network_type, &metrics)?; let daemon = Daemon::new(config.network_type, &metrics)?;
let store = DBStore::open(&config.db_path, StoreOptions { bulk_import: true }); let store = DBStore::open(&config.db_path, StoreOptions { bulk_import: true });
let index = Index::load(&store, &daemon, &metrics)?; let parser = Parser::new(&daemon, &store, &metrics)?;
store.bulk_load(parser.start(), &signal)?;
metrics.start();
let mut tip = index.update(&store, &signal)?;
store.compact_if_needed();
drop(store); // bulk import is over
let daemon = daemon.reconnect()?;
let store = DBStore::open(&config.db_path, StoreOptions { bulk_import: false }); let store = DBStore::open(&config.db_path, StoreOptions { bulk_import: false });
let app = App::new(store, index, daemon.reconnect()?); let index = Index::load(&store, &daemon, &metrics)?;
let app = App::new(store, index, daemon);
let query = Query::new(app.clone(), &metrics); let query = Query::new(app.clone(), &metrics);
query.update_mempool()?; // poll once before starting RPC server let mut tip = *query.get_best_header()?.hash();
let rpc = RPC::start(config.rpc_addr, query.clone(), &metrics); let rpc = RPC::start(config.rpc_addr, query.clone(), &metrics);
while let None = signal.wait(Duration::from_secs(5)) { loop {
query.update_mempool()?; query.update_mempool()?;
if tip != app.daemon().getbestblockhash()? { if tip != app.daemon().getbestblockhash()? {
tip = app.index().update(app.write_store(), &signal)?; tip = app.index().update(app.write_store(), &signal)?;
} }
rpc.notify(); rpc.notify();
if signal.wait(Duration::from_secs(5)).is_some() {
break;
}
} }
rpc.exit(); rpc.exit();
Ok(()) Ok(())

15
src/store.rs

@ -1,7 +1,11 @@
use rocksdb; use rocksdb;
use std::sync::mpsc::Receiver;
use signal::Waiter;
use util::Bytes; use util::Bytes;
use errors::*;
pub struct Row { pub struct Row {
pub key: Bytes, pub key: Bytes,
pub value: Bytes, pub value: Bytes,
@ -58,15 +62,22 @@ impl DBStore {
} }
} }
pub fn compact_if_needed(&self) { pub fn bulk_load(self, rows: Receiver<Result<Vec<Vec<Row>>>>, signal: &Waiter) -> Result<()> {
let key = b"F"; // full compaction marker let key = b"F"; // full compaction marker
if self.get(key).is_some() { if self.get(key).is_some() {
return; return Ok(());
}
for rows in rows.iter() {
if let Some(sig) = signal.poll() {
bail!("indexing interrupted by SIG{:?}", sig);
}
self.write(rows?);
} }
info!("starting full compaction"); info!("starting full compaction");
self.db.compact_range(None, None); // should take a while self.db.compact_range(None, None); // should take a while
self.db.put(key, b"").unwrap(); self.db.put(key, b"").unwrap();
info!("finished full compaction"); info!("finished full compaction");
Ok(())
} }
} }

Loading…
Cancel
Save