From 86f6a148b95347e476d10aa5f381350e0aef6784 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 19 Feb 2017 14:19:26 +0900 Subject: [PATCH] Separate async item processor per session Improve daemon wait logic Fixes #100 --- electrumx_rpc.py | 6 +-- lib/jsonrpc.py | 27 +++++----- server/controller.py | 120 ++++++++++++++++--------------------------- server/daemon.py | 72 ++++++++++++++------------ server/peers.py | 12 +++-- server/session.py | 5 -- 6 files changed, 106 insertions(+), 136 deletions(-) diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 8cffa3b..6a193f3 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -26,13 +26,9 @@ class RPCClient(JSONSession): super().__init__(version=JSONRPCv2) self.max_send = 0 self.max_buffer_size = 5*10**6 - self.event = asyncio.Event() - - def have_pending_items(self): - self.event.set() async def wait_for_response(self): - await self.event.wait() + await self.items_event.wait() await self.process_pending_items() def send_rpc_request(self, method, params): diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 489a528..6811565 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -258,7 +258,7 @@ class JSONSessionBase(util.LoggedClass): from empty ''' _next_session_id = 0 - _pending_reqs = {} + _pending_reqs = {} # Outgoing requests waiting for a response @classmethod def next_session_id(cls): @@ -320,6 +320,7 @@ class JSONSessionBase(util.LoggedClass): self.pause = False # Handling of incoming items self.items = collections.deque() + self.items_event = asyncio.Event() self.batch_results = [] # Handling of outgoing requests self.next_request_id = 0 @@ -461,10 +462,8 @@ class JSONSessionBase(util.LoggedClass): self.send_error('empty batch', JSONRPC.INVALID_REQUEST) return - # Incoming items get queued for later asynchronous processing. - if not self.items: - self.have_pending_items() self.items.append(payload) + self.items_event.set() async def process_batch(self, batch, count): '''Processes count items from the batch according to the JSON 2.0 @@ -626,6 +625,9 @@ class JSONSessionBase(util.LoggedClass): if binary: self.send_binary(binary) + if not self.items: + self.items_event.clear() + def count_pending_items(self): '''Counts the number of pending items.''' return sum(len(item) if isinstance(item, list) else 1 @@ -716,15 +718,6 @@ class JSONSessionBase(util.LoggedClass): # App layer - def have_pending_items(self): - '''Called to indicate there are items pending to be processed - asynchronously by calling process_pending_items. - - This is *not* called every time an item is added, just when - there were previously none and now there is at least one. - ''' - raise NotImplementedError - def using_bandwidth(self, amount): '''Called as bandwidth is consumed. @@ -749,8 +742,12 @@ class JSONSession(JSONSessionBase, asyncio.Protocol): '''A JSONSessionBase instance specialized for use with asyncio.protocol to implement the transport layer. - Derived classes must provide have_pending_items() and may want to - override the request and notification handlers. + The app should await on items_event, which is set when unprocessed + incoming items remain and cleared when the queue is empty, and + then arrange to call process_pending_items asynchronously. + + Derived classes may want to override the request and notification + handlers. ''' def __init__(self, version=JSONRPCCompat): diff --git a/server/controller.py b/server/controller.py index 067cb95..ab05da7 100644 --- a/server/controller.py +++ b/server/controller.py @@ -69,9 +69,6 @@ class Controller(util.LoggedClass): self.next_stale_check = 0 self.history_cache = pylru.lrucache(256) self.header_cache = pylru.lrucache(8) - self.queue = asyncio.PriorityQueue() - self.delayed_sessions = [] - self.next_queue_id = 0 self.cache_height = 0 env.max_send = max(350000, env.max_send) self.setup_bands() @@ -136,67 +133,6 @@ class Controller(util.LoggedClass): def is_deprioritized(self, session): return self.session_priority(session) > self.BANDS - async def enqueue_delayed_sessions(self): - while True: - now = time.time() - keep = [] - for pair in self.delayed_sessions: - timeout, item = pair - priority, queue_id, session = item - if not session.pause and timeout <= now: - self.queue.put_nowait(item) - else: - keep.append(pair) - self.delayed_sessions = keep - - # If paused and session count has fallen, start listening again - if (len(self.sessions) <= self.low_watermark - and self.state == self.PAUSED): - await self.start_external_servers() - - # Periodically log sessions - if self.env.log_sessions and time.time() > self.next_log_sessions: - if self.next_log_sessions: - data = self.session_data(for_log=True) - for line in Controller.sessions_text_lines(data): - self.logger.info(line) - self.logger.info(json.dumps(self.getinfo())) - self.next_log_sessions = time.time() + self.env.log_sessions - - await asyncio.sleep(1) - - def enqueue_session(self, session): - # Might have disconnected whilst waiting - if session not in self.sessions: - return - priority = self.session_priority(session) - item = (priority, self.next_queue_id, session) - self.next_queue_id += 1 - - excess = max(0, priority - self.BANDS) - if excess != session.last_delay: - session.last_delay = excess - if excess: - session.log_info('high bandwidth use, deprioritizing by ' - 'delaying responses {:d}s'.format(excess)) - else: - session.log_info('stopped delaying responses') - delay = max(int(session.pause), excess) - if delay: - self.delayed_sessions.append((time.time() + delay, item)) - else: - self.queue.put_nowait(item) - - async def serve_requests(self): - '''Asynchronously run through the task queue.''' - while True: - priority_, id_, session = await self.queue.get() - if session in self.sessions: - await session.process_pending_items() - # Re-enqueue the session if stuff is left - if session.items: - self.enqueue_session(session) - async def run_in_executor(self, func, *args): '''Wait whilst running func in the executor.''' return await self.loop.run_in_executor(None, func, *args) @@ -225,11 +161,30 @@ class Controller(util.LoggedClass): except Exception: self.log_error(traceback.format_exc()) - async def check_request_timeouts(self): - '''Regularly check pending JSON requests for timeouts.''' + async def housekeeping(self): + '''Regular housekeeping checks.''' + n = 0 while True: - await asyncio.sleep(30) + n += 1 + await asyncio.sleep(15) JSONSessionBase.timeout_check() + if n % 10 == 0: + self.clear_stale_sessions() + + # Start listening for incoming connections if paused and + # session count has fallen + if (self.state == self.PAUSED and + len(self.sessions) <= self.low_watermark): + await self.start_external_servers() + + # Periodically log sessions + if self.env.log_sessions and time.time() > self.next_log_sessions: + if self.next_log_sessions: + data = self.session_data(for_log=True) + for line in Controller.sessions_text_lines(data): + self.logger.info(line) + self.logger.info(json.dumps(self.getinfo())) + self.next_log_sessions = time.time() + self.env.log_sessions async def wait_for_bp_catchup(self): '''Called when the block processor catches up.''' @@ -237,12 +192,9 @@ class Controller(util.LoggedClass): self.logger.info('block processor has caught up') self.ensure_future(self.peer_mgr.main_loop()) self.ensure_future(self.start_servers()) - self.ensure_future(self.check_request_timeouts()) + self.ensure_future(self.housekeeping()) self.ensure_future(self.mempool.main_loop()) - self.ensure_future(self.enqueue_delayed_sessions()) self.ensure_future(self.notify()) - for n in range(4): - self.ensure_future(self.serve_requests()) async def main_loop(self): '''Controller main loop.''' @@ -379,11 +331,28 @@ class Controller(util.LoggedClass): self.header_cache[height] = header return header + def session_delay(self, session): + priority = self.session_priority(session) + excess = max(0, priority - self.BANDS) + if excess != session.last_delay: + session.last_delay = excess + if excess: + session.log_info('high bandwidth use, deprioritizing by ' + 'delaying responses {:d}s'.format(excess)) + else: + session.log_info('stopped delaying responses') + return max(int(session.pause), excess) + + async def process_items(self, session): + '''Waits for incoming session items and processes them.''' + while True: + await session.items_event.wait() + await asyncio.sleep(self.session_delay(session)) + if not session.pause: + await session.process_pending_items() + def add_session(self, session): - now = time.time() - if now > self.next_stale_check: - self.next_stale_check = now + 300 - self.clear_stale_sessions() + session.items_future = self.ensure_future(self.process_items(session)) gid = int(session.start_time - self.start_time) // 900 self.groups[gid].append(session) self.sessions[session] = gid @@ -400,6 +369,7 @@ class Controller(util.LoggedClass): def remove_session(self, session): '''Remove a session from our sessions list if there.''' + session.items_future.cancel() if session in self.sessions: gid = self.sessions.pop(session) assert gid in self.groups diff --git a/server/daemon.py b/server/daemon.py index d52e973..dccca40 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -10,6 +10,7 @@ daemon.''' import asyncio import json +import time import traceback import aiohttp @@ -38,6 +39,8 @@ class Daemon(util.LoggedClass): # Limit concurrent RPC calls to this number. # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 self.workqueue_semaphore = asyncio.Semaphore(value=10) + self.down = False + self.last_error_time = 0 def set_urls(self, urls): '''Set the URLS to the given list, and switch to the first one.''' @@ -65,48 +68,56 @@ class Daemon(util.LoggedClass): return True return False + async def _send_data(self, data): + async with self.workqueue_semaphore: + async with aiohttp.ClientSession() as session: + async with session.post(self.url(), data=data) as resp: + # If bitcoind can't find a tx, for some reason + # it returns 500 but fills out the JSON. + # Should still return 200 IMO. + if resp.status in (200, 500): + return await resp.json() + return (resp.status, resp.reason) + async def _send(self, payload, processor): '''Send a payload to be converted to JSON. Handles temporary connection issues. Daemon reponse errors are raise through DaemonError. ''' - self.prior_msg = None - self.skip_count = None - - def log_error(msg, skip_once=False): - if skip_once and self.skip_count is None: - self.skip_count = 1 - if msg != self.prior_msg or self.skip_count == 0: - self.skip_count = 10 - self.prior_msg = msg - self.logger.error('{} Retrying between sleeps...' - .format(msg)) - self.skip_count -= 1 + def log_error(error): + self.down = True + now = time.time() + prior_time = self.last_error_time + if now - prior_time > 60: + self.last_error_time = now + if prior_time and self.failover(): + secs = 0 + else: + self.logger.error('{} Retrying occasionally...' + .format(error)) data = json.dumps(payload) secs = 1 - max_secs = 16 + max_secs = 4 while True: try: - async with self.workqueue_semaphore: - async with aiohttp.post(self.url(), data=data) as resp: - # If bitcoind can't find a tx, for some reason - # it returns 500 but fills out the JSON. - # Should still return 200 IMO. - if resp.status in (200, 500): - if self.prior_msg: - self.logger.info('connection restored') - result = processor(await resp.json()) - return result + result = await self._send_data(data) + if not isinstance(result, tuple): + result = processor(result) + if self.down: + self.down = False + self.last_error_time = 0 + self.logger.info('connection restored') + return result log_error('HTTP error code {:d}: {}' - .format(resp.status, resp.reason)) + .format(result[0], result[1])) except asyncio.TimeoutError: - log_error('timeout error.', skip_once=True) + log_error('timeout error.') except aiohttp.ClientHttpProcessingError: - log_error('HTTP error.', skip_once=True) + log_error('HTTP error.') except aiohttp.ServerDisconnectedError: - log_error('disconnected.', skip_once=True) + log_error('disconnected.') except aiohttp.ClientConnectionError: log_error('connection problem - is your daemon running?') except self.DaemonWarmingUpError: @@ -116,11 +127,8 @@ class Daemon(util.LoggedClass): except Exception: self.log_error(traceback.format_exc()) - if secs >= max_secs and self.failover(): - secs = 1 - else: - await asyncio.sleep(secs) - secs = min(max_secs, secs * 2) + await asyncio.sleep(secs) + secs = min(max_secs, secs * 2, 1) def logged_url(self, url=None): '''The host and port part, for logging.''' diff --git a/server/peers.py b/server/peers.py index 6f2ed55..dc9a035 100644 --- a/server/peers.py +++ b/server/peers.py @@ -59,13 +59,16 @@ class PeerSession(JSONSession): self.failed = False self.log_prefix = '[{}] '.format(self.peer) - def have_pending_items(self): - self.peer_mgr.ensure_future(self.process_pending_items()) + async def wait_on_items(self): + while True: + await self.items_event.wait() + await self.process_pending_items() def connection_made(self, transport): '''Handle an incoming client connection.''' super().connection_made(transport) self.log_prefix = '[{}] '.format(str(self.peer)[:25]) + self.future = self.peer_mgr.ensure_future(self.wait_on_items()) # Update IP address if not self.peer.is_tor: @@ -82,6 +85,7 @@ class PeerSession(JSONSession): def connection_lost(self, exc): '''Handle disconnection.''' super().connection_lost(exc) + self.future.cancel() self.peer_mgr.connection_lost(self) def on_peers_subscribe(self, result, error): @@ -306,8 +310,8 @@ class PeerManager(util.LoggedClass): '''Returns the server peers as a list of (ip, host, details) tuples. We return all peers we've connected to in the last day. - Additionally, if we don't have onion routing, we return up to - three randomly selected onion servers. + Additionally, if we don't have onion routing, we return a few + hard-coded onion servers. ''' cutoff = time.time() - STALE_SECS recent = [peer for peer in self.peers diff --git a/server/session.py b/server/session.py index 2b2a5c7..a9a4adc 100644 --- a/server/session.py +++ b/server/session.py @@ -47,11 +47,6 @@ class SessionBase(JSONSession): self.bw_used = 0 self.peer_added = False - def have_pending_items(self): - '''Called each time the pending item queue goes from empty to having - one item.''' - self.controller.enqueue_session(self) - def close_connection(self): '''Call this to close the connection.''' self.close_time = time.time()