
Make mempool truly asynchronous

Mempool updates now happen asynchronously, in parallel with the
processing of new blocks, once the server has caught up.  This means
that, for example, during the slow initial mempool download, incoming
blocks can be processed and communicated to clients without waiting
for the analysis of the downloaded mempool transactions to complete.
From a client's point of view the server won't be seen as lagging.
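
In outline, the change runs block processing and mempool maintenance as
two independent asyncio tasks that share only a notify() hook.  A
minimal, self-contained sketch of that pattern (the helper names and
timings below are invented for illustration; only the task structure
mirrors this commit):

    import asyncio

    async def fetch_blocks():
        await asyncio.sleep(0.1)           # fast: blocks arrive promptly
        return ['raw block']

    async def fetch_mempool_hashes():
        await asyncio.sleep(2.0)           # slow: initial mempool download
        return {'txid'}

    def notify(touched):
        print('notify clients:', touched)

    async def block_loop():
        while True:
            blocks = await fetch_blocks()
            notify(set(blocks))            # clients hear about blocks at once...
            await asyncio.sleep(0)

    async def mempool_loop():
        while True:
            hashes = await fetch_mempool_hashes()
            notify(hashes)                 # ...without waiting for this download
            await asyncio.sleep(5)

    async def main():
        # Neither task ever awaits the other.
        await asyncio.gather(block_loop(), mempool_loop())

    # asyncio.get_event_loop().run_until_complete(main())
    # (Python 3.5-era API; on modern Python use asyncio.run(main()).)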
Branch: master
Author: Neil Booth
Commit: 98fd178c62
2 changed files:
  1. server/block_processor.py (97 changed lines)
  2. server/protocol.py (45 changed lines)

server/block_processor.py (97 changed lines)

@@ -43,8 +43,8 @@ class Prefetcher(LoggedClass):
         self.semaphore = asyncio.Semaphore()
         self.queue = asyncio.Queue()
         self.queue_size = 0
+        self.caught_up = False
         self.fetched_height = height
-        self.mempool_hashes = []
         # Target cache size.  Has little effect on sync time.
         self.target_cache_size = 10 * 1024 * 1024
         # First fetch to be 10 blocks
@@ -64,13 +64,14 @@ class Prefetcher(LoggedClass):
         self.fetched_height = height
 
     async def get_blocks(self):
-        '''Returns a list of prefetched blocks and the mempool.'''
-        blocks, height, size = await self.queue.get()
+        '''Blocking function that returns prefetched blocks.
+
+        The returned result is empty just once - when the prefetcher
+        has caught up with the daemon.
+        '''
+        blocks, size = await self.queue.get()
         self.queue_size -= size
-        if height == self.daemon.cached_height():
-            return blocks, self.mempool_hashes
-        else:
-            return blocks, None
+        return blocks
 
     async def main_loop(self):
         '''Loop forever polling for more blocks.'''
@@ -78,39 +79,19 @@ class Prefetcher(LoggedClass):
                          .format(await self.daemon.height()))
         while True:
             try:
-                if await self._caught_up():
-                    await asyncio.sleep(5)
-                else:
-                    await asyncio.sleep(0)
+                with await self.semaphore:
+                    await self._prefetch()
+                await asyncio.sleep(5 if self.caught_up else 0)
             except DaemonError as e:
                 self.logger.info('ignoring daemon error: {}'.format(e))
             except asyncio.CancelledError:
                 break
 
-    async def _caught_up(self):
-        '''Poll for new blocks and mempool state.
-
-        Mempool is only queried if caught up with daemon.'''
-        with await self.semaphore:
-            blocks, size = await self._prefetch()
-            self.fetched_height += len(blocks)
-            caught_up = self.fetched_height == self.daemon.cached_height()
-            if caught_up:
-                self.mempool_hashes = await self.daemon.mempool_hashes()
-            # Wake up block processor if we have something
-            if blocks or caught_up:
-                self.queue.put_nowait((blocks, self.fetched_height, size))
-                self.queue_size += size
-            return caught_up
-
     async def _prefetch(self):
         '''Prefetch blocks unless the prefetch queue is full.'''
         if self.queue_size >= self.target_cache_size:
-            return [], 0
+            return
 
-        caught_up = self.daemon.cached_height() == self.fetched_height
         daemon_height = await self.daemon.height()
         cache_room = self.target_cache_size // self.ave_size
@@ -119,15 +100,18 @@ class Prefetcher(LoggedClass):
         count = min(daemon_height - self.fetched_height, cache_room)
         count = min(4000, max(count, 0))
         if not count:
-            return [], 0
+            # Indicate when we have caught up for the first time only
+            if not self.caught_up:
+                self.caught_up = True
+                self.queue.put_nowait(([], 0))
+            return
 
         first = self.fetched_height + 1
         hex_hashes = await self.daemon.block_hex_hashes(first, count)
-        if caught_up:
+        if self.caught_up:
             self.logger.info('new block height {:,d} hash {}'
                              .format(first + count - 1, hex_hashes[-1]))
 
         blocks = await self.daemon.raw_blocks(hex_hashes)
         size = sum(len(block) for block in blocks)
         # Update our recent average block size estimate
@@ -136,7 +120,9 @@ class Prefetcher(LoggedClass):
         else:
             self.ave_size = (size + (10 - count) * self.ave_size) // 10
-        return blocks, size
+        self.fetched_height += len(blocks)
+        self.queue.put_nowait((blocks, size))
+        self.queue_size += size
 
 
 class ChainReorg(Exception):
@@ -162,6 +148,7 @@ class BlockProcessor(server.db.DB):
         self.daemon = Daemon(env.daemon_url, env.debug)
         self.daemon.debug_set_height(self.height)
+        self.caught_up = False
         self.touched = set()
         self.futures = []
@@ -223,41 +210,51 @@ class BlockProcessor(server.db.DB):
             await asyncio.sleep(0)
 
     async def _wait_for_update(self):
-        '''Wait for the prefetcher to deliver blocks or a mempool update.
+        '''Wait for the prefetcher to deliver blocks.
 
-        Blocks are only processed in the forward direction.  The
-        prefetcher only provides a non-None mempool when caught up.
+        Blocks are only processed in the forward direction.
         '''
-        blocks, mempool_hashes = await self.prefetcher.get_blocks()
+        blocks = await self.prefetcher.get_blocks()
+        if not blocks:
+            await self.first_caught_up()
+            return
 
         '''Strip the unspendable genesis coinbase.'''
         if self.height == -1:
             blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1)
 
-        caught_up = mempool_hashes is not None
         try:
             for block in blocks:
-                self.advance_block(block, caught_up)
-                if not caught_up and time.time() > self.next_cache_check:
-                    self.check_cache_size()
-                    self.next_cache_check = time.time() + 60
+                self.advance_block(block, self.caught_up)
                 await asyncio.sleep(0)  # Yield
-            if caught_up:
-                await self.caught_up(mempool_hashes)
-            self.touched = set()
         except ChainReorg:
             await self.handle_chain_reorg()
 
-    async def caught_up(self, mempool_hashes):
-        '''Called after each daemon poll if caught up.'''
-        # Caught up to daemon height.  Flush everything as queries
-        # are performed on the DB and not in-memory.
+        if self.caught_up:
+            # Flush everything as queries are performed on the DB and
+            # not in-memory.
+            self.flush(True)
+            self.notify(self.touched)
+        elif time.time() > self.next_cache_check:
+            self.check_cache_size()
+            self.next_cache_check = time.time() + 60
+        self.touched = set()
+
+    async def first_caught_up(self):
+        '''Called once when first caught up with the daemon.'''
+        self.caught_up = True
         if self.first_sync:
             self.first_sync = False
             self.logger.info('{} synced to height {:,d}.  DB version: {:d}'
                              .format(VERSION, self.height, self.db_version))
         self.flush(True)
 
+    def notify(self, touched):
+        '''Called with list of touched addresses by new blocks.
+
+        Only called for blocks found after first_caught_up is called.
+        Intended to be overridden in derived classes.'''
+
     async def handle_chain_reorg(self):
         # First get all state on disk
         self.logger.info('chain reorg detected')
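
A note on the hunks above: the prefetcher's queue now carries plain
(blocks, size) pairs, and a single empty ([], 0) entry is queued exactly
once, the first time _prefetch finds nothing to fetch; get_blocks treats
an empty result as the "first caught up" signal.  A tiny runnable sketch
of that sentinel protocol (the queue contents are made up; only the
queue discipline matches the code above):

    import asyncio

    async def main():
        queue = asyncio.Queue()
        queue.put_nowait((['block 1', 'block 2'], 250))  # normal delivery
        queue.put_nowait(([], 0))                        # one-shot caught-up signal
        queue.put_nowait((['block 3'], 120))             # new blocks keep flowing

        for _ in range(3):
            blocks, size = await queue.get()
            if not blocks:
                print('first caught up: flush, start servers and mempool')
            else:
                print('advance {:,d} block(s), {:,d} bytes'
                      .format(len(blocks), size))

    # Python 3.5-era API; on modern Python use asyncio.run(main()).
    asyncio.get_event_loop().run_until_complete(main())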

server/protocol.py (45 changed lines)

@@ -38,15 +38,16 @@ class BlockServer(BlockProcessor):
         super().__init__(env)
         self.server_mgr = ServerManager(self, env)
         self.mempool = MemPool(self)
-        self.caught_up_yet = False
 
-    async def caught_up(self, mempool_hashes):
-        # Call the base class to flush before doing anything else.
-        await super().caught_up(mempool_hashes)
-        if not self.caught_up_yet:
-            await self.server_mgr.start_servers()
-            self.caught_up_yet = True
-        self.touched.update(await self.mempool.update(mempool_hashes))
+    async def first_caught_up(self):
+        # Call the base class to flush and log first
+        await super().first_caught_up()
+        await self.server_mgr.start_servers()
+        self.futures.append(self.mempool.start())
+
+    def notify(self, touched):
+        '''Called when addresses are touched by new blocks or mempool
+        updates.'''
+        self.server_mgr.notify(self.height, touched)
 
     def on_cancel(self):
@@ -97,13 +98,29 @@ class MemPool(LoggedClass):
         self.bp = bp
         self.count = -1
 
-    async def update(self, hex_hashes):
-        '''Update state given the current mempool to the passed set of hashes.
+    def start(self):
+        '''Start the mempool synchronization main loop.  Return a future.'''
+        return asyncio.ensure_future(self.main_loop())
+
+    async def main_loop(self):
+        '''Asynchronously maintain mempool status with the daemon.'''
+        self.logger.info('maintaining state with daemon...')
+        while True:
+            try:
+                await self.update()
+                await asyncio.sleep(5)
+            except DaemonError as e:
+                self.logger.info('ignoring daemon error: {}'.format(e))
+            except asyncio.CancelledError:
+                break
+
+    async def update(self):
+        '''Update state to match the daemon's current mempool.
 
         Remove transactions that are no longer in our mempool.
         Request new transactions we don't have then add to our mempool.
         '''
-        hex_hashes = set(hex_hashes)
+        hex_hashes = set(await self.bp.daemon.mempool_hashes())
         touched = set()
         missing_utxos = []
@@ -210,8 +227,7 @@ class MemPool(LoggedClass):
             self.logger.info('{:,d} txs touching {:,d} addresses'
                              .format(len(self.txs), len(self.hash168s)))
 
-        # Might include a None
-        return touched
+        self.bp.notify(touched)
 
     def transactions(self, hash168):
         '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
@@ -310,11 +326,12 @@ class ServerManager(LoggedClass):
     def stop(self):
         '''Close listening servers.'''
-        self.logger.info('cleanly closing client sessions, please wait...')
         for server in self.servers:
             server.close()
         if self.irc_future:
             self.irc_future.cancel()
-        for session in self.sessions:
-            self.close_session(session)
+        if self.sessions:
+            self.logger.info('cleanly closing client sessions, please wait...')
+            for session in self.sessions:
+                self.close_session(session)
