Browse Source

Remove the FS cache

Really belongs with BlockProcessor now
master
Neil Booth 8 years ago
parent
commit
ad2dabf751
  1. 86
      server/block_processor.py
  2. 160
      server/cache.py
  3. 89
      server/db.py

86
server/block_processor.py

@@ -10,6 +10,8 @@
import array
import asyncio
import itertools
import os
import struct
import time
from bisect import bisect_left
@@ -411,7 +413,7 @@ class BlockProcessor(server.db.DB):
start = self.height - 1
count = 1
while start > 0:
hashes = self.fs_cache.block_hashes(start, count)
hashes = self.block_hashes(start, count)
hex_hashes = [hash_to_str(hash) for hash in hashes]
d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
n = match_pos(hex_hashes, d_hex_hashes)
@ -428,7 +430,7 @@ class BlockProcessor(server.db.DB):
'height {:,d} to height {:,d}'
.format(count, start, start + count - 1))
return self.fs_cache.block_hashes(start, count)
return self.block_hashes(start, count)
def clean_db(self):
'''Clean out stale DB items.
@@ -534,7 +536,9 @@ class BlockProcessor(server.db.DB):
if self.height > self.db_height:
assert flush_history is None
flush_history = self.flush_history
self.fs_cache.flush(self.height, self.tx_count)
self.fs_flush()
self.logger.info('FS flush took {:.1f} seconds'
.format(time.time() - flush_start))
with self.db.write_batch() as batch:
# History first - fast and frees memory. Flush state last
@@ -593,6 +597,55 @@ class BlockProcessor(server.db.DB):
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
def fs_flush(self):
    '''Flush unflushed headers, tx counts and tx hashes to the
    filesystem, then advance fs_height to match.

    Takes no arguments; the in-memory state is sanity-checked
    against self.height and self.tx_count before anything is
    written.  Raises AssertionError if the counts disagree.
    '''
    blocks_done = len(self.headers)
    prior_tx_count = (self.tx_counts[self.fs_height]
                      if self.fs_height >= 0 else 0)
    cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
    txs_done = cur_tx_count - prior_tx_count

    assert self.fs_height + blocks_done == self.height
    assert len(self.tx_hashes) == blocks_done
    assert len(self.tx_counts) == self.height + 1
    assert cur_tx_count == self.tx_count, \
        'cur: {:,d} new: {:,d}'.format(cur_tx_count, self.tx_count)

    # First the headers
    headers = b''.join(self.headers)
    header_len = self.coin.HEADER_LEN
    self.headers_file.seek((self.fs_height + 1) * header_len)
    self.headers_file.write(headers)
    self.headers_file.flush()

    # Then the tx counts
    self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize)
    self.txcount_file.write(self.tx_counts[self.fs_height + 1:])
    self.txcount_file.flush()

    # Finally the hashes, split across fixed-size hash files
    hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
    assert len(hashes) % 32 == 0
    assert len(hashes) // 32 == txs_done
    cursor = 0
    file_pos = prior_tx_count * 32
    while cursor < len(hashes):
        file_num, offset = divmod(file_pos, self.tx_hash_file_size)
        size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
        filename = 'hashes{:04d}'.format(file_num)
        with self.open_file(filename, create=True) as f:
            f.seek(offset)
            f.write(hashes[cursor:cursor + size])
        cursor += size
        file_pos += size

    # Push OS buffers to disk before the in-memory state claims the
    # data is durable
    os.sync()

    self.tx_hashes = []
    self.headers = []
    self.fs_height += blocks_done
def backup_history(self, batch, hash168s):
self.logger.info('backing up history to height {:,d} tx_count {:,d}'
.format(self.height, self.tx_count))
@ -662,9 +715,18 @@ class BlockProcessor(server.db.DB):
'''Read undo information from a file for the current height.'''
return self.db.get(self.undo_key(height))
def fs_advance_block(self, header, tx_hashes, txs):
'''Update unflushed FS state for a new block.'''
prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0
# Cache the new header, tx hashes and cumulative tx count
self.headers.append(header)
self.tx_hashes.append(tx_hashes)
self.tx_counts.append(prior_tx_count + len(txs))
def advance_block(self, block, update_touched):
# We must update the fs_cache before calling advance_txs() as
# the UTXO cache uses the fs_cache via get_tx_hash() to
# We must update the FS cache before calling advance_txs() as
# the UTXO cache uses the FS cache via get_tx_hash() to
# resolve compressed key collisions
header, tx_hashes, txs = self.coin.read_block(block)
prev_hash, header_hash = self.coin.header_hashes(header)
@ -672,7 +734,7 @@ class BlockProcessor(server.db.DB):
raise ChainReorg
touched = set()
self.fs_cache.advance_block(header, tx_hashes, txs)
self.fs_advance_block(header, tx_hashes, txs)
self.tip = header_hash
self.height += 1
undo_info = self.advance_txs(tx_hashes, txs, touched)
@ -733,6 +795,16 @@ class BlockProcessor(server.db.DB):
return undo_info
def fs_backup_block(self):
    '''Undo one block of FS state, in memory only.

    Requires all unflushed items to have been flushed already.  The
    disk files are left alone; any stale tail is overwritten the
    next time blocks are advanced and flushed.
    '''
    assert not self.headers and not self.tx_hashes
    assert self.fs_height >= 0
    self.tx_counts.pop()
    self.fs_height -= 1
def backup_blocks(self, blocks):
'''Backup the blocks and flush.
@ -752,7 +824,7 @@ class BlockProcessor(server.db.DB):
hash_to_str(self.tip), self.height))
self.backup_txs(tx_hashes, txs, touched)
self.fs_cache.backup_block()
self.fs_backup_block()
self.tip = prev_hash
self.height -= 1

