From fd037238421f609a27348aed6c512682690ebd54 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Fri, 25 May 2018 09:53:32 +0300 Subject: [PATCH] Use std::thread instead of crossbeam --- Cargo.toml | 1 - TODO.txt | 1 - src/app.rs | 17 +++++++---------- src/lib.rs | 1 - src/rpc.rs | 35 ++++++++++++++++++++++------------- 5 files changed, 29 insertions(+), 26 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4fec862..c8b71dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ arrayref = "0.3" base64 = "0.9" bincode = "1.0" bitcoin = "0.13" -crossbeam = "0.3" error-chain = "0.11" hex = "0.3" futures = "0.2" diff --git a/TODO.txt b/TODO.txt index 0057196..df3b894 100644 --- a/TODO.txt +++ b/TODO.txt @@ -1,7 +1,6 @@ = Electrum Update height to -1 for txns with any unconfirmed input (https://electrumx.readthedocs.io/en/latest/protocol-basics.html#status) Snapshot DB after successful indexing - and run queries on this snapshot -Use futures for electrum RPC serving and cancellation Handle multiple RPC client in parallel Figure out graceful shutting down and error logging for RPC server diff --git a/src/app.rs b/src/app.rs index fb300f1..4c1e9f7 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,6 +1,5 @@ use argparse::{ArgumentParser, StoreTrue}; use bitcoin::util::hash::Sha256dHash; -use crossbeam; use std::fs::OpenOptions; use std::net::SocketAddr; use std::sync::Arc; @@ -98,15 +97,13 @@ fn run_server(config: &Config) { }); let query = Arc::new(query::Query::new(app.clone())); - crossbeam::scope(|scope| { - let poll_delay = Duration::from_secs(5); - scope.spawn(|| rpc::serve(&config.rpc_addr, query.clone())); - loop { - thread::sleep(poll_delay); - query.update_mempool().unwrap(); - tip = app.update_index(tip); - } - }); + let poll_delay = Duration::from_secs(5); + rpc::start(&config.rpc_addr, query.clone()); + loop { + thread::sleep(poll_delay); + query.update_mempool().unwrap(); + tip = app.update_index(tip); + } } fn setup_logging(config: &Config) { diff --git a/src/lib.rs b/src/lib.rs index dfa3e3c..c9cb6ee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,7 +4,6 @@ extern crate argparse; extern crate base64; extern crate bincode; extern crate bitcoin; -extern crate crossbeam; extern crate crypto; extern crate futures; extern crate hex; diff --git a/src/rpc.rs b/src/rpc.rs index e995181..1f46712 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -2,14 +2,15 @@ use bitcoin::blockdata::block::BlockHeader; use bitcoin::blockdata::transaction::Transaction; use bitcoin::network::serialize::{deserialize, serialize}; use bitcoin::util::hash::Sha256dHash; -use crossbeam; +use error_chain::ChainedError; use hex; use serde_json::{from_str, Number, Value}; use std::collections::HashMap; use std::io::{BufRead, BufReader, Write}; -use std::net::{SocketAddr, TcpListener, TcpStream}; +use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; use std::sync::Arc; use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender}; +use std::thread; use std::time::Duration; use query::Query; @@ -294,12 +295,20 @@ impl Connection { pub fn run(mut self) { let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); - crossbeam::scope(|scope| { - let chan = Channel::new(); - let tx = chan.sender(); - scope.spawn(|| Connection::handle_requests(reader, tx)); - self.handle_replies(&chan).unwrap(); - }); + let chan = Channel::new(); + let tx = chan.sender(); + let child = thread::spawn(|| Connection::handle_requests(reader, tx)); + if let Err(e) = self.handle_replies(&chan) { + error!( + "[{}] connection handling failed: {}", + self.addr, + e.display_chain().to_string() + ); + } + let _ = self.stream.shutdown(Shutdown::Both); + if child.join().is_err() { + error!("[{}] receiver panicked", self.addr); + } } } @@ -329,13 +338,13 @@ impl Channel { } } -pub fn serve(addr: &SocketAddr, query: Arc) { - let listener = TcpListener::bind(addr).unwrap(); +pub fn start(addr: &SocketAddr, query: Arc) -> thread::JoinHandle<()> { + let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr)); info!("RPC server running on {}", addr); - loop { - let (stream, addr) = listener.accept().unwrap(); + thread::spawn(move || loop { + let (stream, addr) = listener.accept().expect("accept failed"); info!("[{}] connected peer", addr); Connection::new(query.clone(), stream, addr).run(); info!("[{}] disconnected peer", addr); - } + }) }