Browse Source

Update stable Rust to 1.28

Fix formatting warnings.
refactor-mempool
Roman Zeyde 6 years ago
parent
commit
001f7a7ba5
No known key found for this signature in database GPG Key ID: 87CAE5FA46917CBB
  1. 2
      .travis.yml
  2. 9
      src/bulk.rs
  3. 12
      src/config.rs
  4. 35
      src/daemon.rs
  5. 6
      src/index.rs
  6. 6
      src/mempool.rs
  7. 15
      src/query.rs
  8. 18
      src/rpc.rs

2
.travis.yml

@ -20,4 +20,4 @@ jobs:
before_script:
- rustup component add rustfmt-preview
script:
- cargo fmt --all -- --write-mode=check
- cargo fmt --all -- --check

9
src/bulk.rs

@ -9,7 +9,8 @@ use std::fs;
use std::io::{Cursor, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::sync::{
mpsc::{Receiver, SyncSender}, Arc, Mutex,
mpsc::{Receiver, SyncSender},
Arc, Mutex,
};
use std::thread;
@ -61,7 +62,8 @@ impl Parser {
fn last_indexed_row(&self) -> Row {
let indexed_blockhashes = self.indexed_blockhashes.lock().unwrap();
let last_header = self.current_headers
let last_header = self
.current_headers
.iter()
.take_while(|h| indexed_blockhashes.contains(h.hash()))
.last()
@ -88,7 +90,8 @@ impl Parser {
for block in blocks {
let blockhash = block.bitcoin_hash();
if let Some(header) = self.current_headers.header_by_blockhash(&blockhash) {
if self.indexed_blockhashes
if self
.indexed_blockhashes
.lock()
.expect("indexed_blockhashes")
.insert(blockhash.clone())

12
src/config.rs

@ -108,20 +108,24 @@ impl Config {
Network::Regtest => 60401,
};
let daemon_rpc_addr: SocketAddr = m.value_of("daemon_rpc_addr")
let daemon_rpc_addr: SocketAddr = m
.value_of("daemon_rpc_addr")
.unwrap_or(&format!("127.0.0.1:{}", default_daemon_port))
.parse()
.expect("invalid Bitcoind RPC address");
let electrum_rpc_addr: SocketAddr = m.value_of("electrum_rpc_addr")
let electrum_rpc_addr: SocketAddr = m
.value_of("electrum_rpc_addr")
.unwrap_or(&format!("127.0.0.1:{}", default_electrum_port))
.parse()
.expect("invalid Electrum RPC address");
let monitoring_addr: SocketAddr = m.value_of("monitoring_addr")
let monitoring_addr: SocketAddr = m
.value_of("monitoring_addr")
.unwrap_or("127.0.0.1:42024")
.parse()
.expect("invalid Prometheus monitoring address");
let mut daemon_dir = m.value_of("daemon_dir")
let mut daemon_dir = m
.value_of("daemon_dir")
.map(|p| PathBuf::from(p))
.unwrap_or_else(|| {
let mut default_dir = home_dir().expect("no homedir");

35
src/daemon.rs

@ -28,12 +28,11 @@ pub enum Network {
}
fn parse_hash(value: &Value) -> Result<Sha256dHash> {
Ok(Sha256dHash::from_hex(value
.as_str()
.chain_err(|| format!("non-string value: {}", value))?)
.chain_err(|| {
format!("non-hex value: {}", value)
})?)
Ok(Sha256dHash::from_hex(
value
.as_str()
.chain_err(|| format!("non-string value: {}", value))?,
).chain_err(|| format!("non-hex value: {}", value))?)
}
fn header_from_value(value: Value) -> Result<BlockHeader> {
@ -160,8 +159,10 @@ impl Connection {
signal: Waiter,
) -> Result<Connection> {
let conn = tcp_connect(addr, &signal)?;
let reader = BufReader::new(conn.try_clone()
.chain_err(|| format!("failed to clone {:?}", conn))?);
let reader = BufReader::new(
conn.try_clone()
.chain_err(|| format!("failed to clone {:?}", conn))?,
);
Ok(Connection {
tx: conn,
rx: reader.lines(),
@ -193,7 +194,8 @@ impl Connection {
let mut in_header = true;
let mut contents: Option<String> = None;
let iter = self.rx.by_ref();
let status = iter.next()
let status = iter
.next()
.chain_err(|| {
ErrorKind::Connection("disconnected from daemon while receiving".to_owned())
})?
@ -362,7 +364,8 @@ impl Daemon {
fn request(&self, method: &str, params: Value) -> Result<Value> {
let id = self.message_id.next();
let req = json!({"method": method, "params": params, "id": id});
let reply = self.retry_call_jsonrpc(method, &req)
let reply = self
.retry_call_jsonrpc(method, &req)
.chain_err(|| format!("RPC failed: {}", req))?;
parse_jsonrpc_reply(reply, method, id)
}
@ -374,7 +377,8 @@ impl Daemon {
.map(|params| json!({"method": method, "params": params, "id": id}))
.collect();
let mut results = vec![];
let mut replies = self.retry_call_jsonrpc(method, &reqs)
let mut replies = self
.retry_call_jsonrpc(method, &reqs)
.chain_err(|| format!("RPC failed: {}", reqs))?;
if let Some(replies_vec) = replies.as_array_mut() {
for reply in replies_vec {
@ -410,7 +414,8 @@ impl Daemon {
pub fn getblockheaders(&self, heights: &[usize]) -> Result<Vec<BlockHeader>> {
let heights: Vec<Value> = heights.iter().map(|height| json!([height])).collect();
let params_list: Vec<Value> = self.requests("getblockhash", &heights)?
let params_list: Vec<Value> = self
.requests("getblockhash", &heights)?
.into_iter()
.map(|hash| json!([hash, /*verbose=*/ false]))
.collect();
@ -507,7 +512,8 @@ impl Daemon {
fn get_all_headers(&self, tip: &Sha256dHash) -> Result<Vec<BlockHeader>> {
let info: Value = self.request("getblockheader", json!([tip.be_hex_string()]))?;
let tip_height = info.get("height")
let tip_height = info
.get("height")
.expect("missing height")
.as_u64()
.expect("non-numeric height") as usize;
@ -553,7 +559,8 @@ impl Daemon {
if indexed_headers.header_by_blockhash(&blockhash).is_some() {
break;
}
let header = self.getblockheader(&blockhash)
let header = self
.getblockheader(&blockhash)
.chain_err(|| format!("failed to get {} header", blockhash))?;
new_headers.push(header);
blockhash = header.prev_blockhash;

6
src/index.rs

@ -233,7 +233,8 @@ fn read_indexed_headers(store: &ReadStore) -> HeaderList {
let null_hash = Sha256dHash::default();
let mut blockhash = latest_blockhash;
while blockhash != null_hash {
let header = map.remove(&blockhash)
let header = map
.remove(&blockhash)
.expect(&format!("missing {} header in DB", blockhash));
blockhash = header.prev_blockhash;
headers.push(header);
@ -360,7 +361,8 @@ impl Index {
loop {
waiter.poll()?;
let timer = self.stats.start_timer("fetch");
let batch = chan.receiver()
let batch = chan
.receiver()
.recv()
.expect("block fetch exited prematurely")?;
timer.observe_duration();

6
src/mempool.rs

@ -45,7 +45,8 @@ impl MempoolStore {
for row in rows {
let (key, value) = row.into_pair();
let no_values_left = {
let values = map.get_mut(&key)
let values = map
.get_mut(&key)
.expect(&format!("missing key {} in mempool", hex::encode(&key)));
let last_value = values
.pop()
@ -250,7 +251,8 @@ impl Tracker {
}
fn remove(&mut self, txid: &Sha256dHash) {
let stats = self.items
let stats = self
.items
.remove(txid)
.expect(&format!("missing mempool tx {}", txid));
self.index.remove(&stats.tx);

15
src/query.rs

@ -202,7 +202,8 @@ impl Query {
for txid_prefix in prefixes {
for tx_row in txrows_by_prefix(store, &txid_prefix) {
let txid: Sha256dHash = deserialize(&tx_row.key.txid).unwrap();
let txn = self.tx_cache
let txn = self
.tx_cache
.get_or_else(&txid, || self.load_txn(&txid, Some(tx_row.height)))?;
txns.push(TxnHeight {
txn,
@ -305,9 +306,11 @@ impl Query {
}
pub fn status(&self, script_hash: &[u8]) -> Result<Status> {
let confirmed = self.confirmed_status(script_hash)
let confirmed = self
.confirmed_status(script_hash)
.chain_err(|| "failed to get confirmed status")?;
let mempool = self.mempool_status(script_hash, &confirmed.0)
let mempool = self
.mempool_status(script_hash, &confirmed.0)
.chain_err(|| "failed to get mempool status")?;
Ok(Status { confirmed, mempool })
}
@ -329,7 +332,8 @@ impl Query {
.height
}
};
let blockhash = *self.app
let blockhash = *self
.app
.index()
.get_header(height as usize)
.chain_err(|| format!("missing header at height {}", height))?
@ -355,7 +359,8 @@ impl Query {
tx_hash: &Sha256dHash,
height: usize,
) -> Result<(Vec<Sha256dHash>, usize)> {
let header_entry = self.app
let header_entry = self
.app
.index()
.get_header(height)
.chain_err(|| format!("missing block #{}", height))?;

18
src/rpc.rs

@ -124,7 +124,8 @@ impl Connection {
let start_height = usize_from_value(params.get(0), "start_height")?;
let count = usize_from_value(params.get(1), "count")?;
let heights: Vec<usize> = (start_height..(start_height + count)).collect();
let headers: Vec<String> = self.query
let headers: Vec<String> = self
.query
.get_headers(&heights)
.into_iter()
.map(|entry| hex::encode(&serialize(entry.header()).unwrap()))
@ -219,7 +220,8 @@ impl Connection {
fn blockchain_transaction_get_merkle(&self, params: &[Value]) -> Result<Value> {
let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
let height = usize_from_value(params.get(1), "height")?;
let (merkle, pos) = self.query
let (merkle, pos) = self
.query
.get_merkle_proof(&tx_hash, height)
.chain_err(|| "cannot create merkle proof")?;
let merkle: Vec<String> = merkle
@ -233,7 +235,8 @@ impl Connection {
}
fn handle_command(&mut self, method: &str, params: &[Value], id: &Number) -> Result<Value> {
let timer = self.stats
let timer = self
.stats
.latency
.with_label_values(&[method])
.start_timer();
@ -276,7 +279,8 @@ impl Connection {
}
fn update_subscriptions(&mut self) -> Result<Vec<Value>> {
let timer = self.stats
let timer = self
.stats
.latency
.with_label_values(&["periodic_update"])
.start_timer();
@ -340,7 +344,8 @@ impl Connection {
self.send_values(&[reply])?
}
Message::PeriodicUpdate => {
let values = self.update_subscriptions()
let values = self
.update_subscriptions()
.chain_err(|| "failed to update subscriptions")?;
self.send_values(&values)?
}
@ -360,7 +365,8 @@ impl Connection {
return Ok(());
} else {
match String::from_utf8(line) {
Ok(req) => tx.send(Message::Request(req))
Ok(req) => tx
.send(Message::Request(req))
.chain_err(|| "channel closed")?,
Err(err) => {
let _ = tx.send(Message::Done);

Loading…
Cancel
Save