Browse Source

Merge branch 'develop'

master 0.10.11
Neil Booth 8 years ago
parent
commit
491080f4c2
  1. 73
      README.rst
  2. 2
      docs/ARCHITECTURE.rst
  3. 8
      docs/ENVIRONMENT.rst
  4. 95
      electrumx_rpc.py
  5. 4
      lib/coins.py
  6. 2
      lib/hash.py
  7. 803
      lib/jsonrpc.py
  8. 2
      lib/script.py
  9. 2
      lib/tx.py
  10. 2
      lib/util.py
  11. 7
      server/block_processor.py
  12. 189
      server/controller.py
  13. 2
      server/env.py
  14. 110
      server/irc.py
  15. 139
      server/peers.py
  16. 170
      server/session.py
  17. 4
      server/storage.py
  18. 2
      server/version.py
  19. 2
      tests/test_util.py

73
README.rst

@ -115,8 +115,6 @@ Roadmap Pre-1.0
- minor code cleanups. - minor code cleanups.
- implement simple protocol to discover peers without resorting to IRC. - implement simple protocol to discover peers without resorting to IRC.
This may slip to post 1.0
Roadmap Post-1.0 Roadmap Post-1.0
================ ================
@ -137,6 +135,33 @@ version prior to the release of 1.0.
ChangeLog ChangeLog
========= =========
Version 0.10.11
---------------
* rewrite of JSON RPC layer to improve usability for clients.
Includes support of JSON RPC v1, v2 and a compat layer that tries to
detect the peer's version.
Version 0.10.10
---------------
* move peer management from irc.py to peers.py. This is preparataion
for peer discovery without IRC.
* misc cleanups
* fix Litecoin genesis hash (petrkr)
Version 0.10.9
--------------
* restore client to sessions output
* cleanup shutdown process; hopefully this resolves the log spew for good
Version 0.10.8
--------------
* fix import for reverse iterator for RocksDB
* fix tests
Version 0.10.7 Version 0.10.7
-------------- --------------
@ -220,46 +245,6 @@ variables to use roughly the same amount of memory.
For now this code should be considered experimental; if you want For now this code should be considered experimental; if you want
stability please stick with the 0.9 series. stability please stick with the 0.9 series.
Version 0.9.23
--------------
* Backport of the fix for issue `#94#` - stale references to old
sessions. This would effectively memory and network handles.
Version 0.9.22
--------------
* documentation updates (ARCHITECTURE.rst, ENVIRONMENT.rst) only.
Version 0.9.21
--------------
* moved RELEASE-NOTES into this README
* document the RPC interface in docs/RPC-INTERFACE.rst
* clean up open DB handling, issue `#89`_
Version 0.9.20
--------------
* fix for IRC flood issue `#93`_
Version 0.9.19
--------------
* move sleep outside semaphore (issue `#88`_)
Version 0.9.18
--------------
* last release of 2016. Just a couple of minor tweaks to logging.
Version 0.9.17
--------------
* have all the DBs use fsync on write; hopefully means DB won't corrupt in
case of a kernel panic (issue `#75`_)
* replace $DONATION_ADDRESS in banner file
**Neil Booth** kyuupichan@gmail.com https://github.com/kyuupichan **Neil Booth** kyuupichan@gmail.com https://github.com/kyuupichan
@ -267,11 +252,7 @@ Version 0.9.17
.. _#72: https://github.com/kyuupichan/electrumx/issues/72 .. _#72: https://github.com/kyuupichan/electrumx/issues/72
.. _#75: https://github.com/kyuupichan/electrumx/issues/75
.. _#88: https://github.com/kyuupichan/electrumx/issues/88
.. _#89: https://github.com/kyuupichan/electrumx/issues/89
.. _#92: https://github.com/kyuupichan/electrumx/issues/92 .. _#92: https://github.com/kyuupichan/electrumx/issues/92
.. _#93: https://github.com/kyuupichan/electrumx/issues/93
.. _#94: https://github.com/kyuupichan/electrumx/issues/94 .. _#94: https://github.com/kyuupichan/electrumx/issues/94
.. _#99: https://github.com/kyuupichan/electrumx/issues/99 .. _#99: https://github.com/kyuupichan/electrumx/issues/99
.. _#100: https://github.com/kyuupichan/electrumx/issues/100 .. _#100: https://github.com/kyuupichan/electrumx/issues/100

2
docs/ARCHITECTURE.rst

@ -36,7 +36,7 @@ Not started until the Block Processor has caught up with bitcoind.
Daemon Daemon
------ ------
Encapsulates the RPC wire protcol with bitcoind for the whole server. Encapsulates the RPC wire protocol with bitcoind for the whole server.
Transparently handles temporary bitcoind connection errors, and fails Transparently handles temporary bitcoind connection errors, and fails
over if necessary. over if necessary.

8
docs/ENVIRONMENT.rst

@ -205,7 +205,8 @@ below are low and encourage you to raise them.
An integer number of seconds defaulting to 600. Sessions with no An integer number of seconds defaulting to 600. Sessions with no
activity for longer than this are disconnected. Properly activity for longer than this are disconnected. Properly
functioning Electrum clients by default will send pings roughly functioning Electrum clients by default will send pings roughly
every 60 seconds. every 60 seconds, and servers doing peer discovery roughly every 300
seconds.
IRC IRC
--- ---
@ -239,8 +240,9 @@ connectivity on IRC:
* **REPORT_HOST_TOR** * **REPORT_HOST_TOR**
The tor .onion address to advertise. If set, an additional The tor address to advertise; must end with `.onion`. If set, an
connection to IRC happens with '_tor" appended to **IRC_NICK**. additional connection to IRC happens with '_tor' appended to
**IRC_NICK**.
* **REPORT_TCP_PORT_TOR** * **REPORT_TCP_PORT_TOR**

95
electrumx_rpc.py

@ -16,45 +16,60 @@ import json
from functools import partial from functools import partial
from os import environ from os import environ
from lib.jsonrpc import JSONRPC from lib.jsonrpc import JSONSession, JSONRPCv2
from server.controller import Controller from server.controller import Controller
class RPCClient(JSONRPC): class RPCClient(JSONSession):
def __init__(self): def __init__(self):
super().__init__() super().__init__(version=JSONRPCv2)
self.queue = asyncio.Queue() self.max_send = 0
self.max_send = 1000000 self.max_buffer_size = 5*10**6
self.event = asyncio.Event()
def enqueue_request(self, request):
self.queue.put_nowait(request) def have_pending_items(self):
self.event.set()
async def wait_for_response(self):
await self.event.wait()
await self.process_pending_items()
def send_rpc_request(self, method, params):
handler = partial(self.handle_response, method)
self.send_request(handler, method, params)
def handle_response(self, method, result, error):
if method in ('groups', 'sessions') and not error:
if method == 'groups':
lines = Controller.groups_text_lines(result)
else:
lines = Controller.sessions_text_lines(result)
for line in lines:
print(line)
elif error:
print('error: {} (code {:d})'
.format(error['message'], error['code']))
else:
print(json.dumps(result, indent=4, sort_keys=True))
async def send_and_wait(self, method, params, timeout=None):
# Raise incoming buffer size - presumably connection is trusted
self.max_buffer_size = 5000000
if params:
params = [params]
payload = self.request_payload(method, id_=method, params=params)
self.encode_and_send_payload(payload)
future = asyncio.ensure_future(self.queue.get()) def rpc_send_and_wait(port, method, params, timeout=15):
for f in asyncio.as_completed([future], timeout=timeout): loop = asyncio.get_event_loop()
coro = loop.create_connection(RPCClient, 'localhost', port)
try: try:
request = await f transport, rpc_client = loop.run_until_complete(coro)
rpc_client.send_rpc_request(method, params)
try:
coro = rpc_client.wait_for_response()
loop.run_until_complete(asyncio.wait_for(coro, timeout))
except asyncio.TimeoutError: except asyncio.TimeoutError:
future.cancel()
print('request timed out after {}s'.format(timeout)) print('request timed out after {}s'.format(timeout))
else: except OSError:
await request.process(self) print('cannot connect - is ElectrumX catching up, not running, or '
'is {:d} the wrong RPC port?'.format(port))
async def handle_response(self, result, error, method): finally:
if result and method in ('groups', 'sessions'): loop.close()
for line in Controller.text_lines(method, result):
print(line)
else:
value = {'error': error} if error else result
print(json.dumps(value, indent=4, sort_keys=True))
def main(): def main():
@ -68,19 +83,17 @@ def main():
help='params to send') help='params to send')
args = parser.parse_args() args = parser.parse_args()
if args.port is None: port = args.port
args.port = int(environ.get('RPC_PORT', 8000)) if port is None:
port = int(environ.get('RPC_PORT', 8000))
loop = asyncio.get_event_loop() # Get the RPC request.
coro = loop.create_connection(RPCClient, 'localhost', args.port) method = args.command[0]
try: params = args.param
transport, protocol = loop.run_until_complete(coro) if method in ('log', 'disconnect'):
coro = protocol.send_and_wait(args.command[0], args.param, timeout=15) params = [params]
loop.run_until_complete(coro)
except OSError: rpc_send_and_wait(port, method, params)
print('error connecting - is ElectrumX catching up or not running?')
finally:
loop.close()
if __name__ == '__main__': if __name__ == '__main__':

4
lib/coins.py

@ -318,8 +318,8 @@ class Litecoin(Coin):
P2PKH_VERBYTE = 0x30 P2PKH_VERBYTE = 0x30
P2SH_VERBYTE = 0x05 P2SH_VERBYTE = 0x05
WIF_BYTE = 0xb0 WIF_BYTE = 0xb0
GENESIS_HASH=('000000000019d6689c085ae165831e93' GENESIS_HASH=('12a765e31ffd4059bada1e25190f6e98'
'4ff763ae46a2a6c172b3f1b60a8ce26f') 'c99d9714d334efa41a195a7e7e04bfe2')
TX_COUNT = 8908766 TX_COUNT = 8908766
TX_COUNT_HEIGHT = 1105256 TX_COUNT_HEIGHT = 1105256
TX_PER_BLOCK = 10 TX_PER_BLOCK = 10

2
lib/hash.py

@ -1,4 +1,4 @@
# Copyright (c) 2016, Neil Booth # Copyright (c) 2016-2017, Neil Booth
# #
# All rights reserved. # All rights reserved.
# #

803
lib/jsonrpc.py

