Browse Source

Merge branch 'develop'

master 0.8.12
Neil Booth 8 years ago
parent
commit
42868f899c
  1. 7
      RELEASE-NOTES
  2. 2
      electrumx_rpc.py
  3. 76
      lib/jsonrpc.py
  4. 82
      server/protocol.py
  5. 2
      server/version.py

7
RELEASE-NOTES

@ -1,3 +1,10 @@
version 0.8.12
--------------
- pause serving sessions whose send buffers are full (anti-DoS). This
is currently logged; let me know if it's too verbose
- various tweaks to request handling
version 0.8.11 version 0.8.11
-------------- --------------

2
electrumx_rpc.py

@ -43,7 +43,7 @@ class RPCClient(JSONRPC):
future.cancel() future.cancel()
print('request timed out after {}s'.format(timeout)) print('request timed out after {}s'.format(timeout))
else: else:
await request.process(1) await request.process(self)
async def handle_response(self, result, error, method): async def handle_response(self, result, error, method):
if result and method in ('groups', 'sessions'): if result and method in ('groups', 'sessions'):

76
lib/jsonrpc.py

@ -10,69 +10,64 @@
import asyncio import asyncio
import json import json
import numbers import numbers
import socket
import time import time
from lib.util import LoggedClass from lib.util import LoggedClass
class SingleRequest(object): class RequestBase(object):
'''An object that represents a queued request.'''
def __init__(self, remaining):
self.remaining = remaining
class SingleRequest(RequestBase):
'''An object that represents a single request.''' '''An object that represents a single request.'''
def __init__(self, session, payload):
self.payload = payload
self.session = session
self.count = 1
def remaining(self): def __init__(self, payload):
return self.count super().__init__(1)
self.payload = payload
async def process(self, limit): async def process(self, session):
'''Asynchronously handle the JSON request.''' '''Asynchronously handle the JSON request.'''
binary = await self.session.process_single_payload(self.payload) self.remaining = 0
binary = await session.process_single_payload(self.payload)
if binary: if binary:
self.session._send_bytes(binary) session._send_bytes(binary)
self.count = 0
return 1
def __str__(self): def __str__(self):
return str(self.payload) return str(self.payload)
class BatchRequest(object): class BatchRequest(RequestBase):
'''An object that represents a batch request and its processing state. '''An object that represents a batch request and its processing state.
Batches are processed in chunks. Batches are processed in chunks.
''' '''
def __init__(self, session, payload): def __init__(self, payload):
self.session = session super().__init__(len(payload))
self.payload = payload self.payload = payload
self.done = 0
self.parts = [] self.parts = []
def remaining(self): async def process(self, session):
return len(self.payload) - self.done
async def process(self, limit):
'''Asynchronously handle the JSON batch according to the JSON 2.0 '''Asynchronously handle the JSON batch according to the JSON 2.0
spec.''' spec.'''
count = min(limit, self.remaining()) target = max(self.remaining - 4, 0)
for n in range(count): while self.remaining > target:
item = self.payload[self.done] item = self.payload[len(self.payload) - self.remaining]
part = await self.session.process_single_payload(item) self.remaining -= 1
part = await session.process_single_payload(item)
if part: if part:
self.parts.append(part) self.parts.append(part)
self.done += 1
total_len = sum(len(part) + 2 for part in self.parts) total_len = sum(len(part) + 2 for part in self.parts)
self.session.check_oversized_request(total_len) session.check_oversized_request(total_len)
if not self.remaining(): if not self.remaining:
if self.parts: if self.parts:
binary = b'[' + b', '.join(self.parts) + b']' binary = b'[' + b', '.join(self.parts) + b']'
self.session._send_bytes(binary) session._send_bytes(binary)
return count
def __str__(self): def __str__(self):
return str(self.payload) return str(self.payload)
@ -152,7 +147,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.bandwidth_used = 0 self.bandwidth_used = 0
self.bandwidth_limit = 5000000 self.bandwidth_limit = 5000000
self.transport = None self.transport = None
self.socket = None self.pause = False
# Parts of an incomplete JSON line. We buffer them until # Parts of an incomplete JSON line. We buffer them until
# getting a newline. # getting a newline.
self.parts = [] self.parts = []
@ -188,13 +183,22 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
self.transport = transport self.transport = transport
self.peer_info = transport.get_extra_info('peername') self.peer_info = transport.get_extra_info('peername')
self.socket = transport.get_extra_info('socket') transport.set_write_buffer_limits(high=500000)
self.socket.settimeout(10)
def connection_lost(self, exc): def connection_lost(self, exc):
'''Handle client disconnection.''' '''Handle client disconnection.'''
pass pass
def pause_writing(self):
'''Called by asyncio when the write buffer is full.'''
self.log_info('pausing writing')
self.pause = True
def resume_writing(self):
'''Called by asyncio when the write buffer has room.'''
self.log_info('resuming writing')
self.pause = False
def close_connection(self): def close_connection(self):
self.stop = time.time() self.stop = time.time()
if self.transport: if self.transport:
@ -267,9 +271,9 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
if not message: if not message:
self.send_json_error('empty batch', self.INVALID_REQUEST) self.send_json_error('empty batch', self.INVALID_REQUEST)
return return
request = BatchRequest(self, message) request = BatchRequest(message)
else: else:
request = SingleRequest(self, message) request = SingleRequest(message)
'''Queue the request for asynchronous handling.''' '''Queue the request for asynchronous handling.'''
self.enqueue_request(request) self.enqueue_request(request)

