Browse Source

Merge branch 'release-0.8.7'

master 0.8.7
Neil Booth 8 years ago
parent
commit
4839e6bdc6
  1. 7
      RELEASE-NOTES
  2. 4
      electrumx_rpc.py
  3. 154
      lib/jsonrpc.py
  4. 4
      samples/systemd/electrumx.service
  5. 5
      server/block_processor.py
  6. 69
      server/protocol.py
  7. 2
      server/version.py

7
RELEASE-NOTES

@ -1,3 +1,10 @@
version 0.8.7
-------------
- update systemd config (bauerj)
- temporary fix for initial sync times
- continued JSON code refactoring
version 0.8.6 version 0.8.6
------------- -------------

4
electrumx_rpc.py

@ -31,12 +31,12 @@ class RPCClient(JSONRPC):
future = asyncio.ensure_future(self.messages.get()) future = asyncio.ensure_future(self.messages.get())
for f in asyncio.as_completed([future], timeout=timeout): for f in asyncio.as_completed([future], timeout=timeout):
try: try:
message = await f request = await f
except asyncio.TimeoutError: except asyncio.TimeoutError:
future.cancel() future.cancel()
print('request timed out after {}s'.format(timeout)) print('request timed out after {}s'.format(timeout))
else: else:
await self.handle_message(message) await request.process()
async def handle_response(self, result, error, method): async def handle_response(self, result, error, method):
if result and method == 'sessions': if result and method == 'sessions':

154
lib/jsonrpc.py

