From 2da0c0b77e109ba48321519870e78e223f291403 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Sun, 21 Oct 2012 02:57:31 +0200 Subject: [PATCH] big refactoring of the interface addition of the wallet verifier class for SPV --- electrum | 11 +- lib/__init__.py | 5 +- lib/gui_lite.py | 2 +- lib/gui_qt.py | 6 +- lib/interface.py | 342 ++++++++++++++++++++--------------------------- lib/wallet.py | 264 ++++++++++++++++++++++++++++++++---- scripts/blocks | 2 +- scripts/peers | 4 +- scripts/servers | 20 ++- 9 files changed, 411 insertions(+), 245 deletions(-) diff --git a/electrum b/electrum index 267baab33..4e842f4a0 100755 --- a/electrum +++ b/electrum @@ -36,9 +36,9 @@ except ImportError: sys.exit("Error: AES does not seem to be installed. Try 'sudo pip install slowaes'") try: - from lib import Wallet, WalletSynchronizer, format_satoshis, mnemonic, SimpleConfig, pick_random_server + from lib import Wallet, Interface, WalletSynchronizer, WalletVerifier, format_satoshis, mnemonic, SimpleConfig, pick_random_server except ImportError: - from electrum import Wallet, WalletSynchronizer, format_satoshis, mnemonic, SimpleConfig, pick_random_server + from electrum import Wallet, Interface, WalletSynchronizer, WalletVerifier, format_satoshis, mnemonic, SimpleConfig, pick_random_server from decimal import Decimal @@ -185,8 +185,11 @@ if __name__ == '__main__': sys.exit("Error: Unknown GUI: " + pref_gui ) gui = gui.ElectrumGui(wallet, config) - interface = WalletSynchronizer(wallet, config, True, gui.server_list_changed) - interface.start() + wallet.interface = Interface(config, True, gui.server_list_changed) + wallet.interface.start() + + WalletSynchronizer(wallet, config).start() + WalletVerifier(wallet, config).start() try: found = config.wallet_file_exists diff --git a/lib/__init__.py b/lib/__init__.py index 70fd32f7c..258a66129 100644 --- a/lib/__init__.py +++ b/lib/__init__.py @@ -1,4 +1,5 @@ -from wallet import Wallet, format_satoshis -from interface import WalletSynchronizer, Interface, pick_random_server, DEFAULT_SERVERS +from util import format_satoshis +from wallet import Wallet, WalletSynchronizer, WalletVerifier +from interface import Interface, pick_random_server, DEFAULT_SERVERS from simple_config import SimpleConfig import bitcoin diff --git a/lib/gui_lite.py b/lib/gui_lite.py index ed131173b..02eefcc13 100644 --- a/lib/gui_lite.py +++ b/lib/gui_lite.py @@ -800,7 +800,7 @@ class MiniDriver(QObject): self.wallet = wallet self.window = window - self.wallet.register_callback(self.update_callback) + self.wallet.interface.register_callback(self.update_callback) self.state = None diff --git a/lib/gui_qt.py b/lib/gui_qt.py index f93ee3c17..38b65b39a 100644 --- a/lib/gui_qt.py +++ b/lib/gui_qt.py @@ -207,7 +207,7 @@ class ElectrumWindow(QMainWindow): QMainWindow.__init__(self) self.wallet = wallet self.config = config - self.wallet.register_callback(self.update_callback) + self.wallet.interface.register_callback(self.update_callback) self.detailed_view = config.get('qt_detailed_view', False) @@ -1577,7 +1577,7 @@ class ElectrumGui: wallet.init_mpk( wallet.seed ) wallet.up_to_date_event.clear() wallet.up_to_date = False - wallet.interface.poke() + wallet.interface.poke('synchronizer') waiting_dialog(waiting) # run a dialog indicating the seed, ask the user to remember it ElectrumWindow.show_seed_dialog(wallet) @@ -1589,7 +1589,7 @@ class ElectrumGui: wallet.init_mpk( wallet.seed ) wallet.up_to_date_event.clear() wallet.up_to_date = False - wallet.interface.poke() + wallet.interface.poke('synchronizer') waiting_dialog(waiting) if wallet.is_found(): # history and addressbook diff --git a/lib/interface.py b/lib/interface.py index c1f243a66..729f0378d 100644 --- a/lib/interface.py +++ b/lib/interface.py @@ -28,11 +28,11 @@ DEFAULT_TIMEOUT = 5 DEFAULT_SERVERS = [ 'electrum.novit.ro:50001:t', 'electrum.pdmc.net:50001:t', - #'ecdsa.org:50002:s', + 'ecdsa.org:50001:t', 'electrum.bitcoins.sk:50001:t', 'uncle-enzo.info:50001:t', 'electrum.bytesized-hosting.com:50001:t', - 'california.stratum.bitcoin.cz:50001:t', + 'electrum.bitcoin.cz:50001:t', 'electrum.bitfoo.org:50001:t' ] @@ -42,24 +42,22 @@ proxy_modes = ['socks4', 'socks5', 'http'] def pick_random_server(): return random.choice( DEFAULT_SERVERS ) -def pick_random_interface(config): - servers = DEFAULT_SERVERS - while servers: - server = random.choice( servers ) - servers.remove(server) - config.set_key('server', server, False) - i = Interface(config) - if i.is_connected: - return i - raise BaseException('no server available') -class InterfaceAncestor(threading.Thread): +class Interface(threading.Thread): - def __init__(self, host, port, proxy=None, use_ssl=True): - threading.Thread.__init__(self) - self.daemon = True + def register_callback(self, update_callback): + with self.lock: + self.update_callbacks.append(update_callback) + + def trigger_callbacks(self): + with self.lock: + callbacks = self.update_callbacks[:] + [update() for update in callbacks] + + + def init_server(self, host, port, proxy=None, use_ssl=True): self.host = host self.port = port self.proxy = proxy @@ -74,13 +72,9 @@ class InterfaceAncestor(threading.Thread): #json self.message_id = 0 - self.responses = Queue.Queue() self.unanswered_requests = {} - def poke(self): - # push a fake response so that the getting thread exits its loop - self.responses.put(None) def queue_json_response(self, c): @@ -95,12 +89,19 @@ class InterfaceAncestor(threading.Thread): return if msg_id is not None: - method, params = self.unanswered_requests.pop(msg_id) + with self.lock: + method, params, channel = self.unanswered_requests.pop(msg_id) result = c.get('result') else: - # notification + # notification. we should find the channel(s).. method = c.get('method') params = c.get('params') + with self.lock: + for k,v in self.subscriptions.items(): + if (method, params) in v: + channel = k + else: + raise if method == 'blockchain.numblocks.subscribe': result = params[0] @@ -111,32 +112,29 @@ class InterfaceAncestor(threading.Thread): result = params[1] params = [addr] - self.responses.put({'method':method, 'params':params, 'result':result, 'id':msg_id}) - - + response_queue = self.responses[channel] + response_queue.put({'method':method, 'params':params, 'result':result, 'id':msg_id}) - def subscribe(self, addresses): - messages = [] - for addr in addresses: - messages.append(('blockchain.address.subscribe', [addr])) - self.send(messages) + def get_response(self, channel='default', block=True, timeout=10000000000): + return self.responses[channel].get(block, timeout) + def register_channel(self, channel): + with self.lock: + self.responses[channel] = Queue.Queue() + def poke(self, channel): + self.responses[channel].put(None) -class HttpStratumInterface(InterfaceAncestor): - """ non-persistent connection. synchronous calls""" - def __init__(self, host, port, proxy=None, use_ssl=True): - InterfaceAncestor.__init__(self, host, port, proxy, use_ssl) + def init_http(self, host, port, proxy=None, use_ssl=True): + self.init_server(host, port, proxy, use_ssl) self.session_id = None self.connection_msg = ('https' if self.use_ssl else 'http') + '://%s:%d'%( self.host, self.port ) - def get_history(self, address): - self.send([('blockchain.address.get_history', [address] )]) - def run(self): + def run_http(self): self.is_connected = True while self.is_connected: try: @@ -152,13 +150,13 @@ class HttpStratumInterface(InterfaceAncestor): break self.is_connected = False - self.poke() def poll(self): self.send([]) - def send(self, messages): + + def send_http(self, messages, channel='default'): import urllib2, json, time, cookielib if self.proxy: @@ -177,7 +175,7 @@ class HttpStratumInterface(InterfaceAncestor): method, params = m if type(params) != type([]): params = [params] data.append( { 'method':method, 'id':self.message_id, 'params':params } ) - self.unanswered_requests[self.message_id] = method, params + self.unanswered_requests[self.message_id] = method, params, channel self.message_id += 1 if data: @@ -221,14 +219,9 @@ class HttpStratumInterface(InterfaceAncestor): -class TcpStratumInterface(InterfaceAncestor): - """json-rpc over persistent TCP connection, asynchronous""" + def init_tcp(self, host, port, proxy=None, use_ssl=True): + self.init_server(host, port, proxy, use_ssl) - def __init__(self, host, port, proxy=None, use_ssl=True): - InterfaceAncestor.__init__(self, host, port, proxy, use_ssl) - self.init_socket() - - def init_socket(self): import ssl global proxy_modes self.connection_msg = "%s:%d"%(self.host,self.port) @@ -251,17 +244,18 @@ class TcpStratumInterface(InterfaceAncestor): s.settimeout(60) self.s = s self.is_connected = True - self.send([('server.version', [ELECTRUM_VERSION])]) except: self.is_connected = False self.s = None - def run(self): + + def run_tcp(self): try: out = '' while self.is_connected: try: msg = self.s.recv(1024) except socket.timeout: + print "timeout" # ping the server with server.version, as a real ping does not exist yet self.send([('server.version', [ELECTRUM_VERSION])]) continue @@ -283,17 +277,16 @@ class TcpStratumInterface(InterfaceAncestor): traceback.print_exc(file=sys.stdout) self.is_connected = False - print "Poking" - self.poke() - def send(self, messages): + + def send_tcp(self, messages, channel='default'): """return the ids of the requests that we sent""" out = '' ids = [] for m in messages: method, params = m request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } ) - self.unanswered_requests[self.message_id] = method, params + self.unanswered_requests[self.message_id] = method, params, channel ids.append(self.message_id) # uncomment to debug # print "-->",request @@ -304,18 +297,55 @@ class TcpStratumInterface(InterfaceAncestor): out = out[sent:] return ids - def get_history(self, addr): - self.send([('blockchain.address.get_history', [addr])]) - -class Interface(TcpStratumInterface, HttpStratumInterface): - - def __init__(self, config = None): + def __init__(self, config=None, loop=False, servers_loaded_callback=None): if config is None: from simple_config import SimpleConfig config = SimpleConfig() + + threading.Thread.__init__(self) + self.daemon = True + self.loop = loop + self.config = config + self.servers_loaded_callback = servers_loaded_callback + + self.subscriptions = {} + self.responses = {} + self.responses['default'] = Queue.Queue() + + self.update_callbacks = [] + self.lock = threading.Lock() + self.init_interface() + + + + def init_interface(self): + if self.config.get('server'): + self.init_with_server(self.config) + else: + print "Using random server..." + servers = DEFAULT_SERVERS + while servers: + server = random.choice( servers ) + servers.remove(server) + self.config.set_key('server', server, False) + self.init_with_server(self.config) + if self.is_connected: break + + if not servers: + raise BaseException('no server available') + + if self.is_connected: + print "Connected to " + self.connection_msg + self.send([('server.version', [ELECTRUM_VERSION])]) + #self.send([('server.banner',[])], 'synchronizer') + else: + print_error("Failed to connect " + self.connection_msg) + + + def init_with_server(self, config): s = config.get('server') host, port, protocol = s.split(':') @@ -327,24 +357,41 @@ class Interface(TcpStratumInterface, HttpStratumInterface): #print protocol, host, port if protocol in 'st': - TcpStratumInterface.__init__(self, host, port, proxy, use_ssl=(protocol=='s')) + self.init_tcp(host, port, proxy, use_ssl=(protocol=='s')) elif protocol in 'gh': - HttpStratumInterface.__init__(self, host, port, proxy, use_ssl=(protocol=='g')) + self.init_http(host, port, proxy, use_ssl=(protocol=='g')) else: raise BaseException('Unknown protocol: %s'%protocol) - def run(self): - if self.protocol in 'st': - TcpStratumInterface.run(self) - else: - HttpStratumInterface.run(self) + def send(self, messages, channel='default'): + + sub = [] + for message in messages: + m, v = message + if m[-10:] == '.subscribe': + sub.append(message) + + if sub: + with self.lock: + if self.subscriptions.get(channel) is None: + self.subscriptions[channel] = [] + self.subscriptions[channel] += sub - def send(self, messages): if self.protocol in 'st': - return TcpStratumInterface.send(self, messages) + with self.lock: + out = self.send_tcp(messages, channel) else: - return HttpStratumInterface.send(self, messages) + # do not use lock, http is synchronous + out = self.send_http(messages, channel) + + return out + + def resend_subscriptions(self): + for channel, messages in self.subscriptions.items(): + if messages: + self.send(messages, channel) + def parse_proxy_options(self, s): @@ -377,12 +424,30 @@ class Interface(TcpStratumInterface, HttpStratumInterface): print "changing server:", server, proxy self.server = server self.proxy = proxy + if self.protocol in 'st': + self.s.shutdown(socket.SHUT_RDWR) + self.s.close() self.is_connected = False # this exits the polling loop - self.poke() - def is_up_to_date(self): - return self.responses.empty() and not self.unanswered_requests + def is_empty(self, channel): + q = self.responses.get(channel) + if q: + return q.empty() + else: + return True + + + def get_pending_requests(self, channel): + result = [] + with self.lock: + for k, v in self.unanswered_requests.items(): + a, b, c = v + if c == channel: result.append(k) + return result + + def is_up_to_date(self, channel): + return self.is_empty(channel) and not self.get_pending_requests(channel) def synchronous_get(self, requests, timeout=100000000): @@ -391,7 +456,7 @@ class Interface(TcpStratumInterface, HttpStratumInterface): id2 = ids[:] res = {} while ids: - r = self.responses.get(True, timeout) + r = self.responses['default'].get(True, timeout) _id = r.get('id') if _id in ids: ids.remove(_id) @@ -403,130 +468,15 @@ class Interface(TcpStratumInterface, HttpStratumInterface): + def run(self): + while True: + self.run_tcp() if self.protocol in 'st' else self.run_http() + self.trigger_callbacks() + if not self.loop: break -class WalletSynchronizer(threading.Thread): - - def __init__(self, wallet, config, loop=False, servers_loaded_callback=None): - threading.Thread.__init__(self) - self.daemon = True - self.wallet = wallet - self.loop = loop - self.config = config - self.init_interface() - self.servers_loaded_callback = servers_loaded_callback - - def init_interface(self): - if self.config.get('server'): - self.interface = Interface(self.config) - else: - print "Using random server..." - self.interface = pick_random_interface(self.config) - - if self.interface.is_connected: - print "Connected to " + self.interface.connection_msg - else: - print_error("Failed to connect " + self.interface.connection_msg) - - self.wallet.interface = self.interface - - def handle_response(self, r): - if r is None: - return - - method = r['method'] - params = r['params'] - result = r['result'] - - if method == 'server.banner': - self.wallet.banner = result - self.wallet.was_updated = True - - elif method == 'server.peers.subscribe': - servers = [] - for item in result: - s = [] - host = item[1] - ports = [] - version = None - if len(item) > 2: - for v in item[2]: - if re.match("[stgh]\d+", v): - ports.append((v[0], v[1:])) - if re.match("v(.?)+", v): - version = v[1:] - if ports and version: - servers.append((host, ports)) - self.interface.servers = servers - # servers_loaded_callback is None for commands, but should - # NEVER be None when using the GUI. - if self.servers_loaded_callback is not None: - self.servers_loaded_callback() - - elif method == 'blockchain.address.subscribe': - addr = params[0] - self.wallet.receive_status_callback(addr, result) - - elif method == 'blockchain.address.get_history': - addr = params[0] - self.wallet.receive_history_callback(addr, result) - self.wallet.was_updated = True - - elif method == 'blockchain.transaction.broadcast': - self.wallet.tx_result = result - self.wallet.tx_event.set() - - elif method == 'blockchain.numblocks.subscribe': - self.wallet.blocks = result - self.wallet.was_updated = True - - elif method == 'server.version': - pass - - else: - print_error("Error: Unknown message:" + method + ", " + params + ", " + result) - - - def start_interface(self): - self.interface.start() - if self.interface.is_connected: - self.wallet.start_session(self.interface) - + time.sleep(5) + self.init_interface() + self.resend_subscriptions() - def run(self): - import socket, time - self.start_interface() - while True: - while self.interface.is_connected: - new_addresses = self.wallet.synchronize() - if new_addresses: - self.interface.subscribe(new_addresses) - - if self.interface.is_up_to_date(): - if not self.wallet.up_to_date: - self.wallet.up_to_date = True - self.wallet.was_updated = True - self.wallet.up_to_date_event.set() - else: - if self.wallet.up_to_date: - self.wallet.up_to_date = False - self.wallet.was_updated = True - - if self.wallet.was_updated: - self.wallet.trigger_callbacks() - self.wallet.was_updated = False - - response = self.interface.responses.get() - self.handle_response(response) - - self.wallet.trigger_callbacks() - if self.loop: - time.sleep(5) - # Server has been changed. Copy callback for new interface. - self.proxy = self.interface.proxy - self.init_interface() - self.start_interface() - continue - else: - break diff --git a/lib/wallet.py b/lib/wallet.py index 2e5cc780e..464735c7c 100644 --- a/lib/wallet.py +++ b/lib/wallet.py @@ -28,6 +28,7 @@ import threading import random import aes import ecdsa +import Queue from ecdsa.util import string_to_number, number_to_string from util import print_error, user_dir, format_satoshis @@ -50,7 +51,6 @@ class Wallet: self.config = config self.electrum_version = ELECTRUM_VERSION - self.update_callbacks = [] # saved fields self.seed_version = config.get('seed_version', SEED_VERSION) @@ -94,16 +94,6 @@ class Wallet: raise ValueError("This wallet seed is deprecated. Please run upgrade.py for a diagnostic.") - def register_callback(self, update_callback): - with self.lock: - self.update_callbacks.append(update_callback) - - def trigger_callbacks(self): - with self.lock: - callbacks = self.update_callbacks[:] - [update() for update in callbacks] - - def import_key(self, keypair, password): address, key = keypair.split(':') if not self.is_valid(address): @@ -480,7 +470,8 @@ class Wallet: return s def get_status(self, address): - h = self.history.get(address) + with self.lock: + h = self.history.get(address) if not h: status = None else: @@ -490,11 +481,6 @@ class Wallet: status = status + ':%d'% len(h) return status - def receive_status_callback(self, addr, status): - with self.lock: - if self.get_status(addr) != status: - #print "updating status for", addr, status - self.interface.get_history(addr) def receive_history_callback(self, addr, data): #print "updating history for", addr @@ -504,10 +490,26 @@ class Wallet: self.save() def get_tx_history(self): - lines = self.tx_history.values() + with self.lock: + lines = self.tx_history.values() lines = sorted(lines, key=operator.itemgetter("timestamp")) return lines + def get_tx_hashes(self): + with self.lock: + hashes = self.tx_history.keys() + return hashes + + def get_transactions_at_height(self, height): + with self.lock: + values = self.tx_history.values()[:] + + out = [] + for tx in values: + if tx['height'] == height: + out.append(tx['tx_hash']) + return out + def update_tx_history(self): self.tx_history= {} for addr in self.all_addresses(): @@ -751,12 +753,6 @@ class Wallet: self.up_to_date_event.wait(10000000000) - def start_session(self, interface): - self.interface = interface - self.interface.send([('server.banner',[]), ('blockchain.numblocks.subscribe',[]), ('server.peers.subscribe',[])]) - self.interface.subscribe(self.all_addresses()) - - def freeze(self,addr): if addr in self.all_addresses() and addr not in self.frozen_addresses: self.unprioritize(addr) @@ -816,3 +812,223 @@ class Wallet: for k, v in s.items(): self.config.set_key(k,v) self.config.save() + + + + + + +class WalletSynchronizer(threading.Thread): + + + def __init__(self, wallet, config): + threading.Thread.__init__(self) + self.daemon = True + self.wallet = wallet + self.interface = self.wallet.interface + self.interface.register_channel('synchronizer') + + + def synchronize_wallet(self): + new_addresses = self.wallet.synchronize() + if new_addresses: + self.subscribe_to_addresses(new_addresses) + + if self.interface.is_up_to_date('synchronizer'): + if not self.wallet.up_to_date: + self.wallet.up_to_date = True + self.wallet.was_updated = True + self.wallet.up_to_date_event.set() + else: + if self.wallet.up_to_date: + self.wallet.up_to_date = False + self.wallet.was_updated = True + + if self.wallet.was_updated: + self.interface.trigger_callbacks() + self.wallet.was_updated = False + + + def subscribe_to_addresses(self, addresses): + messages = [] + for addr in addresses: + messages.append(('blockchain.address.subscribe', [addr])) + self.interface.send( messages, 'synchronizer') + + + def run(self): + + # subscriptions + self.interface.send([('blockchain.numblocks.subscribe',[]), ('server.peers.subscribe',[])], 'synchronizer') + self.subscribe_to_addresses(self.wallet.all_addresses()) + + while True: + # 1. send new requests + self.synchronize_wallet() + + # 2. get a response + r = self.interface.get_response('synchronizer') + if not r: continue + + # 3. handle response + method = r['method'] + params = r['params'] + result = r['result'] + + if method == 'blockchain.address.subscribe': + addr = params[0] + if self.wallet.get_status(addr) != result: + self.interface.send([('blockchain.address.get_history', [address] )]) + + elif method == 'blockchain.address.get_history': + addr = params[0] + self.wallet.receive_history_callback(addr, result) + self.wallet.was_updated = True + + elif method == 'blockchain.transaction.broadcast': + self.wallet.tx_result = result + self.wallet.tx_event.set() + + elif method == 'blockchain.numblocks.subscribe': + self.wallet.blocks = result + self.wallet.was_updated = True + + elif method == 'server.banner': + self.wallet.banner = result + self.wallet.was_updated = True + + elif method == 'server.peers.subscribe': + servers = [] + for item in result: + s = [] + host = item[1] + ports = [] + version = None + if len(item) > 2: + for v in item[2]: + if re.match("[stgh]\d+", v): + ports.append((v[0], v[1:])) + if re.match("v(.?)+", v): + version = v[1:] + if ports and version: + servers.append((host, ports)) + self.interface.servers = servers + + # servers_loaded_callback is None for commands, but should + # NEVER be None when using the GUI. + #if self.servers_loaded_callback is not None: + # self.servers_loaded_callback() + + elif method == 'server.version': + pass + + else: + print_error("Error: Unknown message:" + method + ", " + params + ", " + result) + + +encode = lambda x: x[::-1].encode('hex') +decode = lambda x: x.decode('hex')[::-1] +from bitcoin import Hash, rev_hex, int_to_hex + +class WalletVerifier(threading.Thread): + + def __init__(self, wallet, config): + threading.Thread.__init__(self) + self.daemon = True + self.wallet = wallet + self.interface = self.wallet.interface + self.interface.register_channel('verifier') + self.validated = [] + self.merkle_roots = {} + self.headers = {} + self.lock = threading.Lock() + + def run(self): + requested = [] + + while True: + txlist = self.wallet.get_tx_hashes() + for tx in txlist: + if tx not in requested: + requested.append(tx) + self.request_merkle(tx) + break + try: + r = self.interface.get_response('verifier',timeout=1) + except Queue.Empty: + continue + + # 3. handle response + method = r['method'] + params = r['params'] + result = r['result'] + + if method == 'blockchain.transaction.get_merkle': + tx_hash = params[0] + tx_height = result.get('block_height') + self.merkle_roots[tx_hash] = self.hash_merkle_root(result['merkle'], tx_hash) + # if we already have the header, check merkle root directly + header = self.headers.get(tx_height) + if header: + self.validated.append(tx_hash) + assert header.get('merkle_root') == self.merkle_roots[tx_hash] + self.request_headers(tx_height) + + elif method == 'blockchain.block.get_header': + self.validate_header(result) + + + def request_merkle(self, tx_hash): + self.interface.send([ ('blockchain.transaction.get_merkle',[tx_hash]) ], 'verifier') + + + def request_headers(self, tx_height, delta=10): + headers_requests = [] + for height in range(tx_height-delta,tx_height+delta): # we might can request blocks that do not exist yet + if height not in self.headers: + headers_requests.append( ('blockchain.block.get_header',[height]) ) + self.interface.send(headers_requests,'verifier') + + + def validate_header(self, header): + """ if there is a previous or a next block in the list, check the hash""" + height = header.get('block_height') + with self.lock: + self.headers[height] = header # detect conflicts + prev_header = next_header = None + if height-1 in self.headers: + prev_header = self.headers[height-1] + if height+1 in self.headers: + next_header = self.headers[height+1] + + if prev_header: + prev_hash = self.hash_header(prev_header) + assert prev_hash == header.get('prev_block_hash') + if next_header: + _hash = self.hash_header(header) + assert _hash == next_header.get('prev_block_hash') + + # check if there are transactions at that height + for tx_hash in self.wallet.get_transactions_at_height(height): + if tx_hash in self.validated: continue + # check if we already have the merkle root + merkle_root = self.merkle_roots.get(tx_hash) + if merkle_root: + self.validated.append(tx_hash) + assert header.get('merkle_root') == merkle_root + + def hash_header(self, res): + header = int_to_hex(res.get('version'),4) \ + + rev_hex(res.get('prev_block_hash')) \ + + rev_hex(res.get('merkle_root')) \ + + int_to_hex(int(res.get('timestamp')),4) \ + + int_to_hex(int(res.get('bits')),4) \ + + int_to_hex(int(res.get('nonce')),4) + return rev_hex(Hash(header.decode('hex')).encode('hex')) + + def hash_merkle_root(self, merkle_s, target_hash): + h = decode(target_hash) + for item in merkle_s: + is_left = item[0] == 'L' + h = Hash( h + decode(item[1:]) ) if is_left else Hash( decode(item[1:]) + h ) + return encode(h) diff --git a/scripts/blocks b/scripts/blocks index 67db84a1e..16aa5e7f3 100755 --- a/scripts/blocks +++ b/scripts/blocks @@ -8,7 +8,7 @@ i.send([('blockchain.numblocks.subscribe',[])]) while True: try: - r = i.responses.get(True, 100000000000) + r = i.get_response() except KeyboardInterrupt: break if r.get('method') == 'blockchain.numblocks.subscribe': diff --git a/scripts/peers b/scripts/peers index b3656efb7..38ef6327e 100755 --- a/scripts/peers +++ b/scripts/peers @@ -2,10 +2,10 @@ from electrum import Interface -i = Interface({'server':'electrum.novit.ro:50001:t'}) +i = Interface({'server':'ecdsa.org:50001:t'}) i.start() i.send([('server.peers.subscribe',[])]) while True: - r = i.responses.get(True, 100000000000) + r = i.get_response() print r.get('result') diff --git a/scripts/servers b/scripts/servers index 69c0ade59..a7f4df2c0 100755 --- a/scripts/servers +++ b/scripts/servers @@ -5,12 +5,12 @@ import time, Queue servers = DEFAULT_SERVERS interfaces = map ( lambda server: Interface({'server':server} ), servers ) -results = [] for i in interfaces: if i.is_connected: i.start() i.send([('blockchain.numblocks.subscribe',[])]) + i.status = "timed out" else: servers.remove(i.server) i.status = "unreachable" @@ -18,29 +18,25 @@ for i in interfaces: for i in interfaces: while True: try: - r = i.responses.get(True,1) + r = i.get_response(timeout=1) except Queue.Empty: break if r.get('method') == 'blockchain.numblocks.subscribe': - results.append((i.host, r.get('result'))) - i.status = "ok" servers.remove(i.server) + i.status = "ok" + i.blocks = r.get('result') break -for s in servers: - i.status = "timed out" from collections import defaultdict d = defaultdict(int) -for e in results: - d[e[1]] += 1 +for i in interfaces: + if i.status == 'ok': + d[i.blocks] += 1 v = d.values() numblocks = d.keys()[v.index(max(v))] for i in interfaces: - print i.host, i.status - -for s,n in results: - print "%30s %d "%(s, n), "ok" if abs(n-numblocks)<2 else "lagging" + print "%30s %s "%(i.host, i.status) #, "ok" if abs(n-numblocks)<2 else "lagging"