
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::network::serialize::{deserialize, serialize};
use bitcoin::util::hash::Sha256dHash;
use error_chain::ChainedError;
use hex;
use serde_json::{from_str, Number, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread;
use metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
use query::Query;
use util::HeaderEntry;
use errors::*;
// TODO: Sha256dHash should be a generic hash-container (since script hash is single SHA256)
/// Extracts a hash from an optional JSON value that should hold a hex string.
///
/// # Errors
/// - "missing hash" when `val` is `None`
/// - "non-string hash" when the value is not a JSON string
/// - "non-hex hash" when `Sha256dHash::from_hex` rejects the string
fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
    let hex_str = val
        .chain_err(|| "missing hash")?
        .as_str()
        .chain_err(|| "non-string hash")?;
    let parsed = Sha256dHash::from_hex(hex_str).chain_err(|| "non-hex hash")?;
    Ok(parsed)
}
fn usize_from_value(val: Option<&Value>, name: &str) -> Result<usize> {
let val = val.chain_err(|| format!("missing {}", name))?;
let val = val.as_u64().chain_err(|| format!("non-integer {}", name))?;
Ok(val as usize)
}
/// Renders a header entry as a JSON object with the keys `block_height`, `version`,
/// `prev_block_hash`, `merkle_root`, `timestamp`, `bits` and `nonce`.
///
/// Both hashes are emitted via `be_hex_string()`.
fn jsonify_header(entry: &HeaderEntry) -> Value {
    let block = entry.header();
    let prev_hash = block.prev_blockhash.be_hex_string();
    let merkle_root = block.merkle_root.be_hex_string();
    json!({
        "block_height": entry.height(),
        "version": block.version,
        "prev_block_hash": prev_hash,
        "merkle_root": merkle_root,
        "timestamp": block.time,
        "bits": block.bits,
        "nonce": block.nonce
    })
}
/// State kept for a single connected client.
struct Connection {
// Shared query handle that the request handlers call into.
query: Arc<Query>,
// Best header last reported to this client. Stays `None` until the client calls
// `blockchain.headers.subscribe`; while `None`, no header notifications are produced.
last_header_entry: Option<HeaderEntry>,
// Last status sent to the client for each subscribed script hash.
status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
// Client socket: replies are written here, and a clone of it is read by the request thread.
stream: TcpStream,
// Peer address, used in log messages.
addr: SocketAddr,
// Queue feeding the reply loop with requests, periodic updates and the end-of-stream marker.
chan: Channel,
// Metrics shared by all connections.
stats: Arc<Stats>,
}
impl Connection {
/// Creates the state for a freshly accepted client.
///
/// The connection starts with no script-hash subscriptions, a new message channel, and
/// header notifications turned off.
pub fn new(
    query: Arc<Query>,
    stream: TcpStream,
    addr: SocketAddr,
    stats: Arc<Stats>,
) -> Connection {
    // disable header subscription for now
    let last_header_entry = None;
    let status_hashes = HashMap::new();
    let chan = Channel::new();
    Connection {
        query,
        last_header_entry,
        status_hashes,
        stream,
        addr,
        chan,
        stats,
    }
}
/// Handles `blockchain.headers.subscribe`.
///
/// Replies with the best header (hex-encoded serialization plus height) and remembers it,
/// which enables header notifications in `update_subscriptions`.
fn blockchain_headers_subscribe(&mut self) -> Result<Value> {
    let best = self.query.get_best_header()?;
    let raw = serialize(best.header()).unwrap();
    let reply = json!({"hex": hex::encode(raw), "height": best.height()});
    self.last_header_entry = Some(best);
    Ok(reply)
}
/// Handles `server.version`: replies with a two-element array of fixed strings.
fn server_version(&self) -> Result<Value> {
    let version = json!(["RustElectrum 0.1.0", "1.2"]);
    Ok(version)
}
/// Handles `server.banner`: replies with a fixed welcome text.
fn server_banner(&self) -> Result<Value> {
    let banner = json!("Welcome to RustElectrum Server!\n");
    Ok(banner)
}
/// Handles `server.donation_address`: always replies with JSON `null`.
fn server_donation_address(&self) -> Result<Value> {
    let no_address = Value::Null;
    Ok(no_address)
}
/// Handles `server.peers.subscribe`: always replies with an empty array.
fn server_peers_subscribe(&self) -> Result<Value> {
    let no_peers = json!([]);
    Ok(no_peers)
}
/// Handles `mempool.get_fee_histogram`: replies with the JSON form of the histogram
/// returned by the query layer.
fn mempool_get_fee_histogram(&self) -> Result<Value> {
    let histogram = self.query.get_fee_histogram();
    Ok(json!(histogram))
}
/// Handles `blockchain.block.headers`.
///
/// `params[0]` is the start height and `params[1]` the number of headers requested.
/// Replies with the concatenated hex serialization of the headers found, how many were
/// returned (`count`), and the per-request limit (`max`).
///
/// The requested count is capped at the advertised maximum, so a client cannot make the
/// server build an arbitrarily large height list.
///
/// # Errors
/// Fails when a parameter is missing or not an integer, or when
/// `start_height + count` overflows `usize`.
fn blockchain_block_headers(&self, params: &[Value]) -> Result<Value> {
    // Limit advertised to clients in the "max" field of the reply.
    const MAX_HEADERS: usize = 2016;
    let start_height = usize_from_value(params.get(0), "start_height")?;
    let count = usize_from_value(params.get(1), "count")?.min(MAX_HEADERS);
    // Both values are client-controlled; unchecked addition could panic or wrap.
    let end_height = start_height
        .checked_add(count)
        .chain_err(|| "start_height + count overflows")?;
    let heights: Vec<usize> = (start_height..end_height).collect();
    let headers: Vec<String> = self.query
        .get_headers(&heights)
        .into_iter()
        .map(|entry| hex::encode(&serialize(entry.header()).unwrap()))
        .collect();
    Ok(json!({
        "count": headers.len(),
        "hex": headers.join(""),
        "max": MAX_HEADERS,
    }))
}
fn blockchain_block_get_header(&self, params: &[Value]) -> Result<Value> {
let height = usize_from_value(params.get(0), "missing height")?;
let mut entries = self.query.get_headers(&[height]);
let entry = entries
.pop()
.chain_err(|| format!("missing header #{}", height))?;
assert_eq!(entries.len(), 0);
Ok(json!(jsonify_header(&entry)))
}
/// Handles `blockchain.estimatefee`.
///
/// `params[0]` is the confirmation target in blocks. Replies with the estimate returned
/// by the query layer.
fn blockchain_estimatefee(&self, params: &[Value]) -> Result<Value> {
    let target = usize_from_value(params.get(0), "blocks_count")?;
    // in BTC/kB
    let rate = self.query.estimate_fee(target);
    Ok(json!(rate))
}
/// Handles `blockchain.relayfee`: always replies with `0.0`.
fn blockchain_relayfee(&self) -> Result<Value> {
    // allow sending transactions with any fee.
    let min_fee = 0.0;
    Ok(json!(min_fee))
}
/// Handles `blockchain.scripthash.subscribe`.
///
/// `params[0]` is the script hash as hex. Replies with the hex-encoded status hash, or
/// `null` when the status has none, and records that value so `update_subscriptions`
/// can detect later changes.
fn blockchain_scripthash_subscribe(&mut self, params: &[Value]) -> Result<Value> {
    let key = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
    let status = self.query.status(&key[..])?;
    let status_hash = match status.hash() {
        Some(h) => json!(hex::encode(h)),
        None => Value::Null,
    };
    self.status_hashes.insert(key, status_hash.clone());
    Ok(status_hash)
}
/// Handles `blockchain.scripthash.get_balance`.
///
/// `params[0]` is the script hash as hex. Replies with an object holding the `confirmed`
/// and `unconfirmed` (mempool) balances of that script hash.
fn blockchain_scripthash_get_balance(&self, params: &[Value]) -> Result<Value> {
    let key = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
    let status = self.query.status(&key[..])?;
    let confirmed = status.confirmed_balance();
    let unconfirmed = status.mempool_balance();
    Ok(json!({ "confirmed": confirmed, "unconfirmed": unconfirmed }))
}
fn blockchain_scripthash_get_history(&self, params: &[Value]) -> Result<Value> {
let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
let status = self.query.status(&script_hash[..])?;
Ok(json!(Value::Array(
status
.history()
.into_iter()
.map(|item| json!({"height": item.0, "tx_hash": item.1.be_hex_string()}))
.collect()
)))
}
/// Handles `blockchain.transaction.broadcast`.
///
/// `params[0]` is the hex-encoded serialized transaction. It is decoded, parsed and
/// handed to the query layer for broadcasting; the reply is the resulting txid as hex.
///
/// # Errors
/// Fails when the parameter is missing, not a string, not valid hex, not a parseable
/// transaction, or when broadcasting fails.
fn blockchain_transaction_broadcast(&self, params: &[Value]) -> Result<Value> {
    let tx_hex = params
        .get(0)
        .chain_err(|| "missing tx")?
        .as_str()
        .chain_err(|| "non-string tx")?;
    let raw = hex::decode(tx_hex).chain_err(|| "non-hex tx")?;
    let parsed: Transaction = deserialize(&raw).chain_err(|| "failed to parse tx")?;
    let txid = self.query.broadcast(&parsed)?;
    Ok(json!(txid.be_hex_string()))
}
fn blockchain_transaction_get(&self, params: &[Value]) -> Result<Value> {
7 years ago
// TODO: handle 'verbose' param
let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
let tx = self.query.load_txn(&tx_hash, /*blockhash=*/ None)?;
Ok(json!(hex::encode(&serialize(&tx).unwrap())))
7 years ago
}
/// Handles `blockchain.transaction.get_merkle`.
///
/// `params[0]` is the transaction hash as hex and `params[1]` the block height. Replies
/// with the merkle branch (as hex strings), the position returned with the proof, and
/// the height that was passed in.
fn blockchain_transaction_get_merkle(&self, params: &[Value]) -> Result<Value> {
    let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
    let height = usize_from_value(params.get(1), "height")?;
    let proof = self.query
        .get_merkle_proof(&tx_hash, height)
        .chain_err(|| "cannot create merkle proof")?;
    let (branch, pos) = proof;
    let mut merkle: Vec<String> = Vec::new();
    for txid in branch {
        merkle.push(txid.be_hex_string());
    }
    Ok(json!({
        "block_height": height,
        "merkle": merkle,
        "pos": pos}))
}
fn handle_command(&mut self, method: &str, params: &[Value], id: &Number) -> Result<Value> {
let timer = self.stats
.latency
.with_label_values(&[method])
.start_timer();
let result = match method {
"blockchain.headers.subscribe" => self.blockchain_headers_subscribe(),
"server.version" => self.server_version(),
"server.banner" => self.server_banner(),
"server.donation_address" => self.server_donation_address(),
"server.peers.subscribe" => self.server_peers_subscribe(),
"mempool.get_fee_histogram" => self.mempool_get_fee_histogram(),
"blockchain.block.headers" => self.blockchain_block_headers(&params),
"blockchain.block.get_header" => self.blockchain_block_get_header(&params),
"blockchain.estimatefee" => self.blockchain_estimatefee(&params),
"blockchain.relayfee" => self.blockchain_relayfee(),
"blockchain.scripthash.subscribe" => self.blockchain_scripthash_subscribe(&params),
"blockchain.scripthash.get_balance" => self.blockchain_scripthash_get_balance(&params),
"blockchain.scripthash.get_history" => self.blockchain_scripthash_get_history(&params),
"blockchain.transaction.broadcast" => self.blockchain_transaction_broadcast(&params),
"blockchain.transaction.get" => self.blockchain_transaction_get(&params),
"blockchain.transaction.get_merkle" => self.blockchain_transaction_get_merkle(&params),
&_ => bail!("unknown method {} {:?}", method, params),
};
timer.observe_duration();
// TODO: return application errors should be sent to the client
Ok(match result {
Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}),
Err(e) => {
7 years ago
warn!(
"rpc #{} {} {:?} failed: {}",
id,
method,
params,
e.display_chain()
);
json!({"jsonrpc": "2.0", "id": id, "error": format!("{}", e)})
}
})
}
/// Collects the notifications owed to this client since the last check.
///
/// When a header subscription is active and the best header has changed, one
/// `blockchain.headers.subscribe` notification is produced. For every subscribed script
/// hash whose status hash has changed, one `blockchain.scripthash.subscribe`
/// notification is produced and the stored status is updated. The call is timed under
/// the "periodic_update" latency label, and the subscriptions gauge is set to this
/// connection's subscription count.
fn update_subscriptions(&mut self) -> Result<Vec<Value>> {
    let timer = self.stats
        .latency
        .with_label_values(&["periodic_update"])
        .start_timer();
    let mut notifications = Vec::new();
    if let Some(ref mut known) = self.last_header_entry {
        let best = self.query.get_best_header()?;
        if *known != best {
            *known = best;
            let raw = serialize(known.header()).unwrap();
            let header = json!({"hex": hex::encode(raw), "height": known.height()});
            notifications.push(json!({
                "jsonrpc": "2.0",
                "method": "blockchain.headers.subscribe",
                "params": [header]}));
        }
    }
    for (script_hash, known_status) in self.status_hashes.iter_mut() {
        let status = self.query.status(&script_hash[..])?;
        let current = match status.hash() {
            Some(h) => json!(hex::encode(h)),
            None => Value::Null,
        };
        if current != *known_status {
            notifications.push(json!({
                "jsonrpc": "2.0",
                "method": "blockchain.scripthash.subscribe",
                "params": [script_hash.be_hex_string(), current]}));
            *known_status = current;
        }
    }
    timer.observe_duration();
    let subscribed = self.status_hashes.len() as i64;
    self.stats.subscriptions.set(subscribed);
    Ok(notifications)
}
/// Writes each JSON value to the client as one newline-terminated line.
///
/// # Errors
/// Stops at the first failed write and reports which value could not be sent.
fn send_values(&mut self, values: &[Value]) -> Result<()> {
    for value in values.iter() {
        let mut line = value.to_string();
        line.push('\n');
        let written = self.stream.write_all(line.as_bytes());
        written.chain_err(|| format!("failed to send {}", value))?;
    }
    Ok(())
}
/// Reply loop: consumes messages from the connection's channel until `Message::Done`.
///
/// A request line is parsed as JSON, dispatched through `handle_command`, and its reply
/// is sent back. A periodic update sends any pending subscription notifications.
///
/// # Errors
/// Returns on the first failure: a closed channel, invalid JSON, a request that lacks a
/// string `method`, array `params` or numeric `id`, a failed dispatch, a failed
/// subscription update, or a failed write.
fn handle_replies(&mut self) -> Result<()> {
    loop {
        let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
        match msg {
            Message::Done => {
                debug!("done");
                return Ok(());
            }
            Message::PeriodicUpdate => {
                let notifications = self.update_subscriptions()
                    .chain_err(|| "failed to update subscriptions")?;
                self.send_values(&notifications)?;
            }
            Message::Request(line) => {
                let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
                let fields = (cmd.get("method"), cmd.get("params"), cmd.get("id"));
                let reply = match fields {
                    (
                        Some(&Value::String(ref method)),
                        Some(&Value::Array(ref params)),
                        Some(&Value::Number(ref id)),
                    ) => self.handle_command(method, params, id)?,
                    _ => bail!("invalid command: {}", cmd),
                };
                self.send_values(&[reply])?;
            }
        }
    }
}
fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) {
loop {
let mut line = String::new();
reader
.read_line(&mut line) // TODO: use .lines() iterator
.expect("failed to read a request");
if line.is_empty() {
tx.send(Message::Done).expect("channel closed");
break;
} else {
tx.send(Message::Request(line)).expect("channel closed");
}
}
}
pub fn run(mut self) {
let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
let tx = self.chan.sender();
let child = thread::spawn(|| Connection::handle_requests(reader, tx));
if let Err(e) = self.handle_replies() {
error!(
"[{}] connection handling failed: {}",
self.addr,
e.display_chain().to_string()
);
}
info!("[{}] shutting down connection", self.addr);
let _ = self.stream.shutdown(Shutdown::Both);
if child.join().is_err() {
error!("[{}] receiver panicked", self.addr);
}
}
}
/// Messages consumed by a connection's reply loop.
pub enum Message {
    /// One raw request line read from the client.
    Request(String),
    /// Request to re-check the connection's subscriptions and send notifications.
    PeriodicUpdate,
    /// No more requests will arrive; the reply loop should stop.
    Done,
}

