Browse Source

Merge branch 'release-0.7.12'

master 0.7.12
Neil Booth 8 years ago
parent
commit
c34c3f493d
  1. 11
      RELEASE-NOTES
  2. 4
      lib/jsonrpc.py
  3. 256
      server/block_processor.py
  4. 63
      server/db.py
  5. 6
      server/protocol.py
  6. 35
      server/storage.py
  7. 2
      server/version.py
  8. 4
      tests/test_storage.py

11
RELEASE-NOTES

@ -1,3 +1,14 @@
version 0.7.12
--------------
- minor bug fixes: 2 in JSON RPC, 1 in get_utxos (affected addresslistunspent)
- leveldb / rocksdb are opened with a different maximum open files limit,
depending on whether the chain has been fully synced or not. If synced
you want the files for network sockets, if not synced for the DB engines.
Once synced the DB will be reopened with the lower limit to free up the
files for serving network connections
- various refactoring preparing for possible asynchronous block processing
version 0.7.11
--------------

4
lib/jsonrpc.py

@ -277,7 +277,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
except RCPError:
pass
else:
self.handle_notification(method, params)
await self.handle_notification(method, params)
return None
async def json_request(self, message):
@ -297,7 +297,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
await self.handle_response(message['result'], None, message['id'])
return None
def raise_unknown_method(method):
def raise_unknown_method(self, method):
'''Respond to a request with an unknown method.'''
raise self.RPCError('unknown method: "{}"'.format(method),
self.METHOD_NOT_FOUND)

256
server/block_processor.py

