|
|
@ -79,6 +79,10 @@ class JSONRPC(asyncio.Protocol, LoggedClass): |
|
|
|
def __init__(self): |
|
|
|
super().__init__() |
|
|
|
self.start = time.time() |
|
|
|
self.bandwidth_start = self.start |
|
|
|
self.bandwidth_interval = 3600 |
|
|
|
self.bandwidth_used = 0 |
|
|
|
self.bandwidth_limit = 5000000 |
|
|
|
self.transport = None |
|
|
|
# Parts of an incomplete JSON line. We buffer them until |
|
|
|
# getting a newline. |
|
|
@ -96,6 +100,8 @@ class JSONRPC(asyncio.Protocol, LoggedClass): |
|
|
|
# connection. The request causing it is logged. Values under |
|
|
|
# 1000 are treated as 1000. |
|
|
|
self.max_send = 0 |
|
|
|
# If buffered incoming data exceeds this the connection is closed |
|
|
|
self.max_buffer_size = 150000 |
|
|
|
self.anon_logs = False |
|
|
|
|
|
|
|
def peername(self, *, for_log=True): |
|
|
@ -115,6 +121,13 @@ class JSONRPC(asyncio.Protocol, LoggedClass): |
|
|
|
'''Handle client disconnection.''' |
|
|
|
pass |
|
|
|
|
|
|
|
def using_bandwidth(self, amount): |
|
|
|
now = time.time() |
|
|
|
if now >= self.bandwidth_start + self.bandwidth_interval: |
|
|
|
self.bandwidth_start = now |
|
|
|
self.bandwidth_used = 0 |
|
|
|
self.bandwidth_used += amount |
|
|
|
|
|
|
|
def data_received(self, data): |
|
|
|
'''Handle incoming data (synchronously). |
|
|
|
|
|
|
@ -122,6 +135,18 @@ class JSONRPC(asyncio.Protocol, LoggedClass): |
|
|
|
decode_message for handling. |
|
|
|
''' |
|
|
|
self.recv_size += len(data) |
|
|
|
self.using_bandwidth(len(data)) |
|
|
|
|
|
|
|
# Close abuvsive connections where buffered data exceeds limit |
|
|
|
buffer_size = len(data) + sum(len(part) for part in self.parts) |
|
|
|
if buffer_size > self.max_buffer_size: |
|
|
|
self.logger.error('read buffer of {:,d} bytes exceeds {:,d} ' |
|
|
|
'byte limit, closing {}' |
|
|
|
.format(buffer_size, self.max_buffer_size, |
|
|
|
self.peername())) |
|
|
|
self.transport.close() |
|
|
|
|
|
|
|
# Do nothing if this connection is closing |
|
|
|
if self.transport.is_closing(): |
|
|
|
return |
|
|
|
|
|
|
@ -200,6 +225,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): |
|
|
|
else: |
|
|
|
self.send_count += 1 |
|
|
|
self.send_size += len(data) |
|
|
|
self.using_bandwidth(len(data)) |
|
|
|
self.transport.write(data) |
|
|
|
|
|
|
|
async def handle_message(self, message): |
|
|
@ -207,6 +233,17 @@ class JSONRPC(asyncio.Protocol, LoggedClass): |
|
|
|
|
|
|
|
Handles batches according to the JSON 2.0 spec. |
|
|
|
''' |
|
|
|
# Throttle high-bandwidth connections by delaying processing |
|
|
|
# their requests. Delay more the higher the excessive usage. |
|
|
|
excess = self.bandwidth_used - self.bandwidth_limit |
|
|
|
if excess > 0: |
|
|
|
secs = 1 + excess // self.bandwidth_limit |
|
|
|
self.logger.warning('{} has high bandwidth use of {:,d} bytes, ' |
|
|
|
'sleeping {:d}s' |
|
|
|
.format(self.peername(), self.bandwidth_used, |
|
|
|
secs)) |
|
|
|
await asyncio.sleep(secs) |
|
|
|
|
|
|
|
if isinstance(message, list): |
|
|
|
payload = await self.batch_payload(message) |
|
|
|
else: |
|
|
|