diff --git a/electrumx_rpc.py b/electrumx_rpc.py index e292ec5..68fc74c 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -6,16 +6,29 @@ import argparse import asyncio import json +from functools import partial from os import environ -import aiohttp +class RPCClient(asyncio.Protocol): -async def send(url, payload): - data = json.dumps(payload) + def __init__(self, loop): + self.loop = loop - async with aiohttp.post(url, data = data) as resp: - return await resp.json() + def connection_made(self, transport): + self.transport = transport + + def connection_lost(self, exc): + self.loop.stop() + + def send(self, payload): + data = json.dumps(payload) + '\n' + self.transport.write(data.encode()) + + def data_received(self, data): + payload = json.loads(data.decode()) + self.transport.close() + print(json.dumps(payload, indent=4, sort_keys=True)) def main(): @@ -30,18 +43,20 @@ def main(): if args.port is None: args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000)) - url = 'http://127.0.0.1:{:d}/'.format(args.port) payload = {'method': args.command[0], 'params': args.command[1:]} - task = send(url, payload) loop = asyncio.get_event_loop() + proto_factory = partial(RPCClient, loop) + coro = loop.create_connection(proto_factory, 'localhost', args.port) try: - result = loop.run_until_complete(task) + transport, protocol = loop.run_until_complete(coro) + protocol.send(payload) + loop.run_forever() + except OSError: + print('error connecting - is ElectrumX running?') finally: loop.close() - print(result) - if __name__ == '__main__': main() diff --git a/lib/hash.py b/lib/hash.py index 28fb399..2cae68b 100644 --- a/lib/hash.py +++ b/lib/hash.py @@ -31,6 +31,13 @@ def hash160(x): return ripemd160(sha256(x)) +def hash_to_str(x): + '''Converts a big-endian binary hash to a little-endian hex string, as + shown in block explorers, etc. + ''' + return bytes(reversed(x)).hex() + + class InvalidBase58String(Exception): pass diff --git a/lib/util.py b/lib/util.py index 13be38a..552a352 100644 --- a/lib/util.py +++ b/lib/util.py @@ -2,10 +2,18 @@ # and warranty status of this software. import array +import logging import sys from collections import Container, Mapping +class LoggedClass(object): + + def __init__(self): + self.logger = logging.getLogger(self.__class__.__name__) + self.logger.setLevel(logging.INFO) + + # Method decorator. To be used for calculations that will always # deliver the same result. The method cannot take any arguments # and should be accessed as an attribute. diff --git a/query.py b/query.py index 1d8b462..e391e39 100755 --- a/query.py +++ b/query.py @@ -8,6 +8,7 @@ import sys from server.env import Env from server.db import DB +from lib.hash import hash_to_str def main(): @@ -27,13 +28,13 @@ def main(): n = None for n, (tx_hash, height) in enumerate(db.get_history(hash168, limit)): print('History #{:d}: hash: {} height: {:d}' - .format(n + 1, bytes(reversed(tx_hash)).hex(), height)) + .format(n + 1, hash_to_str(tx_hash), height)) if n is None: print('No history') n = None for n, utxo in enumerate(db.get_utxos(hash168, limit)): print('UTXOs #{:d}: hash: {} pos: {:d} height: {:d} value: {:d}' - .format(n + 1, bytes(reversed(utxo.tx_hash)).hex(), + .format(n + 1, hash_to_str(utxo.tx_hash), utxo.tx_pos, utxo.height, utxo.value)) if n is None: print('No UTXOs') diff --git a/samples/scripts/env/RPC_HOST b/samples/scripts/env/DAEMON_HOST similarity index 100% rename from samples/scripts/env/RPC_HOST rename to samples/scripts/env/DAEMON_HOST diff --git a/samples/scripts/env/RPC_PASSWORD b/samples/scripts/env/DAEMON_PASSWORD similarity index 100% rename from samples/scripts/env/RPC_PASSWORD rename to samples/scripts/env/DAEMON_PASSWORD diff --git a/samples/scripts/env/DAEMON_PORT b/samples/scripts/env/DAEMON_PORT new file mode 100644 index 0000000..ffa23b5 --- /dev/null +++ b/samples/scripts/env/DAEMON_PORT @@ -0,0 +1 @@ +8332 diff --git a/samples/scripts/env/RPC_USERNAME b/samples/scripts/env/DAEMON_USERNAME similarity index 100% rename from samples/scripts/env/RPC_USERNAME rename to samples/scripts/env/DAEMON_USERNAME diff --git a/samples/scripts/env/RPC_PORT b/samples/scripts/env/RPC_PORT index ffa23b5..e002b36 100644 --- a/samples/scripts/env/RPC_PORT +++ b/samples/scripts/env/RPC_PORT @@ -1 +1 @@ -8332 +8000 diff --git a/samples/scripts/env/SSL_PORT b/samples/scripts/env/SSL_PORT new file mode 100644 index 0000000..d1c5b6b --- /dev/null +++ b/samples/scripts/env/SSL_PORT @@ -0,0 +1 @@ +50002 diff --git a/samples/scripts/env/TCP_PORT b/samples/scripts/env/TCP_PORT new file mode 100644 index 0000000..e69de29 diff --git a/server/server.py b/server/controller.py similarity index 61% rename from server/server.py rename to server/controller.py index c51d992..efd097c 100644 --- a/server/server.py +++ b/server/controller.py @@ -3,76 +3,134 @@ import asyncio import json -import logging import signal +import traceback from functools import partial import aiohttp from server.db import DB -from server.rpc import ElectrumRPCServer +from server.protocol import ElectrumX, LocalRPC +from lib.hash import sha256, hash_to_str, Base58 +from lib.util import LoggedClass -class Server(object): +class Controller(LoggedClass): def __init__(self, env): + super().__init__() self.env = env self.db = DB(env) self.block_cache = BlockCache(env, self.db) - self.rpc_server = ElectrumRPCServer(self) + self.servers = [] + self.sessions = set() + self.addresses = {} + self.jobs = set() + self.peers = {} + + def start(self, loop): + env = self.env + + protocol = partial(LocalRPC, self) + if env.rpc_port is not None: + host = 'localhost' + rpc_server = loop.create_server(protocol, host, env.rpc_port) + self.servers.append(loop.run_until_complete(rpc_server)) + self.logger.info('RPC server listening on {}:{:d}' + .format(host, env.rpc_port)) + + protocol = partial(ElectrumX, self, env) + if env.tcp_port is not None: + tcp_server = loop.create_server(protocol, env.host, env.tcp_port) + self.servers.append(loop.run_until_complete(tcp_server)) + self.logger.info('TCP server listening on {}:{:d}' + .format(env.host, env.tcp_port)) + + if env.ssl_port is not None: + ssl_server = loop.create_server(protocol, env.host, env.ssl_port) + self.servers.append(loop.run_until_complete(ssl_server)) + self.logger.info('SSL server listening on {}:{:d}' + .format(env.host, env.ssl_port)) + + coros = [ + self.reap_jobs(), + self.block_cache.catch_up(), + self.block_cache.process_cache() + ] + + self.tasks = [asyncio.ensure_future(coro) for coro in coros] # Signal handlers - loop = asyncio.get_event_loop() for signame in ('SIGINT', 'SIGTERM'): loop.add_signal_handler(getattr(signal, signame), partial(self.on_signal, signame)) - coros = self.rpc_server.tasks(env.electrumx_rpc_port) - coros += [self.block_cache.catch_up(), - self.block_cache.process_cache()] - self.tasks = [asyncio.ensure_future(coro) for coro in coros] + return self.tasks - async def handle_rpc_getinfo(self, params): - return None, { - 'blocks': self.db.height, - 'peers': 0, - 'sessions': 0, - 'watched': 0, - 'cached': 0, - } + def stop(self): + for server in self.servers: + server.close() - async def handle_rpc_sessions(self, params): - return None, [] + def add_session(self, session): + self.sessions.add(session) - async def handle_rpc_numsessions(self, params): - return None, 0 + def remove_session(self, session): + self.sessions.remove(session) - async def handle_rpc_peers(self, params): - return None, [] + def add_job(self, coro): + '''Queue a job for asynchronous processing.''' + self.jobs.add(asyncio.ensure_future(coro)) - async def handle_rpc_banner_update(self, params): - return None, 'FIXME' + async def reap_jobs(self): + while True: + jobs = set() + for job in self.jobs: + if job.done(): + try: + job.result() + except Exception as e: + traceback.print_exc() + else: + jobs.add(job) + self.logger.info('reaped {:d} jobs, {:d} jobs pending' + .format(len(self.jobs) - len(jobs), len(jobs))) + self.jobs = jobs + await asyncio.sleep(5) def on_signal(self, signame): - logging.warning('received {} signal, preparing to shut down' - .format(signame)) + self.logger.warning('received {} signal, preparing to shut down' + .format(signame)) for task in self.tasks: task.cancel() - def async_tasks(self): - return self.tasks + def address_status(self, hash168): + '''Returns status as 32 bytes.''' + status = self.addresses.get(hash168) + if status is None: + status = ''.join( + '{}:{:d}:'.format(hash_to_str(tx_hash), height) + for tx_hash, height in self.db.get_history(hash168) + ) + if status: + status = sha256(status.encode()) + self.addresses[hash168] = status + + return status + def get_peers(self): + '''Returns a dictionary of IRC nick to (ip, host, ports) tuples, one + per peer.''' + return self.peers -class BlockCache(object): + +class BlockCache(LoggedClass): '''Requests blocks ahead of time from the daemon. Serves them to the blockchain processor.''' def __init__(self, env, db): - self.logger = logging.getLogger('BlockCache') - self.logger.setLevel(logging.INFO) - + super().__init__() self.db = db - self.rpc_url = env.rpc_url + self.daemon_url = env.daemon_url # Cache target size is in MB. Has little effect on sync time. self.cache_limit = 10 self.daemon_height = 0 @@ -82,7 +140,7 @@ class BlockCache(object): self.recent_sizes = [] self.ave_size = 0 - self.logger.info('using RPC URL {}'.format(self.rpc_url)) + self.logger.info('using daemon URL {}'.format(self.daemon_url)) async def process_cache(self): while True: @@ -173,7 +231,7 @@ class BlockCache(object): data = json.dumps(payload) while True: try: - async with aiohttp.post(self.rpc_url, data = data) as resp: + async with aiohttp.post(self.daemon_url, data = data) as resp: result = await resp.json() except asyncio.CancelledError: raise diff --git a/server/db.py b/server/db.py index 3a5dae7..b87249f 100644 --- a/server/db.py +++ b/server/db.py @@ -7,16 +7,16 @@ import itertools import os import struct import time -from binascii import hexlify, unhexlify from bisect import bisect_right from collections import defaultdict, namedtuple from functools import partial -import logging import plyvel -from lib.coins import Bitcoin from lib.script import ScriptPubKey +from lib.util import LoggedClass +from lib.hash import hash_to_str + # History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries HIST_ENTRIES_PER_KEY = 1024 @@ -25,14 +25,13 @@ ADDR_TX_HASH_LEN = 4 UTXO_TX_HASH_LEN = 4 UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") - def formatted_time(t): t = int(t) return '{:d}d {:02d}h {:02d}m {:02d}s'.format( t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60) -class UTXOCache(object): +class UTXOCache(LoggedClass): '''An in-memory UTXO cache, representing all changes to UTXO state since the last DB flush. @@ -85,8 +84,7 @@ class UTXOCache(object): ''' def __init__(self, parent, db, coin): - self.logger = logging.getLogger('UTXO') - self.logger.setLevel(logging.INFO) + super().__init__() self.parent = parent self.coin = coin self.cache = {} @@ -126,7 +124,7 @@ class UTXOCache(object): # d5d27987d2a3dfc724e359870c6644b40e497bdc0589a033220fe15429d88599 #if key in self.cache: # self.logger.info('duplicate tx hash {}' - # .format(bytes(reversed(tx_hash)).hex())) + # .format(hash_to_str(tx_hash))) self.cache[key] = hash168 + tx_numb + pack('