Browse Source

Use std::thread instead of crossbeam

refactor-mempool
Roman Zeyde 7 years ago
parent
commit
fd03723842
No known key found for this signature in database GPG Key ID: 87CAE5FA46917CBB
  1. 1
      Cargo.toml
  2. 1
      TODO.txt
  3. 17
      src/app.rs
  4. 1
      src/lib.rs
  5. 35
      src/rpc.rs

1
Cargo.toml

@ -9,7 +9,6 @@ arrayref = "0.3"
base64 = "0.9" base64 = "0.9"
bincode = "1.0" bincode = "1.0"
bitcoin = "0.13" bitcoin = "0.13"
crossbeam = "0.3"
error-chain = "0.11" error-chain = "0.11"
hex = "0.3" hex = "0.3"
futures = "0.2" futures = "0.2"

1
TODO.txt

@ -1,7 +1,6 @@
= Electrum = Electrum
Update height to -1 for txns with any unconfirmed input (https://electrumx.readthedocs.io/en/latest/protocol-basics.html#status) Update height to -1 for txns with any unconfirmed input (https://electrumx.readthedocs.io/en/latest/protocol-basics.html#status)
Snapshot DB after successful indexing - and run queries on this snapshot Snapshot DB after successful indexing - and run queries on this snapshot
Use futures for electrum RPC serving and cancellation
Handle multiple RPC client in parallel Handle multiple RPC client in parallel
Figure out graceful shutting down and error logging for RPC server Figure out graceful shutting down and error logging for RPC server

17
src/app.rs

@ -1,6 +1,5 @@
use argparse::{ArgumentParser, StoreTrue}; use argparse::{ArgumentParser, StoreTrue};
use bitcoin::util::hash::Sha256dHash; use bitcoin::util::hash::Sha256dHash;
use crossbeam;
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
@ -98,15 +97,13 @@ fn run_server(config: &Config) {
}); });
let query = Arc::new(query::Query::new(app.clone())); let query = Arc::new(query::Query::new(app.clone()));
crossbeam::scope(|scope| { let poll_delay = Duration::from_secs(5);
let poll_delay = Duration::from_secs(5); rpc::start(&config.rpc_addr, query.clone());
scope.spawn(|| rpc::serve(&config.rpc_addr, query.clone())); loop {
loop { thread::sleep(poll_delay);
thread::sleep(poll_delay); query.update_mempool().unwrap();
query.update_mempool().unwrap(); tip = app.update_index(tip);
tip = app.update_index(tip); }
}
});
} }
fn setup_logging(config: &Config) { fn setup_logging(config: &Config) {

1
src/lib.rs

@ -4,7 +4,6 @@ extern crate argparse;
extern crate base64; extern crate base64;
extern crate bincode; extern crate bincode;
extern crate bitcoin; extern crate bitcoin;
extern crate crossbeam;
extern crate crypto; extern crate crypto;
extern crate futures; extern crate futures;
extern crate hex; extern crate hex;

35
src/rpc.rs

@ -2,14 +2,15 @@ use bitcoin::blockdata::block::BlockHeader;
use bitcoin::blockdata::transaction::Transaction; use bitcoin::blockdata::transaction::Transaction;
use bitcoin::network::serialize::{deserialize, serialize}; use bitcoin::network::serialize::{deserialize, serialize};
use bitcoin::util::hash::Sha256dHash; use bitcoin::util::hash::Sha256dHash;
use crossbeam; use error_chain::ChainedError;
use hex; use hex;
use serde_json::{from_str, Number, Value}; use serde_json::{from_str, Number, Value};
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write}; use std::io::{BufRead, BufReader, Write};
use std::net::{SocketAddr, TcpListener, TcpStream}; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::Arc; use std::sync::Arc;
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender}; use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender};
use std::thread;
use std::time::Duration; use std::time::Duration;
use query::Query; use query::Query;
@ -294,12 +295,20 @@ impl Connection {
pub fn run(mut self) { pub fn run(mut self) {
let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
crossbeam::scope(|scope| { let chan = Channel::new();
let chan = Channel::new(); let tx = chan.sender();
let tx = chan.sender(); let child = thread::spawn(|| Connection::handle_requests(reader, tx));
scope.spawn(|| Connection::handle_requests(reader, tx)); if let Err(e) = self.handle_replies(&chan) {
self.handle_replies(&chan).unwrap(); error!(
}); "[{}] connection handling failed: {}",
self.addr,
e.display_chain().to_string()
);
}
let _ = self.stream.shutdown(Shutdown::Both);
if child.join().is_err() {
error!("[{}] receiver panicked", self.addr);
}
} }
} }
@ -329,13 +338,13 @@ impl Channel {
} }
} }
pub fn serve(addr: &SocketAddr, query: Arc<Query>) { pub fn start(addr: &SocketAddr, query: Arc<Query>) -> thread::JoinHandle<()> {
let listener = TcpListener::bind(addr).unwrap(); let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr));
info!("RPC server running on {}", addr); info!("RPC server running on {}", addr);
loop { thread::spawn(move || loop {
let (stream, addr) = listener.accept().unwrap(); let (stream, addr) = listener.accept().expect("accept failed");
info!("[{}] connected peer", addr); info!("[{}] connected peer", addr);
Connection::new(query.clone(), stream, addr).run(); Connection::new(query.clone(), stream, addr).run();
info!("[{}] disconnected peer", addr); info!("[{}] disconnected peer", addr);
} })
} }

Loading…
Cancel
Save