@ -15,13 +15,60 @@ import time
from lib.util import LoggedClass from lib.util import LoggedClass
class SingleRequest(object):
'''An object that represents a single request.'''
def __init__(self, session, payload):
self.payload = payload
self.session = session
async def process(self):
'''Asynchronously handle the JSON request.'''
binary = await self.session.process_single_payload(self.payload)
if binary:
self.session._send_bytes(binary)
class BatchRequest(object):
'''An object that represents a batch request and its processing state.
Batches are processed in parts chunks.
'''
CUHNK_SIZE = 3
def __init__(self, session, payload):
self.session = session
self.payload = payload
self.done = 0
self.parts = []
async def process(self):
'''Asynchronously handle the JSON batch according to the JSON 2.0
spec.'''
for n in range(self.CHUNK_SIZE):
if self.done >= len(self.payload):
if self.parts:
binary = b'[' + b', '.join(self.parts) + b']'
self.session._send_bytes(binary)
return
item = self.payload[self.done]
part = await self.session.process_single_payload(item)
if part:
self.parts.append(part)
self.done += 1
# Re-enqueue to continue the rest later
self.session.enqueue_request(self)
return b''
class JSONRPC(asyncio.Protocol, LoggedClass): class JSONRPC(asyncio.Protocol, LoggedClass):
'''Manages a JSONRPC connection. '''Manages a JSONRPC connection.
Assumes JSON messages are newline-separated and that newlines Assumes JSON messages are newline-separated and that newlines
cannot appear in the JSON other than to separate lines. Incoming cannot appear in the JSON other than to separate lines. Incoming
messages are queued on the messages queue for later asynchronous messages are queued on the messages queue for later asynchronous
processing, and should be passed to the handle_message() function. processing, and should be passed to the handle_request() function.
Derived classes may want to override connection_made() and Derived classes may want to override connection_made() and
connection_lost() but should be sure to call the implementation in connection_lost() but should be sure to call the implementation in
@ -53,9 +100,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.msg = msg self.msg = msg
self.code = code self.code = code
class LargeRequestError(Exception):
'''Raised if a large request was prevented from being sent.'''
@classmethod @classmethod
def request_payload(cls, method, id_, params=None): def request_payload(cls, method, id_, params=None):
payload = {'jsonrpc': '2.0', 'id': id_, 'method': method} payload = {'jsonrpc': '2.0', 'id': id_, 'method': method}
@ -184,18 +228,27 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
message = message.decode() message = message.decode()
except UnicodeDecodeError as e: except UnicodeDecodeError as e:
msg = 'cannot decode binary bytes: {}'.format(e) msg = 'cannot decode binary bytes: {}'.format(e)
self.send_json_error(msg, self.PARSE_ERROR) self.send_json_error(msg, self.PARSE_ERROR, close=True)
return return
try: try:
message = json.loads(message) message = json.loads(message)
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
msg = 'cannot decode JSON: {}'.format(e) msg = 'cannot decode JSON: {}'.format(e)
self.send_json_error(msg, self.PARSE_ERROR) self.send_json_error(msg, self.PARSE_ERROR, close=True)
return return
if isinstance(message, list):
# Batches must have at least one request.
if not message:
self.send_json_error('empty batch', self.INVALID_REQUEST)
return
request = BatchRequest(self, message)
else:
request = SingleRequest(self, message)
'''Queue the request for asynchronous handling.''' '''Queue the request for asynchronous handling.'''
self.messages.put_nowait(message) self.enqueue_request(request)
if self.log_me: if self.log_me:
self.log_info('queued {}'.format(message)) self.log_info('queued {}'.format(message))
@ -214,23 +267,23 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.using_bandwidth(len(binary)) self.using_bandwidth(len(binary))
return binary return binary
def _send_bytes(self, text, close): def _send_bytes(self, binary, close=False):
'''Send JSON text over the transport. Close it if close is True.''' '''Send JSON text over the transport. Close it if close is True.'''
# Confirmed this happens, sometimes a lot # Confirmed this happens, sometimes a lot
if self.transport.is_closing(): if self.transport.is_closing():
return return
self.transport.write(text) self.transport.write(binary)
self.transport.write(b'\n') self.transport.write(b'\n')
if close: if close or self.error_count > 10:
self.transport.close() self.transport.close()
def send_json_error(self, message, code, id_=None, close=True): def send_json_error(self, message, code, id_=None, close=False):
'''Send a JSON error and close the connection by default.''' '''Send a JSON error and close the connection by default.'''
self._send_bytes(self.json_error_bytes(message, code, id_), close) self._send_bytes(self.json_error_bytes(message, code, id_), close)
def encode_and_send_payload(self, payload): def encode_and_send_payload(self, payload):
'''Encode the payload and send it.''' '''Encode the payload and send it.'''
self._send_bytes(self.encode_payload(payload), False) self._send_bytes(self.encode_payload(payload))
def json_notification_bytes(self, method, params): def json_notification_bytes(self, method, params):
'''Return the bytes of a json notification.''' '''Return the bytes of a json notification.'''
@ -249,74 +302,33 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.error_count += 1 self.error_count += 1
return self.encode_payload(self.error_payload(message, code, id_)) return self.encode_payload(self.error_payload(message, code, id_))
async def handle_message(self, payload): async def process_single_payload(self, payload):
'''Asynchronously handle a JSON request or response. '''Return the binary JSON result of a single JSON request, response or
Handles batches according to the JSON 2.0 spec.
'''
try:
if isinstance(payload, list):
binary = await self.process_json_batch(payload)
else:
binary = await self.process_single_json(payload)
except self.RPCError as e:
binary = self.json_error_bytes(e.msg, e.code,
self.payload_id(payload))
if binary:
self._send_bytes(binary, self.error_count > 10)
async def process_json_batch(self, batch):
'''Return the text response to a JSON batch request.'''
# Batches must have at least one request.
if not batch:
return self.json_error_bytes('empty batch', self.INVALID_REQUEST)
# PYTHON 3.6: use asynchronous comprehensions when supported
parts = []
total_len = 0
for item in batch:
part = await self.process_single_json(item)
if part:
parts.append(part)
total_len += len(part) + 2
self.check_oversized_request(total_len)
if parts:
return b'[' + b', '.join(parts) + b']'
return b''
async def process_single_json(self, payload):
'''Return the JSON result of a single JSON request, response or
notification. notification.
Return None if the request is a notification or a response. The result is empty if nothing is to be sent.
''' '''
# Throttle high-bandwidth connections by delaying processing
# their requests. Delay more the higher the excessive usage.
excess = self.bandwidth_used - self.bandwidth_limit
if excess > 0:
secs = 1 + excess // self.bandwidth_limit
self.log_warning('high bandwidth use of {:,d} bytes, '
'sleeping {:d}s'
.format(self.bandwidth_used, secs))
await asyncio.sleep(secs)
if not isinstance(payload, dict): if not isinstance(payload, dict):
return self.json_error_bytes('request must be a dict', return self.json_error_bytes('request must be a dict',
self.INVALID_REQUEST) self.INVALID_REQUEST)
if not 'id' in payload: try:
return await self.process_json_notification(payload) if not 'id' in payload:
return await self.process_json_notification(payload)
id_ = payload['id'] id_ = payload['id']
if not isinstance(id_, self.ID_TYPES): if not isinstance(id_, self.ID_TYPES):
return self.json_error_bytes('invalid id: {}'.format(id_), return self.json_error_bytes('invalid id: {}'.format(id_),
self.INVALID_REQUEST) self.INVALID_REQUEST)
if 'method' in payload: if 'method' in payload:
return await self.process_json_request(payload) return await self.process_json_request(payload)
return await self.process_json_response(payload) return await self.process_json_response(payload)
except self.RPCError as e:
return self.json_error_bytes(e.msg, e.code,
self.payload_id(payload))
@classmethod @classmethod
def method_and_params(cls, payload): def method_and_params(cls, payload):
@ -394,6 +406,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
# --- derived classes are intended to override these functions # --- derived classes are intended to override these functions
def enqueue_request(self, request):
'''Enqueue a request for later asynchronous processing.'''
self.messages.put_nowait(request)
async def handle_notification(self, method, params): async def handle_notification(self, method, params):
'''Handle a notification.''' '''Handle a notification.'''

