diff --git a/lib/interface.py b/lib/interface.py index 2c0673359..9bfd929fb 100644 --- a/lib/interface.py +++ b/lib/interface.py @@ -221,7 +221,6 @@ class Interface(util.PrintError): self.pipe.set_timeout(0.0) # Don't wait for data # Dump network messages. Set at runtime from the console. self.debug = False - self.message_id = 0 self.unsent_requests = [] self.unanswered_requests = {} # Set last ping to zero to ensure immediate ping @@ -241,32 +240,26 @@ class Interface(util.PrintError): self.socket.shutdown(socket.SHUT_RDWR) self.socket.close() - def queue_request(self, request): - '''Queue a request.''' + def queue_request(self, *args): # method, params, _id + '''Queue a request, later to be send with send_requests when the + socket is available for writing. + ''' self.request_time = time.time() - self.unsent_requests.append(request) + self.unsent_requests.append(args) def send_requests(self): '''Sends all queued requests. Returns False on failure.''' - def copy_request(orig): - # Replace ID after making copy - mustn't change caller's copy - request = orig.copy() - request['id'] = self.message_id - self.message_id += 1 - if self.debug: - self.print_error("-->", request, orig.get('id')) - return request - - requests_as_sent = map(copy_request, self.unsent_requests) + make_dict = lambda (m, p, i): {'method': m, 'params': p, 'id': i} + wire_requests = map(make_dict, self.unsent_requests) try: - self.pipe.send_all(requests_as_sent) + self.pipe.send_all(wire_requests) except socket.error, e: self.print_error("socket error:", e) return False - # unanswered_requests stores the original unmodified user - # request, keyed by wire ID - for n, request in enumerate(self.unsent_requests): - self.unanswered_requests[requests_as_sent[n]['id']] = request + for request in self.unsent_requests: + if self.debug: + self.print_error("-->", request) + self.unanswered_requests[request[2]] = request self.unsent_requests = [] return True @@ -291,37 +284,39 @@ class Interface(util.PrintError): def get_responses(self): '''Call if there is data available on the socket. Returns a list of - notifications and a list of responses. The notifications are - singleton unsolicited responses presumably as a result of - prior subscriptions. The responses are (request, response) - pairs. If the connection was closed remotely or the remote - server is misbehaving, the last notification will be None. + (request, response) pairs. Notifications are singleton + unsolicited responses presumably as a result of prior + subscriptions, so request is None and there is no 'id' member. + Otherwise it is a response, which has an 'id' member and a + corresponding request. If the connection was closed remotely + or the remote server is misbehaving, a (None, None) will appear. ''' - notifications, responses = [], [] + responses = [] while True: try: response = self.pipe.get() except util.timeout: break if response is None: - notifications.append(None) + responses.append((None, None)) self.closed_remotely = True self.print_error("connection closed remotely") break if self.debug: self.print_error("<--", response) - wire_id = response.pop('id', None) - if wire_id is None: - notifications.append(response) - elif wire_id in self.unanswered_requests: - request = self.unanswered_requests.pop(wire_id) - responses.append((request, response)) + wire_id = response.get('id', None) + if wire_id is None: # Notification + responses.append((None, response)) else: - notifications.append(None) - self.print_error("unknown wire ID", wire_id) - break + request = self.unanswered_requests.pop(wire_id, None) + if request: + responses.append((request, response)) + else: + self.print_error("unknown wire ID", wire_id) + responses.append(None, None) # Signal + break - return notifications, responses + return responses def check_cert(host, cert): diff --git a/lib/network.py b/lib/network.py index 818a03ef9..e790c295d 100644 --- a/lib/network.py +++ b/lib/network.py @@ -254,15 +254,26 @@ class Network(util.DaemonThread): def is_up_to_date(self): return self.unanswered_requests == {} - def queue_request(self, method, params): - self.interface.queue_request({'method': method, 'params': params}) + def queue_request(self, method, params, interface=None): + # If you want to queue a request on any interface it must go + # through this function so message ids are properly tracked + if interface is None: + interface = self.interface + message_id = self.message_id + self.message_id += 1 + interface.queue_request(method, params, message_id) + return message_id def send_subscriptions(self): # clear cache self.cached_responses = {} self.print_error('sending subscriptions to', self.interface.server, len(self.unanswered_requests), len(self.subscribed_addresses)) - for r in self.unanswered_requests.values(): - self.interface.queue_request(r[0]) + # Resend unanswered requests + requests = self.unanswered_requests.values() + self.unanswered_requests = {} + for request in requests: + message_id = self.queue_request(request[0], request[1]) + self.unanswered_requests[message_id] = request for addr in self.subscribed_addresses: self.queue_request('blockchain.address.subscribe', [addr]) self.queue_request('server.banner', []) @@ -488,38 +499,39 @@ class Network(util.DaemonThread): callback(response) def process_responses(self, interface): - notifications, responses = interface.get_responses() + responses = interface.get_responses() for request, response in responses: - # Client ID was given by the daemon or proxy - client_id = request.get('id') - if client_id is not None: - if interface != self.interface: - continue - _req, callback = self.unanswered_requests.pop(client_id) + callback = None + if request: + method, params, message_id = request + # client requests go through self.send() with a + # callback, are only sent to the current interface, + # and are placed in the unanswered_requests dictionary + client_req = self.unanswered_requests.pop(message_id, None) + if client_req: + assert interface == self.interface + callback = client_req[2] + # Copy the request method and params to the response + response['method'] = method + response['params'] = params else: - callback = None - # Copy the request method and params to the response - response['method'] = request.get('method') - response['params'] = request.get('params') - response['id'] = client_id + if not response: # Closed remotely / misbehaving + self.connection_down(interface.server) + break + # Rewrite response shape to match subscription request response + method = response.get('method') + params = response.get('params') + if method == 'blockchain.headers.subscribe': + response['result'] = params[0] + response['params'] = [] + elif method == 'blockchain.address.subscribe': + response['params'] = [params[0]] # addr + response['result'] = params[1] + + # Response is now in canonical form self.process_response(interface, response, callback) - for response in notifications: - if not response: # Closed remotely - self.connection_down(interface.server) - break - # Rewrite response shape to match subscription request response - method = response.get('method') - params = response.get('params') - if method == 'blockchain.headers.subscribe': - response['result'] = params[0] - response['params'] = [] - elif method == 'blockchain.address.subscribe': - response['params'] = [params[0]] # addr - response['result'] = params[1] - self.process_response(interface, response, None) - def send(self, messages, callback): '''Messages is a list of (method, value) tuples''' with self.lock: @@ -535,16 +547,11 @@ class Network(util.DaemonThread): for sub in subs: if sub not in self.subscriptions[callback]: self.subscriptions[callback].append(sub) - _id = self.message_id - self.message_id += len(messages) unsent = [] for message in messages: - method, params = message - request = {'id': _id, 'method': method, 'params': params} - if not self.process_request(request, callback): + if not self.process_request(message, callback): unsent.append(message) - _id += 1 if unsent: with self.lock: @@ -553,12 +560,10 @@ class Network(util.DaemonThread): # FIXME: inline this function def process_request(self, request, callback): '''Returns true if the request was processed.''' - method = request['method'] - params = request['params'] - _id = request['id'] + method, params = request if method.startswith('network.'): - out = {'id':_id} + out = {} try: f = getattr(self, method[8:]) out['result'] = f(*params) @@ -585,8 +590,8 @@ class Network(util.DaemonThread): if self.debug: self.print_error("-->", request) - self.unanswered_requests[_id] = request, callback - self.interface.queue_request(request) + message_id = self.queue_request(method, params) + self.unanswered_requests[message_id] = method, params, callback return True def connection_down(self, server): @@ -603,8 +608,7 @@ class Network(util.DaemonThread): def new_interface(self, server, socket): self.add_recent_server(server) self.interfaces[server] = interface = Interface(server, socket) - interface.queue_request({'method': 'blockchain.headers.subscribe', - 'params': []}) + self.queue_request('blockchain.headers.subscribe', [], interface) if server == self.default_server: self.switch_to_interface(server) self.notify('interfaces') @@ -625,9 +629,8 @@ class Network(util.DaemonThread): if interface.has_timed_out(): self.connection_down(interface.server) elif interface.ping_required(): - version_req = {'method': 'server.version', - 'params': [ELECTRUM_VERSION, PROTOCOL_VERSION]} - interface.queue_request(version_req) + params = [ELECTRUM_VERSION, PROTOCOL_VERSION] + self.queue_request('server.version', params, interface) now = time.time() # nodes @@ -653,8 +656,7 @@ class Network(util.DaemonThread): def request_chunk(self, interface, data, idx): interface.print_error("requesting chunk %d" % idx) - interface.queue_request({'method':'blockchain.block.get_chunk', - 'params':[idx]}) + self.queue_request('blockchain.block.get_chunk', [idx], interface) data['chunk_idx'] = idx data['req_time'] = time.time() @@ -675,8 +677,7 @@ class Network(util.DaemonThread): def request_header(self, interface, data, height): interface.print_error("requesting header %d" % height) - interface.queue_request({'method':'blockchain.block.get_header', - 'params':[height]}) + self.queue_request('blockchain.block.get_header', [height], interface) data['header_height'] = height data['req_time'] = time.time() if not 'chain' in data: diff --git a/scripts/estimate_fee b/scripts/estimate_fee index 18a22314e..da1fa081a 100755 --- a/scripts/estimate_fee +++ b/scripts/estimate_fee @@ -1,9 +1,5 @@ #!/usr/bin/env python import util, json peers = util.get_peers() -results = util.send_request(peers, {'method':'blockchain.estimatefee','params':[2]}) +results = util.send_request(peers, 'blockchain.estimatefee', [2]) print json.dumps(results, indent=4) - - - - diff --git a/scripts/peers b/scripts/peers index 4fc146980..e063664d7 100755 --- a/scripts/peers +++ b/scripts/peers @@ -31,12 +31,9 @@ def analyze(results): peers = util.get_peers() -results = util.send_request(peers, {'method':'blockchain.headers.subscribe','params':[]}) +results = util.send_request(peers, 'blockchain.headers.subscribe', []) errors = analyze(results).keys() for n,v in sorted(results.items(), key=lambda x:x[1].get('block_height')): print "%40s"%n, v.get('block_height'), v.get('utxo_root'), "error" if n in errors else "ok" - - - diff --git a/scripts/servers b/scripts/servers index 20d9cd57b..3e853fa87 100755 --- a/scripts/servers +++ b/scripts/servers @@ -10,7 +10,7 @@ set_verbosity(False) config = SimpleConfig() servers = filter_protocol(protocol = 't') -results = util.send_request(servers, {'method':'blockchain.headers.subscribe', 'params':[]}) +results = util.send_request(servers, 'blockchain.headers.subscribe', []) d = defaultdict(int) diff --git a/scripts/txradar b/scripts/txradar index 10c5655eb..f1834d639 100755 --- a/scripts/txradar +++ b/scripts/txradar @@ -7,7 +7,7 @@ except: sys.exit(1) peers = util.get_peers() -results = util.send_request(peers, {'method':'blockchain.transaction.get','params':[tx]}) +results = util.send_request(peers, 'blockchain.transaction.get', [tx]) r1 = [] r2 = [] @@ -17,4 +17,3 @@ for k, v in results.items(): print "Received %d answers"%len(results) print "Propagation rate: %.1f percent" % (len(r1) *100./(len(r1)+ len(r2))) - diff --git a/scripts/util.py b/scripts/util.py index 454d256c1..a8041c9e6 100644 --- a/scripts/util.py +++ b/scripts/util.py @@ -16,13 +16,15 @@ def get_interfaces(servers, timeout=10): connecting[server] = Connection(server, socket_queue, config.path) interfaces = {} timeout = time.time() + timeout - while time.time() < timeout: + count = 0 + while time.time() < timeout and count < len(servers): try: - server, socket = socket_queue.get(True, 1) + server, socket = socket_queue.get(True, 0.3) except Queue.Empty: continue if socket: interfaces[server] = Interface(server, socket) + count += 1 return interfaces def wait_on_interfaces(interfaces, timeout=10): @@ -37,7 +39,7 @@ def wait_on_interfaces(interfaces, timeout=10): for interface in wout: interface.send_requests() for interface in rout: - notifications, responses = interface.get_responses() + responses = interface.get_responses() if responses: result[interface.server].extend(responses) return result @@ -52,25 +54,23 @@ def get_peers(): return [] # 2. get list of peers interface = interfaces[server] - interface.queue_request({'id':0, 'method': 'server.peers.subscribe', - 'params': []}) - responses = wait_on_interfaces(interfaces) - responses = responses.get(server) + interface.queue_request('server.peers.subscribe', [], 0) + responses = wait_on_interfaces(interfaces).get(server) if responses: response = responses[0][1] # One response, (req, response) tuple peers = parse_servers(response.get('result')) peers = filter_protocol(peers,'s') return peers -def send_request(peers, request): +def send_request(peers, method, params): print "Contacting %d servers"%len(peers) interfaces = get_interfaces(peers) print "%d servers could be reached" % len(interfaces) for peer in peers: if not peer in interfaces: print "Connection failed:", peer - for i in interfaces.values(): - i.queue_request(request) + for msg_id, i in enumerate(interfaces.values()): + i.queue_request(method, params, msg_id) responses = wait_on_interfaces(interfaces) for peer in interfaces: if not peer in responses: