Browse Source

New VirtualFile abstraction

Use for headers, tx counts and tx hashes.
master
Neil Booth 8 years ago
parent
commit
c8c4199503
  1. 54
      lib/util.py
  2. 73
      server/db.py

54
lib/util.py

@@ -142,3 +142,57 @@ def increment_byte_string(bs):
# This can only happen if all characters are 0xff # This can only happen if all characters are 0xff
bs = bytes([1]) + bs bs = bytes([1]) + bs
return bytes(bs) return bytes(bs)
class LogicalFile(object):
'''A logical binary file split across several separate files on disk.'''
def __init__(self, prefix, digits, file_size):
digit_fmt = '{' + ':0{:d}d'.format(digits) + '}'
self.filename_fmt = prefix + digit_fmt
self.file_size = file_size
def read(self, start, size=-1):
'''Read up to size bytes from the virtual file, starting at offset
start, and return them.
If size is -1 all bytes are read.'''
parts = []
while size != 0:
try:
with self.open_file(start, False) as f:
part = f.read(size)
if not part:
break
except FileNotFoundError:
break
parts.append(part)
start += len(part)
if size > 0:
size -= len(part)
return b''.join(parts)
def write(self, start, b):
'''Write the bytes-like object, b, to the underlying virtual file.'''
while b:
size = min(len(b), self.file_size - (start % self.file_size))
with self.open_file(start, True) as f:
f.write(b if size == len(b) else b[:size])
b = b[size:]
start += size
def open_file(self, start, create):
'''Open the virtual file and seek to start. Return a file handle.
Raise FileNotFoundError if the file does not exist and create
is False.
'''
file_num, offset = divmod(start, self.file_size)
filename = self.filename_fmt.format(file_num)
try:
f= open(filename, 'rb+')
except FileNotFoundError:
if not create:
raise
f = open(filename, 'wb+')
f.seek(offset)
return f

73
server/db.py

