
Merge branch 'release-0.7.4'

master  0.7.4
Neil Booth authored 8 years ago
commit eb869efd96
11 changed files (number of lines changed shown for each):

  docs/ENV-NOTES                             6
  docs/RELEASE-NOTES                         9
  samples/daemontools/env/DAEMON_HOST        1
  samples/daemontools/env/DAEMON_PASSWORD    1
  samples/daemontools/env/DAEMON_PORT        1
  samples/daemontools/env/DAEMON_URL         1
  samples/daemontools/env/DAEMON_USERNAME    1
  server/block_processor.py                 68
  server/daemon.py                          14
  server/env.py                              4
  server/version.py                          2

docs/ENV-NOTES (6 lines changed)

@@ -82,3 +82,9 @@ UTXO_MB - amount of UTXO and history cache, in MB, to retain before
leveldb caching and Python GC effects. However this may be
very dependent on hardware and you may have different
results.

The following are for debugging purposes.

FORCE_REORG - if set to a positive integer, it will simulate a reorg
              of the blockchain for that number of blocks.  Do not set
              to a value greater than REORG_LIMIT.
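
For illustration only, a minimal sketch of how these two settings could be
read and validated so that FORCE_REORG is refused when it exceeds REORG_LIMIT,
as advised above.  The helper name and the fallback limit of 200 are
assumptions made for the sketch, not part of this commit:

    import os

    def read_force_reorg(fallback_reorg_limit=200):
        # Assumed fallback for REORG_LIMIT; the real default is coin-specific.
        reorg_limit = int(os.environ.get('REORG_LIMIT', fallback_reorg_limit))
        force_reorg = int(os.environ.get('FORCE_REORG', 0))
        if force_reorg > reorg_limit:
            raise ValueError('FORCE_REORG ({:,d}) must not exceed REORG_LIMIT '
                             '({:,d})'.format(force_reorg, reorg_limit))
        return force_reorg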

docs/RELEASE-NOTES (9 lines changed)

@@ -1,3 +1,12 @@
version 0.7.4
-------------
- really fix reorgs; they still triggered an assertion.  If you hit a reorg,
  I believe your DB is fine and all you need to do is restart with updated
  software
- introduced a new debug env var FORCE_REORG, which I used to simulate a
  reorg and confirm they should work
version 0.7.3
-------------

samples/daemontools/env/DAEMON_HOST (1 line changed)

@@ -1 +0,0 @@
192.168.0.1

samples/daemontools/env/DAEMON_PASSWORD (1 line changed)

@@ -1 +0,0 @@
your_daemon's_rpc_password

samples/daemontools/env/DAEMON_PORT (1 line changed)

@@ -1 +0,0 @@
8332

samples/daemontools/env/DAEMON_URL (1 line changed)

@@ -0,0 +1 @@
http://username:password@host:port/

samples/daemontools/env/DAEMON_USERNAME (1 line changed)

@@ -1 +0,0 @@
your_daemon's_rpc_username

server/block_processor.py (68 lines changed)

@@ -25,10 +25,6 @@ from lib.util import chunks, formatted_time, LoggedClass
import server.db
from server.storage import open_db
# Limits single address history to ~ 65536 * HIST_ENTRIES_PER_KEY entries
HIST_ENTRIES_PER_KEY = 1024
HIST_VALUE_BYTES = HIST_ENTRIES_PER_KEY * 4
class ChainError(Exception):
pass
@@ -62,6 +58,7 @@ class Prefetcher(LoggedClass):
self.queue.get_nowait()
self.queue_size = 0
self.fetched_height = height
self.caught_up = False
async def get_blocks(self):
'''Blocking function that returns prefetched blocks.
@@ -146,8 +143,7 @@ class BlockProcessor(server.db.DB):
self.tip = self.db_tip
self.tx_count = self.db_tx_count
self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url), env.debug)
self.daemon.debug_set_height(self.height)
self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url))
self.caught_up = False
self.touched = set()
self.futures = []
@@ -190,6 +186,13 @@ class BlockProcessor(server.db.DB):
Safely flushes the DB on clean shutdown.
'''
self.futures.append(asyncio.ensure_future(self.prefetcher.main_loop()))
# Simulate a reorg if requested
if self.env.force_reorg > 0:
self.logger.info('DEBUG: simulating chain reorg of {:,d} blocks'
.format(self.env.force_reorg))
await self.handle_chain_reorg(self.env.force_reorg)
try:
while True:
await self._wait_for_update()
@@ -228,7 +231,7 @@ class BlockProcessor(server.db.DB):
self.advance_block(block, self.caught_up)
await asyncio.sleep(0) # Yield
except ChainReorg:
await self.handle_chain_reorg()
await self.handle_chain_reorg(None)
if self.caught_up:
# Flush everything as queries are performed on the DB and
@@ -255,24 +258,26 @@ class BlockProcessor(server.db.DB):
Only called for blocks found after first_caught_up is called.
Intended to be overridden in derived classes.'''
async def handle_chain_reorg(self):
# First get all state on disk
async def handle_chain_reorg(self, count):
'''Handle a chain reorganisation.
Count is the number of blocks to simulate a reorg, or None for
a real reorg.'''
self.logger.info('chain reorg detected')
self.flush(True)
self.logger.info('finding common height...')
hashes = await self.reorg_hashes()
hashes = await self.reorg_hashes(count)
# Reverse and convert to hex strings.
hashes = [hash_to_str(hash) for hash in reversed(hashes)]
for hex_hashes in chunks(hashes, 50):
blocks = await self.daemon.raw_blocks(hex_hashes)
self.backup_blocks(blocks)
self.logger.info('backed up to height {:,d}'.format(self.height))
await self.prefetcher.clear(self.height)
self.logger.info('prefetcher reset')
async def reorg_hashes(self):
async def reorg_hashes(self, count):
'''Return the list of hashes to back up because of a reorg.
The hashes are returned in order of increasing height.'''
@@ -283,24 +288,27 @@ class BlockProcessor(server.db.DB):
return n
return -1
start = self.height - 1
count = 1
while start > 0:
hashes = self.fs_block_hashes(start, count)
hex_hashes = [hash_to_str(hash) for hash in hashes]
d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
n = match_pos(hex_hashes, d_hex_hashes)
if n >= 0:
start += n + 1
break
count = min(count * 2, start)
start -= count
if count is None:
# A real reorg
start = self.height - 1
count = 1
while start > 0:
hashes = self.fs_block_hashes(start, count)
hex_hashes = [hash_to_str(hash) for hash in hashes]
d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
n = match_pos(hex_hashes, d_hex_hashes)
if n >= 0:
start += n + 1
break
count = min(count * 2, start)
start -= count
# Hashes differ from height 'start'
count = (self.height - start) + 1
count = (self.height - start) + 1
else:
start = (self.height - count) + 1
self.logger.info('chain was reorganised for {:,d} blocks from '
'height {:,d} to height {:,d}'
self.logger.info('chain was reorganised for {:,d} blocks over '
'heights {:,d}-{:,d} inclusive'
.format(count, start, start + count - 1))
return self.fs_block_hashes(start, count)
@@ -649,6 +657,7 @@ class BlockProcessor(server.db.DB):
self.tip = prev_hash
assert self.height >= 0
self.height -= 1
self.tx_counts.pop()
self.fs_height = self.height
assert not self.headers
@@ -664,6 +673,9 @@ class BlockProcessor(server.db.DB):
# Prevout values, in order down the block (coinbase first if present)
# undo_info is in reverse block order
undo_info = self.read_undo_info(self.height)
if not undo_info:
raise ChainError('no undo information found for height {:,d}'
.format(self.height))
n = len(undo_info)
# Use local vars for speed in the loops
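
The reorg_hashes() change above keeps the original fork-point search for a
real reorg: step back from the tip, doubling the window each pass, until a
locally stored block hash matches the daemon's, then back up every block
above that point.  Below is a simplified, self-contained sketch of that
search; the function name is invented and plain hash lists stand in for
fs_block_hashes() and the daemon RPC, so it is illustrative only:

    def find_backup_start(local_hashes, daemon_hashes, tip_height):
        '''Return the height from which blocks must be backed up: one above
        the highest height at which the local chain still agrees with the
        daemon's.  Both arguments are hash sequences indexed by height and
        must cover heights 0 to tip_height - 1.'''
        start = tip_height - 1
        count = 1
        while start > 0:
            # Highest height in [start, start + count) where both chains
            # still agree, or -1 if they disagree throughout the window.
            match = -1
            for height in range(start, start + count):
                if local_hashes[height] == daemon_hashes[height]:
                    match = height
            if match >= 0:
                return match + 1
            count = min(count * 2, start)   # widen the window ...
            start -= count                  # ... and step further back
        return 0                            # diverged all the way to genesis

    # As in the commit, the number of blocks to back up is then
    # (tip_height - backup_start) + 1.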

server/daemon.py (14 lines changed)

@@ -27,7 +27,7 @@ class Daemon(util.LoggedClass):
class DaemonWarmingUpError(Exception):
'''Raised when the daemon returns an error in its results.'''
def __init__(self, urls, debug):
def __init__(self, urls):
super().__init__()
if not urls:
raise DaemonError('no daemon URLs provided')
@@ -36,17 +36,10 @@ class Daemon(util.LoggedClass):
self.urls = urls
self.url_index = 0
self._height = None
self.debug_caught_up = 'caught_up' in debug
# Limit concurrent RPC calls to this number.
# See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
self.workqueue_semaphore = asyncio.Semaphore(value=10)
def debug_set_height(self, height):
if self.debug_caught_up:
self.logger.info('pretending to have caught up to height {}'
.format(height))
self._height = height
async def _send(self, payload, processor):
'''Send a payload to be converted to JSON.
@@ -157,8 +150,6 @@ class Daemon(util.LoggedClass):
async def mempool_hashes(self):
'''Return the hashes of the txs in the daemon's mempool.'''
if self.debug_caught_up:
return []
return await self._send_single('getrawmempool')
async def estimatefee(self, params):
@@ -191,8 +182,7 @@ class Daemon(util.LoggedClass):
async def height(self):
'''Query the daemon for its current height.'''
if not self.debug_caught_up:
self._height = await self._send_single('getblockcount')
self._height = await self._send_single('getblockcount')
return self._height
def cached_height(self):
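
The workqueue_semaphore kept above bounds how many RPC requests are in flight
at once so the server does not overrun bitcoind's HTTP work queue.  A generic
sketch of that pattern follows; the class, the limit of 10 and the dummy
_post() transport are stand-ins for the module's real _send() machinery, not
code from this commit:

    import asyncio

    class RPCClient:
        def __init__(self, max_in_flight=10):
            # Mirrors the comment above about DEFAULT_HTTP_WORKQUEUE.
            self.workqueue_semaphore = asyncio.Semaphore(value=max_in_flight)

        async def _post(self, payload):
            await asyncio.sleep(0.01)   # stand-in for an HTTP round trip
            return {'result': payload}

        async def send(self, payload):
            # At most max_in_flight requests reach the daemon at once; the
            # rest queue here rather than on bitcoind's work queue.
            async with self.workqueue_semaphore:
                return await self._post(payload)

    async def main():
        client = RPCClient()
        results = await asyncio.gather(*(client.send(n) for n in range(50)))
        print(len(results), 'responses')

    if __name__ == '__main__':
        asyncio.run(main())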

server/env.py (4 lines changed)

@@ -44,8 +44,6 @@ class Env(LoggedClass):
# The electrum client takes the empty string as unspecified
self.donation_address = self.default('DONATION_ADDRESS', '')
self.db_engine = self.default('DB_ENGINE', 'leveldb')
self.debug = self.default('DEBUG', '')
self.debug = [item.lower() for item in self.debug.split()]
# Subscription limits
self.max_subs = self.integer('MAX_SUBS', 250000)
self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
@@ -55,6 +53,8 @@ class Env(LoggedClass):
self.report_host = self.default('REPORT_HOST', self.host)
self.irc_nick = self.default('IRC_NICK', None)
self.irc = self.default('IRC', False)
# Debugging
self.force_reorg = self.integer('FORCE_REORG', 0)
def default(self, envvar, default):
return environ.get(envvar, default)
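
The new FORCE_REORG knob is read with the same small helpers the class already
uses: default() wraps os.environ directly, and integer() (called above but not
shown in this diff) presumably parses the value or falls back to the default.
A self-contained sketch of that pattern, with the integer() behaviour assumed:

    from os import environ

    class MiniEnv:
        def default(self, envvar, default):
            return environ.get(envvar, default)

        def integer(self, envvar, default):
            # Assumed behaviour: a missing variable means the default,
            # anything else must parse as an integer.
            value = environ.get(envvar)
            if value is None:
                return default
            try:
                return int(value)
            except ValueError:
                raise Exception('cannot convert envvar {} value {!r} to an '
                                'integer'.format(envvar, value))

    # e.g. force_reorg = MiniEnv().integer('FORCE_REORG', 0)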

server/version.py (2 lines changed)

@@ -1 +1 @@
VERSION = "ElectrumX 0.7.3"
VERSION = "ElectrumX 0.7.4"
