From 28df27fba2b37237c97500533d5bd86f5bd4ee71 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Mon, 28 Jul 2014 00:13:40 +0200 Subject: [PATCH] update interface.py --- lib/blockchain.py | 4 +- lib/interface.py | 422 +++++++++++++++---------------------------- lib/network.py | 90 +++++---- lib/network_proxy.py | 11 +- lib/synchronizer.py | 4 +- lib/util.py | 52 +++++- 6 files changed, 250 insertions(+), 333 deletions(-) diff --git a/lib/blockchain.py b/lib/blockchain.py index ac5ea1489..ad5bbb526 100644 --- a/lib/blockchain.py +++ b/lib/blockchain.py @@ -289,14 +289,14 @@ class Blockchain(threading.Thread): def request_header(self, i, h, queue): print_error("requesting header %d from %s"%(h, i.server)) - i.send([ ('blockchain.block.get_header',[h])], lambda i,r: queue.put((i,r))) + i.send_request({'method':'blockchain.block.get_header', 'params':[h]}, queue) def retrieve_header(self, i, queue): while True: try: ir = queue.get(timeout=1) except Queue.Empty: - print_error('timeout') + print_error('retrieve_header: timeout', i.server) continue if not ir: diff --git a/lib/interface.py b/lib/interface.py index 7d90c5167..183e94a19 100644 --- a/lib/interface.py +++ b/lib/interface.py @@ -33,7 +33,8 @@ DEFAULT_TIMEOUT = 5 proxy_modes = ['socks4', 'socks5', 'http'] -from util import parse_json +import util + def cert_verify_hostname(s): # hostname verification (disabled) @@ -48,15 +49,11 @@ def cert_verify_hostname(s): class Interface(threading.Thread): - def __init__(self, server, config = None): - threading.Thread.__init__(self) self.daemon = True self.config = config if config is not None else SimpleConfig() self.connect_event = threading.Event() - - self.subscriptions = {} self.lock = threading.Lock() self.rtime = 0 @@ -65,8 +62,6 @@ class Interface(threading.Thread): self.poll_interval = 1 self.debug = False # dump network messages. can be changed at runtime using the console - - #json self.message_id = 0 self.unanswered_requests = {} @@ -91,36 +86,31 @@ class Interface(threading.Thread): self.proxy_mode = proxy_modes.index(self.proxy["mode"]) + 1 - - - def process_response(self, c): - - # uncomment to debug if self.debug: print_error( "<--",c ) msg_id = c.get('id') error = c.get('error') - + result = c.get('result') + if error: print_error("received error:", c) - if msg_id is not None: - with self.lock: - method, params, callback = self.unanswered_requests.pop(msg_id) - callback(self,{'method':method, 'params':params, 'error':error, 'id':msg_id}) - + #queue.put((self,{'method':method, 'params':params, 'error':error, 'id':_id})) return - + if msg_id is not None: - with self.lock: - method, params, callback = self.unanswered_requests.pop(msg_id) - result = c.get('result') + with self.lock: + method, params, _id, queue = self.unanswered_requests.pop(msg_id) + if queue is None: + queue = self.response_queue else: + queue = self.response_queue # notification method = c.get('method') params = c.get('params') + _id = None if method == 'blockchain.numblocks.subscribe': result = params[0] @@ -135,138 +125,16 @@ class Interface(threading.Thread): result = params[1] params = [addr] - with self.lock: - for k,v in self.subscriptions.items(): - if (method, params) in v: - callback = k - break - else: - print_error( "received unexpected notification", method, params) - print_error( self.subscriptions ) - return - - - callback(self, {'method':method, 'params':params, 'result':result, 'id':msg_id}) + queue.put((self, {'method':method, 'params':params, 'result':result, 'id':_id})) def on_version(self, i, result): self.server_version = result - - - def start_http(self): - self.session_id = None - self.is_connected = True - self.connection_msg = ('https' if self.use_ssl else 'http') + '://%s:%d'%( self.host, self.port ) - try: - self.poll() - except Exception: - print_error("http init session failed") - self.is_connected = False - return - - if self.session_id: - print_error('http session:',self.session_id) - self.is_connected = True - else: - self.is_connected = False - - def run_http(self): - self.is_connected = True - while self.is_connected: - try: - if self.session_id: - self.poll() - time.sleep(self.poll_interval) - except socket.gaierror: - break - except socket.error: - break - except Exception: - traceback.print_exc(file=sys.stdout) - break - - self.is_connected = False - - def poll(self): - self.send([], None) - - - def send_http(self, messages, callback): - import urllib2, json, time, cookielib - print_error( "send_http", messages ) - - if self.proxy: - socks.setdefaultproxy(self.proxy_mode, self.proxy["host"], int(self.proxy["port"]) ) - socks.wrapmodule(urllib2) - - cj = cookielib.CookieJar() - opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) - urllib2.install_opener(opener) - - t1 = time.time() - - data = [] - ids = [] - for m in messages: - method, params = m - if type(params) != type([]): params = [params] - data.append( { 'method':method, 'id':self.message_id, 'params':params } ) - self.unanswered_requests[self.message_id] = method, params, callback - ids.append(self.message_id) - self.message_id += 1 - - if data: - data_json = json.dumps(data) - else: - # poll with GET - data_json = None - - - headers = {'content-type': 'application/json'} - if self.session_id: - headers['cookie'] = 'SESSION=%s'%self.session_id - - try: - req = urllib2.Request(self.connection_msg, data_json, headers) - response_stream = urllib2.urlopen(req, timeout=DEFAULT_TIMEOUT) - except Exception: - return - - for index, cookie in enumerate(cj): - if cookie.name=='SESSION': - self.session_id = cookie.value - - response = response_stream.read() - self.bytes_received += len(response) - if response: - response = json.loads( response ) - if type(response) is not type([]): - self.process_response(response) - else: - for item in response: - self.process_response(item) - - if response: - self.poll_interval = 1 - else: - if self.poll_interval < 15: - self.poll_interval += 1 - #print self.poll_interval, response - - self.rtime = time.time() - t1 - self.is_connected = True - return ids - - - def start_tcp(self): - self.connection_msg = self.host + ':%d' % self.port - if self.proxy is not None: - socks.setdefaultproxy(self.proxy_mode, self.proxy["host"], int(self.proxy["port"])) socket.socket = socks.socksocket # prevent dns leaks, see http://stackoverflow.com/questions/13184205/dns-over-proxy @@ -276,7 +144,6 @@ class Interface(threading.Thread): if self.use_ssl: cert_path = os.path.join( self.config.path, 'certs', self.host) - if not os.path.exists(cert_path): is_new = True # get server certificate. @@ -387,133 +254,45 @@ class Interface(threading.Thread): self.s = s self.is_connected = True print_error("connected to", self.host, self.port) + self.pipe = util.SocketPipe(s) def run_tcp(self): - try: - #if self.use_ssl: self.s.do_handshake() - message = '' - while self.is_connected: - try: - timeout = False - msg = self.s.recv(1024) - except socket.timeout: - timeout = True - except ssl.SSLError: - timeout = True - except socket.error, err: - if err.errno == 60: - timeout = True - elif err.errno in [11, 10035]: - print_error("socket errno", err.errno) - time.sleep(0.1) - continue - else: - traceback.print_exc(file=sys.stdout) - raise - - if timeout: - # ping the server with server.version, as a real ping does not exist yet - self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])], self.on_version) - continue - - message += msg - self.bytes_received += len(msg) - if msg == '': - self.is_connected = False - - while True: - response, message = parse_json(message) - if response is None: - break - self.process_response(response) - - except Exception: - traceback.print_exc(file=sys.stdout) + t = time.time() + while self.is_connected: + # ping the server with server.version + if time.time() - t > 60: + self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]}) + t = time.time() + try: + response = self.pipe.get() + except util.timeout: + continue + if response is None: + break + self.process_response(response) self.is_connected = False - - - def send_tcp(self, messages, callback): - """return the ids of the requests that we sent""" - out = '' - ids = [] - for m in messages: - method, params = m - request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } ) - self.unanswered_requests[self.message_id] = method, params, callback - ids.append(self.message_id) - if self.debug: - print_error("-->", request) + print_error("exit interface", self.server) + + def send_request(self, request, queue=None): + _id = request.get('id') + method = request.get('method') + params = request.get('params') + with self.lock: + self.pipe.send({'id':self.message_id, 'method':method, 'params':params}) + self.unanswered_requests[self.message_id] = method, params, _id, queue self.message_id += 1 - out += request + '\n' - while out: - try: - sent = self.s.send( out ) - out = out[sent:] - except socket.error,e: - if e[0] in (errno.EWOULDBLOCK,errno.EAGAIN): - print_error( "EAGAIN: retrying") - time.sleep(0.1) - continue - else: - traceback.print_exc(file=sys.stdout) - # this happens when we get disconnected - print_error( "Not connected, cannot send" ) - return None - return ids - - - - + if self.debug: + print_error("-->", request) def start_interface(self): - if self.protocol in 'st': self.start_tcp() elif self.protocol in 'gh': self.start_http() - self.connect_event.set() - - - def stop_subscriptions(self): - for callback in self.subscriptions.keys(): - callback(self, None) - self.subscriptions = {} - - - def send(self, messages, callback): - - sub = [] - for message in messages: - m, v = message - if m[-10:] == '.subscribe': - sub.append(message) - - if sub: - with self.lock: - if self.subscriptions.get(callback) is None: - self.subscriptions[callback] = [] - for message in sub: - if message not in self.subscriptions[callback]: - self.subscriptions[callback].append(message) - - if not self.is_connected: - print_error("interface: trying to send while not connected") - return - - if self.protocol in 'st': - with self.lock: - out = self.send_tcp(messages, callback) - else: - # do not use lock, http is synchronous - out = self.send_http(messages, callback) - - return out - - def parse_proxy_options(self, s): if type(s) == type({}): return s # fixme: type should be fixed if type(s) != type(""): return None @@ -533,43 +312,34 @@ class Interface(threading.Thread): proxy["port"] = "8080" if proxy["mode"] == "http" else "1080" return proxy - - def stop(self): if self.is_connected and self.protocol in 'st' and self.s: self.s.shutdown(socket.SHUT_RDWR) self.s.close() - self.is_connected = False - def is_up_to_date(self): return self.unanswered_requests == {} - - - def start(self, queue = None, wait = False): + def start(self, response_queue, wait = False): if not self.server: return - self.queue = queue if queue else Queue.Queue() + self.response_queue = response_queue threading.Thread.start(self) if wait: self.connect_event.wait() - def run(self): self.start_interface() if self.is_connected: - self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])], self.on_version) + self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]}) self.change_status() self.run_tcp() if self.protocol in 'st' else self.run_http() self.change_status() - def change_status(self): - #print "change status", self.server, self.is_connected - self.queue.put(self) - + # print_error( "change status", self.server, self.is_connected) + self.response_queue.put((self, None)) def synchronous_get(self, requests, timeout=100000000): queue = Queue.Queue() @@ -588,6 +358,112 @@ class Interface(threading.Thread): return out +class HTTP_Interface(Interface): + + def send_request(self, request, queue=None): + import urllib2, json, time, cookielib + print_error( "send_http", messages ) + + if self.proxy: + socks.setdefaultproxy(self.proxy_mode, self.proxy["host"], int(self.proxy["port"]) ) + socks.wrapmodule(urllib2) + + cj = cookielib.CookieJar() + opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj)) + urllib2.install_opener(opener) + + t1 = time.time() + + data = [] + ids = [] + for m in messages: + method, params = m + if type(params) != type([]): params = [params] + data.append( { 'method':method, 'id':self.message_id, 'params':params } ) + self.unanswered_requests[self.message_id] = method, params, callback + ids.append(self.message_id) + self.message_id += 1 + + if data: + data_json = json.dumps(data) + else: + # poll with GET + data_json = None + + + headers = {'content-type': 'application/json'} + if self.session_id: + headers['cookie'] = 'SESSION=%s'%self.session_id + + try: + req = urllib2.Request(self.connection_msg, data_json, headers) + response_stream = urllib2.urlopen(req, timeout=DEFAULT_TIMEOUT) + except Exception: + return + + for index, cookie in enumerate(cj): + if cookie.name=='SESSION': + self.session_id = cookie.value + + response = response_stream.read() + self.bytes_received += len(response) + if response: + response = json.loads( response ) + if type(response) is not type([]): + self.process_response(response) + else: + for item in response: + self.process_response(item) + if response: + self.poll_interval = 1 + else: + if self.poll_interval < 15: + self.poll_interval += 1 + #print self.poll_interval, response + self.rtime = time.time() - t1 + self.is_connected = True + return ids + + def poll(self): + self.send([], None) + + def start_http(self): + self.session_id = None + self.is_connected = True + self.connection_msg = ('https' if self.use_ssl else 'http') + '://%s:%d'%( self.host, self.port ) + try: + self.poll() + except Exception: + print_error("http init session failed") + self.is_connected = False + return + + if self.session_id: + print_error('http session:',self.session_id) + self.is_connected = True + else: + self.is_connected = False + + def run_http(self): + self.is_connected = True + while self.is_connected: + try: + if self.session_id: + self.poll() + time.sleep(self.poll_interval) + except socket.gaierror: + break + except socket.error: + break + except Exception: + traceback.print_exc(file=sys.stdout) + break + + self.is_connected = False + + + + def check_cert(host, cert): diff --git a/lib/network.py b/lib/network.py index e43843bad..acb10f839 100644 --- a/lib/network.py +++ b/lib/network.py @@ -109,16 +109,13 @@ class Network(threading.Thread): if not os.path.exists(dir_path): os.mkdir(dir_path) - # default subscriptions - self.subscriptions = {} - self.subscriptions[self.on_banner] = [('server.banner',[])] - self.subscriptions[self.on_peers] = [('server.peers.subscribe',[])] - self.pending_transactions_for_notifications = [] + # address subscriptions and cached results + self.addresses = {} self.connection_status = 'connecting' self.requests_queue = Queue.Queue() - self.unanswered_requests = {} + def get_server_height(self): return self.heights.get(self.default_server,0) @@ -139,31 +136,16 @@ class Network(threading.Thread): return self.interface and self.interface.is_connected def is_up_to_date(self): + raise return self.interface.is_up_to_date() def send_subscriptions(self): - for cb, sub in self.subscriptions.items(): - self.interface.send(sub, cb) - - def subscribe(self, messages, callback): - with self.lock: - if self.subscriptions.get(callback) is None: - self.subscriptions[callback] = [] - for message in messages: - if message not in self.subscriptions[callback]: - self.subscriptions[callback].append(message) - - if self.is_connected(): - self.interface.send( messages, callback ) + for addr in self.addresses: + self.interface.send_request({'method':'blockchain.address.subscribe', 'params':[addr]}) + self.interface.send_request({'method':'server.banner','params':[]}) + self.interface.send_request({'method':'server.peers.subscribe','params':[]}) - def send(self, messages, callback): - if self.is_connected(): - self.interface.send( messages, callback ) - return True - else: - return False - def get_status_value(self, key): if key == 'status': value = self.connection_status @@ -225,7 +207,7 @@ class Network(threading.Thread): i = interface.Interface(server, self.config) self.pending_servers.add(server) i.start(self.queue) - return i + return i def start_random_interface(self): server = self.random_server() @@ -241,9 +223,9 @@ class Network(threading.Thread): self.running = True self.response_queue = response_queue self.start_interfaces() - threading.Thread.start(self) - threading.Thread(target=self.process_thread).start() + threading.Thread(target=self.process_requests_thread).start() self.blockchain.start() + threading.Thread.start(self) def set_parameters(self, host, port, protocol, proxy, auto_connect): self.config.set_key('auto_cycle', auto_connect, True) @@ -331,15 +313,24 @@ class Network(threading.Thread): self.trigger_callback('updated') - def process_thread(self): + def process_response(self, i, response): + method = response['method'] + if method == 'blockchain.address.subscribe': + self.on_address(i, response) + elif method == 'blockchain.headers.subscribe': + self.on_header(i, response) + elif method == 'server.peers.subscribe': + self.on_peers(i, response) + + def process_requests_thread(self): while self.is_running(): try: request = self.requests_queue.get(timeout=0.1) except Queue.Empty: continue - self.process(request) + self.process_request(request) - def process(self, request): + def process_request(self, request): method = request['method'] params = request['params'] _id = request['id'] @@ -359,32 +350,29 @@ class Network(threading.Thread): self.response_queue.put(out) return - def cb(i,r): - _id = r.get('id') - if _id is not None: - my_id = self.unanswered_requests.pop(_id) - r['id'] = my_id - self.response_queue.put(r) - - try: - new_id = self.interface.send([(method, params)], cb) [0] - except Exception as e: - self.response_queue.put({'id':_id, 'error':str(e)}) - print_error("network interface error", str(e)) - return + if method == 'blockchain.address.subscribe': + addr = params[0] + if addr in self.addresses: + self.response_queue.put({'id':_id, 'result':self.addresses[addr]}) + return - self.unanswered_requests[new_id] = _id + self.interface.send_request(request) def run(self): while self.is_running(): try: - i = self.queue.get(timeout = 30 if self.interfaces else 3) + i, response = self.queue.get(0.1) #timeout = 30 if self.interfaces else 3) except Queue.Empty: if len(self.interfaces) < self.num_server: self.start_random_interface() continue + + if response is not None: + self.process_response(i, response) + continue + # if response is None it is a notification about the interface if i.server in self.pending_servers: self.pending_servers.remove(i.server) @@ -392,7 +380,7 @@ class Network(threading.Thread): #if i.server in self.interfaces: raise self.interfaces[i.server] = i self.add_recent_server(i) - i.send([ ('blockchain.headers.subscribe',[])], self.on_header) + i.send_request({'method':'blockchain.headers.subscribe','params':[]}) if i == self.interface: print_error('sending subscriptions to', self.interface.server) self.send_subscriptions() @@ -439,6 +427,12 @@ class Network(threading.Thread): self.banner = r.get('result') self.trigger_callback('banner') + def on_address(self, i, r): + addr = r.get('params')[0] + result = r.get('result') + self.addresses[addr] = result + self.response_queue.put(r) + def stop(self): with self.lock: self.running = False diff --git a/lib/network_proxy.py b/lib/network_proxy.py index 2eedb0867..0ee216f58 100644 --- a/lib/network_proxy.py +++ b/lib/network_proxy.py @@ -32,6 +32,12 @@ from simple_config import SimpleConfig from daemon import NetworkServer, DAEMON_PORT +# policies +SPAWN_DAEMON=0 +NEED_DAEMON=1 +NO_DAEMON=2 +USE_DAEMON_IF_AVAILABLE=3 + class NetworkProxy(threading.Thread): @@ -119,8 +125,9 @@ class NetworkProxy(threading.Thread): print_error( "received unexpected notification", method, params) return - callback({'method':method, 'params':params, 'result':result, 'id':msg_id}) - + + r = {'method':method, 'params':params, 'result':result, 'id':msg_id} + callback(r) def send(self, messages, callback): diff --git a/lib/synchronizer.py b/lib/synchronizer.py index b73ca476d..5dd569205 100644 --- a/lib/synchronizer.py +++ b/lib/synchronizer.py @@ -54,7 +54,7 @@ class WalletSynchronizer(threading.Thread): messages = [] for addr in addresses: messages.append(('blockchain.address.subscribe', [addr])) - self.network.send(messages, lambda r: self.queue.put(r)) + self.network.send(messages, self.queue.put) def run(self): with self.lock: @@ -147,7 +147,7 @@ class WalletSynchronizer(threading.Thread): addr = params[0] if self.wallet.get_status(self.wallet.get_history(addr)) != result: if requested_histories.get(addr) is None: - self.network.send([('blockchain.address.get_history', [addr])], lambda i,r:self.queue.put(r)) + self.network.send([('blockchain.address.get_history', [addr])], self.queue.put) requested_histories[addr] = result elif method == 'blockchain.address.get_history': diff --git a/lib/util.py b/lib/util.py index 2d13f7f9f..eed6f0667 100644 --- a/lib/util.py +++ b/lib/util.py @@ -230,7 +230,12 @@ def parse_json(message): class timeout(Exception): pass -import socket, json +import socket +import errno +import json +import ssl +import traceback +import time class SocketPipe: @@ -251,8 +256,21 @@ class SocketPipe: data = self.socket.recv(1024) except socket.timeout: raise timeout + except ssl.SSLError: + raise timeout + except socket.error, err: + if err.errno == 60: + raise timeout + elif err.errno in [11, 10035]: + print_error("socket errno", err.errno) + time.sleep(0.1) + continue + else: + traceback.print_exc(file=sys.stdout) + data = '' except: data = '' + if not data: self.socket.close() return None @@ -260,15 +278,37 @@ class SocketPipe: def send(self, request): out = json.dumps(request) + '\n' - while out: - sent = self.socket.send( out ) - out = out[sent:] + self._send(out) def send_all(self, requests): out = ''.join(map(lambda x: json.dumps(x) + '\n', requests)) + self._send(out) + + def _send(self, out): while out: - sent = self.socket.send( out ) - out = out[sent:] + try: + sent = self.socket.send( out ) + out = out[sent:] + except ssl.SSLError as e: + print_error( "SSLError: retry", e) + time.sleep(0.1) + continue + + except socket.error as e: + if e[0] in (errno.EWOULDBLOCK,errno.EAGAIN): + print_error( "EAGAIN: retrying") + time.sleep(0.1) + continue + elif e[0] == 'The write operation timed out': + print_error( "ssl: retry") + time.sleep(0.1) + continue + else: + print repr(e[0]) + traceback.print_exc(file=sys.stdout) + print_error( "Not connected, cannot send" ) + return False + import Queue