|
|
@ -58,6 +58,7 @@ class Prefetcher(LoggedClass): |
|
|
|
self.queue.get_nowait() |
|
|
|
self.queue_size = 0 |
|
|
|
self.fetched_height = height |
|
|
|
self.caught_up = False |
|
|
|
|
|
|
|
async def get_blocks(self): |
|
|
|
'''Blocking function that returns prefetched blocks. |
|
|
@ -185,6 +186,13 @@ class BlockProcessor(server.db.DB): |
|
|
|
Safely flushes the DB on clean shutdown. |
|
|
|
''' |
|
|
|
self.futures.append(asyncio.ensure_future(self.prefetcher.main_loop())) |
|
|
|
|
|
|
|
# Simulate a reorg if requested |
|
|
|
if self.env.force_reorg > 0: |
|
|
|
self.logger.info('DEBUG: simulating chain reorg of {:,d} blocks' |
|
|
|
.format(self.env.force_reorg)) |
|
|
|
await self.handle_chain_reorg(self.env.force_reorg) |
|
|
|
|
|
|
|
try: |
|
|
|
while True: |
|
|
|
await self._wait_for_update() |
|
|
@ -223,7 +231,7 @@ class BlockProcessor(server.db.DB): |
|
|
|
self.advance_block(block, self.caught_up) |
|
|
|
await asyncio.sleep(0) # Yield |
|
|
|
except ChainReorg: |
|
|
|
await self.handle_chain_reorg() |
|
|
|
await self.handle_chain_reorg(None) |
|
|
|
|
|
|
|
if self.caught_up: |
|
|
|
# Flush everything as queries are performed on the DB and |
|
|
@ -250,24 +258,26 @@ class BlockProcessor(server.db.DB): |
|
|
|
Only called for blocks found after first_caught_up is called. |
|
|
|
Intended to be overridden in derived classes.''' |
|
|
|
|
|
|
|
async def handle_chain_reorg(self): |
|
|
|
# First get all state on disk |
|
|
|
async def handle_chain_reorg(self, count): |
|
|
|
'''Handle a chain reorganisation. |
|
|
|
|
|
|
|
Count is the number of blocks to simulate a reorg, or None for |
|
|
|
a real reorg.''' |
|
|
|
self.logger.info('chain reorg detected') |
|
|
|
self.flush(True) |
|
|
|
self.logger.info('finding common height...') |
|
|
|
|
|
|
|
hashes = await self.reorg_hashes() |
|
|
|
hashes = await self.reorg_hashes(count) |
|
|
|
# Reverse and convert to hex strings. |
|
|
|
hashes = [hash_to_str(hash) for hash in reversed(hashes)] |
|
|
|
for hex_hashes in chunks(hashes, 50): |
|
|
|
blocks = await self.daemon.raw_blocks(hex_hashes) |
|
|
|
self.backup_blocks(blocks) |
|
|
|
|
|
|
|
self.logger.info('backed up to height {:,d}'.format(self.height)) |
|
|
|
await self.prefetcher.clear(self.height) |
|
|
|
self.logger.info('prefetcher reset') |
|
|
|
|
|
|
|
async def reorg_hashes(self): |
|
|
|
async def reorg_hashes(self, count): |
|
|
|
'''Return the list of hashes to back up beacuse of a reorg. |
|
|
|
|
|
|
|
The hashes are returned in order of increasing height.''' |
|
|
@ -278,24 +288,27 @@ class BlockProcessor(server.db.DB): |
|
|
|
return n |
|
|
|
return -1 |
|
|
|
|
|
|
|
start = self.height - 1 |
|
|
|
count = 1 |
|
|
|
while start > 0: |
|
|
|
hashes = self.fs_block_hashes(start, count) |
|
|
|
hex_hashes = [hash_to_str(hash) for hash in hashes] |
|
|
|
d_hex_hashes = await self.daemon.block_hex_hashes(start, count) |
|
|
|
n = match_pos(hex_hashes, d_hex_hashes) |
|
|
|
if n >= 0: |
|
|
|
start += n + 1 |
|
|
|
break |
|
|
|
count = min(count * 2, start) |
|
|
|
start -= count |
|
|
|
if count is None: |
|
|
|
# A real reorg |
|
|
|
start = self.height - 1 |
|
|
|
count = 1 |
|
|
|
while start > 0: |
|
|
|
hashes = self.fs_block_hashes(start, count) |
|
|
|
hex_hashes = [hash_to_str(hash) for hash in hashes] |
|
|
|
d_hex_hashes = await self.daemon.block_hex_hashes(start, count) |
|
|
|
n = match_pos(hex_hashes, d_hex_hashes) |
|
|
|
if n >= 0: |
|
|
|
start += n + 1 |
|
|
|
break |
|
|
|
count = min(count * 2, start) |
|
|
|
start -= count |
|
|
|
|
|
|
|
# Hashes differ from height 'start' |
|
|
|
count = (self.height - start) + 1 |
|
|
|
count = (self.height - start) + 1 |
|
|
|
else: |
|
|
|
start = (self.height - count) + 1 |
|
|
|
|
|
|
|
self.logger.info('chain was reorganised for {:,d} blocks from ' |
|
|
|
'height {:,d} to height {:,d}' |
|
|
|
self.logger.info('chain was reorganised for {:,d} blocks over ' |
|
|
|
'heights {:,d}-{:,d} inclusive' |
|
|
|
.format(count, start, start + count - 1)) |
|
|
|
|
|
|
|
return self.fs_block_hashes(start, count) |
|
|
@ -660,6 +673,9 @@ class BlockProcessor(server.db.DB): |
|
|
|
# Prevout values, in order down the block (coinbase first if present) |
|
|
|
# undo_info is in reverse block order |
|
|
|
undo_info = self.read_undo_info(self.height) |
|
|
|
if not undo_info: |
|
|
|
raise ChainError('no undo information found for height {:,d}' |
|
|
|
.format(self.height)) |
|
|
|
n = len(undo_info) |
|
|
|
|
|
|
|
# Use local vars for speed in the loops |
|
|
|