diff --git a/lib/interface.py b/lib/interface.py index 3b67b20de..aff2dd019 100644 --- a/lib/interface.py +++ b/lib/interface.py @@ -17,10 +17,14 @@ # along with this program. If not, see . -import copy, re, errno, os -import threading, traceback, sys, time, Queue +import os +import re import socket import ssl +import sys +import threading +import time +import traceback import requests ca_path = requests.certs.where() @@ -28,49 +32,30 @@ ca_path = requests.certs.where() import util import x509 import pem -from version import ELECTRUM_VERSION, PROTOCOL_VERSION -from simple_config import SimpleConfig -def Interface(server, response_queue, config = None): - """Interface factory function. The returned interface class handles the connection - to a single remote electrum server. The object handles all necessary locking. It's - exposed API is: +def Connection(server, queue, config_path): + """Makes asynchronous connections to a remote remote electrum server. + Returns the running thread that is making the connection. - - Inherits everything from threading.Thread. - - Member functions send_request(), stop(), is_connected() - - Member variable server. - - "server" is constant for the object's lifetime and hence synchronization is unnecessary. + Once the thread has connected, it finishes, placing a tuple on the + queue of the form (server, socket), where socket is None if + connection failed. """ host, port, protocol = server.split(':') - if protocol in 'st': - return TcpInterface(server, response_queue, config) - else: - raise Exception('Unknown protocol: %s'%protocol) - -# Connection status -CS_OPENING, CS_CONNECTED, CS_FAILED = range(3) + if not protocol in 'st': + raise Exception('Unknown protocol: %s' % protocol) + c = TcpConnection(server, queue, config_path) + c.start() + return c -class TcpInterface(threading.Thread): +class TcpConnection(threading.Thread): - def __init__(self, server, response_queue, config = None): + def __init__(self, server, queue, config_path): threading.Thread.__init__(self) self.daemon = True - self.config = config if config is not None else SimpleConfig() - # Set by stop(); no more data is exchanged and the thread exits after gracefully - # closing the socket - self.disconnect = False - self._status = CS_OPENING - self.debug = False # dump network messages. can be changed at runtime using the console - self.message_id = 0 - self.response_queue = response_queue - self.request_queue = Queue.Queue() - self.unanswered_requests = {} - # request timeouts - self.request_time = time.time() - self.ping_time = 0 - # parse server + self.config_path = config_path + self.queue = queue self.server = server self.host, self.port, self.protocol = self.server.split(':') self.host = str(self.host) @@ -78,47 +63,31 @@ class TcpInterface(threading.Thread): self.use_ssl = (self.protocol == 's') def print_error(self, *msg): - util.print_error("[%s]"%self.host, *msg) - - def process_response(self, response): - if self.debug: - self.print_error("<--", response) - - msg_id = response.get('id') - error = response.get('error') - result = response.get('result') - - if msg_id is not None: - method, params, _id, queue = self.unanswered_requests.pop(msg_id) - if queue is None: - queue = self.response_queue + util.print_error("[%s]" % self.host, *msg) + + def check_host_name(self, peercert, name): + """Simple certificate/host name checker. Returns True if the + certificate matches, False otherwise. Does not support + wildcards.""" + # Check that the peer has supplied a certificate. + # None/{} is not acceptable. + if not peercert: + return False + if peercert.has_key("subjectAltName"): + for typ, val in peercert["subjectAltName"]: + if typ == "DNS" and val == name: + return True else: - # notification - method = response.get('method') - params = response.get('params') - _id = None - queue = self.response_queue - # restore parameters - if method == 'blockchain.numblocks.subscribe': - result = params[0] - params = [] - elif method == 'blockchain.headers.subscribe': - result = params[0] - params = [] - elif method == 'blockchain.address.subscribe': - addr = params[0] - result = params[1] - params = [addr] - - if method == 'server.version': - self.server_version = result - return - - if error: - queue.put((self, {'method':method, 'params':params, 'error':error, 'id':_id})) - else: - queue.put((self, {'method':method, 'params':params, 'result':result, 'id':_id})) - + # Only check the subject DN if there is no subject alternative + # name. + cn = None + for attr, val in peercert["subject"]: + # Use most-specific (last) commonName attribute. + if attr == "commonName": + cn = val + if cn is not None: + return cn == name + return False def get_simple_socket(self): try: @@ -138,10 +107,9 @@ class TcpInterface(threading.Thread): else: self.print_error("failed to connect", str(e)) - def get_socket(self): if self.use_ssl: - cert_path = os.path.join( self.config.path, 'certs', self.host) + cert_path = os.path.join(self.config_path, 'certs', self.host) if not os.path.exists(cert_path): is_new = True s = self.get_simple_socket() @@ -152,7 +120,7 @@ class TcpInterface(threading.Thread): s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1, cert_reqs=ssl.CERT_REQUIRED, ca_certs=ca_path, do_handshake_on_connect=True) except ssl.SSLError, e: s = None - if s and check_host_name(s.getpeercert(), self.host): + if s and self.check_host_name(s.getpeercert(), self.host): self.print_error("SSL certificate signed by CA") return s @@ -229,117 +197,131 @@ class TcpInterface(threading.Thread): return s - def send_request(self, request, response_queue = None): + def run(self): + socket = self.get_socket() + if socket: + self.print_error("connected") + self.queue.put((self.server, socket)) + +class Interface: + """The Interface class handles a socket connected to a single remote + electrum server. It's exposed API is: + + - Member functions close(), fileno(), get_responses(), has_timed_out(), + ping_required(), queue_request(), send_requests() + - Member variable server. + """ + + def __init__(self, server, socket): + self.server = server + self.host, _, _ = server.split(':') + self.socket = socket + + self.pipe = util.SocketPipe(socket) + self.pipe.set_timeout(0.0) # Don't wait for data + # Dump network messages. Set at runtime from the console. + self.debug = False + self.message_id = 0 + self.unsent_requests = [] + self.unanswered_requests = {} + # Set last ping to zero to ensure immediate ping + self.last_request = time.time() + self.last_ping = 0 + self.closed_remotely = False + + def print_error(self, *msg): + util.print_error("[%s]" % self.host, *msg) + + def fileno(self): + # Needed for select + return self.socket.fileno() + + def close(self): + if not self.closed_remotely: + self.socket.shutdown(socket.SHUT_RDWR) + self.socket.close() + + def queue_request(self, request): '''Queue a request.''' self.request_time = time.time() - self.request_queue.put((copy.deepcopy(request), response_queue)) + self.unsent_requests.append(request) def send_requests(self): - '''Sends all queued requests''' - while self.is_connected() and not self.request_queue.empty(): - request, response_queue = self.request_queue.get() - method = request.get('method') - params = request.get('params') - r = {'id': self.message_id, 'method': method, 'params': params} - try: - self.pipe.send(r) - except socket.error, e: - self.print_error("socket error:", e) - self.stop() - return - if self.debug: - self.print_error("-->", r) - self.unanswered_requests[self.message_id] = method, params, request.get('id'), response_queue + '''Sends all queued requests. Returns False on failure.''' + def copy_request(orig): + # Replace ID after making copy - mustn't change caller's copy + request = orig.copy() + request['id'] = self.message_id self.message_id += 1 + if self.debug: + self.print_error("-->", request, orig.get('id')) + return request + + requests_as_sent = map(copy_request, self.unsent_requests) + try: + self.pipe.send_all(requests_as_sent) + except socket.error, e: + self.print_error("socket error:", e) + return False + # unanswered_requests stores the original unmodified user + # request, keyed by wire ID + for n, request in enumerate(self.unsent_requests): + self.unanswered_requests[requests_as_sent[n]['id']] = request + self.unsent_requests = [] + return True + + def ping_required(self): + '''Maintains time since last ping. Returns True if a ping should + be sent. + ''' + now = time.time() + if now - self.last_ping > 60: + self.last_ping = now + return True + return False - def is_connected(self): - '''True if status is connected''' - return self._status == CS_CONNECTED and not self.disconnect - - def stop(self): - if not self.disconnect: - self.disconnect = True - self.print_error("disconnecting") - - def maybe_ping(self): - # ping the server with server.version - if time.time() - self.ping_time > 60: - self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]}) - self.ping_time = time.time() - # stop interface if we have been waiting for more than 10 seconds - if self.unanswered_requests and time.time() - self.request_time > 10 and self.pipe.idle_time() > 10: - self.print_error("interface timeout", len(self.unanswered_requests)) - self.stop() - - def get_and_process_response(self): - if self.is_connected(): + def has_timed_out(self): + '''Returns True if the interface has timed out.''' + if (self.unanswered_requests and time.time() - self.request_time > 10 + and self.pipe.idle_time() > 10): + self.print_error("timeout", len(self.unanswered_requests)) + return True + + return False + + def get_responses(self): + '''Call if there is data available on the socket. Returns a list of + notifications and a list of responses. The notifications are + singleton unsolicited responses presumably as a result of + prior subscriptions. The responses are (request, response) + pairs. If the connection was closed remotely or the remote + server is misbehaving, the last notification will be None. + ''' + notifications, responses = [], [] + while True: try: response = self.pipe.get() except util.timeout: - return - # If remote side closed the socket, SocketPipe closes our socket and returns None + break if response is None: - self.disconnect = True + notifications.append(None) + self.closed_remotely = True self.print_error("connection closed remotely") + break + if self.debug: + self.print_error("<--", response) + wire_id = response.pop('id', None) + if wire_id is None: + notifications.append(response) + elif wire_id in self.unanswered_requests: + request = self.unanswered_requests.pop(wire_id) + responses.append((request, response)) else: - self.process_response(response) + notifications.append(None) + self.print_error("unknown wire ID", wire_id) + break - def run(self): - s = self.get_socket() - if s: - self.pipe = util.SocketPipe(s) - s.settimeout(0.1) - self.print_error("connected") - self._status = CS_CONNECTED - # Indicate to parent that we've connected - self.notify_status() - while self.is_connected(): - self.maybe_ping() - self.send_requests() - self.get_and_process_response() - s.shutdown(socket.SHUT_RDWR) - s.close() - - # Also for the s is None case - self._status = CS_FAILED - # Indicate to parent that the connection is now down - self.notify_status() - - def notify_status(self): - '''Notify owner that we have just connected or just failed the connection. - Owner determines which through e.g. testing is_connected()''' - self.response_queue.put((self, None)) - - -def _match_hostname(name, val): - if val == name: - return True - - return val.startswith('*.') and name.endswith(val[1:]) - - -def check_host_name(peercert, name): - """Simple certificate/host name checker. Returns True if the - certificate matches, False otherwise.""" - # Check that the peer has supplied a certificate. - # None/{} is not acceptable. - if not peercert: - return False - if peercert.has_key("subjectAltName"): - for typ, val in peercert["subjectAltName"]: - if typ == "DNS" and _match_hostname(name, val): - return True - else: - # Only check the subject DN if there is no subject alternative - # name. - cn = None - for attr, val in peercert["subject"]: - # Use most-specific (last) commonName attribute. - if attr == "commonName": - cn = val - if cn is not None: - return _match_hostname(name, cn) - return False + return notifications, responses def check_cert(host, cert): @@ -361,7 +343,15 @@ def check_cert(host, cert): util.print_msg(m) +# Used by tests +def _match_hostname(name, val): + if val == name: + return True + + return val.startswith('*.') and name.endswith(val[1:]) + def test_certificates(): + from simple_config import SimpleConfig config = SimpleConfig() mydir = os.path.join(config.path, "certs") certs = os.listdir(mydir) diff --git a/lib/network.py b/lib/network.py index 127b43722..1df55d3eb 100644 --- a/lib/network.py +++ b/lib/network.py @@ -3,7 +3,9 @@ import Queue import os import sys import random +import select import traceback +from collections import deque import socks import socket @@ -11,9 +13,9 @@ import json import util from bitcoin import * -import interface +from interface import Connection, Interface from blockchain import Blockchain -from collections import deque +from version import ELECTRUM_VERSION, PROTOCOL_VERSION DEFAULT_PORTS = {'t':'50001', 's':'50002', 'h':'8081', 'g':'8082'} @@ -119,11 +121,13 @@ def deserialize_server(server_str): def serialize_server(host, port, protocol): return str(':'.join([host, port, protocol])) - class Network(util.DaemonThread): - """The Network class manages a set of connections to remote - electrum servers, each connection is handled by its own - thread object returned from Interface(). Its external API: + """The Network class manages a set of connections to remote electrum + servers, each connected socket is handled by an Interface() object. + Connections are initiated by a Connection() thread which stops once + the connection succeeds or fails. + + Our external API: - Member functions get_header(), get_parameters(), get_status_value(), new_blockchain_height(), set_parameters(), start(), @@ -137,7 +141,6 @@ class Network(util.DaemonThread): self.config = SimpleConfig(config) if type(config) == type({}) else config self.num_server = 8 if not self.config.get('oneserver') else 0 self.blockchain = Blockchain(self.config, self) - self.queue = Queue.Queue() self.requests_queue = pipe.send_queue self.response_queue = pipe.get_queue # A deque of interface header requests, processed left-to-right @@ -169,7 +172,7 @@ class Network(util.DaemonThread): self.subscribed_addresses = set() # cached address status self.addr_responses = {} - # unanswered requests + # Requests from client we've not seen a response to self.unanswered_requests = {} # retry times self.server_retry_time = time.time() @@ -180,6 +183,8 @@ class Network(util.DaemonThread): self.interface = None self.interfaces = {} self.auto_connect = self.config.get('auto_connect', False) + self.connecting = {} + self.socket_queue = Queue.Queue() self.start_network(deserialize_server(self.default_server)[2], deserialize_proxy(self.config.get('proxy'))) @@ -224,19 +229,22 @@ class Network(util.DaemonThread): self.notify('status') def is_connected(self): - return self.interface and self.interface.is_connected() + return self.interface is not None + + def queue_request(self, method, params): + self.interface.queue_request({'method': method, 'params': params}) def send_subscriptions(self): # clear cache self.cached_responses = {} self.print_error('sending subscriptions to', self.interface.server, len(self.unanswered_requests), len(self.subscribed_addresses)) for r in self.unanswered_requests.values(): - self.interface.send_request(r) + self.interface.queue_request(r) for addr in self.subscribed_addresses: - self.interface.send_request({'method':'blockchain.address.subscribe','params':[addr]}) - self.interface.send_request({'method':'server.banner','params':[]}) - self.interface.send_request({'method':'server.peers.subscribe','params':[]}) - self.interface.send_request({'method':'blockchain.estimatefee','params':[2]}) + self.queue_request('blockchain.address.subscribe', [addr]) + self.queue_request('server.banner', []) + self.queue_request('server.peers.subscribe', []) + self.queue_request('blockchain.estimatefee', [2]) def get_status_value(self, key): if key == 'status': @@ -263,7 +271,7 @@ class Network(util.DaemonThread): def get_interfaces(self): '''The interfaces that are in connected state''' - return [s for s, i in self.interfaces.items() if i.is_connected()] + return self.interfaces.keys() def get_servers(self): if self.irc_servers: @@ -280,12 +288,11 @@ class Network(util.DaemonThread): return out def start_interface(self, server): - if not server in self.interfaces.keys(): + if (not server in self.interfaces and not server in self.connecting): if server == self.default_server: self.set_status('connecting') - i = interface.Interface(server, self.queue, self.config) - self.interfaces[i.server] = i - i.start() + c = Connection(server, self.socket_queue, self.config.path) + self.connecting[server] = c def start_random_interface(self): exclude_set = self.disconnected_servers.union(set(self.interfaces)) @@ -312,6 +319,7 @@ class Network(util.DaemonThread): def start_network(self, protocol, proxy): assert not self.interface and not self.interfaces + assert not self.connecting and self.socket_queue.empty() self.print_error('starting network') self.disconnected_servers = set([]) self.protocol = protocol @@ -320,10 +328,13 @@ class Network(util.DaemonThread): def stop_network(self): self.print_error("stopping network") - for i in self.interfaces.values(): - i.stop() - self.interface = None - self.interfaces = {} + for interface in self.interfaces.values(): + self.close_interface(interface) + assert self.interface is None + assert not self.interfaces + self.connecting = {} + # Get a new queue - no old pending connections thanks! + self.socket_queue = Queue.Queue() def set_parameters(self, host, port, protocol, proxy, auto_connect): self.auto_connect = auto_connect @@ -339,7 +350,10 @@ class Network(util.DaemonThread): self.switch_lagging_interface() def switch_to_random_interface(self): + '''Switch to a random connected server other than the current one''' servers = self.get_interfaces() # Those in connected state + if self.default_server in servers: + servers.remove(self.default_server) if servers: self.switch_to_interface(random.choice(servers)) @@ -362,30 +376,28 @@ class Network(util.DaemonThread): self.start_interface(server) return i = self.interfaces[server] - if not i.is_connected(): - # do nothing; we will switch once connected - return if self.interface != i: self.print_error("switching to", server) # stop any current interface in order to terminate subscriptions - self.stop_interface() + self.close_interface(self.interface) self.interface = i self.addr_responses = {} self.send_subscriptions() self.set_status('connected') self.notify('updated') - def stop_interface(self): - if self.interface: - self.interface.stop() - self.interface = None + def close_interface(self, interface): + if interface: + self.interfaces.pop(interface.server) + if interface.server == self.default_server: + self.interface = None + interface.close() - def add_recent_server(self, i): + def add_recent_server(self, server): # list is ordered - s = i.server - if s in self.recent_servers: - self.recent_servers.remove(s) - self.recent_servers.insert(0,s) + if server in self.recent_servers: + self.recent_servers.remove(server) + self.recent_servers.insert(0, server) self.recent_servers = self.recent_servers[0:20] self.save_recent_servers() @@ -393,70 +405,75 @@ class Network(util.DaemonThread): self.switch_lagging_interface(i.server) self.notify('updated') - def process_if_notification(self, i): - '''Handle interface addition and removal through notifications''' - if i.is_connected(): - self.add_recent_server(i) - i.send_request({'method':'blockchain.headers.subscribe','params':[]}) - if i.server == self.default_server: - self.switch_to_interface(i.server) - else: - self.interfaces.pop(i.server, None) - self.heights.pop(i.server, None) - if i == self.interface: - self.interface = None - self.addr_responses = {} - self.set_status('disconnected') - self.disconnected_servers.add(i.server) - # Our set of interfaces changed - self.notify('interfaces') - - def process_response(self, i, response): - # the id comes from the daemon or the network proxy - _id = response.get('id') - if _id is not None: - if i != self.interface: - return - self.unanswered_requests.pop(_id) - - method = response.get('method') + def process_response(self, interface, response): + error = response.get('error') result = response.get('result') - if method == 'blockchain.headers.subscribe': - self.on_header(i, response) + method = response.get('method') + + # We handle some responses; return the rest to the client. + if method == 'server.version': + interface.server_version = result + elif method == 'blockchain.headers.subscribe': + if error is None: + self.on_header(interface, result) elif method == 'server.peers.subscribe': - self.irc_servers = parse_servers(result) - self.notify('servers') + if error is None: + self.irc_servers = parse_servers(result) + self.notify('servers') elif method == 'server.banner': - self.banner = result - self.notify('banner') + if error is None: + self.banner = result + self.notify('banner') elif method == 'blockchain.estimatefee': - from bitcoin import COIN - self.fee = int(result * COIN) - self.print_error("recommended fee", self.fee) - self.notify('fee') - elif method == 'blockchain.address.subscribe': - addr = response.get('params')[0] - self.addr_responses[addr] = result - self.response_queue.put(response) + if error is None: + self.fee = int(result * COIN) + self.print_error("recommended fee", self.fee) + self.notify('fee') elif method == 'blockchain.block.get_chunk': - self.on_get_chunk(i, response) + self.on_get_chunk(interface, response) elif method == 'blockchain.block.get_header': - self.on_get_header(i, response) + self.on_get_header(interface, response) else: + # Cache address subscription results + if method == 'blockchain.address.subscribe' and error is None: + addr = response['params'][0] + self.addr_responses[addr] = result self.response_queue.put(response) - def handle_requests(self): - '''Some requests require connectivity, others we handle locally in - process_request() and must do so in order to e.g. prevent the - daemon seeming unresponsive. - ''' - unhandled = [] + def process_responses(self, interface): + notifications, responses = interface.get_responses() + + for request, response in responses: + # Client ID was given by the daemon or proxy + client_id = request.get('id') + if client_id is not None: + if interface != self.interface: + continue + self.unanswered_requests.pop(client_id) + # Copy the request method and params to the response + response['method'] = request.get('method') + response['params'] = request.get('params') + response['id'] = client_id + self.process_response(interface, response) + + for response in notifications: + if not response: # Closed remotely + self.connection_down(interface.server) + break + # Rewrite response shape to match subscription request response + method = response.get('method') + if method == 'blockchain.headers.subscribe': + response['result'] = response['params'][0] + response['params'] = [] + elif method == 'blockchain.address.subscribe': + params = response['params'] + response['params'] = [params[0]] # addr + response['result'] = params[1] + self.process_response(interface, response) + + def handle_incoming_requests(self): while not self.requests_queue.empty(): - request = self.requests_queue.get() - if not self.process_request(request): - unhandled.append(request) - for request in unhandled: - self.requests_queue.put(request) + self.process_request(self.requests_queue.get()) def process_request(self, request): '''Returns true if the request was processed.''' @@ -487,22 +504,62 @@ class Network(util.DaemonThread): # This request needs connectivity. If we don't have an # interface, we cannot process it. - if not self.is_connected(): + if not self.interface: return False self.unanswered_requests[_id] = request - self.interface.send_request(request) + self.interface.queue_request(request) return True - def check_interfaces(self): + def connection_down(self, server): + '''A connection to server either went down, or was never made. + We distinguish by whether it is in self.interfaces.''' + self.disconnected_servers.add(server) + if server == self.default_server: + self.set_status('disconnected') + if server in self.interfaces: + self.close_interface(self.interfaces[server]) + self.heights.pop(server, None) + self.notify('interfaces') + + def new_interface(self, server, socket): + self.add_recent_server(server) + self.interfaces[server] = interface = Interface(server, socket) + interface.queue_request({'method': 'blockchain.headers.subscribe', + 'params': []}) + if server == self.default_server: + self.switch_to_interface(server) + self.notify('interfaces') + + def maintain_sockets(self): + '''Socket maintenance.''' + # Responses to connection attempts? + while not self.socket_queue.empty(): + server, socket = self.socket_queue.get() + self.connecting.pop(server) + if socket: + self.new_interface(server, socket) + else: + self.connection_down(server) + + # Send pings and shut down stale interfaces + for interface in self.interfaces.values(): + if interface.has_timed_out(): + self.connection_down(interface.server) + elif interface.ping_required(): + version_req = {'method': 'server.version', + 'params': [ELECTRUM_VERSION, PROTOCOL_VERSION]} + interface.queue_request(version_req) + now = time.time() # nodes - if len(self.interfaces) < self.num_server: + if len(self.interfaces) + len(self.connecting) < self.num_server: self.start_random_interface() if now - self.nodes_retry_time > NODES_RETRY_INTERVAL: self.print_error('network: retrying connections') self.disconnected_servers = set([]) self.nodes_retry_time = now + # main interface if not self.is_connected(): if self.auto_connect: @@ -517,7 +574,8 @@ class Network(util.DaemonThread): def request_chunk(self, interface, data, idx): interface.print_error("requesting chunk %d" % idx) - interface.send_request({'method':'blockchain.block.get_chunk', 'params':[idx]}) + interface.queue_request({'method':'blockchain.block.get_chunk', + 'params':[idx]}) data['chunk_idx'] = idx data['req_time'] = time.time() @@ -537,7 +595,8 @@ class Network(util.DaemonThread): def request_header(self, interface, data, height): interface.print_error("requesting header %d" % height) - interface.send_request({'method':'blockchain.block.get_header', 'params':[height]}) + interface.queue_request({'method':'blockchain.block.get_header', + 'params':[height]}) data['header_height'] = height data['req_time'] = time.time() if not 'chain' in data: @@ -563,7 +622,9 @@ class Network(util.DaemonThread): self.request_header(interface, data, next_height) def bc_request_headers(self, interface, data): - '''Send a request for the next header, or a chunk of them, if necessary''' + '''Send a request for the next header, or a chunk of them, + if necessary. + ''' local_height, if_height = self.get_local_height(), data['if_height'] if if_height <= local_height: return False @@ -575,11 +636,12 @@ class Network(util.DaemonThread): def handle_bc_requests(self): '''Work through each interface that has notified us of a new header. - Send it requests if it is ahead of our blockchain object''' + Send it requests if it is ahead of our blockchain object. + ''' while self.bc_requests: interface, data = self.bc_requests.popleft() # If the connection was lost move on - if not interface.is_connected(): + if not interface in self.interfaces.values(): continue req_time = data.get('req_time') @@ -590,36 +652,39 @@ class Network(util.DaemonThread): continue elif time.time() - req_time > 10: interface.print_error("blockchain request timed out") - interface.stop() + self.connection_down(interface.server) continue # Put updated request state back at head of deque self.bc_requests.appendleft((interface, data)) break + def wait_on_sockets(self): + # Python docs say Windows doesn't like empty selects. + # Sleep to prevent busy looping + if not self.interfaces: + time.sleep(0.1) + return + rin = [i for i in self.interfaces.values()] + win = [i for i in self.interfaces.values() if i.unsent_requests] + rout, wout, xout = select.select(rin, win, [], 0.1) + assert not xout + for interface in wout: + interface.send_requests() + for interface in rout: + self.process_responses(interface) + def run(self): self.blockchain.init() while self.is_running(): - self.check_interfaces() - self.handle_requests() + self.maintain_sockets() + self.wait_on_sockets() + self.handle_incoming_requests() self.handle_bc_requests() - try: - i, response = self.queue.get(timeout=0.1) - except Queue.Empty: - continue - - # if response is None it is a notification about the interface - if response is None: - self.process_if_notification(i) - else: - self.process_response(i, response) self.stop_network() self.print_error("stopped") - def on_header(self, i, r): - header = r.get('result') - if not header: - return + def on_header(self, i, header): height = header.get('block_height') if not height: return