Browse Source

Tidy up request and notification sending

master
Neil Booth 8 years ago
parent
commit
ed0646efbf
  1. 4
      electrumx_rpc.py
  2. 24
      lib/jsonrpc.py
  3. 27
      server/session.py

4
electrumx_rpc.py

@ -35,8 +35,7 @@ class RPCClient(JSONRPC):
self.max_buffer_size = 5000000 self.max_buffer_size = 5000000
if params: if params:
params = [params] params = [params]
payload = self.request_payload(method, id_=method, params=params) self.send_request(method, method, params)
self.encode_and_send_payload(payload)
future = asyncio.ensure_future(self.queue.get()) future = asyncio.ensure_future(self.queue.get())
for f in asyncio.as_completed([future], timeout=timeout): for f in asyncio.as_completed([future], timeout=timeout):
@ -80,6 +79,7 @@ def main():
except OSError: except OSError:
print('error connecting - is ElectrumX catching up or not running?') print('error connecting - is ElectrumX catching up or not running?')
finally: finally:
loop.stop()
loop.close() loop.close()

24
lib/jsonrpc.py

@ -13,6 +13,7 @@ import json
import numbers import numbers
import time import time
import traceback import traceback
from functools import partial
from lib.util import LoggedClass from lib.util import LoggedClass
@ -121,7 +122,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
NEXT_SESSION_ID = 0 NEXT_SESSION_ID = 0
@classmethod @classmethod
def request_payload(cls, method, id_, params=None): def request_payload(cls, id_, method, params=None):
payload = {'jsonrpc': '2.0', 'id': id_, 'method': method} payload = {'jsonrpc': '2.0', 'id': id_, 'method': method}
if params: if params:
payload['params'] = params payload['params'] = params
@ -131,10 +132,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
def response_payload(cls, result, id_): def response_payload(cls, result, id_):
return {'jsonrpc': '2.0', 'result': result, 'id': id_} return {'jsonrpc': '2.0', 'result': result, 'id': id_}
@classmethod
def notification_payload(cls, method, params=None):
return cls.request_payload(method, None, params)
@classmethod @classmethod
def error_payload(cls, message, code, id_=None): def error_payload(cls, message, code, id_=None):
error = {'message': message, 'code': code} error = {'message': message, 'code': code}
@ -166,6 +163,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.send_notification = partial(self.send_request, None)
self.start = time.time() self.start = time.time()
self.stop = 0 self.stop = 0
self.last_recv = self.start self.last_recv = self.start
@ -316,6 +314,18 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
'''Send a JSON error.''' '''Send a JSON error.'''
self._send_bytes(self.json_error_bytes(message, code, id_)) self._send_bytes(self.json_error_bytes(message, code, id_))
def send_request(self, id_, method, params=None):
'''Send a request. If id_ is None it is a notification.'''
self.encode_and_send_payload(self.request_payload(id_, method, params))
def send_notifications(self, mp_iterable):
'''Send an iterable of (method, params) notification pairs.
A 1-tuple is also valid in which case there are no params.'''
# TODO: maybe send batches if remote side supports it
for pair in mp_iterable:
self.send_notification(*pair)
def encode_payload(self, payload): def encode_payload(self, payload):
assert isinstance(payload, dict) assert isinstance(payload, dict)
@ -353,10 +363,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
'''Encode the payload and send it.''' '''Encode the payload and send it.'''
self._send_bytes(self.encode_payload(payload)) self._send_bytes(self.encode_payload(payload))
def json_notification_bytes(self, method, params):
'''Return the bytes of a json notification.'''
return self.encode_payload(self.notification_payload(method, params))
def json_request_bytes(self, method, id_, params=None): def json_request_bytes(self, method, id_, params=None):
'''Return the bytes of a JSON request.''' '''Return the bytes of a JSON request.'''
return self.encode_payload(self.request_payload(method, id_, params)) return self.encode_payload(self.request_payload(method, id_, params))

27
server/session.py

@ -135,32 +135,29 @@ class ElectrumX(Session):
Cache is a shared cache for this update. Cache is a shared cache for this update.
''' '''
controller = self.controller
pairs = []
if height != self.notified_height: if height != self.notified_height:
self.notified_height = height self.notified_height = height
if self.subscribe_headers: if self.subscribe_headers:
payload = self.notification_payload( args = (controller.electrum_header(height), )
'blockchain.headers.subscribe', pairs.append(('blockchain.headers.subscribe', args))
(self.controller.electrum_header(height), ),
)
self.encode_and_send_payload(payload)
if self.subscribe_height: if self.subscribe_height:
payload = self.notification_payload( pairs.append(('blockchain.numblocks.subscribe', (height, )))
'blockchain.numblocks.subscribe',
(height, ),
)
self.encode_and_send_payload(payload)
matches = touched.intersection(self.hashX_subs) matches = touched.intersection(self.hashX_subs)
for hashX in matches: for hashX in matches:
address = self.hashX_subs[hashX] address = self.hashX_subs[hashX]
status = await self.controller.address_status(hashX) status = await controller.address_status(hashX)
payload = self.notification_payload( pairs.append(('blockchain.address.subscribe', (address, status)))
'blockchain.address.subscribe', (address, status))
self.encode_and_send_payload(payload)
self.send_notifications(pairs)
if matches: if matches:
self.log_info('notified of {:,d} addresses'.format(len(matches))) es = '' if len(matches) == 1 else 'es'
self.log_info('notified of {:,d} address{}'
.format(len(matches), es))
def height(self): def height(self):
'''Return the current flushed database height.''' '''Return the current flushed database height.'''

Loading…
Cancel
Save