From 43df795b1ffc2e99b45ff406670da1e44cb60b0d Mon Sep 17 00:00:00 2001 From: ThomasV Date: Thu, 26 Nov 2015 10:57:43 +0100 Subject: [PATCH] network: separate callbacks from unanswered_requests --- lib/network.py | 51 +++++++++++++++++++++++++------------------------- 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/lib/network.py b/lib/network.py index 880b21386..b1beacede 100644 --- a/lib/network.py +++ b/lib/network.py @@ -166,7 +166,9 @@ class Network(util.DaemonThread): self.heights = {} self.merkle_roots = {} self.utxo_roots = {} + # callbacks passed with subscriptions self.subscriptions = defaultdict(list) + # callbacks set by the GUI self.callbacks = defaultdict(list) dir_path = os.path.join( self.config.path, 'certs') @@ -454,7 +456,7 @@ class Network(util.DaemonThread): self.switch_lagging_interface(i.server) self.notify('updated') - def process_response(self, interface, response, callback): + def process_response(self, interface, response): if self.debug: self.print_error("<--", response) error = response.get('error') @@ -485,17 +487,9 @@ class Network(util.DaemonThread): elif method == 'blockchain.block.get_header': self.on_get_header(interface, response) else: - if callback is None: - params = response['params'] - with self.lock: - for k,v in self.subscriptions.items(): - if (method, params) in v: - callback = k - break - if callback is None: - self.print_error("received unexpected notification", - method, params) - else: + params = response['params'] + callbacks = self.subscriptions.get(repr((method, params)), []) + for callback in callbacks: callback(response) def process_responses(self, interface): @@ -511,7 +505,7 @@ class Network(util.DaemonThread): client_req = self.unanswered_requests.pop(message_id, None) if client_req: assert interface == self.interface - callback = client_req[2] + # Copy the request method and params to the response response['method'] = method response['params'] = params @@ -534,12 +528,21 @@ class Network(util.DaemonThread): response['result'] = params[1] # Response is now in canonical form - self.process_response(interface, response, callback) + self.process_response(interface, response) def send(self, messages, callback): '''Messages is a list of (method, params) tuples''' with self.lock: - self.pending_sends.append((messages, callback)) + subs = filter(lambda (m,v): m.endswith('.subscribe'), messages) + for method, params in subs: + k = repr((method, params)) + l = self.subscriptions.get(k, []) + if callback not in l: + l.append(callback) + self.subscriptions[k] = l + + self.pending_sends += messages + def process_pending_sends(self): # Requests needs connectivity. If we don't have an interface, @@ -551,23 +554,19 @@ class Network(util.DaemonThread): sends = self.pending_sends self.pending_sends = [] - for messages, callback in sends: - subs = filter(lambda (m,v): m.endswith('.subscribe'), messages) - with self.lock: - for sub in subs: - if sub not in self.subscriptions[callback]: - self.subscriptions[callback].append(sub) - - for method, params in messages: - message_id = self.queue_request(method, params) - self.unanswered_requests[message_id] = method, params, callback + for method, params in sends: + message_id = self.queue_request(method, params) + self.unanswered_requests[message_id] = method, params def unsubscribe(self, callback): '''Unsubscribe a callback to free object references to enable GC.''' # Note: we can't unsubscribe from the server, so if we receive # subsequent notifications process_response() will emit a harmless # "received unexpected notification" warning - self.subscriptions.pop(callback, None) + with self.lock: + for v in self.subscriptions.values(): + if callback in v: + v.remove(callback) def connection_down(self, server): '''A connection to server either went down, or was never made.