From 2a6e8927ddc3e60cf18e01da395031efae032de7 Mon Sep 17 00:00:00 2001 From: thomasv Date: Mon, 2 Sep 2013 17:49:12 +0200 Subject: [PATCH] select the longest blockchain from several servers --- lib/blockchain.py | 243 +++++++++++++++++++++++++++++++--------------- lib/interface.py | 3 +- lib/verifier.py | 12 ++- 3 files changed, 176 insertions(+), 82 deletions(-) diff --git a/lib/blockchain.py b/lib/blockchain.py index 702c88166..e64257459 100644 --- a/lib/blockchain.py +++ b/lib/blockchain.py @@ -29,79 +29,71 @@ class BlockchainVerifier(threading.Thread): threading.Thread.__init__(self) self.daemon = True self.config = config - self.interface = interface - self.interface.register_channel('verifier') self.lock = threading.Lock() - self.pending_headers = [] # headers that have not been verified self.height = 0 self.local_height = 0 self.running = False self.headers_url = 'http://headers.electrum.org/blockchain_headers' + self.interface = interface + interface.register_channel('verifier') + self.set_local_height() + + + + def start_interfaces(self): + import interface + servers = interface.DEFAULT_SERVERS + servers = interface.filter_protocol(servers,'s') + print_error("using %d servers"% len(servers)) + self.interfaces = map ( lambda server: interface.Interface({'server':server} ), servers ) + + for i in self.interfaces: + i.start() + # subscribe to block headers + i.register_channel('verifier') + i.register_channel('get_header') + i.send([ ('blockchain.headers.subscribe',[])], 'verifier') + # note: each interface should send its results directly to a queue, instead of channels + # pass the queue to the interface, so that several can share the same queue + + + def get_new_response(self): + # listen to interfaces, forward to verifier using the queue + while 1: + for i in self.interfaces: + try: + r = i.get_response('verifier',timeout=0) + except Queue.Empty: + continue + + result = r.get('result') + if result: + return (i,result) + + time.sleep(1) + + def stop(self): with self.lock: self.running = False - self.interface.poke('verifier') + #self.interface.poke('verifier') def is_running(self): with self.lock: return self.running - def run(self): - self.init_headers_file() - self.set_local_height() - - with self.lock: - self.running = True - requested_chunks = [] - requested_headers = [] - all_chunks = False - - # subscribe to block headers - self.interface.send([ ('blockchain.headers.subscribe',[])], 'verifier') - - while self.is_running(): - # request missing chunks - if not all_chunks and self.height and not requested_chunks: - - if self.local_height + 50 < self.height: - min_index = (self.local_height + 1)/2016 - max_index = (self.height + 1)/2016 - for i in range(min_index, max_index + 1): - print_error( "requesting chunk", i ) - self.interface.send([ ('blockchain.block.get_chunk',[i])], 'verifier') - requested_chunks.append(i) - break - else: - all_chunks = True - print_error("downloaded all chunks") - - - # process pending headers - if self.pending_headers and all_chunks: - done = [] - for header in self.pending_headers: - if self.verify_header(header): - done.append(header) - else: - # request previous header - i = header.get('block_height') - 1 - if i not in requested_headers: - print_error("requesting header %d"%i) - self.interface.send([ ('blockchain.block.get_header',[i])], 'verifier') - requested_headers.append(i) - # no point continuing - break - if done: - self.interface.trigger_callback('updated') - for header in done: - self.pending_headers.remove(header) + def request_header(self, i, h): + print_error("requesting header %d from %s"%(h, i.server)) + i.send([ ('blockchain.block.get_header',[h])], 'get_header') + def retrieve_header(self, i): + while True: try: - r = self.interface.get_response('verifier',timeout=1) + r = i.get_response('get_header',timeout=1) except Queue.Empty: + print_error('timeout') continue - if not r: continue if r.get('error'): print_error('Verifier received an error:', r) @@ -112,23 +104,66 @@ class BlockchainVerifier(threading.Thread): params = r['params'] result = r['result'] - if method == 'blockchain.block.get_chunk': - index = params[0] - self.verify_chunk(index, result) - requested_chunks.remove(index) + if method == 'blockchain.block.get_header': + return result + - elif method in ['blockchain.headers.subscribe', 'blockchain.block.get_header']: + def get_chain(self, interface, final_header): - self.pending_headers.append(result) - if method == 'blockchain.block.get_header': - requested_headers.remove(result.get('block_height')) - else: - self.height = result.get('block_height') - ## fixme # self.interface.poke('synchronizer') - - self.pending_headers.sort(key=lambda x: x.get('block_height')) - # print "pending headers", map(lambda x: x.get('block_height'), self.pending_headers) + header = final_header + chain = [ final_header ] + requested_header = False + + while self.is_running(): + + if requested_header: + header = self.retrieve_header(interface) + if not header: return + chain = [ header ] + chain + requested_header = False + + height = header.get('block_height') + previous_header = self.read_header(height -1) + if not previous_header: + self.request_header(interface, height - 1) + requested_header = True + continue + + # verify that it connects to my chain + prev_hash = self.hash_header(previous_header) + if prev_hash != header.get('prev_block_hash'): + print_error("reorg") + self.request_header(interface, height - 1) + requested_header = True + continue + + else: + # the chain is complete + return chain + + + def verify_chain(self, chain): + + first_header = chain[0] + prev_header = self.read_header(first_header.get('block_height') -1) + + for header in chain: + height = header.get('block_height') + + prev_hash = self.hash_header(prev_header) + bits, target = self.get_target(height/2016) + _hash = self.hash_header(header) + try: + assert prev_hash == header.get('prev_block_hash') + assert bits == header.get('bits') + assert eval('0x'+_hash) < target + except: + return False + + prev_header = header + + return True @@ -184,17 +219,8 @@ class BlockchainVerifier(threading.Thread): except: # this can be caused by a reorg. print_error("verify header failed"+ repr(header)) - # undo verifications - with self.lock: - items = self.verified_tx.items()[:] - for tx_hash, item in items: - tx_height, timestamp, pos = item - if tx_height >= height: - print_error("redoing", tx_hash) - with self.lock: - self.verified_tx.pop(tx_hash) - if tx_hash in self.merkle_roots: - self.merkle_roots.pop(tx_hash) + verifier.undo_verifications() + # return False to request previous header. return False @@ -272,6 +298,7 @@ class BlockchainVerifier(threading.Thread): h = os.path.getsize(name)/80 - 1 if self.local_height != h: self.local_height = h + self.height = self.local_height def read_header(self, block_height): @@ -327,3 +354,59 @@ class BlockchainVerifier(threading.Thread): + + def run(self): + self.start_interfaces() + + self.init_headers_file() + self.set_local_height() + print_error( "blocks:", self.local_height ) + + with self.lock: + self.running = True + + while self.is_running(): + + i, header = self.get_new_response() + + height = header.get('block_height') + + if height > self.local_height: + # get missing parts from interface (until it connects to my chain) + chain = self.get_chain( i, header ) + + # skip that server if the result is not consistent + if not chain: continue + + # verify the chain + if self.verify_chain( chain ): + print_error("height:", height, i.server) + for header in chain: + self.save_header(header) + self.height = height + else: + print_error("error", i.server) + # todo: dismiss that server + + + + + +if __name__ == "__main__": + import interface, simple_config + + config = simple_config.SimpleConfig({'verbose':True}) + + i0 = interface.Interface() + i0.start() + + bv = BlockchainVerifier(i0, config) + bv.start() + + + # listen to interfaces, forward to verifier using the queue + while 1: + time.sleep(1) + + + diff --git a/lib/interface.py b/lib/interface.py index 48aa29512..92b3ad5cc 100644 --- a/lib/interface.py +++ b/lib/interface.py @@ -332,7 +332,8 @@ class Interface(threading.Thread): try: s.connect(( self.host.encode('ascii'), int(self.port))) except: - traceback.print_exc(file=sys.stdout) + #traceback.print_exc(file=sys.stdout) + print_error("failed to connect", host, port) self.is_connected = False self.s = None return diff --git a/lib/verifier.py b/lib/verifier.py index 32a6842ce..4cacfc20b 100644 --- a/lib/verifier.py +++ b/lib/verifier.py @@ -155,4 +155,14 @@ class TxVerifier(threading.Thread): - + def undo_verifications(self, height): + with self.lock: + items = self.verified_tx.items()[:] + for tx_hash, item in items: + tx_height, timestamp, pos = item + if tx_height >= height: + print_error("redoing", tx_hash) + with self.lock: + self.verified_tx.pop(tx_hash) + if tx_hash in self.merkle_roots: + self.merkle_roots.pop(tx_hash)