Browse Source

Merge branch 'disconnect'

refactor-mempool
Roman Zeyde 7 years ago
parent
commit
2c167c6a7d
No known key found for this signature in database GPG Key ID: 87CAE5FA46917CBB
  1. 2
      examples/index.rs
  2. 2
      examples/load.rs
  3. 2
      src/bin/electrs.rs
  4. 53
      src/config.rs
  5. 85
      src/daemon.rs
  6. 7
      src/errors.rs

2
examples/index.rs

@ -19,7 +19,7 @@ fn run() -> Result<()> {
let daemon = Daemon::new( let daemon = Daemon::new(
&config.daemon_dir, &config.daemon_dir,
config.daemon_rpc_addr, config.daemon_rpc_addr,
&config.cookie, config.cookie_getter(),
config.network_type, config.network_type,
&metrics, &metrics,
)?; )?;

2
examples/load.rs

@ -21,7 +21,7 @@ fn run(config: Config) -> Result<()> {
let daemon = Daemon::new( let daemon = Daemon::new(
&config.daemon_dir, &config.daemon_dir,
config.daemon_rpc_addr, config.daemon_rpc_addr,
&config.cookie, config.cookie_getter(),
config.network_type, config.network_type,
&metrics, &metrics,
)?; )?;

2
src/bin/electrs.rs

@ -20,7 +20,7 @@ fn run_server(config: &Config) -> Result<()> {
let daemon = Daemon::new( let daemon = Daemon::new(
&config.daemon_dir, &config.daemon_dir,
config.daemon_rpc_addr, config.daemon_rpc_addr,
&config.cookie, config.cookie_getter(),
config.network_type, config.network_type,
&metrics, &metrics,
)?; )?;

53
src/config.rs

@ -3,21 +3,13 @@ use std::env::home_dir;
use std::fs; use std::fs;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc;
use stderrlog; use stderrlog;
use daemon::Network; use daemon::{CookieGetter, Network};
use errors::*; use errors::*;
fn read_cookie(daemon_dir: &Path) -> Result<String> {
let mut path = daemon_dir.to_path_buf();
path.push(".cookie");
let contents = String::from_utf8(
fs::read(&path).chain_err(|| format!("failed to read cookie from {:?}", path))?
).chain_err(|| "invalid cookie string")?;
Ok(contents.trim().to_owned())
}
#[derive(Debug)] #[derive(Debug)]
pub struct Config { pub struct Config {
pub log: stderrlog::StdErrLog, pub log: stderrlog::StdErrLog,
@ -25,7 +17,7 @@ pub struct Config {
pub db_path: PathBuf, // RocksDB directory path pub db_path: PathBuf, // RocksDB directory path
pub daemon_dir: PathBuf, // Bitcoind data directory pub daemon_dir: PathBuf, // Bitcoind data directory
pub daemon_rpc_addr: SocketAddr, // for connecting Bitcoind JSONRPC pub daemon_rpc_addr: SocketAddr, // for connecting Bitcoind JSONRPC
pub cookie: String, // for bitcoind JSONRPC authentication ("USER:PASSWORD") pub cookie: Option<String>, // for bitcoind JSONRPC authentication ("USER:PASSWORD")
pub electrum_rpc_addr: SocketAddr, // for serving Electrum clients pub electrum_rpc_addr: SocketAddr, // for serving Electrum clients
pub monitoring_addr: SocketAddr, // for Prometheus monitoring pub monitoring_addr: SocketAddr, // for Prometheus monitoring
pub skip_bulk_import: bool, // slower initial indexing, for low-memory systems pub skip_bulk_import: bool, // slower initial indexing, for low-memory systems
@ -141,9 +133,7 @@ impl Config {
Network::Testnet => daemon_dir.push("testnet3"), Network::Testnet => daemon_dir.push("testnet3"),
Network::Regtest => daemon_dir.push("regtest"), Network::Regtest => daemon_dir.push("regtest"),
} }
let cookie = m.value_of("cookie") let cookie = m.value_of("cookie").map(|s| s.to_owned());
.map(|s| s.to_owned())
.unwrap_or_else(|| read_cookie(&daemon_dir).unwrap());
let mut log = stderrlog::new(); let mut log = stderrlog::new();
log.verbosity(m.occurrences_of("verbosity") as usize); log.verbosity(m.occurrences_of("verbosity") as usize);
@ -167,4 +157,39 @@ impl Config {
eprintln!("{:?}", config); eprintln!("{:?}", config);
config config
} }
pub fn cookie_getter(&self) -> Arc<CookieGetter> {
if let Some(ref value) = self.cookie {
Arc::new(StaticCookie {
value: value.as_bytes().to_vec(),
})
} else {
Arc::new(CookieFile {
daemon_dir: self.daemon_dir.clone(),
})
}
}
}
struct StaticCookie {
value: Vec<u8>,
}
impl CookieGetter for StaticCookie {
fn get(&self) -> Result<Vec<u8>> {
Ok(self.value.clone())
}
}
struct CookieFile {
daemon_dir: PathBuf,
}
impl CookieGetter for CookieFile {
fn get(&self) -> Result<Vec<u8>> {
let path = self.daemon_dir.join(".cookie");
let contents = fs::read(&path)
.chain_err(|| ErrorKind::Connection(format!("failed to read cookie from {:?}", path)))?;
Ok(contents)
}
} }

85
src/daemon.rs

@ -11,7 +11,9 @@ use std::collections::HashSet;
use std::io::{BufRead, BufReader, Lines, Write}; use std::io::{BufRead, BufReader, Lines, Write};
use std::net::{SocketAddr, TcpStream}; use std::net::{SocketAddr, TcpStream};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Mutex; use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use metrics::{HistogramOpts, HistogramVec, Metrics}; use metrics::{HistogramOpts, HistogramVec, Metrics};
use util::HeaderList; use util::HeaderList;
@ -125,49 +127,58 @@ impl MempoolEntry {
} }
} }
pub trait CookieGetter: Send + Sync {
fn get(&self) -> Result<Vec<u8>>;
}
struct Connection { struct Connection {
tx: TcpStream, tx: TcpStream,
rx: Lines<BufReader<TcpStream>>, rx: Lines<BufReader<TcpStream>>,
cookie_b64: String, cookie_getter: Arc<CookieGetter>,
addr: SocketAddr, addr: SocketAddr,
} }
fn tcp_connect(addr: SocketAddr) -> Result<TcpStream> {
loop {
match TcpStream::connect(addr) {
Ok(conn) => return Ok(conn),
Err(err) => {
warn!("failed to connect daemon at {}: {}", addr, err);
thread::sleep(Duration::from_secs(3));
continue;
}
}
}
}
impl Connection { impl Connection {
fn new(addr: SocketAddr, cookie_b64: String) -> Result<Connection> { fn new(addr: SocketAddr, cookie_getter: Arc<CookieGetter>) -> Result<Connection> {
let conn = TcpStream::connect(addr).chain_err(|| format!("failed to connect to {}", addr))?; let conn = tcp_connect(addr)?;
let reader = BufReader::new(conn.try_clone() let reader = BufReader::new(conn.try_clone()
.chain_err(|| format!("failed to clone {:?}", conn))?); .chain_err(|| format!("failed to clone {:?}", conn))?);
Ok(Connection { Ok(Connection {
tx: conn, tx: conn,
rx: reader.lines(), rx: reader.lines(),
cookie_b64, cookie_getter,
addr, addr,
}) })
} }
pub fn reconnect(&self) -> Result<Connection> { pub fn reconnect(&self) -> Result<Connection> {
let conn = TcpStream::connect(self.addr) Connection::new(self.addr, self.cookie_getter.clone())
.chain_err(|| format!("failed to connect to {}", self.addr))?;
let reader = BufReader::new(conn.try_clone()
.chain_err(|| format!("failed to clone {:?}", conn))?);
Ok(Connection {
tx: conn,
rx: reader.lines(),
cookie_b64: self.cookie_b64.clone(),
addr: self.addr,
})
} }
fn send(&mut self, request: &str) -> Result<()> { fn send(&mut self, request: &str) -> Result<()> {
let cookie = &self.cookie_getter.get()?;
let msg = format!( let msg = format!(
"POST / HTTP/1.1\nAuthorization: Basic {}\nContent-Length: {}\n\n{}", "POST / HTTP/1.1\nAuthorization: Basic {}\nContent-Length: {}\n\n{}",
self.cookie_b64, base64::encode(cookie),
request.len(), request.len(),
request, request,
); );
self.tx self.tx.write_all(msg.as_bytes()).chain_err(|| {
.write_all(msg.as_bytes()) ErrorKind::Connection("disconnected from daemon while sending".to_owned())
.chain_err(|| "failed to send request") })
} }
fn recv(&mut self) -> Result<String> { fn recv(&mut self) -> Result<String> {
@ -176,13 +187,16 @@ impl Connection {
let mut contents: Option<String> = None; let mut contents: Option<String> = None;
let iter = self.rx.by_ref(); let iter = self.rx.by_ref();
let status = iter.next() let status = iter.next()
.chain_err(|| "disconnected from daemon")? .chain_err(|| {
ErrorKind::Connection("disconnected from daemon while receiving".to_owned())
})?
.chain_err(|| "failed to read status")?; .chain_err(|| "failed to read status")?;
if status != "HTTP/1.1 200 OK" { if status != "HTTP/1.1 200 OK" {
bail!("request failed: {}", status); let msg = format!("request failed {:?}", status);
bail!(ErrorKind::Connection(msg));
} }
for line in iter { for line in iter {
let line = line.chain_err(|| "failed to read")?; let line = line.chain_err(|| ErrorKind::Connection("failed to read".to_owned()))?;
if line.is_empty() { if line.is_empty() {
in_header = false; // next line should contain the actual response. in_header = false; // next line should contain the actual response.
} else if !in_header { } else if !in_header {
@ -190,7 +204,7 @@ impl Connection {
break; break;
} }
} }
contents.chain_err(|| "no reply") contents.chain_err(|| ErrorKind::Connection("no reply from daemon".to_owned()))
} }
} }
@ -227,14 +241,14 @@ impl Daemon {
pub fn new( pub fn new(
daemon_dir: &PathBuf, daemon_dir: &PathBuf,
daemon_rpc_addr: SocketAddr, daemon_rpc_addr: SocketAddr,
cookie: &str, cookie_getter: Arc<CookieGetter>,
network: Network, network: Network,
metrics: &Metrics, metrics: &Metrics,
) -> Result<Daemon> { ) -> Result<Daemon> {
let daemon = Daemon { let daemon = Daemon {
daemon_dir: daemon_dir.clone(), daemon_dir: daemon_dir.clone(),
network, network,
conn: Mutex::new(Connection::new(daemon_rpc_addr, base64::encode(cookie))?), conn: Mutex::new(Connection::new(daemon_rpc_addr, cookie_getter)?),
message_id: Counter::new(), message_id: Counter::new(),
latency: metrics.histogram_vec( latency: metrics.histogram_vec(
HistogramOpts::new("daemon_rpc", "Bitcoind RPC latency (in seconds)"), HistogramOpts::new("daemon_rpc", "Bitcoind RPC latency (in seconds)"),
@ -283,8 +297,8 @@ impl Daemon {
} }
fn call_jsonrpc(&self, method: &str, request: &Value) -> Result<Value> { fn call_jsonrpc(&self, method: &str, request: &Value) -> Result<Value> {
let timer = self.latency.with_label_values(&[method]).start_timer();
let mut conn = self.conn.lock().unwrap(); let mut conn = self.conn.lock().unwrap();
let timer = self.latency.with_label_values(&[method]).start_timer();
let request = request.to_string(); let request = request.to_string();
conn.send(&request)?; conn.send(&request)?;
self.size self.size
@ -299,10 +313,25 @@ impl Daemon {
Ok(result) Ok(result)
} }
fn retry_call_jsonrpc(&self, method: &str, request: &Value) -> Result<Value> {
loop {
match self.call_jsonrpc(method, request) {
Err(Error(ErrorKind::Connection(msg), _)) => {
warn!("connection failed: {}", msg);
thread::sleep(Duration::from_secs(3));
let mut conn = self.conn.lock().unwrap();
*conn = conn.reconnect()?;
continue;
}
result => return result,
}
}
}
fn request(&self, method: &str, params: Value) -> Result<Value> { fn request(&self, method: &str, params: Value) -> Result<Value> {
let id = self.message_id.next(); let id = self.message_id.next();
let req = json!({"method": method, "params": params, "id": id}); let req = json!({"method": method, "params": params, "id": id});
let reply = self.call_jsonrpc(method, &req) let reply = self.retry_call_jsonrpc(method, &req)
.chain_err(|| format!("RPC failed: {}", req))?; .chain_err(|| format!("RPC failed: {}", req))?;
parse_jsonrpc_reply(reply, method, id) parse_jsonrpc_reply(reply, method, id)
} }
@ -314,7 +343,7 @@ impl Daemon {
.map(|params| json!({"method": method, "params": params, "id": id})) .map(|params| json!({"method": method, "params": params, "id": id}))
.collect(); .collect();
let mut results = vec![]; let mut results = vec![];
let mut replies = self.call_jsonrpc(method, &reqs) let mut replies = self.retry_call_jsonrpc(method, &reqs)
.chain_err(|| format!("RPC failed: {}", reqs))?; .chain_err(|| format!("RPC failed: {}", reqs))?;
if let Some(replies_vec) = replies.as_array_mut() { if let Some(replies_vec) = replies.as_array_mut() {
for reply in replies_vec { for reply in replies_vec {

7
src/errors.rs

@ -2,4 +2,11 @@ error_chain!{
types { types {
Error, ErrorKind, ResultExt, Result; Error, ErrorKind, ResultExt, Result;
} }
errors {
Connection(msg: String) {
description("Connection error")
display("Connection error: {}", msg)
}
}
} }

Loading…
Cancel
Save