
Merge branch 'remove_fs_cache' into develop

Neil Booth, 8 years ago
commit 8e0f556e7b
1. server/block_processor.py (91 lines changed)
2. server/cache.py (160 lines changed)
3. server/db.py (123 lines changed)

server/block_processor.py (91 lines changed)

@@ -10,13 +10,15 @@
 import array
 import asyncio
+import itertools
+import os
 import struct
 import time
 from bisect import bisect_left
 from collections import defaultdict
 from functools import partial

-from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
+from server.cache import UTXOCache, NO_CACHE_ENTRY
 from server.daemon import Daemon, DaemonError
 from lib.hash import hash_to_str
 from lib.tx import Deserializer
@@ -315,6 +317,9 @@ class BlockProcessor(server.db.DB):
         self.last_flush = time.time()
         self.last_flush_tx_count = self.tx_count

+        # UTXO cache
+        self.utxo_cache = UTXOCache(self.get_tx_hash, self.db, self.coin)
+
         # Log state
         self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
                          'flush count: {:,d} utxo flush count: {:,d} '
@@ -408,7 +413,7 @@ class BlockProcessor(server.db.DB):
         start = self.height - 1
         count = 1
         while start > 0:
-            hashes = self.fs_cache.block_hashes(start, count)
+            hashes = self.block_hashes(start, count)
             hex_hashes = [hash_to_str(hash) for hash in hashes]
             d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
             n = match_pos(hex_hashes, d_hex_hashes)
@@ -425,7 +430,7 @@ class BlockProcessor(server.db.DB):
                          'height {:,d} to height {:,d}'
                          .format(count, start, start + count - 1))

-        return self.fs_cache.block_hashes(start, count)
+        return self.block_hashes(start, count)

    def clean_db(self):
        '''Clean out stale DB items.
@@ -531,7 +536,9 @@ class BlockProcessor(server.db.DB):
         if self.height > self.db_height:
             assert flush_history is None
             flush_history = self.flush_history
-            self.fs_cache.flush(self.height, self.tx_count)
+            self.fs_flush()
+            self.logger.info('FS flush took {:.1f} seconds'
+                             .format(time.time() - flush_start))

         with self.db.write_batch() as batch:
             # History first - fast and frees memory.  Flush state last
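
Editor's note: a minimal sketch of the batched-write pattern used above. The storage backend is abstracted by server.storage; plyvel is only an assumed example of a backend whose write_batch() yields a context manager that applies its puts atomically on exit.

    import plyvel  # assumed backend for illustration only

    db = plyvel.DB('demo.ldb', create_if_missing=True)
    with db.write_batch() as batch:
        # Puts are buffered and applied atomically when the block exits
        batch.put(b'key', b'value')
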
@@ -590,6 +597,55 @@ class BlockProcessor(server.db.DB):
         self.history = defaultdict(partial(array.array, 'I'))
         self.history_size = 0

+    def fs_flush(self):
+        '''Flush the things stored on the filesystem.'''
+        blocks_done = len(self.headers)
+        prior_tx_count = (self.tx_counts[self.fs_height]
+                          if self.fs_height >= 0 else 0)
+        cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
+        txs_done = cur_tx_count - prior_tx_count
+
+        assert self.fs_height + blocks_done == self.height
+        assert len(self.tx_hashes) == blocks_done
+        assert len(self.tx_counts) == self.height + 1
+        assert cur_tx_count == self.tx_count, \
+            'cur: {:,d} new: {:,d}'.format(cur_tx_count, self.tx_count)
+
+        # First the headers
+        headers = b''.join(self.headers)
+        header_len = self.coin.HEADER_LEN
+        self.headers_file.seek((self.fs_height + 1) * header_len)
+        self.headers_file.write(headers)
+        self.headers_file.flush()
+
+        # Then the tx counts
+        self.txcount_file.seek((self.fs_height + 1) * self.tx_counts.itemsize)
+        self.txcount_file.write(self.tx_counts[self.fs_height + 1:])
+        self.txcount_file.flush()
+
+        # Finally the hashes
+        hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
+        assert len(hashes) % 32 == 0
+        assert len(hashes) // 32 == txs_done
+        cursor = 0
+        file_pos = prior_tx_count * 32
+        while cursor < len(hashes):
+            file_num, offset = divmod(file_pos, self.tx_hash_file_size)
+            size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
+            filename = 'hashes{:04d}'.format(file_num)
+            with self.open_file(filename, create=True) as f:
+                f.seek(offset)
+                f.write(hashes[cursor:cursor + size])
+            cursor += size
+            file_pos += size
+
+        os.sync()
+
+        self.tx_hashes = []
+        self.headers = []
+        self.fs_height += blocks_done
+
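
Editor's note: a standalone sketch of the arithmetic behind the hash shards written above. With a 16 MiB shard size (a multiple of 32), a transaction number maps straight to a file number and byte offset. The helper name and values are illustrative only.

    TX_HASH_FILE_SIZE = 16 * 1024 * 1024          # must be a multiple of 32

    def hash_file_location(tx_num):
        '''Return (filename, offset) of the 32-byte hash for tx number tx_num.'''
        file_num, offset = divmod(tx_num * 32, TX_HASH_FILE_SIZE)
        return 'hashes{:04d}'.format(file_num), offset

    assert hash_file_location(0) == ('hashes0000', 0)
    assert hash_file_location(524287) == ('hashes0000', 16777184)
    assert hash_file_location(524288) == ('hashes0001', 0)
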
     def backup_history(self, batch, hash168s):
         self.logger.info('backing up history to height {:,d} tx_count {:,d}'
                          .format(self.height, self.tx_count))
@@ -659,9 +715,18 @@ class BlockProcessor(server.db.DB):
         '''Read undo information from a file for the current height.'''
         return self.db.get(self.undo_key(height))

+    def fs_advance_block(self, header, tx_hashes, txs):
+        '''Update unflushed FS state for a new block.'''
+        prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0
+
+        # Cache the new header, tx hashes and cumulative tx count
+        self.headers.append(header)
+        self.tx_hashes.append(tx_hashes)
+        self.tx_counts.append(prior_tx_count + len(txs))
+
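
Editor's note: for illustration, the cumulative bookkeeping fs_advance_block() performs, with made-up per-block transaction counts:

    import array

    tx_counts = array.array('I')
    for txs_in_block in (1, 2, 4):                # genesis block, then two more
        prior_tx_count = tx_counts[-1] if tx_counts else 0
        tx_counts.append(prior_tx_count + txs_in_block)

    assert list(tx_counts) == [1, 3, 7]           # tx_counts[N] = txs up to height N
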
     def advance_block(self, block, update_touched):
-        # We must update the fs_cache before calling advance_txs() as
-        # the UTXO cache uses the fs_cache via get_tx_hash() to
+        # We must update the FS cache before calling advance_txs() as
+        # the UTXO cache uses the FS cache via get_tx_hash() to
         # resolve compressed key collisions
         header, tx_hashes, txs = self.coin.read_block(block)
         prev_hash, header_hash = self.coin.header_hashes(header)
@@ -669,7 +734,7 @@ class BlockProcessor(server.db.DB):
             raise ChainReorg

         touched = set()
-        self.fs_cache.advance_block(header, tx_hashes, txs)
+        self.fs_advance_block(header, tx_hashes, txs)
         self.tip = header_hash
         self.height += 1
         undo_info = self.advance_txs(tx_hashes, txs, touched)
@@ -730,6 +795,16 @@ class BlockProcessor(server.db.DB):
         return undo_info

+    def fs_backup_block(self):
+        '''Revert a block.'''
+        assert not self.headers
+        assert not self.tx_hashes
+        assert self.fs_height >= 0
+
+        # Just update in-memory.  It doesn't matter if disk files are
+        # too long, they will be overwritten when advancing.
+        self.fs_height -= 1
+        self.tx_counts.pop()
+
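
Editor's note: a sketch of the reversal above with made-up state. Backing up only trims the in-memory bookkeeping; stale bytes beyond the new logical end of the disk files are harmless because the next flush seeks back to (fs_height + 1) times the item size and overwrites them.

    import array

    tx_counts = array.array('I', [1, 3, 7])       # heights 0-2 flushed
    fs_height = 2

    # Revert the top block
    fs_height -= 1
    tx_counts.pop()

    assert fs_height == 1 and list(tx_counts) == [1, 3]
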
     def backup_blocks(self, blocks):
         '''Backup the blocks and flush.
@@ -749,7 +824,7 @@ class BlockProcessor(server.db.DB):
                              hash_to_str(self.tip), self.height))

         self.backup_txs(tx_hashes, txs, touched)
-        self.fs_cache.backup_block()
+        self.fs_backup_block()
         self.tip = prev_hash
         self.height -= 1

server/cache.py (160 lines changed)

@@ -12,14 +12,10 @@ Once synced flushes are performed after processing each block.
 '''

 import array
-import itertools
-import os
 import struct
-from bisect import bisect_right

-from lib.util import chunks, LoggedClass
-from lib.hash import double_sha256, hash_to_str
+from lib.util import LoggedClass
+from lib.hash import hash_to_str

 # History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries
@@ -259,155 +255,3 @@ class UTXOCache(LoggedClass):
                              .format(new_utxos, self.db_deletes,
                                      hcolls, ucolls))
         self.cache_spends = self.db_deletes = 0
-
-class FSCache(LoggedClass):
-
-    def __init__(self, coin, height, tx_count):
-        super().__init__()
-
-        self.coin = coin
-        self.tx_hash_file_size = 16 * 1024 * 1024
-        assert self.tx_hash_file_size % 32 == 0
-
-        # On-disk values, updated by a flush
-        self.height = height
-
-        # Unflushed items
-        self.headers = []
-        self.tx_hashes = []
-
-        is_new = height == -1
-        self.headers_file = self.open_file('headers', is_new)
-        self.txcount_file = self.open_file('txcount', is_new)
-
-        # tx_counts[N] has the cumulative number of txs at the end of
-        # height N.  So tx_counts[0] is 1 - the genesis coinbase
-        self.tx_counts = array.array('I')
-        self.txcount_file.seek(0)
-        self.tx_counts.fromfile(self.txcount_file, self.height + 1)
-        if self.tx_counts:
-            assert tx_count == self.tx_counts[-1]
-        else:
-            assert tx_count == 0
-
-    def open_file(self, filename, create=False):
-        '''Open the file name.  Return its handle.'''
-        try:
-            return open(filename, 'rb+')
-        except FileNotFoundError:
-            if create:
-                return open(filename, 'wb+')
-            raise
-
-    def advance_block(self, header, tx_hashes, txs):
-        '''Update the FS cache for a new block.'''
-        prior_tx_count = self.tx_counts[-1] if self.tx_counts else 0
-
-        # Cache the new header, tx hashes and cumulative tx count
-        self.headers.append(header)
-        self.tx_hashes.append(tx_hashes)
-        self.tx_counts.append(prior_tx_count + len(txs))
-
-    def backup_block(self):
-        '''Revert a block.'''
-        assert not self.headers
-        assert not self.tx_hashes
-        assert self.height >= 0
-
-        # Just update in-memory.  It doesn't matter if disk files are
-        # too long, they will be overwritten when advancing.
-        self.height -= 1
-        self.tx_counts.pop()
-
-    def flush(self, new_height, new_tx_count):
-        '''Flush the things stored on the filesystem.
-
-        The arguments are passed for sanity check assertions only.'''
-        self.logger.info('flushing to file system')
-
-        blocks_done = len(self.headers)
-        prior_tx_count = self.tx_counts[self.height] if self.height >= 0 else 0
-        cur_tx_count = self.tx_counts[-1] if self.tx_counts else 0
-        txs_done = cur_tx_count - prior_tx_count
-
-        assert self.height + blocks_done == new_height
-        assert len(self.tx_hashes) == blocks_done
-        assert len(self.tx_counts) == new_height + 1
-        assert cur_tx_count == new_tx_count, \
-            'cur: {:,d} new: {:,d}'.format(cur_tx_count, new_tx_count)
-
-        # First the headers
-        headers = b''.join(self.headers)
-        header_len = self.coin.HEADER_LEN
-        self.headers_file.seek((self.height + 1) * header_len)
-        self.headers_file.write(headers)
-        self.headers_file.flush()
-
-        # Then the tx counts
-        self.txcount_file.seek((self.height + 1) * self.tx_counts.itemsize)
-        self.txcount_file.write(self.tx_counts[self.height + 1:])
-        self.txcount_file.flush()
-
-        # Finally the hashes
-        hashes = memoryview(b''.join(itertools.chain(*self.tx_hashes)))
-        assert len(hashes) % 32 == 0
-        assert len(hashes) // 32 == txs_done
-        cursor = 0
-        file_pos = prior_tx_count * 32
-        while cursor < len(hashes):
-            file_num, offset = divmod(file_pos, self.tx_hash_file_size)
-            size = min(len(hashes) - cursor, self.tx_hash_file_size - offset)
-            filename = 'hashes{:04d}'.format(file_num)
-            with self.open_file(filename, create=True) as f:
-                f.seek(offset)
-                f.write(hashes[cursor:cursor + size])
-            cursor += size
-            file_pos += size
-
-        os.sync()
-
-        self.tx_hashes = []
-        self.headers = []
-        self.height += blocks_done
-
-    def read_headers(self, start, count):
-        result = b''
-
-        # Read some from disk
-        disk_count = min(count, self.height + 1 - start)
-        if disk_count > 0:
-            header_len = self.coin.HEADER_LEN
-            assert start >= 0
-            self.headers_file.seek(start * header_len)
-            result = self.headers_file.read(disk_count * header_len)
-            count -= disk_count
-            start += disk_count
-
-        # The rest from memory
-        start -= self.height + 1
-        assert count >= 0 and start + count <= len(self.headers)
-        result += b''.join(self.headers[start: start + count])
-
-        return result
-
-    def get_tx_hash(self, tx_num):
-        '''Returns the tx_hash and height of a tx number.'''
-        height = bisect_right(self.tx_counts, tx_num)
-
-        # Is this on disk or unflushed?
-        if height > self.height:
-            tx_hashes = self.tx_hashes[height - (self.height + 1)]
-            tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]]
-        else:
-            file_pos = tx_num * 32
-            file_num, offset = divmod(file_pos, self.tx_hash_file_size)
-            filename = 'hashes{:04d}'.format(file_num)
-            with self.open_file(filename) as f:
-                f.seek(offset)
-                tx_hash = f.read(32)
-
-        return tx_hash, height
-
-    def block_hashes(self, height, count):
-        headers = self.read_headers(height, count)
-        hlen = self.coin.HEADER_LEN
-        return [double_sha256(header) for header in chunks(headers, hlen)]

server/db.py (123 lines changed)

@@ -12,15 +12,15 @@ import array
 import ast
 import os
 import struct
+from bisect import bisect_right
 from collections import namedtuple

-from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
-from lib.util import LoggedClass
+from lib.util import chunks, LoggedClass
+from lib.hash import double_sha256
 from server.storage import open_db

 UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")

 class DB(LoggedClass):
     '''Simple wrapper of the backend database for querying.
@@ -28,6 +28,9 @@ class DB(LoggedClass):
     it was shutdown uncleanly.
     '''

+    class DBError(Exception):
+        pass
+
     def __init__(self, env):
         super().__init__()
         self.env = env
@@ -52,13 +55,29 @@ class DB(LoggedClass):
         self.height = self.db_height
         self.tip = self.db_tip

-        # Cache wrapping the filesystem and redirected functions
-        self.fs_cache = FSCache(self.coin, self.height, self.tx_count)
-        self.get_tx_hash = self.fs_cache.get_tx_hash
-        self.read_headers = self.fs_cache.read_headers
+        # -- FS related members --
+        self.tx_hash_file_size = 16 * 1024 * 1024
+
+        # On-disk height updated by a flush
+        self.fs_height = self.height
+
+        # Unflushed items
+        self.headers = []
+        self.tx_hashes = []

-        # UTXO cache
-        self.utxo_cache = UTXOCache(self.get_tx_hash, self.db, self.coin)
+        create = self.height == -1
+        self.headers_file = self.open_file('headers', create)
+        self.txcount_file = self.open_file('txcount', create)
+
+        # tx_counts[N] has the cumulative number of txs at the end of
+        # height N.  So tx_counts[0] is 1 - the genesis coinbase
+        self.tx_counts = array.array('I')
+        self.txcount_file.seek(0)
+        self.tx_counts.fromfile(self.txcount_file, self.height + 1)
+        if self.tx_counts:
+            assert self.tx_count == self.tx_counts[-1]
+        else:
+            assert self.tx_count == 0
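
Editor's note: a self-contained sketch of the txcount round-trip this relies on. Arrays expose the buffer protocol, so a flush can write() the array (or a slice) directly and startup can read it back with fromfile(). The file name is illustrative.

    import array

    counts = array.array('I', [1, 3, 7])
    with open('txcount.demo', 'wb') as f:
        f.write(counts)                   # arrays are bytes-like, no packing needed

    loaded = array.array('I')
    with open('txcount.demo', 'rb') as f:
        loaded.fromfile(f, len(counts))   # raises EOFError if the file is short

    assert loaded == counts
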
     def init_state_from_db(self):
         if self.db.is_new:
@@ -73,9 +92,9 @@ class DB(LoggedClass):
             state = self.db.get(b'state')
             state = ast.literal_eval(state.decode())
             if state['genesis'] != self.coin.GENESIS_HASH:
-                raise ChainError('DB genesis hash {} does not match coin {}'
-                                 .format(state['genesis_hash'],
-                                         self.coin.GENESIS_HASH))
+                raise self.DBError('DB genesis hash {} does not match coin {}'
+                                   .format(state['genesis'],
+                                           self.coin.GENESIS_HASH))
             self.db_height = state['height']
             self.db_tx_count = state['tx_count']
             self.db_tip = state['tip']
@@ -84,6 +103,59 @@ class DB(LoggedClass):
             self.wall_time = state['wall_time']
             self.first_sync = state.get('first_sync', True)

+    def open_file(self, filename, create=False):
+        '''Open the file name.  Return its handle.'''
+        try:
+            return open(filename, 'rb+')
+        except FileNotFoundError:
+            if create:
+                return open(filename, 'wb+')
+            raise
+
+    def read_headers(self, start, count):
+        result = b''
+
+        # Read some from disk
+        disk_count = min(count, self.fs_height + 1 - start)
+        if disk_count > 0:
+            header_len = self.coin.HEADER_LEN
+            assert start >= 0
+            self.headers_file.seek(start * header_len)
+            result = self.headers_file.read(disk_count * header_len)
+            count -= disk_count
+            start += disk_count
+
+        # The rest from memory
+        start -= self.fs_height + 1
+        assert count >= 0 and start + count <= len(self.headers)
+        result += b''.join(self.headers[start: start + count])
+
+        return result
+
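
Editor's note: a worked example of the index arithmetic above, with made-up state. If fs_height == 9, heights 0-9 are on disk and self.headers holds unflushed headers from height 10 up; requesting 4 headers from height 8 takes 2 from disk and 2 from memory.

    fs_height, start, count = 9, 8, 4

    disk_count = min(count, fs_height + 1 - start)   # 2 headers from disk
    count -= disk_count                              # 2 still to fetch
    start += disk_count                              # next wanted height is 10
    start -= fs_height + 1                           # = index 0 into self.headers

    assert (disk_count, start, count) == (2, 0, 2)
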
+    def get_tx_hash(self, tx_num):
+        '''Returns the tx_hash and height of a tx number.'''
+        height = bisect_right(self.tx_counts, tx_num)
+
+        # Is this on disk or unflushed?
+        if height > self.fs_height:
+            tx_hashes = self.tx_hashes[height - (self.fs_height + 1)]
+            tx_hash = tx_hashes[tx_num - self.tx_counts[height - 1]]
+        else:
+            file_pos = tx_num * 32
+            file_num, offset = divmod(file_pos, self.tx_hash_file_size)
+            filename = 'hashes{:04d}'.format(file_num)
+            with self.open_file(filename) as f:
+                f.seek(offset)
+                tx_hash = f.read(32)
+
+        return tx_hash, height
+
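
Editor's note: why bisect_right() on the cumulative counts yields the height, shown with made-up counts:

    from bisect import bisect_right

    tx_counts = [1, 3, 7]            # heights 0-2; block 2 holds tx numbers 3-6

    height = bisect_right(tx_counts, 4)
    assert height == 2                      # tx number 4 is in the block at height 2
    assert 4 - tx_counts[height - 1] == 1   # ...and is that block's 2nd tx
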
+    def block_hashes(self, height, count):
+        headers = self.read_headers(height, count)
+        # FIXME: move to coins.py
+        hlen = self.coin.HEADER_LEN
+        return [double_sha256(header) for header in chunks(headers, hlen)]
+
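
Editor's note: for context, a minimal stand-in for the two lib helpers used above, assuming lib.hash.double_sha256 is SHA-256 applied twice and lib.util.chunks slices a buffer into fixed-size pieces:

    import hashlib

    def double_sha256(data):
        return hashlib.sha256(hashlib.sha256(data).digest()).digest()

    def chunks(items, size):
        for i in range(0, len(items), size):
            yield items[i:i + size]

    HEADER_LEN = 80                       # Bitcoin's header length, for illustration
    headers = bytes(2 * HEADER_LEN)       # two zero-filled demo headers
    hashes = [double_sha256(h) for h in chunks(headers, HEADER_LEN)]
    assert len(hashes) == 2 and all(len(h) == 32 for h in hashes)
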
     @staticmethod
     def _resolve_limit(limit):
         if limit is None:
@@ -143,7 +215,28 @@ class DB(LoggedClass):
         hash168 = None
         if 0 <= index <= 65535:
             idx_packed = struct.pack('<H', index)
-            hash168 = self.utxo_cache.hash168(tx_hash, idx_packed, False)
-            if hash168 == NO_CACHE_ENTRY:
-                hash168 = None
+            hash168 = self.hash168(tx_hash, idx_packed)
         return hash168

+    def hash168(self, tx_hash, idx_packed):
+        '''Return the hash168 paid to by the given TXO.
+
+        Return None if not found.'''
+        key = b'h' + tx_hash[:ADDR_TX_HASH_LEN] + idx_packed
+        data = self.db.get(key)
+        if data is None:
+            return None
+
+        if len(data) == 25:
+            return data[:21]
+
+        assert len(data) % 25 == 0
+
+        # Resolve the compressed key collision using the TX number
+        for n in range(0, len(data), 25):
+            tx_num, = struct.unpack('<I', data[n+21:n+25])
+            my_hash, height = self.get_tx_hash(tx_num)
+            if my_hash == tx_hash:
+                return data[n:n+21]
+
+        raise self.DBError('could not resolve hash168 collision')
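
Editor's note: a sketch of the packed records hash168() walks. Under the b'h' key each entry is 25 bytes, a 21-byte hash168 followed by a little-endian 4-byte tx number, and entries are concatenated when truncated tx hashes collide. Values below are made up.

    import struct

    def unpack_entries(data):
        assert len(data) % 25 == 0
        for n in range(0, len(data), 25):
            tx_num, = struct.unpack('<I', data[n+21:n+25])
            yield data[n:n+21], tx_num

    data = (b'A' * 21 + struct.pack('<I', 7)) + (b'B' * 21 + struct.pack('<I', 42))
    assert list(unpack_entries(data)) == [(b'A' * 21, 7), (b'B' * 21, 42)]
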