/// Bounded message queue that keeps both ends, so extra senders can be handed out.
pub struct Channel {
    tx: SyncSender<Message>,
    rx: Receiver<Message>,
}

impl Channel {
    /// Creates a channel that buffers up to 10 messages before senders block.
    pub fn new() -> Channel {
        let (tx, rx) = sync_channel(10);
        Channel { tx, rx }
    }

    /// Returns a new sending handle for this channel.
    pub fn sender(&self) -> SyncSender<Message> {
        self.tx.clone()
    }

    /// Borrows the receiving end of this channel.
    pub fn receiver(&self) -> &Receiver<Message> {
        &self.rx
    }
}
/// Handle to the running RPC server, returned by `RPC::start`.
pub struct RPC {
// Sending side of the channel drained by the notification worker; each `()` sent
// makes the worker push a `Message::PeriodicUpdate` to the registered connections.
notification: Sender<()>,
}
/// Metrics shared by all connections.
struct Stats {
// Latency histogram; labelled with the RPC method name, or "periodic_update" for
// subscription refreshes.
latency: HistogramVec,
// Set by each connection to its own subscription count after a refresh.
subscriptions: Gauge,
}
impl RPC {
/// Spawns the thread that fans notifications out to all connections.
///
/// For every unit value received, a `Message::PeriodicUpdate` is offered to each
/// registered sender without blocking. Senders whose receiver has disconnected are
/// removed; all others are kept, including those whose queue was full.
fn start_notification_worker(
    receiver: Receiver<()>,
    senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
) {
    thread::spawn(move || {
        for _ in receiver.iter() {
            let mut live = senders.lock().unwrap();
            live.retain(|sender| match sender.try_send(Message::PeriodicUpdate) {
                Err(TrySendError::Disconnected(_)) => false,
                _ => true,
            });
        }
    });
}
pub fn start(addr: SocketAddr, query: Arc<Query>, metrics: &Metrics) -> RPC {
let stats = Arc::new(Stats {
latency: metrics.histogram(
HistogramOpts::new("electrum_rpc", "Electrum RPC latency (seconds)"),
&["method"],
),
subscriptions: metrics.gauge(MetricOpts::new(
"electrum_subscriptions",
"# of Electrum subscriptions",
)),
});
let (tx, rx) = channel();
let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr));
info!("RPC server running on {}", addr);
thread::spawn(move || {
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
RPC::start_notification_worker(rx, senders.clone());
loop {
let (stream, addr) = listener.accept().expect("accept failed");
let query = query.clone();
let senders = senders.clone();
let stats = stats.clone();
thread::spawn(move || {
info!("[{}] connected peer", addr);
let conn = Connection::new(query, stream, addr, stats);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
info!("[{}] disconnected peer", addr);
});
}
});
RPC { notification: tx }
}
/// Wakes the notification worker so every connection re-checks its subscriptions.
///
/// # Panics
/// Panics if the worker's receiving end no longer exists.
pub fn notify(&self) {
    let sent = self.notification.send(());
    sent.unwrap();
}
}