Browse Source

Break out FS cache into its own class

master
Neil Booth 8 years ago
parent
commit
28eb95edd3
  1. 282
      server/db.py

282
server/db.py

@ -275,6 +275,157 @@ class UTXOCache(LoggedClass):
self.adds = self.cache_hits = self.db_deletes = 0
class FSCache(LoggedClass):
def __init__(self, coin, height, tx_count):
super().__init__()
self.coin = coin
self.tx_hash_file_size = 16 * 1024 * 1024
assert self.tx_hash_file_size % 32 == 0
# On-disk values, updated by a flush
self.height = height
self.tx_count = tx_count
# Unflushed items
self.headers = []
self.tx_hashes = []
is_new = height == -1
self.headers_file = self.open_file('headers', is_new)
self.txcount_file = self.open_file('txcount', is_new)
self.tx_counts = array.array('I')
self.txcount_file.seek(0)
self.tx_counts.fromfile(self.txcount_file, self.height + 1)
def open_file(self, filename, create=False):
'''Open the file name. Return its handle.'''
try:
return open(filename, 'rb+')
except FileNotFoundError:
if create:
return open(filename, 'wb+')
raise
def process_block(self, block):
'''Process a new block.'''
assert len(self.tx_counts) == self.height + 1 + len(self.headers)
tx_hashes, txs = self.coin.read_block(block)
# Cache the new header, tx hashes and cumulative tx count
self.headers.append(block[:self.coin.HEADER_LEN])
self.tx_hashes.append(tx_hashes)
self.tx_counts.append(self.tx_count + len(txs))
return tx_hashes, txs
def flush(self, new_height, new_tx_count):
'''Flush the things stored on the filesystem.'''
self.logger.info('flushing to file system')
block_count = len(self.headers)
assert self.height + block_count == new_height
assert len(self.tx_hashes) == block_count
assert len(self.tx_counts) == self.height + 1 + block_count
# First the headers
headers = b''.join(self.headers)
header_len = self.coin.HEADER_LEN
self.headers_file.seek((self.height + 1) * header_len)
self.headers_file.write(headers)
self.headers_file.flush()
# Then the tx counts
self.txcount_file.seek((self.height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[self.height + 1:])
self.txcount_file.flush()
# Finally the hashes
hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
assert len(hashes) % 32 == 0
assert self.tx_count + len(hashes) // 32 == new_tx_count
cursor = 0
file_pos = self.tx_count * 32
while cursor < len(hashes):
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename, create=True) as f:
f.seek(offset)
f.write(hashes[cursor:cursor + size])
cursor += size
file_pos += size
os.sync()
tx_diff = new_tx_count - self.tx_count
self.tx_hashes = []
self.headers = []
self.height += block_count
self.tx_count = new_tx_count
return tx_diff
def read_headers(self, height, count):
read_count = min(count, self.height + 1 - height)
assert height >= 0 and read_count >= 0
assert count <= read_count + len(self.headers)
result = b''
if read_count > 0:
header_len = self.coin.HEADER_LEN
self.headers_file.seek(height * header_len)
result = self.headers_file.read(read_count * header_len)
count -= read_count
if count:
start = (height + read_count) - (self.height + 1)
result += b''.join(self.headers[start: start + count])
return result
def get_tx_hash(self, tx_num):
'''Returns the tx_hash and height of a tx number.'''
height = bisect_right(self.tx_counts, tx_num)
# Is this on disk or unflushed?
if height > self.height:
tx_hashes = self.tx_hashes[height - (self.height + 1)]
tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]]
else:
file_pos = tx_num * 32
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename) as f:
f.seek(offset)
tx_hash = f.read(32)
return tx_hash, height
def encode_header(self, height):
if height < 0 or height > self.height + len(self.headers):
raise Exception('no header information for height {:,d}'
.format(height))
header = self.read_headers(self.height, 1)
unpack = struct.unpack
version, = unpack('<I', header[:4])
timestamp, bits, nonce = unpack('<III', header[68:80])
return {
'block_height': self.height,
'version': version,
'prev_block_hash': hash_to_str(header[4:36]),
'merkle_root': hash_to_str(header[36:68]),
'timestamp': timestamp,
'bits': bits,
'nonce': nonce,
}
class DB(LoggedClass):
class Error(Exception):
@ -284,13 +435,11 @@ class DB(LoggedClass):
super().__init__()
# Meta
self.tx_hash_file_size = 16 * 1024 * 1024
self.utxo_MB = env.utxo_MB
self.hist_MB = env.hist_MB
self.next_cache_check = 0
self.last_flush = time.time()
self.coin = env.coin
self.current_header = None
# Chain state (initialize to genesis in case of new DB)
self.db_height = -1
@ -302,19 +451,15 @@ class DB(LoggedClass):
# Open DB and metadata files. Record some of its state.
self.db = self.open_db(self.coin)
self.tx_count = self.fs_tx_count = self.db_tx_count
self.height = self.fs_height = self.db_height
self.tx_count = self.db_tx_count
self.height = self.db_height
# Caches to be flushed later. Headers and tx_hashes have one
# entry per block
self.headers = []
self.tx_hashes = []
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
self.utxo_cache = UTXOCache(self, self.db, self.coin)
self.tx_counts = array.array('I')
self.txcount_file.seek(0)
self.tx_counts.fromfile(self.txcount_file, self.height + 1)
self.fs_cache = FSCache(self.coin, self.height, self.tx_count)
# Log state
self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
@ -332,16 +477,12 @@ class DB(LoggedClass):
def open_db(self, coin):
db_name = '{}-{}'.format(coin.NAME, coin.NET)
is_new = False
try:
db = plyvel.DB(db_name, create_if_missing=False,
error_if_exists=False, compression=None)
except:
db = plyvel.DB(db_name, create_if_missing=True,
error_if_exists=True, compression=None)
is_new = True
if is_new:
self.logger.info('created new database {}'.format(db_name))
self.flush_state(db)
else:
@ -349,9 +490,6 @@ class DB(LoggedClass):
self.read_state(db)
self.delete_excess_history(db)
self.headers_file = self.open_file('headers', is_new)
self.txcount_file = self.open_file('txcount', is_new)
return db
def read_state(self, db):
@ -395,43 +533,6 @@ class DB(LoggedClass):
self.flush_state(batch)
self.logger.info('deletion complete')
def flush_to_fs(self):
'''Flush the things stored on the filesystem.'''
# First the headers
headers = b''.join(self.headers)
header_len = self.coin.HEADER_LEN
self.headers_file.seek((self.fs_height + 1) * header_len)
self.headers_file.write(headers)
self.headers_file.flush()
self.headers = []
# Then the tx counts
self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize)
self.txcount_file.write(self.tx_counts[self.fs_height + 1:
self.height + 1])
self.txcount_file.flush()
# Finally the hashes
hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
assert len(hashes) % 32 == 0
assert self.tx_hash_file_size % 32 == 0
cursor = 0
file_pos = self.fs_tx_count * 32
while cursor < len(hashes):
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename, create=True) as f:
f.seek(offset)
f.write(hashes[cursor:cursor + size])
cursor += size
file_pos += size
self.tx_hashes = []
self.fs_height = self.height
self.fs_tx_count = self.tx_count
os.sync()
def flush_state(self, batch):
'''Flush chain state to the batch.'''
now = time.time()
@ -463,14 +564,12 @@ class DB(LoggedClass):
History is always flushed. UTXOs are flushed if flush_utxos.'''
flush_start = time.time()
last_flush = self.last_flush
tx_diff = self.tx_count - self.fs_tx_count
# Write out the files to the FS before flushing to the DB. If
# the DB transaction fails, the files being too long doesn't
# matter. But if writing the files fails we do not want to
# have updated the DB.
self.logger.info('commencing history flush')
self.flush_to_fs()
tx_diff = self.fs_cache.flush(self.height, self.tx_count)
with self.db.write_batch(transaction=True) as batch:
# History first - fast and frees memory. Flush state last
@ -507,6 +606,8 @@ class DB(LoggedClass):
formatted_time(tx_est / this_txs_per_sec)))
def flush_history(self, batch):
self.logger.info('flushing history')
# Drop any None entry
self.history.pop(None, None)
@ -522,20 +623,6 @@ class DB(LoggedClass):
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
def open_file(self, filename, create=False):
'''Open the file name. Return its handle.'''
try:
return open(filename, 'rb+')
except FileNotFoundError:
if create:
return open(filename, 'wb+')
raise
def read_headers(self, height, count):
header_len = self.coin.HEADER_LEN
self.headers_file.seek(height * header_len)
return self.headers_file.read(count * header_len)
def cache_sizes(self, daemon_height):
'''Returns the approximate size of the cache, in MB.'''
# Good average estimates based on traversal of subobjects and
@ -562,18 +649,11 @@ class DB(LoggedClass):
return utxo_MB, hist_MB
def process_block(self, block, daemon_height):
self.headers.append(block[:self.coin.HEADER_LEN])
# We must update the fs_cache before calling process_tx() as
# it uses the fs_cache for tx hash lookup
tx_hashes, txs = self.fs_cache.process_block(block)
tx_hashes, txs = self.coin.read_block(block)
self.height += 1
assert len(self.tx_counts) == self.height
# These both need to be updated before calling process_tx().
# It uses them for tx hash lookup
self.tx_hashes.append(tx_hashes)
self.tx_counts.append(self.tx_count + len(txs))
for tx_hash, tx in zip(tx_hashes, txs):
self.process_tx(tx_hash, tx)
@ -601,24 +681,6 @@ class DB(LoggedClass):
self.tx_count += 1
def get_tx_hash(self, tx_num):
'''Returns the tx_hash and height of a tx number.'''
height = bisect_right(self.tx_counts, tx_num)
# Is this on disk or unflushed?
if height > self.fs_height:
tx_hashes = self.tx_hashes[height - (self.fs_height + 1)]
tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]]
else:
file_pos = tx_num * 32
file_num, offset = divmod(file_pos, self.tx_hash_file_size)
filename = 'hashes{:04d}'.format(file_num)
with self.open_file(filename) as f:
f.seek(offset)
tx_hash = f.read(32)
return tx_hash, height
@staticmethod
def resolve_limit(limit):
if limit is None:
@ -674,26 +736,6 @@ class DB(LoggedClass):
position in the block.'''
return sorted(self.get_utxos(hash168, limit=None))
def encode_header(self):
if self.height == -1:
return None
header = self.read_headers(self.height, 1)
unpack = struct.unpack
version, = unpack('<I', header[:4])
timestamp, bits, nonce = unpack('<III', header[68:80])
return {
'block_height': self.height,
'version': version,
'prev_block_hash': hash_to_str(header[4:36]),
'merkle_root': hash_to_str(header[36:68]),
'timestamp': timestamp,
'bits': bits,
'nonce': nonce,
}
def get_current_header(self):
# FIXME: clear current_header on new block
if self.current_header is None:
self.current_header = self.encode_header()
return self.current_header
'''Returns the current header as a dictionary.'''
return self.fs_cache.encode_header(self.height)

Loading…
Cancel
Save