From aaf0592f52577e29d0e0346993a471a0de5265cb Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 3 Dec 2016 15:10:57 +0900 Subject: [PATCH 1/5] Fix typo --- server/protocol.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index 531b3e0..d714ded 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -376,15 +376,12 @@ class ServerManager(util.LoggedClass): '''Close the session's transport and cancel its future.''' session.transport.close() self.sessions[session].cancel() - return '{:d} disconnected'.format(session.id_) + return 'disconnected {:d}'.format(session.id_) def toggle_logging(self, session): - '''Close the session's transport and cancel its future.''' + '''Toggle logging of the session.''' session.log_me = not session.log_me - if session.log_me: - return 'logging {:d}'.format(session.id_) - else: - return 'not logging {:d}'.format(session.id_) + return 'log {:d}: {}'.format(session.id_, session.log_me) def clear_stale_sessions(self): '''Cut off sessions that haven't done anything for 10 minutes.''' From 04369dd228057568d34ab4e67398106c1f9bb645 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 3 Dec 2016 18:33:23 +0900 Subject: [PATCH 2/5] Forcefully drop stale sessions or if shutting down Don't wait for the socket --- electrumx_server.py | 2 +- server/protocol.py | 29 ++++++++++++++++++++--------- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/electrumx_server.py b/electrumx_server.py index a749949..7129a8a 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -41,7 +41,7 @@ def main_loop(): def on_exception(loop, context): '''Suppress spurious messages it appears we cannot control.''' message = context.get('message') - if not loop.is_closed() and not message in SUPPRESS_MESSAGES: + if not message in SUPPRESS_MESSAGES: if not ('task' in context and 'accept_connection2()' in repr(context.get('task'))): loop.default_exception_handler(context) diff --git a/server/protocol.py b/server/protocol.py index d714ded..4df4fde 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -351,8 +351,13 @@ class ServerManager(util.LoggedClass): self.logger.info('server listening sockets closed, waiting ' '{:d} seconds for socket cleanup'.format(secs)) limit = time.time() + secs - while self.sessions and time.time() < limit: - await asyncio.sleep(4) + while self.sessions: + if time.time() < limit: + await asyncio.sleep(4) + else: + for session in list(self.sessions): + self.close_session(session, hard=True) + await asyncio.sleep(0) self.logger.info('{:,d} sessions remaining' .format(len(self.sessions))) @@ -368,14 +373,20 @@ class ServerManager(util.LoggedClass): self.close_session(session) def remove_session(self, session): - self.subscription_count -= session.sub_count() - future = self.sessions.pop(session) - future.cancel() + # It might have been forcefully removed earlier by close_session() + if session in self.sessions: + self.subscription_count -= session.sub_count() + future = self.sessions.pop(session) + future.cancel() - def close_session(self, session): + def close_session(self, session, hard=False): '''Close the session's transport and cancel its future.''' session.transport.close() self.sessions[session].cancel() + if hard: + self.remove_session(session) + socket = session.transport.get_extra_info('socket') + socket.close() return 'disconnected {:d}'.format(session.id_) def toggle_logging(self, session): @@ -392,10 +403,10 @@ class ServerManager(util.LoggedClass): stale = [session for session in self.sessions if session.last_recv < cutoff] for session in stale: - self.close_session(session) + self.close_session(session, hard=True) if stale: - self.logger.info('dropped {:,d} stale connections' - .format(len(stale))) + self.logger.info('dropped stale connections {}' + .format([session.id_ for session in stale])) def new_subscription(self): if self.subscription_count >= self.max_subs: From 556574640f8fc81836053f70e960d7d62ba6cfcd Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 3 Dec 2016 20:19:58 +0900 Subject: [PATCH 3/5] IRC encoding fix --- server/irc.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/irc.py b/server/irc.py index 694c81a..40ac9e6 100644 --- a/server/irc.py +++ b/server/irc.py @@ -70,6 +70,11 @@ class IRC(LoggedClass): async def join(self): import irc.client as irc_client + from jaraco.stream import buffer + + # see https://pypi.python.org/pypi/irc under DecodingInput + irc_client.ServerConnection.buffer_class = \ + buffer.LenientDecodingLineBuffer reactor = irc_client.Reactor() for event in ['welcome', 'join', 'quit', 'kick', 'whoreply', From 656f749c70b9d2b5ad7499f5cc2e848f3511397c Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 3 Dec 2016 20:20:28 +0900 Subject: [PATCH 4/5] Add LRU cache for history --- server/protocol.py | 42 +++++++++++++++++++++++++----------------- setup.py | 2 +- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/server/protocol.py b/server/protocol.py index 4df4fde..7745678 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -15,7 +15,9 @@ import ssl import time import traceback from collections import defaultdict, namedtuple -from functools import partial +from functools import partial, lru_cache + +import pylru from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.jsonrpc import JSONRPC, json_notification_payload @@ -229,6 +231,7 @@ class ServerManager(util.LoggedClass): self.max_subs = env.max_subs self.subscription_count = 0 self.next_stale_check = 0 + self.history_cache = pylru.lrucache(512) self.futures = [] env.max_send = max(350000, env.max_send) self.logger.info('session timeout: {:,d} seconds' @@ -329,6 +332,25 @@ class ServerManager(util.LoggedClass): self.logger.info(json.dumps(self.server_summary())) self.next_log_sessions = time.time() + self.env.log_sessions + async def async_get_history(self, hash168): + if hash168 in self.history_cache: + return self.history_cache[hash168] + + # History DoS limit. Each element of history is about 99 + # bytes when encoded as JSON. This limits resource usage on + # bloated history requests, and uses a smaller divisor so + # large requests are logged before refusing them. + limit = self.env.max_send // 97 + # Python 3.6: use async generators; update callers + history = [] + for item in self.bp.get_history(hash168, limit=limit): + history.append(item) + if len(history) % 100 == 0: + await asyncio.sleep(0) + + self.history_cache[hash168] = history + return history + async def shutdown(self): '''Call to shutdown the servers. Returns when done.''' self.bp.shutdown() @@ -724,7 +746,7 @@ class ElectrumX(Session): '''Returns status as 32 bytes.''' # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 - history = await self.async_get_history(hash168) + history = await self.manager.async_get_history(hash168) mempool = self.manager.mempool_transactions(hash168) status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) @@ -769,7 +791,7 @@ class ElectrumX(Session): async def get_history(self, hash168): # Note history is ordered but unconfirmed is unordered in e-s - history = await self.async_get_history(hash168) + history = await self.manager.async_get_history(hash168) conf = [{'tx_hash': hash_to_str(tx_hash), 'height': height} for tx_hash, height in history] @@ -783,20 +805,6 @@ class ElectrumX(Session): count = min(next_height - start_height, chunk_size) return self.bp.read_headers(start_height, count).hex() - async def async_get_history(self, hash168): - # History DoS limit. Each element of history is about 99 - # bytes when encoded as JSON. This limits resource usage on - # bloated history requests, and uses a smaller divisor so - # large requests are logged before refusing them. - limit = self.max_send // 97 - # Python 3.6: use async generators; update callers - history = [] - for item in self.bp.get_history(hash168, limit=limit): - history.append(item) - if len(history) % 100 == 0: - await asyncio.sleep(0) - return history - async def get_utxos(self, hash168): # Python 3.6: use async generators; update callers utxos = [] diff --git a/setup.py b/setup.py index e8f6574..3a91425 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ setuptools.setup( # "irc" package is only required if IRC connectivity is enabled # via environment variables, in which case I've tested with 15.0.4 # "x11_hash" package (1.4) is required to sync DASH network. - install_requires=['plyvel', 'aiohttp >= 1'], + install_requires=['plyvel', 'pylru', 'aiohttp >= 1'], packages=setuptools.find_packages(), description='ElectrumX Server', author='Neil Booth', From 26de3f0261d99a45c48b026ae637cac9a79f84f7 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 3 Dec 2016 20:24:53 +0900 Subject: [PATCH 5/5] Prepare 0.8.1 --- RELEASE-NOTES | 10 +++++++++- docs/HOWTO.rst | 1 + server/version.py | 2 +- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/RELEASE-NOTES b/RELEASE-NOTES index 5343e4e..79ac010 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,4 +1,12 @@ -verion 0.8.0 +version 0.8.1 +------------- + +** NOTE: this version has a new Python package dependency: pylru + +- fix for IRC encoding killing IRC connection +- add lru cache for history + +version 0.8.0 ------------ - stale connections are periodically closed. See docs/ENV-NOTES for diff --git a/docs/HOWTO.rst b/docs/HOWTO.rst index 2775ec4..20cae89 100644 --- a/docs/HOWTO.rst +++ b/docs/HOWTO.rst @@ -8,6 +8,7 @@ small - patches welcome. + Python3: ElectrumX uses asyncio. Python version >=3.5 is required. + plyvel: Python interface to LevelDB. I am using plyvel-0.9. ++ pylru: Python LRU cache package. I'm using 1.0.9. + aiohttp: Python library for asynchronous HTTP. ElectrumX uses it for communication with the daemon. Version >= 1.0 required; I am using 1.0.5. diff --git a/server/version.py b/server/version.py index 8a6e890..389d504 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.8.0" +VERSION = "ElectrumX 0.8.1"