You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

510 lines
19 KiB

use bitcoin::blockdata::transaction::Transaction;
use bitcoin::network::serialize::{deserialize, serialize};
use bitcoin::util::address::Address;
use bitcoin::util::hash::Sha256dHash;
use error_chain::ChainedError;
use hex;
use serde_json::{from_str, Number, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::str::FromStr;
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread;
use index::compute_script_hash;
use metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics};
use query::{Query, Status};
use util::{spawn_thread, Channel, HeaderEntry, SyncChannel};
use errors::*;
// TODO: Sha256dHash should be a generic hash-container (since script hash is single SHA256)
fn hash_from_value(val: Option<&Value>) -> Result<Sha256dHash> {
let script_hash = val.chain_err(|| "missing hash")?;
let script_hash = script_hash.as_str().chain_err(|| "non-string hash")?;
let script_hash = Sha256dHash::from_hex(script_hash).chain_err(|| "non-hex hash")?;
Ok(script_hash)
}
fn usize_from_value(val: Option<&Value>, name: &str) -> Result<usize> {
let val = val.chain_err(|| format!("missing {}", name))?;
let val = val.as_u64().chain_err(|| format!("non-integer {}", name))?;
Ok(val as usize)
}
fn unspent_from_status(status: &Status) -> Value {
json!(Value::Array(
status
.unspent()
.into_iter()
.map(|out| {
json!({
"height": out.height,
"tx_pos": out.output_index,
"tx_hash": out.txn_id.be_hex_string(),
"value": out.value,
})
})
.collect()
))
}
fn jsonify_header(entry: &HeaderEntry) -> Value {
let header = entry.header();
json!({
"block_height": entry.height(),
"version": header.version,
"prev_block_hash": header.prev_blockhash.be_hex_string(),
"merkle_root": header.merkle_root.be_hex_string(),
"timestamp": header.time,
"bits": header.bits,
"nonce": header.nonce
})
}
struct Connection {
query: Arc<Query>,
last_header_entry: Option<HeaderEntry>,
status_hashes: HashMap<Sha256dHash, Value>, // ScriptHash -> StatusHash
stream: TcpStream,
addr: SocketAddr,
chan: SyncChannel<Message>,
stats: Arc<Stats>,
}
impl Connection {
pub fn new(
query: Arc<Query>,
stream: TcpStream,
addr: SocketAddr,
stats: Arc<Stats>,
) -> Connection {
Connection {
query,
last_header_entry: None, // disable header subscription for now
status_hashes: HashMap::new(),
stream,
addr,
chan: SyncChannel::new(10),
stats,
}
}
fn blockchain_headers_subscribe(&mut self) -> Result<Value> {
let entry = self.query.get_best_header()?;
let hex_header = hex::encode(serialize(entry.header()).unwrap());
let result = json!({"hex": hex_header, "height": entry.height()});
self.last_header_entry = Some(entry);
Ok(result)
}
fn server_version(&self) -> Result<Value> {
Ok(json!(["RustElectrum 0.1.0", "1.2"]))
}
fn server_banner(&self) -> Result<Value> {
Ok(json!("Welcome to RustElectrum Server!\n"))
}
fn server_donation_address(&self) -> Result<Value> {
Ok(Value::Null)
}
fn server_peers_subscribe(&self) -> Result<Value> {
Ok(json!([]))
}
fn mempool_get_fee_histogram(&self) -> Result<Value> {
Ok(json!(self.query.get_fee_histogram()))
}
fn blockchain_block_headers(&self, params: &[Value]) -> Result<Value> {
let start_height = usize_from_value(params.get(0), "start_height")?;
let count = usize_from_value(params.get(1), "count")?;
let heights: Vec<usize> = (start_height..(start_height + count)).collect();
let headers: Vec<String> = self.query
.get_headers(&heights)
.into_iter()
.map(|entry| hex::encode(&serialize(entry.header()).unwrap()))
.collect();
Ok(json!({
"count": headers.len(),
"hex": headers.join(""),
"max": 2016,
}))
}
fn blockchain_block_get_header(&self, params: &[Value]) -> Result<Value> {
let height = usize_from_value(params.get(0), "missing height")?;
let mut entries = self.query.get_headers(&[height]);
let entry = entries
.pop()
.chain_err(|| format!("missing header #{}", height))?;
assert_eq!(entries.len(), 0);
Ok(json!(jsonify_header(&entry)))
}
fn blockchain_estimatefee(&self, params: &[Value]) -> Result<Value> {
let blocks_count = usize_from_value(params.get(0), "blocks_count")?;
let fee_rate = self.query.estimate_fee(blocks_count); // in BTC/kB
Ok(json!(fee_rate))
}
fn blockchain_relayfee(&self) -> Result<Value> {
Ok(json!(0.0)) // allow sending transactions with any fee.
}
fn blockchain_scripthash_subscribe(&mut self, params: &[Value]) -> Result<Value> {
let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
let status = self.query.status(&script_hash[..])?;
let result = status.hash().map_or(Value::Null, |h| json!(hex::encode(h)));
self.status_hashes.insert(script_hash, result.clone());
Ok(result)
}
fn blockchain_scripthash_get_balance(&self, params: &[Value]) -> Result<Value> {
let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
let status = self.query.status(&script_hash[..])?;
Ok(
json!({ "confirmed": status.confirmed_balance(), "unconfirmed": status.mempool_balance() }),
)
}
fn blockchain_scripthash_get_history(&self, params: &[Value]) -> Result<Value> {
let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
let status = self.query.status(&script_hash[..])?;
Ok(json!(Value::Array(
status
.history()
.into_iter()
.map(|item| json!({"height": item.0, "tx_hash": item.1.be_hex_string()}))
.collect()
)))
}
fn blockchain_scripthash_listunspent(&self, params: &[Value]) -> Result<Value> {
let script_hash = hash_from_value(params.get(0)).chain_err(|| "bad script_hash")?;
Ok(unspent_from_status(&self.query.status(&script_hash[..])?))
}
fn blockchain_address_listunspent(&self, params: &[Value]) -> Result<Value> {
let address = params
.get(0)
.chain_err(|| "no address")?
.as_str()
.chain_err(|| "non-string address")?;
let addr = Address::from_str(address).chain_err(|| format!("invalid address {}", address))?;
let script_hash = compute_script_hash(&addr.script_pubkey().into_vec());
Ok(unspent_from_status(&self.query.status(&script_hash[..])?))
}
fn blockchain_transaction_broadcast(&self, params: &[Value]) -> Result<Value> {
let tx = params.get(0).chain_err(|| "missing tx")?;
let tx = tx.as_str().chain_err(|| "non-string tx")?;
let tx = hex::decode(&tx).chain_err(|| "non-hex tx")?;
let tx: Transaction = deserialize(&tx).chain_err(|| "failed to parse tx")?;
let txid = self.query.broadcast(&tx)?;
Ok(json!(txid.be_hex_string()))
}
fn blockchain_transaction_get(&self, params: &[Value]) -> Result<Value> {
// TODO: handle 'verbose' param
let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
let tx = self.query.load_txn(&tx_hash, /*blockhash=*/ None)?;
Ok(json!(hex::encode(&serialize(&tx).unwrap())))
}
fn blockchain_transaction_get_merkle(&self, params: &[Value]) -> Result<Value> {
let tx_hash = hash_from_value(params.get(0)).chain_err(|| "bad tx_hash")?;
let height = usize_from_value(params.get(1), "height")?;
let (merkle, pos) = self.query
.get_merkle_proof(&tx_hash, height)
.chain_err(|| "cannot create merkle proof")?;
let merkle: Vec<String> = merkle
.into_iter()
.map(|txid| txid.be_hex_string())
.collect();
Ok(json!({
"block_height": height,
"merkle": merkle,
"pos": pos}))
}
fn handle_command(&mut self, method: &str, params: &[Value], id: &Number) -> Result<Value> {
let timer = self.stats
.latency
.with_label_values(&[method])
.start_timer();
let result = match method {
"blockchain.headers.subscribe" => self.blockchain_headers_subscribe(),
"server.version" => self.server_version(),
"server.banner" => self.server_banner(),
"server.donation_address" => self.server_donation_address(),
"server.peers.subscribe" => self.server_peers_subscribe(),
"mempool.get_fee_histogram" => self.mempool_get_fee_histogram(),
"blockchain.block.headers" => self.blockchain_block_headers(&params),
"blockchain.block.get_header" => self.blockchain_block_get_header(&params),
"blockchain.estimatefee" => self.blockchain_estimatefee(&params),
"blockchain.relayfee" => self.blockchain_relayfee(),
"blockchain.address.listunspent" => self.blockchain_address_listunspent(&params),
"blockchain.scripthash.subscribe" => self.blockchain_scripthash_subscribe(&params),
"blockchain.scripthash.get_balance" => self.blockchain_scripthash_get_balance(&params),
"blockchain.scripthash.get_history" => self.blockchain_scripthash_get_history(&params),
"blockchain.scripthash.listunspent" => self.blockchain_scripthash_listunspent(&params),
"blockchain.transaction.broadcast" => self.blockchain_transaction_broadcast(&params),
"blockchain.transaction.get" => self.blockchain_transaction_get(&params),
"blockchain.transaction.get_merkle" => self.blockchain_transaction_get_merkle(&params),
&_ => bail!("unknown method {} {:?}", method, params),
};
timer.observe_duration();
// TODO: return application errors should be sent to the client
Ok(match result {
Ok(result) => json!({"jsonrpc": "2.0", "id": id, "result": result}),
Err(e) => {
warn!(
"rpc #{} {} {:?} failed: {}",
id,
method,
params,
e.display_chain()
);
json!({"jsonrpc": "2.0", "id": id, "error": format!("{}", e)})
}
})
}
fn update_subscriptions(&mut self) -> Result<Vec<Value>> {
let timer = self.stats
.latency
.with_label_values(&["periodic_update"])
.start_timer();
let mut result = vec![];
if let Some(ref mut last_entry) = self.last_header_entry {
let entry = self.query.get_best_header()?;
if *last_entry != entry {
*last_entry = entry;
let hex_header = hex::encode(serialize(last_entry.header()).unwrap());
let header = json!({"hex": hex_header, "height": last_entry.height()});
result.push(json!({
"jsonrpc": "2.0",
"method": "blockchain.headers.subscribe",
"params": [header]}));
}
}
for (script_hash, status_hash) in self.status_hashes.iter_mut() {
let status = self.query.status(&script_hash[..])?;
let new_status_hash = status.hash().map_or(Value::Null, |h| json!(hex::encode(h)));
if new_status_hash == *status_hash {
continue;
}
result.push(json!({
"jsonrpc": "2.0",
"method": "blockchain.scripthash.subscribe",
"params": [script_hash.be_hex_string(), new_status_hash]}));
*status_hash = new_status_hash;
}
timer.observe_duration();
self.stats
.subscriptions
.set(self.status_hashes.len() as i64);
Ok(result)
}
fn send_values(&mut self, values: &[Value]) -> Result<()> {
for value in values {
let line = value.to_string() + "\n";
self.stream
.write_all(line.as_bytes())
.chain_err(|| format!("failed to send {}", value))?;
}
Ok(())
}
fn handle_replies(&mut self) -> Result<()> {
loop {
let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
trace!("RPC {:?}", msg);
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
let reply = match (cmd.get("method"), cmd.get("params"), cmd.get("id")) {
(
Some(&Value::String(ref method)),
Some(&Value::Array(ref params)),
Some(&Value::Number(ref id)),
) => self.handle_command(method, params, id)?,
_ => bail!("invalid command: {}", cmd),
};
self.send_values(&[reply])?
}
Message::PeriodicUpdate => {
let values = self.update_subscriptions()
.chain_err(|| "failed to update subscriptions")?;
self.send_values(&values)?
}
Message::Done => return Ok(()),
}
}
}
fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
loop {
let mut line = Vec::<u8>::new();
reader
.read_until(b'\n', &mut line)
.chain_err(|| "failed to read a request")?;
if line.is_empty() {
tx.send(Message::Done).chain_err(|| "channel closed")?;
return Ok(());
} else {
match String::from_utf8(line) {
Ok(req) => tx.send(Message::Request(req))
.chain_err(|| "channel closed")?,
Err(err) => {
let _ = tx.send(Message::Done);
bail!(
"invalid UTF8 {:?}: {:?}",
String::from_utf8_lossy(err.as_bytes()),
err
)
}
}
}
}
}
pub fn run(mut self) {
let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
let tx = self.chan.sender();
let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
if let Err(e) = self.handle_replies() {
error!(
"[{}] connection handling failed: {}",
self.addr,
e.display_chain().to_string()
);
}
info!("[{}] shutting down connection", self.addr);
let _ = self.stream.shutdown(Shutdown::Both);
if let Err(err) = child.join().expect("receiver panicked") {
error!("[{}] receiver failed: {}", self.addr, err);
}
}
}
#[derive(Debug)]
pub enum Message {
Request(String),
PeriodicUpdate,
Done,
}
pub enum Notification {
Periodic,
Exit,
}
pub struct RPC {
notification: Sender<Notification>,
server: Option<thread::JoinHandle<()>>, // so we can join the server while dropping this ojbect
}
struct Stats {
latency: HistogramVec,
subscriptions: Gauge,
}
impl RPC {
fn start_notifier(
notification: Channel<Notification>,
senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
acceptor: Sender<Option<(TcpStream, SocketAddr)>>,
) {
spawn_thread("notification", move || {
for msg in notification.receiver().iter() {
let mut senders = senders.lock().unwrap();
match msg {
Notification::Periodic => for sender in senders.split_off(0) {
if let Err(TrySendError::Disconnected(_)) =
sender.try_send(Message::PeriodicUpdate)
{
continue;
}
senders.push(sender);
},
Notification::Exit => acceptor.send(None).unwrap(),
}
}
});
}
fn start_acceptor(addr: SocketAddr) -> Channel<Option<(TcpStream, SocketAddr)>> {
let chan = Channel::new();
let acceptor = chan.sender();
spawn_thread("acceptor", move || {
let listener = TcpListener::bind(addr).expect(&format!("bind({}) failed", addr));
info!("RPC server running on {}", addr);
loop {
let (stream, addr) = listener.accept().expect("accept failed");
acceptor.send(Some((stream, addr))).expect("send failed");
}
});
chan
}
pub fn start(addr: SocketAddr, query: Arc<Query>, metrics: &Metrics) -> RPC {
let stats = Arc::new(Stats {
latency: metrics.histogram_vec(
HistogramOpts::new("electrum_rpc", "Electrum RPC latency (seconds)"),
&["method"],
),
subscriptions: metrics.gauge(MetricOpts::new(
"electrum_subscriptions",
"# of Electrum subscriptions",
)),
});
let notification = Channel::new();
let handle = RPC {
notification: notification.sender(),
server: Some(spawn_thread("rpc", move || {
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));
let acceptor = RPC::start_acceptor(addr);
RPC::start_notifier(notification, senders.clone(), acceptor.sender());
let mut children = vec![];
while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() {
let query = query.clone();
let senders = senders.clone();
let stats = stats.clone();
children.push(spawn_thread("peer", move || {
info!("[{}] connected peer", addr);
let conn = Connection::new(query, stream, addr, stats);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
info!("[{}] disconnected peer", addr);
}));
}
info!("stopping RPC server");
for sender in senders.lock().unwrap().iter() {
let _ = sender.send(Message::Done);
}
for child in children {
let _ = child.join();
}
})),
};
handle
}
pub fn notify(&self) {
self.notification.send(Notification::Periodic).unwrap();
}
}
impl Drop for RPC {
fn drop(&mut self) {
self.notification.send(Notification::Exit).unwrap();
self.server.take().map(|t| t.join().unwrap());
}
}