82
server/protocol.py

@ -11,7 +11,6 @@
import asyncio import asyncio
import codecs import codecs
import json import json
import socket
import ssl import ssl
import time import time
import traceback import traceback
@ -22,7 +21,7 @@ from functools import partial
import pylru import pylru
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
from lib.jsonrpc import JSONRPC from lib.jsonrpc import JSONRPC, RequestBase
from lib.tx import Deserializer from lib.tx import Deserializer
import lib.util as util import lib.util as util
from server.block_processor import BlockProcessor from server.block_processor import BlockProcessor
@ -218,16 +217,15 @@ class ServerManager(util.LoggedClass):
BANDS = 5 BANDS = 5
class NotificationRequest(object): class NotificationRequest(RequestBase):
def __init__(self, fn_call): def __init__(self, height, touched):
self.fn_call = fn_call super().__init__(1)
self.height = height
self.touched = touched
def remaining(self): async def process(self, session):
return 0 self.remaining = 0
await session.notify(self.height, self.touched)
async def process(self, limit):
await self.fn_call()
return 0
def __init__(self, env): def __init__(self, env):
super().__init__() super().__init__()
@ -295,7 +293,7 @@ class ServerManager(util.LoggedClass):
if isinstance(session, LocalRPC): if isinstance(session, LocalRPC):
return 0 return 0
group_bandwidth = sum(s.bandwidth_used for s in self.sessions[session]) group_bandwidth = sum(s.bandwidth_used for s in self.sessions[session])
return (bisect_left(self.bands, session.bandwidth_used) return 1 + (bisect_left(self.bands, session.bandwidth_used)
+ bisect_left(self.bands, group_bandwidth) + 1) // 2 + bisect_left(self.bands, group_bandwidth) + 1) // 2
async def enqueue_delayed_sessions(self): async def enqueue_delayed_sessions(self):
@ -318,9 +316,14 @@ class ServerManager(util.LoggedClass):
item = (priority, self.next_queue_id, session) item = (priority, self.next_queue_id, session)
self.next_queue_id += 1 self.next_queue_id += 1
secs = priority - self.BANDS secs = int(session.pause)
if secs >= 0: if secs:
session.log_info('delaying processing whilst paused')
excess = priority - self.BANDS
if excess > 0:
secs = excess
session.log_info('delaying response {:d}s'.format(secs)) session.log_info('delaying response {:d}s'.format(secs))
if secs:
self.delayed_sessions.append((time.time() + secs, item)) self.delayed_sessions.append((time.time() + secs, item))
else: else:
self.queue.put_nowait(item) self.queue.put_nowait(item)
@ -404,8 +407,8 @@ class ServerManager(util.LoggedClass):
for session in self.sessions: for session in self.sessions:
if isinstance(session, ElectrumX): if isinstance(session, ElectrumX):
fn_call = partial(session.notify, self.bp.db_height, touched) request = self.NotificationRequest(self.bp.db_height, touched)
session.enqueue_request(self.NotificationRequest(fn_call)) session.enqueue_request(request)
# Periodically log sessions # Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions: if self.env.log_sessions and time.time() > self.next_log_sessions:
data = self.session_data(for_log=True) data = self.session_data(for_log=True)
@ -481,7 +484,7 @@ class ServerManager(util.LoggedClass):
if now > self.next_stale_check: if now > self.next_stale_check:
self.next_stale_check = now + 60 self.next_stale_check = now + 60
self.clear_stale_sessions() self.clear_stale_sessions()
group = self.groups[int(session.start - self.start) // 60] group = self.groups[int(session.start - self.start) // 180]
group.add(session) group.add(session)
self.sessions[session] = group self.sessions[session] = group
session.log_info('connection from {}, {:,d} total' session.log_info('connection from {}, {:,d} total'
@ -514,22 +517,22 @@ class ServerManager(util.LoggedClass):
stale = [] stale = []
for session in self.sessions: for session in self.sessions:
if session.is_closing(): if session.is_closing():
if session.stop <= shutdown_cutoff and session.socket: if session.stop <= shutdown_cutoff:
try: session.transport.abort()
# Force shut down - a call to connection_lost
# should come soon after
session.socket.shutdown(socket.SHUT_RDWR)
except socket.error:
pass
elif session.last_recv < stale_cutoff: elif session.last_recv < stale_cutoff:
self.close_session(session) self.close_session(session)
stale.append(session.id_) stale.append(session.id_)
if stale: if stale:
self.logger.info('closing stale connections {}'.format(stale)) self.logger.info('closing stale connections {}'.format(stale))
# Clear out empty groups # Consolidate small groups
for key in [k for k, v in self.groups.items() if not v]: keys = [k for k, v in self.groups.items() if len(v) <= 2
and sum(session.bandwidth_used for session in v) < 10000]
if len(keys) > 1:
group = set.union(*(self.groups[key] for key in keys))
for key in keys:
del self.groups[key] del self.groups[key]
self.groups[max(keys)] = group
def new_subscription(self): def new_subscription(self):
if self.subscription_count >= self.max_subs: if self.subscription_count >= self.max_subs:
@ -574,7 +577,7 @@ class ServerManager(util.LoggedClass):
fmt = ('{:<6} {:>9} {:>9} {:>6} {:>6} {:>8}' fmt = ('{:<6} {:>9} {:>9} {:>6} {:>6} {:>8}'
'{:>7} {:>9} {:>7} {:>9}') '{:>7} {:>9} {:>7} {:>9}')
yield fmt.format('ID', 'Sessions', 'Bw Qta KB', 'Reqs', 'Txs', 'Subs', yield fmt.format('ID', 'Sessions', 'Bwidth KB', 'Reqs', 'Txs', 'Subs',
'Recv', 'Recv KB', 'Sent', 'Sent KB') 'Recv', 'Recv KB', 'Sent', 'Sent KB')
for (id_, session_count, bandwidth, reqs, txs_sent, subs, for (id_, session_count, bandwidth, reqs, txs_sent, subs,
recv_count, recv_size, send_count, send_size) in data: recv_count, recv_size, send_count, send_size) in data:
@ -734,7 +737,7 @@ class Session(JSONRPC):
return status return status
def requests_remaining(self): def requests_remaining(self):
return sum(request.remaining() for request in self.requests) return sum(request.remaining for request in self.requests)
def enqueue_request(self, request): def enqueue_request(self, request):
'''Add a request to the session's list.''' '''Add a request to the session's list.'''
@ -744,28 +747,27 @@ class Session(JSONRPC):
async def serve_requests(self): async def serve_requests(self):
'''Serve requests in batches.''' '''Serve requests in batches.'''
done_reqs = 0 total = 0
done_jobs = 0 errs = []
limit = 4 # Process 8 items at a time
for request in self.requests: for request in self.requests:
try: try:
done_jobs += await request.process(limit - done_jobs) initial = request.remaining
await request.process(self)
total += initial - request.remaining
except asyncio.CancelledError: except asyncio.CancelledError:
raise raise
except Exception: except Exception:
# Getting here should probably be considered a bug and fixed # Should probably be considered a bug and fixed
self.log_error('error handling request {}'.format(request)) self.log_error('error handling request {}'.format(request))
traceback.print_exc() traceback.print_exc()
done_reqs += 1 errs.append(request)
else: if total >= 8:
if not request.remaining():
done_reqs += 1
if done_jobs >= limit:
break break
# Remove completed requests and re-enqueue ourself if any remain. # Remove completed requests and re-enqueue ourself if any remain.
if done_reqs: self.requests = [req for req in self.requests
self.requests = self.requests[done_reqs:] if req.remaining and not req in errs]
if self.requests: if self.requests:
self.manager.enqueue_session(self) self.manager.enqueue_session(self)

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.8.11" VERSION = "ElectrumX 0.8.12"

Loading…
Cancel
Save