From 3d2824218be9c8f26c810df3fbf3d7dc3776f5e0 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 3 Dec 2016 12:20:42 +0900 Subject: [PATCH 1/3] Close stale sessions New envvar SESSION_TIMEOUT A session with no activity is cut off after this time Fixes #56 --- docs/ENV-NOTES | 3 +++ lib/jsonrpc.py | 2 ++ server/env.py | 1 + server/protocol.py | 18 ++++++++++++++++++ 4 files changed, 24 insertions(+) diff --git a/docs/ENV-NOTES b/docs/ENV-NOTES index 26048a6..58baf97 100644 --- a/docs/ENV-NOTES +++ b/docs/ENV-NOTES @@ -83,6 +83,9 @@ BANDWIDTH_LIMIT - per-session periodic bandwith usage limit in bytes. end of each period. Currently the period is hard-coded to be one hour. The default limit value is 2 million bytes. +SESSION_TIMEOUT - an integer number of seconds defaulting to 600. + Sessions with no activity for longer than this are + disconnected. If you want IRC connectivity to advertise your node: diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 5688f84..7aa1764 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -79,6 +79,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): def __init__(self): super().__init__() self.start = time.time() + self.last_recv = self.start self.bandwidth_start = self.start self.bandwidth_interval = 3600 self.bandwidth_used = 0 @@ -155,6 +156,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): if npos == -1: self.parts.append(data) break + self.last_recv = time.time() self.recv_count += 1 tail, data = data[:npos], data[npos + 1:] parts, self.parts = self.parts, [] diff --git a/server/env.py b/server/env.py index 2b875a4..f13c56e 100644 --- a/server/env.py +++ b/server/env.py @@ -50,6 +50,7 @@ class Env(LoggedClass): self.max_subs = self.integer('MAX_SUBS', 250000) self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000) self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000) + self.session_timeout = self.integer('SESSION_TIMEOUT', 600) # IRC self.irc = self.default('IRC', False) self.irc_nick = self.default('IRC_NICK', None) diff --git a/server/protocol.py b/server/protocol.py index 20157a8..2ebc82d 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -228,8 +228,11 @@ class ServerManager(util.LoggedClass): self.next_log_sessions = 0 self.max_subs = env.max_subs self.subscription_count = 0 + self.next_stale_check = 0 self.futures = [] env.max_send = max(350000, env.max_send) + self.logger.info('session timeout: {:,d} seconds' + .format(env.session_timeout)) self.logger.info('session bandwidth limit {:,d} bytes' .format(env.bandwidth_limit)) self.logger.info('max response size {:,d} bytes'.format(env.max_send)) @@ -354,6 +357,7 @@ class ServerManager(util.LoggedClass): .format(len(self.sessions))) def add_session(self, session): + self.clear_stale_sessions() coro = session.serve_requests() future = asyncio.ensure_future(coro) self.sessions[session] = future @@ -373,6 +377,20 @@ class ServerManager(util.LoggedClass): future = self.sessions.pop(session) future.cancel() + def clear_stale_sessions(self): + '''Cut off sessions that haven't done anything for 10 minutes.''' + now = time.time() + if now > self.next_stale_check: + self.next_stale_check = now + 60 + cutoff = now - self.env.session_timeout + stale = [session for session in self.sessions + if session.last_recv < cutoff] + for session in stale: + self.close_session(session) + if stale: + self.logger.info('dropped {:,d} stale connections' + .format(len(stale))) + def new_subscription(self): if self.subscription_count >= self.max_subs: raise JSONRPC.RPCError('server subscription limit {:,d} reached' From c4e7878407c835d2f5925b58e75bd66ea7b0eb1a Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 3 Dec 2016 13:05:02 +0900 Subject: [PATCH 2/3] Add session ID for each session Show it in the logs with each session-specific message. Show the sessions ID in the sessions list with an L suffix if logging. Add RPC commands to toggle logging of, and disconnect, a session. Closes #55 --- lib/jsonrpc.py | 27 +++++++++------- lib/util.py | 10 ++++++ server/protocol.py | 81 ++++++++++++++++++++++++++++++++++------------ 3 files changed, 86 insertions(+), 32 deletions(-) diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 7aa1764..f58cf6c 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -64,6 +64,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): INTERNAL_ERROR = -32603 ID_TYPES = (type(None), str, numbers.Number) + NEXT_SESSION_ID = 0 class RPCError(Exception): '''RPC handlers raise this error.''' @@ -104,6 +105,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass): # If buffered incoming data exceeds this the connection is closed self.max_buffer_size = 1000000 self.anon_logs = False + self.id_ = JSONRPC.NEXT_SESSION_ID + JSONRPC.NEXT_SESSION_ID += 1 + self.log_prefix = '[{:d}] '.format(self.id_) + self.log_me = False def peername(self, *, for_log=True): '''Return the peer name of this connection.''' @@ -141,10 +146,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass): # Close abuvsive connections where buffered data exceeds limit buffer_size = len(data) + sum(len(part) for part in self.parts) if buffer_size > self.max_buffer_size: - self.logger.error('read buffer of {:,d} bytes exceeds {:,d} ' - 'byte limit, closing {}' - .format(buffer_size, self.max_buffer_size, - self.peername())) + self.log_error('read buffer of {:,d} bytes exceeds {:,d} ' + 'byte limit, closing {}' + .format(buffer_size, self.max_buffer_size, + self.peername())) self.transport.close() # Do nothing if this connection is closing @@ -186,6 +191,8 @@ class JSONRPC(asyncio.Protocol, LoggedClass): '''Queue the request for asynchronous handling.''' self.messages.put_nowait(message) + if self.log_me: + self.log_info('queued {}'.format(message)) def send_json_notification(self, method, params): '''Create a json notification.''' @@ -218,7 +225,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): data = (json.dumps(payload) + '\n').encode() except TypeError: msg = 'JSON encoding failure: {}'.format(payload) - self.logger.error(msg) + self.log_error(msg) self.send_json_error(msg, self.INTERNAL_ERROR, id_) else: if len(data) > max(1000, self.max_send): @@ -241,10 +248,9 @@ class JSONRPC(asyncio.Protocol, LoggedClass): excess = self.bandwidth_used - self.bandwidth_limit if excess > 0: secs = 1 + excess // self.bandwidth_limit - self.logger.warning('{} has high bandwidth use of {:,d} bytes, ' - 'sleeping {:d}s' - .format(self.peername(), self.bandwidth_used, - secs)) + self.log_warning('high bandwidth use of {:,d} bytes, ' + 'sleeping {:d}s' + .format(self.bandwidth_used, secs)) await asyncio.sleep(secs) if isinstance(message, list): @@ -256,8 +262,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): try: self.send_json(payload) except self.LargeRequestError: - self.logger.warning('blocked large request from {}: {}' - .format(self.peername(), message)) + self.log_warning('blocked large request {}'.format(message)) async def batch_payload(self, batch): '''Return the JSON payload corresponding to a batch JSON request.''' diff --git a/lib/util.py b/lib/util.py index 3feab9d..de0d2f7 100644 --- a/lib/util.py +++ b/lib/util.py @@ -20,6 +20,16 @@ class LoggedClass(object): def __init__(self): self.logger = logging.getLogger(self.__class__.__name__) self.logger.setLevel(logging.INFO) + self.log_prefix = '' + + def log_info(self, msg): + self.logger.info(self.log_prefix + msg) + + def log_warning(self, msg): + self.logger.warning(self.log_prefix + msg) + + def log_error(self, msg): + self.logger.error(self.log_prefix + msg) # Method decorator. To be used for calculations that will always diff --git a/server/protocol.py b/server/protocol.py index 2ebc82d..531b3e0 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -361,21 +361,30 @@ class ServerManager(util.LoggedClass): coro = session.serve_requests() future = asyncio.ensure_future(coro) self.sessions[session] = future - self.logger.info('connection from {}, {:,d} total' + session.log_info('connection from {}, {:,d} total' .format(session.peername(), len(self.sessions))) # Some connections are acknowledged after the servers are closed if not self.servers: self.close_session(session) + def remove_session(self, session): + self.subscription_count -= session.sub_count() + future = self.sessions.pop(session) + future.cancel() + def close_session(self, session): '''Close the session's transport and cancel its future.''' session.transport.close() self.sessions[session].cancel() + return '{:d} disconnected'.format(session.id_) - def remove_session(self, session): - self.subscription_count -= session.sub_count() - future = self.sessions.pop(session) - future.cancel() + def toggle_logging(self, session): + '''Close the session's transport and cancel its future.''' + session.log_me = not session.log_me + if session.log_me: + return 'logging {:d}'.format(session.id_) + else: + return 'not logging {:d}'.format(session.id_) def clear_stale_sessions(self): '''Cut off sessions that haven't done anything for 10 minutes.''' @@ -426,14 +435,15 @@ class ServerManager(util.LoggedClass): return ('{:3d}:{:02d}:{:02d}' .format(t // 3600, (t % 3600) // 60, t % 60)) - fmt = ('{:<4} {:>23} {:>15} {:>7} ' + fmt = ('{:<4} {:>7} {:>23} {:>15} {:>7} ' '{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}') - yield fmt.format('Type', 'Peer', 'Client', 'Subs', + yield fmt.format('Type', 'ID ', 'Peer', 'Client', 'Subs', 'Recv', 'Recv KB', 'Sent', 'Sent KB', 'Txs', 'Time') - for (kind, peer, subs, client, recv_count, recv_size, + for (kind, id_, log_me, peer, subs, client, recv_count, recv_size, send_count, send_size, txs_sent, time) in data: - yield fmt.format(kind, peer, client, + yield fmt.format(kind, str(id_) + ('L' if log_me else ' '), + peer, client, '{:,d}'.format(subs), '{:,d}'.format(recv_count), '{:,d}'.format(recv_size // 1024), @@ -447,6 +457,8 @@ class ServerManager(util.LoggedClass): now = time.time() sessions = sorted(self.sessions.keys(), key=lambda s: s.start) return [(session.kind, + session.id_, + session.log_me, session.peername(for_log=for_log), session.sub_count(), session.client, @@ -456,6 +468,33 @@ class ServerManager(util.LoggedClass): now - session.start) for session in sessions] + def lookup_session(self, param): + try: + id_ = int(param) + except: + pass + else: + for session in self.sessions: + if session.id_ == id_: + return session + return None + + def for_each_session(self, params, operation): + result = [] + for param in params: + session = self.lookup_session(param) + if session: + result.append(operation(session)) + else: + result.append('unknown session: {}'.format(param)) + return result + + async def rpc_disconnect(self, params): + return self.for_each_session(params, self.close_session) + + async def rpc_log(self, params): + return self.for_each_session(params, self.toggle_logging) + async def rpc_getinfo(self, params): return self.server_summary() @@ -503,10 +542,10 @@ class Session(JSONRPC): '''Handle client disconnection.''' super().connection_lost(exc) if self.error_count or self.send_size >= 1024*1024: - self.logger.info('{} disconnected. ' - 'Sent {:,d} bytes in {:,d} messages {:,d} errors' - .format(self.peername(), self.send_size, - self.send_count, self.error_count)) + self.log_info('disconnected. Sent {:,d} bytes in {:,d} messages ' + '{:,d} errors' + .format(self.send_size, self.send_count, + self.error_count)) self.manager.remove_session(self) async def handle_request(self, method, params): @@ -532,7 +571,7 @@ class Session(JSONRPC): break except Exception: # Getting here should probably be considered a bug and fixed - self.logger.error('error handling request {}'.format(message)) + self.log_error('error handling request {}'.format(message)) traceback.print_exc() def sub_count(self): @@ -656,8 +695,7 @@ class ElectrumX(Session): self.send_json(payload) if matches: - self.logger.info('notified {} of {} addresses' - .format(self.peername(), len(matches))) + self.log_info('notified of {:,d} addresses'.format(len(matches))) def height(self): '''Return the current flushed database height.''' @@ -843,12 +881,12 @@ class ElectrumX(Session): tx_hash = await self.daemon.sendrawtransaction(params) self.txs_sent += 1 self.manager.txs_sent += 1 - self.logger.info('sent tx: {}'.format(tx_hash)) + self.log_info('sent tx: {}'.format(tx_hash)) return tx_hash except DaemonError as e: error = e.args[0] message = error['message'] - self.logger.info('sendrawtransaction: {}'.format(message)) + self.log_info('sendrawtransaction: {}'.format(message)) if 'non-mandatory-script-verify-flag' in message: return ( 'Your client produced a transaction that is not accepted ' @@ -904,8 +942,8 @@ class ElectrumX(Session): with codecs.open(self.env.banner_file, 'r', 'utf-8') as f: banner = f.read() except Exception as e: - self.logger.error('reading banner file {}: {}' - .format(self.env.banner_file, e)) + self.log_error('reading banner file {}: {}' + .format(self.env.banner_file, e)) else: network_info = await self.daemon.getnetworkinfo() version = network_info['version'] @@ -950,7 +988,8 @@ class LocalRPC(Session): def __init__(self, *args): super().__init__(*args) - cmds = 'getinfo sessions numsessions peers numpeers'.split() + cmds = ('disconnect getinfo log numpeers numsessions peers sessions' + .split()) self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd)) for cmd in cmds} self.client = 'RPC' From 33b1ce8f6dc3807476a4db93243942e9039141ae Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 3 Dec 2016 14:39:36 +0900 Subject: [PATCH 3/3] Prepare 0.8.0 --- RELEASE-NOTES | 19 +++++++++++++++++++ server/version.py | 2 +- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/RELEASE-NOTES b/RELEASE-NOTES index 731d580..5343e4e 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,3 +1,22 @@ +verion 0.8.0 +------------ + +- stale connections are periodically closed. See docs/ENV-NOTES for + SESSION_TIMEOUT, default is 10 minutes. Issue #56. +- each session gets its own ID which is used in the logs instead of its + network address; the network address is still shown on initial connection. + Issue #55. +- the session ID is also shown in the sessions list. You can use this ID + with the following new RPC commands which take a list of session ids: + + electrumx_rpc.py log + electrumx_rpc.py disconnect + + The first will toggle logging of the sessions. A logged sesssion + prints every incoming request to the logs. + The second will disconnect the sessions. + Example: "electrumx_rpc.py log 15 369" + version 0.7.20 -------------- diff --git a/server/version.py b/server/version.py index c901cc6..8a6e890 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.7.20" +VERSION = "ElectrumX 0.8.0"