@ -23,24 +23,22 @@ from lib.util import chunks, formatted_time, LoggedClass
import server.db
class ChainError(Exception):
pass
class Prefetcher(LoggedClass):
'''Prefetches blocks (in the forward direction only).'''
def __init__(self, daemon, height):
def __init__(self, tasks, daemon, height):
super().__init__()
self.tasks = tasks
self.daemon = daemon
self.semaphore = asyncio.Semaphore()
self.queue = asyncio.Queue()
self.queue_size = 0
self.caught_up = False
self.fetched_height = height
# A list of (blocks, size) pairs. Earliest last.
self.cache = []
self.cache_size = 0
# Target cache size. Has little effect on sync time.
self.target_cache_size = 10 * 1024 * 1024
# First fetch to be 10 blocks
# This makes the first fetch be 10 blocks
self.ave_size = self.target_cache_size // 10
async def clear(self, height):
@ -53,19 +51,19 @@ class Prefetcher(LoggedClass):
with await self.semaphore:
while not self.queue.empty():
self.queue.get_nowait()
self.queue_size = 0
self.cache = []
self.cache_size = 0
self.fetched_height = height
self.caught_up = False
async def get_blocks(self):
'''Blocking function that returns prefetched blocks.
self.logger.info('reset to height'.format(height))
The returned result empty just once - when the prefetcher
has caught up with the daemon.
'''
blocks, size = await self.queue.get()
self.queue_size -= size
return blocks
def get_blocks(self):
'''Return the next list of blocks from our prefetch cache.'''
# Cache might be empty after a clear()
if self.cache:
blocks, size = self.cache.pop()
self.cache_size -= size
return blocks
return []
async def main_loop(self):
'''Loop forever polling for more blocks.'''
@ -73,9 +71,13 @@ class Prefetcher(LoggedClass):
.format(await self.daemon.height()))
while True:
try:
with await self.semaphore:
await self._prefetch()
await asyncio.sleep(5 if self.caught_up else 0)
secs = 0
if self.cache_size < self.target_cache_size:
if not await self._prefetch():
self.caught_up = True
secs = 5
self.tasks.put_nowait(None)
await asyncio.sleep(secs)
except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e))
except asyncio.CancelledError:
@ -83,41 +85,41 @@ class Prefetcher(LoggedClass):
async def _prefetch(self):
'''Prefetch blocks unless the prefetch queue is full.'''
if self.queue_size >= self.target_cache_size:
return
daemon_height = await self.daemon.height()
cache_room = self.target_cache_size // self.ave_size
# Try and catch up all blocks but limit to room in cache.
# Constrain count to between 0 and 4000 regardless
count = min(daemon_height - self.fetched_height, cache_room)
count = min(4000, max(count, 0))
if not count:
# Indicate when we have caught up for the first time only
if not self.caught_up:
self.caught_up = True
self.queue.put_nowait(([], 0))
return
with await self.semaphore:
# Try and catch up all blocks but limit to room in cache.
# Constrain count to between 0 and 4000 regardless
count = min(daemon_height - self.fetched_height, cache_room)
count = min(4000, max(count, 0))
if not count:
return 0
first = self.fetched_height + 1
hex_hashes = await self.daemon.block_hex_hashes(first, count)
if self.caught_up:
self.logger.info('new block height {:,d} hash {}'
.format(first + count - 1, hex_hashes[-1]))
blocks = await self.daemon.raw_blocks(hex_hashes)
first = self.fetched_height + 1
hex_hashes = await self.daemon.block_hex_hashes(first, count)
if self.caught_up:
self.logger.info('new block height {:,d} hash {}'
.format(first + count - 1, hex_hashes[-1]))
blocks = await self.daemon.raw_blocks(hex_hashes)
size = sum(len(block) for block in blocks)
# Update our recent average block size estimate
if count >= 10:
self.ave_size = size // count
else:
self.ave_size = (size + (10 - count) * self.ave_size) // 10
size = sum(len(block) for block in blocks)
# Update our recent average block size estimate
if count >= 10:
self.ave_size = size // count
else:
self.ave_size = (size + (10 - count) * self.ave_size) // 10
self.fetched_height += len(blocks)
self.queue.put_nowait((blocks, size))
self.queue_size += size
self.cache.insert(0, (blocks, size))
self.cache_size += size
self.fetched_height += len(blocks)
return count
class ChainError(Exception):
'''Raised on error processing blocks.'''
class ChainReorg(Exception):
'''Raised on a blockchain reorganisation.'''
@ -135,6 +137,9 @@ class BlockProcessor(server.db.DB):
self.client = client
# The block processor reads its tasks from this queue
self.tasks = asyncio.Queue()
# These are our state as we move ahead of DB state
self.fs_height = self.db_height
self.fs_tx_count = self.db_tx_count
@ -144,6 +149,7 @@ class BlockProcessor(server.db.DB):
self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url))
self.caught_up = False
self._shutdown = False
self.event = asyncio.Event()
# Meta
@ -154,7 +160,7 @@ class BlockProcessor(server.db.DB):
# Headers and tx_hashes have one entry per block
self.history = defaultdict(partial(array.array, 'I'))
self.history_size = 0
self.prefetcher = Prefetcher(self.daemon, self.height)
self.prefetcher = Prefetcher(self.tasks, self.daemon, self.height)
self.last_flush = time.time()
self.last_flush_tx_count = self.tx_count
@ -176,33 +182,33 @@ class BlockProcessor(server.db.DB):
async def main_loop(self):
'''Main loop for block processing.'''
try:
# Simulate a reorg if requested
if self.env.force_reorg > 0:
self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
.format(self.env.force_reorg))
await self.handle_chain_reorg(self.env.force_reorg, set())
while True:
await self._wait_for_update()
except asyncio.CancelledError:
pass
async def shutdown(self):
'''Shut down the DB cleanly.'''
self.logger.info('flushing state to DB for clean shutdown...')
self.flush(True)
async def _wait_for_update(self):
'''Wait for the prefetcher to deliver blocks.
# Simulate a reorg if requested
if self.env.force_reorg > 0:
self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
.format(self.env.force_reorg))
await self.handle_chain_reorg(set(), self.env.force_reorg)
Blocks are only processed in the forward direction.
'''
blocks = await self.prefetcher.get_blocks()
if not blocks:
self.first_caught_up()
return
while True:
task = await self.tasks.get()
if self._shutdown:
break
blocks = self.prefetcher.get_blocks()
if blocks:
await self.advance_blocks(blocks)
elif not self.caught_up:
self.caught_up = True
self.first_caught_up()
self.flush(True)
def shutdown(self):
'''Call to shut down the block processor.'''
self.logger.info('flushing state to DB for clean shutdown...')
self._shutdown = True
self.tasks.put_nowait(None)
async def advance_blocks(self, blocks):
'''Strip the unspendable genesis coinbase.'''
if self.height == -1:
blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1)
@ -213,11 +219,12 @@ class BlockProcessor(server.db.DB):
self.advance_block(block, touched)
await asyncio.sleep(0) # Yield
except ChainReorg:
await self.handle_chain_reorg(None, touched)
await self.handle_chain_reorg(touched)
if self.caught_up:
# Flush everything as queries are performed on the DB and
# not in-memory.
await asyncio.sleep(0)
self.flush(True)
self.client.notify(touched)
elif time.time() > self.next_cache_check:
@ -225,23 +232,23 @@ class BlockProcessor(server.db.DB):
self.next_cache_check = time.time() + 60
def first_caught_up(self):
'''Called when first caught up after start, or after a reorg.'''
self.caught_up = True
'''Called when first caught up after starting.'''
self.flush(True)
if self.first_sync:
self.logger.info('{} synced to height {:,d}'
.format(VERSION, self.height))
self.first_sync = False
self.logger.info('{} synced to height {:,d}. DB version:'
.format(VERSION, self.height, self.db_version))
self.flush(True)
self.flush_state(self.db)
self.reopen_db(False)
self.event.set()
async def handle_chain_reorg(self, count, touched):
async def handle_chain_reorg(self, touched, count=None):
'''Handle a chain reorganisation.
Count is the number of blocks to simulate a reorg, or None for
a real reorg.'''
self.logger.info('chain reorg detected')
self.flush(True)
self.logger.info('finding common height...')
hashes = await self.reorg_hashes(count)
# Reverse and convert to hex strings.
@ -251,7 +258,6 @@ class BlockProcessor(server.db.DB):
self.backup_blocks(blocks, touched)
await self.prefetcher.clear(self.height)
self.logger.info('prefetcher reset')
async def reorg_hashes(self, count):
'''Return the list of hashes to back up beacuse of a reorg.
@ -305,12 +311,11 @@ class BlockProcessor(server.db.DB):
assert not self.utxo_cache
assert not self.db_deletes
def flush(self, flush_utxos=False, flush_history=None):
def flush(self, flush_utxos=False):
'''Flush out cached state.
History is always flushed. UTXOs are flushed if flush_utxos.'''
if self.height == self.db_height:
assert flush_history is None
self.assert_flushed()
return
@ -319,14 +324,10 @@ class BlockProcessor(server.db.DB):
last_flush = self.last_flush
tx_diff = self.tx_count - self.last_flush_tx_count
if self.height > self.db_height:
assert flush_history is None
flush_history = self.flush_history
with self.db.write_batch() as batch:
# History first - fast and frees memory. Flush state last
# as it reads the wall time.
flush_history(batch)
self.flush_history(batch)
if flush_utxos:
self.flush_utxos(batch)
self.flush_state(batch)
@ -394,12 +395,36 @@ class BlockProcessor(server.db.DB):
self.tx_hashes = []
self.headers = []
def backup_history(self, batch, hash168s):
self.logger.info('backing up history to height {:,d} tx_count {:,d}'
.format(self.height, self.tx_count))
def backup_flush(self, hash168s):
'''Like flush() but when backing up. All UTXOs are flushed.
hash168s - sequence of hash168s which were touched by backing
up. Searched for history entries to remove after the backup
height.
'''
assert self.height < self.db_height
assert not self.history
self.flush_count += 1
flush_start = time.time()
with self.db.write_batch() as batch:
# Flush state last as it reads the wall time.
self.backup_history(batch, hash168s)
self.flush_utxos(batch)
self.flush_state(batch)
# Update and put the wall time again - otherwise we drop the
# time it took to commit the batch
self.flush_state(self.db)
self.logger.info('backup flush #{:,d} took {:.1f}s. '
'Height {:,d} txs: {:,d}'
.format(self.flush_count,
self.last_flush - flush_start,
self.height, self.tx_count))
def backup_history(self, batch, hash168s):
nremoves = 0
for hash168 in sorted(hash168s):
prefix = b'H' + hash168
@ -426,8 +451,8 @@ class BlockProcessor(server.db.DB):
assert not self.headers
assert not self.tx_hashes
self.logger.info('removed {:,d} history entries from {:,d} addresses'
.format(nremoves, len(hash168s)))
self.logger.info('backing up removed {:,d} history entries from '
'{:,d} addresses'.format(nremoves, len(hash168s)))
def check_cache_size(self):
'''Flush a cache if it gets too big.'''
@ -462,9 +487,6 @@ class BlockProcessor(server.db.DB):
self.tx_counts.append(prior_tx_count + len(txs))
def advance_block(self, block, touched):
# We must update the FS cache before calling advance_txs() as
# the UTXO cache uses the FS cache via get_tx_hash() to
# resolve compressed key collisions
header, tx_hashes, txs = self.coin.read_block(block)
if self.tip != self.coin.header_prevhash(header):
raise ChainReorg
@ -477,43 +499,47 @@ class BlockProcessor(server.db.DB):
self.write_undo_info(self.height, b''.join(undo_info))
def advance_txs(self, tx_hashes, txs, touched):
put_utxo = self.utxo_cache.__setitem__
spend_utxo = self.spend_utxo
undo_info = []
# Use local vars for speed in the loops
history = self.history
history_size = self.history_size
tx_num = self.tx_count
script_hash168 = self.coin.hash168_from_script()
s_pack = pack
put_utxo = self.utxo_cache.__setitem__
spend_utxo = self.spend_utxo
undo_info_append = undo_info.append
for tx, tx_hash in zip(txs, tx_hashes):
hash168s = set()
add_hash168 = hash168s.add
tx_numb = s_pack('<I', tx_num)
# Spend the inputs
if not tx.is_coinbase:
for txin in tx.inputs:
cache_value = spend_utxo(txin.prev_hash, txin.prev_idx)
undo_info.append(cache_value)
hash168s.add(cache_value[:21])
undo_info_append(cache_value)
add_hash168(cache_value[:21])
# Add the new UTXOs
for idx, txout in enumerate(tx.outputs):
# Get the hash168. Ignore unspendable outputs
hash168 = script_hash168(txout.pk_script)
if hash168:
hash168s.add(hash168)
add_hash168(hash168)
put_utxo(tx_hash + s_pack('<H', idx),
hash168 + tx_numb + s_pack('<Q', txout.value))
for hash168 in hash168s:
history[hash168].append(tx_num)
self.history_size += len(hash168s)
history_size += len(hash168s)
touched.update(hash168s)
tx_num += 1
self.tx_count = tx_num
self.history_size = history_size
return undo_info
@ -523,7 +549,6 @@ class BlockProcessor(server.db.DB):
The blocks should be in order of decreasing height.
A flush is performed once the blocks are backed up.
'''
self.logger.info('backing up {:,d} blocks'.format(len(blocks)))
self.assert_flushed()
for block in blocks:
@ -544,8 +569,7 @@ class BlockProcessor(server.db.DB):
# touched includes those passed into this function. That will
# generally be empty but is harmless if not.
flush_history = partial(self.backup_history, hash168s=touched)
self.flush(True, flush_history=flush_history)
self.backup_flush(touched)
def backup_txs(self, tx_hashes, txs, touched):
# Prevout values, in order down the block (coinbase first if present)
@ -665,8 +689,9 @@ class BlockProcessor(server.db.DB):
if len(candidates) > 1:
tx_num, = unpack('<I', tx_num_packed)
hash, height = self.get_tx_hash(tx_num)
hash, height = self.fs_tx_hash(tx_num)
if hash != tx_hash:
assert hash is not None # Should always be found
continue
# Key: b'u' + address_hash168 + tx_idx + tx_num
@ -716,14 +741,3 @@ class BlockProcessor(server.db.DB):
self.db_tx_count = self.tx_count
self.db_height = self.height
self.db_tip = self.tip
def get_tx_hash(self, tx_num):
'''Returns the tx_hash and height of a tx number.'''
tx_hash, tx_height = self.fs_tx_hash(tx_num)
# Is this unflushed?
if tx_hash is None:
tx_hashes = self.tx_hashes[tx_height - (self.fs_height + 1)]
tx_hash = tx_hashes[tx_num - self.tx_counts[tx_height - 1]]
return tx_hash, tx_height

