Browse Source

Generic session costing

* new env vars: COST_SOFT_LIMIT, COST_HARD_LIMIT, BANDWIDTH_UNIT_COST
* obsolete: BANDWIDTH_LIMIT, MAX_SESSION_SUBS, MAX_SUBS
* requires aiorpcX 0.12.0
patch-2
Neil Booth 6 years ago
parent
commit
c03a72faa6
  1. 41
      docs/environment.rst
  2. 15
      electrumx/lib/text.py
  3. 4
      electrumx/server/controller.py
  4. 9
      electrumx/server/env.py
  5. 92
      electrumx/server/session.py
  6. 2
      setup.py
  7. 28
      tests/server/test_env.py

41
docs/environment.rst

@ -246,33 +246,36 @@ raise them.
hexadecimal ASCII characters on the wire. Very few transactions on
Bitcoin mainnet are over 500KB in size.
.. envvar:: MAX_SUBS
.. envvar:: COST_SOFT_LIMIT
.. envvar:: COST_HARD_LIMIT
The maximum number of address subscriptions across all sessions.
Defaults to 250,000.
Session cost soft and hard limits as integers. The default values are :const:`2,000`
and :const:`20,000` respectively.
.. envvar:: MAX_SESSION_SUBS
The server prices each request made to it based upon an estimate of the resources needed
to process it . Factors include whether the request uses bitcoind, how much bandwidth
it uses, and how hard it hits the databases.
The maximum number of address subscriptions permitted to a single
session. Defaults to 50,000.
To set a base for the units, a :func:`blockchain.scripthash.subscribe` subscription to
an address with a history of 2 or fewer transactions is costed at :const:`1.0` before
considering the bandwidth consumed. :func:`server.ping` is costed at :const:`0.1`.
.. envvar:: BANDWIDTH_LIMIT
As the total cost of a session goes over the soft limit, its requests start to be
throttled in two ways. First, each request sleeps a little before being handled.
Second, the number of requests that the server will handle concurrently reduces. Both
effects increase as the hard limit is approached, at which point the session is
disconnected.
Per-session periodic bandwidth usage limit in bytes. This is a soft,
not hard, limit. Currently the period is hard-coded to be one hour.
The default limit value is 2 million bytes.
So non-abusive sessions can continue to be served, a session's cost gradually decays
over time. Subscriptions have an ongoing servicing cost, so the decay is slower as the
number of subscriptions increases.
Bandwidth usage over each period is totalled, and when this limit is
exceeded each subsequent request is stalled by sleeping before
handling it, effectively giving higher processing priority to other
sessions.
.. envvar:: BANDWIDTH_UNIT_COST
The more bandwidth usage exceeds this soft limit the longer the next
request will sleep. Sleep times are a round number of seconds with
a minimum of 1. Each time the delay changes the event is logged.
The number of bytes, sent and received, by a session that is deemed to cost 1.0.
Bandwidth usage is gradually reduced over time by "refunding" a
proportional part of the limit every now and then.
The default value :const:`5000` bytes, meaning the bandwidth cost assigned to a response
of 100KB is 20. If your bandwidth is cheap you should probably raise this.
.. envvar:: SESSION_TIMEOUT

15
electrumx/lib/text.py

