Browse Source

Allow indexing cancellation via SIGINT

refactor-mempool
Roman Zeyde 7 years ago
parent
commit
70a5085469
No known key found for this signature in database GPG Key ID: 87CAE5FA46917CBB
  1. 25
      src/app.rs
  2. 3
      src/bin/bench_index.rs
  3. 12
      src/bin/main.rs
  4. 11
      src/index.rs
  5. 1
      src/lib.rs
  6. 31
      src/signal.rs
  7. 8
      src/store.rs

25
src/app.rs

@ -1,7 +1,4 @@
use chan;
use chan_signal;
use std::sync::Arc;
use std::time::Duration;
use {daemon, index, store};
@ -33,25 +30,3 @@ impl App {
&self.daemon
}
}
pub struct Waiter {
signal: chan::Receiver<chan_signal::Signal>,
duration: Duration,
}
impl Waiter {
pub fn new(duration: Duration) -> Waiter {
let signal = chan_signal::notify(&[chan_signal::Signal::INT]);
Waiter { signal, duration }
}
pub fn wait(&self) -> Option<chan_signal::Signal> {
let signal = &self.signal;
let timeout = chan::after(self.duration);
let result;
chan_select! {
signal.recv() -> sig => { result = sig; },
timeout.recv() => { result = None; },
}
result
}
}

3
src/bin/bench_index.rs

@ -5,6 +5,7 @@ use electrs::{config::Config,
daemon::Daemon,
errors::*,
index::Index,
signal::Waiter,
store::{ReadStore, Row, WriteStore},
util::Bytes};
use error_chain::ChainedError;
@ -30,7 +31,7 @@ fn run() -> Result<()> {
let daemon = Daemon::new(config.network_type)?;
let fake_store = FakeStore {};
let index = Index::load(&fake_store);
index.update(&fake_store, &daemon)?;
index.update(&fake_store, &daemon, &Waiter::new())?;
Ok(())
}

12
src/bin/main.rs

@ -5,16 +5,18 @@ use error_chain::ChainedError;
use std::thread;
use std::time::Duration;
use electrs::{app::{App, Waiter},
use electrs::{app::App,
config::Config,
daemon::Daemon,
errors::*,
index::Index,
query::Query,
rpc::RPC,
signal::Waiter,
store::{DBStore, StoreOptions}};
fn run_server(config: &Config) -> Result<()> {
let signal = Waiter::new();
let daemon = Daemon::new(config.network_type)?;
let store = DBStore::open(
&config.db_path,
@ -24,7 +26,7 @@ fn run_server(config: &Config) -> Result<()> {
},
);
let index = Index::load(&store);
let mut tip = index.update(&store, &daemon)?;
let mut tip = index.update(&store, &daemon, &signal)?;
store.compact_if_needed();
drop(store); // to be re-opened soon
@ -33,11 +35,11 @@ fn run_server(config: &Config) -> Result<()> {
let query = Query::new(app.clone());
let rpc = RPC::start(config.rpc_addr, query.clone());
let signal = Waiter::new(Duration::from_secs(5));
while let None = signal.wait() {
while let None = signal.wait(Duration::from_secs(5)) {
query.update_mempool()?;
if tip != app.daemon().getbestblockhash()? {
tip = app.index().update(app.write_store(), app.daemon())?;
tip = app.index()
.update(app.write_store(), app.daemon(), &signal)?;
}
rpc.notify();
}

11
src/index.rs

@ -12,6 +12,7 @@ use std::sync::RwLock;
use std::time::{Duration, Instant};
use daemon::Daemon;
use signal::Waiter;
use store::{ReadStore, Row, WriteStore};
use util::{self, full_hash, hash_prefix, Bytes, FullHash, HashPrefix, HeaderEntry, HeaderList,
HeaderMap, Timer, HASH_PREFIX_LEN};
@ -300,7 +301,12 @@ impl Index {
.cloned()
}
pub fn update(&self, store: &WriteStore, daemon: &Daemon) -> Result<Sha256dHash> {
pub fn update(
&self,
store: &WriteStore,
daemon: &Daemon,
waiter: &Waiter,
) -> Result<Sha256dHash> {
let tip = daemon.getbestblockhash()?;
let new_headers: Vec<HeaderEntry> = {
let indexed_headers = self.headers.read().unwrap();
@ -318,6 +324,9 @@ impl Index {
let headers_map: HashMap<Sha256dHash, &HeaderEntry> =
HashMap::from_iter(new_headers.iter().map(|h| (*h.hash(), h)));
for chunk in new_headers.chunks(100) {
if let Some(sig) = waiter.poll() {
bail!("indexing interrupted by {:?}", sig);
}
// Download new blocks
let hashes: Vec<Sha256dHash> = chunk.into_iter().map(|h| *h.hash()).collect();
let batch = daemon.getblocks(&hashes)?;

1
src/lib.rs

@ -34,5 +34,6 @@ pub mod index;
pub mod mempool;
pub mod query;
pub mod rpc;
pub mod signal;
pub mod store;
pub mod util;

31
src/signal.rs

@ -0,0 +1,31 @@
use chan;
use chan_signal;
use std::time::Duration;
pub struct Waiter {
signal: chan::Receiver<chan_signal::Signal>,
}
impl Waiter {
pub fn new() -> Waiter {
Waiter {
signal: chan_signal::notify(&[chan_signal::Signal::INT]),
}
}
pub fn wait(&self, duration: Duration) -> Option<chan_signal::Signal> {
let signal = &self.signal;
let timeout = chan::after(duration);
let result;
chan_select! {
signal.recv() -> sig => {
result = sig;
},
timeout.recv() => { result = None; },
}
result.map(|sig| info!("received SIG{:?}", sig));
result
}
pub fn poll(&self) -> Option<chan_signal::Signal> {
self.wait(Duration::from_secs(0))
}
}

8
src/store.rs

@ -25,6 +25,7 @@ pub trait WriteStore: Sync {
pub struct DBStore {
db: rocksdb::DB,
path: String,
}
#[derive(Debug)]
@ -48,6 +49,7 @@ impl DBStore {
block_opts.set_block_size(256 << 10);
DBStore {
db: rocksdb::DB::open(&db_opts, &path).unwrap(),
path: path.to_owned(),
}
}
@ -107,3 +109,9 @@ impl WriteStore for DBStore {
.unwrap();
}
}
impl Drop for DBStore {
fn drop(&mut self) {
debug!("closing {}", self.path);
}
}

Loading…
Cancel
Save