From e91f49101b00db0dce5de66b7cc9876482990658 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Mon, 19 Dec 2016 23:25:37 +0900 Subject: [PATCH] Don't start processing mempool until caught up Print server manager settings once servers start --- server/block_processor.py | 8 ++++---- server/protocol.py | 23 ++++++++++++----------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index b0fade0..10d5153 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -77,7 +77,7 @@ class Prefetcher(LoggedClass): self.refill_event.set() return blocks - async def main_loop(self): + async def main_loop(self, caught_up_event): '''Loop forever polling for more blocks.''' daemon_height = await self.daemon.height() if self.fetched_height >= daemon_height: @@ -89,7 +89,7 @@ class Prefetcher(LoggedClass): while True: try: with await self.semaphore: - await self._prefetch_blocks() + await self._prefetch_blocks(caught_up_event.is_set()) await self.refill_event.wait() except DaemonError as e: self.logger.info('ignoring daemon error: {}'.format(e)) @@ -97,13 +97,13 @@ class Prefetcher(LoggedClass): await self.clear(-1) return - async def _prefetch_blocks(self): + async def _prefetch_blocks(self, mempool): '''Prefetch some blocks and put them on the queue. Repeats until the queue is full or caught up. If caught up, sleep for a period of time before returning. ''' - daemon_height = await self.daemon.height(mempool=self.caught_up) + daemon_height = await self.daemon.height(mempool) while self.cache_size < self.min_cache_size: # Try and catch up all blocks but limit to room in cache. # Constrain fetch count to between 0 and 2500 regardless. diff --git a/server/protocol.py b/server/protocol.py index c590fb1..c4efbbc 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -79,16 +79,6 @@ class ServerManager(util.LoggedClass): self.futures = [] env.max_send = max(350000, env.max_send) self.setup_bands() - self.logger.info('max session count: {:,d}'.format(self.max_sessions)) - self.logger.info('session timeout: {:,d} seconds' - .format(env.session_timeout)) - self.logger.info('session bandwidth limit {:,d} bytes' - .format(env.bandwidth_limit)) - self.logger.info('max response size {:,d} bytes'.format(env.max_send)) - self.logger.info('max subscriptions across all sessions: {:,d}' - .format(self.max_subs)) - self.logger.info('max subscriptions per session: {:,d}' - .format(env.max_session_subs)) async def mempool_transactions(self, hash168): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool @@ -186,7 +176,7 @@ class ServerManager(util.LoggedClass): # shutdown() assumes bp.main_loop() is first add_future(self.bp.main_loop(self.mempool.touched)) - add_future(self.bp.prefetcher.main_loop()) + add_future(self.bp.prefetcher.main_loop(self.bp.caught_up_event)) add_future(self.irc.start(self.bp.caught_up_event)) add_future(self.start_servers(self.bp.caught_up_event)) add_future(self.mempool.main_loop()) @@ -231,6 +221,17 @@ class ServerManager(util.LoggedClass): if self.env.rpc_port is not None: await self.start_server('RPC', 'localhost', self.env.rpc_port) await caught_up.wait() + self.logger.info('max session count: {:,d}'.format(self.max_sessions)) + self.logger.info('session timeout: {:,d} seconds' + .format(self.env.session_timeout)) + self.logger.info('session bandwidth limit {:,d} bytes' + .format(self.env.bandwidth_limit)) + self.logger.info('max response size {:,d} bytes' + .format(self.env.max_send)) + self.logger.info('max subscriptions across all sessions: {:,d}' + .format(self.max_subs)) + self.logger.info('max subscriptions per session: {:,d}' + .format(self.env.max_session_subs)) await self.start_external_servers() async def start_external_servers(self):