63
server/db.py

@ -50,16 +50,8 @@ class DB(LoggedClass):
self.logger.info('reorg limit is {:,d} blocks'
.format(self.env.reorg_limit))
# Open DB and metadata files. Record some of its state.
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
self.db = open_db(db_name, env.db_engine)
if self.db.is_new:
self.logger.info('created new {} database {}'
.format(env.db_engine, db_name))
else:
self.logger.info('successfully opened {} database {}'
.format(env.db_engine, db_name))
self.read_state()
self.db = None
self.reopen_db(True)
create = self.db_height == -1
self.headers_file = self.open_file('headers', create)
@ -77,6 +69,43 @@ class DB(LoggedClass):
assert self.db_tx_count == 0
self.clean_db()
def reopen_db(self, first_sync):
'''Open the database. If the database is already open, it is
closed (implicitly via GC) and re-opened.
Re-open to set the maximum number of open files appropriately.
'''
if self.db:
self.logger.info('closing DB to re-open')
self.db.close()
max_open_files = 1024 if first_sync else 256
# Open DB and metadata files. Record some of its state.
db_name = '{}-{}'.format(self.coin.NAME, self.coin.NET)
self.db = open_db(db_name, self.env.db_engine, max_open_files)
if self.db.is_new:
self.logger.info('created new {} database {}'
.format(self.env.db_engine, db_name))
else:
self.logger.info('successfully opened {} database {} for sync: {}'
.format(self.env.db_engine, db_name, first_sync))
self.read_state()
if self.first_sync == first_sync:
self.logger.info('software version: {}'.format(VERSION))
self.logger.info('DB version: {:d}'.format(self.db_version))
self.logger.info('coin: {}'.format(self.coin.NAME))
self.logger.info('network: {}'.format(self.coin.NET))
self.logger.info('height: {:,d}'.format(self.db_height))
self.logger.info('tip: {}'.format(hash_to_str(self.db_tip)))
self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
if self.first_sync:
self.logger.info('sync time so far: {}'
.format(formatted_time(self.wall_time)))
else:
self.reopen_db(self.first_sync)
def read_state(self):
if self.db.is_new:
self.db_height = -1
@ -110,16 +139,6 @@ class DB(LoggedClass):
self.wall_time = state['wall_time']
self.first_sync = state['first_sync']
self.logger.info('software version: {}'.format(VERSION))
self.logger.info('DB version: {:d}'.format(self.db_version))
self.logger.info('coin: {}'.format(self.coin.NAME))
self.logger.info('network: {}'.format(self.coin.NET))
self.logger.info('height: {:,d}'.format(self.db_height))
self.logger.info('tip: {}'.format(hash_to_str(self.db_tip)))
self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
if self.first_sync:
self.logger.info('sync time so far: {}'
.format(formatted_time(self.wall_time)))
if self.flush_count < self.utxo_flush_count:
raise self.DBError('DB corrupt: flush_count < utxo_flush_count')
@ -251,8 +270,6 @@ class DB(LoggedClass):
cursor += size
file_pos += size
os.sync()
def read_headers(self, start, count):
'''Requires count >= 0.'''
# Read some from disk
@ -332,7 +349,7 @@ class DB(LoggedClass):
if limit == 0:
return
limit -= 1
tx_num, tx_pos = s_unpack('<HI', db_key[-6:])
tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value)
tx_hash, height = self.fs_tx_hash(tx_num)
yield UTXO(tx_num, tx_pos, tx_hash, height, value)

