From f5384ccc9816e6009d4986cb87155eec6bf43bf1 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Sat, 26 Nov 2016 20:02:24 +0900
Subject: [PATCH 1/9] Remove get_tx_hash

Anything in the DB should be on the disk.  Remove the misleading
comment.
---
 server/block_processor.py | 17 ++---------------
 1 file changed, 2 insertions(+), 15 deletions(-)

diff --git a/server/block_processor.py b/server/block_processor.py
index a1619cb..c9c86be 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -462,9 +462,6 @@ class BlockProcessor(server.db.DB):
         self.tx_counts.append(prior_tx_count + len(txs))
 
     def advance_block(self, block, touched):
-        # We must update the FS cache before calling advance_txs() as
-        # the UTXO cache uses the FS cache via get_tx_hash() to
-        # resolve compressed key collisions
         header, tx_hashes, txs = self.coin.read_block(block)
         if self.tip != self.coin.header_prevhash(header):
             raise ChainReorg
@@ -665,8 +662,9 @@ class BlockProcessor(server.db.DB):
         if len(candidates) > 1:
             tx_num, = unpack('<I', tx_num_packed)
-            hash, height = self.get_tx_hash(tx_num)
+            hash, height = self.fs_tx_hash(tx_num)
             if hash != tx_hash:
                 continue
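The collision being resolved in the second hunk stems from compressed
keys: the UTXO index stores only a short prefix of each transaction
hash, so two entries can share a key prefix and the full hash must be
recovered and compared.  A minimal sketch of the idea, with a plain
dict standing in for the database and a lookup table standing in for
the file-backed fs_tx_hash() (names here are illustrative, not
ElectrumX's actual API):

    import struct

    def find_utxo(db, tx_hash_by_num, tx_hash, tx_idx):
        # Keys are (4-byte tx hash prefix + 2-byte output index +
        # 4-byte tx number); only a prefix of the hash is stored.
        prefix = tx_hash[:4] + struct.pack('<H', tx_idx)
        candidates = {k: v for k, v in db.items() if k.startswith(prefix)}
        for key, value in candidates.items():
            if len(candidates) > 1:
                # Prefix collision: recover the full hash and compare
                tx_num, = struct.unpack('<I', key[-4:])
                if tx_hash_by_num[tx_num] != tx_hash:
                    continue
            return value
        return None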
From: Neil Booth
Date: Sat, 26 Nov 2016 20:34:00 +0900
Subject: [PATCH 2/9] Give backing up its own flush function

Remove some excessive log messages
---
 server/block_processor.py | 48 ++++++++++++++++++++++++++------------
 1 file changed, 32 insertions(+), 16 deletions(-)

diff --git a/server/block_processor.py b/server/block_processor.py
index c9c86be..aaf1f2c 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -241,7 +241,6 @@ class BlockProcessor(server.db.DB):
         a real reorg.'''
         self.logger.info('chain reorg detected')
         self.flush(True)
-        self.logger.info('finding common height...')
 
         hashes = await self.reorg_hashes(count)
         # Reverse and convert to hex strings.
@@ -305,12 +304,11 @@ class BlockProcessor(server.db.DB):
         assert not self.utxo_cache
         assert not self.db_deletes
 
-    def flush(self, flush_utxos=False, flush_history=None):
+    def flush(self, flush_utxos=False):
         '''Flush out cached state.
 
         History is always flushed.  UTXOs are flushed if flush_utxos.'''
         if self.height == self.db_height:
-            assert flush_history is None
             self.assert_flushed()
             return
 
@@ -319,14 +317,10 @@ class BlockProcessor(server.db.DB):
         last_flush = self.last_flush
         tx_diff = self.tx_count - self.last_flush_tx_count
 
-        if self.height > self.db_height:
-            assert flush_history is None
-            flush_history = self.flush_history
-
         with self.db.write_batch() as batch:
             # History first - fast and frees memory.  Flush state last
             # as it reads the wall time.
-            flush_history(batch)
+            self.flush_history(batch)
             if flush_utxos:
                 self.flush_utxos(batch)
             self.flush_state(batch)
@@ -394,12 +388,36 @@ class BlockProcessor(server.db.DB):
         self.tx_hashes = []
         self.headers = []
 
-    def backup_history(self, batch, hash168s):
-        self.logger.info('backing up history to height {:,d}  tx_count {:,d}'
-                         .format(self.height, self.tx_count))
+    def backup_flush(self, hash168s):
+        '''Like flush() but when backing up.  All UTXOs are flushed.
 
+        hash168s - sequence of hash168s which were touched by backing
+        up.  Searched for history entries to remove after the backup
+        height.
+        '''
+        assert self.height < self.db_height
         assert not self.history
 
+        self.flush_count += 1
+        flush_start = time.time()
+
+        with self.db.write_batch() as batch:
+            # Flush state last as it reads the wall time.
+            self.backup_history(batch, hash168s)
+            self.flush_utxos(batch)
+            self.flush_state(batch)
+
+        # Update and put the wall time again - otherwise we drop the
+        # time it took to commit the batch
+        self.flush_state(self.db)
+
+        self.logger.info('backup flush #{:,d} took {:.1f}s.  '
+                         'Height {:,d} txs: {:,d}'
+                         .format(self.flush_count,
                                 self.last_flush - flush_start,
+                                 self.height, self.tx_count))
+
+    def backup_history(self, batch, hash168s):
         nremoves = 0
         for hash168 in sorted(hash168s):
             prefix = b'H' + hash168
@@ -426,8 +444,8 @@ class BlockProcessor(server.db.DB):
         assert not self.headers
         assert not self.tx_hashes
 
-        self.logger.info('removed {:,d} history entries from {:,d} addresses'
-                         .format(nremoves, len(hash168s)))
+        self.logger.info('backing up removed {:,d} history entries from '
+                         '{:,d} addresses'.format(nremoves, len(hash168s)))
 
     def check_cache_size(self):
         '''Flush a cache if it gets too big.'''
@@ -520,7 +538,6 @@ class BlockProcessor(server.db.DB):
         The blocks should be in order of decreasing height.
         A flush is performed once the blocks are backed up.
         '''
-        self.logger.info('backing up {:,d} blocks'.format(len(blocks)))
         self.assert_flushed()
 
         for block in blocks:
@@ -541,8 +558,7 @@ class BlockProcessor(server.db.DB):
 
         # touched includes those passed into this function.  That will
         # generally be empty but is harmless if not.
-        flush_history = partial(self.backup_history, hash168s=touched)
-        self.flush(True, flush_history=flush_history)
+        self.backup_flush(touched)
 
     def backup_txs(self, tx_hashes, txs, touched):
         # Prevout values, in order down the block (coinbase first if present)
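The interesting detail in backup_flush() is the two-phase state write:
everything goes into one write batch, then the state is written a
second time so the recorded wall time also covers the batch commit
itself.  The shape of that, as a sketch against a plyvel-style
write_batch() API (the callables are stand-ins for the real flush
methods, not ElectrumX's signatures):

    import time

    def flush_backup(db, flush_history, flush_utxos, flush_state):
        start = time.time()
        with db.write_batch() as batch:
            flush_history(batch)   # fast, and frees memory first
            flush_utxos(batch)
            flush_state(batch)     # reads the wall time
        # The batch only hits disk when the context manager exits, so
        # write the state once more to account for the commit time too.
        flush_state(db)
        return time.time() - start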
From 16a3a9c8ab0ccd9a41cbb74e0d2b6dc05f212a7d Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Sat, 26 Nov 2016 22:36:05 +0900
Subject: [PATCH 3/9] Open DB differently depending on whether syncing

If syncing, use a high max_open_files; otherwise use a lower one.
---
 server/block_processor.py |  9 ++++--
 server/db.py              | 59 ++++++++++++++++++++++++++-------------
 server/storage.py         | 35 +++++++++++++++++------
 tests/test_storage.py     |  4 +--
 4 files changed, 73 insertions(+), 34 deletions(-)

diff --git a/server/block_processor.py b/server/block_processor.py
index aaf1f2c..362f76f 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -229,9 +229,12 @@ class BlockProcessor(server.db.DB):
         self.caught_up = True
         if self.first_sync:
             self.first_sync = False
-            self.logger.info('{} synced to height {:,d}.  DB version:'
-                             .format(VERSION, self.height, self.db_version))
-        self.flush(True)
+            self.logger.info('{} synced to height {:,d}'
+                             .format(VERSION, self.height))
+            self.flush(True)
+            self.reopen_db(False)
+        else:
+            self.flush(True)
         self.event.set()
 
     async def handle_chain_reorg(self, count, touched):
diff --git a/server/db.py b/server/db.py
index f6672bc..677ca72 100644
--- a/server/db.py
+++ b/server/db.py
@@ -50,16 +50,8 @@ class DB(LoggedClass):
         self.logger.info('reorg limit is {:,d} blocks'
                          .format(self.env.reorg_limit))
 
-        # Open DB and metadata files.  Record some of its state.
-        db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
-        self.db = open_db(db_name, env.db_engine)
-        if self.db.is_new:
-            self.logger.info('created new {} database {}'
-                             .format(env.db_engine, db_name))
-        else:
-            self.logger.info('successfully opened {} database {}'
-                             .format(env.db_engine, db_name))
-        self.read_state()
+        self.db = None
+        self.reopen_db(True)
 
         create = self.db_height == -1
         self.headers_file = self.open_file('headers', create)
@@ -77,6 +69,41 @@ class DB(LoggedClass):
             assert self.db_tx_count == 0
             self.clean_db()
 
+    def reopen_db(self, first_sync):
+        '''Open the database.  If the database is already open, it is
+        closed (implicitly via GC) and re-opened.
+
+        Re-open to set the maximum number of open files appropriately.
+        '''
+        if self.db:
+            self.logger.info('closing DB to re-open')
+            self.db.close()
+
+        # Open DB and metadata files.  Record some of its state.
+        db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
+        self.db = open_db(db_name, self.env.db_engine, first_sync)
+        if self.db.is_new:
+            self.logger.info('created new {} database {}'
+                             .format(self.env.db_engine, db_name))
+        else:
+            self.logger.info('successfully opened {} database {} for sync: {}'
+                             .format(self.env.db_engine, db_name, first_sync))
+        self.read_state()
+
+        if self.first_sync == first_sync:
+            self.logger.info('software version: {}'.format(VERSION))
+            self.logger.info('DB version: {:d}'.format(self.db_version))
+            self.logger.info('coin: {}'.format(self.coin.NAME))
+            self.logger.info('network: {}'.format(self.coin.NET))
+            self.logger.info('height: {:,d}'.format(self.db_height))
+            self.logger.info('tip: {}'.format(hash_to_str(self.db_tip)))
+            self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
+            if self.first_sync:
+                self.logger.info('sync time so far: {}'
+                                 .format(formatted_time(self.wall_time)))
+        else:
+            self.reopen_db(self.first_sync)
+
     def read_state(self):
         if self.db.is_new:
             self.db_height = -1
@@ -110,16 +137,6 @@ class DB(LoggedClass):
         self.wall_time = state['wall_time']
         self.first_sync = state['first_sync']
 
-        self.logger.info('software version: {}'.format(VERSION))
-        self.logger.info('DB version: {:d}'.format(self.db_version))
-        self.logger.info('coin: {}'.format(self.coin.NAME))
-        self.logger.info('network: {}'.format(self.coin.NET))
-        self.logger.info('height: {:,d}'.format(self.db_height))
-        self.logger.info('tip: {}'.format(hash_to_str(self.db_tip)))
-        self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
-        if self.first_sync:
-            self.logger.info('sync time so far: {}'
-                             .format(formatted_time(self.wall_time)))
 
         if self.flush_count < self.utxo_flush_count:
             raise self.DBError('DB corrupt: flush_count < utxo_flush_count')
diff --git a/server/storage.py b/server/storage.py
index b8a0785..12d7d33 100644
--- a/server/storage.py
+++ b/server/storage.py
@@ -16,12 +16,12 @@ from functools import partial
 from lib.util import subclasses, increment_byte_string
 
 
-def open_db(name, db_engine):
+def open_db(name, db_engine, for_sync):
     '''Returns a database handle.'''
     for db_class in subclasses(Storage):
         if db_class.__name__.lower() == db_engine.lower():
             db_class.import_module()
-            return db_class(name)
+            return db_class(name, for_sync)
 
     raise RuntimeError('unrecognised DB engine "{}"'.format(db_engine))
 
@@ -29,9 +29,9 @@ class Storage(object):
     '''Abstract base class of the DB backend abstraction.'''
 
-    def __init__(self, name):
+    def __init__(self, name, for_sync):
         self.is_new = not os.path.exists(name)
-        self.open(name, create=self.is_new)
+        self.open(name, create=self.is_new, for_sync=for_sync)
 
     @classmethod
     def import_module(cls):
@@ -42,6 +42,10 @@ class Storage(object):
         '''Open an existing database or create a new one.'''
         raise NotImplementedError
 
+    def close(self):
+        '''Close an existing database.'''
+        raise NotImplementedError
+
     def get(self, key):
         raise NotImplementedError
 
@@ -75,9 +79,11 @@ class LevelDB(Storage):
         import plyvel
         cls.module = plyvel
 
-    def open(self, name, create):
+    def open(self, name, create, for_sync):
+        mof = 1024 if for_sync else 256
         self.db = self.module.DB(name, create_if_missing=create,
-                                 max_open_files=256, compression=None)
+                                 max_open_files=mof, compression=None)
+        self.close = self.db.close
         self.get = self.db.get
         self.put = self.db.put
         self.iterator = self.db.iterator
@@ -92,18 +98,25 @@ class RocksDB(Storage):
         import rocksdb
         cls.module = rocksdb
 
-    def open(self, name, create):
+    def open(self, name, create, for_sync):
+        mof = 1024 if for_sync else 256
         compression = "no"
         compression = getattr(self.module.CompressionType,
                               compression + "_compression")
         options = self.module.Options(create_if_missing=create,
                                       compression=compression,
                                       target_file_size_base=33554432,
-                                      max_open_files=1024)
+                                      max_open_files=mof)
         self.db = self.module.DB(name, options)
         self.get = self.db.get
         self.put = self.db.put
 
+    def close(self):
+        # PyRocksDB doesn't provide a close method; hopefully this is enough
+        self.db = None
+        import gc
+        gc.collect()
+
     class WriteBatch(object):
         def __init__(self, db):
             self.batch = RocksDB.module.WriteBatch()
@@ -157,11 +170,15 @@ class LMDB(Storage):
         import lmdb
         cls.module = lmdb
 
-    def open(self, name, create):
+    def open(self, name, create, for_sync):
+        # LMDB has no obvious equivalent of max_open_files; for_sync is ignored
         self.env = LMDB.module.Environment('.', subdir=True, create=create,
                                            max_dbs=32,
                                            map_size=5 * 10 ** 10)
         self.db = self.env.open_db(create=create)
 
+    def close(self):
+        self.env.close()
+
     def get(self, key):
         with self.env.begin(db=self.db) as tx:
             return tx.get(key)
diff --git a/tests/test_storage.py b/tests/test_storage.py
index b765c27..2c3d429 100644
--- a/tests/test_storage.py
+++ b/tests/test_storage.py
@@ -21,7 +21,7 @@ for c in subclasses(Storage):
 def db(tmpdir, request):
     cwd = os.getcwd()
     os.chdir(str(tmpdir))
-    db = open_db("db", request.param)
+    db = open_db("db", request.param, False)
     os.chdir(cwd)
     yield db
     # Make sure all the locks and handles are closed
@@ -66,4 +66,4 @@ def test_iterator_reverse(db):
     assert list(db.iterator(prefix=b"abc", reverse=True)) == [
         (b"abc" + str.encode(str(i)), str.encode(str(i)))
         for i in reversed(range(5))
-    ]
\ No newline at end of file
+    ]
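open_db() resolves the engine by class name and now threads for_sync
through to the backend, which picks its own max_open_files.  A
condensed, runnable sketch of that dispatch (toy classes, not the real
backends):

    # Engine dispatch by subclass name; Storage.__subclasses__() stands
    # in for the subclasses() helper in lib.util.
    class Storage:
        def __init__(self, name, for_sync):
            self.max_open_files = 1024 if for_sync else 256

    class LevelDB(Storage):
        pass

    class RocksDB(Storage):
        pass

    def open_db(name, db_engine, for_sync):
        for db_class in Storage.__subclasses__():
            if db_class.__name__.lower() == db_engine.lower():
                return db_class(name, for_sync)
        raise RuntimeError('unrecognised DB engine "{}"'.format(db_engine))

    db = open_db('bitcoin-mainnet', 'leveldb', for_sync=True)
    assert db.max_open_files == 1024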
From 6aef79461f7e768dc063b5a8df74e98b4efad401 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Sun, 27 Nov 2016 12:12:20 +0900
Subject: [PATCH 4/9] Don't shut down block processor by cancellation

The block processor needs to be able to close cleanly, and not
mid-block.  Since it must be able to yield whilst processing blocks,
we cannot forcefully close its coroutine with a cancellation.
---
 server/block_processor.py | 162 +++++++++++++++++++++-----------------
 server/db.py              |   2 -
 server/protocol.py        |   6 +-
 3 files changed, 93 insertions(+), 77 deletions(-)

diff --git a/server/block_processor.py b/server/block_processor.py
index 362f76f..3e8e2f8 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -23,6 +23,10 @@ from lib.util import chunks, formatted_time, LoggedClass
 import server.db
 
 
+# Tasks placed on task queue
+BLOCKS, CAUGHT_UP = range(2)
+
+
 class ChainError(Exception):
     pass
 
@@ -30,17 +34,19 @@ class Prefetcher(LoggedClass):
     '''Prefetches blocks (in the forward direction only).'''
 
-    def __init__(self, daemon, height):
+    def __init__(self, tasks, daemon, height):
         super().__init__()
+        self.tasks = tasks
         self.daemon = daemon
         self.semaphore = asyncio.Semaphore()
-        self.queue = asyncio.Queue()
-        self.queue_size = 0
         self.caught_up = False
         self.fetched_height = height
+        # A list of (blocks, size) pairs.  Earliest last.
+        self.cache = []
+        self.cache_size = 0
         # Target cache size.  Has little effect on sync time.
         self.target_cache_size = 10 * 1024 * 1024
-        # First fetch to be 10 blocks
+        # This makes the first fetch be 10 blocks
         self.ave_size = self.target_cache_size // 10
 
     async def clear(self, height):
@@ -53,19 +59,19 @@ class Prefetcher(LoggedClass):
         with await self.semaphore:
-            while not self.queue.empty():
-                self.queue.get_nowait()
-            self.queue_size = 0
+            self.cache = []
+            self.cache_size = 0
             self.fetched_height = height
-            self.caught_up = False
+
+        self.logger.info('reset to height {:,d}'.format(height))
 
-    async def get_blocks(self):
-        '''Blocking function that returns prefetched blocks.
-
-        The returned result empty just once - when the prefetcher
-        has caught up with the daemon.
-        '''
-        blocks, size = await self.queue.get()
-        self.queue_size -= size
-        return blocks
+    def get_blocks(self):
+        '''Return the next list of blocks from our prefetch cache.'''
+        # Cache might be empty after a clear()
+        if self.cache:
+            blocks, size = self.cache.pop()
+            self.cache_size -= size
+            return blocks
+        return []
 
     async def main_loop(self):
         '''Loop forever polling for more blocks.'''
@@ -73,9 +79,15 @@ class Prefetcher(LoggedClass):
                          .format(await self.daemon.height()))
         while True:
             try:
-                with await self.semaphore:
-                    await self._prefetch()
-                await asyncio.sleep(5 if self.caught_up else 0)
+                secs = 0
+                if self.cache_size < self.target_cache_size:
+                    if await self._prefetch():
+                        self.tasks.put_nowait(BLOCKS)
+                    else:
+                        self.tasks.put_nowait(CAUGHT_UP)
+                        self.caught_up = True
+                        secs = 5
+                await asyncio.sleep(secs)
             except DaemonError as e:
                 self.logger.info('ignoring daemon error: {}'.format(e))
             except asyncio.CancelledError:
@@ -83,40 +95,37 @@ class Prefetcher(LoggedClass):
 
     async def _prefetch(self):
         '''Prefetch blocks unless the prefetch queue is full.'''
-        if self.queue_size >= self.target_cache_size:
-            return
-
         daemon_height = await self.daemon.height()
         cache_room = self.target_cache_size // self.ave_size
 
-        # Try and catch up all blocks but limit to room in cache.
-        # Constrain count to between 0 and 4000 regardless
-        count = min(daemon_height - self.fetched_height, cache_room)
-        count = min(4000, max(count, 0))
-        if not count:
-            # Indicate when we have caught up for the first time only
-            if not self.caught_up:
-                self.caught_up = True
-                self.queue.put_nowait(([], 0))
-            return
+        with await self.semaphore:
+            # Try and catch up all blocks but limit to room in cache.
+            # Constrain count to between 0 and 4000 regardless
+            count = min(daemon_height - self.fetched_height, cache_room)
+            count = min(4000, max(count, 0))
+            if not count:
+                return 0
+
+            first = self.fetched_height + 1
+            hex_hashes = await self.daemon.block_hex_hashes(first, count)
+            if self.caught_up:
+                self.logger.info('new block height {:,d} hash {}'
+                                 .format(first + count - 1, hex_hashes[-1]))
+            blocks = await self.daemon.raw_blocks(hex_hashes)
 
-        first = self.fetched_height + 1
-        hex_hashes = await self.daemon.block_hex_hashes(first, count)
-        if self.caught_up:
-            self.logger.info('new block height {:,d} hash {}'
-                             .format(first + count - 1, hex_hashes[-1]))
-        blocks = await self.daemon.raw_blocks(hex_hashes)
-        size = sum(len(block) for block in blocks)
-
-        # Update our recent average block size estimate
-        if count >= 10:
-            self.ave_size = size // count
-        else:
-            self.ave_size = (size + (10 - count) * self.ave_size) // 10
+            size = sum(len(block) for block in blocks)
+
+            # Update our recent average block size estimate
+            if count >= 10:
+                self.ave_size = size // count
+            else:
+                self.ave_size = (size + (10 - count) * self.ave_size) // 10
 
-        self.fetched_height += len(blocks)
-        self.queue.put_nowait((blocks, size))
-        self.queue_size += size
+            self.cache.insert(0, (blocks, size))
+            self.cache_size += size
+            self.fetched_height += len(blocks)
+
+            return count
 
 
 class ChainReorg(Exception):
     '''Raised on a blockchain reorganisation.'''
@@ -135,6 +144,9 @@ class BlockProcessor(server.db.DB):
 
         self.client = client
 
+        # The block processor reads its tasks from this queue
+        self.tasks = asyncio.Queue()
+
         # These are our state as we move ahead of DB state
         self.fs_height = self.db_height
         self.fs_tx_count = self.db_tx_count
@@ -144,6 +156,7 @@ class BlockProcessor(server.db.DB):
 
         self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url))
         self.caught_up = False
+        self._shutdown = False
         self.event = asyncio.Event()
 
         # Meta
@@ -154,7 +167,7 @@ class BlockProcessor(server.db.DB):
         # Headers and tx_hashes have one entry per block
         self.history = defaultdict(partial(array.array, 'I'))
         self.history_size = 0
-        self.prefetcher = Prefetcher(self.daemon, self.height)
+        self.prefetcher = Prefetcher(self.tasks, self.daemon, self.height)
 
         self.last_flush = time.time()
         self.last_flush_tx_count = self.tx_count
@@ -176,32 +189,37 @@ class BlockProcessor(server.db.DB):
 
     async def main_loop(self):
         '''Main loop for block processing.'''
-        try:
-            # Simulate a reorg if requested
-            if self.env.force_reorg > 0:
-                self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
-                                 .format(self.env.force_reorg))
-                await self.handle_chain_reorg(self.env.force_reorg, set())
-
-            while True:
-                await self._wait_for_update()
-        except asyncio.CancelledError:
-            pass
-
-    async def shutdown(self):
-        '''Shut down the DB cleanly.'''
-        self.logger.info('flushing state to DB for clean shutdown...')
+
+        # Simulate a reorg if requested
+        if self.env.force_reorg > 0:
+            self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
+                             .format(self.env.force_reorg))
+            await self.handle_chain_reorg(self.env.force_reorg, set())
+
+        while True:
+            task = await self.tasks.get()
+            if self._shutdown:
+                break
+            if task == BLOCKS:
+                await self.advance_blocks()
+            else:
+                assert task == CAUGHT_UP
+                if not self.caught_up:
+                    self.caught_up = True
+                    self.first_caught_up()
 
         self.flush(True)
 
-    async def _wait_for_update(self):
-        '''Wait for the prefetcher to deliver blocks.
+    def shutdown(self):
+        '''Call to shut down the block processor.'''
+        self.logger.info('flushing state to DB for clean shutdown...')
+        self._shutdown = True
+        # Ensure we don't sit waiting for a task
+        self.tasks.put_nowait(BLOCKS)
 
-        Blocks are only processed in the forward direction.
-        '''
-        blocks = await self.prefetcher.get_blocks()
-        if not blocks:
-            self.first_caught_up()
-            return
+    async def advance_blocks(self):
+        '''Take blocks from the prefetcher and process them.'''
+        blocks = self.prefetcher.get_blocks()
 
         '''Strip the unspendable genesis coinbase.'''
         if self.height == -1:
@@ -226,7 +244,6 @@ class BlockProcessor(server.db.DB):
 
     def first_caught_up(self):
         '''Called when first caught up after start, or after a reorg.'''
-        self.caught_up = True
         if self.first_sync:
             self.first_sync = False
             self.logger.info('{} synced to height {:,d}'
@@ -253,7 +270,6 @@ class BlockProcessor(server.db.DB):
         self.backup_blocks(blocks, touched)
 
         await self.prefetcher.clear(self.height)
-        self.logger.info('prefetcher reset')
 
     async def reorg_hashes(self, count):
         '''Return the list of hashes to back up because of a reorg.
diff --git a/server/db.py b/server/db.py
index 677ca72..4c2f64a 100644
--- a/server/db.py
+++ b/server/db.py
@@ -270,8 +270,6 @@ class DB(LoggedClass):
             cursor += size
             file_pos += size
 
-        os.sync()
-
     def read_headers(self, start, count):
         '''Requires count >= 0.'''
         # Read some from disk
diff --git a/server/protocol.py b/server/protocol.py
index 7e95a19..bd84a1a 100644
--- a/server/protocol.py
+++ b/server/protocol.py
@@ -254,6 +254,7 @@ class ServerManager(util.LoggedClass):
         def add_future(coro):
             self.futures.append(asyncio.ensure_future(coro))
 
+        # shutdown() assumes bp.main_loop() is first
         add_future(self.bp.main_loop())
         add_future(self.bp.prefetcher.main_loop())
         add_future(self.mempool.main_loop(self.bp.event))
@@ -316,7 +317,9 @@ class ServerManager(util.LoggedClass):
 
     async def shutdown(self):
         '''Call to shutdown the servers.  Returns when done.'''
-        for future in self.futures:
+        self.bp.shutdown()
+        # Don't cancel the block processor main loop - let it close itself
+        for future in self.futures[1:]:
             future.cancel()
         for server in self.servers:
             server.close()
@@ -326,7 +329,6 @@ class ServerManager(util.LoggedClass):
             await asyncio.sleep(0)
         if self.sessions:
             await self.close_sessions()
-        await self.bp.shutdown()
 
     async def close_sessions(self, secs=60):
         self.logger.info('cleanly closing client sessions, please wait...')
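The shutdown scheme above avoids task cancellation entirely: a flag
plus a dummy queue entry let the main loop exit between units of work,
never in the middle of one.  Reduced to its essentials (a sketch, not
the ElectrumX classes):

    import asyncio

    class Worker:
        def __init__(self):
            self.tasks = asyncio.Queue()
            self._shutdown = False

        async def main_loop(self):
            while True:
                task = await self.tasks.get()
                if self._shutdown:
                    break
                # ... process the task atomically here ...

        def shutdown(self):
            self._shutdown = True
            self.tasks.put_nowait(None)   # wake the loop if it is idle

    async def demo():
        worker = Worker()
        loop_task = asyncio.ensure_future(worker.main_loop())
        worker.shutdown()
        await loop_task                   # returns cleanly, not cancelled

    asyncio.get_event_loop().run_until_complete(demo())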
From 5c0b0261589e354d4f8b13f029aff588e742a5f9 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Sun, 27 Nov 2016 13:35:09 +0900
Subject: [PATCH 5/9] Tasks queue just has null entries for now

---
 server/block_processor.py | 53 +++++++++++++++------------------------
 1 file changed, 20 insertions(+), 33 deletions(-)

diff --git a/server/block_processor.py b/server/block_processor.py
index 3e8e2f8..c3f16aa 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -23,14 +23,6 @@ from lib.util import chunks, formatted_time, LoggedClass
 import server.db
 
 
-# Tasks placed on task queue
-BLOCKS, CAUGHT_UP = range(2)
-
-
-class ChainError(Exception):
-    pass
-
-
 class Prefetcher(LoggedClass):
     '''Prefetches blocks (in the forward direction only).'''
 
@@ -81,12 +73,10 @@ class Prefetcher(LoggedClass):
             try:
                 secs = 0
                 if self.cache_size < self.target_cache_size:
-                    if await self._prefetch():
-                        self.tasks.put_nowait(BLOCKS)
-                    else:
-                        self.tasks.put_nowait(CAUGHT_UP)
+                    if not await self._prefetch():
                         self.caught_up = True
                         secs = 5
+                self.tasks.put_nowait(None)
                 await asyncio.sleep(secs)
             except DaemonError as e:
                 self.logger.info('ignoring daemon error: {}'.format(e))
             except asyncio.CancelledError:
@@ -128,6 +118,9 @@ class Prefetcher(LoggedClass):
             return count
 
 
+class ChainError(Exception):
+    '''Raised on error processing blocks.'''
+
 class ChainReorg(Exception):
     '''Raised on a blockchain reorganisation.'''
 
@@ -187,18 +180,17 @@ class BlockProcessor(server.db.DB):
         if self.env.force_reorg > 0:
             self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
                              .format(self.env.force_reorg))
-            await self.handle_chain_reorg(self.env.force_reorg, set())
+            await self.handle_chain_reorg(set(), self.env.force_reorg)
 
         while True:
             task = await self.tasks.get()
             if self._shutdown:
                 break
-            if task == BLOCKS:
-                await self.advance_blocks()
-            else:
-                assert task == CAUGHT_UP
-                if not self.caught_up:
-                    self.caught_up = True
-                    self.first_caught_up()
+            blocks = self.prefetcher.get_blocks()
+            if blocks:
+                await self.advance_blocks(blocks)
+            elif not self.caught_up:
+                self.caught_up = True
+                self.first_caught_up()
 
         self.flush(True)
 
@@ -205,14 +197,10 @@ class BlockProcessor(server.db.DB):
     def shutdown(self):
         '''Call to shut down the block processor.'''
         self.logger.info('flushing state to DB for clean shutdown...')
         self._shutdown = True
-        # Ensure we don't sit waiting for a task
-        self.tasks.put_nowait(BLOCKS)
+        self.tasks.put_nowait(None)
 
-    async def advance_blocks(self):
-        '''Take blocks from the prefetcher and process them.'''
-        blocks = self.prefetcher.get_blocks()
-
+    async def advance_blocks(self, blocks):
         '''Strip the unspendable genesis coinbase.'''
         if self.height == -1:
             blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1)
@@ -229,7 +220,7 @@ class BlockProcessor(server.db.DB):
             self.advance_block(block, touched)
             await asyncio.sleep(0)  # Yield
         except ChainReorg:
-            await self.handle_chain_reorg(None, touched)
+            await self.handle_chain_reorg(touched)
 
         if self.caught_up:
             # Flush everything as queries are performed on the DB and
@@ -240,19 +231,17 @@ class BlockProcessor(server.db.DB):
 
     def first_caught_up(self):
-        '''Called when first caught up after start, or after a reorg.'''
+        '''Called when first caught up after starting.'''
+        self.flush(True)
         if self.first_sync:
-            self.first_sync = False
             self.logger.info('{} synced to height {:,d}'
                              .format(VERSION, self.height))
-            self.flush(True)
+            self.first_sync = False
+            self.flush_state(self.db)
             self.reopen_db(False)
-        else:
-            self.flush(True)
         self.event.set()
 
-    async def handle_chain_reorg(self, count, touched):
+    async def handle_chain_reorg(self, touched, count=None):
         '''Handle a chain reorganisation.
 
         Count is the number of blocks to simulate a reorg, or None for
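After this patch the queue entries carry no information; they only
wake the consumer, which pulls the real data from the prefetcher's
cache.  A stale wakeup is then harmless - e.g. after clear() empties
the cache, the consumer simply finds nothing.  A small self-contained
illustration of the pattern:

    import asyncio

    # Queue items are null wakeups; the consumer reads the
    # authoritative state owned by the producer.
    class Prefetcher:
        def __init__(self):
            self.cache = []

        def get_blocks(self):
            return self.cache.pop() if self.cache else []

    async def demo():
        tasks = asyncio.Queue()
        prefetcher = Prefetcher()

        prefetcher.cache.append(['block1', 'block2'])
        tasks.put_nowait(None)                # "something happened"

        await tasks.get()
        print(prefetcher.get_blocks())        # ['block1', 'block2']

    asyncio.get_event_loop().run_until_complete(demo())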
From f9cc21807f0976672b25a2227ecf7c5a3f061b46 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Sun, 27 Nov 2016 07:58:13 +0900
Subject: [PATCH 6/9] Further optimize the inner loop

---
 server/block_processor.py | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/server/block_processor.py b/server/block_processor.py
index c3f16aa..1e661ac 100644
--- a/server/block_processor.py
+++ b/server/block_processor.py
@@ -224,6 +224,7 @@ class BlockProcessor(server.db.DB):
         if self.caught_up:
             # Flush everything as queries are performed on the DB and
             # not in-memory.
+            await asyncio.sleep(0)
             self.flush(True)
             self.client.notify(touched)
         elif time.time() > self.next_cache_check:
             self.next_cache_check = time.time() + 60
@@ -498,43 +499,47 @@ class BlockProcessor(server.db.DB):
             self.write_undo_info(self.height, b''.join(undo_info))
 
     def advance_txs(self, tx_hashes, txs, touched):
-        put_utxo = self.utxo_cache.__setitem__
-        spend_utxo = self.spend_utxo
         undo_info = []
 
         # Use local vars for speed in the loops
         history = self.history
+        history_size = self.history_size
         tx_num = self.tx_count
         script_hash168 = self.coin.hash168_from_script()
         s_pack = pack
+        put_utxo = self.utxo_cache.__setitem__
+        spend_utxo = self.spend_utxo
+        undo_info_append = undo_info.append
 
         for tx, tx_hash in zip(txs, tx_hashes):
             hash168s = set()
+            add_hash168 = hash168s.add
             tx_numb = s_pack('<I', tx_num)
 
             # Spend the inputs
             if not tx.is_coinbase:
                 for txin in tx.inputs:
                     cache_value = spend_utxo(txin.prev_hash, txin.prev_idx)
-                    undo_info.append(cache_value)
-                    hash168s.add(cache_value[:21])
+                    undo_info_append(cache_value)
+                    add_hash168(cache_value[:21])
 
             # Add the new UTXOs
             for idx, txout in enumerate(tx.outputs):
                 # Get the hash168.  Ignore unspendable outputs
                 hash168 = script_hash168(txout.pk_script)
                 if hash168:
-                    hash168s.add(hash168)
+                    add_hash168(hash168)
                     put_utxo(tx_hash + s_pack('<H', idx),
                              hash168 + tx_numb + s_pack('<Q', txout.value))
 
             for hash168 in hash168s:
                 history[hash168].append(tx_num)
-            self.history_size += len(hash168s)
+            history_size += len(hash168s)
             tx_num += 1
             touched.update(hash168s)
 
         self.tx_count = tx_num
+        self.history_size = history_size
 
         return undo_info
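The optimisation here is the classic CPython trick of hoisting
attribute and bound-method lookups out of a hot loop into local
variables, which are resolved much faster than repeated dotted
lookups.  A rough standalone comparison (timings vary by machine and
interpreter):

    import time

    def slow(items):
        out = []
        for x in items:
            out.append(x)          # attribute lookup every iteration
        return out

    def fast(items):
        out = []
        out_append = out.append    # one lookup, reused in the loop
        for x in items:
            out_append(x)
        return out

    items = list(range(10 ** 6))
    for fn in (slow, fast):
        t = time.time()
        fn(items)
        print(fn.__name__, time.time() - t)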
From: Neil Booth
Date: Sun, 27 Nov 2016 19:43:43 +0900
Subject: [PATCH 7/9] Fix ordering in get_utxos

---
 server/db.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/db.py b/server/db.py
index 4c2f64a..5dad58b 100644
--- a/server/db.py
+++ b/server/db.py
@@ -349,7 +349,7 @@ class DB(LoggedClass):
                 if limit == 0:
                     return
                 limit -= 1
-                tx_num, tx_pos = s_unpack('<HI', db_key[-6:])
+                tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
                 value, = unpack('<Q', db_value)
                 tx_hash, height = self.fs_tx_hash(tx_num)
                 yield UTXO(tx_num, tx_pos, tx_hash, height, value)
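The bug class fixed here is unpacking two struct fields into names in
the wrong order - no exception is raised, and the results are silently
wrong.  Assuming the key suffix is laid out as a 2-byte position
followed by a 4-byte transaction number, as the '<HI' format suggests:

    from struct import pack, unpack

    key_suffix = pack('<HI', 5, 1000)       # tx_pos=5, tx_num=1000

    tx_pos, tx_num = unpack('<HI', key_suffix)
    assert (tx_pos, tx_num) == (5, 1000)    # correct assignment order

    tx_num, tx_pos = unpack('<HI', key_suffix)   # swapped names
    assert (tx_num, tx_pos) == (5, 1000)    # passes - but tx_num now
                                            # holds the position!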
From: Neil Booth
Date: Sun, 27 Nov 2016 20:21:52 +0900
Subject: [PATCH 8/9] Fix 2 JSON RPC issues

---
 lib/jsonrpc.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py
index ee416e0..23bcd93 100644
--- a/lib/jsonrpc.py
+++ b/lib/jsonrpc.py
@@ -277,7 +277,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
             except self.RPCError:
                 pass
             else:
-                self.handle_notification(method, params)
+                await self.handle_notification(method, params)
             return None
 
     async def json_request(self, message):
@@ -297,7 +297,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
             await self.handle_response(message['result'], None, message['id'])
         return None
 
-    def raise_unknown_method(method):
+    def raise_unknown_method(self, method):
         '''Respond to a request with an unknown method.'''
         raise self.RPCError('unknown method: "{}"'.format(method),
                             self.METHOD_NOT_FOUND)
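The first fix above adds a missing await: calling a coroutine function
merely creates a coroutine object, so without await (or ensure_future)
the handler's body never runs - Python only warns later that the
coroutine was never awaited.  A minimal demonstration:

    import asyncio

    async def handle_notification(method, params):
        print('handling', method, params)

    async def demo():
        # Creates a coroutine object that is never run; CPython emits
        # a "coroutine ... was never awaited" RuntimeWarning at GC.
        handle_notification('blockchain.headers.subscribe', [])
        # Actually runs the handler.
        await handle_notification('blockchain.headers.subscribe', [])

    asyncio.get_event_loop().run_until_complete(demo())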
From 98c4ce3fefdf1dfcdab6de6b5d566409997f5c27 Mon Sep 17 00:00:00 2001
From: Neil Booth
Date: Sun, 27 Nov 2016 22:01:12 +0900
Subject: [PATCH 9/9] Prepare 0.7.12

---
 RELEASE-NOTES     | 11 +++++++++++
 server/version.py |  2 +-
 2 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/RELEASE-NOTES b/RELEASE-NOTES
index d283fc3..0c74d53 100644
--- a/RELEASE-NOTES
+++ b/RELEASE-NOTES
@@ -1,3 +1,14 @@
+version 0.7.12
+--------------
+
+- minor bug fixes: 2 in JSON RPC, 1 in get_utxos (affected addresslistunspent)
+- leveldb / rocksdb are opened with a different maximum open files limit,
+  depending on whether the chain has been fully synced or not.  If synced,
+  the file handles are wanted for network sockets; if not synced, for the
+  DB engines.  Once synced the DB is reopened with the lower limit to free
+  up the files for serving network connections
+- various refactoring preparing for possible asynchronous block processing
+
 version 0.7.11
 --------------
 
diff --git a/server/version.py b/server/version.py
index 1028f97..3236b2a 100644
--- a/server/version.py
+++ b/server/version.py
@@ -1 +1 @@
-VERSION = "ElectrumX 0.7.11"
+VERSION = "ElectrumX 0.7.12"