
Merge branch 'release-0.8.0'

master 0.8.0
Neil Booth 8 years ago
commit 4825fbba17
  1. RELEASE-NOTES (19 changed lines)
  2. docs/ENV-NOTES (3 changed lines)
  3. lib/jsonrpc.py (29 changed lines)
  4. lib/util.py (10 changed lines)
  5. server/env.py (1 changed line)
  6. server/protocol.py (99 changed lines)
  7. server/version.py (2 changed lines)

RELEASE-NOTES (19 changed lines)

@ -1,3 +1,22 @@
version 0.8.0
------------
- stale connections are periodically closed. See docs/ENV-NOTES for
SESSION_TIMEOUT, default is 10 minutes. Issue #56.
- each session gets its own ID which is used in the logs instead of its
network address; the network address is still shown on initial connection.
Issue #55.
- the session ID is also shown in the sessions list. You can use this ID
with the following new RPC commands which take a list of session ids:
electrumx_rpc.py log
electrumx_rpc.py disconnect
The first will toggle logging of the sessions. A logged session
prints every incoming request to the logs.
The second will disconnect the sessions.
Example: "electrumx_rpc.py log 15 369"
version 0.7.20
--------------
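The session-timeout behaviour described in the 0.8.0 notes above boils down to two pieces: every session stamps the time of its last received message, and a periodic sweep drops sessions whose stamp is older than SESSION_TIMEOUT. The sketch below is a simplified, hypothetical illustration of that idea (the Session and clear_stale names are not ElectrumX's); the real code is the last_recv field added to lib/jsonrpc.py and ServerManager.clear_stale_sessions() in server/protocol.py, both shown later in this commit.

# Simplified illustration only; not part of this commit.
import time

SESSION_TIMEOUT = 600              # documented default: 10 minutes

class Session:
    def __init__(self, id_):
        self.id_ = id_
        self.last_recv = time.time()    # refreshed on every received message

def clear_stale(sessions, timeout=SESSION_TIMEOUT):
    '''Return the sessions that have been idle longer than the timeout.'''
    cutoff = time.time() - timeout
    stale = [s for s in sessions if s.last_recv < cutoff]
    for s in stale:
        print('dropping stale session {:d}'.format(s.id_))
        # a real server would close the session's transport here
    return stale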

docs/ENV-NOTES (3 changed lines)

@ -83,6 +83,9 @@ BANDWIDTH_LIMIT - per-session periodic bandwidth usage limit in bytes.
end of each period. Currently the period is
hard-coded to be one hour. The default limit value
is 2 million bytes.
SESSION_TIMEOUT - an integer number of seconds defaulting to 600.
Sessions with no activity for longer than this are
disconnected.
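As an aside, reading this variable is a one-liner; the snippet below is illustrative only and mirrors the documented default of 600 seconds, while the commit's actual handling is the Env.integer() call added to server/env.py further down.

# Illustrative only; the commit itself uses Env.integer('SESSION_TIMEOUT', 600).
import os

session_timeout = int(os.environ.get('SESSION_TIMEOUT', 600))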
If you want IRC connectivity to advertise your node:

lib/jsonrpc.py (29 changed lines)

@ -64,6 +64,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
INTERNAL_ERROR = -32603
ID_TYPES = (type(None), str, numbers.Number)
NEXT_SESSION_ID = 0
class RPCError(Exception):
'''RPC handlers raise this error.'''
@ -79,6 +80,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
def __init__(self):
super().__init__()
self.start = time.time()
self.last_recv = self.start
self.bandwidth_start = self.start
self.bandwidth_interval = 3600
self.bandwidth_used = 0
@ -103,6 +105,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
# If buffered incoming data exceeds this the connection is closed
self.max_buffer_size = 1000000
self.anon_logs = False
self.id_ = JSONRPC.NEXT_SESSION_ID
JSONRPC.NEXT_SESSION_ID += 1
self.log_prefix = '[{:d}] '.format(self.id_)
self.log_me = False
def peername(self, *, for_log=True):
'''Return the peer name of this connection.'''
@ -140,10 +146,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
# Close abusive connections where buffered data exceeds limit
buffer_size = len(data) + sum(len(part) for part in self.parts)
if buffer_size > self.max_buffer_size:
self.log_error('read buffer of {:,d} bytes exceeds {:,d} '
'byte limit, closing {}'
.format(buffer_size, self.max_buffer_size,
self.peername()))
self.transport.close()
# Do nothing if this connection is closing
@ -155,6 +161,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
if npos == -1:
self.parts.append(data)
break
self.last_recv = time.time()
self.recv_count += 1
tail, data = data[:npos], data[npos + 1:]
parts, self.parts = self.parts, []
@ -184,6 +191,8 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
'''Queue the request for asynchronous handling.'''
self.messages.put_nowait(message)
if self.log_me:
self.log_info('queued {}'.format(message))
def send_json_notification(self, method, params):
'''Create a json notification.'''
@ -216,7 +225,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
data = (json.dumps(payload) + '\n').encode()
except TypeError:
msg = 'JSON encoding failure: {}'.format(payload)
self.log_error(msg)
self.send_json_error(msg, self.INTERNAL_ERROR, id_)
else:
if len(data) > max(1000, self.max_send):
@ -239,10 +248,9 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
excess = self.bandwidth_used - self.bandwidth_limit
if excess > 0:
secs = 1 + excess // self.bandwidth_limit
self.log_warning('high bandwidth use of {:,d} bytes, '
'sleeping {:d}s'
.format(self.bandwidth_used, secs))
await asyncio.sleep(secs)
if isinstance(message, list):
@ -254,8 +262,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
try:
self.send_json(payload)
except self.LargeRequestError:
self.log_warning('blocked large request {}'.format(message))
async def batch_payload(self, batch):
'''Return the JSON payload corresponding to a batch JSON request.'''

lib/util.py (10 changed lines)

@ -20,6 +20,16 @@ class LoggedClass(object):
def __init__(self):
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.setLevel(logging.INFO)
self.log_prefix = ''
def log_info(self, msg):
self.logger.info(self.log_prefix + msg)
def log_warning(self, msg):
self.logger.warning(self.log_prefix + msg)
def log_error(self, msg):
self.logger.error(self.log_prefix + msg)
# Method decorator. To be used for calculations that will always
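Taken together with the per-session ID counter added to lib/jsonrpc.py above, these helpers mean every log line is stamped with "[<id>] " rather than the peer's network address. Below is a self-contained sketch of the pattern; the Session class here is a stand-in, not ElectrumX's.

# Stand-alone illustration of the log-prefix pattern; not the ElectrumX classes.
import logging

logging.basicConfig(level=logging.INFO, format='%(message)s')

class LoggedClass:
    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)
        self.log_prefix = ''

    def log_info(self, msg):
        self.logger.info(self.log_prefix + msg)

class Session(LoggedClass):
    NEXT_SESSION_ID = 0            # class-level counter shared by all sessions

    def __init__(self):
        super().__init__()
        self.id_ = Session.NEXT_SESSION_ID
        Session.NEXT_SESSION_ID += 1
        self.log_prefix = '[{:d}] '.format(self.id_)

s1, s2 = Session(), Session()
s1.log_info('connection from 1.2.3.4')   # prints "[0] connection from 1.2.3.4"
s2.log_info('connection from 5.6.7.8')   # prints "[1] connection from 5.6.7.8"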

server/env.py (1 changed line)

@ -50,6 +50,7 @@ class Env(LoggedClass):
self.max_subs = self.integer('MAX_SUBS', 250000)
self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000)
self.session_timeout = self.integer('SESSION_TIMEOUT', 600)
# IRC
self.irc = self.default('IRC', False)
self.irc_nick = self.default('IRC_NICK', None)

server/protocol.py (99 changed lines)

@ -228,8 +228,11 @@ class ServerManager(util.LoggedClass):
self.next_log_sessions = 0
self.max_subs = env.max_subs
self.subscription_count = 0
self.next_stale_check = 0
self.futures = []
env.max_send = max(350000, env.max_send)
self.logger.info('session timeout: {:,d} seconds'
.format(env.session_timeout))
self.logger.info('session bandwidth limit {:,d} bytes'
.format(env.bandwidth_limit))
self.logger.info('max response size {:,d} bytes'.format(env.max_send))
@ -354,24 +357,48 @@ class ServerManager(util.LoggedClass):
.format(len(self.sessions)))
def add_session(self, session):
self.clear_stale_sessions()
coro = session.serve_requests()
future = asyncio.ensure_future(coro)
self.sessions[session] = future
session.log_info('connection from {}, {:,d} total'
.format(session.peername(), len(self.sessions)))
# Some connections are acknowledged after the servers are closed
if not self.servers:
self.close_session(session)
def remove_session(self, session):
self.subscription_count -= session.sub_count()
future = self.sessions.pop(session)
future.cancel()
def close_session(self, session):
'''Close the session's transport and cancel its future.'''
session.transport.close()
self.sessions[session].cancel()
return '{:d} disconnected'.format(session.id_)
def toggle_logging(self, session):
'''Toggle logging of the session.'''
session.log_me = not session.log_me
if session.log_me:
return 'logging {:d}'.format(session.id_)
else:
return 'not logging {:d}'.format(session.id_)
def clear_stale_sessions(self):
'''Cut off sessions that haven't done anything for 10 minutes.'''
now = time.time()
if now > self.next_stale_check:
self.next_stale_check = now + 60
cutoff = now - self.env.session_timeout
stale = [session for session in self.sessions
if session.last_recv < cutoff]
for session in stale:
self.close_session(session)
if stale:
self.logger.info('dropped {:,d} stale connections'
.format(len(stale)))
def new_subscription(self):
if self.subscription_count >= self.max_subs:
@ -408,14 +435,15 @@ class ServerManager(util.LoggedClass):
return ('{:3d}:{:02d}:{:02d}'
.format(t // 3600, (t % 3600) // 60, t % 60))
fmt = ('{:<4} {:>7} {:>23} {:>15} {:>7} '
'{:>7} {:>7} {:>7} {:>7} {:>5} {:>9}')
yield fmt.format('Type', 'ID ', 'Peer', 'Client', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB',
'Txs', 'Time')
for (kind, id_, log_me, peer, subs, client, recv_count, recv_size,
send_count, send_size, txs_sent, time) in data:
yield fmt.format(kind, str(id_) + ('L' if log_me else ' '),
peer, client,
'{:,d}'.format(subs),
'{:,d}'.format(recv_count),
'{:,d}'.format(recv_size // 1024),
@ -429,6 +457,8 @@ class ServerManager(util.LoggedClass):
now = time.time()
sessions = sorted(self.sessions.keys(), key=lambda s: s.start)
return [(session.kind,
session.id_,
session.log_me,
session.peername(for_log=for_log),
session.sub_count(),
session.client,
@ -438,6 +468,33 @@ class ServerManager(util.LoggedClass):
now - session.start)
for session in sessions]
def lookup_session(self, param):
try:
id_ = int(param)
except:
pass
else:
for session in self.sessions:
if session.id_ == id_:
return session
return None
def for_each_session(self, params, operation):
result = []
for param in params:
session = self.lookup_session(param)
if session:
result.append(operation(session))
else:
result.append('unknown session: {}'.format(param))
return result
async def rpc_disconnect(self, params):
return self.for_each_session(params, self.close_session)
async def rpc_log(self, params):
return self.for_each_session(params, self.toggle_logging)
async def rpc_getinfo(self, params):
return self.server_summary()
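The replies these new handlers produce follow directly from the code above: one string per requested session ID, either the result of the operation or an 'unknown session' notice. Below is a hedged, self-contained sketch of that reply shape; the dict-based session lookup is a stand-in for the commit's lookup_session() by integer ID.

# Hypothetical, simplified sketch of the per-session reply format.
def for_each_session(params, sessions, operation):
    '''Apply operation to each session named by ID in params.'''
    result = []
    for param in params:
        session = sessions.get(param)
        if session:
            result.append(operation(session))
        else:
            result.append('unknown session: {}'.format(param))
    return result

sessions = {'15': 15}     # stand-in mapping of ID string -> session object
print(for_each_session(['15', 'abc'], sessions,
                       lambda s: '{:d} disconnected'.format(s)))
# -> ['15 disconnected', 'unknown session: abc']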
@ -485,10 +542,10 @@ class Session(JSONRPC):
'''Handle client disconnection.'''
super().connection_lost(exc)
if self.error_count or self.send_size >= 1024*1024:
self.log_info('disconnected. Sent {:,d} bytes in {:,d} messages '
'{:,d} errors'
.format(self.send_size, self.send_count,
self.error_count))
self.manager.remove_session(self)
async def handle_request(self, method, params):
@ -514,7 +571,7 @@ class Session(JSONRPC):
break
except Exception:
# Getting here should probably be considered a bug and fixed
self.log_error('error handling request {}'.format(message))
traceback.print_exc()
def sub_count(self):
@ -638,8 +695,7 @@ class ElectrumX(Session):
self.send_json(payload)
if matches:
self.log_info('notified of {:,d} addresses'.format(len(matches)))
def height(self):
'''Return the current flushed database height.'''
@ -825,12 +881,12 @@ class ElectrumX(Session):
tx_hash = await self.daemon.sendrawtransaction(params)
self.txs_sent += 1
self.manager.txs_sent += 1
self.log_info('sent tx: {}'.format(tx_hash))
return tx_hash
except DaemonError as e:
error = e.args[0]
message = error['message']
self.log_info('sendrawtransaction: {}'.format(message))
if 'non-mandatory-script-verify-flag' in message:
return (
'Your client produced a transaction that is not accepted '
@ -886,8 +942,8 @@ class ElectrumX(Session):
with codecs.open(self.env.banner_file, 'r', 'utf-8') as f:
banner = f.read()
except Exception as e:
self.log_error('reading banner file {}: {}'
.format(self.env.banner_file, e))
else:
network_info = await self.daemon.getnetworkinfo()
version = network_info['version']
@ -932,7 +988,8 @@ class LocalRPC(Session):
def __init__(self, *args):
super().__init__(*args)
cmds = ('disconnect getinfo log numpeers numsessions peers sessions'
.split())
self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd))
for cmd in cmds}
self.client = 'RPC'
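Adding 'disconnect' and 'log' to this list is all it takes to expose them over local RPC, because the handler table is built with getattr from the command names. Below is a minimal, hypothetical sketch of that dispatch pattern; the Manager class and its return values are illustrative stand-ins, not the real ServerManager.

# Hypothetical sketch of the name -> rpc_<name> dispatch used above.
class Manager:
    async def rpc_log(self, params):
        return ['logging {}'.format(p) for p in params]

    async def rpc_disconnect(self, params):
        return ['{} disconnected'.format(p) for p in params]

manager = Manager()
cmds = 'disconnect log'.split()
handlers = {cmd: getattr(manager, 'rpc_{}'.format(cmd)) for cmd in cmds}
# handlers['log'](['15']) returns a coroutine; awaiting it gives ['logging 15']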

server/version.py (2 changed lines)

@ -1 +1 @@
VERSION = "ElectrumX 0.8.0"
