diff --git a/electrumx_rpc.py b/electrumx_rpc.py new file mode 100755 index 0000000..216b5ca --- /dev/null +++ b/electrumx_rpc.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3 + +# See the file "LICENSE" for information about the copyright +# and warranty status of this software. + +import argparse +import asyncio +import json +from os import environ + +import aiohttp + + +async def send(url, payload): + data = json.dumps(payload) + + async with aiohttp.post(url, data = data) as resp: + return await resp.json() + + +def main(): + '''Send the RPC command to the server and print the result.''' + parser = argparse.ArgumentParser() + parser.add_argument('--port', metavar='port_num', type=int, + help='specify the RPC port number') + parser.add_argument('command', nargs='*', default=[], + help='send a command to the server') + args = parser.parse_args() + + if args.port is None: + args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000)) + + url = 'http://127.0.0.1:{:d}/'.format(args.port) + payload = {'method': args.command[0], 'params': args.command[1:]} + task = send(url, payload) + + loop = asyncio.get_event_loop() + try: + result = loop.run_until_complete(task) + finally: + loop.close() + + print(result) + + +if __name__ == '__main__': + main() diff --git a/query.py b/query.py old mode 100644 new mode 100755 diff --git a/server/env.py b/server/env.py index 9507c96..4315934 100644 --- a/server/env.py +++ b/server/env.py @@ -22,6 +22,7 @@ class Env(object): self.db_dir = self.required('DB_DIRECTORY') self.utxo_MB = self.integer('UTXO_MB', 1000) self.hist_MB = self.integer('HIST_MB', 250) + self.electrumx_rpc_port = self.integer('ELECTRUMX_RPC_PORT', 8000) self.rpc_url = self.build_rpc_url() def default(self, envvar, default): diff --git a/server/rpc.py b/server/rpc.py new file mode 100644 index 0000000..9246d6c --- /dev/null +++ b/server/rpc.py @@ -0,0 +1,60 @@ +# See the file "LICENSE" for information about the copyright +# and warranty status of this software. + +import logging +import traceback + +from aiohttp import web + + +class ElectrumRPCServer(object): + '''ElectrumX's RPC server for localhost.''' + + def __init__(self, server): + self.logger = logging.getLogger('RPCServer') + self.logger.setLevel(logging.INFO) + self.server = server + + async def request_handler(self, request): + json_request = await request.json() + try: + err, result = await self.json_handler(json_request) + except Exception as e: + traceback.print_exc() + err, result = 1, 'caught exception: {}'.format(e) + + id_ = request.get('id') + if err is None: + response = { + 'id': id_, + 'error': None, + 'result': result, + } + else: + response = { + 'id': id_, + 'error': {'code': err, 'message': result}, + 'result': None, + } + + return web.json_response(response) + + async def json_handler(self, request): + method = request.get('method') + id_ = request.get('id') + params = request.get('params', []) + handler = getattr(self.server, 'handle_rpc_{}'.format(method), None) + if not handler: + return 1, 'unknown method "{}"'.format(method) + else: + return await handler(params) + + def tasks(self, port): + self.logger.info('listening on port {:d}'.format(port)) + app = web.Application() + app.router.add_post('/', self.request_handler) + host = '0.0.0.0' + loop = app.loop + handler = app.make_handler() + server = loop.create_server(handler, host, port) + return [server, app.startup()] diff --git a/server/server.py b/server/server.py index 8efde5f..9eeb4a3 100644 --- a/server/server.py +++ b/server/server.py @@ -2,16 +2,13 @@ # and warranty status of this software. import asyncio -import json import logging -import os import signal import time from functools import partial -import aiohttp - from server.db import DB +from server.rpc import ElectrumRPCServer class Server(object): @@ -20,16 +17,40 @@ class Server(object): self.env = env self.db = DB(env) self.block_cache = BlockCache(env, self.db) - self.tasks = [ - asyncio.ensure_future(self.block_cache.catch_up()), - asyncio.ensure_future(self.block_cache.process_cache()), - ] + self.rpc_server = ElectrumRPCServer(self) + # Signal handlers loop = asyncio.get_event_loop() for signame in ('SIGINT', 'SIGTERM'): loop.add_signal_handler(getattr(signal, signame), partial(self.on_signal, signame)) + coros = self.rpc_server.tasks(env.electrumx_rpc_port) + coros += [self.block_cache.catch_up(), + self.block_cache.process_cache()] + self.tasks = [asyncio.ensure_future(coro) for coro in coros] + + async def handle_rpc_getinfo(self, params): + return None, { + 'blocks': self.db.height, + 'peers': 0, + 'sessions': 0, + 'watched': 0, + 'cached': 0, + } + + async def handle_rpc_sessions(self, params): + return None, [] + + async def handle_rpc_numsessions(self, params): + return None, 0 + + async def handle_rpc_peers(self, params): + return None, [] + + async def handle_rpc_banner_update(self, params): + return None, 'FIXME' + def on_signal(self, signame): logging.warning('received {} signal, preparing to shut down' .format(signame)) @@ -150,8 +171,7 @@ class BlockCache(object): data = json.dumps(payload) while True: try: - async with aiohttp.request('POST', self.rpc_url, - data = data) as resp: + async with aiohttp.post(self.rpc_url, data = data) as resp: result = await resp.json() except asyncio.CancelledError: raise