From e40db63bebabb2b5cde067081a5dabdf64957d01 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 8 Dec 2016 00:29:46 +0900 Subject: [PATCH 1/5] Queue requests, which have a process method. --- electrumx_rpc.py | 4 +- lib/jsonrpc.py | 148 ++++++++++++++++++++++++--------------------- server/protocol.py | 21 +++---- 3 files changed, 92 insertions(+), 81 deletions(-) diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 3ec6062..8de320d 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -31,12 +31,12 @@ class RPCClient(JSONRPC): future = asyncio.ensure_future(self.messages.get()) for f in asyncio.as_completed([future], timeout=timeout): try: - message = await f + request = await f except asyncio.TimeoutError: future.cancel() print('request timed out after {}s'.format(timeout)) else: - await self.handle_message(message) + await request.process() async def handle_response(self, result, error, method): if result and method == 'sessions': diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 5eb51ee..0496dee 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -21,7 +21,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): Assumes JSON messages are newline-separated and that newlines cannot appear in the JSON other than to separate lines. Incoming messages are queued on the messages queue for later asynchronous - processing, and should be passed to the handle_message() function. + processing, and should be passed to the handle_request() function. 
Derived classes may want to override connection_made() and connection_lost() but should be sure to call the implementation in @@ -53,8 +53,47 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.msg = msg self.code = code - class LargeRequestError(Exception): - '''Raised if a large request was prevented from being sent.''' + class SingleRequest(object): + '''An object that represents a single request.''' + def __init__(self, session, payload): + self.payload = payload + self.session = session + + async def process(self): + '''Asynchronously handle the JSON request.''' + binary = await self.session.process_single_payload(self.payload) + if binary: + self.session._send_bytes(binary) + + class BatchRequest(object): + '''An object that represents a batch request and its processing + state.''' + def __init__(self, session, payload): + self.session = session + self.payload = payload + self.done = 0 + self.parts = [] + + async def process(self): + '''Asynchronously handle the JSON batch according to the JSON 2.0 + spec.''' + if not self.payload: + raise JSONRPC.RPCError('empty batch', self.INVALID_REQUEST) + for n in range(self.session.batch_limit): + if self.done >= len(self.payload): + if self.parts: + binary = b'[' + b', '.join(self.parts) + b']' + self.session._send_bytes(binary) + return + item = self.payload[self.done] + part = await self.session.process_single_payload(item) + if part: + self.parts.append(part) + self.done += 1 + + # Re-enqueue to continue the rest later + self.session.enqueue_request(self) + return b'' @classmethod def request_payload(cls, method, id_, params=None): @@ -90,6 +129,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.bandwidth_interval = 3600 self.bandwidth_used = 0 self.bandwidth_limit = 5000000 + self.batch_limit = 4 self.transport = None # Parts of an incomplete JSON line. We buffer them until # getting a newline. 
@@ -184,18 +224,27 @@ class JSONRPC(asyncio.Protocol, LoggedClass): message = message.decode() except UnicodeDecodeError as e: msg = 'cannot decode binary bytes: {}'.format(e) - self.send_json_error(msg, self.PARSE_ERROR) + self.send_json_error(msg, self.PARSE_ERROR, close=True) return try: message = json.loads(message) except json.JSONDecodeError as e: msg = 'cannot decode JSON: {}'.format(e) - self.send_json_error(msg, self.PARSE_ERROR) + self.send_json_error(msg, self.PARSE_ERROR, close=True) return + if isinstance(message, list): + # Batches must have at least one request. + if not message: + self.send_json_error('empty batch', self.INVALID_REQUEST) + return + request = self.BatchRequest(self, message) + else: + request = self.SingleRequest(self, message) + '''Queue the request for asynchronous handling.''' - self.messages.put_nowait(message) + self.enqueue_request(request) if self.log_me: self.log_info('queued {}'.format(message)) @@ -214,23 +263,23 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.using_bandwidth(len(binary)) return binary - def _send_bytes(self, text, close): + def _send_bytes(self, binary, close=False): '''Send JSON text over the transport. 
Close it if close is True.''' # Confirmed this happens, sometimes a lot if self.transport.is_closing(): return - self.transport.write(text) + self.transport.write(binary) self.transport.write(b'\n') - if close: + if close or self.error_count > 10: self.transport.close() - def send_json_error(self, message, code, id_=None, close=True): + def send_json_error(self, message, code, id_=None, close=False): '''Send a JSON error and close the connection by default.''' self._send_bytes(self.json_error_bytes(message, code, id_), close) def encode_and_send_payload(self, payload): '''Encode the payload and send it.''' - self._send_bytes(self.encode_payload(payload), False) + self._send_bytes(self.encode_payload(payload)) def json_notification_bytes(self, method, params): '''Return the bytes of a json notification.''' @@ -249,74 +298,33 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.error_count += 1 return self.encode_payload(self.error_payload(message, code, id_)) - async def handle_message(self, payload): - '''Asynchronously handle a JSON request or response. - - Handles batches according to the JSON 2.0 spec. - ''' - try: - if isinstance(payload, list): - binary = await self.process_json_batch(payload) - else: - binary = await self.process_single_json(payload) - except self.RPCError as e: - binary = self.json_error_bytes(e.msg, e.code, - self.payload_id(payload)) - - if binary: - self._send_bytes(binary, self.error_count > 10) - - async def process_json_batch(self, batch): - '''Return the text response to a JSON batch request.''' - # Batches must have at least one request. 
- if not batch: - return self.json_error_bytes('empty batch', self.INVALID_REQUEST) - - # PYTHON 3.6: use asynchronous comprehensions when supported - parts = [] - total_len = 0 - for item in batch: - part = await self.process_single_json(item) - if part: - parts.append(part) - total_len += len(part) + 2 - self.check_oversized_request(total_len) - if parts: - return b'[' + b', '.join(parts) + b']' - return b'' - - async def process_single_json(self, payload): - '''Return the JSON result of a single JSON request, response or + async def process_single_payload(self, payload): + '''Return the binary JSON result of a single JSON request, response or notification. - Return None if the request is a notification or a response. + The result is empty if nothing is to be sent. ''' - # Throttle high-bandwidth connections by delaying processing - # their requests. Delay more the higher the excessive usage. - excess = self.bandwidth_used - self.bandwidth_limit - if excess > 0: - secs = 1 + excess // self.bandwidth_limit - self.log_warning('high bandwidth use of {:,d} bytes, ' - 'sleeping {:d}s' - .format(self.bandwidth_used, secs)) - await asyncio.sleep(secs) if not isinstance(payload, dict): return self.json_error_bytes('request must be a dict', self.INVALID_REQUEST) - if not 'id' in payload: - return await self.process_json_notification(payload) + try: + if not 'id' in payload: + return await self.process_json_notification(payload) - id_ = payload['id'] - if not isinstance(id_, self.ID_TYPES): - return self.json_error_bytes('invalid id: {}'.format(id_), - self.INVALID_REQUEST) + id_ = payload['id'] + if not isinstance(id_, self.ID_TYPES): + return self.json_error_bytes('invalid id: {}'.format(id_), + self.INVALID_REQUEST) - if 'method' in payload: - return await self.process_json_request(payload) + if 'method' in payload: + return await self.process_json_request(payload) - return await self.process_json_response(payload) + return await self.process_json_response(payload) + 
except self.RPCError as e: + return self.json_error_bytes(e.msg, e.code, + self.payload_id(payload)) @classmethod def method_and_params(cls, payload): @@ -394,6 +402,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass): # --- derived classes are intended to override these functions + def enqueue_request(self, request): + '''Enqueue a request for later asynchronous processing.''' + self.messages.put_nowait(request) + async def handle_notification(self, method, params): '''Handle a notification.''' diff --git a/server/protocol.py b/server/protocol.py index 7864ff8..2711ee9 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -316,6 +316,10 @@ class ServerManager(util.LoggedClass): sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc) + class NotificationRequest(object): + def __init__(self, fn_call): + self.process = fn_call + def notify(self, touched): '''Notify sessions about height changes and touched addresses.''' # Remove invalidated history cache @@ -325,9 +329,9 @@ class ServerManager(util.LoggedClass): cache = {} for session in self.sessions: if isinstance(session, ElectrumX): - # Use a tuple to distinguish from JSON - triple = (self.bp.db_height, touched, cache) - session.messages.put_nowait(triple) + fn_call = partial(session.notify, self.bp.db_height, touched, + cache) + session.enqueue_request(self.NotificationRequest(fn_call)) # Periodically log sessions if self.env.log_sessions and time.time() > self.next_log_sessions: data = self.session_data(for_log=True) @@ -597,19 +601,14 @@ class Session(JSONRPC): async def serve_requests(self): '''Asynchronously run through the task queue.''' while True: - await asyncio.sleep(0) - message = await self.messages.get() + request = await self.messages.get() try: - # Height / mempool notification? 
- if isinstance(message, tuple): - await self.notify(*message) - else: - await self.handle_message(message) + await request.process() except asyncio.CancelledError: break except Exception: # Getting here should probably be considered a bug and fixed - self.log_error('error handling request {}'.format(message)) + self.log_error('error handling request {}'.format(request)) traceback.print_exc() def sub_count(self): From cbb1e504cc37de69096dad5039c2b89bb762cf34 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 8 Dec 2016 06:19:11 +0900 Subject: [PATCH 2/5] Cache headers. --- lib/jsonrpc.py | 94 ++++++++++++++++++++++++---------------------- server/protocol.py | 60 ++++++++++++++++------------- 2 files changed, 83 insertions(+), 71 deletions(-) diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 0496dee..7773c96 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -15,6 +15,53 @@ import time from lib.util import LoggedClass +class SingleRequest(object): + '''An object that represents a single request.''' + def __init__(self, session, payload): + self.payload = payload + self.session = session + + async def process(self): + '''Asynchronously handle the JSON request.''' + binary = await self.session.process_single_payload(self.payload) + if binary: + self.session._send_bytes(binary) + + +class BatchRequest(object): + '''An object that represents a batch request and its processing state. + + Batches are processed in parts chunks. 
+    '''
+
+    CHUNK_SIZE = 3
+
+    def __init__(self, session, payload):
+        self.session = session
+        self.payload = payload
+        self.done = 0
+        self.parts = []
+
+    async def process(self):
+        '''Asynchronously handle the JSON batch according to the JSON 2.0
+        spec.'''
+        for n in range(self.CHUNK_SIZE):
+            if self.done >= len(self.payload):
+                if self.parts:
+                    binary = b'[' + b', '.join(self.parts) + b']'
+                    self.session._send_bytes(binary)
+                return
+            item = self.payload[self.done]
+            part = await self.session.process_single_payload(item)
+            if part:
+                self.parts.append(part)
+            self.done += 1
+
+        # Re-enqueue to continue the rest later
+        self.session.enqueue_request(self)
+        return b''
+
+
 class JSONRPC(asyncio.Protocol, LoggedClass):
     '''Manages a JSONRPC connection.
 