@ -1,20 +1,26 @@
# Copyright (c) 2016, Neil Booth # Copyright (c) 2016-2017, Neil Booth
# #
# All rights reserved. # All rights reserved.
# #
# See the file "LICENCE" for information about the copyright # See the file "LICENCE" for information about the copyright
# and warranty status of this software. # and warranty status of this software.
'''Class for handling JSON RPC 2.0 connections, server or client.''' '''Classes for acting as a peer over a transport and speaking the JSON
RPC versions 1.0 and 2.0.
JSONSessionBase can use an arbitrary transport.
JSONSession integrates asyncio.Protocol to provide the transport.
'''
import asyncio import asyncio
import collections
import inspect import inspect
import json import json
import numbers import numbers
import time import time
import traceback import traceback
from lib.util import LoggedClass import lib.util as util
class RPCError(Exception): class RPCError(Exception):
@ -25,298 +31,310 @@ class RPCError(Exception):
self.code = code self.code = code
class RequestBase(object): class JSONRPC(object):
'''An object that represents a queued request.''' '''Base class of JSON RPC versions.'''
def __init__(self, remaining): # See http://www.jsonrpc.org/specification
self.remaining = remaining PARSE_ERROR = -32700
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_ARGS = -32602
INTERNAL_ERROR = -32603
ID_TYPES = (type(None), str, numbers.Number)
HAS_BATCHES = False
class SingleRequest(RequestBase):
'''An object that represents a single request.'''
def __init__(self, payload): class JSONRPCv1(JSONRPC):
super().__init__(1) '''JSON RPC version 1.0.'''
self.payload = payload
async def process(self, session): @classmethod
'''Asynchronously handle the JSON request.''' def request_payload(cls, id_, method, params=None):
self.remaining = 0 '''JSON v1 request payload. Params is mandatory.'''
binary = await session.process_single_payload(self.payload) return {'method': method, 'params': params or [], 'id': id_}
if binary:
session._send_bytes(binary)
def __str__(self): @classmethod
return str(self.payload) def notification_payload(cls, method, params=None):
'''JSON v1 notification payload. Params and id are mandatory.'''
return {'method': method, 'params': params or [], 'id': None}
@classmethod
def response_payload(cls, result, id_):
'''JSON v1 response payload. error is present and None.'''
return {'id': id_, 'result': result, 'error': None}
class BatchRequest(RequestBase): @classmethod
'''An object that represents a batch request and its processing state. def error_payload(cls, message, code, id_):
'''JSON v1 error payload. result is present and None.'''
return {'id': id_, 'result': None,
'error': {'message': message, 'code': code}}
Batches are processed in chunks. @classmethod
def handle_response(cls, handler, payload):
'''JSON v1 response handler. Both 'error' and 'response'
should exist with exactly one being None.
''' '''
try:
result = payload['result']
error = payload['error']
except KeyError:
pass
else:
if (result is None) != (error is None):
handler(result, error)
def __init__(self, payload): @classmethod
super().__init__(len(payload)) def is_request(cls, payload):
self.payload = payload '''Returns True if the payload (which has a method) is a request.
self.parts = [] False means it is a notification.'''
return payload.get('id') != None
async def process(self, session):
'''Asynchronously handle the JSON batch according to the JSON 2.0
spec.'''
target = max(self.remaining - 4, 0)
while self.remaining > target:
item = self.payload[len(self.payload) - self.remaining]
self.remaining -= 1
part = await session.process_single_payload(item)
if part:
self.parts.append(part)
total_len = sum(len(part) + 2 for part in self.parts)
if session.is_oversized_request(total_len):
raise RPCError('request too large', JSONRPC.INVALID_REQUEST)
if not self.remaining:
if self.parts:
binary = b'[' + b', '.join(self.parts) + b']'
session._send_bytes(binary)
def __str__(self):
return str(self.payload)
class JSONRPC(asyncio.Protocol, LoggedClass):
'''Manages a JSONRPC connection.
Assumes JSON messages are newline-separated and that newlines
cannot appear in the JSON other than to separate lines. Incoming
requests are passed to enqueue_request(), which should arrange for
their asynchronous handling via the request's process() method.
Derived classes may want to override connection_made() and
connection_lost() but should be sure to call the implementation in
this base class first. They may also want to implement the asynchronous
function handle_response() which by default does nothing.
The functions request_handler() and notification_handler() are
passed an RPC method name, and should return an asynchronous
function to call to handle it. The functions' docstrings are used
for help, and the arguments are what can be used as JSONRPC 2.0
named arguments (and thus become part of the external interface).
If the method is unknown return None.
Request handlers should return a Python object to return to the
caller, or raise an RPCError on error. Notification handlers
should not return a value or raise any exceptions.
'''
# See http://www.jsonrpc.org/specification class JSONRPCv2(JSONRPC):
PARSE_ERROR = -32700 '''JSON RPC version 2.0.'''
INVALID_REQUEST = -32600
METHOD_NOT_FOUND = -32601
INVALID_ARGS = -32602
INTERNAL_ERROR = -32603
ID_TYPES = (type(None), str, numbers.Number) HAS_BATCHES = True
NEXT_SESSION_ID = 0
@classmethod
def request_payload(cls, id_, method, params=None):
'''JSON v2 request payload. Params is optional.'''
payload = {'jsonrpc': '2.0', 'method': method, 'id': id_}
if params:
payload['params'] = params
return payload
@classmethod @classmethod
def request_payload(cls, method, id_, params=None): def notification_payload(cls, method, params=None):
payload = {'jsonrpc': '2.0', 'id': id_, 'method': method} '''JSON v2 notification payload. There must be no id.'''
payload = {'jsonrpc': '2.0', 'method': method}
if params: if params:
payload['params'] = params payload['params'] = params
return payload return payload
@classmethod @classmethod
def response_payload(cls, result, id_): def response_payload(cls, result, id_):
return {'jsonrpc': '2.0', 'result': result, 'id': id_} '''JSON v2 response payload. error is not present.'''
return {'jsonrpc': '2.0', 'id': id_, 'result': result}
@classmethod @classmethod
def notification_payload(cls, method, params=None): def error_payload(cls, message, code, id_):
return cls.request_payload(method, None, params) '''JSON v2 error payload. result is not present.'''
return {'jsonrpc': '2.0', 'id': id_,
'error': {'message': message, 'code': code}}
@classmethod @classmethod
def error_payload(cls, message, code, id_=None): def handle_response(cls, handler, payload):
error = {'message': message, 'code': code} '''JSON v2 response handler. Exactly one of 'error' and 'response'
return {'jsonrpc': '2.0', 'error': error, 'id': id_} must exist. Errors must have 'code' and 'message' members.
'''
if ('error' in payload) != ('result' in payload):
if 'result' in payload:
handler(payload['result'], None)
else:
error = payload['error']
if (isinstance(error, dict) and 'code' in error
and 'message' in error):
handler(None, error)
@classmethod @classmethod
def check_payload_id(cls, payload): def batch_size(cls, parts):
'''Extract and return the ID from the payload. '''Return the size of a JSON batch from its parts.'''
return sum(len(part) for part in parts) + 2 * len(parts)
Raises an RPCError if it is missing or invalid.''' @classmethod
if not 'id' in payload: def batch_bytes(cls, parts):
raise RPCError('missing id', JSONRPC.INVALID_REQUEST) '''Return the bytes of a JSON batch from its parts.'''
if parts:
return b'[' + b', '.join(parts) + b']'
return b''
@classmethod
def is_request(cls, payload):
'''Returns True if the payload (which has a method) is a request.
False means it is a notification.'''
return 'id' in payload
id_ = payload['id']
if not isinstance(id_, JSONRPC.ID_TYPES):
raise RPCError('invalid id: {}'.format(id_),
JSONRPC.INVALID_REQUEST)
return id_
class JSONRPCCompat(JSONRPC):
'''Intended to be used until receiving a response from the peer, at
which point detect_version should be used to choose which version
to use.
Sends requests compatible with v1 and v2. Errors cannot be
compatible so v2 errors are sent.
Does not send responses or notifications, nor handle responses.
'''
@classmethod @classmethod
def payload_id(cls, payload): def request_payload(cls, id_, method, params=None):
'''Extract and return the ID from the payload. '''JSON v2 request payload but with params present.'''
return {'jsonrpc': '2.0', 'id': id_,
'method': method, 'params': params or []}
Returns None if it is missing or invalid.''' @classmethod
try: def error_payload(cls, message, code, id_):
return cls.check_payload_id(payload) '''JSON v2 error payload. result is not present.'''
except RPCError: return {'jsonrpc': '2.0', 'id': id_,
'error': {'message': message, 'code': code}}
@classmethod
def detect_version(cls, payload):
'''Return a best guess at a version compatible with the received
payload.
Return None if one cannot be determined.
'''
def item_version(item):
if isinstance(item, dict):
version = item.get('jsonrpc')
if version is None:
return JSONRPCv1
if version == '2.0':
return JSONRPCv2
return None return None
def __init__(self): if isinstance(payload, list) and payload:
version = item_version(payload[0])
# If a batch return at least JSONRPCv2
if version in (JSONRPCv1, None):
version = JSONRPCv2
else:
version = item_version(payload)
return version
class JSONSessionBase(util.LoggedClass):
'''Acts as the application layer session, communicating via JSON RPC
over an underlying transport.
Processes incoming and sends outgoing requests, notifications and
responses. Incoming messages are queued. When the queue goes
from empty
'''
NEXT_SESSION_ID = 0
@classmethod
def next_session_id(cls):
session_id = cls.NEXT_SESSION_ID
cls.NEXT_SESSION_ID += 1
return session_id
def __init__(self, version=JSONRPCCompat):
super().__init__() super().__init__()
self.start = time.time()
self.stop = 0
self.last_recv = self.start
self.bandwidth_start = self.start
self.bandwidth_interval = 3600
self.bandwidth_used = 0
self.bandwidth_limit = 5000000
self.transport = None
self.pause = False
# Parts of an incomplete JSON line. We buffer them until # Parts of an incomplete JSON line. We buffer them until
# getting a newline. # getting a newline.
self.parts = [] self.parts = []
# recv_count is JSON messages not calls to data_received() self.version = version
self.recv_count = 0 self.log_me = False
self.recv_size = 0 self.session_id = None
# Count of incoming complete JSON requests and the time of the
# last one. A batch counts as just one here.
self.last_recv = time.time()
self.send_count = 0 self.send_count = 0
self.send_size = 0 self.send_size = 0
self.recv_size = 0
self.recv_count = 0
self.error_count = 0 self.error_count = 0
self.close_after_send = False self.pause = False
self.peer_info = None # Handling of incoming items
# Sends longer than max_send are prevented, instead returning self.items = collections.deque()
# an oversized request error to other end of the network self.batch_results = []
# connection. The request causing it is logged. Values under # Handling of outgoing requests
# 1000 are treated as 1000. self.next_request_id = 0
self.max_send = 0 self.pending_responses = {}
# If buffered incoming data exceeds this the connection is closed # If buffered incoming data exceeds this the connection is closed
self.max_buffer_size = 1000000 self.max_buffer_size = 1000000
self.anon_logs = False self.max_send = 50000
self.id_ = JSONRPC.NEXT_SESSION_ID self.close_after_send = False
JSONRPC.NEXT_SESSION_ID += 1
self.log_prefix = '[{:d}] '.format(self.id_)
self.log_me = False
def peername(self, *, for_log=True):
'''Return the peer name of this connection.'''
if not self.peer_info:
return 'unknown'
if for_log and self.anon_logs:
return 'xx.xx.xx.xx:xx'
if ':' in self.peer_info[0]:
return '[{}]:{}'.format(self.peer_info[0], self.peer_info[1])
else:
return '{}:{}'.format(self.peer_info[0], self.peer_info[1])
def connection_made(self, transport):
'''Handle an incoming client connection.'''
self.transport = transport
self.peer_info = transport.get_extra_info('peername')
transport.set_write_buffer_limits(high=500000)
def connection_lost(self, exc):
'''Handle client disconnection.'''
pass
def pause_writing(self): def pause_writing(self):
'''Called by asyncio when the write buffer is full.''' '''Transport calls when the send buffer is full.'''
self.log_info('pausing request processing whilst socket drains') self.log_info('pausing processing whilst socket drains')
self.pause = True self.pause = True
def resume_writing(self): def resume_writing(self):
'''Called by asyncio when the write buffer has room.''' '''Transport calls when the send buffer has room.'''
self.log_info('resuming request processing') self.log_info('resuming processing')
self.pause = False self.pause = False
def close_connection(self): def is_oversized(self, length):
self.stop = time.time() '''Return True if the given outgoing message size is too large.'''
if self.transport: if self.max_send and length > max(1000, self.max_send):
self.transport.close() msg = 'response too large (at least {:d} bytes)'.format(length)
return self.error_bytes(msg, JSONRPC.INVALID_REQUEST,
payload.get('id'))
return False
def using_bandwidth(self, amount): def send_binary(self, binary):
now = time.time() '''Pass the bytes through to the transport.
# Reduce the recorded usage in proportion to the elapsed time
elapsed = now - self.bandwidth_start
self.bandwidth_start = now
refund = int(elapsed / self.bandwidth_interval * self.bandwidth_limit)
refund = min(refund, self.bandwidth_used)
self.bandwidth_used += amount - refund
self.throttled = max(0, self.throttled - int(elapsed) // 60)
def data_received(self, data): Close the connection if close_after_send is set.
'''Handle incoming data (synchronously).
Requests end in newline characters. Pass complete requests to
decode_message for handling.
''' '''
self.recv_size += len(data) if self.is_closing():
self.using_bandwidth(len(data)) return
self.using_bandwidth(len(binary))
# Close abusive connections where buffered data exceeds limit self.send_count += 1
buffer_size = len(data) + sum(len(part) for part in self.parts) self.send_size += len(binary)
if buffer_size > self.max_buffer_size: self.send_bytes(binary)
self.log_error('read buffer of {:,d} bytes exceeds {:,d} ' if self.close_after_send:
'byte limit, closing {}'
.format(buffer_size, self.max_buffer_size,
self.peername()))
self.close_connection() self.close_connection()
# Do nothing if this connection is closing def payload_id(self, payload):
if self.transport.is_closing(): '''Extract and return the ID from the payload.
return
while True: Returns None if it is missing or invalid.'''
npos = data.find(ord('\n')) try:
if npos == -1: return self.check_payload_id(payload)
self.parts.append(data) except RPCError:
break return None
self.last_recv = time.time()
self.recv_count += 1
tail, data = data[:npos], data[npos + 1:]
parts, self.parts = self.parts, []
parts.append(tail)
self.decode_message(b''.join(parts))
def decode_message(self, message): def check_payload_id(self, payload):
'''Decode a binary message and queue it for asynchronous handling. '''Extract and return the ID from the payload.
Messages that cannot be decoded are logged and dropped. Raises an RPCError if it is missing or invalid.'''
''' if not 'id' in payload:
try: raise RPCError('missing id', JSONRPC.INVALID_REQUEST)
message = message.decode()
except UnicodeDecodeError as e:
msg = 'cannot decode binary bytes: {}'.format(e)
self.send_json_error(msg, JSONRPC.PARSE_ERROR)
return
try: id_ = payload['id']
message = json.loads(message) if not isinstance(id_, self.version.ID_TYPES):
except json.JSONDecodeError as e: raise RPCError('invalid id type {}'.format(type(id_)),
msg = 'cannot decode JSON: {}'.format(e) JSONRPC.INVALID_REQUEST)
self.send_json_error(msg, JSONRPC.PARSE_ERROR) return id_
return
if isinstance(message, list): def request_bytes(self, id_, method, params=None):
# Batches must have at least one object. '''Return the bytes of a JSON request.'''
if not message: payload = self.version.request_payload(id_, method, params)
self.send_json_error('empty batch', JSONRPC.INVALID_REQUEST) return self.encode_payload(payload)
return
request = BatchRequest(message)
else:
request = SingleRequest(message)
'''Queue the request for asynchronous handling.''' def notification_bytes(self, method, params=None):
self.enqueue_request(request) payload = self.version.notification_payload(method, params)
if self.log_me: return self.encode_payload(payload)
self.log_info('queued {}'.format(message))
def send_json_error(self, message, code, id_=None): def response_bytes(self, result, id_):
'''Send a JSON error.''' '''Return the bytes of a JSON response.'''
self._send_bytes(self.json_error_bytes(message, code, id_)) return self.encode_payload(self.version.response_payload(result, id_))
def error_bytes(self, message, code, id_=None):
'''Return the bytes of a JSON error.
Flag the connection to close on a fatal error or too many errors.'''
version = self.version
self.error_count += 1
if code in (version.PARSE_ERROR, version.INVALID_REQUEST):
self.log_info(message)
self.close_after_send = True
elif self.error_count >= 10:
self.log_info('too many errors, last: {}'.format(message))
self.close_after_send = True
return self.encode_payload(self.version.error_payload
(message, code, id_))
def encode_payload(self, payload): def encode_payload(self, payload):
'''Encode a Python object as binary bytes.'''
assert isinstance(payload, dict) assert isinstance(payload, dict)
try: try:
@ -324,88 +342,116 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
except TypeError: except TypeError:
msg = 'JSON encoding failure: {}'.format(payload) msg = 'JSON encoding failure: {}'.format(payload)
self.log_error(msg) self.log_error(msg)
binary = self.json_error_bytes(msg, JSONRPC.INTERNAL_ERROR, binary = self.error_bytes(msg, JSONRPC.INTERNAL_ERROR,
payload.get('id')) payload.get('id'))
if self.is_oversized_request(len(binary)): error_bytes = self.is_oversized(len(binary))
binary = self.json_error_bytes('request too large', return error_bytes or binary
JSONRPC.INVALID_REQUEST,
payload.get('id'))
self.send_count += 1
self.send_size += len(binary)
self.using_bandwidth(len(binary))
return binary
def is_oversized_request(self, total_len): def decode_message(self, payload):
return total_len > max(1000, self.max_send) '''Decode a binary message and pass it on to process_single_item or
process_batch as appropriate.
def _send_bytes(self, binary): Messages that cannot be decoded are logged and dropped.
'''Send JSON text over the transport. Close it if close is True.''' '''
# Confirmed this happens, sometimes a lot try:
if self.transport.is_closing(): payload = payload.decode()
except UnicodeDecodeError as e:
msg = 'cannot decode message: {}'.format(e)
self.send_error(msg, JSONRPC.PARSE_ERROR)
return return
self.transport.write(binary)
self.transport.write(b'\n')
if self.close_after_send:
self.close_connection()
def encode_and_send_payload(self, payload): try:
'''Encode the payload and send it.''' payload = json.loads(payload)
self._send_bytes(self.encode_payload(payload)) except json.JSONDecodeError as e:
msg = 'cannot decode JSON: {}'.format(e)
self.send_error(msg, JSONRPC.PARSE_ERROR)
return
def json_notification_bytes(self, method, params): if self.version is JSONRPCCompat:
'''Return the bytes of a json notification.''' # Attempt to detect peer's JSON RPC version
return self.encode_payload(self.notification_payload(method, params)) version = self.version.detect_version(payload)
if not version:
version = JSONRPCv2
self.log_info('unable to detect JSON RPC version, using 2.0')
self.version = version
def json_request_bytes(self, method, id_, params=None): # Batches must have at least one object.
'''Return the bytes of a JSON request.''' if payload == [] and self.version.HAS_BATCHES:
return self.encode_payload(self.request_payload(method, id_, params)) self.send_error('empty batch', JSONRPC.INVALID_REQUEST)
return
def json_response_bytes(self, result, id_): # Incoming items get queued for later asynchronous processing.
'''Return the bytes of a JSON response.''' if not self.items:
return self.encode_payload(self.response_payload(result, id_)) self.have_pending_items()
self.items.append(payload)
def json_error_bytes(self, message, code, id_=None): async def process_batch(self, batch, count):
'''Return the bytes of a JSON error. '''Processes count items from the batch according to the JSON 2.0
spec.
Flag the connection to close on a fatal error or too many errors.''' If any remain, puts what is left of the batch back in the deque
self.error_count += 1 and returns None. Otherwise returns the binary batch result.'''
if (code in (JSONRPC.PARSE_ERROR, JSONRPC.INVALID_REQUEST) results = self.batch_results
or self.error_count > 10): self.batch_results = []
self.close_after_send = True
return self.encode_payload(self.error_payload(message, code, id_)) for n in range(count):
item = batch.pop()
result = await self.process_single_item(item)
if result:
results.append(result)
if not batch:
return self.version.batch_bytes(results)
error_bytes = self.is_oversized(self.batch_size(results))
if error_bytes:
return error_bytes
async def process_single_payload(self, payload): self.items.appendleft(item)
self.batch_results = results
return None
async def process_single_item(self, payload):
'''Handle a single JSON request, notification or response. '''Handle a single JSON request, notification or response.
If it is a request, return the binary response, oterhwise None.''' If it is a request, return the binary response, oterhwise None.'''
if self.log_me:
self.log_info('processing {}'.format(payload))
if not isinstance(payload, dict): if not isinstance(payload, dict):
return self.json_error_bytes('request must be a dict', return self.error_bytes('request must be a dictionary',
JSONRPC.INVALID_REQUEST) JSONRPC.INVALID_REQUEST)
try:
# Requests and notifications must have a method. # Requests and notifications must have a method.
# Notifications are distinguished by having no 'id'.
if 'method' in payload: if 'method' in payload:
if 'id' in payload: if self.version.is_request(payload):
return await self.process_single_request(payload) return await self.process_single_request(payload)
else: else:
await self.process_single_notification(payload) await self.process_single_notification(payload)
else: else:
await self.process_single_response(payload) self.process_single_response(payload)
return None return None
except asyncio.CancelledError:
raise
except Exception:
self.log_error(traceback.format_exc())
return self.error_bytes('internal error processing request',
JSONRPC.INTERNAL_ERROR,
self.payload_id(payload))
async def process_single_request(self, payload): async def process_single_request(self, payload):
'''Handle a single JSON request and return the binary response.''' '''Handle a single JSON request and return the binary response.'''
try: try:
result = await self.handle_payload(payload, self.request_handler) result = await self.handle_payload(payload, self.request_handler)
return self.json_response_bytes(result, payload['id']) return self.response_bytes(result, payload['id'])
except RPCError as e: except RPCError as e:
return self.json_error_bytes(e.msg, e.code, return self.error_bytes(e.msg, e.code, self.payload_id(payload))
self.payload_id(payload))
except Exception: except Exception:
self.log_error(traceback.format_exc()) self.log_error(traceback.format_exc())
return self.json_error_bytes('internal error processing request', return self.error_bytes('internal error processing request',
JSONRPC.INTERNAL_ERROR, JSONRPC.INTERNAL_ERROR,
self.payload_id(payload)) self.payload_id(payload))
@ -418,18 +464,16 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
except Exception: except Exception:
self.log_error(traceback.format_exc()) self.log_error(traceback.format_exc())
async def process_single_response(self, payload): def process_single_response(self, payload):
'''Handle a single JSON response.''' '''Handle a single JSON response.'''
try: try:
id_ = self.check_payload_id(payload) id_ = self.check_payload_id(payload)
# Only one of result and error should exist handler = self.pending_responses.pop(id_, None)
if 'error' in payload: if handler:
error = payload['error'] self.version.handle_response(handler, payload)
if (not 'result' in payload and isinstance(error, dict) else:
and 'code' in error and 'message' in error): self.log_info('response for unsent id {}'.format(id_),
await self.handle_response(None, error, id_) throttle=True)
elif 'result' in payload:
await self.handle_response(payload['result'], None, id_)
except RPCError: except RPCError:
pass pass
except Exception: except Exception:
@ -442,7 +486,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
method = payload.get('method') method = payload.get('method')
if not isinstance(method, str): if not isinstance(method, str):
raise RPCError("invalid method: '{}'".format(method), raise RPCError("invalid method type {}".format(type(method)),
JSONRPC.INVALID_REQUEST) JSONRPC.INVALID_REQUEST)
handler = get_handler(method) handler = get_handler(method)
@ -451,7 +495,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
JSONRPC.METHOD_NOT_FOUND) JSONRPC.METHOD_NOT_FOUND)
if not isinstance(args, (list, dict)): if not isinstance(args, (list, dict)):
raise RPCError('arguments should be an array or a dict', raise RPCError('arguments should be an array or dictionary',
JSONRPC.INVALID_REQUEST) JSONRPC.INVALID_REQUEST)
params = inspect.signature(handler).parameters params = inspect.signature(handler).parameters
@ -459,12 +503,13 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
min_args = sum(p.default is p.empty for p in params.values()) min_args = sum(p.default is p.empty for p in params.values())
if len(args) < min_args: if len(args) < min_args:
raise RPCError('too few arguments: expected {:d} got {:d}' raise RPCError('too few arguments to {}: expected {:d} got {:d}'
.format(min_args, len(args)), JSONRPC.INVALID_ARGS) .format(method, min_args, len(args)),
JSONRPC.INVALID_ARGS)
if len(args) > len(params): if len(args) > len(params):
raise RPCError('too many arguments: expected {:d} got {:d}' raise RPCError('too many arguments to {}: expected {:d} got {:d}'
.format(len(params), len(args)), .format(method, len(params), len(args)),
JSONRPC.INVALID_ARGS) JSONRPC.INVALID_ARGS)
if isinstance(args, list): if isinstance(args, list):
@ -477,23 +522,179 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
raise RPCError('invalid parameter names: {}' raise RPCError('invalid parameter names: {}'
.format(', '.join(bad_names))) .format(', '.join(bad_names)))
if inspect.iscoroutinefunction(handler):
return await handler(**kw_args) return await handler(**kw_args)
else:
return handler(**kw_args)
# ---- External Interface ----
async def process_pending_items(self, limit=8):
'''Processes up to LIMIT pending items asynchronously.'''
while limit > 0 and self.items:
item = self.items.popleft()
if isinstance(item, list) and self.version.HAS_BATCHES:
count = min(limit, len(item))
binary = await self.process_batch(item, count)
limit -= count
else:
binary = await self.process_single_item(item)
limit -= 1
if binary:
self.send_binary(binary)
def count_pending_items(self):
'''Counts the number of pending items.'''
return sum(len(item) if isinstance(item, list) else 1
for item in self.items)
def connection_made(self):
'''Call when an incoming client connection is established.'''
self.session_id = self.next_session_id()
self.log_prefix = '[{:d}] '.format(self.session_id)
def data_received(self, data):
'''Underlying transport calls this when new data comes in.
Look for newline separators terminating full requests.
'''
if self.is_closing():
return
self.using_bandwidth(len(data))
self.recv_size += len(data)
# Close abusive connections where buffered data exceeds limit
buffer_size = len(data) + sum(len(part) for part in self.parts)
if buffer_size > self.max_buffer_size:
self.log_error('read buffer of {:,d} bytes over {:,d} byte limit'
.format(buffer_size, self.max_buffer_size))
self.close_connection()
return
while True:
npos = data.find(ord('\n'))
if npos == -1:
self.parts.append(data)
break
tail, data = data[:npos], data[npos + 1:]
parts, self.parts = self.parts, []
parts.append(tail)
self.recv_count += 1
self.last_recv = time.time()
self.decode_message(b''.join(parts))
def send_error(self, message, code, id_=None):
'''Send a JSON error.'''
self.send_binary(self.error_bytes(message, code, id_))
def send_request(self, handler, method, params=None):
'''Sends a request and arranges for handler to be called with the
response when it comes in.
'''
id_ = self.next_request_id
self.next_request_id += 1
self.send_binary(self.request_bytes(id_, method, params))
self.pending_responses[id_] = handler
def send_notification(self, method, params=None):
'''Send a notification.'''
self.send_binary(self.notification_bytes(method, params))
def send_notifications(self, mp_iterable):
'''Send an iterable of (method, params) notification pairs.
A 1-tuple is also valid in which case there are no params.'''
if self.version.HAS_BATCHES:
parts = [self.notification_bytes(*pair) for pair in mp_iterable]
self.send_binary(batch_bytes(parts))
else:
for pair in mp_iterable:
self.send_notification(*pair)
# -- derived classes are intended to override these functions
# --- derived classes are intended to override these functions # Transport layer
def enqueue_request(self, request):
'''Enqueue a request for later asynchronous processing.''' def is_closing(self):
'''Return True if the underlying transport is closing.'''
raise NotImplementedError raise NotImplementedError
async def handle_response(self, result, error, id_): def close_connection(self):
'''Handle a JSON response. '''Close the connection.'''
raise NotImplementedError
def send_bytes(self, binary):
'''Pass the bytes through to the underlying transport.'''
raise NotImplementedError
Should not raise an exception. Return values are ignored. # App layer
def have_pending_items(self):
'''Called to indicate there are items pending to be processed
asynchronously by calling process_pending_items.
This is *not* called every time an item is added, just when
there were previously none and now there is at least one.
''' '''
raise NotImplementedError
def using_bandwidth(self, amount):
'''Called as bandwidth is consumed.
Override to implement bandwidth management.
'''
pass
def notification_handler(self, method): def notification_handler(self, method):
'''Return the async handler for the given notification method.''' '''Return the handler for the given notification.
The handler can be synchronous or asynchronous.'''
return None return None
def request_handler(self, method): def request_handler(self, method):
'''Return the async handler for the given request method.''' '''Return the handler for the given request method.
The handler can be synchronous or asynchronous.'''
return None return None
class JSONSession(JSONSessionBase, asyncio.Protocol):
'''A JSONSessionBase instance specialized for use with
asyncio.protocol to implement the transport layer.
Derived classes must provide have_pending_items() and may want to
override the request and notification handlers.
'''
def __init__(self, version=JSONRPCCompat):
super().__init__(version=version)
self.transport = None
self.write_buffer_high = 500000
def peer_info(self):
'''Returns information about the peer.'''
return self.transport.get_extra_info('peername')
def abort(self):
'''Cut the connection abruptly.'''
self.transport.abort()
def connection_made(self, transport):
'''Handle an incoming client connection.'''
transport.set_write_buffer_limits(high=self.write_buffer_high)
self.transport = transport
super().connection_made()
def is_closing(self):
'''True if the underlying transport is closing.'''
return self.transport and self.transport.is_closing()
def close_connection(self):
'''Close the connection.'''
if self.transport:
self.transport.close()
def send_bytes(self, binary):
'''Send JSON text over the transport.'''
self.transport.writelines((binary, b'\n'))

