Browse Source

Add an RPC server

And associated electrumx_rpc command
master
Neil Booth 8 years ago
parent
commit
0b01026b88
  1. 47
      electrumx_rpc.py
  2. 0
      query.py
  3. 1
      server/env.py
  4. 60
      server/rpc.py
  5. 40
      server/server.py

47
electrumx_rpc.py

@ -0,0 +1,47 @@
#!/usr/bin/env python3
# See the file "LICENSE" for information about the copyright
# and warranty status of this software.
import argparse
import asyncio
import json
from os import environ
import aiohttp
async def send(url, payload):
data = json.dumps(payload)
async with aiohttp.post(url, data = data) as resp:
return await resp.json()
def main():
'''Send the RPC command to the server and print the result.'''
parser = argparse.ArgumentParser()
parser.add_argument('--port', metavar='port_num', type=int,
help='specify the RPC port number')
parser.add_argument('command', nargs='*', default=[],
help='send a command to the server')
args = parser.parse_args()
if args.port is None:
args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000))
url = 'http://127.0.0.1:{:d}/'.format(args.port)
payload = {'method': args.command[0], 'params': args.command[1:]}
task = send(url, payload)
loop = asyncio.get_event_loop()
try:
result = loop.run_until_complete(task)
finally:
loop.close()
print(result)
if __name__ == '__main__':
main()

0
query.py

1
server/env.py

@ -22,6 +22,7 @@ class Env(object):
self.db_dir = self.required('DB_DIRECTORY')
self.utxo_MB = self.integer('UTXO_MB', 1000)
self.hist_MB = self.integer('HIST_MB', 250)
self.electrumx_rpc_port = self.integer('ELECTRUMX_RPC_PORT', 8000)
self.rpc_url = self.build_rpc_url()
def default(self, envvar, default):

60
server/rpc.py

@ -0,0 +1,60 @@
# See the file "LICENSE" for information about the copyright
# and warranty status of this software.
import logging
import traceback
from aiohttp import web
class ElectrumRPCServer(object):
'''ElectrumX's RPC server for localhost.'''
def __init__(self, server):
self.logger = logging.getLogger('RPCServer')
self.logger.setLevel(logging.INFO)
self.server = server
async def request_handler(self, request):
json_request = await request.json()
try:
err, result = await self.json_handler(json_request)
except Exception as e:
traceback.print_exc()
err, result = 1, 'caught exception: {}'.format(e)
id_ = request.get('id')
if err is None:
response = {
'id': id_,
'error': None,
'result': result,
}
else:
response = {
'id': id_,
'error': {'code': err, 'message': result},
'result': None,
}
return web.json_response(response)
async def json_handler(self, request):
method = request.get('method')
id_ = request.get('id')
params = request.get('params', [])
handler = getattr(self.server, 'handle_rpc_{}'.format(method), None)
if not handler:
return 1, 'unknown method "{}"'.format(method)
else:
return await handler(params)
def tasks(self, port):
self.logger.info('listening on port {:d}'.format(port))
app = web.Application()
app.router.add_post('/', self.request_handler)
host = '0.0.0.0'
loop = app.loop
handler = app.make_handler()
server = loop.create_server(handler, host, port)
return [server, app.startup()]

40
server/server.py

@ -2,16 +2,13 @@
# and warranty status of this software.
import asyncio
import json
import logging
import os
import signal
import time
from functools import partial
import aiohttp
from server.db import DB
from server.rpc import ElectrumRPCServer
class Server(object):
@ -20,16 +17,40 @@ class Server(object):
self.env = env
self.db = DB(env)
self.block_cache = BlockCache(env, self.db)
self.tasks = [
asyncio.ensure_future(self.block_cache.catch_up()),
asyncio.ensure_future(self.block_cache.process_cache()),
]
self.rpc_server = ElectrumRPCServer(self)
# Signal handlers
loop = asyncio.get_event_loop()
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame),
partial(self.on_signal, signame))
coros = self.rpc_server.tasks(env.electrumx_rpc_port)
coros += [self.block_cache.catch_up(),
self.block_cache.process_cache()]
self.tasks = [asyncio.ensure_future(coro) for coro in coros]
async def handle_rpc_getinfo(self, params):
return None, {
'blocks': self.db.height,
'peers': 0,
'sessions': 0,
'watched': 0,
'cached': 0,
}
async def handle_rpc_sessions(self, params):
return None, []
async def handle_rpc_numsessions(self, params):
return None, 0
async def handle_rpc_peers(self, params):
return None, []
async def handle_rpc_banner_update(self, params):
return None, 'FIXME'
def on_signal(self, signame):
logging.warning('received {} signal, preparing to shut down'
.format(signame))
@ -150,8 +171,7 @@ class BlockCache(object):
data = json.dumps(payload)
while True:
try:
async with aiohttp.request('POST', self.rpc_url,
data = data) as resp:
async with aiohttp.post(self.rpc_url, data = data) as resp:
result = await resp.json()
except asyncio.CancelledError:
raise

Loading…
Cancel
Save