Browse Source

Rework futures and event handling

master
Neil Booth 8 years ago
parent
commit
9e220820aa
  1. 31
      server/block_processor.py
  2. 10
      server/irc.py
  3. 65
      server/protocol.py

31
server/block_processor.py

@ -147,6 +147,7 @@ class BlockProcessor(server.db.DB):
self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url))
self.caught_up = False
self.event = asyncio.Event()
self.touched = set()
# Meta
@ -182,27 +183,23 @@ class BlockProcessor(server.db.DB):
self.clean_db()
async def main_loop(self):
'''Main loop for block processing.
Safely flushes the DB on clean shutdown.
'''
prefetcher_loop = asyncio.ensure_future(self.prefetcher.main_loop())
# Simulate a reorg if requested
if self.env.force_reorg > 0:
self.logger.info('DEBUG: simulating chain reorg of {:,d} blocks'
.format(self.env.force_reorg))
await self.handle_chain_reorg(self.env.force_reorg)
'''Main loop for block processing.'''
try:
# Simulate a reorg if requested
if self.env.force_reorg > 0:
self.logger.info('DEBUG: simulating reorg of {:,d} blocks'
.format(self.env.force_reorg))
await self.handle_chain_reorg(self.env.force_reorg)
while True:
await self._wait_for_update()
except asyncio.CancelledError:
pass
prefetcher_loop.cancel()
async def shutdown(self):
'''Shut down the DB cleanly.'''
self.logger.info('flushing state to DB for clean shutdown...')
self.flush(True)
await self.client.shutdown()
async def _wait_for_update(self):
'''Wait for the prefetcher to deliver blocks.
@ -211,7 +208,7 @@ class BlockProcessor(server.db.DB):
'''
blocks = await self.prefetcher.get_blocks()
if not blocks:
await self.first_caught_up()
self.first_caught_up()
return
'''Strip the unspendable genesis coinbase.'''
@ -235,7 +232,7 @@ class BlockProcessor(server.db.DB):
self.next_cache_check = time.time() + 60
self.touched = set()
async def first_caught_up(self):
def first_caught_up(self):
'''Called when first caught up after start, or after a reorg.'''
self.caught_up = True
if self.first_sync:
@ -243,7 +240,7 @@ class BlockProcessor(server.db.DB):
self.logger.info('{} synced to height {:,d}. DB version:'
.format(VERSION, self.height, self.db_version))
self.flush(True)
await self.client.first_caught_up()
self.event.set()
async def handle_chain_reorg(self, count):
'''Handle a chain reorganisation.

10
server/irc.py

@ -54,10 +54,16 @@ class IRC(LoggedClass):
self.irc_port = env.coin.IRC_PORT
self.peer_regexp = re.compile('({}[^!]*)!'.format(self.prefix))
self.peers = {}
self.disabled = env.irc is None
async def start(self):
async def start(self, caught_up):
'''Start IRC connections once caught up if enabled in environment.'''
await caught_up.wait()
try:
await self.join()
if self.disabled:
self.logger.info('IRC is disabled')
else:
await self.join()
except asyncio.CancelledError:
pass
except Exception as e:

65
server/protocol.py

@ -52,12 +52,11 @@ class MemPool(util.LoggedClass):
self.hash168s = defaultdict(set) # None can be a key
self.count = -1
def start(self):
'''Starts the mempool synchronization mainloop. Return a future.'''
return asyncio.ensure_future(self.main_loop())
async def main_loop(self, caught_up):
'''Asynchronously maintain mempool status with daemon.
async def main_loop(self):
'''Asynchronously maintain mempool status with daemon.'''
Waits until the caught up event is signalled.'''
await caught_up.wait()
self.logger.info('maintaining state with daemon...')
while True:
try:
@ -181,7 +180,7 @@ class MemPool(util.LoggedClass):
self.logger.info('{:,d} txs touching {:,d} addresses'
.format(len(self.txs), len(self.hash168s)))
self.maanger.notify(touched)
self.manager.notify(touched)
def transactions(self, hash168):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
@ -220,15 +219,14 @@ class ServerManager(util.LoggedClass):
def __init__(self, env):
super().__init__()
self.bp = BlockProcessor(self, env)
self.mempool = MemPool(self.db.daemon, env.coin, self.bp, self)
self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self)
self.irc = IRC(env)
self.env = env
self.servers = []
self.irc = IRC(env)
self.sessions = {}
self.max_subs = env.max_subs
self.subscription_count = 0
self.irc_future = None
self.mempool_future = None
self.futures = []
self.logger.info('max subscriptions across all sessions: {:,d}'
.format(self.max_subs))
self.logger.info('max subscriptions per session: {:,d}'
@ -249,6 +247,24 @@ class ServerManager(util.LoggedClass):
'''
return self.mempool.value(hash168)
async def main_loop(self):
'''Server manager main loop.'''
def add_future(coro):
self.futures.append(asyncio.ensure_future(coro))
add_future(self.bp.main_loop())
add_future(self.bp.prefetcher.main_loop())
add_future(self.mempool.main_loop(self.bp.event))
add_future(self.irc.start(self.bp.event))
add_future(self.start_servers(self.bp.event))
for future in asyncio.as_completed(self.futures):
try:
await future # Note: future is not one of self.futures
except asyncio.CancelledError:
break
await self.shutdown()
async def start_server(self, kind, *args, **kw_args):
loop = asyncio.get_event_loop()
protocol_class = LocalRPC if kind == 'RPC' else ElectrumX
@ -265,12 +281,14 @@ class ServerManager(util.LoggedClass):
self.logger.info('{} server listening on {}:{:d}'
.format(kind, host, port))
async def start_servers(self):
async def start_servers(self, caught_up):
'''Connect to IRC and start listening for incoming connections.
Only connect to IRC if enabled. Start listening on RCP, TCP
and SSL ports only if the port wasn pecified.
and SSL ports only if the port wasn't pecified. Waits for the
caught_up event to be signalled.
'''
await caught_up.wait()
env = self.env
if env.rpc_port is not None:
@ -285,16 +303,6 @@ class ServerManager(util.LoggedClass):
sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc)
if env.irc:
self.irc_future = asyncio.ensure_future(self.irc.start())
else:
self.logger.info('IRC disabled')
async def first_caught_up(self):
if not self.mempool_future:
self.mempool_future = self.mempool.start()
await self.server_mgr.start_servers()
def notify(self, touched):
'''Notify sessions about height changes and touched addresses.'''
cache = {}
@ -305,18 +313,17 @@ class ServerManager(util.LoggedClass):
async def shutdown(self):
'''Call to shutdown the servers. Returns when done.'''
for future in self.futures:
future.cancel()
for server in self.servers:
server.close()
for server in self.servers:
await server.wait_closed()
self.servers = []
if self.irc_future:
self.irc_future.cancel()
if self.mempool_future:
self.mempool_future.cancel()
self.servers = [] # So add_session closes new sessions
while not all(future.done() for future in self.futures):
await asyncio.sleep(0)
if self.sessions:
await self.close_sessions()
await self.bp.shutdown()
async def close_sessions(self, secs=60):
self.logger.info('cleanly closing client sessions, please wait...')

Loading…
Cancel
Save