2
lib/script.py

@ -1,4 +1,4 @@
# Copyright (c) 2016, Neil Booth # Copyright (c) 2016-2017, Neil Booth
# #
# All rights reserved. # All rights reserved.
# #

2
lib/tx.py

@ -1,4 +1,4 @@
# Copyright (c) 2016, Neil Booth # Copyright (c) 2016-2017, Neil Booth
# #
# All rights reserved. # All rights reserved.
# #

2
lib/util.py

@ -1,4 +1,4 @@
# Copyright (c) 2016, Neil Booth # Copyright (c) 2016-2017, Neil Booth
# #
# All rights reserved. # All rights reserved.
# #

7
server/block_processor.py

@ -196,11 +196,14 @@ class BlockProcessor(server.db.DB):
task = await self.task_queue.get() task = await self.task_queue.get()
await task() await task()
def shutdown(self): def shutdown(self, executor):
'''Shutdown cleanly and flush to disk.'''
# First stut down the executor; it may be processing a block.
# Then we can flush anything remaining to disk.
executor.shutdown()
if self.height != self.db_height: if self.height != self.db_height:
self.logger.info('flushing state to DB for a clean shutdown...') self.logger.info('flushing state to DB for a clean shutdown...')
self.flush(True) self.flush(True)
self.logger.info('shutdown complete')
async def executor(self, func, *args, **kwargs): async def executor(self, func, *args, **kwargs):
'''Run func taking args in the executor.''' '''Run func taking args in the executor.'''

