diff --git a/server/block_processor.py b/server/block_processor.py index 583dad4..b4d5b9d 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -10,7 +10,6 @@ import array import asyncio -import itertools import os from struct import pack, unpack import time @@ -438,47 +437,11 @@ class BlockProcessor(server.db.DB): def fs_flush(self): '''Flush the things stored on the filesystem.''' - blocks_done = len(self.headers) - prior_tx_count = (self.tx_counts[self.fs_height] - if self.fs_height >= 0 else 0) - cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 - txs_done = cur_tx_count - prior_tx_count - - assert self.fs_height + blocks_done == self.height - assert len(self.tx_hashes) == blocks_done - assert len(self.tx_counts) == self.height + 1 - assert cur_tx_count == self.tx_count, \ - 'cur: {:,d} new: {:,d}'.format(cur_tx_count, self.tx_count) - - # First the headers - headers = b''.join(self.headers) - header_len = self.coin.HEADER_LEN - self.headers_file.seek((self.fs_height + 1) * header_len) - self.headers_file.write(headers) - self.headers_file.flush() - - # Then the tx counts - self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize) - self.txcount_file.write(self.tx_counts[self.fs_height + 1:]) - self.txcount_file.flush() - - # Finally the hashes - hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes))) - assert len(hashes) % 32 == 0 - assert len(hashes) // 32 == txs_done - cursor = 0 - file_pos = prior_tx_count * 32 - while cursor < len(hashes): - file_num, offset = divmod(file_pos, self.tx_hash_file_size) - size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) - filename = 'hashes{:04d}'.format(file_num) - with self.open_file(filename, create=True) as f: - f.seek(offset) - f.write(hashes[cursor:cursor + size]) - cursor += size - file_pos += size - - os.sync() + assert self.fs_height + len(self.headers) == self.height + assert self.tx_count == self.tx_counts[-1] if self.tx_counts else 0 + + self.fs_update(self.fs_height, self.headers, self.tx_hashes) + self.fs_height = self.height self.fs_tx_count = self.tx_count self.tx_hashes = [] @@ -824,23 +787,6 @@ class BlockProcessor(server.db.DB): self.db_height = self.height self.db_tip = self.tip - def read_headers(self, start, count): - # Read some from disk - disk_count = min(count, max(0, self.fs_height + 1 - start)) - result = self.fs_read_headers(start, disk_count) - count -= disk_count - start += disk_count - - # The rest from memory - if count: - start -= self.fs_height + 1 - if not (count >= 0 and start + count <= len(self.headers)): - raise ChainError('{:,d} headers starting at {:,d} not on disk' - .format(count, start)) - result += b''.join(self.headers[start: start + count]) - - return result - def get_tx_hash(self, tx_num): '''Returns the tx_hash and height of a tx number.''' tx_hash, tx_height = self.fs_tx_hash(tx_num) diff --git a/server/db.py b/server/db.py index 609537e..376cb0f 100644 --- a/server/db.py +++ b/server/db.py @@ -10,6 +10,7 @@ import array import ast +import itertools import os from struct import pack, unpack from bisect import bisect_right @@ -117,7 +118,6 @@ class DB(LoggedClass): self.logger.info('sync time so far: {}' .format(formatted_time(self.wall_time))) - def write_state(self, batch): '''Write chain state to the batch.''' state = { @@ -142,7 +142,51 @@ class DB(LoggedClass): return open(filename, 'wb+') raise - def fs_read_headers(self, start, count): + def fs_update(self, fs_height, headers, block_tx_hashes): + '''Write headers, the tx_count array and block tx hashes to disk. + + Their first height is fs_height. No recorded DB state is + updated. These arrays are all append only, so in a crash we + just pick up again from the DB height. + ''' + blocks_done = len(self.headers) + new_height = fs_height + blocks_done + prior_tx_count = (self.tx_counts[fs_height] if fs_height >= 0 else 0) + cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0 + txs_done = cur_tx_count - prior_tx_count + + assert len(self.tx_hashes) == blocks_done + assert len(self.tx_counts) == new_height + 1 + + # First the headers + self.headers_file.seek((fs_height + 1) * self.coin.HEADER_LEN) + self.headers_file.write(b''.join(headers)) + self.headers_file.flush() + + # Then the tx counts + self.txcount_file.seek((fs_height + 1) * self.tx_counts.itemsize) + self.txcount_file.write(self.tx_counts[fs_height + 1:]) + self.txcount_file.flush() + + # Finally the hashes + hashes = memoryview(b''.join(itertools.chain(*block_tx_hashes))) + assert len(hashes) % 32 == 0 + assert len(hashes) // 32 == txs_done + cursor = 0 + file_pos = prior_tx_count * 32 + while cursor < len(hashes): + file_num, offset = divmod(file_pos, self.tx_hash_file_size) + size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) + filename = 'hashes{:04d}'.format(file_num) + with self.open_file(filename, create=True) as f: + f.seek(offset) + f.write(hashes[cursor:cursor + size]) + cursor += size + file_pos += size + + os.sync() + + def read_headers(self, start, count): '''Requires count >= 0.''' # Read some from disk disk_count = min(count, self.db_height + 1 - start) @@ -172,7 +216,7 @@ class DB(LoggedClass): return f.read(32), tx_height def fs_block_hashes(self, height, count): - headers = self.fs_read_headers(height, count) + headers = self.read_headers(height, count) # FIXME: move to coins.py hlen = self.coin.HEADER_LEN return [self.coin.header_hash(header) diff --git a/server/protocol.py b/server/protocol.py index b9f1884..8690ea9 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -311,7 +311,8 @@ class ServerManager(util.LoggedClass): for session in self.sessions: if isinstance(session, ElectrumX): # Use a tuple to distinguish from JSON - session.messages.put_nowait((self.bp.height, touched, cache)) + triple = (self.bp.db_height, touched, cache) + session.messages.put_nowait(triple) async def shutdown(self): '''Call to shutdown the servers. Returns when done.''' @@ -377,7 +378,7 @@ class ServerManager(util.LoggedClass): async def rpc_getinfo(self, params): '''The RPC 'getinfo' call.''' return { - 'blocks': self.bp.height, + 'blocks': self.bp.db_height, 'peers': len(self.irc.peers), 'sessions': self.session_count(), 'watched': self.subscription_count, @@ -592,8 +593,8 @@ class ElectrumX(Session): .format(self.peername(), len(matches))) def height(self): - '''Return the block processor's current height.''' - return self.bp.height + '''Return the current flushed database height.''' + return self.bp.db_height def current_electrum_header(self): '''Used as response to a headers subscription request.'''