Browse Source

Merge branch 'release-0.6.3'

master 0.6.3
Neil Booth 8 years ago
parent
commit
725ad399d4
  1. 14
      docs/ENV-NOTES
  2. 8
      docs/RELEASE-NOTES
  3. 8
      lib/util.py
  4. 17
      server/block_processor.py
  5. 2
      server/db.py
  6. 3
      server/env.py
  7. 72
      server/protocol.py
  8. 2
      server/version.py

14
docs/ENV-NOTES

@ -32,8 +32,20 @@ RPC_PORT - Listen on this port for local RPC connections, defaults to
8000. 8000.
BANNER_FILE - a path to a banner file to serve to clients. The banner file BANNER_FILE - a path to a banner file to serve to clients. The banner file
is re-read for each new client. is re-read for each new client.
ANON_LOGS - set to anything non-empty to remove IP addresses from
logs. By default IP addresses will be logged.
DONATION_ADDRESS - server donation address. Defaults to none. DONATION_ADDRESS - server donation address. Defaults to none.
ANON_LOGS - set to remove IP addresses from logs. Default: disabled
These following environment variables are to help limit server
resource consumption and to prevent simple DoS. Address subscriptions
in ElectrumX are very cheap - they consume about 100 bytes of memory
each and are processed efficiently. I feel the defaults are low and
encourage you to raise them.
MAX_SUBS - maximum number of address subscriptions across all
sessions. Defaults to 250,000.
MAX_SESSION_SUBS - maximum number of address subscriptions permitted to a
single session. Defaults to 50,000.
If you want IRC connectivity to advertise your node: If you want IRC connectivity to advertise your node:

8
docs/RELEASE-NOTES

@ -1,3 +1,11 @@
version 0.6.3
-------------
- new environment variables MAX_SUBS and MAX_SESSION_SUBS. Please read
docs/ENV-NOTES - I encourage you to raise the default values.
- fixed import bug in 0.6.2 that prevented initial sync
- issues closed: #30. Logs should be clean on shutdown now.
version 0.6.2 version 0.6.2
------------- -------------

8
lib/util.py

@ -37,6 +37,14 @@ class cachedproperty(object):
return value return value
def formatted_time(t):
'''Return a number of seconds as a string in days, hours, mins and
secs.'''
t = int(t)
return '{:d}d {:02d}h {:02d}m {:02d}s'.format(
t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60)
def deep_getsizeof(obj): def deep_getsizeof(obj):
"""Find the memory footprint of a Python object. """Find the memory footprint of a Python object.

17
server/block_processor.py

@ -21,7 +21,7 @@ from functools import partial
from server.daemon import Daemon, DaemonError from server.daemon import Daemon, DaemonError
from server.version import VERSION from server.version import VERSION
from lib.hash import hash_to_str from lib.hash import hash_to_str
from lib.util import chunks, LoggedClass from lib.util import chunks, formatted_time, LoggedClass
import server.db import server.db
from server.storage import open_db from server.storage import open_db
@ -30,14 +30,6 @@ HIST_ENTRIES_PER_KEY = 1024
HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4 HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4
def formatted_time(t):
'''Return a number of seconds as a string in days, hours, mins and
secs.'''
t = int(t)
return '{:d}d {:02d}h {:02d}m {:02d}s'.format(
t // 86400, (t % 86400) // 3600, (t % 3600) // 60, t % 60)
class ChainError(Exception): class ChainError(Exception):
pass pass
@ -216,8 +208,7 @@ class BlockProcessor(server.db.DB):
await self._wait_for_update() await self._wait_for_update()
except asyncio.CancelledError: except asyncio.CancelledError:
self.on_cancel() self.on_cancel()
# This lets the asyncio subsystem process futures cancellations await self.wait_shutdown()
await asyncio.sleep(0)
def on_cancel(self): def on_cancel(self):
'''Called when the main loop is cancelled. '''Called when the main loop is cancelled.
@ -227,6 +218,10 @@ class BlockProcessor(server.db.DB):
future.cancel() future.cancel()
self.flush(True) self.flush(True)
async def wait_shutdown(self):
'''Wait for shutdown to complete cleanly, and return.'''
await asyncio.sleep(0)
async def _wait_for_update(self): async def _wait_for_update(self):
'''Wait for the prefetcher to deliver blocks or a mempool update. '''Wait for the prefetcher to deliver blocks or a mempool update.

2
server/db.py

@ -15,7 +15,7 @@ from struct import pack, unpack
from bisect import bisect_right from bisect import bisect_right
from collections import namedtuple from collections import namedtuple
from lib.util import chunks, LoggedClass from lib.util import chunks, formatted_time, LoggedClass
from lib.hash import double_sha256, hash_to_str from lib.hash import double_sha256, hash_to_str
from server.storage import open_db from server.storage import open_db
from server.version import VERSION from server.version import VERSION

3
server/env.py

@ -46,6 +46,9 @@ class Env(LoggedClass):
self.db_engine = self.default('DB_ENGINE', 'leveldb') self.db_engine = self.default('DB_ENGINE', 'leveldb')
self.debug = self.default('DEBUG', '') self.debug = self.default('DEBUG', '')
self.debug = [item.lower() for item in self.debug.split()] self.debug = [item.lower() for item in self.debug.split()]
# Subscription limits
self.max_subs = self.integer('MAX_SUBS', 250000)
self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
# IRC # IRC
self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port) self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port)
self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port) self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port)

