Browse Source

Put notifications on the session queue

This keeps the network in-order and means slow clients get slow
notifications, which seems fairer.

Add some simple logging
master
Neil Booth 8 years ago
parent
commit
f17ad2ddf4
  1. 31
      server/protocol.py

31
server/protocol.py

@ -111,7 +111,8 @@ class ServerManager(LoggedClass):
cache = {} cache = {}
for session in self.sessions: for session in self.sessions:
if isinstance(session, ElectrumX): if isinstance(session, ElectrumX):
session.notify(height, touched, cache) # Use a tuple to distinguish from JSON
session.jobs.put_nowait((height, touched, cache))
def stop(self): def stop(self):
'''Close listening servers.''' '''Close listening servers.'''
@ -197,7 +198,7 @@ class Session(JSONRPC):
self.coin = bp.coin self.coin = bp.coin
self.kind = kind self.kind = kind
self.hash168s = set() self.hash168s = set()
self.requests = asyncio.Queue() self.jobs = asyncio.Queue()
self.current_task = None self.current_task = None
self.client = 'unknown' self.client = 'unknown'
@ -223,26 +224,23 @@ class Session(JSONRPC):
def on_json_request(self, request): def on_json_request(self, request):
'''Queue the request for asynchronous handling.''' '''Queue the request for asynchronous handling.'''
self.requests.put_nowait(request) self.jobs.put_nowait(request)
async def serve_requests(self): async def serve_requests(self):
'''Asynchronously run through the task queue.''' '''Asynchronously run through the task queue.'''
while True: while True:
await asyncio.sleep(0) await asyncio.sleep(0)
request = await self.requests.get() job = await self.jobs.get()
try: try:
start = time.time() if isinstance(job, tuple): # Height / mempool notification
await self.handle_json_request(request) await self.notify(*job)
secs = time.time() - start else:
if secs > 1: await self.handle_json_request(job)
self.logger.warning('slow request for {} took {:.1f}s: {}'
.format(self.peername(), secs,
request))
except asyncio.CancelledError: except asyncio.CancelledError:
break break
except Exception: except Exception:
# Getting here should probably be considered a bug and fixed # Getting here should probably be considered a bug and fixed
self.logger.error('error handling request {}'.format(request)) self.logger.error('error handling request {}'.format(job))
traceback.print_exc() traceback.print_exc()
def peername(self, *, for_log=True): def peername(self, *, for_log=True):
@ -325,7 +323,7 @@ class ElectrumX(Session):
for prefix, suffixes in rpcs for prefix, suffixes in rpcs
for suffix in suffixes.split()} for suffix in suffixes.split()}
def notify(self, height, touched, cache): async def notify(self, height, touched, cache):
'''Notify the client about changes in height and touched addresses. '''Notify the client about changes in height and touched addresses.
Cache is a shared cache for this update. Cache is a shared cache for this update.
@ -349,13 +347,18 @@ class ElectrumX(Session):
self.send_json(payload) self.send_json(payload)
hash168_to_address = self.coin.hash168_to_address hash168_to_address = self.coin.hash168_to_address
for hash168 in self.hash168s.intersection(touched): matches = self.hash168s.intersection(touched)
for hash168 in matches:
address = hash168_to_address(hash168) address = hash168_to_address(hash168)
status = self.address_status(hash168) status = self.address_status(hash168)
payload = json_notification_payload( payload = json_notification_payload(
'blockchain.address.subscribe', (address, status)) 'blockchain.address.subscribe', (address, status))
self.send_json(payload) self.send_json(payload)
if matches:
self.logger.info('notified {} of {} addresses'
.format(self.peername(), len(matches)))
def height(self): def height(self):
'''Return the block processor's current height.''' '''Return the block processor's current height.'''
return self.bp.height return self.bp.height

Loading…
Cancel
Save