189
server/controller.py

@ -18,14 +18,14 @@ from functools import partial
import pylru import pylru
from lib.jsonrpc import JSONRPC, RPCError, RequestBase from lib.jsonrpc import JSONRPC, RPCError
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
import lib.util as util import lib.util as util
from server.block_processor import BlockProcessor from server.block_processor import BlockProcessor
from server.daemon import Daemon, DaemonError from server.daemon import Daemon, DaemonError
from server.irc import IRC
from server.session import LocalRPC, ElectrumX
from server.mempool import MemPool from server.mempool import MemPool
from server.peers import PeerManager
from server.session import LocalRPC, ElectrumX
from server.version import VERSION from server.version import VERSION
@ -39,16 +39,6 @@ class Controller(util.LoggedClass):
BANDS = 5 BANDS = 5
CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4) CATCHING_UP, LISTENING, PAUSED, SHUTTING_DOWN = range(4)
class NotificationRequest(RequestBase):
def __init__(self, height, touched):
super().__init__(1)
self.height = height
self.touched = touched
async def process(self, session):
self.remaining = 0
await session.notify(self.height, self.touched)
def __init__(self, env): def __init__(self, env):
super().__init__() super().__init__()
# Set this event to cleanly shutdown # Set this event to cleanly shutdown
@ -56,12 +46,12 @@ class Controller(util.LoggedClass):
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
self.executor = ThreadPoolExecutor() self.executor = ThreadPoolExecutor()
self.loop.set_default_executor(self.executor) self.loop.set_default_executor(self.executor)
self.start = time.time() self.start_time = time.time()
self.coin = env.coin self.coin = env.coin
self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url)) self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url))
self.bp = BlockProcessor(env, self.daemon) self.bp = BlockProcessor(env, self.daemon)
self.mempool = MemPool(self.bp) self.mempool = MemPool(self.bp)
self.irc = IRC(env) self.peers = PeerManager(env)
self.env = env self.env = env
self.servers = {} self.servers = {}
# Map of session to the key of its list in self.groups # Map of session to the key of its list in self.groups
@ -73,7 +63,8 @@ class Controller(util.LoggedClass):
self.max_sessions = env.max_sessions self.max_sessions = env.max_sessions
self.low_watermark = self.max_sessions * 19 // 20 self.low_watermark = self.max_sessions * 19 // 20
self.max_subs = env.max_subs self.max_subs = env.max_subs
self.subscription_count = 0 # Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0
self.next_stale_check = 0 self.next_stale_check = 0
self.history_cache = pylru.lrucache(256) self.history_cache = pylru.lrucache(256)
self.header_cache = pylru.lrucache(8) self.header_cache = pylru.lrucache(8)
@ -95,12 +86,14 @@ class Controller(util.LoggedClass):
'block.get_header block.get_chunk estimatefee relayfee ' 'block.get_header block.get_chunk estimatefee relayfee '
'transaction.get transaction.get_merkle utxo.get_address'), 'transaction.get transaction.get_merkle utxo.get_address'),
('server', ('server',
'banner donation_address peers.subscribe version'), 'banner donation_address'),
] ]
self.electrumx_handlers = {'.'.join([prefix, suffix]): handlers = {'.'.join([prefix, suffix]):
getattr(self, suffix.replace('.', '_')) getattr(self, suffix.replace('.', '_'))
for prefix, suffixes in rpcs for prefix, suffixes in rpcs
for suffix in suffixes.split()} for suffix in suffixes.split()}
handlers['server.peers.subscribe'] = self.peers.subscribe
self.electrumx_handlers = handlers
async def mempool_transactions(self, hashX): async def mempool_transactions(self, hashX):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
@ -138,9 +131,9 @@ class Controller(util.LoggedClass):
if isinstance(session, LocalRPC): if isinstance(session, LocalRPC):
return 0 return 0
gid = self.sessions[session] gid = self.sessions[session]
group_bandwidth = sum(s.bandwidth_used for s in self.groups[gid]) group_bw = sum(session.bw_used for session in self.groups[gid])
return 1 + (bisect_left(self.bands, session.bandwidth_used) return 1 + (bisect_left(self.bands, session.bw_used)
+ bisect_left(self.bands, group_bandwidth)) // 2 + bisect_left(self.bands, group_bw)) // 2
def is_deprioritized(self, session): def is_deprioritized(self, session):
return self.session_priority(session) > self.BANDS return self.session_priority(session) > self.BANDS
@ -163,6 +156,15 @@ class Controller(util.LoggedClass):
and self.state == self.PAUSED): and self.state == self.PAUSED):
await self.start_external_servers() await self.start_external_servers()
# Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions:
if self.next_log_sessions:
data = self.session_data(for_log=True)
for line in Controller.sessions_text_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self.server_summary()))
self.next_log_sessions = time.time() + self.env.log_sessions
await asyncio.sleep(1) await asyncio.sleep(1)
def enqueue_session(self, session): def enqueue_session(self, session):
@ -192,7 +194,10 @@ class Controller(util.LoggedClass):
while True: while True:
priority_, id_, session = await self.queue.get() priority_, id_, session = await self.queue.get()
if session in self.sessions: if session in self.sessions:
await session.serve_requests() await session.process_pending_items()
# Re-enqueue the session if stuff is left
if session.items:
self.enqueue_session(session)
def initiate_shutdown(self): def initiate_shutdown(self):
'''Call this function to start the shutdown process.''' '''Call this function to start the shutdown process.'''
@ -206,11 +211,11 @@ class Controller(util.LoggedClass):
async def await_bp_catchup(): async def await_bp_catchup():
'''Wait for the block processor to catch up. '''Wait for the block processor to catch up.
When it has, start the servers and connect to IRC. Then start the servers and the peer manager.
''' '''
await self.bp.caught_up_event.wait() await self.bp.caught_up_event.wait()
self.logger.info('block processor has caught up') self.logger.info('block processor has caught up')
add_future(self.irc.start()) add_future(self.peers.main_loop())
add_future(self.start_servers()) add_future(self.start_servers())
add_future(self.mempool.main_loop()) add_future(self.mempool.main_loop())
add_future(self.enqueue_delayed_sessions()) add_future(self.enqueue_delayed_sessions())
@ -225,7 +230,13 @@ class Controller(util.LoggedClass):
# Perform a clean shutdown when this event is signalled. # Perform a clean shutdown when this event is signalled.
await self.shutdown_event.wait() await self.shutdown_event.wait()
self.logger.info('shutting down gracefully')
self.logger.info('shutting down')
await self.shutdown(futures)
self.logger.info('shutdown complete')
async def shutdown(self, futures):
'''Perform the shutdown sequence.'''
self.state = self.SHUTTING_DOWN self.state = self.SHUTTING_DOWN
# Close servers and sessions # Close servers and sessions
@ -237,11 +248,12 @@ class Controller(util.LoggedClass):
for future in futures: for future in futures:
future.cancel() future.cancel()
await asyncio.wait(futures) # Wait for all futures to finish
while any(not future.done() for future in futures):
await asyncio.sleep(1)
# Wait for the executor to finish anything it's doing # Finally shut down the block processor and executor
self.executor.shutdown() self.bp.shutdown(self.executor)
self.bp.shutdown()
def close_servers(self, kinds): def close_servers(self, kinds):
'''Close the servers of the given kinds (TCP etc.).''' '''Close the servers of the given kinds (TCP etc.).'''
@ -253,22 +265,10 @@ class Controller(util.LoggedClass):
if server: if server:
server.close() server.close()
async def wait_for_sessions(self, secs=30):
if not self.sessions:
return
self.logger.info('waiting up to {:d} seconds for socket cleanup'
.format(secs))
limit = time.time() + secs
while self.sessions and time.time() < limit:
self.clear_stale_sessions(grace=secs//2)
await asyncio.sleep(2)
self.logger.info('{:,d} sessions remaining'
.format(len(self.sessions)))
async def start_server(self, kind, *args, **kw_args): async def start_server(self, kind, *args, **kw_args):
protocol_class = LocalRPC if kind == 'RPC' else ElectrumX protocol_class = LocalRPC if kind == 'RPC' else ElectrumX
protocol = partial(protocol_class, self, self.bp, self.env, kind) protocol_factory = partial(protocol_class, self, kind)
server = self.loop.create_server(protocol, *args, **kw_args) server = self.loop.create_server(protocol_factory, *args, **kw_args)
host, port = args[:2] host, port = args[:2]
try: try:
@ -331,17 +331,7 @@ class Controller(util.LoggedClass):
for session in self.sessions: for session in self.sessions:
if isinstance(session, ElectrumX): if isinstance(session, ElectrumX):
request = self.NotificationRequest(self.bp.db_height, await session.notify(self.bp.db_height, touched)
touched)
session.enqueue_request(request)
# Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions:
if self.next_log_sessions:
data = self.session_data(for_log=True)
for line in Controller.sessions_text_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self.server_summary()))
self.next_log_sessions = time.time() + self.env.log_sessions
def electrum_header(self, height): def electrum_header(self, height):
'''Return the binary header at the given height.''' '''Return the binary header at the given height.'''
@ -359,7 +349,7 @@ class Controller(util.LoggedClass):
if now > self.next_stale_check: if now > self.next_stale_check:
self.next_stale_check = now + 300 self.next_stale_check = now + 300
self.clear_stale_sessions() self.clear_stale_sessions()
gid = int(session.start - self.start) // 900 gid = int(session.start_time - self.start_time) // 900
self.groups[gid].append(session) self.groups[gid].append(session)
self.sessions[session] = gid self.sessions[session] = gid
session.log_info('{} {}, {:,d} total' session.log_info('{} {}, {:,d} total'
@ -379,17 +369,16 @@ class Controller(util.LoggedClass):
gid = self.sessions.pop(session) gid = self.sessions.pop(session)
assert gid in self.groups assert gid in self.groups
self.groups[gid].remove(session) self.groups[gid].remove(session)
self.subscription_count -= session.sub_count()
def close_session(self, session): def close_session(self, session):
'''Close the session's transport and cancel its future.''' '''Close the session's transport and cancel its future.'''
session.close_connection() session.close_connection()
return 'disconnected {:d}'.format(session.id_) return 'disconnected {:d}'.format(session.session_id)
def toggle_logging(self, session): def toggle_logging(self, session):
'''Toggle logging of the session.''' '''Toggle logging of the session.'''
session.log_me = not session.log_me session.log_me = not session.log_me
return 'log {:d}: {}'.format(session.id_, session.log_me) return 'log {:d}: {}'.format(session.session_id, session.log_me)
def clear_stale_sessions(self, grace=15): def clear_stale_sessions(self, grace=15):
'''Cut off sessions that haven't done anything for 10 minutes. Force '''Cut off sessions that haven't done anything for 10 minutes. Force
@ -403,17 +392,17 @@ class Controller(util.LoggedClass):
stale = [] stale = []
for session in self.sessions: for session in self.sessions:
if session.is_closing(): if session.is_closing():
if session.stop <= shutdown_cutoff: if session.close_time <= shutdown_cutoff:
session.transport.abort() session.abort()
elif session.last_recv < stale_cutoff: elif session.last_recv < stale_cutoff:
self.close_session(session) self.close_session(session)
stale.append(session.id_) stale.append(session.session_id)
if stale: if stale:
self.logger.info('closing stale connections {}'.format(stale)) self.logger.info('closing stale connections {}'.format(stale))
# Consolidate small groups # Consolidate small groups
gids = [gid for gid, l in self.groups.items() if len(l) <= 4 gids = [gid for gid, l in self.groups.items() if len(l) <= 4
and sum(session.bandwidth_used for session in l) < 10000] and sum(session.bw_used for session in l) < 10000]
if len(gids) > 1: if len(gids) > 1:
sessions = sum([self.groups[gid] for gid in gids], []) sessions = sum([self.groups[gid] for gid in gids], [])
new_gid = max(gids) new_gid = max(gids)
@ -438,19 +427,15 @@ class Controller(util.LoggedClass):
'logged': len([s for s in self.sessions if s.log_me]), 'logged': len([s for s in self.sessions if s.log_me]),
'paused': sum(s.pause for s in self.sessions), 'paused': sum(s.pause for s in self.sessions),
'pid': os.getpid(), 'pid': os.getpid(),
'peers': len(self.irc.peers), 'peers': self.peers.count(),
'requests': sum(s.requests_remaining() for s in self.sessions), 'requests': sum(s.count_pending_items() for s in self.sessions),
'sessions': self.session_count(), 'sessions': self.session_count(),
'subs': self.subscription_count, 'subs': self.sub_count(),
'txs_sent': self.txs_sent, 'txs_sent': self.txs_sent,
} }
@staticmethod def sub_count(self):
def text_lines(method, data): return sum(s.sub_count() for s in self.sessions)
if method == 'sessions':
return Controller.sessions_text_lines(data)
else:
return Controller.groups_text_lines(data)
@staticmethod @staticmethod
def groups_text_lines(data): def groups_text_lines(data):
@ -482,8 +467,8 @@ class Controller(util.LoggedClass):
sessions = self.groups[gid] sessions = self.groups[gid]
result.append([gid, result.append([gid,
len(sessions), len(sessions),
sum(s.bandwidth_used for s in sessions), sum(s.bw_used for s in sessions),
sum(s.requests_remaining() for s in sessions), sum(s.count_pending_items() for s in sessions),
sum(s.txs_sent for s in sessions), sum(s.txs_sent for s in sessions),
sum(s.sub_count() for s in sessions), sum(s.sub_count() for s in sessions),
sum(s.recv_count for s in sessions), sum(s.recv_count for s in sessions),
@ -523,17 +508,17 @@ class Controller(util.LoggedClass):
def session_data(self, for_log): def session_data(self, for_log):
'''Returned to the RPC 'sessions' call.''' '''Returned to the RPC 'sessions' call.'''
now = time.time() now = time.time()
sessions = sorted(self.sessions, key=lambda s: s.start) sessions = sorted(self.sessions, key=lambda s: s.start_time)
return [(session.id_, return [(session.session_id,
session.flags(), session.flags(),
session.peername(for_log=for_log), session.peername(for_log=for_log),
session.client, session.client,
session.requests_remaining(), session.count_pending_items(),
session.txs_sent, session.txs_sent,
session.sub_count(), session.sub_count(),
session.recv_count, session.recv_size, session.recv_count, session.recv_size,
session.send_count, session.send_size, session.send_count, session.send_size,
now - session.start) now - session.start_time)
for session in sessions] for session in sessions]
def lookup_session(self, session_id): def lookup_session(self, session_id):
@ -543,7 +528,7 @@ class Controller(util.LoggedClass):
pass pass
else: else:
for session in self.sessions: for session in self.sessions:
if session.id_ == session_id: if session.session_id == session_id:
return session return session
return None return None
@ -562,42 +547,42 @@ class Controller(util.LoggedClass):
# Local RPC command handlers # Local RPC command handlers
async def rpc_disconnect(self, session_ids): def rpc_disconnect(self, session_ids):
'''Disconnect sesssions. '''Disconnect sesssions.
session_ids: array of session IDs session_ids: array of session IDs
''' '''
return self.for_each_session(session_ids, self.close_session) return self.for_each_session(session_ids, self.close_session)
async def rpc_log(self, session_ids): def rpc_log(self, session_ids):
'''Toggle logging of sesssions. '''Toggle logging of sesssions.
session_ids: array of session IDs session_ids: array of session IDs
''' '''
return self.for_each_session(session_ids, self.toggle_logging) return self.for_each_session(session_ids, self.toggle_logging)
async def rpc_stop(self): def rpc_stop(self):
'''Shut down the server cleanly.''' '''Shut down the server cleanly.'''
self.initiate_shutdown() self.initiate_shutdown()
return 'stopping' return 'stopping'
async def rpc_getinfo(self): def rpc_getinfo(self):
'''Return summary information about the server process.''' '''Return summary information about the server process.'''
return self.server_summary() return self.server_summary()
async def rpc_groups(self): def rpc_groups(self):
'''Return statistics about the session groups.''' '''Return statistics about the session groups.'''
return self.group_data() return self.group_data()
async def rpc_sessions(self): def rpc_sessions(self):
'''Return statistics about connected sessions.''' '''Return statistics about connected sessions.'''
return self.session_data(for_log=False) return self.session_data(for_log=False)
async def rpc_peers(self): def rpc_peers(self):
'''Return a list of server peers, currently taken from IRC.''' '''Return a list of server peers, currently taken from IRC.'''
return self.irc.peers return self.peers.peer_list()
async def rpc_reorg(self, count=3): def rpc_reorg(self, count=3):
'''Force a reorg of the given number of blocks. '''Force a reorg of the given number of blocks.
count: number of blocks to reorg (default 3) count: number of blocks to reorg (default 3)
@ -647,10 +632,12 @@ class Controller(util.LoggedClass):
raise RPCError('daemon error: {}'.format(e)) raise RPCError('daemon error: {}'.format(e))
async def new_subscription(self, address): async def new_subscription(self, address):
if self.subscription_count >= self.max_subs: if self.subs_room <= 0:
self.subs_room = self.max_subs - self.sub_count()
if self.subs_room <= 0:
raise RPCError('server subscription limit {:,d} reached' raise RPCError('server subscription limit {:,d} reached'
.format(self.max_subs)) .format(self.max_subs))
self.subscription_count += 1 self.subs_room -= 1
hashX = self.address_to_hashX(address) hashX = self.address_to_hashX(address)
status = await self.address_status(hashX) status = await self.address_status(hashX)
return hashX, status return hashX, status
@ -777,14 +764,14 @@ class Controller(util.LoggedClass):
'height': utxo.height, 'value': utxo.value} 'height': utxo.height, 'value': utxo.value}
for utxo in sorted(await self.get_utxos(hashX))] for utxo in sorted(await self.get_utxos(hashX))]
async def block_get_chunk(self, index): def block_get_chunk(self, index):
'''Return a chunk of block headers. '''Return a chunk of block headers.
index: the chunk index''' index: the chunk index'''
index = self.non_negative_integer(index) index = self.non_negative_integer(index)
return self.get_chunk(index) return self.get_chunk(index)
async def block_get_header(self, height): def block_get_header(self, height):
'''The deserialized header at a given height. '''The deserialized header at a given height.
height: the header's height''' height: the header's height'''
@ -877,24 +864,6 @@ class Controller(util.LoggedClass):
return banner return banner
async def donation_address(self): def donation_address(self):
'''Return the donation address as a string, empty if there is none.''' '''Return the donation address as a string, empty if there is none.'''
return self.env.donation_address return self.env.donation_address
async def peers_subscribe(self):
'''Returns the server peers as a list of (ip, host, ports) tuples.
Despite the name this is not currently treated as a subscription.'''
return list(self.irc.peers.values())
async def version(self, client_name=None, protocol_version=None):
'''Returns the server version as a string.
client_name: a string identifying the client
protocol_version: the protocol version spoken by the client
'''
if client_name:
self.client = str(client_name)[:15]
if protocol_version is not None:
self.protocol_version = protocol_version
return VERSION

