From b65bcda504f8c0e4809e87a21be65b58d1d33a15 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Tue, 29 Nov 2016 22:19:12 +0900 Subject: [PATCH] Add per-session bandwidth limits --- lib/jsonrpc.py | 24 ++++++++++++++++++++++++ server/env.py | 1 + server/protocol.py | 3 +++ 3 files changed, 28 insertions(+) diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index f50a9a9..e11a5af 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -79,6 +79,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass): def __init__(self): super().__init__() self.start = time.time() + self.bandwidth_start = self.start + self.bandwidth_interval = 3600 + self.bandwidth_used = 0 + self.bandwidth_limit = 5000000 self.transport = None # Parts of an incomplete JSON line. We buffer them until # getting a newline. @@ -117,6 +121,13 @@ class JSONRPC(asyncio.Protocol, LoggedClass): '''Handle client disconnection.''' pass + def using_bandwidth(self, amount): + now = time.time() + if now >= self.bandwidth_start + self.bandwidth_interval: + self.bandwidth_start = now + self.bandwidth_used = 0 + self.bandwidth_used += amount + def data_received(self, data): '''Handle incoming data (synchronously). @@ -124,6 +135,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): decode_message for handling. ''' self.recv_size += len(data) + self.using_bandwidth(len(data)) # Close abuvsive connections where buffered data exceeds limit buffer_size = len(data) + sum(len(part) for part in self.parts) @@ -213,6 +225,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): else: self.send_count += 1 self.send_size += len(data) + self.using_bandwidth(len(data)) self.transport.write(data) async def handle_message(self, message): @@ -220,6 +233,17 @@ class JSONRPC(asyncio.Protocol, LoggedClass): Handles batches according to the JSON 2.0 spec. ''' + # Throttle high-bandwidth connections by delaying processing + # their requests. Delay more the higher the excessive usage. + excess = self.bandwidth_used - self.bandwidth_limit + if excess > 0: + secs = 1 + excess // self.bandwidth_limit + self.logger.warning('{} has high bandwidth use of {:,d} bytes, ' + 'sleeping {:d}s' + .format(self.peername(), self.bandwidth_used, + secs)) + await asyncio.sleep(secs) + if isinstance(message, list): payload = await self.batch_payload(message) else: diff --git a/server/env.py b/server/env.py index 6b6f111..58b8b9d 100644 --- a/server/env.py +++ b/server/env.py @@ -49,6 +49,7 @@ class Env(LoggedClass): self.max_send = self.integer('MAX_SEND', 1000000) self.max_subs = self.integer('MAX_SUBS', 250000) self.max_session_subs = self.integer('MAX_SESSION_SUBS', 50000) + self.bandwidth_limit = self.integer('BANDWIDTH_LIMIT', 2000000) # IRC self.report_tcp_port = self.integer('REPORT_TCP_PORT', self.tcp_port) self.report_ssl_port = self.integer('REPORT_SSL_PORT', self.ssl_port) diff --git a/server/protocol.py b/server/protocol.py index d0abf3f..61cb456 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -230,6 +230,8 @@ class ServerManager(util.LoggedClass): self.subscription_count = 0 self.futures = [] env.max_send = max(350000, env.max_send) + self.logger.info('session bandwidth limit {:,d} bytes' + .format(env.bandwidth_limit)) self.logger.info('max response size {:,d} bytes'.format(env.max_send)) self.logger.info('max subscriptions across all sessions: {:,d}' .format(self.max_subs)) @@ -471,6 +473,7 @@ class Session(JSONRPC): self.client = 'unknown' self.anon_logs = env.anon_logs self.max_send = env.max_send + self.bandwidth_limit = env.bandwidth_limit self.txs_sent = 0 def connection_made(self, transport):