@@ -16,7 +16,7 @@ from struct import pack, unpack
from bisect import bisect_right from bisect import bisect_right
from collections import namedtuple from collections import namedtuple
from lib.util import chunks, formatted_time, LoggedClass import lib.util as util
from lib.hash import hash_to_str from lib.hash import hash_to_str
from server.storage import open_db from server.storage import open_db
from server.version import VERSION from server.version import VERSION
@@ -24,7 +24,7 @@ from server.version import VERSION
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value") UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
class DB(LoggedClass): class DB(util.LoggedClass):
'''Simple wrapper of the backend database for querying. '''Simple wrapper of the backend database for querying.
Performs no DB update, though the DB will be cleaned on opening if Performs no DB update, though the DB will be cleaned on opening if
@@ -53,16 +53,16 @@ class DB(LoggedClass):
self.logger.info('reorg limit is {:,d} blocks' self.logger.info('reorg limit is {:,d} blocks'
.format(self.env.reorg_limit)) .format(self.env.reorg_limit))
create = self.db_height == -1 self.headers_file = util.LogicalFile('meta/headers', 2, 16000000)
self.headers_file = self.open_file('headers', create) self.tx_counts_file = util.LogicalFile('meta/txcounts', 2, 2000000)
self.txcount_file = self.open_file('txcount', create) self.hashes_file = util.LogicalFile('meta/hashes', 4, 16000000)
self.tx_hash_file_size = 16 * 1024 * 1024
# tx_counts[N] has the cumulative number of txs at the end of # tx_counts[N] has the cumulative number of txs at the end of
# height N. So tx_counts[0] is 1 - the genesis coinbase # height N. So tx_counts[0] is 1 - the genesis coinbase
self.tx_counts = array.array('I') size = (self.db_height + 1) * 4
self.txcount_file.seek(0) tx_counts = self.tx_counts_file.read(0, size)
self.tx_counts.fromfile(self.txcount_file, self.db_height + 1) assert len(tx_counts) == size
self.tx_counts = array.array('I', tx_counts)
if self.tx_counts: if self.tx_counts:
assert self.db_tx_count == self.tx_counts[-1] assert self.db_tx_count == self.tx_counts[-1]
else: else:
@@ -112,7 +112,7 @@ class DB(LoggedClass):
self.logger.info('tx count: {:,d}'.format(self.db_tx_count)) self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
if self.first_sync: if self.first_sync:
self.logger.info('sync time so far: {}' self.logger.info('sync time so far: {}'
.format(formatted_time(self.wall_time))) .format(util.formatted_time(self.wall_time)))
else: else:
self.open_db(self.first_sync) self.open_db(self.first_sync)
@@ -253,32 +253,18 @@ class DB(LoggedClass):
assert len(self.tx_hashes) == blocks_done assert len(self.tx_hashes) == blocks_done
assert len(self.tx_counts) == new_height + 1 assert len(self.tx_counts) == new_height + 1
hashes = b''.join(itertools.chain(*block_tx_hashes))
# First the headers
self.headers_file.seek((fs_height + 1) * self.coin.HEADER_LEN)
self.headers_file.write(b''.join(headers))
self.headers_file.flush()
# Then the tx counts
self.txcount_file.seek((fs_height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[fs_height + 1:])
self.txcount_file.flush()
# Finally the hashes
hashes = memoryview(b''.join(itertools.chain(*block_tx_hashes)))
assert len(hashes) % 32 == 0 assert len(hashes) % 32 == 0
assert len(hashes) // 32 == txs_done assert len(hashes) // 32 == txs_done
cursor = 0
file_pos = prior_tx_count * 32 # Write the headers, tx counts, and tx hashes
while cursor < len(hashes): offset = (fs_height + 1) * self.coin.HEADER_LEN
file_num, offset = divmod(file_pos, self.tx_hash_file_size) self.headers_file.write(offset, b''.join(headers))
size = min(len(hashes) - cursor, self.tx_hash_file_size - offset) offset = (fs_height + 1) * self.tx_counts.itemsize
filename = 'hashes{:04d}'.format(file_num) self.tx_counts_file.write(offset,
with self.open_file(filename, create=True) as f: self.tx_counts[fs_height + 1:].tobytes())
f.seek(offset) offset = prior_tx_count * 32
f.write(hashes[cursor:cursor + size]) self.hashes_file.write(offset, hashes)
cursor += size
file_pos += size
def read_headers(self, start, count): def read_headers(self, start, count):
'''Requires count >= 0.''' '''Requires count >= 0.'''
@@ -289,8 +275,8 @@ class DB(LoggedClass):
.format(count, start)) .format(count, start))
if disk_count: if disk_count:
header_len = self.coin.HEADER_LEN header_len = self.coin.HEADER_LEN
self.headers_file.seek(start * header_len) offset = start * header_len
return self.headers_file.read(disk_count * header_len) return self.headers_file.read(offset, disk_count * header_len)
return b'' return b''
def fs_tx_hash(self, tx_num): def fs_tx_hash(self, tx_num):
@@ -298,23 +284,18 @@ class DB(LoggedClass):
If the tx_height is not on disk, returns (None, tx_height).''' If the tx_height is not on disk, returns (None, tx_height).'''
tx_height = bisect_right(self.tx_counts, tx_num) tx_height = bisect_right(self.tx_counts, tx_num)
if tx_height > self.db_height: if tx_height > self.db_height:
return None, tx_height tx_hash = None
else:
file_pos = tx_num * 32 tx_hash = self.hashes_file.read(tx_num * 32, 32)
file_num, offset = divmod(file_pos, self.tx_hash_file_size) return tx_hash, tx_height
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename) as f:
f.seek(offset)
return f.read(32), tx_height
def fs_block_hashes(self, height, count): def fs_block_hashes(self, height, count):
headers = self.read_headers(height, count) headers = self.read_headers(height, count)
# FIXME: move to coins.py # FIXME: move to coins.py
hlen = self.coin.HEADER_LEN hlen = self.coin.HEADER_LEN
return [self.coin.header_hash(header) return [self.coin.header_hash(header)
for header in chunks(headers, hlen)] for header in util.chunks(headers, hlen)]
@staticmethod @staticmethod
def _resolve_limit(limit): def _resolve_limit(limit):

Loading…
Cancel
Save