2
server/env.py

@ -66,7 +66,7 @@ class Env(LoggedClass):
self.report_ssl_port self.report_ssl_port
if self.report_ssl_port else if self.report_ssl_port else
self.ssl_port) self.ssl_port)
self.report_host_tor = self.default('REPORT_HOST_TOR', None) self.report_host_tor = self.default('REPORT_HOST_TOR', '')
def default(self, envvar, default): def default(self, envvar, default):
return environ.get(envvar, default) return environ.get(envvar, default)

110
server/irc.py

@ -12,7 +12,6 @@ Only calling start() requires the IRC Python module.
import asyncio import asyncio
import re import re
import socket
from collections import namedtuple from collections import namedtuple
@ -22,52 +21,26 @@ from lib.util import LoggedClass
class IRC(LoggedClass): class IRC(LoggedClass):
Peer = namedtuple('Peer', 'ip_addr host ports')
class DisconnectedError(Exception): class DisconnectedError(Exception):
pass pass
def __init__(self, env): def __init__(self, env, peer_mgr):
super().__init__() super().__init__()
self.env = env self.coin = env.coin
self.peer_mgr = peer_mgr
# If this isn't something a peer or client expects # If this isn't something a peer or client expects
# then you won't appear in the client's network dialog box # then you won't appear in the client's network dialog box
irc_address = (env.coin.IRC_SERVER, env.coin.IRC_PORT)
self.channel = env.coin.IRC_CHANNEL self.channel = env.coin.IRC_CHANNEL
self.prefix = env.coin.IRC_PREFIX self.prefix = env.coin.IRC_PREFIX
self.clients = []
self.nick = '{}{}'.format(self.prefix, self.nick = '{}{}'.format(self.prefix,
env.irc_nick if env.irc_nick else env.irc_nick if env.irc_nick else
double_sha256(env.report_host.encode()) double_sha256(env.report_host.encode())
[:5].hex()) [:5].hex())
self.clients.append(IrcClient(irc_address, self.nick,
env.report_host,
env.report_tcp_port,
env.report_ssl_port))
if env.report_host_tor:
self.clients.append(IrcClient(irc_address, self.nick + '_tor',
env.report_host_tor,
env.report_tcp_port_tor,
env.report_ssl_port_tor))
self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix)) self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix))
self.peers = {}
async def start(self): async def start(self, name_pairs):
'''Start IRC connections if enabled in environment.''' '''Start IRC connections if enabled in environment.'''
try:
if self.env.irc:
await self.join()
else:
self.logger.info('IRC is disabled')
except asyncio.CancelledError:
pass
except Exception as e:
self.logger.error(str(e))
async def join(self):
import irc.client as irc_client import irc.client as irc_client
from jaraco.stream import buffer from jaraco.stream import buffer
@ -77,21 +50,18 @@ class IRC(LoggedClass):
# Register handlers for events we're interested in # Register handlers for events we're interested in
reactor = irc_client.Reactor() reactor = irc_client.Reactor()
for event in 'welcome join quit kick whoreply disconnect'.split(): for event in 'welcome join quit whoreply disconnect'.split():
reactor.add_global_handler(event, getattr(self, 'on_' + event)) reactor.add_global_handler(event, getattr(self, 'on_' + event))
# Note: Multiple nicks in same channel will trigger duplicate events # Note: Multiple nicks in same channel will trigger duplicate events
for client in self.clients: clients = [IrcClient(self.coin, real_name, self.nick + suffix,
client.connection = reactor.server() reactor.server())
for (real_name, suffix) in name_pairs]
while True: while True:
try: try:
for client in self.clients: for client in clients:
self.logger.info('Joining IRC in {} as "{}" with ' client.connect(self)
'real name "{}"'
.format(self.channel, client.nick,
client.realname))
client.connect()
while True: while True:
reactor.process_once() reactor.process_once()
await asyncio.sleep(2) await asyncio.sleep(2)
@ -130,14 +100,7 @@ class IRC(LoggedClass):
'''Called when someone leaves our channel.''' '''Called when someone leaves our channel.'''
match = self.peer_regexp.match(event.source) match = self.peer_regexp.match(event.source)
if match: if match:
self.peers.pop(match.group(1), None) self.peer_mgr.remove_irc_peer(match.group(1))
def on_kick(self, connection, event):
'''Called when someone is kicked from our channel.'''
self.log_event(event)
match = self.peer_regexp.match(event.arguments[0])
if match:
self.peers.pop(match.group(1), None)
def on_whoreply(self, connection, event): def on_whoreply(self, connection, event):
'''Called when a response to our who requests arrives. '''Called when a response to our who requests arrives.
@ -145,50 +108,25 @@ class IRC(LoggedClass):
The nick is the 4th argument, and real name is in the 6th The nick is the 4th argument, and real name is in the 6th
argument preceeded by '0 ' for some reason. argument preceeded by '0 ' for some reason.
''' '''
try:
nick = event.arguments[4] nick = event.arguments[4]
if nick.startswith(self.prefix): if nick.startswith(self.prefix):
line = event.arguments[6].split() line = event.arguments[6].split()
try: hostname, details = line[1], line[2:]
ip_addr = socket.gethostbyname(line[1]) self.peer_mgr.add_irc_peer(nick, hostname, details)
except socket.error:
# Could be .onion or IPv6.
ip_addr = line[1]
peer = self.Peer(ip_addr, line[1], line[2:])
self.peers[nick] = peer
except (IndexError, UnicodeError):
# UnicodeError comes from invalid domains (issue #68)
pass
class IrcClient(LoggedClass): class IrcClient(object):
VERSION = '1.0' def __init__(self, coin, real_name, nick, server):
DEFAULT_PORTS = {'t': 50001, 's': 50002} self.irc_host = coin.IRC_SERVER
self.irc_port = coin.IRC_PORT
def __init__(self, irc_address, nick, host, tcp_port, ssl_port):
super().__init__()
self.irc_host, self.irc_port = irc_address
self.nick = nick self.nick = nick
self.realname = self.create_realname(host, tcp_port, ssl_port) self.real_name = real_name
self.connection = None self.server = server
def connect(self, keepalive=60): def connect(self, irc):
'''Connect this client to its IRC server''' '''Connect this client to its IRC server'''
self.connection.connect(self.irc_host, self.irc_port, self.nick, irc.logger.info('joining {} as "{}" with real name "{}"'
ircname=self.realname) .format(irc.channel, self.nick, self.real_name))
self.connection.set_keepalive(keepalive) self.server.connect(self.irc_host, self.irc_port, self.nick,
ircname=self.real_name)
@classmethod
def create_realname(cls, host, tcp_port, ssl_port):
def port_text(letter, port):
if not port:
return ''
if port == cls.DEFAULT_PORTS.get(letter):
return ' ' + letter
else:
return ' ' + letter + str(port)
tcp = port_text('t', tcp_port)
ssl = port_text('s', ssl_port)
return '{} v{}{}{}'.format(host, cls.VERSION, tcp, ssl)