4
samples/systemd/electrumx.service

@ -6,6 +6,8 @@ After=network.target
EnvironmentFile=/etc/electrumx.conf EnvironmentFile=/etc/electrumx.conf
ExecStart=/usr/local/bin/electrumx_server.py ExecStart=/usr/local/bin/electrumx_server.py
User=electrumx User=electrumx
LimitNOFILE=8192
TimeoutStopSec=30min
[Install] [Install]
WantedBy=multi-user.target WantedBy=multi-user.target

5
server/block_processor.py

@ -223,7 +223,10 @@ class BlockProcessor(server.db.DB):
touched = set() touched = set()
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
try: try:
await loop.run_in_executor(None, do_it) if self.caught_up:
await loop.run_in_executor(None, do_it)
else:
do_it()
except ChainReorg: except ChainReorg:
await self.handle_chain_reorg(touched) await self.handle_chain_reorg(touched)

69
server/protocol.py

@ -218,8 +218,13 @@ class ServerManager(util.LoggedClass):
MgrTask = namedtuple('MgrTask', 'session task') MgrTask = namedtuple('MgrTask', 'session task')
class NotificationRequest(object):
def __init__(self, fn_call):
self.process = fn_call
def __init__(self, env): def __init__(self, env):
super().__init__() super().__init__()
self.start = time.time()
self.bp = BlockProcessor(self, env) self.bp = BlockProcessor(self, env)
self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self) self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self)
self.irc = IRC(env) self.irc = IRC(env)
@ -231,7 +236,9 @@ class ServerManager(util.LoggedClass):
self.max_subs = env.max_subs self.max_subs = env.max_subs
self.subscription_count = 0 self.subscription_count = 0
self.next_stale_check = 0 self.next_stale_check = 0
self.history_cache = pylru.lrucache(128) self.history_cache = pylru.lrucache(256)
self.header_cache = pylru.lrucache(8)
self.height = 0
self.futures = [] self.futures = []
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
self.logger.info('session timeout: {:,d} seconds' self.logger.info('session timeout: {:,d} seconds'
@ -318,16 +325,18 @@ class ServerManager(util.LoggedClass):
def notify(self, touched): def notify(self, touched):
'''Notify sessions about height changes and touched addresses.''' '''Notify sessions about height changes and touched addresses.'''
# Remove invalidated history cache # Invalidate caches
hc = self.history_cache hc = self.history_cache
for hash168 in set(hc).intersection(touched): for hash168 in set(hc).intersection(touched):
del hc[hash168] del hc[hash168]
cache = {} if self.bp.db_height != self.height:
self.height = self.bp.db_height
self.header_cache.clear()
for session in self.sessions: for session in self.sessions:
if isinstance(session, ElectrumX): if isinstance(session, ElectrumX):
# Use a tuple to distinguish from JSON fn_call = partial(session.notify, self.bp.db_height, touched)
triple = (self.bp.db_height, touched, cache) session.enqueue_request(self.NotificationRequest(fn_call))
session.messages.put_nowait(triple)
# Periodically log sessions # Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions: if self.env.log_sessions and time.time() > self.next_log_sessions:
data = self.session_data(for_log=True) data = self.session_data(for_log=True)
@ -336,6 +345,17 @@ class ServerManager(util.LoggedClass):
self.logger.info(json.dumps(self.server_summary())) self.logger.info(json.dumps(self.server_summary()))
self.next_log_sessions = time.time() + self.env.log_sessions self.next_log_sessions = time.time() + self.env.log_sessions
def electrum_header(self, height):
'''Return the binary header at the given height.'''
if not 0 <= height <= self.bp.db_height:
raise self.RPCError('height {:,d} out of range'.format(height))
if height in self.header_cache:
return self.header_cache[height]
header = self.bp.read_headers(height, 1)
header = self.env.coin.electrum_header(header, height)
self.header_cache[height] = header
return header
async def async_get_history(self, hash168): async def async_get_history(self, hash168):
if hash168 in self.history_cache: if hash168 in self.history_cache:
return self.history_cache[hash168] return self.history_cache[hash168]
@ -557,6 +577,7 @@ class Session(JSONRPC):
self.max_send = env.max_send self.max_send = env.max_send
self.bandwidth_limit = env.bandwidth_limit self.bandwidth_limit = env.bandwidth_limit
self.txs_sent = 0 self.txs_sent = 0
self.bucket = int(self.start - self.manager.start) // 60
def is_closing(self): def is_closing(self):
'''True if this session is closing.''' '''True if this session is closing.'''
@ -597,19 +618,14 @@ class Session(JSONRPC):
async def serve_requests(self): async def serve_requests(self):
'''Asynchronously run through the task queue.''' '''Asynchronously run through the task queue.'''
while True: while True:
await asyncio.sleep(0) request = await self.messages.get()
message = await self.messages.get()
try: try:
# Height / mempool notification? await request.process()
if isinstance(message, tuple):
await self.notify(*message)
else:
await self.handle_message(message)
except asyncio.CancelledError: except asyncio.CancelledError:
break break
except Exception: except Exception:
# Getting here should probably be considered a bug and fixed # Getting here should probably be considered a bug and fixed
self.log_error('error handling request {}'.format(message)) self.log_error('error handling request {}'.format(request))
traceback.print_exc() traceback.print_exc()
def sub_count(self): def sub_count(self):
@ -677,7 +693,7 @@ class ElectrumX(Session):
def sub_count(self): def sub_count(self):
return len(self.hash168s) return len(self.hash168s)
async def notify(self, height, touched, cache): async def notify(self, height, touched):
'''Notify the client about changes in height and touched addresses. '''Notify the client about changes in height and touched addresses.
Cache is a shared cache for this update. Cache is a shared cache for this update.
@ -685,13 +701,11 @@ class ElectrumX(Session):
if height != self.notified_height: if height != self.notified_height:
self.notified_height = height self.notified_height = height
if self.subscribe_headers: if self.subscribe_headers:
key = 'headers_payload' payload = self.notification_payload(
if key not in cache: 'blockchain.headers.subscribe',
cache[key] = self.notification_payload( (self.manager.electrum_header(height), ),
'blockchain.headers.subscribe', )
(self.electrum_header(height), ), self.encode_and_send_payload(payload)
)
self.encode_and_send_payload(cache[key])
if self.subscribe_height: if self.subscribe_height:
payload = self.notification_payload( payload = self.notification_payload(
@ -718,14 +732,7 @@ class ElectrumX(Session):
def current_electrum_header(self): def current_electrum_header(self):
'''Used as response to a headers subscription request.''' '''Used as response to a headers subscription request.'''
return self.electrum_header(self.height()) return self.manager.electrum_header(self.height())
def electrum_header(self, height):
'''Return the binary header at the given height.'''
if not 0 <= height <= self.height():
raise self.RPCError('height {:,d} out of range'.format(height))
header = self.bp.read_headers(height, 1)
return self.coin.electrum_header(header, height)
async def address_status(self, hash168): async def address_status(self, hash168):
'''Returns status as 32 bytes.''' '''Returns status as 32 bytes.'''
@ -849,7 +856,7 @@ class ElectrumX(Session):
async def block_get_header(self, params): async def block_get_header(self, params):
height = self.params_to_non_negative_integer(params) height = self.params_to_non_negative_integer(params)
return self.electrum_header(height) return self.manager.electrum_header(height)
async def estimatefee(self, params): async def estimatefee(self, params):
return await self.daemon_request('estimatefee', params) return await self.daemon_request('estimatefee', params)

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.8.6" VERSION = "ElectrumX 0.8.7"

Loading…
Cancel
Save