@ -7,14 +7,15 @@ def sessions_lines(data):
'''A generator returning lines for a list of sessions.
data is the return value of rpc_sessions().'''
fmt = ('{:<6} {:<5} {:>17} {:>5} {:>5} {:>5} '
fmt = ('{:<6} {:<5} {:>17} {:>5} {:>7} {:>5} {:>5} '
'{:>7} {:>7} {:>7} {:>7} {:>7} {:>9} {:>21}')
yield fmt.format('ID', 'Flags', 'Client', 'Proto',
'Reqs', 'Txs', 'Subs',
'Cost', 'Reqs', 'Txs', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Time', 'Peer')
for (id_, flags, peer, client, proto, reqs, txs_sent, subs,
for (id_, flags, peer, client, proto, cost, reqs, txs_sent, subs,
recv_count, recv_size, send_count, send_size, time) in data:
yield fmt.format(id_, flags, client, proto,
'{:,d}'.format(int(cost)),
'{:,d}'.format(reqs),
'{:,d}'.format(txs_sent),
'{:,d}'.format(subs),
@ -30,15 +31,15 @@ def groups_lines(data):
data is the return value of rpc_groups().'''
fmt = ('{:<6} {:>9} {:>9} {:>6} {:>6} {:>8}'
fmt = ('{:<6} {:>9} {:>8} {:>6} {:>6} {:>8}'
'{:>7} {:>9} {:>7} {:>9}')
yield fmt.format('ID', 'Sessions', 'Bwidth KB', 'Reqs', 'Txs', 'Subs',
yield fmt.format('ID', 'Sessions', 'Cost', 'Reqs', 'Txs', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB')
for (id_, session_count, bandwidth, reqs, txs_sent, subs,
for (id_, session_count, cost, reqs, txs_sent, subs,
recv_count, recv_size, send_count, send_size) in data:
yield fmt.format(id_,
'{:,d}'.format(session_count),
'{:,d}'.format(bandwidth // 1024),
'{:,d}'.format(int(cost)),
'{:,d}'.format(reqs),
'{:,d}'.format(txs_sent),
'{:,d}'.format(subs),

4
electrumx/server/controller.py

@ -82,8 +82,8 @@ class Controller(ServerBase):
'''Start the RPC server and wait for the mempool to synchronize. Then
start serving external clients.
'''
if not (0, 11, 0) <= aiorpcx_version < (0, 12):
raise RuntimeError('aiorpcX version 0.11.x is required')
if not (0, 12, 0) <= aiorpcx_version < (0, 13):
raise RuntimeError('aiorpcX version 0.12.x is required')
env = self.env
min_str, max_str = env.coin.SESSIONCLS.protocol_min_max_strings()

9
electrumx/server/env.py

@ -31,7 +31,7 @@ class Env(EnvBase):
def __init__(self, coin=None):
super().__init__()
self.obsolete(['UTXO_MB', 'HIST_MB', 'NETWORK'])
self.obsolete([])
self.db_dir = self.required('DB_DIRECTORY')
self.db_engine = self.default('DB_ENGINE', 'leveldb')
self.daemon_url = self.required('DAEMON_URL')
@ -52,7 +52,6 @@ class Env(EnvBase):
self.ssl_certfile = self.required('SSL_CERTFILE')
self.ssl_keyfile = self.required('SSL_KEYFILE')
self.rpc_port = self.integer('RPC_PORT', 8000)
self.max_subscriptions = self.integer('MAX_SUBSCRIPTIONS', 10000)
self.banner_file = self.default('BANNER_FILE', None)
self.tor_banner_file = self.default('TOR_BANNER_FILE',
self.banner_file)
@ -68,10 +67,10 @@ class Env(EnvBase):
self.donation_address = self.default('DONATION_ADDRESS', '')
# Server limits to help prevent DoS
self.max_send = self.integer('MAX_SEND', self.coin.DEFAULT_MAX_SEND)
self.max_subs = self.integer('MAX_SUBS', 250000)
self.max_sessions = self.sane_max_sessions()
self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000)
self.cost_soft_limit = self.integer('COST_SOFT_LIMIT', 2000)
self.cost_hard_limit = self.integer('COST_HARD_LIMIT', 20000)
self.bw_unit_cost = self.integer('BANDWIDTH_UNIT_COST', 5000)
self.session_timeout = self.integer('SESSION_TIMEOUT', 600)
self.drop_client = self.custom("DROP_CLIENT", None, re.compile)
self.blacklist_url = self.default('BLACKLIST_URL', self.coin.BLACKLIST_URL)

92
electrumx/server/session.py

@ -120,14 +120,11 @@ class SessionManager(object):
self.logger = util.class_logger(__name__, self.__class__.__name__)
self.servers = {}
self.sessions = set()
self.max_subs = env.max_subs
self.cur_group = SessionGroup(0)
self.txs_sent = 0
self.start_time = time.time()
self.history_cache = pylru.lrucache(256)
self.notified_height = None
# Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0
# Masternode stuff only for such coins
if issubclass(env.coin.SESSIONCLS, DashElectrumX):
self.mn_cache_height = 0
@ -267,11 +264,10 @@ class SessionManager(object):
await group.spawn(session.close())
# Consolidate small groups
bw_limit = self.env.bandwidth_limit
group_map = self._group_map()
groups = [group for group, sessions in group_map.items()
if len(sessions) <= 5 and
sum(s.bw_charge for s in sessions) < bw_limit]
sum(s.cost for s in sessions) < self.env.cost_soft_limit]
if len(groups) > 1:
new_group = groups[-1]
for group in groups:
@ -310,6 +306,7 @@ class SessionManager(object):
session.peer_address_str(for_log=for_log),
session.client,
session.protocol_version_string(),
session.cost,
session.count_pending_items(),
session.txs_sent,
session.sub_count(),
@ -325,7 +322,7 @@ class SessionManager(object):
for group, sessions in group_map.items():
result.append([group.gid,
len(sessions),
sum(s.bw_charge for s in sessions),
sum(s.cost for s in sessions),
sum(s.count_pending_items() for s in sessions),
sum(s.txs_sent for s in sessions),
sum(s.sub_count() for s in sessions),
@ -486,17 +483,19 @@ class SessionManager(object):
await self._start_server('RPC', self.env.cs_host(for_rpc=True),
self.env.rpc_port)
await event.wait()
session_class = self.env.coin.SESSIONCLS
session_class.cost_soft_limit = self.env.cost_soft_limit
session_class.cost_hard_limit = self.env.cost_hard_limit
session_class.cost_decay_per_sec = session_class.cost_hard_limit / 3600
session_class.bw_cost_per_byte = 1.0 / self.env.bw_unit_cost
self.logger.info(f'max session count: {self.env.max_sessions:,d}')
self.logger.info(f'session timeout: '
f'{self.env.session_timeout:,d} seconds')
self.logger.info('session bandwidth limit {:,d} bytes'
.format(self.env.bandwidth_limit))
self.logger.info('max response size {:,d} bytes'
.format(self.env.max_send))
self.logger.info('max subscriptions across all sessions: {:,d}'
.format(self.max_subs))
self.logger.info('max subscriptions per session: {:,d}'
.format(self.env.max_session_subs))
self.logger.info(f'session timeout: {self.env.session_timeout:,d} seconds')
self.logger.info(f'session cost hard limit {self.env.cost_hard_limit:,d}')
self.logger.info(f'session cost soft limit {self.env.cost_soft_limit:,d}')
self.logger.info(f'bandwidth unit cost {self.env.bw_unit_cost:,d}')
self.logger.info(f'max response size {self.env.max_send:,d} bytes')
if self.env.drop_client is not None:
self.logger.info('drop clients matching: {}'
.format(self.env.drop_client.pattern))
@ -590,14 +589,6 @@ class SessionManager(object):
self.sessions.remove(session)
self.session_event.set()
def new_subscription(self):
if self.subs_room <= 0:
self.subs_room = self.max_subs - self._sub_count()
if self.subs_room <= 0:
raise RPCError(BAD_REQUEST, f'server subscription limit '
f'{self.max_subs:,d} reached')
self.subs_room -= 1
class SessionBase(RPCSession):
'''Base class of ElectrumX JSON sessions.
@ -624,7 +615,6 @@ class SessionBase(RPCSession):
self.anon_logs = self.env.anon_logs
self.txs_sent = 0
self.log_me = False
self.bw_limit = self.env.bandwidth_limit
self.daemon_request = self.session_mgr.daemon_request
# Hijack the connection so we can log messages
self._receive_message_orig = self.connection.receive_message
@ -674,7 +664,7 @@ class SessionBase(RPCSession):
msg = ''
if not self._can_send.is_set():
msg += ' with full socket buffer'
if self._concurrency.max_concurrent != self.max_concurrent:
if self._concurrency.max_concurrent != self.initial_concurrent:
msg += ' whilst throttled'
if self.send_size >= 1024*1024:
msg += ('. Sent {:,d} bytes in {:,d} messages'
@ -716,7 +706,6 @@ class ElectrumX(SessionBase):
self.subscribe_headers = False
self.subscribe_headers_raw = False
self.connection.max_response_size = self.env.max_send
self.max_subs = self.env.max_session_subs
self.hashX_subs = {}
self.sv_seen = False
self.mempool_statuses = {}
@ -743,6 +732,7 @@ class ElectrumX(SessionBase):
}
async def server_features_async(self):
self.bump_cost(0.2)
return self.server_features(self.env)
@classmethod
@ -802,6 +792,7 @@ class ElectrumX(SessionBase):
async def _headers_subscribe(self, raw):
'''Subscribe to get headers of new blocks.'''
self.bump_cost(0.25)
self.subscribe_headers_raw = assert_boolean(raw)
self.subscribe_headers = True
return await self.subscribe_headers_result()
@ -821,10 +812,12 @@ class ElectrumX(SessionBase):
async def add_peer(self, features):
'''Add a peer (but only if the peer resolves to the source).'''
self.is_peer = True
self.bump_cost(100.0)
return await self.peer_mgr.on_add_peer(features, self.peer_address())
async def peers_subscribe(self):
'''Return the server peers as a list of (ip, host, details) tuples.'''
self.bump_cost(1.0)
return self.peer_mgr.on_peers_subscribe(self.is_tor())
async def address_status(self, hashX):
@ -843,6 +836,9 @@ class ElectrumX(SessionBase):
status += ''.join(f'{hash_to_hex_str(tx.hash)}:'
f'{-tx.has_unconfirmed_inputs:d}:'
for tx in mempool)
excess = max(len(db_history) + len(mempool) - 2, 0)
self.bump_cost(1.0 + excess / 50)
if status:
status = sha256(status.encode()).hex()
else:
@ -861,6 +857,7 @@ class ElectrumX(SessionBase):
utxos = await self.db.all_utxos(hashX)
utxos = sorted(utxos)
utxos.extend(await self.mempool.unordered_UTXOs(hashX))
self.bump_cost(1.0 + len(utxos) / 50)
spends = await self.mempool.potential_spends(hashX)
return [{'tx_hash': hash_to_hex_str(utxo.tx_hash),
@ -870,13 +867,6 @@ class ElectrumX(SessionBase):
if (utxo.tx_hash, utxo.tx_pos) not in spends]
async def hashX_subscribe(self, hashX, alias):
# First check our limit.
if len(self.hashX_subs) >= self.max_subs:
raise RPCError(BAD_REQUEST, 'your address subscription limit '
f'{self.max_subs:,d} reached')
# Now let the session manager check its limit
self.session_mgr.new_subscription()
self.hashX_subs[hashX] = alias
return await self.address_status(hashX)
@ -918,6 +908,7 @@ class ElectrumX(SessionBase):
utxos = await self.db.all_utxos(hashX)
confirmed = sum(utxo.value for utxo in utxos)
unconfirmed = await self.mempool.balance_delta(hashX)
self.bump_cost(1.0 + len(utxos) / 50)
return {'confirmed': confirmed, 'unconfirmed': unconfirmed}
async def scripthash_get_balance(self, scripthash):
@ -928,14 +919,17 @@ class ElectrumX(SessionBase):
async def unconfirmed_history(self, hashX):
# Note unconfirmed history is unordered in electrum-server
# height is -1 if it has unconfirmed inputs, otherwise 0
return [{'tx_hash': hash_to_hex_str(tx.hash),
'height': -tx.has_unconfirmed_inputs,
'fee': tx.fee}
for tx in await self.mempool.transaction_summaries(hashX)]
result = [{'tx_hash': hash_to_hex_str(tx.hash),
'height': -tx.has_unconfirmed_inputs,
'fee': tx.fee}
for tx in await self.mempool.transaction_summaries(hashX)]
self.bump_cost(0.25 + len(result) / 50)
return result
async def confirmed_and_unconfirmed_history(self, hashX):
# Note history is ordered but unconfirmed is unordered in e-s
history = await self.session_mgr.limited_history(hashX)
self.bump_cost(0.25 + len(history) / 50)
conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height}
for tx_hash, height in history]
return conf + await self.unconfirmed_history(hashX)
@ -982,6 +976,7 @@ class ElectrumX(SessionBase):
height = non_negative_integer(height)
cp_height = non_negative_integer(cp_height)
raw_header_hex = (await self.session_mgr.raw_header(height)).hex()
self.bump_cost(1.25 - (cp_height == 0))
if cp_height == 0:
return raw_header_hex
result = {'header': raw_header_hex}
@ -1004,14 +999,17 @@ class ElectrumX(SessionBase):
start_height = non_negative_integer(start_height)
count = non_negative_integer(count)
cp_height = non_negative_integer(cp_height)
cost = count / 50
max_size = self.MAX_CHUNK_SIZE
count = min(count, max_size)
headers, count = await self.db.read_headers(start_height, count)
result = {'hex': headers.hex(), 'count': count, 'max': max_size}
if count and cp_height:
cost += 1.0
last_height = start_height + count - 1
result.update(await self._merkle_proof(cp_height, last_height))
self.bump_cost(cost)
return result
async def block_headers_12(self, start_height, count):
@ -1025,6 +1023,7 @@ class ElectrumX(SessionBase):
size = self.coin.CHUNK_SIZE
start_height = index * size
headers, _ = await self.db.read_headers(start_height, size)
self.bump_cost(2016 / 50)
return headers.hex()
async def block_get_header(self, height):
@ -1032,6 +1031,7 @@ class ElectrumX(SessionBase):
height: the header's height'''
height = non_negative_integer(height)
self.bump_cost(0.25)
return await self.session_mgr.electrum_header(height)
def is_tor(self):
@ -1062,11 +1062,13 @@ class ElectrumX(SessionBase):
async def donation_address(self):
'''Return the donation address as a string, empty if there is none.'''
self.bump_cost(0.1)
return self.env.donation_address
async def banner(self):
'''Return the server banner text.'''
banner = f'You are connected to an {electrumx.version} server.'
self.bump_cost(0.5)
if self.is_tor():
banner_file = self.env.tor_banner_file
@ -1086,6 +1088,7 @@ class ElectrumX(SessionBase):
async def relayfee(self):
'''The minimum fee a low-priority tx must pay in order to be accepted
to the daemon's memory pool.'''
self.bump_cost(1.0)
return await self.daemon_request('relayfee')
async def estimatefee(self, number):
@ -1095,12 +1098,14 @@ class ElectrumX(SessionBase):
number: the number of blocks
'''
number = non_negative_integer(number)
self.bump_cost(2.0)
return await self.daemon_request('estimatefee', number)
async def ping(self):
'''Serves as a connection keep-alive mechanism and for the client to
confirm the server is still responding.
'''
self.bump_cost(0.1)
return None
async def server_version(self, client_name='', protocol_version=None):
@ -1109,6 +1114,7 @@ class ElectrumX(SessionBase):
client_name: a string identifying the client
protocol_version: the protocol version spoken by the client
'''
self.bump_cost(0.5)
if self.sv_seen and self.protocol_tuple >= (1, 4):
raise RPCError(BAD_REQUEST, f'server.version already sent')
self.sv_seen = True
@ -1144,6 +1150,7 @@ class ElectrumX(SessionBase):
'''Broadcast a raw transaction to the network.
raw_tx: the raw transaction as a hexadecimal string'''
self.bump_cost(0.25 + len(raw_tx) / 5000)
# This returns errors as JSON RPC errors, as is natural
try:
hex_hash = await self.session_mgr.broadcast_transaction(raw_tx)
@ -1176,6 +1183,7 @@ class ElectrumX(SessionBase):
if verbose not in (True, False):
raise RPCError(BAD_REQUEST, f'"verbose" must be a boolean')
self.bump_cost(1.0)
return await self.daemon_request('getrawtransaction', tx_hash, verbose)
async def _block_hash_and_tx_hashes(self, height):
@ -1189,7 +1197,9 @@ class ElectrumX(SessionBase):
hex_hashes = await self.daemon_request('block_hex_hashes', height, 1)
block_hash = hex_hashes[0]
block = await self.daemon_request('deserialised_block', block_hash)
return block_hash, block['tx']
tx_hashes = block['tx']
self.bump_cost(1.0 + len(tx_hashes) / 1000)
return block_hash, tx_hashes
def _get_merkle_branch(self, tx_hashes, tx_pos):
'''Return a merkle branch to a transaction.
@ -1242,6 +1252,10 @@ class ElectrumX(SessionBase):
else:
return tx_hash
async def compact_fee_histogram(self):
self.bump_cost(1.0)
return await self.mempool.compact_fee_histogram()
def set_request_handlers(self, ptuple):
self.protocol_tuple = ptuple

2
setup.py

@ -14,7 +14,7 @@ setuptools.setup(
# "xevan_hash" package is required to sync Xuez network.
# "groestlcoin_hash" package is required to sync Groestlcoin network.
# "pycryptodomex" package is required to sync SmartCash network.
install_requires=['aiorpcX>=0.11.0,<0.12', 'attrs',
install_requires=['aiorpcX>=0.12.0,<0.13', 'attrs',
'plyvel', 'pylru', 'aiohttp >= 2'],
packages=setuptools.find_packages(include=('electrumx*',)),
description='ElectrumX Server',

28
tests/server/test_env.py

@ -196,12 +196,20 @@ def test_RPC_PORT():
assert_integer('RPC_PORT', 'rpc_port', 8000)
def test_MAX_SUBSCRIPTIONS():
assert_integer('MAX_SUBSCRIPTIONS', 'max_subscriptions', 10000)
def test_COST_HARD_LIMIT():
assert_integer('COST_HARD_LIMIT', 'cost_hard_limit', 20000)
def test_LOG_SESSIONS():
assert_integer('LOG_SESSIONS', 'log_sessions', 3600)
def test_COST_SOFT_LIMIT():
assert_integer('COST_SOFT_LIMIT', 'cost_soft_limit', 2000)
def test_COST_SOFT_LIMIT():
assert_integer('COST_SOFT_LIMIT', 'cost_soft_limit', 2000)
def test_BANDWIDTH_UNIT_COST():
assert_integer('BANDWIDTH_UNIT_COST', 'bw_unit_cost', 5000)
def test_DONATION_ADDRESS():
@ -216,10 +224,6 @@ def test_MAX_SEND():
assert_integer('MAX_SEND', 'max_send', 1000000)
def test_MAX_SUBS():
assert_integer('MAX_SUBS', 'max_subs', 250000)
def test_MAX_SESSIONS():
too_big = 1000000
os.environ['MAX_SESSIONS'] = str(too_big)
@ -228,14 +232,6 @@ def test_MAX_SESSIONS():
# Cannot test default as it may be lowered by the open file limit cap
def test_MAX_SESSION_SUBS():
assert_integer('MAX_SESSION_SUBS', 'max_session_subs', 50000)
def test_BANDWIDTH_LIMIT():
assert_integer('BANDWIDTH_LIMIT', 'bandwidth_limit', 2000000)
def test_SESSION_TIMEOUT():
assert_integer('SESSION_TIMEOUT', 'session_timeout', 600)

Loading…
Cancel
Save