diff --git a/docs/RELEASE-NOTES b/docs/RELEASE-NOTES index fbbc463..a599d6c 100644 --- a/docs/RELEASE-NOTES +++ b/docs/RELEASE-NOTES @@ -1,3 +1,43 @@ +version 0.5.1 +------------- + +- 0.5 changed some cache defaults, only partially intentionally. For + some users, including me, the result was a regression (a 15hr HDD + sync became a 20hr sync). Another user reported their fastest sync + yet (sub 10hr SSD sync). What changed was memory accounting - all + releases until 0.5 were not properly accounting for memory usage of + unflushed transaction hashes. In 0.5 they were accounted for in the + UTXO cache, which resulted in much earlier flushes. 0.5.1 flushes + the hashes at the same time as history so I now account for it + towards the history cache limit. To get a reasonable comparison + with prior releases your HIST_MB environment variable should be + bumped by about 15% from 0.4 and earlier values. This will not + result in greater memory consumption - the additional memory + consumption was being ignored before but is now being included. +- 0.5.1 is the first release where Electrum client requests are queued + on a per-session basis. Previously they were in a global queue. + This is the beginning of ensuring that expensive / DOS requests + mostly affect that user's session and not those of other users. The + goal is that each session's requests run asynchronously parallel to + every other sessions's requests. The missing part of the puzzle is + that Python's asyncio is co-operative, however at the moment + ElectrumX does not yield during expensive requests. I intend that a + near upcoming release will ensure expensive requests yield the CPU + at regular fine-grained intervals. The intended result is that, to + some extent, expensive requests mainly delay that and later requests + from the same session, and have minimal impact on the legitimate + requests of other sessions. The extent to which this goal is + achieved will only be verifiable in practice. +- more robust tracking and handling of asynchronous tasks. I hope + this will reduce asyncio's logging messages, some of which I'm + becoming increasingly convinced I have no control over. In + particular I learned earlier releases were unintentionally limiting + the universe of acceptable SSL protocols, and so I made them the + default that had been intended. +- I added logging of expensive tasks, though I don't expect much real + information from this +- various RPC improvements + version 0.5 ----------- diff --git a/electrumx_rpc.py b/electrumx_rpc.py index f7f05e1..06aa56f 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -13,7 +13,6 @@ import argparse import asyncio import json -import pprint from functools import partial from os import environ @@ -36,38 +35,39 @@ class RPCClient(asyncio.Protocol): data = json.dumps(payload) + '\n' self.transport.write(data.encode()) + def print_sessions(self, result): + def data_fmt(count, size): + return '{:,d}/{:,d}KB'.format(count, size // 1024) + def time_fmt(t): + t = int(t) + return ('{:3d}:{:02d}:{:02d}' + .format(t // 3600, (t % 3600) // 60, t % 60)) + + fmt = ('{:<4} {:>23} {:>15} {:>5} ' + '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') + print(fmt.format('Type', 'Peer', 'Client', 'Subs', + 'Recv #', 'Recv KB', 'Sent #', 'Sent KB', + 'Errs', 'Time')) + for (kind, peer, subs, client, recv_count, recv_size, + send_count, send_size, error_count, time) in result: + print(fmt.format(kind, peer, client, '{:,d}'.format(subs), + '{:,d}'.format(recv_count), + '{:,d}'.format(recv_size // 1024), + '{:,d}'.format(send_count), + '{:,d}'.format(send_size // 1024), + '{:,d}'.format(error_count), + time_fmt(time))) + def data_received(self, data): payload = json.loads(data.decode()) self.transport.close() result = payload['result'] error = payload['error'] - if error: - print("ERROR: {}".format(error)) + if not error and self.method == 'sessions': + self.print_sessions(result) else: - def data_fmt(count, size): - return '{:,d}/{:,d}KB'.format(count, size // 1024) - def time_fmt(t): - t = int(t) - return ('{:3d}:{:02d}:{:02d}' - .format(t // 3600, (t % 3600) // 60, t % 60)) - - if self.method == 'sessions': - fmt = ('{:<4} {:>23} {:>15} {:>5} ' - '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') - print(fmt.format('Type', 'Peer', 'Client', 'Subs', - 'Snt #', 'Snt MB', 'Rcv #', 'Rcv MB', - 'Errs', 'Time')) - for (kind, peer, subs, client, recv_count, recv_size, - send_count, send_size, error_count, time) in result: - print(fmt.format(kind, peer, client, '{:,d}'.format(subs), - '{:,d}'.format(recv_count), - '{:,.1f}'.format(recv_size / 1048576), - '{:,d}'.format(send_count), - '{:,.1f}'.format(send_size / 1048576), - '{:,d}'.format(error_count), - time_fmt(time))) - else: - pprint.pprint(result, indent=4) + value = {'error': error} if error else result + print(json.dumps(value, indent=4, sort_keys=True)) def main(): '''Send the RPC command to the server and print the result.''' diff --git a/electrumx_server.py b/electrumx_server.py index 939bd2e..d851aa6 100755 --- a/electrumx_server.py +++ b/electrumx_server.py @@ -32,23 +32,18 @@ def main_loop(): def on_signal(signame): '''Call on receipt of a signal to cleanly shutdown.''' logging.warning('received {} signal, shutting down'.format(signame)) - for task in asyncio.Task.all_tasks(): - task.cancel() + future.cancel() + + server = BlockServer(Env()) + future = asyncio.ensure_future(server.main_loop()) # Install signal handlers for signame in ('SIGINT', 'SIGTERM'): loop.add_signal_handler(getattr(signal, signame), partial(on_signal, signame)) - server = BlockServer(Env()) - future = server.start() - try: - loop.run_until_complete(future) - except asyncio.CancelledError: - pass - finally: - server.stop() - loop.close() + loop.run_until_complete(future) + loop.close() def main(): diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 663b84c..205beb3 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -63,7 +63,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): def __init__(self, msg, code=-1, **kw_args): super().__init__(**kw_args) self.msg = msg - self.code + self.code = code def __init__(self): @@ -172,7 +172,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): the connection is closing. ''' if isinstance(request, list): - payload = self.batch_request_payload(request) + payload = await self.batch_request_payload(request) else: payload = await self.single_request_payload(request) @@ -231,7 +231,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): handler = self.method_handler(method) if not handler: - raise self.RPCError('unknown method: {}'.format(method), + raise self.RPCError('unknown method: "{}"'.format(method), self.METHOD_NOT_FOUND) return await handler(params) diff --git a/server/block_processor.py b/server/block_processor.py index afc97ff..63a3c88 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -91,6 +91,8 @@ class Prefetcher(LoggedClass): await asyncio.sleep(0) except DaemonError as e: self.logger.info('ignoring daemon error: {}'.format(e)) + except asyncio.CancelledError: + break async def _caught_up(self): '''Poll for new blocks and mempool state. @@ -215,7 +217,7 @@ class MemPool(LoggedClass): for n, (hex_hash, tx) in enumerate(new_txs.items()): # Yield to process e.g. signals - if n % 500 == 0: + if n % 100 == 0: await asyncio.sleep(0) txout_pairs = [txout_pair(txout) for txout in tx.outputs] self.txs[hex_hash] = (None, txout_pairs, None) @@ -236,7 +238,7 @@ class MemPool(LoggedClass): # Now add the inputs for n, (hex_hash, tx) in enumerate(new_txs.items()): # Yield to process e.g. signals - if n % 50 == 0: + if n % 10 == 0: await asyncio.sleep(0) if initial and time.time() > next_log: @@ -319,6 +321,8 @@ class BlockProcessor(server.db.DB): super().__init__(env) # These are our state as we move ahead of DB state + self.fs_height = self.db_height + self.fs_tx_count = self.db_tx_count self.height = self.db_height self.tip = self.db_tip self.tx_count = self.db_tx_count @@ -327,6 +331,7 @@ class BlockProcessor(server.db.DB): self.daemon.debug_set_height(self.height) self.mempool = MemPool(self) self.touched = set() + self.futures = [] # Meta self.utxo_MB = env.utxo_MB @@ -369,23 +374,28 @@ class BlockProcessor(server.db.DB): self.clean_db() - def start(self): - '''Returns a future that starts the block processor when awaited.''' - return asyncio.gather(self.main_loop(), - self.prefetcher.main_loop()) - async def main_loop(self): '''Main loop for block processing. Safely flushes the DB on clean shutdown. ''' + self.futures.append(asyncio.ensure_future(self.prefetcher.main_loop())) try: while True: await self._wait_for_update() await asyncio.sleep(0) # Yield except asyncio.CancelledError: - self.flush(True) - raise + self.on_cancel() + # This lets the asyncio subsystem process futures cancellations + await asyncio.sleep(0) + + def on_cancel(self): + '''Called when the main loop is cancelled. + + Intended to be overridden in derived classes.''' + for future in self.futures: + future.cancel() + self.flush(True) async def _wait_for_update(self): '''Wait for the prefetcher to deliver blocks or a mempool update. @@ -526,7 +536,8 @@ class BlockProcessor(server.db.DB): def assert_flushed(self): '''Asserts state is fully flushed.''' - assert self.tx_count == self.db_tx_count + assert self.tx_count == self.fs_tx_count == self.db_tx_count + assert self.height == self.fs_height == self.db_height assert not self.history assert not self.utxo_cache assert not self.db_cache @@ -563,9 +574,10 @@ class BlockProcessor(server.db.DB): # time it took to commit the batch self.flush_state(self.db) - self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} took {:,d}s' + self.logger.info('flush #{:,d} to height {:,d} txs: {:,d} ' + 'took {:,.1f}s' .format(self.flush_count, self.height, self.tx_count, - int(self.last_flush - flush_start))) + self.last_flush - flush_start)) # Catch-up stats if show_stats: @@ -591,7 +603,12 @@ class BlockProcessor(server.db.DB): formatted_time(tx_est / this_tx_per_sec))) def flush_history(self, batch): - flush_start = time.time() + fs_flush_start = time.time() + self.fs_flush() + fs_flush_end = time.time() + self.logger.info('FS flush took {:.1f} seconds' + .format(fs_flush_end - fs_flush_start)) + flush_id = pack('>H', self.flush_count) for hash168, hist in self.history.items(): @@ -599,21 +616,21 @@ class BlockProcessor(server.db.DB): batch.put(key, hist.tobytes()) self.logger.info('flushed {:,d} history entries for {:,d} addrs ' - 'in {:,d}s' + 'in {:.1f}s' .format(self.history_size, len(self.history), - int(time.time() - flush_start))) + time.time() - fs_flush_end)) self.history = defaultdict(partial(array.array, 'I')) self.history_size = 0 def fs_flush(self): '''Flush the things stored on the filesystem.''' blocks_done = len(self.headers) - prior_tx_count = (self.tx_counts[self.db_height] - if self.db_height >= 0 else 0) + prior_tx_count = (self.tx_counts[self.fs_height] + if self.fs_height >= 0 else 0) cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 txs_done = cur_tx_count - prior_tx_count - assert self.db_height + blocks_done == self.height + assert self.fs_height + blocks_done == self.height assert len(self.tx_hashes) == blocks_done assert len(self.tx_counts) == self.height + 1 assert cur_tx_count == self.tx_count, \ @@ -622,13 +639,13 @@ class BlockProcessor(server.db.DB): # First the headers headers = b''.join(self.headers) header_len = self.coin.HEADER_LEN - self.headers_file.seek((self.db_height + 1) * header_len) + self.headers_file.seek((self.fs_height + 1) * header_len) self.headers_file.write(headers) self.headers_file.flush() # Then the tx counts - self.txcount_file.seek((self.db_height + 1) * self.tx_counts.itemsize) - self.txcount_file.write(self.tx_counts[self.db_height + 1:]) + self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize) + self.txcount_file.write(self.tx_counts[self.fs_height + 1:]) self.txcount_file.flush() # Finally the hashes @@ -648,7 +665,8 @@ class BlockProcessor(server.db.DB): file_pos += size os.sync() - + self.fs_height = self.height + self.fs_tx_count = self.tx_count self.tx_hashes = [] self.headers = [] @@ -692,9 +710,9 @@ class BlockProcessor(server.db.DB): utxo_cache_size = len(self.utxo_cache) * 187 db_cache_size = len(self.db_cache) * 105 hist_cache_size = len(self.history) * 180 + self.history_size * 4 - tx_hash_size = (self.tx_count - self.db_tx_count) * 74 - utxo_MB = (db_cache_size + utxo_cache_size + tx_hash_size) // one_MB - hist_MB = hist_cache_size // one_MB + tx_hash_size = (self.tx_count - self.fs_tx_count) * 74 + utxo_MB = (db_cache_size + utxo_cache_size) // one_MB + hist_MB = (hist_cache_size + tx_hash_size) // one_MB self.logger.info('UTXOs: {:,d} deletes: {:,d} ' 'UTXOs {:,d}MB hist {:,d}MB' @@ -978,6 +996,7 @@ class BlockProcessor(server.db.DB): # Care is needed because the writes generated by flushing the # UTXO state may have keys in common with our write cache or # may be in the DB already. + flush_start = time.time() self.logger.info('flushing {:,d} blocks with {:,d} txs' .format(self.height - self.db_height, self.tx_count - self.db_tx_count)) @@ -987,12 +1006,6 @@ class BlockProcessor(server.db.DB): self.utxo_cache_spends, self.db_deletes)) - fs_flush_start = time.time() - self.fs_flush() - fs_flush_end = time.time() - self.logger.info('FS flush took {:.1f} seconds' - .format(fs_flush_end - fs_flush_start)) - collisions = 0 new_utxos = len(self.utxo_cache) @@ -1031,18 +1044,18 @@ class BlockProcessor(server.db.DB): self.db_tip = self.tip self.logger.info('UTXO flush took {:.1f} seconds' - .format(time.time() - fs_flush_end)) + .format(time.time() - flush_start)) def read_headers(self, start, count): # Read some from disk - disk_count = min(count, self.db_height + 1 - start) + disk_count = min(count, self.fs_height + 1 - start) result = self.fs_read_headers(start, disk_count) count -= disk_count start += disk_count # The rest from memory if count: - start -= self.db_height + 1 + start -= self.fs_height + 1 if not (count >= 0 and start + count <= len(self.headers)): raise ChainError('{:,d} headers starting at {:,d} not on disk' .format(count, start)) @@ -1056,7 +1069,7 @@ class BlockProcessor(server.db.DB): # Is this unflushed? if tx_hash is None: - tx_hashes = self.tx_hashes[tx_height - (self.db_height + 1)] + tx_hashes = self.tx_hashes[tx_height - (self.fs_height + 1)] tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]] return tx_hash, tx_height diff --git a/server/irc.py b/server/irc.py index a23bd3b..0a39dd8 100644 --- a/server/irc.py +++ b/server/irc.py @@ -51,6 +51,12 @@ class IRC(LoggedClass): self.peers = {} async def start(self): + try: + await self.join() + except asyncio.CancelledError: + pass + + async def join(self): import irc.client as irc_client self.logger.info('joining IRC with nick "{}" and real name "{}"' diff --git a/server/protocol.py b/server/protocol.py index 92ebb8c..6eda042 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -42,15 +42,16 @@ class BlockServer(BlockProcessor): self.bs_caught_up = True self.server_mgr.notify(self.height, self.touched) - def stop(self): - '''Close the listening servers.''' + def on_cancel(self): + '''Called when the main loop is cancelled.''' self.server_mgr.stop() + super().on_cancel() class ServerManager(LoggedClass): '''Manages the servers.''' - AsyncTask = namedtuple('AsyncTask', 'session job') + MgrTask = namedtuple('MgrTask', 'session task') def __init__(self, bp, env): super().__init__() @@ -58,9 +59,8 @@ class ServerManager(LoggedClass): self.env = env self.servers = [] self.irc = IRC(env) - self.sessions = set() - self.tasks = asyncio.Queue() - self.current_task = None + self.sessions = {} + self.futures = [] # At present just the IRC future, if any async def start_server(self, kind, *args, **kw_args): loop = asyncio.get_event_loop() @@ -95,16 +95,14 @@ class ServerManager(LoggedClass): await self.start_server('TCP', env.host, env.tcp_port) if env.ssl_port is not None: - # FIXME: update if we want to require Python >= 3.5.3 - sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + # Python 3.5.3: use PROTOCOL_TLS + sslc = ssl.SSLContext(ssl.PROTOCOL_SSLv23) sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc) - asyncio.ensure_future(self.run_tasks()) - if env.irc: self.logger.info('starting IRC coroutine') - asyncio.ensure_future(self.irc.start()) + self.futures.append(asyncio.ensure_future(self.irc.start())) else: self.logger.info('IRC disabled') @@ -115,67 +113,54 @@ class ServerManager(LoggedClass): ElectrumX.notify(sessions, height, touched) def stop(self): - '''Close the listening servers.''' + '''Close listening servers.''' for server in self.servers: server.close() + self.servers = [] + for future in self.futures: + future.cancel() + self.futures = [] + sessions = list(self.sessions.keys()) # A copy + for session in sessions: + self.remove_session(session) def add_session(self, session): assert session not in self.sessions - self.sessions.add(session) + coro = session.serve_requests() + self.sessions[session] = asyncio.ensure_future(coro) def remove_session(self, session): - self.sessions.remove(session) - if self.current_task and session == self.current_task.session: - self.logger.info('cancelling running task') - self.current_task.job.cancel() - - def add_task(self, session, job): - assert session in self.sessions - task = asyncio.ensure_future(job) - self.tasks.put_nowait(self.AsyncTask(session, task)) - - async def run_tasks(self): - '''Asynchronously run through the task queue.''' - while True: - task = await self.tasks.get() - try: - if task.session in self.sessions: - self.current_task = task - await task.job - else: - task.job.cancel() - except asyncio.CancelledError: - self.logger.info('cancelled task noted') - except Exception: - # Getting here should probably be considered a bug and fixed - traceback.print_exc() - finally: - self.current_task = None + future = self.sessions.pop(session) + future.cancel() def irc_peers(self): return self.irc.peers def session_count(self): - return len(self.sessions) + '''Returns a dictionary.''' + active = len([s for s in self.sessions if s.send_count]) + total = len(self.sessions) + return {'active': active, 'inert': total - active, 'total': total} - def info(self): - '''Returned in the RPC 'getinfo' call.''' - address_count = sum(len(session.hash168s) - for session in self.sessions - if isinstance(session, ElectrumX)) + def address_count(self): + return sum(len(session.hash168s) for session in self.sessions + if isinstance(session, ElectrumX)) + + async def rpc_getinfo(self, params): + '''The RPC 'getinfo' call.''' return { 'blocks': self.bp.height, - 'peers': len(self.irc_peers()), + 'peers': len(self.irc.peers), 'sessions': self.session_count(), - 'watched': address_count, + 'watched': self.address_count(), 'cached': 0, } - def sessions_info(self): + async def rpc_sessions(self, params): '''Returned to the RPC 'sessions' call.''' now = time.time() return [(session.kind, - session.peername(), + session.peername(for_log=False), len(session.hash168s), 'RPC' if isinstance(session, LocalRPC) else session.client, session.recv_count, session.recv_size, @@ -184,9 +169,23 @@ class ServerManager(LoggedClass): now - session.start) for session in self.sessions] + async def rpc_numsessions(self, params): + return self.session_count() + + async def rpc_peers(self, params): + return self.irc.peers + + async def rpc_numpeers(self, params): + return len(self.irc.peers) + class Session(JSONRPC): - '''Base class of ElectrumX JSON session protocols.''' + '''Base class of ElectrumX JSON session protocols. + + Each session runs its tasks in asynchronous parallelism with other + sessions. To prevent some sessions blocking othersr, potentially + long-running requests should yield (not yet implemented). + ''' def __init__(self, manager, bp, env, kind): super().__init__() @@ -197,12 +196,14 @@ class Session(JSONRPC): self.coin = bp.coin self.kind = kind self.hash168s = set() + self.requests = asyncio.Queue() + self.current_task = None self.client = 'unknown' def connection_made(self, transport): '''Handle an incoming client connection.''' super().connection_made(transport) - self.logger.info('connection from {}'.format(self.peername(True))) + self.logger.info('connection from {}'.format(self.peername())) self.manager.add_session(self) def connection_lost(self, exc): @@ -211,7 +212,7 @@ class Session(JSONRPC): if self.error_count or self.send_size >= 250000: self.logger.info('{} disconnected. ' 'Sent {:,d} bytes in {:,d} messages {:,d} errors' - .format(self.peername(True), self.send_size, + .format(self.peername(), self.send_size, self.send_count, self.error_count)) self.manager.remove_session(self) @@ -221,15 +222,35 @@ class Session(JSONRPC): def on_json_request(self, request): '''Queue the request for asynchronous handling.''' - self.manager.add_task(self, self.handle_json_request(request)) + self.requests.put_nowait(request) - def peername(self, for_log=False): - # Anonymi{z, s}e all IP addresses that will be stored in a log - if for_log and self.env.anon_logs and self.peer_info: - info = ["XX.XX.XX.XX", "XX"] - else: - info = self.peer_info - return 'unknown' if not info else '{}:{}'.format(info[0], info[1]) + async def serve_requests(self): + '''Asynchronously run through the task queue.''' + while True: + await asyncio.sleep(0) + request = await self.requests.get() + try: + start = time.time() + await self.handle_json_request(request) + secs = time.time() - start + if secs > 1: + self.logger.warning('slow request for {} took {:.1f}s: {}' + .format(self.peername(), secs, + request)) + except asyncio.CancelledError: + break + except Exception: + # Getting here should probably be considered a bug and fixed + self.logger.error('error handling request {}'.format(request)) + traceback.print_exc() + + def peername(self, *, for_log=True): + if not self.peer_info: + return 'unknown' + # Anonymize IP addresses that will be logged + if for_log and self.env.anon_logs: + return 'xx.xx.xx.xx:xx' + return '{}:{}'.format(self.peer_info[0], self.peer_info[1]) def tx_hash_from_param(self, param): '''Raise an RPCError if the parameter is not a valid transaction @@ -576,19 +597,5 @@ class LocalRPC(Session): def __init__(self, *args): super().__init__(*args) cmds = 'getinfo sessions numsessions peers numpeers'.split() - self.handlers = {cmd: getattr(self, cmd) for cmd in cmds} - - async def getinfo(self, params): - return self.manager.info() - - async def sessions(self, params): - return self.manager.sessions_info() - - async def numsessions(self, params): - return self.manager.session_count() - - async def peers(self, params): - return self.manager.irc_peers() - - async def numpeers(self, params): - return len(self.manager.irc_peers()) + self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd)) + for cmd in cmds} diff --git a/server/version.py b/server/version.py index c49ba4d..77fe367 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.5" +VERSION = "ElectrumX 0.5.1" diff --git a/tests/test_util.py b/tests/test_util.py index 0e92deb..a6b79ef 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -3,6 +3,9 @@ from lib import util def test_cachedproperty(): class Target: + + CALL_COUNT = 0 + def __init__(self): self.call_count = 0 @@ -11,8 +14,15 @@ def test_cachedproperty(): self.call_count += 1 return self.call_count + @util.cachedproperty + def cls_prop(cls): + cls.CALL_COUNT += 1 + return cls.CALL_COUNT + + t = Target() assert t.prop == t.prop == 1 + assert Target.cls_prop == Target.cls_prop == 1 def test_deep_getsizeof(): @@ -36,6 +46,7 @@ class B(Base): def test_subclasses(): assert util.subclasses(Base) == [A, B] + assert util.subclasses(Base, strict=False) == [A, B, Base] def test_chunks():