From c03a72faa6bb8d4ceb3d08f8b15c28cf32a77487 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Wed, 10 Apr 2019 14:35:19 +0100 Subject: [PATCH] Generic session costing * new env vars: COST_SOFT_LIMIT, COST_HARD_LIMIT, BANDWIDTH_UNIT_COST * obsolete: BANDWIDTH_LIMIT, MAX_SESSION_SUBS, MAX_SUBS * requires aiorpcX 0.12.0 --- docs/environment.rst | 41 ++++++++------- electrumx/lib/text.py | 15 +++--- electrumx/server/controller.py | 4 +- electrumx/server/env.py | 9 ++-- electrumx/server/session.py | 92 ++++++++++++++++++++-------------- setup.py | 2 +- tests/server/test_env.py | 28 +++++------ 7 files changed, 102 insertions(+), 89 deletions(-) diff --git a/docs/environment.rst b/docs/environment.rst index d7aa6ba..fdbdc36 100644 --- a/docs/environment.rst +++ b/docs/environment.rst @@ -246,33 +246,36 @@ raise them. hexadecimal ASCII characters on the wire. Very few transactions on Bitcoin mainnet are over 500KB in size. -.. envvar:: MAX_SUBS +.. envvar:: COST_SOFT_LIMIT +.. envvar:: COST_HARD_LIMIT - The maximum number of address subscriptions across all sessions. - Defaults to 250,000. + Session cost soft and hard limits as integers. The default values are :const:`2,000` + and :const:`20,000` respectively. -.. envvar:: MAX_SESSION_SUBS + The server prices each request made to it based upon an estimate of the resources needed + to process it. Factors include whether the request uses bitcoind, how much bandwidth + it uses, and how hard it hits the databases. - The maximum number of address subscriptions permitted to a single - session. Defaults to 50,000. + To set a base for the units, a :func:`blockchain.scripthash.subscribe` subscription to + an address with a history of 2 or fewer transactions is costed at :const:`1.0` before + considering the bandwidth consumed. :func:`server.ping` is costed at :const:`0.1`. -.. envvar:: BANDWIDTH_LIMIT + As the total cost of a session goes over the soft limit, its requests start to be + throttled in two ways. 
First, each request sleeps a little before being handled. + Second, the number of requests that the server will handle concurrently reduces. Both + effects increase as the hard limit is approached, at which point the session is + disconnected. - Per-session periodic bandwidth usage limit in bytes. This is a soft, - not hard, limit. Currently the period is hard-coded to be one hour. - The default limit value is 2 million bytes. + So non-abusive sessions can continue to be served, a session's cost gradually decays + over time. Subscriptions have an ongoing servicing cost, so the decay is slower as the + number of subscriptions increases. - Bandwidth usage over each period is totalled, and when this limit is - exceeded each subsequent request is stalled by sleeping before - handling it, effectively giving higher processing priority to other - sessions. +.. envvar:: BANDWIDTH_UNIT_COST - The more bandwidth usage exceeds this soft limit the longer the next - request will sleep. Sleep times are a round number of seconds with - a minimum of 1. Each time the delay changes the event is logged. + The number of bytes, sent and received, by a session that is deemed to cost 1.0. - Bandwidth usage is gradually reduced over time by "refunding" a - proportional part of the limit every now and then. + The default value is :const:`5000` bytes, meaning the bandwidth cost assigned to a response + of 100KB is 20. If your bandwidth is cheap you should probably raise this. .. envvar:: SESSION_TIMEOUT diff --git a/electrumx/lib/text.py b/electrumx/lib/text.py index 8546eb9..20299bf 100644 --- a/electrumx/lib/text.py +++ b/electrumx/lib/text.py @@ -7,14 +7,15 @@ def sessions_lines(data): '''A generator returning lines for a list of sessions. 
data is the return value of rpc_sessions().''' - fmt = ('{:<6} {:<5} {:>17} {:>5} {:>5} {:>5} ' + fmt = ('{:<6} {:<5} {:>17} {:>5} {:>7} {:>5} {:>5} ' '{:>7} {:>7} {:>7} {:>7} {:>7} {:>9} {:>21}') yield fmt.format('ID', 'Flags', 'Client', 'Proto', - 'Reqs', 'Txs', 'Subs', + 'Cost', 'Reqs', 'Txs', 'Subs', 'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Time', 'Peer') - for (id_, flags, peer, client, proto, reqs, txs_sent, subs, + for (id_, flags, peer, client, proto, cost, reqs, txs_sent, subs, recv_count, recv_size, send_count, send_size, time) in data: yield fmt.format(id_, flags, client, proto, + '{:,d}'.format(int(cost)), '{:,d}'.format(reqs), '{:,d}'.format(txs_sent), '{:,d}'.format(subs), @@ -30,15 +31,15 @@ def groups_lines(data): data is the return value of rpc_groups().''' - fmt = ('{:<6} {:>9} {:>9} {:>6} {:>6} {:>8}' + fmt = ('{:<6} {:>9} {:>8} {:>6} {:>6} {:>8}' '{:>7} {:>9} {:>7} {:>9}') - yield fmt.format('ID', 'Sessions', 'Bwidth KB', 'Reqs', 'Txs', 'Subs', + yield fmt.format('ID', 'Sessions', 'Cost', 'Reqs', 'Txs', 'Subs', 'Recv', 'Recv KB', 'Sent', 'Sent KB') - for (id_, session_count, bandwidth, reqs, txs_sent, subs, + for (id_, session_count, cost, reqs, txs_sent, subs, recv_count, recv_size, send_count, send_size) in data: yield fmt.format(id_, '{:,d}'.format(session_count), - '{:,d}'.format(bandwidth // 1024), + '{:,d}'.format(int(cost)), '{:,d}'.format(reqs), '{:,d}'.format(txs_sent), '{:,d}'.format(subs), diff --git a/electrumx/server/controller.py b/electrumx/server/controller.py index 868a1f3..d02dfa1 100644 --- a/electrumx/server/controller.py +++ b/electrumx/server/controller.py @@ -82,8 +82,8 @@ class Controller(ServerBase): '''Start the RPC server and wait for the mempool to synchronize. Then start serving external clients. 
''' - if not (0, 11, 0) <= aiorpcx_version < (0, 12): - raise RuntimeError('aiorpcX version 0.11.x is required') + if not (0, 12, 0) <= aiorpcx_version < (0, 13): + raise RuntimeError('aiorpcX version 0.12.x is required') env = self.env min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings() diff --git a/electrumx/server/env.py b/electrumx/server/env.py index df287e8..1c90745 100644 --- a/electrumx/server/env.py +++ b/electrumx/server/env.py @@ -31,7 +31,7 @@ class Env(EnvBase): def __init__(self, coin=None): super().__init__() - self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK']) + self.obsolete([]) self.db_dir = self.required('DB_DIRECTORY') self.db_engine = self.default('DB_ENGINE', 'leveldb') self.daemon_url = self.required('DAEMON_URL') @@ -52,7 +52,6 @@ class Env(EnvBase): self.ssl_certfile = self.required('SSL_CERTFILE') self.ssl_keyfile = self.required('SSL_KEYFILE') self.rpc_port = self.integer('RPC_PORT', 8000) - self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000) self.banner_file = self.default('BANNER_FILE', None) self.tor_banner_file = self.default('TOR_BANNER_FILE', self.banner_file) @@ -68,10 +67,10 @@ class Env(EnvBase): self.donation_address = self.default('DONATION_ADDRESS', '') # Server limits to help prevent DoS self.max_send = self.integer('MAX_SEND', self.coin.DEFAULT_MAX_SEND) - self.max_subs = self.integer('MAX_SUBS', 250000) self.max_sessions = self.sane_max_sessions() - self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000) - self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000) + self.cost_soft_limit = self.integer('COST_SOFT_LIMIT', 2000) + self.cost_hard_limit = self.integer('COST_HARD_LIMIT', 20000) + self.bw_unit_cost = self.integer('BANDWIDTH_UNIT_COST', 5000) self.session_timeout = self.integer('SESSION_TIMEOUT', 600) self.drop_client = self.custom("DROP_CLIENT", None, re.compile) self.blacklist_url = self.default('BLACKLIST_URL', self.coin.BLACKLIST_URL) diff --git 
a/electrumx/server/session.py b/electrumx/server/session.py index a48fc4e..7de6e0a 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -120,14 +120,11 @@ class SessionManager(object): self.logger = util.class_logger(__name__, self.__class__.__name__) self.servers = {} self.sessions = set() - self.max_subs = env.max_subs self.cur_group = SessionGroup(0) self.txs_sent = 0 self.start_time = time.time() self.history_cache = pylru.lrucache(256) self.notified_height = None - # Cache some idea of room to avoid recounting on each subscription - self.subs_room = 0 # Masternode stuff only for such coins if issubclass(env.coin.SESSIONCLS, DashElectrumX): self.mn_cache_height = 0 @@ -267,11 +264,10 @@ class SessionManager(object): await group.spawn(session.close()) # Consolidate small groups - bw_limit = self.env.bandwidth_limit group_map = self._group_map() groups = [group for group, sessions in group_map.items() if len(sessions) <= 5 and - sum(s.bw_charge for s in sessions) < bw_limit] + sum(s.cost for s in sessions) < self.env.cost_soft_limit] if len(groups) > 1: new_group = groups[-1] for group in groups: @@ -310,6 +306,7 @@ class SessionManager(object): session.peer_address_str(for_log=for_log), session.client, session.protocol_version_string(), + session.cost, session.count_pending_items(), session.txs_sent, session.sub_count(), @@ -325,7 +322,7 @@ class SessionManager(object): for group, sessions in group_map.items(): result.append([group.gid, len(sessions), - sum(s.bw_charge for s in sessions), + sum(s.cost for s in sessions), sum(s.count_pending_items() for s in sessions), sum(s.txs_sent for s in sessions), sum(s.sub_count() for s in sessions), @@ -486,17 +483,19 @@ class SessionManager(object): await self._start_server('RPC', self.env.cs_host(for_rpc=True), self.env.rpc_port) await event.wait() + + session_class = self.env.coin.SESSIONCLS + session_class.cost_soft_limit = self.env.cost_soft_limit + session_class.cost_hard_limit = 
self.env.cost_hard_limit + session_class.cost_decay_per_sec = session_class.cost_hard_limit / 3600 + session_class.bw_cost_per_byte = 1.0 / self.env.bw_unit_cost + self.logger.info(f'max session count: {self.env.max_sessions:,d}') - self.logger.info(f'session timeout: ' - f'{self.env.session_timeout:,d} seconds') - self.logger.info('session bandwidth limit {:,d} bytes' - .format(self.env.bandwidth_limit)) - self.logger.info('max response size {:,d} bytes' - .format(self.env.max_send)) - self.logger.info('max subscriptions across all sessions: {:,d}' - .format(self.max_subs)) - self.logger.info('max subscriptions per session: {:,d}' - .format(self.env.max_session_subs)) + self.logger.info(f'session timeout: {self.env.session_timeout:,d} seconds') + self.logger.info(f'session cost hard limit {self.env.cost_hard_limit:,d}') + self.logger.info(f'session cost soft limit {self.env.cost_soft_limit:,d}') + self.logger.info(f'bandwidth unit cost {self.env.bw_unit_cost:,d}') + self.logger.info(f'max response size {self.env.max_send:,d} bytes') if self.env.drop_client is not None: self.logger.info('drop clients matching: {}' .format(self.env.drop_client.pattern)) @@ -590,14 +589,6 @@ class SessionManager(object): self.sessions.remove(session) self.session_event.set() - def new_subscription(self): - if self.subs_room <= 0: - self.subs_room = self.max_subs - self._sub_count() - if self.subs_room <= 0: - raise RPCError(BAD_REQUEST, f'server subscription limit ' - f'{self.max_subs:,d} reached') - self.subs_room -= 1 - class SessionBase(RPCSession): '''Base class of ElectrumX JSON sessions. 
@@ -624,7 +615,6 @@ class SessionBase(RPCSession): self.anon_logs = self.env.anon_logs self.txs_sent = 0 self.log_me = False - self.bw_limit = self.env.bandwidth_limit self.daemon_request = self.session_mgr.daemon_request # Hijack the connection so we can log messages self._receive_message_orig = self.connection.receive_message @@ -674,7 +664,7 @@ class SessionBase(RPCSession): msg = '' if not self._can_send.is_set(): msg += ' with full socket buffer' - if self._concurrency.max_concurrent != self.max_concurrent: + if self._concurrency.max_concurrent != self.initial_concurrent: msg += ' whilst throttled' if self.send_size >= 1024*1024: msg += ('. Sent {:,d} bytes in {:,d} messages' @@ -716,7 +706,6 @@ class ElectrumX(SessionBase): self.subscribe_headers = False self.subscribe_headers_raw = False self.connection.max_response_size = self.env.max_send - self.max_subs = self.env.max_session_subs self.hashX_subs = {} self.sv_seen = False self.mempool_statuses = {} @@ -743,6 +732,7 @@ class ElectrumX(SessionBase): } async def server_features_async(self): + self.bump_cost(0.2) return self.server_features(self.env) @classmethod @@ -802,6 +792,7 @@ class ElectrumX(SessionBase): async def _headers_subscribe(self, raw): '''Subscribe to get headers of new blocks.''' + self.bump_cost(0.25) self.subscribe_headers_raw = assert_boolean(raw) self.subscribe_headers = True return await self.subscribe_headers_result() @@ -821,10 +812,12 @@ class ElectrumX(SessionBase): async def add_peer(self, features): '''Add a peer (but only if the peer resolves to the source).''' self.is_peer = True + self.bump_cost(100.0) return await self.peer_mgr.on_add_peer(features, self.peer_address()) async def peers_subscribe(self): '''Return the server peers as a list of (ip, host, details) tuples.''' + self.bump_cost(1.0) return self.peer_mgr.on_peers_subscribe(self.is_tor()) async def address_status(self, hashX): @@ -843,6 +836,9 @@ class ElectrumX(SessionBase): status += 
''.join(f'{hash_to_hex_str(tx.hash)}:' f'{-tx.has_unconfirmed_inputs:d}:' for tx in mempool) + + excess = max(len(db_history) + len(mempool) - 2, 0) + self.bump_cost(1.0 + excess / 50) if status: status = sha256(status.encode()).hex() else: @@ -861,6 +857,7 @@ class ElectrumX(SessionBase): utxos = await self.db.all_utxos(hashX) utxos = sorted(utxos) utxos.extend(await self.mempool.unordered_UTXOs(hashX)) + self.bump_cost(1.0 + len(utxos) / 50) spends = await self.mempool.potential_spends(hashX) return [{'tx_hash': hash_to_hex_str(utxo.tx_hash), @@ -870,13 +867,6 @@ class ElectrumX(SessionBase): if (utxo.tx_hash, utxo.tx_pos) not in spends] async def hashX_subscribe(self, hashX, alias): - # First check our limit. - if len(self.hashX_subs) >= self.max_subs: - raise RPCError(BAD_REQUEST, 'your address subscription limit ' - f'{self.max_subs:,d} reached') - - # Now let the session manager check its limit - self.session_mgr.new_subscription() self.hashX_subs[hashX] = alias return await self.address_status(hashX) @@ -918,6 +908,7 @@ class ElectrumX(SessionBase): utxos = await self.db.all_utxos(hashX) confirmed = sum(utxo.value for utxo in utxos) unconfirmed = await self.mempool.balance_delta(hashX) + self.bump_cost(1.0 + len(utxos) / 50) return {'confirmed': confirmed, 'unconfirmed': unconfirmed} async def scripthash_get_balance(self, scripthash): @@ -928,14 +919,17 @@ class ElectrumX(SessionBase): async def unconfirmed_history(self, hashX): # Note unconfirmed history is unordered in electrum-server # height is -1 if it has unconfirmed inputs, otherwise 0 - return [{'tx_hash': hash_to_hex_str(tx.hash), - 'height': -tx.has_unconfirmed_inputs, - 'fee': tx.fee} - for tx in await self.mempool.transaction_summaries(hashX)] + result = [{'tx_hash': hash_to_hex_str(tx.hash), + 'height': -tx.has_unconfirmed_inputs, + 'fee': tx.fee} + for tx in await self.mempool.transaction_summaries(hashX)] + self.bump_cost(0.25 + len(result) / 50) + return result async def 
confirmed_and_unconfirmed_history(self, hashX): # Note history is ordered but unconfirmed is unordered in e-s history = await self.session_mgr.limited_history(hashX) + self.bump_cost(0.25 + len(history) / 50) conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height} for tx_hash, height in history] return conf + await self.unconfirmed_history(hashX) @@ -982,6 +976,7 @@ class ElectrumX(SessionBase): height = non_negative_integer(height) cp_height = non_negative_integer(cp_height) raw_header_hex = (await self.session_mgr.raw_header(height)).hex() + self.bump_cost(1.25 - (cp_height == 0)) if cp_height == 0: return raw_header_hex result = {'header': raw_header_hex} @@ -1004,14 +999,17 @@ class ElectrumX(SessionBase): start_height = non_negative_integer(start_height) count = non_negative_integer(count) cp_height = non_negative_integer(cp_height) + cost = count / 50 max_size = self.MAX_CHUNK_SIZE count = min(count, max_size) headers, count = await self.db.read_headers(start_height, count) result = {'hex': headers.hex(), 'count': count, 'max': max_size} if count and cp_height: + cost += 1.0 last_height = start_height + count - 1 result.update(await self._merkle_proof(cp_height, last_height)) + self.bump_cost(cost) return result async def block_headers_12(self, start_height, count): @@ -1025,6 +1023,7 @@ class ElectrumX(SessionBase): size = self.coin.CHUNK_SIZE start_height = index * size headers, _ = await self.db.read_headers(start_height, size) + self.bump_cost(2016 / 50) return headers.hex() async def block_get_header(self, height): @@ -1032,6 +1031,7 @@ class ElectrumX(SessionBase): height: the header's height''' height = non_negative_integer(height) + self.bump_cost(0.25) return await self.session_mgr.electrum_header(height) def is_tor(self): @@ -1062,11 +1062,13 @@ class ElectrumX(SessionBase): async def donation_address(self): '''Return the donation address as a string, empty if there is none.''' + self.bump_cost(0.1) return self.env.donation_address async def 
banner(self): '''Return the server banner text.''' banner = f'You are connected to an {electrumx.version} server.' + self.bump_cost(0.5) if self.is_tor(): banner_file = self.env.tor_banner_file @@ -1086,6 +1088,7 @@ class ElectrumX(SessionBase): async def relayfee(self): '''The minimum fee a low-priority tx must pay in order to be accepted to the daemon's memory pool.''' + self.bump_cost(1.0) return await self.daemon_request('relayfee') async def estimatefee(self, number): @@ -1095,12 +1098,14 @@ class ElectrumX(SessionBase): number: the number of blocks ''' number = non_negative_integer(number) + self.bump_cost(2.0) return await self.daemon_request('estimatefee', number) async def ping(self): '''Serves as a connection keep-alive mechanism and for the client to confirm the server is still responding. ''' + self.bump_cost(0.1) return None async def server_version(self, client_name='', protocol_version=None): @@ -1109,6 +1114,7 @@ class ElectrumX(SessionBase): client_name: a string identifying the client protocol_version: the protocol version spoken by the client ''' + self.bump_cost(0.5) if self.sv_seen and self.protocol_tuple >= (1, 4): raise RPCError(BAD_REQUEST, f'server.version already sent') self.sv_seen = True @@ -1144,6 +1150,7 @@ class ElectrumX(SessionBase): '''Broadcast a raw transaction to the network. 
raw_tx: the raw transaction as a hexadecimal string''' + self.bump_cost(0.25 + len(raw_tx) / 5000) # This returns errors as JSON RPC errors, as is natural try: hex_hash = await self.session_mgr.broadcast_transaction(raw_tx) @@ -1176,6 +1183,7 @@ class ElectrumX(SessionBase): if verbose not in (True, False): raise RPCError(BAD_REQUEST, f'"verbose" must be a boolean') + self.bump_cost(1.0) return await self.daemon_request('getrawtransaction', tx_hash, verbose) async def _block_hash_and_tx_hashes(self, height): @@ -1189,7 +1197,9 @@ class ElectrumX(SessionBase): hex_hashes = await self.daemon_request('block_hex_hashes', height, 1) block_hash = hex_hashes[0] block = await self.daemon_request('deserialised_block', block_hash) - return block_hash, block['tx'] + tx_hashes = block['tx'] + self.bump_cost(1.0 + len(tx_hashes) / 1000) + return block_hash, tx_hashes def _get_merkle_branch(self, tx_hashes, tx_pos): '''Return a merkle branch to a transaction. @@ -1242,6 +1252,10 @@ class ElectrumX(SessionBase): else: return tx_hash + async def compact_fee_histogram(self): + self.bump_cost(1.0) + return await self.mempool.compact_fee_histogram() + def set_request_handlers(self, ptuple): self.protocol_tuple = ptuple diff --git a/setup.py b/setup.py index cf37db6..282104a 100644 --- a/setup.py +++ b/setup.py @@ -14,7 +14,7 @@ setuptools.setup( # "xevan_hash" package is required to sync Xuez network. # "groestlcoin_hash" package is required to sync Groestlcoin network. # "pycryptodomex" package is required to sync SmartCash network. 
- install_requires=['aiorpcX>=0.11.0,<0.12', 'attrs', + install_requires=['aiorpcX>=0.12.0,<0.13', 'attrs', 'plyvel', 'pylru', 'aiohttp >= 2'], packages=setuptools.find_packages(include=('electrumx*',)), description='ElectrumX Server', diff --git a/tests/server/test_env.py b/tests/server/test_env.py index f5c2797..2c79294 100644 --- a/tests/server/test_env.py +++ b/tests/server/test_env.py @@ -196,12 +196,20 @@ def test_RPC_PORT(): assert_integer('RPC_PORT', 'rpc_port', 8000) -def test_MAX_SUBSCRIPTIONS(): - assert_integer('MAX_SUBSCRIPTIONS', 'max_subscriptions', 10000) +def test_COST_HARD_LIMIT(): + assert_integer('COST_HARD_LIMIT', 'cost_hard_limit', 20000) -def test_LOG_SESSIONS(): - assert_integer('LOG_SESSIONS', 'log_sessions', 3600) +def test_COST_SOFT_LIMIT(): + assert_integer('COST_SOFT_LIMIT', 'cost_soft_limit', 2000) + + +def test_COST_SOFT_LIMIT(): + assert_integer('COST_SOFT_LIMIT', 'cost_soft_limit', 2000) + + +def test_BANDWIDTH_UNIT_COST(): + assert_integer('BANDWIDTH_UNIT_COST', 'bw_unit_cost', 5000) def test_DONATION_ADDRESS(): @@ -216,10 +224,6 @@ def test_MAX_SEND(): assert_integer('MAX_SEND', 'max_send', 1000000) -def test_MAX_SUBS(): - assert_integer('MAX_SUBS', 'max_subs', 250000) - - def test_MAX_SESSIONS(): too_big = 1000000 os.environ['MAX_SESSIONS'] = str(too_big) @@ -228,14 +232,6 @@ def test_MAX_SESSIONS(): # Cannot test default as it may be lowered by the open file limit cap -def test_MAX_SESSION_SUBS(): - assert_integer('MAX_SESSION_SUBS', 'max_session_subs', 50000) - - -def test_BANDWIDTH_LIMIT(): - assert_integer('BANDWIDTH_LIMIT', 'bandwidth_limit', 2000000) - - def test_SESSION_TIMEOUT(): assert_integer('SESSION_TIMEOUT', 'session_timeout', 600)