Browse Source

print_error methods. request timeout for interface

283
ThomasV 10 years ago
parent
commit
f32f1183fc
  1. 61
      lib/interface.py
  2. 30
      lib/network.py

61
lib/interface.py

@ -26,7 +26,7 @@ import requests
ca_path = requests.certs.where() ca_path = requests.certs.where()
from version import ELECTRUM_VERSION, PROTOCOL_VERSION from version import ELECTRUM_VERSION, PROTOCOL_VERSION
from util import print_error, print_msg import util
from simple_config import SimpleConfig from simple_config import SimpleConfig
import x509 import x509
@ -63,9 +63,12 @@ class TcpInterface(threading.Thread):
self.port = int(self.port) self.port = int(self.port)
self.use_ssl = (self.protocol == 's') self.use_ssl = (self.protocol == 's')
def print_error(self, *msg):
util.print_error("[%s]"%self.host, *msg)
def process_response(self, response): def process_response(self, response):
if self.debug: if self.debug:
print_error("<--", response) self.print_error("<--", response)
msg_id = response.get('id') msg_id = response.get('id')
error = response.get('error') error = response.get('error')
@ -134,7 +137,7 @@ class TcpInterface(threading.Thread):
try: try:
l = socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM) l = socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM)
except socket.gaierror: except socket.gaierror:
print_error("error: cannot resolve", self.host) self.print_error("cannot resolve hostname")
return return
for res in l: for res in l:
try: try:
@ -144,7 +147,7 @@ class TcpInterface(threading.Thread):
except BaseException as e: except BaseException as e:
continue continue
else: else:
print_error("failed to connect", self.host, self.port, str(e)) self.print_error("failed to connect", str(e))
def get_socket(self): def get_socket(self):
@ -161,7 +164,7 @@ class TcpInterface(threading.Thread):
except ssl.SSLError, e: except ssl.SSLError, e:
s = None s = None
if s and self.check_host_name(s.getpeercert(), self.host): if s and self.check_host_name(s.getpeercert(), self.host):
print_error("SSL certificate signed by CA:", self.host) self.print_error("SSL certificate signed by CA")
return s return s
# get server certificate. # get server certificate.
@ -170,7 +173,7 @@ class TcpInterface(threading.Thread):
try: try:
s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_SSLv23, cert_reqs=ssl.CERT_NONE, ca_certs=None) s = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_SSLv23, cert_reqs=ssl.CERT_NONE, ca_certs=None)
except ssl.SSLError, e: except ssl.SSLError, e:
print_error("SSL error retrieving SSL certificate:", self.host, e) self.print_error("SSL error retrieving SSL certificate:", e)
return return
dercert = s.getpeercert(True) dercert = s.getpeercert(True)
@ -199,7 +202,7 @@ class TcpInterface(threading.Thread):
ca_certs= (temporary_path if is_new else cert_path), ca_certs= (temporary_path if is_new else cert_path),
do_handshake_on_connect=True) do_handshake_on_connect=True)
except ssl.SSLError, e: except ssl.SSLError, e:
print_error("SSL error:", self.host, e) self.print_error("SSL error:", e)
if e.errno != 1: if e.errno != 1:
return return
if is_new: if is_new:
@ -216,25 +219,25 @@ class TcpInterface(threading.Thread):
x.slow_parse() x.slow_parse()
except: except:
traceback.print_exc(file=sys.stderr) traceback.print_exc(file=sys.stderr)
print_error("wrong certificate", self.host) self.print_error("wrong certificate")
return return
try: try:
x.check_date() x.check_date()
except: except:
print_error("certificate has expired:", cert_path) self.print_error("certificate has expired:", cert_path)
os.unlink(cert_path) os.unlink(cert_path)
return return
print_error("wrong certificate", self.host) self.print_error("wrong certificate")
return return
except BaseException, e: except BaseException, e:
print_error(self.host, e) self.print_error(e)
if e.errno == 104: if e.errno == 104:
return return
traceback.print_exc(file=sys.stderr) traceback.print_exc(file=sys.stderr)
return return
if is_new: if is_new:
print_error("saving certificate for", self.host) self.print_error("saving certificate")
os.rename(temporary_path, cert_path) os.rename(temporary_path, cert_path)
return s return s
@ -249,9 +252,9 @@ class TcpInterface(threading.Thread):
r = {'id':self.message_id, 'method':method, 'params':params} r = {'id':self.message_id, 'method':method, 'params':params}
self.pipe.send(r) self.pipe.send(r)
if self.debug: if self.debug:
print_error("-->", r) self.print_error("-->", r)
except socket.error, e: except socket.error, e:
print_error("socked error:", self.server, e) self.print_error("socked error:", e)
self.is_connected = False self.is_connected = False
return return
self.unanswered_requests[self.message_id] = method, params, _id, queue self.unanswered_requests[self.message_id] = method, params, _id, queue
@ -262,6 +265,7 @@ class TcpInterface(threading.Thread):
self.s.shutdown(socket.SHUT_RDWR) self.s.shutdown(socket.SHUT_RDWR)
self.s.close() self.s.close()
self.is_connected = False self.is_connected = False
self.print_error("stopped")
def start(self, response_queue): def start(self, response_queue):
self.response_queue = response_queue self.response_queue = response_queue
@ -273,35 +277,50 @@ class TcpInterface(threading.Thread):
self.pipe = util.SocketPipe(self.s) self.pipe = util.SocketPipe(self.s)
self.s.settimeout(2) self.s.settimeout(2)
self.is_connected = True self.is_connected = True
print_error("connected to", self.host, self.port) self.print_error("connected")
self.change_status() self.change_status()
if not self.is_connected: if not self.is_connected:
return return
t = 0 # ping timer
ping_time = 0
# request timer
request_time = False
while self.is_connected: while self.is_connected:
# ping the server with server.version # ping the server with server.version
if time.time() - t > 60: if time.time() - ping_time > 60:
if self.is_ping: if self.is_ping:
print_error("ping timeout", self.server) self.print_error("ping timeout")
self.is_connected = False self.is_connected = False
break break
else: else:
self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]}) self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]})
self.is_ping = True self.is_ping = True
t = time.time() ping_time = time.time()
try: try:
response = self.pipe.get() response = self.pipe.get()
except util.timeout: except util.timeout:
if self.unanswered_requests:
if request_time is False:
request_time = time.time()
self.print_error("setting timer")
else:
if time.time() - request_time > 10:
self.print_error("request timeout", len(self.unanswered_requests))
self.is_connected = False
break
continue continue
if response is None: if response is None:
self.is_connected = False self.is_connected = False
break break
if request_time is not False:
self.print_error("stopping timer")
request_time = False
self.process_response(response) self.process_response(response)
self.change_status() self.change_status()
print_error("closing connection:", self.server)
def change_status(self): def change_status(self):
# print_error( "change status", self.server, self.is_connected) # print_error( "change status", self.server, self.is_connected)
@ -329,7 +348,7 @@ def check_cert(host, cert):
m = "host: %s\n"%host m = "host: %s\n"%host
m += "has_expired: %s\n"% expired m += "has_expired: %s\n"% expired
print_msg(m) util.print_msg(m)
def test_certificates(): def test_certificates():

