Browse Source

Merge branch 'bandwidth' into develop

master
Neil Booth 8 years ago
parent
commit
d9082e59a3
  1. 2
      electrumx_rpc.py
  2. 37
      lib/jsonrpc.py
  3. 1
      server/env.py
  4. 4
      server/protocol.py

2
electrumx_rpc.py

@ -23,6 +23,8 @@ from server.protocol import ServerManager
class RPCClient(JSONRPC): class RPCClient(JSONRPC):
async def send_and_wait(self, method, params, timeout=None): async def send_and_wait(self, method, params, timeout=None):
# Raise incoming buffer size - presumably connection is trusted
self.max_buffer_size = 5000000
self.send_json_request(method, id_=method, params=params) self.send_json_request(method, id_=method, params=params)
future = asyncio.ensure_future(self.messages.get()) future = asyncio.ensure_future(self.messages.get())

37
lib/jsonrpc.py

@ -79,6 +79,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.start = time.time() self.start = time.time()
self.bandwidth_start = self.start
self.bandwidth_interval = 3600
self.bandwidth_used = 0
self.bandwidth_limit = 5000000
self.transport = None self.transport = None
# Parts of an incomplete JSON line. We buffer them until # Parts of an incomplete JSON line. We buffer them until
# getting a newline. # getting a newline.
@ -96,6 +100,8 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
# connection. The request causing it is logged. Values under # connection. The request causing it is logged. Values under
# 1000 are treated as 1000. # 1000 are treated as 1000.
self.max_send = 0 self.max_send = 0
# If buffered incoming data exceeds this the connection is closed
self.max_buffer_size = 150000
self.anon_logs = False self.anon_logs = False
def peername(self, *, for_log=True): def peername(self, *, for_log=True):
@ -115,6 +121,13 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
'''Handle client disconnection.''' '''Handle client disconnection.'''
pass pass
def using_bandwidth(self, amount):
now = time.time()
if now >= self.bandwidth_start + self.bandwidth_interval:
self.bandwidth_start = now
self.bandwidth_used = 0
self.bandwidth_used += amount
def data_received(self, data): def data_received(self, data):
'''Handle incoming data (synchronously). '''Handle incoming data (synchronously).
@ -122,6 +135,18 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
decode_message for handling. decode_message for handling.
''' '''
self.recv_size += len(data) self.recv_size += len(data)
self.using_bandwidth(len(data))
# Close abuvsive connections where buffered data exceeds limit
buffer_size = len(data) + sum(len(part) for part in self.parts)
if buffer_size > self.max_buffer_size:
self.logger.error('read buffer of {:,d} bytes exceeds {:,d} '
'byte limit, closing {}'
.format(buffer_size, self.max_buffer_size,
self.peername()))
self.transport.close()
# Do nothing if this connection is closing
if self.transport.is_closing(): if self.transport.is_closing():
return return
@ -200,6 +225,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
else: else:
self.send_count += 1 self.send_count += 1
self.send_size += len(data) self.send_size += len(data)
self.using_bandwidth(len(data))
self.transport.write(data) self.transport.write(data)
async def handle_message(self, message): async def handle_message(self, message):
@ -207,6 +233,17 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
Handles batches according to the JSON 2.0 spec. Handles batches according to the JSON 2.0 spec.
''' '''
# Throttle high-bandwidth connections by delaying processing
# their requests. Delay more the higher the excessive usage.
excess = self.bandwidth_used - self.bandwidth_limit
if excess > 0:
secs = 1 + excess // self.bandwidth_limit
self.logger.warning('{} has high bandwidth use of {:,d} bytes, '
'sleeping {:d}s'
.format(self.peername(), self.bandwidth_used,
secs))
await asyncio.sleep(secs)
if isinstance(message, list): if isinstance(message, list):
payload = await self.batch_payload(message) payload = await self.batch_payload(message)
else: else:

1
server/env.py

@ -49,6 +49,7 @@ class Env(LoggedClass):
self.max_send = self.integer('MAX_SEND', 1000000) self.max_send = self.integer('MAX_SEND', 1000000)
self.max_subs = self.integer('MAX_SUBS', 250000) self.max_subs = self.integer('MAX_SUBS', 250000)
self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000) self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000)
self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000)
# IRC # IRC
self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port) self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port)
self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port) self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port)

4
server/protocol.py

@ -230,6 +230,8 @@ class ServerManager(util.LoggedClass):
self.subscription_count = 0 self.subscription_count = 0
self.futures = [] self.futures = []
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
self.logger.info('session bandwidth limit {:,d} bytes'
.format(env.bandwidth_limit))
self.logger.info('max response size {:,d} bytes'.format(env.max_send)) self.logger.info('max response size {:,d} bytes'.format(env.max_send))
self.logger.info('max subscriptions across all sessions: {:,d}' self.logger.info('max subscriptions across all sessions: {:,d}'
.format(self.max_subs)) .format(self.max_subs))
@ -471,6 +473,7 @@ class Session(JSONRPC):
self.client = 'unknown' self.client = 'unknown'
self.anon_logs = env.anon_logs self.anon_logs = env.anon_logs
self.max_send = env.max_send self.max_send = env.max_send
self.bandwidth_limit = env.bandwidth_limit
self.txs_sent = 0 self.txs_sent = 0
def connection_made(self, transport): def connection_made(self, transport):
@ -923,3 +926,4 @@ class LocalRPC(Session):
self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd)) self.handlers = {cmd: getattr(self.manager, 'rpc_{}'.format(cmd))
for cmd in cmds} for cmd in cmds}
self.client = 'RPC' self.client = 'RPC'
self.max_send = 5000000

Loading…
Cancel
Save