160
server/cache.py

@ -12,14 +12,10 @@ Once synced flushes are performed after processing each block.
'''
import array
import itertools
import os
import struct
from bisect import bisect_right
from lib.util import chunks, LoggedClass
from lib.hash import double_sha256, hash_to_str
from lib.util import LoggedClass
from lib.hash import hash_to_str
# History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries
@ -259,155 +255,3 @@ class UTXOCache(LoggedClass):
.format(new_utxos, self.db_deletes,
hcolls, ucolls))
self.cache_spends = self.db_deletes = 0
class FSCache(LoggedClass):
    '''Cache wrapping the filesystem-stored chain state: block
    headers, tx hashes and cumulative tx counts.  New items are
    buffered in memory until flush() writes them to disk.'''

    def __init__(self, coin, height, tx_count):
        super().__init__()

        self.coin = coin
        # Size of each on-disk tx hash file; must hold a whole
        # number of 32-byte hashes
        self.tx_hash_file_size = 16 * 1024 * 1024
        assert self.tx_hash_file_size % 32 == 0

        # On-disk values, updated by a flush
        self.height = height

        # Unflushed items
        self.headers = []
        self.tx_hashes = []

        # A height of -1 means a fresh DB; create the files then
        is_new = height == -1
        self.headers_file = self.open_file('headers', is_new)
        self.txcount_file = self.open_file('txcount', is_new)

        # tx_counts[N] has the cumulative number of txs at the end of
        # height N. So tx_counts[0] is 1 - the genesis coinbase
        self.tx_counts = array.array('I')
        self.txcount_file.seek(0)
        self.tx_counts.fromfile(self.txcount_file, self.height + 1)
        if self.tx_counts:
            assert tx_count == self.tx_counts[-1]
        else:
            assert tx_count == 0

    def open_file(self, filename, create=False):
        '''Open the file name. Return its handle.

        The file is opened read/write in binary mode; it is created
        only when create is true, otherwise FileNotFoundError
        propagates to the caller.
        '''
        try:
            return open(filename, 'rb+')
        except FileNotFoundError:
            if create:
                return open(filename, 'wb+')
            raise

    def advance_block(self, header, tx_hashes, txs):
        '''Update the FS cache for a new block.'''
        prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0

        # Cache the new header, tx hashes and cumulative tx count
        self.headers.append(header)
        self.tx_hashes.append(tx_hashes)
        self.tx_counts.append(prior_tx_count + len(txs))

    def backup_block(self):
        '''Revert a block.

        Only valid when nothing is unflushed (see the asserts).
        '''
        assert not self.headers
        assert not self.tx_hashes
        assert self.height >= 0
        # Just update in-memory. It doesn't matter if disk files are
        # too long, they will be overwritten when advancing.
        self.height -= 1
        self.tx_counts.pop()

    def flush(self, new_height, new_tx_count):
        '''Flush the things stored on the filesystem.
        The arguments are passed for sanity check assertions only.'''
        self.logger.info('flushing to file system')

        blocks_done = len(self.headers)
        prior_tx_count = self.tx_counts[self.height] if self.height >= 0 else 0
        cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
        txs_done = cur_tx_count - prior_tx_count

        assert self.height + blocks_done == new_height
        assert len(self.tx_hashes) == blocks_done
        assert len(self.tx_counts) == new_height + 1
        assert cur_tx_count == new_tx_count, \
            'cur: {:,d} new: {:,d}'.format(cur_tx_count, new_tx_count)

        # First the headers
        headers = b''.join(self.headers)
        header_len = self.coin.HEADER_LEN
        self.headers_file.seek((self.height + 1) * header_len)
        self.headers_file.write(headers)
        self.headers_file.flush()

        # Then the tx counts
        self.txcount_file.seek((self.height + 1) * self.tx_counts.itemsize)
        self.txcount_file.write(self.tx_counts[self.height + 1:])
        self.txcount_file.flush()

        # Finally the hashes, spread across fixed-size hash files
        hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
        assert len(hashes) % 32 == 0
        assert len(hashes) // 32 == txs_done
        cursor = 0
        file_pos = prior_tx_count * 32
        while cursor < len(hashes):
            file_num, offset = divmod(file_pos, self.tx_hash_file_size)
            size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
            filename = 'hashes{:04d}'.format(file_num)
            with self.open_file(filename, create=True) as f:
                f.seek(offset)
                f.write(hashes[cursor:cursor + size])
            cursor += size
            file_pos += size

        # Push OS buffers to disk before updating in-memory state
        os.sync()

        self.tx_hashes = []
        self.headers = []
        self.height += blocks_done

    def read_headers(self, start, count):
        '''Return count raw headers starting at height start.

        Flushed headers come from the headers file; the remainder
        come from the unflushed in-memory list.
        '''
        result = b''

        # Read some from disk
        disk_count = min(count, self.height + 1 - start)
        if disk_count > 0:
            header_len = self.coin.HEADER_LEN
            assert start >= 0
            self.headers_file.seek(start * header_len)
            result = self.headers_file.read(disk_count * header_len)
            count -= disk_count
            start += disk_count

        # The rest from memory
        start -= self.height + 1
        assert count >= 0 and start + count <= len(self.headers)
        result += b''.join(self.headers[start: start + count])

        return result

    def get_tx_hash(self, tx_num):
        '''Returns the tx_hash and height of a tx number.'''
        height = bisect_right(self.tx_counts, tx_num)

        # Is this on disk or unflushed?
        if height > self.height:
            tx_hashes = self.tx_hashes[height - (self.height + 1)]
            tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]]
        else:
            file_pos = tx_num * 32
            file_num, offset = divmod(file_pos, self.tx_hash_file_size)
            filename = 'hashes{:04d}'.format(file_num)
            with self.open_file(filename) as f:
                f.seek(offset)
                tx_hash = f.read(32)

        return tx_hash, height

    def block_hashes(self, height, count):
        '''Return the hashes of count blocks starting at height.'''
        headers = self.read_headers(height, count)
        hlen = self.coin.HEADER_LEN
        return [double_sha256(header) for header in chunks(headers, hlen)]

89
server/db.py

@ -12,10 +12,11 @@ import array
import ast
import os
import struct
from bisect import bisect_right
from collections import namedtuple
from server.cache import FSCache, NO_CACHE_ENTRY
from lib.util import LoggedClass
from lib.util import chunks, LoggedClass
from lib.hash import double_sha256
from server.storage import open_db
UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
@ -54,10 +55,29 @@ class DB(LoggedClass):
self.height = self.db_height
self.tip = self.db_tip
# Cache wrapping the filesystem and redirected functions
self.fs_cache = FSCache(self.coin, self.height, self.tx_count)
self.get_tx_hash = self.fs_cache.get_tx_hash
self.read_headers = self.fs_cache.read_headers
# -- FS related members --
self.tx_hash_file_size = 16 * 1024 * 1024
# On-disk height updated by a flush
self.fs_height = self.height
# Unflushed items
self.headers = []
self.tx_hashes = []
create = self.height == -1
self.headers_file = self.open_file('headers', create)
self.txcount_file = self.open_file('txcount', create)
# tx_counts[N] has the cumulative number of txs at the end of
# height N. So tx_counts[0] is 1 - the genesis coinbase
self.tx_counts = array.array('I')
self.txcount_file.seek(0)
self.tx_counts.fromfile(self.txcount_file, self.height + 1)
if self.tx_counts:
assert self.tx_count == self.tx_counts[-1]
else:
assert self.tx_count == 0
def init_state_from_db(self):
if self.db.is_new:
@ -83,6 +103,59 @@ class DB(LoggedClass):
self.wall_time = state['wall_time']
self.first_sync = state.get('first_sync', True)
def open_file(self, filename, create=False):
    '''Return a binary read/write handle for filename.

    A missing file is created when create is true; otherwise the
    FileNotFoundError propagates to the caller.
    '''
    try:
        handle = open(filename, 'rb+')
    except FileNotFoundError:
        if not create:
            raise
        handle = open(filename, 'wb+')
    return handle
def read_headers(self, start, count):
    '''Return count raw block headers starting at height start.

    Heights up to fs_height are read from the headers file; any
    remainder comes from the unflushed in-memory header list.
    '''
    header_len = self.coin.HEADER_LEN
    parts = []

    # Portion already flushed to disk
    on_disk = min(count, self.fs_height + 1 - start)
    if on_disk > 0:
        assert start >= 0
        self.headers_file.seek(start * header_len)
        parts.append(self.headers_file.read(on_disk * header_len))
        start += on_disk
        count -= on_disk

    # Remainder from the unflushed in-memory list
    mem_start = start - (self.fs_height + 1)
    assert count >= 0 and mem_start + count <= len(self.headers)
    parts.extend(self.headers[mem_start: mem_start + count])

    return b''.join(parts)
def get_tx_hash(self, tx_num):
    '''Return (tx_hash, height) for transaction number tx_num.'''
    height = bisect_right(self.tx_counts, tx_num)

    if height <= self.fs_height:
        # Flushed: read the 32-byte hash from the on-disk hash files
        file_num, offset = divmod(tx_num * 32, self.tx_hash_file_size)
        with self.open_file('hashes{:04d}'.format(file_num)) as f:
            f.seek(offset)
            tx_hash = f.read(32)
    else:
        # Unflushed: look it up in the in-memory per-block lists
        block_tx_hashes = self.tx_hashes[height - (self.fs_height + 1)]
        tx_hash = block_tx_hashes[tx_num - self.tx_counts[height - 1]]

    return tx_hash, height
def block_hashes(self, height, count):
    '''Return the double-SHA256 hashes of count block headers
    starting at the given height.'''
    # FIXME: move to coins.py
    raw = self.read_headers(height, count)
    header_len = self.coin.HEADER_LEN
    return [double_sha256(raw_header)
            for raw_header in chunks(raw, header_len)]
@staticmethod
def _resolve_limit(limit):
if limit is None:
@ -142,9 +215,7 @@ class DB(LoggedClass):
hash168 = None
if 0 <= index <= 65535:
idx_packed = struct.pack('<H', index)
hash168 = self.hash168(tx_hash, idx_packed, False)
if hash168 == NO_CACHE_ENTRY:
hash168 = None
hash168 = self.hash168(tx_hash, idx_packed)
return hash168
def hash168(self, tx_hash, idx_packed):

Loading…
Cancel
Save