From c8c41995033e91044687caf285b272cf8a7faac8 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 3 Jan 2017 07:49:58 +0900 Subject: [PATCH] New VirtualFile abstraction Use for headers, tx counts and tx hashes. --- lib/util.py | 54 ++++++++++++++++++++++++++++++++++++++ server/db.py | 73 +++++++++++++++++++--------------------------------- 2 files changed, 81 insertions(+), 46 deletions(-) diff --git a/lib/util.py b/lib/util.py index c8a5fe2..c60a0d3 100644 --- a/lib/util.py +++ b/lib/util.py @@ -142,3 +142,57 @@ def increment_byte_string(bs): # This can only happen if all characters are 0xff bs = bytes([1]) + bs return bytes(bs) + + +class LogicalFile(object): + '''A logical binary file split across several separate files on disk.''' + + def __init__(self, prefix, digits, file_size): + digit_fmt = '{' + ':0{:d}d'.format(digits) + '}' + self.filename_fmt = prefix + digit_fmt + self.file_size = file_size + + def read(self, start, size=-1): + '''Read up to size bytes from the virtual file, starting at offset + start, and return them. + + If size is -1 all bytes are read.''' + parts = [] + while size != 0: + try: + with self.open_file(start, False) as f: + part = f.read(size) + if not part: + break + except FileNotFoundError: + break + parts.append(part) + start += len(part) + if size > 0: + size -= len(part) + return b''.join(parts) + + def write(self, start, b): + '''Write the bytes-like object, b, to the underlying virtual file.''' + while b: + size = min(len(b), self.file_size - (start % self.file_size)) + with self.open_file(start, True) as f: + f.write(b if size == len(b) else b[:size]) + b = b[size:] + start += size + + def open_file(self, start, create): + '''Open the virtual file and seek to start. Return a file handle. + Raise FileNotFoundError if the file does not exist and create + is False. 
+ ''' + file_num, offset = divmod(start, self.file_size) + filename = self.filename_fmt.format(file_num) + try: + f= open(filename, 'rb+') + except FileNotFoundError: + if not create: + raise + f = open(filename, 'wb+') + f.seek(offset) + return f diff --git a/server/db.py b/server/db.py index 0c4ce88..440aa98 100644 --- a/server/db.py +++ b/server/db.py @@ -16,7 +16,7 @@ from struct import pack, unpack from bisect import bisect_right from collections import namedtuple -from lib.util import chunks, formatted_time, LoggedClass +import lib.util as util from lib.hash import hash_to_str from server.storage import open_db from server.version import VERSION @@ -24,7 +24,7 @@ from server.version import VERSION UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") -class DB(LoggedClass): +class DB(util.LoggedClass): '''Simple wrapper of the backend database for querying. Performs no DB update, though the DB will be cleaned on opening if @@ -53,16 +53,16 @@ class DB(LoggedClass): self.logger.info('reorg limit is {:,d} blocks' .format(self.env.reorg_limit)) - create = self.db_height == -1 - self.headers_file = self.open_file('headers', create) - self.txcount_file = self.open_file('txcount', create) - self.tx_hash_file_size = 16 * 1024 * 1024 + self.headers_file = util.LogicalFile('meta/headers', 2, 16000000) + self.tx_counts_file = util.LogicalFile('meta/txcounts', 2, 2000000) + self.hashes_file = util.LogicalFile('meta/hashes', 4, 16000000) # tx_counts[N] has the cumulative number of txs at the end of # height N. 
So tx_counts[0] is 1 - the genesis coinbase - self.tx_counts = array.array('I') - self.txcount_file.seek(0) - self.tx_counts.fromfile(self.txcount_file, self.db_height + 1) + size = (self.db_height + 1) * 4 + tx_counts = self.tx_counts_file.read(0, size) + assert len(tx_counts) == size + self.tx_counts = array.array('I', tx_counts) if self.tx_counts: assert self.db_tx_count == self.tx_counts[-1] else: @@ -112,7 +112,7 @@ class DB(LoggedClass): self.logger.info('tx count: {:,d}'.format(self.db_tx_count)) if self.first_sync: self.logger.info('sync time so far: {}' - .format(formatted_time(self.wall_time))) + .format(util.formatted_time(self.wall_time))) else: self.open_db(self.first_sync) @@ -253,32 +253,18 @@ class DB(LoggedClass): assert len(self.tx_hashes) == blocks_done assert len(self.tx_counts) == new_height + 1 - - # First the headers - self.headers_file.seek((fs_height + 1) * self.coin.HEADER_LEN) - self.headers_file.write(b''.join(headers)) - self.headers_file.flush() - - # Then the tx counts - self.txcount_file.seek((fs_height + 1) * self.tx_counts.itemsize) - self.txcount_file.write(self.tx_counts[fs_height + 1:]) - self.txcount_file.flush() - - # Finally the hashes - hashes = memoryview(b''.join(itertools.chain(*block_tx_hashes))) + hashes = b''.join(itertools.chain(*block_tx_hashes)) assert len(hashes) % 32 == 0 assert len(hashes) // 32 == txs_done - cursor = 0 - file_pos = prior_tx_count * 32 - while cursor < len(hashes): - file_num, offset = divmod(file_pos, self.tx_hash_file_size) - size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) - filename = 'hashes{:04d}'.format(file_num) - with self.open_file(filename, create=True) as f: - f.seek(offset) - f.write(hashes[cursor:cursor + size]) - cursor += size - file_pos += size + + # Write the headers, tx counts, and tx hashes + offset = (fs_height + 1) * self.coin.HEADER_LEN + self.headers_file.write(offset, b''.join(headers)) + offset = (fs_height + 1) * self.tx_counts.itemsize + 
self.tx_counts_file.write(offset, + self.tx_counts[fs_height + 1:].tobytes()) + offset = prior_tx_count * 32 + self.hashes_file.write(offset, hashes) def read_headers(self, start, count): '''Requires count >= 0.''' @@ -289,8 +275,8 @@ class DB(LoggedClass): .format(count, start)) if disk_count: header_len = self.coin.HEADER_LEN - self.headers_file.seek(start * header_len) - return self.headers_file.read(disk_count * header_len) + offset = start * header_len + return self.headers_file.read(offset, disk_count * header_len) return b'' def fs_tx_hash(self, tx_num): @@ -298,23 +284,18 @@ class DB(LoggedClass): If the tx_height is not on disk, returns (None, tx_height).''' tx_height = bisect_right(self.tx_counts, tx_num) - if tx_height > self.db_height: - return None, tx_height - - file_pos = tx_num * 32 - file_num, offset = divmod(file_pos, self.tx_hash_file_size) - filename = 'hashes{:04d}'.format(file_num) - with self.open_file(filename) as f: - f.seek(offset) - return f.read(32), tx_height + tx_hash = None + else: + tx_hash = self.hashes_file.read(tx_num * 32, 32) + return tx_hash, tx_height def fs_block_hashes(self, height, count): headers = self.read_headers(height, count) # FIXME: move to coins.py hlen = self.coin.HEADER_LEN return [self.coin.header_hash(header) - for header in chunks(headers, hlen)] + for header in util.chunks(headers, hlen)] @staticmethod def _resolve_limit(limit):