|
|
@ -491,12 +491,16 @@ class Network(util.DaemonThread): |
|
|
|
for callback in callbacks: |
|
|
|
callback(response) |
|
|
|
|
|
|
|
def get_index(self, method, params): |
|
|
|
""" hashable index for subscriptions and cache""" |
|
|
|
return str(method) + (':' + str(params[0]) if params else '') |
|
|
|
|
|
|
|
def process_responses(self, interface): |
|
|
|
responses = interface.get_responses() |
|
|
|
|
|
|
|
for request, response in responses: |
|
|
|
if request: |
|
|
|
method, params, message_id = request |
|
|
|
k = self.get_index(method, params) |
|
|
|
# client requests go through self.send() with a |
|
|
|
# callback, are only sent to the current interface, |
|
|
|
# and are placed in the unanswered_requests dictionary |
|
|
@ -520,16 +524,18 @@ class Network(util.DaemonThread): |
|
|
|
# Rewrite response shape to match subscription request response |
|
|
|
method = response.get('method') |
|
|
|
params = response.get('params') |
|
|
|
k = self.get_index(method, params) |
|
|
|
if method == 'blockchain.headers.subscribe': |
|
|
|
response['result'] = params[0] |
|
|
|
response['params'] = [] |
|
|
|
elif method == 'blockchain.address.subscribe': |
|
|
|
response['params'] = [params[0]] # addr |
|
|
|
response['result'] = params[1] |
|
|
|
callbacks = self.subscriptions.get(repr((method, params)), []) |
|
|
|
callbacks = self.subscriptions.get(k, []) |
|
|
|
|
|
|
|
# update cache if it's a subscription |
|
|
|
if method.endswith('.subscribe'): |
|
|
|
self.sub_cache[repr((method, params))] = response |
|
|
|
self.sub_cache[k] = response |
|
|
|
# Response is now in canonical form |
|
|
|
self.process_response(interface, response, callbacks) |
|
|
|
|
|
|
@ -538,7 +544,6 @@ class Network(util.DaemonThread): |
|
|
|
with self.lock: |
|
|
|
self.pending_sends.append((messages, callback)) |
|
|
|
|
|
|
|
|
|
|
|
def process_pending_sends(self): |
|
|
|
# Requests needs connectivity. If we don't have an interface, |
|
|
|
# we cannot process them. |
|
|
@ -551,14 +556,14 @@ class Network(util.DaemonThread): |
|
|
|
|
|
|
|
for messages, callback in sends: |
|
|
|
for method, params in messages: |
|
|
|
k = repr((method, params)) |
|
|
|
r = None |
|
|
|
if method.endswith('.subscribe'): |
|
|
|
k = self.get_index(method, params) |
|
|
|
# add callback to list |
|
|
|
l = self.subscriptions.get(k, []) |
|
|
|
if callback not in l: |
|
|
|
l.append(callback) |
|
|
|
self.subscriptions[k] = l |
|
|
|
|
|
|
|
# check cached response for subscriptions |
|
|
|
r = self.sub_cache.get(k) |
|
|
|
if r is not None: |
|
|
|