72
server/protocol.py

@ -54,6 +54,11 @@ class BlockServer(BlockProcessor):
self.server_mgr.stop() self.server_mgr.stop()
super().on_cancel() super().on_cancel()
async def wait_shutdown(self):
'''Wait for shutdown to complete cleanly, and return.'''
await self.server_mgr.wait_shutdown()
await super().wait_shutdown()
def mempool_transactions(self, hash168): def mempool_transactions(self, hash168):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
entries for the hash168. entries for the hash168.
@ -141,7 +146,7 @@ class MemPool(LoggedClass):
for n, (hex_hash, tx) in enumerate(new_txs.items()): for n, (hex_hash, tx) in enumerate(new_txs.items()):
# Yield to process e.g. signals # Yield to process e.g. signals
if n % 100 == 0: if n % 20 == 0:
await asyncio.sleep(0) await asyncio.sleep(0)
txout_pairs = [txout_pair(txout) for txout in tx.outputs] txout_pairs = [txout_pair(txout) for txout in tx.outputs]
self.txs[hex_hash] = (None, txout_pairs, None) self.txs[hex_hash] = (None, txout_pairs, None)
@ -162,7 +167,6 @@ class MemPool(LoggedClass):
# Now add the inputs # Now add the inputs
for n, (hex_hash, tx) in enumerate(new_txs.items()): for n, (hex_hash, tx) in enumerate(new_txs.items()):
# Yield to process e.g. signals # Yield to process e.g. signals
if n % 10 == 0:
await asyncio.sleep(0) await asyncio.sleep(0)
if initial and time.time() > next_log: if initial and time.time() > next_log:
@ -246,7 +250,13 @@ class ServerManager(LoggedClass):
self.servers = [] self.servers = []
self.irc = IRC(env) self.irc = IRC(env)
self.sessions = {} self.sessions = {}
self.futures = [] # At present just the IRC future, if any self.max_subs = env.max_subs
self.subscription_count = 0
self.irc_future = None
self.logger.info('max subscriptions across all sessions: {:,d}'
.format(self.max_subs))
self.logger.info('max subscriptions per session: {:,d}'
.format(env.max_session_subs))
async def start_server(self, kind, *args, **kw_args): async def start_server(self, kind, *args, **kw_args):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
@ -257,8 +267,6 @@ class ServerManager(LoggedClass):
host, port = args[:2] host, port = args[:2]
try: try:
self.servers.append(await server) self.servers.append(await server)
except asyncio.CancelledError:
raise
except Exception as e: except Exception as e:
self.logger.error('{} server failed to listen on {}:{:d} :{}' self.logger.error('{} server failed to listen on {}:{:d} :{}'
.format(kind, host, port, e)) .format(kind, host, port, e))
@ -288,7 +296,7 @@ class ServerManager(LoggedClass):
if env.irc: if env.irc:
self.logger.info('starting IRC coroutine') self.logger.info('starting IRC coroutine')
self.futures.append(asyncio.ensure_future(self.irc.start())) self.irc_future = asyncio.ensure_future(self.irc.start())
else: else:
self.logger.info('IRC disabled') self.logger.info('IRC disabled')
@ -302,25 +310,51 @@ class ServerManager(LoggedClass):
def stop(self): def stop(self):
'''Close listening servers.''' '''Close listening servers.'''
self.logger.info('cleanly closing client sessions, please wait...')
for server in self.servers: for server in self.servers:
server.close() server.close()
if self.irc_future:
self.irc_future.cancel()
for session in self.sessions:
session.transport.close()
async def wait_shutdown(self):
# Wait for servers to close
for server in self.servers:
await server.wait_closed()
# Just in case a connection came in
await asyncio.sleep(0)
self.servers = [] self.servers = []
for future in self.futures: self.logger.info('server listening sockets closed')
limit = time.time() + 10
while self.sessions and time.time() < limit:
self.logger.info('{:,d} sessions remaining'
.format(len(self.sessions)))
await asyncio.sleep(2)
if self.sessions:
self.logger.info('forcibly closing {:,d} stragglers'
.format(len(self.sessions)))
for future in self.sessions.values():
future.cancel() future.cancel()
self.futures = [] await asyncio.sleep(0)
sessions = list(self.sessions.keys()) # A copy
for session in sessions:
self.remove_session(session)
def add_session(self, session): def add_session(self, session):
assert self.servers
assert session not in self.sessions assert session not in self.sessions
coro = session.serve_requests() coro = session.serve_requests()
self.sessions[session] = asyncio.ensure_future(coro) self.sessions[session] = asyncio.ensure_future(coro)
def remove_session(self, session): def remove_session(self, session):
self.subscription_count -= session.sub_count()
future = self.sessions.pop(session) future = self.sessions.pop(session)
future.cancel() future.cancel()
def new_subscription(self):
if self.subscription_count >= self.max_subs:
raise JSONRPC.RPCError('server subscription limit {:,d} reached'
.format(self.max_subs))
self.subscription_count += 1
def irc_peers(self): def irc_peers(self):
return self.irc.peers return self.irc.peers
@ -330,18 +364,13 @@ class ServerManager(LoggedClass):
total = len(self.sessions) total = len(self.sessions)
return {'active': active, 'inert': total - active, 'total': total} return {'active': active, 'inert': total - active, 'total': total}
def address_count(self):
return sum(len(session.hash168s) for session in self.sessions
if isinstance(session, ElectrumX))
async def rpc_getinfo(self, params): async def rpc_getinfo(self, params):
'''The RPC 'getinfo' call.''' '''The RPC 'getinfo' call.'''
return { return {
'blocks': self.bp.height, 'blocks': self.bp.height,
'peers': len(self.irc.peers), 'peers': len(self.irc.peers),
'sessions': self.session_count(), 'sessions': self.session_count(),
'watched': self.address_count(), 'watched': self.subscription_count,
'cached': 0,
} }
async def rpc_sessions(self, params): async def rpc_sessions(self, params):
@ -503,6 +532,7 @@ class ElectrumX(Session):
self.subscribe_headers = False self.subscribe_headers = False
self.subscribe_height = False self.subscribe_height = False
self.notified_height = None self.notified_height = None
self.max_subs = self.env.max_session_subs
self.hash168s = set() self.hash168s = set()
rpcs = [ rpcs = [
('blockchain', ('blockchain',
@ -689,8 +719,14 @@ class ElectrumX(Session):
async def address_subscribe(self, params): async def address_subscribe(self, params):
hash168 = self.extract_hash168(params) hash168 = self.extract_hash168(params)
if len(self.hash168s) >= self.max_subs:
raise self.RPCError('your address subscription limit {:,d} reached'
.format(self.max_subs))
result = await self.address_status(hash168)
# add_subscription can raise so call it before adding
self.manager.new_subscription()
self.hash168s.add(hash168) self.hash168s.add(hash168)
return await self.address_status(hash168) return result
async def block_get_chunk(self, params): async def block_get_chunk(self, params):
index = self.extract_non_negative_integer(params) index = self.extract_non_negative_integer(params)

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.6.2" VERSION = "ElectrumX 0.6.3"

Loading…
Cancel
Save