diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py
index 3502c09..a2697fb 100644
--- a/electrumx/server/block_processor.py
+++ b/electrumx/server/block_processor.py
@@ -344,11 +344,12 @@ class BlockProcessor(electrumx.server.db.DB):
         tx_diff = self.tx_count - self.last_flush_tx_count
 
         # Flush to file system
-        self.fs_flush()
+        self.fs_flush(self.height, self.tx_count, self.headers,
+                      self.tx_hashes)
+        self.tx_hashes = []
+        self.headers = []
+
         fs_end = time.time()
-        if self.utxo_db.for_sync:
-            self.logger.info('flushed to FS in {:.1f}s'
-                             .format(fs_end - flush_start))
 
         # History next - it's fast and frees memory
         hashX_count = self.history.flush()
@@ -395,17 +396,6 @@ class BlockProcessor(electrumx.server.db.DB):
                              .format(formatted_time(self.wall_time),
                                      formatted_time(tx_est / this_tx_per_sec)))
 
-    def fs_flush(self):
-        '''Flush the things stored on the filesystem.'''
-        assert self.fs_height + len(self.headers) == self.height
-        assert self.tx_count == self.tx_counts[-1] if self.tx_counts else 0
-
-        self.fs_update(self.fs_height, self.headers, self.tx_hashes)
-        self.fs_height = self.height
-        self.fs_tx_count = self.tx_count
-        self.tx_hashes = []
-        self.headers = []
-
     async def backup_flush(self):
         assert self.height < self.db_height
         assert not self.headers
diff --git a/electrumx/server/db.py b/electrumx/server/db.py
index 3cb36dd..817f51e 100644
--- a/electrumx/server/db.py
+++ b/electrumx/server/db.py
@@ -150,6 +150,43 @@ class DB(object):
         return await self.header_mc.branch_and_root(length, height)
 
     # Flushing
+    def fs_flush(self, to_height, to_tx_count, headers, block_tx_hashes):
+        '''Write headers, tx counts and block tx hashes to the filesystem.
+        No LevelDB state is updated.
+
+        The first height to write is self.fs_height + 1.  The FS
+        metadata is all append-only, so in a crash we just pick up
+        again from the height stored in the DB.
+        '''
+        prior_tx_count = (self.tx_counts[self.fs_height]
+                          if self.fs_height >= 0 else 0)
+        assert len(block_tx_hashes) == len(headers)
+        assert to_height == self.fs_height + len(headers)
+        assert to_tx_count == self.tx_counts[-1] if self.tx_counts else 0
+        assert len(self.tx_counts) == to_height + 1
+        hashes = b''.join(block_tx_hashes)
+        assert len(hashes) % 32 == 0
+        assert len(hashes) // 32 == to_tx_count - prior_tx_count
+
+        # Write the headers, tx counts, and tx hashes
+        start_time = time.time()
+        height_start = self.fs_height + 1
+        offset = self.header_offset(height_start)
+        self.headers_file.write(offset, b''.join(headers))
+        self.fs_update_header_offsets(offset, height_start, headers)
+        offset = height_start * self.tx_counts.itemsize
+        self.tx_counts_file.write(offset,
+                                  self.tx_counts[height_start:].tobytes())
+        offset = prior_tx_count * 32
+        self.hashes_file.write(offset, hashes)
+
+        self.fs_height = to_height
+        self.fs_tx_count = to_tx_count
+
+        if self.utxo_db.for_sync:
+            elapsed = time.time() - start_time
+            self.logger.info(f'flushed to FS in {elapsed:.2f}s')
+
     def db_assert_flushed(self, to_tx_count, to_height):
         '''Asserts state is fully flushed.'''
         assert to_tx_count == self.fs_tx_count == self.db_tx_count
@@ -185,36 +222,6 @@ class DB(object):
         # Truncate header_mc: header count is 1 more than the height.
         self.header_mc.truncate(height + 1)
 
-    def fs_update(self, fs_height, headers, block_tx_hashes):
-        '''Write headers, the tx_count array and block tx hashes to disk.
-
-        Their first height is fs_height.  No recorded DB state is
-        updated.  These arrays are all append only, so in a crash we
-        just pick up again from the DB height.
-        '''
-        blocks_done = len(headers)
-        height_start = fs_height + 1
-        new_height = fs_height + blocks_done
-        prior_tx_count = (self.tx_counts[fs_height] if fs_height >= 0 else 0)
-        cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
-        txs_done = cur_tx_count - prior_tx_count
-
-        assert len(block_tx_hashes) == blocks_done
-        assert len(self.tx_counts) == new_height + 1
-        hashes = b''.join(block_tx_hashes)
-        assert len(hashes) % 32 == 0
-        assert len(hashes) // 32 == txs_done
-
-        # Write the headers, tx counts, and tx hashes
-        offset = self.header_offset(height_start)
-        self.headers_file.write(offset, b''.join(headers))
-        self.fs_update_header_offsets(offset, height_start, headers)
-        offset = height_start * self.tx_counts.itemsize
-        self.tx_counts_file.write(offset,
-                                  self.tx_counts[height_start:].tobytes())
-        offset = prior_tx_count * 32
-        self.hashes_file.write(offset, hashes)
-
     async def read_headers(self, start_height, count):
         '''Requires start_height >= 0, count >= 0.  Reads as many headers as
         are available starting at start_height up to count.  This