From 91e4dd8b6fbee8252856127639d6885199e6e25d Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Fri, 27 Jul 2018 23:46:54 +0300 Subject: [PATCH 1/7] Fail with ErrorKind::Connection (instead of a generic error) --- src/daemon.rs | 14 +++++++++----- src/errors.rs | 7 +++++++ 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 62212ed..475a105 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -132,9 +132,14 @@ struct Connection { addr: SocketAddr, } +fn tcp_connect(addr: SocketAddr) -> Result { + TcpStream::connect(addr) + .chain_err(|| ErrorKind::Connection(format!("failed to connect to {}", addr))) +} + impl Connection { fn new(addr: SocketAddr, cookie_b64: String) -> Result { - let conn = TcpStream::connect(addr).chain_err(|| format!("failed to connect to {}", addr))?; + let conn = tcp_connect(addr)?; let reader = BufReader::new(conn.try_clone() .chain_err(|| format!("failed to clone {:?}", conn))?); Ok(Connection { @@ -146,8 +151,7 @@ impl Connection { } pub fn reconnect(&self) -> Result { - let conn = TcpStream::connect(self.addr) - .chain_err(|| format!("failed to connect to {}", self.addr))?; + let conn = tcp_connect(self.addr)?; let reader = BufReader::new(conn.try_clone() .chain_err(|| format!("failed to clone {:?}", conn))?); Ok(Connection { @@ -176,7 +180,7 @@ impl Connection { let mut contents: Option = None; let iter = self.rx.by_ref(); let status = iter.next() - .chain_err(|| "disconnected from daemon")? + .chain_err(|| ErrorKind::Connection("disconnection from daemon".to_owned()))? .chain_err(|| "failed to read status")?; if status != "HTTP/1.1 200 OK" { bail!("request failed: {}", status); @@ -190,7 +194,7 @@ impl Connection { break; } } - contents.chain_err(|| "no reply") + contents.chain_err(|| "no reply from daemon") } } diff --git a/src/errors.rs b/src/errors.rs index 154c1e6..7fda165 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -2,4 +2,11 @@ error_chain!{ types { Error, ErrorKind, ResultExt, Result; } + + errors { + Connection(msg: String) { + description("Connection error") + display("Connection error: {}", msg) + } + } } From 1d59449677aefc4e88de67a7b682ebda8c8acf9e Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sat, 28 Jul 2018 11:29:19 +0300 Subject: [PATCH 2/7] Abstract cookie handling to a trait object This would allow re-loading JSONRPC cookie after bitcoind restart --- examples/index.rs | 2 +- examples/load.rs | 2 +- src/bin/electrs.rs | 2 +- src/config.rs | 59 +++++++++++++++++++++++++++++++++++----------- src/daemon.rs | 21 ++++++++++------- 5 files changed, 61 insertions(+), 25 deletions(-) diff --git a/examples/index.rs b/examples/index.rs index 374ceca..804573e 100644 --- a/examples/index.rs +++ b/examples/index.rs @@ -19,7 +19,7 @@ fn run() -> Result<()> { let daemon = Daemon::new( &config.daemon_dir, config.daemon_rpc_addr, - &config.cookie, + config.cookie_getter(), config.network_type, &metrics, )?; diff --git a/examples/load.rs b/examples/load.rs index a3b08af..8bb50d8 100644 --- a/examples/load.rs +++ b/examples/load.rs @@ -21,7 +21,7 @@ fn run(config: Config) -> Result<()> { let daemon = Daemon::new( &config.daemon_dir, config.daemon_rpc_addr, - &config.cookie, + config.cookie_getter(), config.network_type, &metrics, )?; diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index ed61fa0..5f887c5 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -20,7 +20,7 @@ fn run_server(config: &Config) -> Result<()> { let daemon = Daemon::new( &config.daemon_dir, config.daemon_rpc_addr, - &config.cookie, + config.cookie_getter(), config.network_type, &metrics, )?; diff --git a/src/config.rs b/src/config.rs index 89f5155..ce3e448 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,21 +3,13 @@ use std::env::home_dir; use std::fs; use std::net::SocketAddr; use std::path::{Path, PathBuf}; +use std::sync::Arc; use stderrlog; -use daemon::Network; +use daemon::{CookieGetter, Network}; use errors::*; -fn read_cookie(daemon_dir: &Path) -> Result { - let mut path = daemon_dir.to_path_buf(); - path.push(".cookie"); - let contents = String::from_utf8( - fs::read(&path).chain_err(|| format!("failed to read cookie from {:?}", path))? - ).chain_err(|| "invalid cookie string")?; - Ok(contents.trim().to_owned()) -} - #[derive(Debug)] pub struct Config { pub log: stderrlog::StdErrLog, @@ -25,7 +17,7 @@ pub struct Config { pub db_path: PathBuf, // RocksDB directory path pub daemon_dir: PathBuf, // Bitcoind data directory pub daemon_rpc_addr: SocketAddr, // for connecting Bitcoind JSONRPC - pub cookie: String, // for bitcoind JSONRPC authentication ("USER:PASSWORD") + pub cookie: Option, // for bitcoind JSONRPC authentication ("USER:PASSWORD") pub electrum_rpc_addr: SocketAddr, // for serving Electrum clients pub monitoring_addr: SocketAddr, // for Prometheus monitoring pub skip_bulk_import: bool, // slower initial indexing, for low-memory systems @@ -141,9 +133,7 @@ impl Config { Network::Testnet => daemon_dir.push("testnet3"), Network::Regtest => daemon_dir.push("regtest"), } - let cookie = m.value_of("cookie") - .map(|s| s.to_owned()) - .unwrap_or_else(|| read_cookie(&daemon_dir).unwrap()); + let cookie = m.value_of("cookie").map(|s| s.to_owned()); let mut log = stderrlog::new(); log.verbosity(m.occurrences_of("verbosity") as usize); @@ -167,4 +157,45 @@ impl Config { eprintln!("{:?}", config); config } + + pub fn cookie_getter(&self) -> Arc { + if let Some(ref value) = self.cookie { + Arc::new(StaticCookie { + value: value.clone(), + }) + } else { + Arc::new(CookieFile { + daemon_dir: self.daemon_dir.clone(), + }) + } + } +} + +struct StaticCookie { + value: String, +} + +impl CookieGetter for StaticCookie { + fn get(&self) -> String { + self.value.clone() + } +} + +struct CookieFile { + daemon_dir: PathBuf, +} + +impl CookieGetter for CookieFile { + fn get(&self) -> String { + read_cookie(&self.daemon_dir).unwrap() + } +} + +fn read_cookie(daemon_dir: &Path) -> Result { + let mut path = daemon_dir.to_path_buf(); + path.push(".cookie"); + let contents = String::from_utf8( + fs::read(&path).chain_err(|| format!("failed to read cookie from {:?}", path))? + ).chain_err(|| "invalid cookie string")?; + Ok(contents.trim().to_owned()) } diff --git a/src/daemon.rs b/src/daemon.rs index 475a105..140706a 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -11,7 +11,7 @@ use std::collections::HashSet; use std::io::{BufRead, BufReader, Lines, Write}; use std::net::{SocketAddr, TcpStream}; use std::path::PathBuf; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use metrics::{HistogramOpts, HistogramVec, Metrics}; use util::HeaderList; @@ -125,10 +125,14 @@ impl MempoolEntry { } } +pub trait CookieGetter: Send + Sync { + fn get(&self) -> String; +} + struct Connection { tx: TcpStream, rx: Lines>, - cookie_b64: String, + cookie_getter: Arc, addr: SocketAddr, } @@ -138,14 +142,14 @@ fn tcp_connect(addr: SocketAddr) -> Result { } impl Connection { - fn new(addr: SocketAddr, cookie_b64: String) -> Result { + fn new(addr: SocketAddr, cookie_getter: Arc) -> Result { let conn = tcp_connect(addr)?; let reader = BufReader::new(conn.try_clone() .chain_err(|| format!("failed to clone {:?}", conn))?); Ok(Connection { tx: conn, rx: reader.lines(), - cookie_b64, + cookie_getter, addr, }) } @@ -157,15 +161,16 @@ impl Connection { Ok(Connection { tx: conn, rx: reader.lines(), - cookie_b64: self.cookie_b64.clone(), + cookie_getter: self.cookie_getter.clone(), addr: self.addr, }) } fn send(&mut self, request: &str) -> Result<()> { + let cookie_b64 = base64::encode(&self.cookie_getter.get()); let msg = format!( "POST / HTTP/1.1\nAuthorization: Basic {}\nContent-Length: {}\n\n{}", - self.cookie_b64, + cookie_b64, request.len(), request, ); @@ -231,14 +236,14 @@ impl Daemon { pub fn new( daemon_dir: &PathBuf, daemon_rpc_addr: SocketAddr, - cookie: &str, + cookie_getter: Arc, network: Network, metrics: &Metrics, ) -> Result { let daemon = Daemon { daemon_dir: daemon_dir.clone(), network, - conn: Mutex::new(Connection::new(daemon_rpc_addr, base64::encode(cookie))?), + conn: Mutex::new(Connection::new(daemon_rpc_addr, cookie_getter)?), message_id: Counter::new(), latency: metrics.histogram_vec( HistogramOpts::new("daemon_rpc", "Bitcoind RPC latency (in seconds)"), From defe9f13af52b5dbe374bb354ed9e62a8144c151 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sat, 28 Jul 2018 11:37:15 +0300 Subject: [PATCH 3/7] Measure RPC latency after connection lock is taken --- src/daemon.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/daemon.rs b/src/daemon.rs index 140706a..6c3171a 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -292,8 +292,8 @@ impl Daemon { } fn call_jsonrpc(&self, method: &str, request: &Value) -> Result { - let timer = self.latency.with_label_values(&[method]).start_timer(); let mut conn = self.conn.lock().unwrap(); + let timer = self.latency.with_label_values(&[method]).start_timer(); let request = request.to_string(); conn.send(&request)?; self.size From 06f5a099a41c3a7fb539b55ab0ceb408ca608cbf Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sat, 28 Jul 2018 11:57:20 +0300 Subject: [PATCH 4/7] Simplify Daemon::reconnect() --- src/daemon.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 6c3171a..216717a 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -155,15 +155,7 @@ impl Connection { } pub fn reconnect(&self) -> Result { - let conn = tcp_connect(self.addr)?; - let reader = BufReader::new(conn.try_clone() - .chain_err(|| format!("failed to clone {:?}", conn))?); - Ok(Connection { - tx: conn, - rx: reader.lines(), - cookie_getter: self.cookie_getter.clone(), - addr: self.addr, - }) + Connection::new(self.addr, self.cookie_getter.clone()) } fn send(&mut self, request: &str) -> Result<()> { From 6efab9c8a20808c2a8f275bb1075ca299c3ff78d Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sat, 28 Jul 2018 14:56:41 +0300 Subject: [PATCH 5/7] Reconnect after TCP disconnections and HTTP errors --- src/daemon.rs | 50 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 216717a..38557c3 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -12,6 +12,8 @@ use std::io::{BufRead, BufReader, Lines, Write}; use std::net::{SocketAddr, TcpStream}; use std::path::PathBuf; use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Duration; use metrics::{HistogramOpts, HistogramVec, Metrics}; use util::HeaderList; @@ -137,8 +139,16 @@ struct Connection { } fn tcp_connect(addr: SocketAddr) -> Result { - TcpStream::connect(addr) - .chain_err(|| ErrorKind::Connection(format!("failed to connect to {}", addr))) + loop { + match TcpStream::connect(addr) { + Ok(conn) => return Ok(conn), + Err(err) => { + warn!("failed to connect daemon at {}: {}", addr, err); + thread::sleep(Duration::from_secs(3)); + continue; + } + } + } } impl Connection { @@ -166,9 +176,9 @@ impl Connection { request.len(), request, ); - self.tx - .write_all(msg.as_bytes()) - .chain_err(|| "failed to send request") + self.tx.write_all(msg.as_bytes()).chain_err(|| { + ErrorKind::Connection("disconnected from daemon while sending".to_owned()) + }) } fn recv(&mut self) -> Result { @@ -177,13 +187,16 @@ impl Connection { let mut contents: Option = None; let iter = self.rx.by_ref(); let status = iter.next() - .chain_err(|| ErrorKind::Connection("disconnection from daemon".to_owned()))? + .chain_err(|| { + ErrorKind::Connection("disconnected from daemon while receiving".to_owned()) + })? .chain_err(|| "failed to read status")?; if status != "HTTP/1.1 200 OK" { - bail!("request failed: {}", status); + let msg = format!("request failed {:?}", status); + bail!(ErrorKind::Connection(msg)); } for line in iter { - let line = line.chain_err(|| "failed to read")?; + let line = line.chain_err(|| ErrorKind::Connection("failed to read".to_owned()))?; if line.is_empty() { in_header = false; // next line should contain the actual response. } else if !in_header { @@ -191,7 +204,7 @@ impl Connection { break; } } - contents.chain_err(|| "no reply from daemon") + contents.chain_err(|| ErrorKind::Connection("no reply from daemon".to_owned())) } } @@ -300,10 +313,25 @@ impl Daemon { Ok(result) } + fn retry_call_jsonrpc(&self, method: &str, request: &Value) -> Result { + loop { + match self.call_jsonrpc(method, request) { + Err(Error(ErrorKind::Connection(msg), _)) => { + warn!("connection failed: {}", msg); + thread::sleep(Duration::from_secs(3)); + let mut conn = self.conn.lock().unwrap(); + *conn = conn.reconnect()?; + continue; + } + result => return result, + } + } + } + fn request(&self, method: &str, params: Value) -> Result { let id = self.message_id.next(); let req = json!({"method": method, "params": params, "id": id}); - let reply = self.call_jsonrpc(method, &req) + let reply = self.retry_call_jsonrpc(method, &req) .chain_err(|| format!("RPC failed: {}", req))?; parse_jsonrpc_reply(reply, method, id) } @@ -315,7 +343,7 @@ impl Daemon { .map(|params| json!({"method": method, "params": params, "id": id})) .collect(); let mut results = vec![]; - let mut replies = self.call_jsonrpc(method, &reqs) + let mut replies = self.retry_call_jsonrpc(method, &reqs) .chain_err(|| format!("RPC failed: {}", reqs))?; if let Some(replies_vec) = replies.as_array_mut() { for reply in replies_vec { From 6bb837177c3c4a6cd9c088eff9cfc44bec2ae1e5 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sat, 28 Jul 2018 17:50:59 +0300 Subject: [PATCH 6/7] Handle missing cookie file as connection failure --- src/config.rs | 9 +++++---- src/daemon.rs | 6 +++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/config.rs b/src/config.rs index ce3e448..cd4a2d3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -176,8 +176,8 @@ struct StaticCookie { } impl CookieGetter for StaticCookie { - fn get(&self) -> String { - self.value.clone() + fn get(&self) -> Result { + Ok(self.value.clone()) } } @@ -186,8 +186,9 @@ struct CookieFile { } impl CookieGetter for CookieFile { - fn get(&self) -> String { - read_cookie(&self.daemon_dir).unwrap() + fn get(&self) -> Result { + read_cookie(&self.daemon_dir) + .chain_err(|| ErrorKind::Connection("no cookie found".to_owned())) } } diff --git a/src/daemon.rs b/src/daemon.rs index 38557c3..6156d76 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -128,7 +128,7 @@ impl MempoolEntry { } pub trait CookieGetter: Send + Sync { - fn get(&self) -> String; + fn get(&self) -> Result; } struct Connection { @@ -169,10 +169,10 @@ impl Connection { } fn send(&mut self, request: &str) -> Result<()> { - let cookie_b64 = base64::encode(&self.cookie_getter.get()); + let cookie = &self.cookie_getter.get()?; let msg = format!( "POST / HTTP/1.1\nAuthorization: Basic {}\nContent-Length: {}\n\n{}", - cookie_b64, + base64::encode(cookie), request.len(), request, ); From 9ee92c823f5e1d5f5c64b3fbe858566b2fde1d83 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sat, 28 Jul 2018 18:12:13 +0300 Subject: [PATCH 7/7] Read cookie as Vec (instead of String) --- src/config.rs | 23 ++++++++--------------- src/daemon.rs | 2 +- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/src/config.rs b/src/config.rs index cd4a2d3..e8e634a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -161,7 +161,7 @@ impl Config { pub fn cookie_getter(&self) -> Arc { if let Some(ref value) = self.cookie { Arc::new(StaticCookie { - value: value.clone(), + value: value.as_bytes().to_vec(), }) } else { Arc::new(CookieFile { @@ -172,11 +172,11 @@ impl Config { } struct StaticCookie { - value: String, + value: Vec, } impl CookieGetter for StaticCookie { - fn get(&self) -> Result { + fn get(&self) -> Result> { Ok(self.value.clone()) } } @@ -186,17 +186,10 @@ struct CookieFile { } impl CookieGetter for CookieFile { - fn get(&self) -> Result { - read_cookie(&self.daemon_dir) - .chain_err(|| ErrorKind::Connection("no cookie found".to_owned())) + fn get(&self) -> Result> { + let path = self.daemon_dir.join(".cookie"); + let contents = fs::read(&path) + .chain_err(|| ErrorKind::Connection(format!("failed to read cookie from {:?}", path)))?; + Ok(contents) } } - -fn read_cookie(daemon_dir: &Path) -> Result { - let mut path = daemon_dir.to_path_buf(); - path.push(".cookie"); - let contents = String::from_utf8( - fs::read(&path).chain_err(|| format!("failed to read cookie from {:?}", path))? - ).chain_err(|| "invalid cookie string")?; - Ok(contents.trim().to_owned()) -} diff --git a/src/daemon.rs b/src/daemon.rs index 6156d76..0dad298 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -128,7 +128,7 @@ impl MempoolEntry { } pub trait CookieGetter: Send + Sync { - fn get(&self) -> Result; + fn get(&self) -> Result>; } struct Connection {