
Improve the leveldb flush; it should be a lot faster now.

More useful logging and stats.
Neil Booth, 8 years ago
commit 4879422e92 (branch: master)

Changed files:

    README.rst        (5 changed lines)
    server/db.py      (191 changed lines)
    server/server.py  (15 changed lines)

README.rst  (5 changed lines)

@@ -100,9 +100,8 @@ Database Format
 ===============
 
 The database and metadata formats of ElectrumX are very likely to
-change in the future. If so old DBs would not be usable. However it
-should be easy to write short Python script to do any necessary
-conversions in-place without having to start afresh.
+change in the future which will render old DBs unusable. For now I do
+not intend to provide converters as the rate of flux is high.
 
 Miscellany

server/db.py  (191 changed lines)

@@ -17,9 +17,11 @@ import plyvel
 from lib.coins import Bitcoin
 from lib.script import ScriptPubKey
 
-ADDR_TX_HASH_LEN=6
-UTXO_TX_HASH_LEN=4
-HIST_ENTRY_LEN=256*4 # Admits 65536 * HIST_ENTRY_LEN/4 entries
+# History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries
+HIST_ENTRIES_PER_KEY = 1024
+HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4
+ADDR_TX_HASH_LEN = 4
+UTXO_TX_HASH_LEN = 4
 
 UTXO = namedtuple("UTXO", "tx_num tx_pos tx_hash height value")
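
The renamed constants describe the sizing of the new history layout: each
history entry is a 4-byte transaction number, and the 2-byte flush id that
flush_history() now appends to each key allows 65536 keys per address. A
quick sketch of the arithmetic, assuming those widths (note that
HIST_VALUE_BYTES itself is not referenced in the hunks shown here):

    # Each history value packs up to HIST_ENTRIES_PER_KEY uint32 tx numbers.
    HIST_ENTRIES_PER_KEY = 1024
    HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4       # 4,096 bytes per value

    # A 2-byte flush id permits 65536 keys per address, hence the comment
    # "History can hold approx. 65536 * HIST_ENTRIES_PER_KEY entries".
    max_entries_per_address = 65536 * HIST_ENTRIES_PER_KEY   # ~67 million
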
@@ -36,6 +38,7 @@ class DB(object):
     TIP_KEY = b'tip'
     GENESIS_KEY = b'genesis'
     TX_COUNT_KEY = b'tx_count'
+    FLUSH_COUNT_KEY = b'flush_count'
     WALL_TIME_KEY = b'wall_time'
 
     class Error(Exception):
@@ -59,8 +62,7 @@ class DB(object):
         self.writes_avoided = 0
         self.read_cache_hits = 0
         self.write_cache_hits = 0
-        self.last_writes = 0
-        self.last_time = time.time()
+        self.hcolls = 0
 
         # Things put in a batch are not visible until the batch is written,
         # so use a cache.
@@ -95,6 +97,8 @@ class DB(object):
             self.tx_count = self.db_tx_count
             self.height = self.db_height - 1
             self.tx_counts.fromfile(self.txcount_file, self.db_height)
+        self.last_flush = time.time()
+
         # FIXME: this sucks and causes issues with exceptions in init_db()
         if self.tx_count == 0:
             self.flush()
@@ -107,6 +111,7 @@ class DB(object):
     def init_db(self):
         self.db_height = 0
         self.db_tx_count = 0
+        self.flush_count = 0
         self.wall_time = 0
         self.tip = self.coin.GENESIS_HASH
         self.put(self.GENESIS_KEY, unhexlify(self.tip))
@@ -118,12 +123,14 @@ class DB(object):
                              .format(genesis_hash, self.coin.GENESIS_HASH))
         self.db_height = from_4_bytes(self.get(self.HEIGHT_KEY))
         self.db_tx_count = from_4_bytes(self.get(self.TX_COUNT_KEY))
+        self.flush_count = from_4_bytes(self.get(self.FLUSH_COUNT_KEY))
         self.wall_time = from_4_bytes(self.get(self.WALL_TIME_KEY))
         self.tip = hexlify(self.get(self.TIP_KEY))
-        self.logger.info('{}/{} height: {:,d} tx count: {:,d} sync time: {}'
+        self.logger.info('{}/{} height: {:,d} tx count: {:,d} '
+                         'flush count: {:,d} sync time: {}'
                          .format(self.coin.NAME, self.coin.NET,
                                  self.db_height - 1, self.db_tx_count,
-                                 self.formatted_wall_time()))
+                                 self.flush_count, self.formatted_wall_time()))
 
     def formatted_wall_time(self):
         wall_time = int(self.wall_time)
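
init_db() round-trips its scalar state (heights, counts, flush count, wall
time) through the to_4_bytes/from_4_bytes helpers, which sit outside this
diff. A minimal sketch of what they are assumed to do (the exact byte order
is an assumption):

    import struct

    # Assumed helpers, not shown in the diff: pack and unpack an unsigned
    # 32-bit little-endian integer for storage as a LevelDB value.
    def to_4_bytes(value):
        return struct.pack('<I', value)

    def from_4_bytes(data):
        return struct.unpack('<I', data)[0]
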
@@ -158,57 +165,43 @@ class DB(object):
         else:
             self.write_cache[key] = None
 
-    def put_state(self):
-        now = time.time()
-        self.wall_time += now - self.last_time
-        self.last_time = now
-        self.db_tx_count = self.tx_count
-        self.db_height = self.height + 1
-        self.put(self.HEIGHT_KEY, to_4_bytes(self.db_height))
-        self.put(self.TX_COUNT_KEY, to_4_bytes(self.db_tx_count))
-        self.put(self.TIP_KEY, unhexlify(self.tip))
-        self.put(self.WALL_TIME_KEY, to_4_bytes(int(self.wall_time)))
-
     def flush(self):
         '''Flush out all cached state.'''
+        flush_start = time.time()
+        last_flush = self.last_flush
+        tx_diff = self.tx_count - self.db_tx_count
+        height_diff = self.height + 1 - self.db_height
+        self.logger.info('starting flush {:,d} txs and {:,d} blocks'
+                         .format(tx_diff, height_diff))
+
         # Write out the files to the FS before flushing to the DB. If
         # the DB transaction fails, the files being too long doesn't
         # matter. But if writing the files fails we do not want to
-        # have updated the DB. This disk flush is fast.
-        self.write_headers()
-        self.write_tx_counts()
-        self.write_tx_hashes()
-
-        tx_diff = self.tx_count - self.db_tx_count
-        height_diff = self.height + 1 - self.db_height
-        self.logger.info('flushing to levelDB {:,d} txs and {:,d} blocks '
-                         'to height {:,d} tx count: {:,d}'
-                         .format(tx_diff, height_diff, self.height,
-                                 self.tx_count))
-
-        # This LevelDB flush is slow
-        deletes = 0
-        writes = 0
+        # have updated the DB. Flush state last as it reads the wall
+        # time.
+        self.flush_to_fs()
+
         with self.db.write_batch(transaction=True) as batch:
-            # Flush the state, then the cache, then the history
-            self.put_state()
-            for n, (key, value) in enumerate(self.write_cache.items()):
-                if value is None:
-                    batch.delete(key)
-                    deletes += 1
-                else:
-                    batch.put(key, value)
-                    writes += 1
-                if n % 20000 == 0:
-                    pct = n * 100 // len(self.write_cache)
-                    self.logger.info('U {:d} {:d}% done...'.format(n, pct))
-            self.flush_history()
-
-        self.logger.info('flushed. Cache hits: {:,d}/{:,d} writes: {:,d} '
-                         'deletes: {:,d} elided: {:,d} sync: {}'
-                         .format(self.write_cache_hits,
-                                 self.read_cache_hits, writes, deletes,
-                                 self.writes_avoided,
+            self.flush_cache(batch)
+            self.flush_history(batch)
+            self.logger.info('flushed history...')
+            self.flush_state(batch)
+            self.logger.info('committing transaction...')
+
+        # Update and put the wall time again - otherwise we drop the
+        # time it takes leveldb to commit the batch
+        self.update_wall_time(self.db)
+        flush_time = int(self.last_flush - flush_start)
+        self.logger.info('flushed in {:,d}s to height {:,d} tx count {:,d} '
+                         'flush count {:,d}'
+                         .format(flush_time, self.height, self.tx_count,
+                                 self.flush_count))
+        txs_per_sec = int(self.tx_count / self.wall_time)
+        this_txs_per_sec = int(tx_diff / (self.last_flush - last_flush))
+        self.logger.info('tx/s since genesis: {:,d} since last flush: {:,d} '
+                         'sync time {}'
+                         .format(txs_per_sec, this_txs_per_sec,
                                  self.formatted_wall_time()))
 
         # Note this preserves semantics and hopefully saves time
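
The reworked flush() commits everything in one plyvel write batch and drops
the per-key progress logging from the hot loop. The ordering matters: file
data goes out first because redoing it is harmless if the batch later fails,
state goes into the batch last because flush_state() samples the wall time,
and the wall time is put once more after the batch commits so the commit's
own duration is not lost. A condensed restatement of the sequence, using the
method names from this diff:

    # Condensed sketch of the new flush order (logging and stats omitted).
    def flush(self):
        self.flush_to_fs()                  # headers, tx counts, tx hashes
        with self.db.write_batch(transaction=True) as batch:
            self.flush_cache(batch)         # UTXO puts and deletes
            self.flush_history(batch)       # append-only history keys
            self.flush_state(batch)         # heights, counts, tip, wall time
        self.update_wall_time(self.db)      # re-put: include the commit time
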
@@ -217,40 +210,55 @@ class DB(object):
         self.writes_avoided = 0
         self.read_cache_hits = 0
         self.write_cache_hits = 0
-        self.last_writes = writes
 
-    def flush_history(self):
+    def flush_to_fs(self):
+        '''Flush the things stored on the filesystem.'''
+        self.write_headers()
+        self.write_tx_counts()
+        self.write_tx_hashes()
+        os.sync()
+
+    def update_wall_time(self, dest):
+        now = time.time()
+        self.wall_time += now - self.last_flush
+        self.last_flush = now
+        dest.put(self.WALL_TIME_KEY, to_4_bytes(int(self.wall_time)))
+
+    def flush_state(self, batch):
+        self.db_tx_count = self.tx_count
+        self.db_height = self.height + 1
+        batch.put(self.HEIGHT_KEY, to_4_bytes(self.db_height))
+        batch.put(self.TX_COUNT_KEY, to_4_bytes(self.db_tx_count))
+        batch.put(self.FLUSH_COUNT_KEY, to_4_bytes(self.flush_count))
+        batch.put(self.TIP_KEY, unhexlify(self.tip))
+        self.update_wall_time(batch)
+        self.flush_count += 1
+
+    def flush_cache(self, batch):
+        '''Flushes the UTXO write cache.'''
+        deletes = writes = 0
+        for n, (key, value) in enumerate(self.write_cache.items()):
+            if value is None:
+                batch.delete(key)
+                deletes += 1
+            else:
+                batch.put(key, value)
+                writes += 1
+
+        self.logger.info('flushed UTXO cache. Hits: {:,d}/{:,d} '
+                         'writes: {:,d} deletes: {:,d} elided: {:,d}'
+                         .format(self.write_cache_hits,
+                                 self.read_cache_hits, writes, deletes,
+                                 self.writes_avoided))
+
+    def flush_history(self, batch):
         # Drop any None entry
         self.history.pop(None, None)
 
-        for m, (hash160, hist) in enumerate(self.history.items()):
-            prefix = b'H' + hash160
-            for key, v in self.db.iterator(reverse=True, prefix=prefix,
-                                           fill_cache=False):
-                assert len(key) == 23
-                v += array.array('I', hist).tobytes()
-                break
-            else:
-                key = prefix + bytes(2)
-                v = array.array('I', hist).tobytes()
-            # db.put doesn't accept a memoryview!
-            self.db.put(key, v[:HIST_ENTRY_LEN])
-            if len(v) > HIST_ENTRY_LEN:
-                # must be big-endian
-                (idx, ) = struct.unpack('>H', key[-2:])
-                for n in range(HIST_ENTRY_LEN, len(v), HIST_ENTRY_LEN):
-                    idx += 1
-                    key = prefix + struct.pack('>H', idx)
-                    if idx % 500 == 0:
-                        addr = self.coin.P2PKH_address_from_hash160(hash160)
-                        self.logger.info('address {} hist moving to idx {:d}'
-                                         .format(addr, idx))
-                    self.db.put(key, v[n:n + HIST_ENTRY_LEN])
-            if m % 20000 == 0:
-                pct = m * 100 // len(self.history)
-                self.logger.info('H {:d} {:d}% done...'.format(m, pct))
+        flush_id = struct.pack('>H', self.flush_count)
+        for hash160, hist in self.history.items():
+            key = b'H' + hash160 + flush_id
+            batch.put(key, array.array('I', hist).tobytes())
 
         self.history = defaultdict(list)
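
flush_history() is the heart of the speed-up: instead of a read-modify-write
against the DB for every address, each flush now appends one fresh key per
address, b'H' + hash160 + flush_id, and performs no reads at all. Because the
flush id is packed big-endian, an address's keys sort in flush order, so a
reader can reassemble its history with a single prefix scan. A sketch of such
a reader (no reader appears in this diff), assuming plyvel's iterator API:

    import array

    # Hypothetical reader for the new scheme: concatenate the uint32 tx
    # numbers from every flush of this address, oldest flush first.
    def address_history(db, hash160):
        hist = array.array('I')
        for key, value in db.iterator(prefix=b'H' + hash160):
            hist.frombytes(value)
        return hist
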
@@ -265,10 +273,11 @@ class DB(object):
                 self.delete(key)
                 return data[:20]
 
         # This should almost never happen
         assert len(data) % 24 == 0
-        self.logger.info('hash160 compressed key collision {}'
-                         .format(key.hex()))
+        self.hcolls += 1
+        if self.hcolls % 1000 == 0:
+            self.logger.info('{} total hash160 compressed key collisions'
+                             .format(self.hcolls))
         for n in range(0, len(data), 24):
             (tx_num, ) = struct.unpack('<I', data[n+20:n+24])
             my_hash, height = self.get_tx_hash(tx_num)
@@ -292,16 +301,18 @@ class DB(object):
         key = (b'u' + hash160 + prevout.hash[:UTXO_TX_HASH_LEN]
                + struct.pack('<H', prevout.n))
         data = self.get(key)
+        if data is None:
+            self.logger.error('found no UTXO for {} / {:d} key {}'
+                              .format(bytes(reversed(prevout.hash)).hex(),
+                                      prevout.n, bytes(key).hex()))
+            return hash160
         if len(data) == 12:
             (tx_num, ) = struct.unpack('<I', data[:4])
             self.delete(key)
         else:
             # This should almost never happen
             assert len(data) % (4 + 8) == 0
             self.logger.info('UTXO compressed key collision at height {:d}, '
                              'utxo {} / {:d}'
                              .format(self.height, bytes(reversed(prevout.hash))
                                      .hex(), prevout.n))
 
             for n in range(0, len(data), 12):
                 (tx_num, ) = struct.unpack('<I', data[n:n+4])
                 tx_hash, height = self.get_tx_hash(tx_num)
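
Both collision paths exist because keys embed truncated transaction hashes
(UTXO_TX_HASH_LEN is 4 bytes), so distinct outputs can occasionally share a
key, and the stored value becomes a concatenation of entries that must be
disambiguated through get_tx_hash(). A sketch of the key construction used
above, where the 12-byte value is assumed to be a 4-byte tx number followed
by an 8-byte amount:

    import struct

    UTXO_TX_HASH_LEN = 4

    # Key layout from the hunk above: hash160 is the 20-byte pubkey hash,
    # tx_hash the full 32-byte transaction hash, n the output index.
    def utxo_key(hash160, tx_hash, n):
        return (b'u' + hash160 + tx_hash[:UTXO_TX_HASH_LEN]
                + struct.pack('<H', n))
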

server/server.py  (15 changed lines)

@@ -6,6 +6,7 @@ import json
 import logging
 import os
 import signal
+import time
 from functools import partial
 
 import aiohttp
@@ -73,7 +74,15 @@ class BlockCache(object):
         self.logger.info('catching up, block cache limit {:d}MB...'
                          .format(self.cache_limit))
 
+        last_log = 0
         while await self.maybe_prefill():
+            now = time.time()
+            if now > last_log + 15:
+                last_log = now
+                self.logger.info('prefilled blocks to height {:,d} '
+                                 'daemon height: {:,d}'
+                                 .format(self.fetched_height,
+                                         self.daemon_height))
             await asyncio.sleep(1)
 
         if not self.stop:
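
The prefill loop now rate-limits its progress line to one every 15 seconds
instead of logging once per fetched batch (the per-batch line is deleted in
the final hunk below). The pattern in isolation, with hypothetical stand-ins
for the loop condition and work step:

    import time

    last_log = 0
    while still_working():                # hypothetical loop condition
        now = time.time()
        if now > last_log + 15:           # at most one line per 15 seconds
            last_log = now
            print('progress...')          # stand-in for logger.info
        do_unit_of_work()                 # hypothetical work step
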
@@ -108,7 +117,6 @@ class BlockCache(object):
         if not count or self.stop:
             return False  # Done catching up
 
-        # self.logger.info('requesting {:,d} blocks'.format(count))
         first = self.fetched_height + 1
         param_lists = [[height] for height in range(first, first + count)]
         hashes = await self.rpc.rpc_multi('getblockhash', param_lists)
@@ -129,11 +137,6 @@ class BlockCache(object):
         # Reverse order and place at front of list
         self.blocks = list(reversed(blocks)) + self.blocks
 
-        self.logger.info('prefilled {:,d} blocks to height {:,d} '
-                         'daemon height: {:,d} block cache size: {:,d}'
-                         .format(count, self.fetched_height,
-                                 self.daemon_height, self.cache_used()))
-
         # Keep 50 most recent block sizes for fetch count estimation
         sizes = [len(block) for block in blocks]
         self.recent_sizes.extend(sizes)
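
recent_sizes feeds the fetch-count estimation the comment mentions: from the
average of the 50 most recent block sizes, the cache can guess how many more
blocks fit in its remaining budget. The estimator itself is outside this
diff; a sketch of the idea, assuming cache_limit is in MB as the log line in
the earlier hunk suggests:

    # Hypothetical estimator: how many blocks fit in the remaining budget?
    def estimate_fetch_count(self):
        avg_size = max(1, sum(self.recent_sizes) // len(self.recent_sizes))
        free = self.cache_limit * 1024 * 1024 - self.cache_used()
        return max(1, free // avg_size)
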
