Browse Source

Tasks queue just has null entries for now

master
Neil Booth 8 years ago
parent
commit
5c0b026158
  1. 49
      server/block_processor.py

49
server/block_processor.py

@ -23,14 +23,6 @@ from lib.util import chunks, formatted_time, LoggedClass
import server.db import server.db
# Tasks placed on task queue
BLOCKS, CAUGHT_UP = range(2)
class ChainError(Exception):
pass
class Prefetcher(LoggedClass): class Prefetcher(LoggedClass):
'''Prefetches blocks (in the forward direction only).''' '''Prefetches blocks (in the forward direction only).'''
@ -81,12 +73,10 @@ class Prefetcher(LoggedClass):
try: try:
secs = 0 secs = 0
if self.cache_size < self.target_cache_size: if self.cache_size < self.target_cache_size:
if await self._prefetch(): if not await self._prefetch():
self.tasks.put_nowait(BLOCKS)
else:
self.tasks.put_nowait(CAUGHT_UP)
self.caught_up = True self.caught_up = True
secs = 5 secs = 5
self.tasks.put_nowait(None)
await asyncio.sleep(secs) await asyncio.sleep(secs)
except DaemonError as e: except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e)) self.logger.info('ignoring daemon error: {}'.format(e))
@ -128,6 +118,9 @@ class Prefetcher(LoggedClass):
return count return count
class ChainError(Exception):
'''Raised on error processing blocks.'''
class ChainReorg(Exception): class ChainReorg(Exception):
'''Raised on a blockchain reorganisation.''' '''Raised on a blockchain reorganisation.'''
@ -194,17 +187,16 @@ class BlockProcessor(server.db.DB):
if self.env.force_reorg > 0: if self.env.force_reorg > 0:
self.logger.info('DEBUG: simulating reorg of {:,d} blocks' self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
.format(self.env.force_reorg)) .format(self.env.force_reorg))
await self.handle_chain_reorg(self.env.force_reorg, set()) await self.handle_chain_reorg(set(), self.env.force_reorg)
while True: while True:
task = await self.tasks.get() task = await self.tasks.get()
if self._shutdown: if self._shutdown:
break break
if task == BLOCKS: blocks = self.prefetcher.get_blocks()
await self.advance_blocks() if blocks:
else: await self.advance_blocks(blocks)
assert task == CAUGHT_UP elif not self.caught_up:
if not self.caught_up:
self.caught_up = True self.caught_up = True
self.first_caught_up() self.first_caught_up()
@ -214,13 +206,9 @@ class BlockProcessor(server.db.DB):
'''Call to shut down the block processor.''' '''Call to shut down the block processor.'''
self.logger.info('flushing state to DB for clean shutdown...') self.logger.info('flushing state to DB for clean shutdown...')
self._shutdown = True self._shutdown = True
# Ensure we don't sit waiting for a task self.tasks.put_nowait(None)
self.tasks.put_nowait(BLOCKS)
async def advance_blocks(self):
'''Take blocks from the prefetcher and process them.'''
blocks = self.prefetcher.get_blocks()
async def advance_blocks(self, blocks):
'''Strip the unspendable genesis coinbase.''' '''Strip the unspendable genesis coinbase.'''
if self.height == -1: if self.height == -1:
blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1) blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1)
@ -231,7 +219,7 @@ class BlockProcessor(server.db.DB):
self.advance_block(block, touched) self.advance_block(block, touched)
await asyncio.sleep(0) # Yield await asyncio.sleep(0) # Yield
except ChainReorg: except ChainReorg:
await self.handle_chain_reorg(None, touched) await self.handle_chain_reorg(touched)
if self.caught_up: if self.caught_up:
# Flush everything as queries are performed on the DB and # Flush everything as queries are performed on the DB and
@ -243,18 +231,17 @@ class BlockProcessor(server.db.DB):
self.next_cache_check = time.time() + 60 self.next_cache_check = time.time() + 60
def first_caught_up(self): def first_caught_up(self):
'''Called when first caught up after start, or after a reorg.''' '''Called when first caught up after starting.'''
self.flush(True)
if self.first_sync: if self.first_sync:
self.first_sync = False
self.logger.info('{} synced to height {:,d}' self.logger.info('{} synced to height {:,d}'
.format(VERSION, self.height)) .format(VERSION, self.height))
self.flush(True) self.first_sync = False
self.flush_state(self.db)
self.reopen_db(False) self.reopen_db(False)
else:
self.flush(True)
self.event.set() self.event.set()
async def handle_chain_reorg(self, count, touched): async def handle_chain_reorg(self, touched, count=None):
'''Handle a chain reorganisation. '''Handle a chain reorganisation.
Count is the number of blocks to simulate a reorg, or None for Count is the number of blocks to simulate a reorg, or None for

Loading…
Cancel
Save