From 0b52376f23f16945de193cdfc90f02b1211825f4 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 19 Nov 2016 10:49:12 +0900 Subject: [PATCH 1/4] Add subscription limits --- docs/ENV-NOTES | 16 ++++++++++++++-- docs/RELEASE-NOTES | 6 ++++++ server/env.py | 3 +++ server/protocol.py | 36 +++++++++++++++++++++++++++++------- 4 files changed, 52 insertions(+), 9 deletions(-) diff --git a/docs/ENV-NOTES b/docs/ENV-NOTES index 14ab73a..fb62edd 100644 --- a/docs/ENV-NOTES +++ b/docs/ENV-NOTES @@ -32,8 +32,20 @@ RPC_PORT - Listen on this port for local RPC connections, defaults to 8000. BANNER_FILE - a path to a banner file to serve to clients. The banner file is re-read for each new client. -DONATION_ADDRESS - server donation address. Defaults to none. -ANON_LOGS - set to remove IP addresses from logs. Default: disabled +ANON_LOGS - set to anything non-empty to remove IP addresses from + logs. By default IP addresses will be logged. +DONATION_ADDRESS - server donation address. Defaults to none. + +These following environment variables are to help limit server +resource consumption and to prevent simple DoS. Address subscriptions +in ElectrumX are very cheap - they consume about 100 bytes of memory +each and are processed efficiently. I feel the defaults are low and +encourage you to raise them. + +MAX_SUBS - maximum number of address subscriptions across all + sessions. Defaults to 250,000. +MAX_SESSION_SUBS - maximum number of address subscriptions permitted to a + single session. Defaults to 50,000. If you want IRC connectivity to advertise your node: diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES index 94e59be..a7f5c2a 100644 --- a/docs/RELEASE-NOTES +++ b/docs/RELEASE-NOTES @@ -1,3 +1,9 @@ +version 0.6.3 +------------- + +- new environment variables MAX_SUBS and MAX_SESSION_SUBS. Please read + docs/ENV-NOTES - I encourage you to raise the default values. + version 0.6.2 ------------- diff --git a/server/env.py b/server/env.py index ac1b46a..2c490f2 100644 --- a/server/env.py +++ b/server/env.py @@ -46,6 +46,9 @@ class Env(LoggedClass): self.db_engine = self.default('DB_ENGINE', 'leveldb') self.debug = self.default('DEBUG', '') self.debug = [item.lower() for item in self.debug.split()] + # Subscription limits + self.max_subs = self.integer('MAX_SUBS', 250000) + self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000) # IRC self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port) self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port) diff --git a/server/protocol.py b/server/protocol.py index c0f062e..d539e86 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -246,7 +246,13 @@ class ServerManager(LoggedClass): self.servers = [] self.irc = IRC(env) self.sessions = {} + self.max_subs = env.max_subs + self.subscription_count = 0 self.futures = [] # At present just the IRC future, if any + self.logger.info('max subscriptions across all sessions: {:,d}' + .format(self.max_subs)) + self.logger.info('max subscriptions per session: {:,d}' + .format(env.max_session_subs)) async def start_server(self, kind, *args, **kw_args): loop = asyncio.get_event_loop() @@ -318,9 +324,17 @@ class ServerManager(LoggedClass): self.sessions[session] = asyncio.ensure_future(coro) def remove_session(self, session): + if isinstance(session, ElectrumX): + self.subscription_count -= len(session.hash168s) future = self.sessions.pop(session) future.cancel() + def new_subscription(self): + if self.subscription_count >= self.max_subs: + raise JSONRPC.RPCError('server subscription limit {:,d} reached' + .format(self.max_subs)) + self.subscription_count += 1 + def irc_peers(self): return self.irc.peers @@ -330,18 +344,19 @@ class ServerManager(LoggedClass): total = len(self.sessions) return {'active': active, 'inert': total - active, 'total': total} - def address_count(self): - return sum(len(session.hash168s) for session in self.sessions - if isinstance(session, ElectrumX)) - async def rpc_getinfo(self, params): '''The RPC 'getinfo' call.''' + # FIXME: remove later + indep_count = sum(len(session.hash168s) for session in self.sessions + if isinstance(session, ElectrumX)) + if indep_count != self.subscription_count: + self.logger.error('sub count {:,d} but session total {:,d}' + .format(self.subscription_count, indep_count)) return { 'blocks': self.bp.height, 'peers': len(self.irc.peers), 'sessions': self.session_count(), - 'watched': self.address_count(), - 'cached': 0, + 'watched': self.subscription_count, } async def rpc_sessions(self, params): @@ -503,6 +518,7 @@ class ElectrumX(Session): self.subscribe_headers = False self.subscribe_height = False self.notified_height = None + self.max_subs = self.env.max_session_subs self.hash168s = set() rpcs = [ ('blockchain', @@ -689,8 +705,14 @@ class ElectrumX(Session): async def address_subscribe(self, params): hash168 = self.extract_hash168(params) + if len(self.hash168s) >= self.max_subs: + raise self.RPCError('your address subscription limit {:,d} reached' + .format(self.max_subs)) + result = await self.address_status(hash168) + # add_subscription can raise so call it before adding + self.manager.new_subscription() self.hash168s.add(hash168) - return await self.address_status(hash168) + return result async def block_get_chunk(self, params): index = self.extract_non_negative_integer(params) From 3d87e299ea958cca7426dcf08a0e0e3b08b5aa43 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 19 Nov 2016 15:42:06 +0900 Subject: [PATCH 2/4] Move formatted_time to library --- lib/util.py | 8 ++++++++ server/block_processor.py | 10 +--------- server/db.py | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/util.py b/lib/util.py index f41d4f2..3feab9d 100644 --- a/lib/util.py +++ b/lib/util.py @@ -37,6 +37,14 @@ class cachedproperty(object): return value +def formatted_time(t): + '''Return a number of seconds as a string in days, hours, mins and + secs.''' + t = int(t) + return '{:d}d {:02d}h {:02d}m {:02d}s'.format( + t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60) + + def deep_getsizeof(obj): """Find the memory footprint of a Python object. diff --git a/server/block_processor.py b/server/block_processor.py index 55a28f0..d9fd11f 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -21,7 +21,7 @@ from functools import partial from server.daemon import Daemon, DaemonError from server.version import VERSION from lib.hash import hash_to_str -from lib.util import chunks, LoggedClass +from lib.util import chunks, formatted_time, LoggedClass import server.db from server.storage import open_db @@ -30,14 +30,6 @@ HIST_ENTRIES_PER_KEY = 1024 HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4 -def formatted_time(t): - '''Return a number of seconds as a string in days, hours, mins and - secs.''' - t = int(t) - return '{:d}d {:02d}h {:02d}m {:02d}s'.format( - t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60) - - class ChainError(Exception): pass diff --git a/server/db.py b/server/db.py index c44b17c..c1b3ff9 100644 --- a/server/db.py +++ b/server/db.py @@ -15,7 +15,7 @@ from struct import pack, unpack from bisect import bisect_right from collections import namedtuple -from lib.util import chunks, LoggedClass +from lib.util import chunks, formatted_time, LoggedClass from lib.hash import double_sha256, hash_to_str from server.storage import open_db from server.version import VERSION From 52116539d4ad53ae0bec797ca669db5226f387c4 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 19 Nov 2016 17:05:47 +0900 Subject: [PATCH 3/4] Clean shutdown: wait for transports to close We give 10 seconds for everything to be cleaned up, then close forcibly. Fixes #30 --- server/block_processor.py | 7 +++-- server/protocol.py | 56 ++++++++++++++++++++++++--------------- 2 files changed, 40 insertions(+), 23 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index d9fd11f..9373bc9 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -208,8 +208,7 @@ class BlockProcessor(server.db.DB): await self._wait_for_update() except asyncio.CancelledError: self.on_cancel() - # This lets the asyncio subsystem process futures cancellations - await asyncio.sleep(0) + await self.wait_shutdown() def on_cancel(self): '''Called when the main loop is cancelled. @@ -219,6 +218,10 @@ class BlockProcessor(server.db.DB): future.cancel() self.flush(True) + async def wait_shutdown(self): + '''Wait for shutdown to complete cleanly, and return.''' + await asyncio.sleep(0) + async def _wait_for_update(self): '''Wait for the prefetcher to deliver blocks or a mempool update. diff --git a/server/protocol.py b/server/protocol.py index d539e86..07c4073 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -54,6 +54,11 @@ class BlockServer(BlockProcessor): self.server_mgr.stop() super().on_cancel() + async def wait_shutdown(self): + '''Wait for shutdown to complete cleanly, and return.''' + await self.server_mgr.wait_shutdown() + await super().wait_shutdown() + def mempool_transactions(self, hash168): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool entries for the hash168. @@ -141,7 +146,7 @@ class MemPool(LoggedClass): for n, (hex_hash, tx) in enumerate(new_txs.items()): # Yield to process e.g. signals - if n % 100 == 0: + if n % 20 == 0: await asyncio.sleep(0) txout_pairs = [txout_pair(txout) for txout in tx.outputs] self.txs[hex_hash] = (None, txout_pairs, None) @@ -162,8 +167,7 @@ class MemPool(LoggedClass): # Now add the inputs for n, (hex_hash, tx) in enumerate(new_txs.items()): # Yield to process e.g. signals - if n % 10 == 0: - await asyncio.sleep(0) + await asyncio.sleep(0) if initial and time.time() > next_log: next_log = time.time() + 20 @@ -248,7 +252,7 @@ class ServerManager(LoggedClass): self.sessions = {} self.max_subs = env.max_subs self.subscription_count = 0 - self.futures = [] # At present just the IRC future, if any + self.irc_future = None self.logger.info('max subscriptions across all sessions: {:,d}' .format(self.max_subs)) self.logger.info('max subscriptions per session: {:,d}' @@ -263,8 +267,6 @@ class ServerManager(LoggedClass): host, port = args[:2] try: self.servers.append(await server) - except asyncio.CancelledError: - raise except Exception as e: self.logger.error('{} server failed to listen on {}:{:d} :{}' .format(kind, host, port, e)) @@ -294,7 +296,7 @@ class ServerManager(LoggedClass): if env.irc: self.logger.info('starting IRC coroutine') - self.futures.append(asyncio.ensure_future(self.irc.start())) + self.irc_future = asyncio.ensure_future(self.irc.start()) else: self.logger.info('IRC disabled') @@ -308,24 +310,42 @@ class ServerManager(LoggedClass): def stop(self): '''Close listening servers.''' + self.logger.info('cleanly closing client sessions, please wait...') for server in self.servers: server.close() + if self.irc_future: + self.irc_future.cancel() + for session in self.sessions: + session.transport.close() + + async def wait_shutdown(self): + # Wait for servers to close + for server in self.servers: + await server.wait_closed() + # Just in case a connection came in + await asyncio.sleep(0) self.servers = [] - for future in self.futures: - future.cancel() - self.futures = [] - sessions = list(self.sessions.keys()) # A copy - for session in sessions: - self.remove_session(session) + self.logger.info('server listening sockets closed') + limit = time.time() + 10 + while self.sessions and time.time() < limit: + self.logger.info('{:,d} sessions remaining' + .format(len(self.sessions))) + await asyncio.sleep(2) + if self.sessions: + self.logger.info('forcibly closing {:,d} stragglers' + .format(len(self.sessions))) + for future in self.sessions.values(): + future.cancel() + await asyncio.sleep(0) def add_session(self, session): + assert self.servers assert session not in self.sessions coro = session.serve_requests() self.sessions[session] = asyncio.ensure_future(coro) def remove_session(self, session): - if isinstance(session, ElectrumX): - self.subscription_count -= len(session.hash168s) + self.subscription_count -= session.sub_count() future = self.sessions.pop(session) future.cancel() @@ -346,12 +366,6 @@ class ServerManager(LoggedClass): async def rpc_getinfo(self, params): '''The RPC 'getinfo' call.''' - # FIXME: remove later - indep_count = sum(len(session.hash168s) for session in self.sessions - if isinstance(session, ElectrumX)) - if indep_count != self.subscription_count: - self.logger.error('sub count {:,d} but session total {:,d}' - .format(self.subscription_count, indep_count)) return { 'blocks': self.bp.height, 'peers': len(self.irc.peers), From d856cbab90be403897c9451341ba23f332172f5e Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 19 Nov 2016 17:10:58 +0900 Subject: [PATCH 4/4] Prepare 0.6.3 --- docs/RELEASE-NOTES | 2 ++ server/version.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES index a7f5c2a..4b505c3 100644 --- a/docs/RELEASE-NOTES +++ b/docs/RELEASE-NOTES @@ -3,6 +3,8 @@ version 0.6.3 - new environment variables MAX_SUBS and MAX_SESSION_SUBS. Please read docs/ENV-NOTES - I encourage you to raise the default values. +- fixed import bug in 0.6.2 that prevented initial sync +- issues closed: #30. Logs should be clean on shutdown now. version 0.6.2 ------------- diff --git a/server/version.py b/server/version.py index 9cd7c85..5a9fd44 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.6.2" +VERSION = "ElectrumX 0.6.3"