Browse Source

Merge branch 'client-limiting' into develop

master
Neil Booth 8 years ago
parent
commit
64333c3dfd
  1. 88
      server/protocol.py

88
server/protocol.py

@ -108,9 +108,11 @@ class ServerManager(LoggedClass):
def notify(self, height, touched):
'''Notify sessions about height changes and touched addresses.'''
sessions = [session for session in self.sessions
if isinstance(session, ElectrumX)]
ElectrumX.notify(sessions, height, touched)
cache = {}
for session in self.sessions:
if isinstance(session, ElectrumX):
# Use a tuple to distinguish from JSON
session.jobs.put_nowait((height, touched, cache))
def stop(self):
'''Close listening servers.'''
@ -196,7 +198,7 @@ class Session(JSONRPC):
self.coin = bp.coin
self.kind = kind
self.hash168s = set()
self.requests = asyncio.Queue()
self.jobs = asyncio.Queue()
self.current_task = None
self.client = 'unknown'
@ -222,26 +224,23 @@ class Session(JSONRPC):
def on_json_request(self, request):
'''Queue the request for asynchronous handling.'''
self.requests.put_nowait(request)
self.jobs.put_nowait(request)
async def serve_requests(self):
'''Asynchronously run through the task queue.'''
while True:
await asyncio.sleep(0)
request = await self.requests.get()
job = await self.jobs.get()
try:
start = time.time()
await self.handle_json_request(request)
secs = time.time() - start
if secs > 1:
self.logger.warning('slow request for {} took {:.1f}s: {}'
.format(self.peername(), secs,
request))
if isinstance(job, tuple): # Height / mempool notification
await self.notify(*job)
else:
await self.handle_json_request(job)
except asyncio.CancelledError:
break
except Exception:
# Getting here should probably be considered a bug and fixed
self.logger.error('error handling request {}'.format(request))
self.logger.error('error handling request {}'.format(job))
traceback.print_exc()
def peername(self, *, for_log=True):
@ -324,36 +323,41 @@ class ElectrumX(Session):
for prefix, suffixes in rpcs
for suffix in suffixes.split()}
@classmethod
def notify(cls, sessions, height, touched):
headers_payload = height_payload = None
async def notify(self, height, touched, cache):
'''Notify the client about changes in height and touched addresses.
for session in sessions:
if height != session.notified_height:
session.notified_height = height
if session.subscribe_headers:
if headers_payload is None:
headers_payload = json_notification_payload(
'blockchain.headers.subscribe',
(session.electrum_header(height), ),
)
session.send_json(headers_payload)
if session.subscribe_height:
if height_payload is None:
height_payload = json_notification_payload(
'blockchain.numblocks.subscribe',
(height, ),
)
session.send_json(height_payload)
hash168_to_address = session.coin.hash168_to_address
for hash168 in session.hash168s.intersection(touched):
address = hash168_to_address(hash168)
status = session.address_status(hash168)
Cache is a shared cache for this update.
'''
if height != self.notified_height:
self.notified_height = height
if self.subscribe_headers:
key = 'headers_payload'
if key not in cache:
cache[key] = json_notification_payload(
'blockchain.headers.subscribe',
(self.electrum_header(height), ),
)
self.send_json(cache[key])
if self.subscribe_height:
payload = json_notification_payload(
'blockchain.address.subscribe', (address, status))
session.send_json(payload)
'blockchain.numblocks.subscribe',
(height, ),
)
self.send_json(payload)
hash168_to_address = self.coin.hash168_to_address
matches = self.hash168s.intersection(touched)
for hash168 in matches:
address = hash168_to_address(hash168)
status = self.address_status(hash168)
payload = json_notification_payload(
'blockchain.address.subscribe', (address, status))
self.send_json(payload)
if matches:
self.logger.info('notified {} of {} addresses'
.format(self.peername(), len(matches)))
def height(self):
'''Return the block processor's current height.'''

Loading…
Cancel
Save