@@ -53,48 +100,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         self.msg = msg
         self.code = code
 
-    class SingleRequest(object):
-        '''An object that represents a single request.'''
-        def __init__(self, session, payload):
-            self.payload = payload
-            self.session = session
-
-        async def process(self):
-            '''Asynchronously handle the JSON request.'''
-            binary = await self.session.process_single_payload(self.payload)
-            if binary:
-                self.session._send_bytes(binary)
-
-    class BatchRequest(object):
-        '''An object that represents a batch request and its processing
-        state.'''
-        def __init__(self, session, payload):
-            self.session = session
-            self.payload = payload
-            self.done = 0
-            self.parts = []
-
-        async def process(self):
-            '''Asynchronously handle the JSON batch according to the JSON 2.0
-            spec.'''
-            if not self.payload:
-                raise JSONRPC.RPCError('empty batch', self.INVALID_REQUEST)
-            for n in range(self.session.batch_limit):
-                if self.done >= len(self.payload):
-                    if self.parts:
-                        binary = b'[' + b', '.join(self.parts) + b']'
-                        self.session._send_bytes(binary)
-                    return
-                item = self.payload[self.done]
-                part = await self.session.process_single_payload(item)
-                if part:
-                    self.parts.append(part)
-                self.done += 1
-
-            # 
Re-enqueue to continue the rest later - self.session.enqueue_request(self) - return b'' - @classmethod def request_payload(cls, method, id_, params=None): payload = {'jsonrpc': '2.0', 'id': id_, 'method': method} @@ -129,7 +134,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass): self.bandwidth_interval = 3600 self.bandwidth_used = 0 self.bandwidth_limit = 5000000 - self.batch_limit = 4 self.transport = None # Parts of an incomplete JSON line. We buffer them until # getting a newline. @@ -239,9 +243,9 @@ class JSONRPC(asyncio.Protocol, LoggedClass): if not message: self.send_json_error('empty batch', self.INVALID_REQUEST) return - request = self.BatchRequest(self, message) + request = BatchRequest(self, message) else: - request = self.SingleRequest(self, message) + request = SingleRequest(self, message) '''Queue the request for asynchronous handling.''' self.enqueue_request(request) diff --git a/server/protocol.py b/server/protocol.py index 2711ee9..07a6533 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -218,8 +218,13 @@ class ServerManager(util.LoggedClass): MgrTask = namedtuple('MgrTask', 'session task') + class NotificationRequest(object): + def __init__(self, fn_call): + self.process = fn_call + def __init__(self, env): super().__init__() + self.start = time.time() self.bp = BlockProcessor(self, env) self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self) self.irc = IRC(env) @@ -231,7 +236,9 @@ class ServerManager(util.LoggedClass): self.max_subs = env.max_subs self.subscription_count = 0 self.next_stale_check = 0 - self.history_cache = pylru.lrucache(128) + self.history_cache = pylru.lrucache(256) + self.header_cache = pylru.lrucache(8) + self.height = 0 self.futures = [] env.max_send = max(350000, env.max_send) self.logger.info('session timeout: {:,d} seconds' @@ -316,21 +323,19 @@ class ServerManager(util.LoggedClass): sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) await self.start_server('SSL', env.host, env.ssl_port, 
ssl=sslc) - class NotificationRequest(object): - def __init__(self, fn_call): - self.process = fn_call - def notify(self, touched): '''Notify sessions about height changes and touched addresses.''' - # Remove invalidated history cache + # Invalidate caches hc = self.history_cache for hash168 in set(hc).intersection(touched): del hc[hash168] - cache = {} + if self.bp.db_height != self.height: + self.height = self.bp.db_height + self.header_cache.clear() + for session in self.sessions: if isinstance(session, ElectrumX): - fn_call = partial(session.notify, self.bp.db_height, touched, - cache) + fn_call = partial(session.notify, self.bp.db_height, touched) session.enqueue_request(self.NotificationRequest(fn_call)) # Periodically log sessions if self.env.log_sessions and time.time() > self.next_log_sessions: @@ -340,6 +345,17 @@ class ServerManager(util.LoggedClass): self.logger.info(json.dumps(self.server_summary())) self.next_log_sessions = time.time() + self.env.log_sessions + def electrum_header(self, height): + '''Return the binary header at the given height.''' + if not 0 <= height <= self.bp.db_height: + raise self.RPCError('height {:,d} out of range'.format(height)) + if height in self.header_cache: + return self.header_cache[height] + header = self.bp.read_headers(height, 1) + header = self.env.coin.electrum_header(header, height) + self.header_cache[height] = header + return header + async def async_get_history(self, hash168): if hash168 in self.history_cache: return self.history_cache[hash168] @@ -561,6 +577,7 @@ class Session(JSONRPC): self.max_send = env.max_send self.bandwidth_limit = env.bandwidth_limit self.txs_sent = 0 + self.bucket = int(self.start - self.manager.start) // 60 def is_closing(self): '''True if this session is closing.''' @@ -676,7 +693,7 @@ class ElectrumX(Session): def sub_count(self): return len(self.hash168s) - async def notify(self, height, touched, cache): + async def notify(self, height, touched): '''Notify the client about changes 
in height and touched addresses. Cache is a shared cache for this update. @@ -684,13 +701,11 @@ class ElectrumX(Session): if height != self.notified_height: self.notified_height = height if self.subscribe_headers: - key = 'headers_payload' - if key not in cache: - cache[key] = self.notification_payload( - 'blockchain.headers.subscribe', - (self.electrum_header(height), ), - ) - self.encode_and_send_payload(cache[key]) + payload = self.notification_payload( + 'blockchain.headers.subscribe', + (self.manager.electrum_header(height), ), + ) + self.encode_and_send_payload(payload) if self.subscribe_height: payload = self.notification_payload( @@ -717,14 +732,7 @@ class ElectrumX(Session): def current_electrum_header(self): '''Used as response to a headers subscription request.''' - return self.electrum_header(self.height()) - - def electrum_header(self, height): - '''Return the binary header at the given height.''' - if not 0 <= height <= self.height(): - raise self.RPCError('height {:,d} out of range'.format(height)) - header = self.bp.read_headers(height, 1) - return self.coin.electrum_header(header, height) + return self.manager.electrum_header(self.height()) async def address_status(self, hash168): '''Returns status as 32 bytes.''' @@ -848,7 +856,7 @@ class ElectrumX(Session): async def block_get_header(self, params): height = self.params_to_non_negative_integer(params) - return self.electrum_header(height) + return self.manager.electrum_header(height) async def estimatefee(self, params): return await self.daemon_request('estimatefee', params) From 73994522940db05a9cab7cc9144eacdefd208b78 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Thu, 8 Dec 2016 22:47:29 +0900 Subject: [PATCH 3/5] Temporary hack to fix initial sync slowdown --- server/block_processor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/block_processor.py b/server/block_processor.py index 0ad73a9..b4f749c 100644 --- a/server/block_processor.py +++ 
b/server/block_processor.py @@ -223,7 +223,10 @@ class BlockProcessor(server.db.DB): touched = set() loop = asyncio.get_event_loop() try: - await loop.run_in_executor(None, do_it) + if self.caught_up: + await loop.run_in_executor(None, do_it) + else: + do_it() except ChainReorg: await self.handle_chain_reorg(touched) From 337f351b6ec206bf46aa0a03df41ec485db8c04a Mon Sep 17 00:00:00 2001 From: Johann Bauer Date: Thu, 8 Dec 2016 15:12:18 +0100 Subject: [PATCH 4/5] Tweak systemd unit file settings - Set the open file limit to 8192, which should hopefully be enough - Set the timeout for process termination to 30 minutes. Systemd will send another SIGTERM after 30 minutes and then a SIGKILL after 60 minutes. That should be plenty of time for electrumx to cleanly finish whatever it's doing --- samples/systemd/electrumx.service | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/samples/systemd/electrumx.service b/samples/systemd/electrumx.service index 46107f3..808ff7b 100644 --- a/samples/systemd/electrumx.service +++ b/samples/systemd/electrumx.service @@ -6,6 +6,8 @@ After=network.target EnvironmentFile=/etc/electrumx.conf ExecStart=/usr/local/bin/electrumx_server.py User=electrumx +LimitNOFILE=8192 +TimeoutStopSec=30min [Install] -WantedBy=multi-user.target \ No newline at end of file +WantedBy=multi-user.target From 21d1825f09c1bb55d414681663b4d3e277d26ac4 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Fri, 9 Dec 2016 07:41:24 +0900 Subject: [PATCH 5/5] Prepare 0.8.7 --- RELEASE-NOTES | 7 +++++++ server/version.py | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/RELEASE-NOTES b/RELEASE-NOTES index 4ea4c53..fceca12 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,3 +1,10 @@ +version 0.8.7 +------------- + +- update systemd config (bauerj) +- temporary fix for initial sync times +- continued JSON code refactoring + version 0.8.6 ------------- diff --git a/server/version.py b/server/version.py index e50ac0a..5cd0726 100644 --- 
a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.8.6" +VERSION = "ElectrumX 0.8.7"