139
server/peers.py

@ -0,0 +1,139 @@
# Copyright (c) 2017, Neil Booth
#
# All rights reserved.
#
# See the file "LICENCE" for information about the copyright
# and warranty status of this software.
'''Peer management.'''
import asyncio
import socket
import traceback
from collections import namedtuple
from functools import partial
import lib.util as util
from server.irc import IRC
NetIdentity = namedtuple('NetIdentity', 'host tcp_port ssl_port nick_suffix')
IRCPeer = namedtuple('IRCPeer', 'ip_addr host details')
class PeerManager(util.LoggedClass):
'''Looks after the DB of peer network servers.
Attempts to maintain a connection with up to 8 peers.
Issues a 'peers.subscribe' RPC to them and tells them our data.
'''
VERSION = '1.0'
DEFAULT_PORTS = {'t': 50001, 's': 50002}
def __init__(self, env):
super().__init__()
self.env = env
self.loop = asyncio.get_event_loop()
self.irc = IRC(env, self)
self.futures = set()
self.identities = []
# Keyed by nick
self.irc_peers = {}
# We can have a Tor identity inaddition to a normal one
self.identities.append(NetIdentity(env.report_host,
env.report_tcp_port,
env.report_ssl_port,
''))
if env.report_host_tor.endswith('.onion'):
self.identities.append(NetIdentity(env.report_host_tor,
env.report_tcp_port_tor,
env.report_ssl_port_tor,
'_tor'))
async def executor(self, func, *args, **kwargs):
'''Run func taking args in the executor.'''
await self.loop.run_in_executor(None, partial(func, *args, **kwargs))
@classmethod
def real_name(cls, identity):
'''Real name as used on IRC.'''
def port_text(letter, port):
if not port:
return ''
if port == cls.DEFAULT_PORTS.get(letter):
return ' ' + letter
else:
return ' ' + letter + str(port)
tcp = port_text('t', identity.tcp_port)
ssl = port_text('s', identity.ssl_port)
return '{} v{}{}{}'.format(identity.host, cls.VERSION, tcp, ssl)
def ensure_future(self, coro):
'''Convert a coro into a future and add it to our pending list
to be waited for.'''
self.futures.add(asyncio.ensure_future(coro))
def start_irc(self):
'''Start up the IRC connections if enabled.'''
if self.env.irc:
name_pairs = [(self.real_name(identity), identity.nick_suffix)
for identity in self.identities]
self.ensure_future(self.irc.start(name_pairs))
else:
self.logger.info('IRC is disabled')
async def main_loop(self):
'''Start and then enter the main loop.'''
self.start_irc()
try:
while True:
await asyncio.sleep(10)
done = [future for future in self.futures if future.done()]
self.futures.difference_update(done)
for future in done:
try:
future.result()
except:
self.log_error(traceback.format_exc())
finally:
for future in self.futures:
future.cancel()
def dns_lookup_peer(self, nick, hostname, details):
try:
ip_addr = None
try:
ip_addr = socket.gethostbyname(hostname)
except socket.error:
pass # IPv6?
ip_addr = ip_addr or hostname
self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details)
self.logger.info('new IRC peer {} at {} ({})'
.format(nick, hostname, details))
except UnicodeError:
# UnicodeError comes from invalid domains (issue #68)
self.logger.info('IRC peer domain {} invalid'.format(hostname))
def add_irc_peer(self, *args):
'''Schedule DNS lookup of peer.'''
self.ensure_future(self.executor(self.dns_lookup_peer, *args))
def remove_irc_peer(self, nick):
'''Remove a peer from our IRC peers map.'''
self.logger.info('removing IRC peer {}'.format(nick))
self.irc_peers.pop(nick, None)
def count(self):
return len(self.irc_peers)
def peer_list(self):
return self.irc_peers
def subscribe(self):
'''Returns the server peers as a list of (ip, host, details) tuples.
Despite the name this is not currently treated as a subscription.'''
return list(self.irc_peers.values())