30
lib/network.py

@ -9,7 +9,7 @@ import traceback
import socks import socks
import socket import socket
from util import user_dir, print_error, print_msg import util
from bitcoin import * from bitcoin import *
import interface import interface
from blockchain import Blockchain from blockchain import Blockchain
@ -166,17 +166,19 @@ class Network(threading.Thread):
self.addresses = {} self.addresses = {}
self.connection_status = 'connecting' self.connection_status = 'connecting'
self.requests_queue = Queue.Queue() self.requests_queue = Queue.Queue()
self.set_proxy(deserialize_proxy(self.config.get('proxy'))) self.set_proxy(deserialize_proxy(self.config.get('proxy')))
def print_error(self, *msg):
util.print_error("[network]", *msg)
def get_server_height(self): def get_server_height(self):
return self.heights.get(self.default_server, 0) return self.heights.get(self.default_server, 0)
def server_is_lagging(self): def server_is_lagging(self):
h = self.get_server_height() h = self.get_server_height()
if not h: if not h:
print_error('no height for main interface') self.print_error('no height for main interface')
return False return False
lag = self.get_local_height() - self.get_server_height() lag = self.get_local_height() - self.get_server_height()
return lag > 1 return lag > 1
@ -297,7 +299,7 @@ class Network(threading.Thread):
return return
if self.proxy != proxy or self.protocol != protocol: if self.proxy != proxy or self.protocol != protocol:
print_error('restarting network') self.print_error('restarting network')
for i in self.interfaces.values(): for i in self.interfaces.values():
i.stop() i.stop()
self.interfaces.pop(i.server) self.interfaces.pop(i.server)
@ -329,7 +331,7 @@ class Network(threading.Thread):
def switch_to_interface(self, interface): def switch_to_interface(self, interface):
server = interface.server server = interface.server
print_error("switching to", server) self.print_error("switching to", server)
self.interface = interface self.interface = interface
self.config.set_key('server', server, False) self.config.set_key('server', server, False)
self.default_server = server self.default_server = server
@ -386,7 +388,7 @@ class Network(threading.Thread):
def new_blockchain_height(self, blockchain_height, i): def new_blockchain_height(self, blockchain_height, i):
if self.is_connected(): if self.is_connected():
if self.server_is_lagging(): if self.server_is_lagging():
print_error( "Server is lagging", blockchain_height, self.get_server_height()) self.print_error("Server is lagging", blockchain_height, self.get_server_height())
if self.config.get('auto_cycle'): if self.config.get('auto_cycle'):
self.set_server(i.server) self.set_server(i.server)
self.notify('updated') self.notify('updated')
@ -429,7 +431,7 @@ class Network(threading.Thread):
except BaseException as e: except BaseException as e:
out['error'] = str(e) out['error'] = str(e)
traceback.print_exc(file=sys.stdout) traceback.print_exc(file=sys.stdout)
print_error("network error", str(e)) self.print_error("network error", str(e))
self.response_queue.put(out) self.response_queue.put(out)
return return
@ -444,7 +446,7 @@ class Network(threading.Thread):
self.interface.send_request(request) self.interface.send_request(request)
except: except:
# put it back in the queue # put it back in the queue
print_error("warning: interface not ready for", request) self.print_error("warning: interface not ready for", request)
self.requests_queue.put(request) self.requests_queue.put(request)
time.sleep(0.1) time.sleep(0.1)
@ -458,7 +460,7 @@ class Network(threading.Thread):
self.start_random_interface() self.start_random_interface()
if not self.interfaces: if not self.interfaces:
if time.time() - disconnected_time > DISCONNECTED_RETRY_INTERVAL: if time.time() - disconnected_time > DISCONNECTED_RETRY_INTERVAL:
print_error('network: retrying connections') self.print_error('network: retrying connections')
self.disconnected_servers = set([]) self.disconnected_servers = set([])
disconnected_time = time.time() disconnected_time = time.time()
if not self.interface.is_connected: if not self.interface.is_connected:
@ -470,7 +472,7 @@ class Network(threading.Thread):
self.switch_to_interface(self.interfaces[self.default_server]) self.switch_to_interface(self.interfaces[self.default_server])
else: else:
if self.default_server not in self.disconnected_servers and self.default_server not in self.pending_servers: if self.default_server not in self.disconnected_servers and self.default_server not in self.pending_servers:
print_error("forcing reconnection") self.print_error("forcing reconnection")
self.interface = self.start_interface(self.default_server) self.interface = self.start_interface(self.default_server)
continue continue
@ -487,7 +489,7 @@ class Network(threading.Thread):
self.add_recent_server(i) self.add_recent_server(i)
i.send_request({'method':'blockchain.headers.subscribe','params':[]}) i.send_request({'method':'blockchain.headers.subscribe','params':[]})
if i == self.interface: if i == self.interface:
print_error('sending subscriptions to', self.interface.server) self.print_error('sending subscriptions to', self.interface.server)
self.send_subscriptions() self.send_subscriptions()
self.set_status('connected') self.set_status('connected')
else: else:
@ -499,7 +501,7 @@ class Network(threading.Thread):
self.set_status('disconnected') self.set_status('disconnected')
self.disconnected_servers.add(i.server) self.disconnected_servers.add(i.server)
print_error("Network: Stopping interfaces") self.print_error("stopping interfaces")
for i in self.interfaces.values(): for i in self.interfaces.values():
i.stop() i.stop()
@ -519,7 +521,7 @@ class Network(threading.Thread):
if i == self.interface: if i == self.interface:
if self.server_is_lagging() and self.config.get('auto_cycle'): if self.server_is_lagging() and self.config.get('auto_cycle'):
print_error( "Server lagging, stopping interface") self.print_error("Server lagging, stopping interface")
self.stop_interface() self.stop_interface()
self.notify('updated') self.notify('updated')
@ -539,7 +541,7 @@ class Network(threading.Thread):
self.response_queue.put(r) self.response_queue.put(r)
def stop(self): def stop(self):
print_error("stopping network") self.print_error("stopping network")
with self.lock: with self.lock:
self.running = False self.running = False

Loading…
Cancel
Save