6
server/protocol.py

@ -254,6 +254,7 @@ class ServerManager(util.LoggedClass):
def add_future(coro):
self.futures.append(asyncio.ensure_future(coro))
# shutdown() assumes bp.main_loop() is first
add_future(self.bp.main_loop())
add_future(self.bp.prefetcher.main_loop())
add_future(self.mempool.main_loop(self.bp.event))
@ -316,7 +317,9 @@ class ServerManager(util.LoggedClass):
async def shutdown(self):
'''Call to shutdown the servers. Returns when done.'''
for future in self.futures:
self.bp.shutdown()
# Don't cancel the block processor main loop - let it close itself
for future in self.futures[1:]:
future.cancel()
for server in self.servers:
server.close()
@ -326,7 +329,6 @@ class ServerManager(util.LoggedClass):
await asyncio.sleep(0)
if self.sessions:
await self.close_sessions()
await self.bp.shutdown()
async def close_sessions(self, secs=60):
self.logger.info('cleanly closing client sessions, please wait...')

35
server/storage.py

@ -16,12 +16,12 @@ from functools import partial
from lib.util import subclasses, increment_byte_string
def open_db(name, db_engine):
def open_db(name, db_engine, for_sync):
'''Returns a database handle.'''
for db_class in subclasses(Storage):
if db_class.__name__.lower() == db_engine.lower():
db_class.import_module()
return db_class(name)
return db_class(name, for_sync)
raise RuntimeError('unrecognised DB engine "{}"'.format(db_engine))
@ -29,9 +29,9 @@ def open_db(name, db_engine):
class Storage(object):
'''Abstract base class of the DB backend abstraction.'''
def __init__(self, name):
def __init__(self, name, for_sync):
self.is_new = not os.path.exists(name)
self.open(name, create=self.is_new)
self.open(name, create=self.is_new, for_sync=for_sync)
@classmethod
def import_module(cls):
@ -42,6 +42,10 @@ class Storage(object):
'''Open an existing database or create a new one.'''
raise NotImplementedError
def close(self):
'''Close an existing database.'''
raise NotImplementedError
def get(self, key):
raise NotImplementedError
@ -75,9 +79,11 @@ class LevelDB(Storage):
import plyvel
cls.module = plyvel
def open(self, name, create):
def open(self, name, create, for_sync):
mof = 1024 if for_sync else 256
self.db = self.module.DB(name, create_if_missing=create,
max_open_files=256, compression=None)
max_open_files=mof, compression=None)
self.close = self.db.close
self.get = self.db.get
self.put = self.db.put
self.iterator = self.db.iterator
@ -92,18 +98,25 @@ class RocksDB(Storage):
import rocksdb
cls.module = rocksdb
def open(self, name, create):
def open(self, name, create, for_sync):
mof = 1024 if for_sync else 256
compression = "no"
compression = getattr(self.module.CompressionType,
compression + "_compression")
options = self.module.Options(create_if_missing=create,
compression=compression,
target_file_size_base=33554432,
max_open_files=1024)
max_open_files=mof)
self.db = self.module.DB(name, options)
self.get = self.db.get
self.put = self.db.put
def close(self):
# PyRocksDB doesn't provide a close method; hopefully this is enough
self.db = None
import gc
gc.collect()
class WriteBatch(object):
def __init__(self, db):
self.batch = RocksDB.module.WriteBatch()
@ -157,11 +170,15 @@ class LMDB(Storage):
import lmdb
cls.module = lmdb
def open(self, name, create):
def open(self, name, create, for_sync):
# I don't see anything equivalent to max_open_files for for_sync
self.env = LMDB.module.Environment('.', subdir=True, create=create,
max_dbs=32, map_size=5 * 10 ** 10)
self.db = self.env.open_db(create=create)
def close(self):
self.env.close()
def get(self, key):
with self.env.begin(db=self.db) as tx:
return tx.get(key)

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.7.11"
VERSION = "ElectrumX 0.7.12"

4
tests/test_storage.py

@ -21,7 +21,7 @@ for c in subclasses(Storage):
def db(tmpdir, request):
cwd = os.getcwd()
os.chdir(str(tmpdir))
db = open_db("db", request.param)
db = open_db("db", request.param, False)
os.chdir(cwd)
yield db
# Make sure all the locks and handles are closed
@ -66,4 +66,4 @@ def test_iterator_reverse(db):
assert list(db.iterator(prefix=b"abc", reverse=True)) == [
(b"abc" + str.encode(str(i)), str.encode(str(i))) for
i in reversed(range(5))
]
]

Loading…
Cancel
Save