170
server/session.py

@ -9,38 +9,61 @@
import asyncio import asyncio
import time
import traceback import traceback
from functools import partial
from lib.jsonrpc import JSONRPC, RPCError from lib.jsonrpc import JSONSession, RPCError
from server.daemon import DaemonError from server.daemon import DaemonError
from server.version import VERSION
class Session(JSONRPC): class SessionBase(JSONSession):
'''Base class of ElectrumX JSON session protocols. '''Base class of ElectrumX JSON sessions.
Each session runs its tasks in asynchronous parallelism with other Each session runs its tasks in asynchronous parallelism with other
sessions. To prevent some sessions blocking others, potentially sessions.
long-running requests should yield.
''' '''
def __init__(self, controller, bp, env, kind): def __init__(self, controller, kind):
super().__init__() super().__init__()
self.kind = kind # 'RPC', 'TCP' etc.
self.controller = controller self.controller = controller
self.bp = bp self.bp = controller.bp
self.env = env self.env = controller.env
self.daemon = bp.daemon self.daemon = self.bp.daemon
self.kind = kind
self.client = 'unknown' self.client = 'unknown'
self.anon_logs = env.anon_logs self.anon_logs = self.env.anon_logs
self.max_send = env.max_send
self.bandwidth_limit = env.bandwidth_limit
self.last_delay = 0 self.last_delay = 0
self.txs_sent = 0 self.txs_sent = 0
self.requests = [] self.requests = []
self.start_time = time.time()
self.close_time = 0
self.bw_time = self.start_time
self.bw_interval = 3600
self.bw_used = 0
def have_pending_items(self):
'''Called each time the pending item queue goes from empty to having
one item.'''
self.controller.enqueue_session(self)
def is_closing(self): def close_connection(self):
'''True if this session is closing.''' '''Call this to close the connection.'''
return self.transport and self.transport.is_closing() self.close_time = time.time()
super().close_connection()
def peername(self, *, for_log=True):
'''Return the peer name of this connection.'''
peer_info = self.peer_info()
if not peer_info:
return 'unknown'
if for_log and self.anon_logs:
return 'xx.xx.xx.xx:xx'
if ':' in peer_info[0]:
return '[{}]:{}'.format(peer_info[0], peer_info[1])
else:
return '{}:{}'.format(peer_info[0], peer_info[1])
def flags(self): def flags(self):
'''Status flags.''' '''Status flags.'''
@ -52,42 +75,6 @@ class Session(JSONRPC):
status += str(self.controller.session_priority(self)) status += str(self.controller.session_priority(self))
return status return status
def requests_remaining(self):
return sum(request.remaining for request in self.requests)
def enqueue_request(self, request):
'''Add a request to the session's list.'''
self.requests.append(request)
if len(self.requests) == 1:
self.controller.enqueue_session(self)
async def serve_requests(self):
'''Serve requests in batches.'''
total = 0
errs = []
# Process 8 items at a time
for request in self.requests:
try:
initial = request.remaining
await request.process(self)
total += initial - request.remaining
except asyncio.CancelledError:
raise
except Exception:
# Should probably be considered a bug and fixed
self.log_error('error handling request {}'.format(request))
traceback.print_exc()
errs.append(request)
await asyncio.sleep(0)
if total >= 8:
break
# Remove completed requests and re-enqueue ourself if any remain.
self.requests = [req for req in self.requests
if req.remaining and not req in errs]
if self.requests:
self.controller.enqueue_session(self)
def connection_made(self, transport): def connection_made(self, transport):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
super().connection_made(transport) super().connection_made(transport)
@ -95,27 +82,32 @@ class Session(JSONRPC):
def connection_lost(self, exc): def connection_lost(self, exc):
'''Handle client disconnection.''' '''Handle client disconnection.'''
super().connection_lost(exc) msg = ''
if (self.pause or self.controller.is_deprioritized(self) if self.pause:
or self.send_size >= 1024*1024 or self.error_count): msg += ' whilst paused'
self.log_info('disconnected. Sent {:,d} bytes in {:,d} messages ' if self.controller.is_deprioritized(self):
'{:,d} errors' msg += ' whilst deprioritized'
.format(self.send_size, self.send_count, if self.send_size >= 1024*1024:
self.error_count)) msg += ('. Sent {:,d} bytes in {:,d} messages'
.format(self.send_size, self.send_count))
if msg:
msg = 'disconnected' + msg
self.log_info(msg)
self.controller.remove_session(self) self.controller.remove_session(self)
def sub_count(self): def sub_count(self):
return 0 return 0
class ElectrumX(Session): class ElectrumX(SessionBase):
'''A TCP server that handles incoming Electrum connections.''' '''A TCP server that handles incoming Electrum connections.'''
def __init__(self, *args): def __init__(self, *args, **kwargs):
super().__init__(*args) super().__init__(*args, **kwargs)
self.subscribe_headers = False self.subscribe_headers = False
self.subscribe_height = False self.subscribe_height = False
self.notified_height = None self.notified_height = None
self.max_send = self.env.max_send
self.max_subs = self.env.max_session_subs self.max_subs = self.env.max_session_subs
self.hashX_subs = {} self.hashX_subs = {}
self.electrumx_handlers = { self.electrumx_handlers = {
@ -123,6 +115,7 @@ class ElectrumX(Session):
'blockchain.headers.subscribe': self.headers_subscribe, 'blockchain.headers.subscribe': self.headers_subscribe,
'blockchain.numblocks.subscribe': self.numblocks_subscribe, 'blockchain.numblocks.subscribe': self.numblocks_subscribe,
'blockchain.transaction.broadcast': self.transaction_broadcast, 'blockchain.transaction.broadcast': self.transaction_broadcast,
'server.version': self.server_version,
} }
def sub_count(self): def sub_count(self):
@ -133,32 +126,29 @@ class ElectrumX(Session):
Cache is a shared cache for this update. Cache is a shared cache for this update.
''' '''
controller = self.controller
pairs = []
if height != self.notified_height: if height != self.notified_height:
self.notified_height = height self.notified_height = height
if self.subscribe_headers: if self.subscribe_headers:
payload = self.notification_payload( args = (controller.electrum_header(height), )
'blockchain.headers.subscribe', pairs.append(('blockchain.headers.subscribe', args))
(self.controller.electrum_header(height), ),
)
self.encode_and_send_payload(payload)
if self.subscribe_height: if self.subscribe_height:
payload = self.notification_payload( pairs.append(('blockchain.numblocks.subscribe', (height, )))
'blockchain.numblocks.subscribe',
(height, ),
)
self.encode_and_send_payload(payload)
matches = touched.intersection(self.hashX_subs) matches = touched.intersection(self.hashX_subs)
for hashX in matches: for hashX in matches:
address = self.hashX_subs[hashX] address = self.hashX_subs[hashX]
status = await self.controller.address_status(hashX) status = await controller.address_status(hashX)
payload = self.notification_payload( pairs.append(('blockchain.address.subscribe', (address, status)))
'blockchain.address.subscribe', (address, status))
self.encode_and_send_payload(payload)
self.send_notifications(pairs)
if matches: if matches:
self.log_info('notified of {:,d} addresses'.format(len(matches))) es = '' if len(matches) == 1 else 'es'
self.log_info('notified of {:,d} address{}'
.format(len(matches), es))
def height(self): def height(self):
'''Return the current flushed database height.''' '''Return the current flushed database height.'''
@ -168,12 +158,12 @@ class ElectrumX(Session):
'''Used as response to a headers subscription request.''' '''Used as response to a headers subscription request.'''
return self.controller.electrum_header(self.height()) return self.controller.electrum_header(self.height())
async def headers_subscribe(self): def headers_subscribe(self):
'''Subscribe to get headers of new blocks.''' '''Subscribe to get headers of new blocks.'''
self.subscribe_headers = True self.subscribe_headers = True
return self.current_electrum_header() return self.current_electrum_header()
async def numblocks_subscribe(self): def numblocks_subscribe(self):
'''Subscribe to get height of new blocks.''' '''Subscribe to get height of new blocks.'''
self.subscribe_height = True self.subscribe_height = True
return self.height() return self.height()
@ -191,6 +181,18 @@ class ElectrumX(Session):
self.hashX_subs[hashX] = address self.hashX_subs[hashX] = address
return status return status
def server_version(self, client_name=None, protocol_version=None):
'''Returns the server version as a string.
client_name: a string identifying the client
protocol_version: the protocol version spoken by the client
'''
if client_name:
self.client = str(client_name)[:15]
if protocol_version is not None:
self.protocol_version = protocol_version
return VERSION
async def transaction_broadcast(self, raw_tx): async def transaction_broadcast(self, raw_tx):
'''Broadcast a raw transaction to the network. '''Broadcast a raw transaction to the network.
@ -230,13 +232,13 @@ class ElectrumX(Session):
return handler return handler
class LocalRPC(Session): class LocalRPC(SessionBase):
'''A local TCP RPC server for querying status.''' '''A local TCP RPC server session.'''
def __init__(self, *args): def __init__(self, *args, **kwargs):
super().__init__(*args) super().__init__(*args, **kwargs)
self.client = 'RPC' self.client = 'RPC'
self.max_send = 5000000 self.max_send = 0
def request_handler(self, method): def request_handler(self, method):
'''Return the async handler for the given request method.''' '''Return the async handler for the given request method.'''

4
server/storage.py

@ -10,11 +10,11 @@
import os import os
from functools import partial from functools import partial
from lib.util import subclasses, increment_byte_string import lib.util as util
def db_class(name): def db_class(name):
'''Returns a DB engine class.''' '''Returns a DB engine class.'''
for db_class in subclasses(Storage): for db_class in util.subclasses(Storage):
if db_class.__name__.lower() == name.lower(): if db_class.__name__.lower() == name.lower():
db_class.import_module() db_class.import_module()
return db_class return db_class

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.10.7" VERSION = "ElectrumX 0.10.11"

2
tests/test_util.py

@ -56,4 +56,4 @@ def test_chunks():
def test_increment_byte_string(): def test_increment_byte_string():
assert util.increment_byte_string(b'1') == b'2' assert util.increment_byte_string(b'1') == b'2'
assert util.increment_byte_string(b'\x01\x01') == b'\x01\x02' assert util.increment_byte_string(b'\x01\x01') == b'\x01\x02'
assert util.increment_byte_string(b'\xff\xff') == b'\x01\x00\x00' assert util.increment_byte_string(b'\xff\xff') == None

Loading…
Cancel
Save