|
|
@ -29,79 +29,71 @@ class BlockchainVerifier(threading.Thread): |
|
|
|
threading.Thread.__init__(self) |
|
|
|
self.daemon = True |
|
|
|
self.config = config |
|
|
|
self.interface = interface |
|
|
|
self.interface.register_channel('verifier') |
|
|
|
self.lock = threading.Lock() |
|
|
|
self.pending_headers = [] # headers that have not been verified |
|
|
|
self.height = 0 |
|
|
|
self.local_height = 0 |
|
|
|
self.running = False |
|
|
|
self.headers_url = 'http://headers.electrum.org/blockchain_headers' |
|
|
|
self.interface = interface |
|
|
|
interface.register_channel('verifier') |
|
|
|
self.set_local_height() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def start_interfaces(self): |
|
|
|
import interface |
|
|
|
servers = interface.DEFAULT_SERVERS |
|
|
|
servers = interface.filter_protocol(servers,'s') |
|
|
|
print_error("using %d servers"% len(servers)) |
|
|
|
self.interfaces = map ( lambda server: interface.Interface({'server':server} ), servers ) |
|
|
|
|
|
|
|
for i in self.interfaces: |
|
|
|
i.start() |
|
|
|
# subscribe to block headers |
|
|
|
i.register_channel('verifier') |
|
|
|
i.register_channel('get_header') |
|
|
|
i.send([ ('blockchain.headers.subscribe',[])], 'verifier') |
|
|
|
# note: each interface should send its results directly to a queue, instead of channels |
|
|
|
# pass the queue to the interface, so that several can share the same queue |
|
|
|
|
|
|
|
|
|
|
|
def get_new_response(self): |
|
|
|
# listen to interfaces, forward to verifier using the queue |
|
|
|
while 1: |
|
|
|
for i in self.interfaces: |
|
|
|
try: |
|
|
|
r = i.get_response('verifier',timeout=0) |
|
|
|
except Queue.Empty: |
|
|
|
continue |
|
|
|
|
|
|
|
result = r.get('result') |
|
|
|
if result: |
|
|
|
return (i,result) |
|
|
|
|
|
|
|
time.sleep(1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def stop(self): |
|
|
|
with self.lock: self.running = False |
|
|
|
self.interface.poke('verifier') |
|
|
|
#self.interface.poke('verifier') |
|
|
|
|
|
|
|
def is_running(self): |
|
|
|
with self.lock: return self.running |
|
|
|
|
|
|
|
def run(self): |
|
|
|
|
|
|
|
self.init_headers_file() |
|
|
|
self.set_local_height() |
|
|
|
|
|
|
|
with self.lock: |
|
|
|
self.running = True |
|
|
|
requested_chunks = [] |
|
|
|
requested_headers = [] |
|
|
|
all_chunks = False |
|
|
|
|
|
|
|
# subscribe to block headers |
|
|
|
self.interface.send([ ('blockchain.headers.subscribe',[])], 'verifier') |
|
|
|
|
|
|
|
while self.is_running(): |
|
|
|
# request missing chunks |
|
|
|
if not all_chunks and self.height and not requested_chunks: |
|
|
|
|
|
|
|
if self.local_height + 50 < self.height: |
|
|
|
min_index = (self.local_height + 1)/2016 |
|
|
|
max_index = (self.height + 1)/2016 |
|
|
|
for i in range(min_index, max_index + 1): |
|
|
|
print_error( "requesting chunk", i ) |
|
|
|
self.interface.send([ ('blockchain.block.get_chunk',[i])], 'verifier') |
|
|
|
requested_chunks.append(i) |
|
|
|
break |
|
|
|
else: |
|
|
|
all_chunks = True |
|
|
|
print_error("downloaded all chunks") |
|
|
|
|
|
|
|
|
|
|
|
# process pending headers |
|
|
|
if self.pending_headers and all_chunks: |
|
|
|
done = [] |
|
|
|
for header in self.pending_headers: |
|
|
|
if self.verify_header(header): |
|
|
|
done.append(header) |
|
|
|
else: |
|
|
|
# request previous header |
|
|
|
i = header.get('block_height') - 1 |
|
|
|
if i not in requested_headers: |
|
|
|
print_error("requesting header %d"%i) |
|
|
|
self.interface.send([ ('blockchain.block.get_header',[i])], 'verifier') |
|
|
|
requested_headers.append(i) |
|
|
|
# no point continuing |
|
|
|
break |
|
|
|
if done: |
|
|
|
self.interface.trigger_callback('updated') |
|
|
|
for header in done: |
|
|
|
self.pending_headers.remove(header) |
|
|
|
def request_header(self, i, h): |
|
|
|
print_error("requesting header %d from %s"%(h, i.server)) |
|
|
|
i.send([ ('blockchain.block.get_header',[h])], 'get_header') |
|
|
|
|
|
|
|
def retrieve_header(self, i): |
|
|
|
while True: |
|
|
|
try: |
|
|
|
r = self.interface.get_response('verifier',timeout=1) |
|
|
|
r = i.get_response('get_header',timeout=1) |
|
|
|
except Queue.Empty: |
|
|
|
print_error('timeout') |
|
|
|
continue |
|
|
|
if not r: continue |
|
|
|
|
|
|
|
if r.get('error'): |
|
|
|
print_error('Verifier received an error:', r) |
|
|
@ -112,23 +104,66 @@ class BlockchainVerifier(threading.Thread): |
|
|
|
params = r['params'] |
|
|
|
result = r['result'] |
|
|
|
|
|
|
|
if method == 'blockchain.block.get_chunk': |
|
|
|
index = params[0] |
|
|
|
self.verify_chunk(index, result) |
|
|
|
requested_chunks.remove(index) |
|
|
|
if method == 'blockchain.block.get_header': |
|
|
|
return result |
|
|
|
|
|
|
|
|
|
|
|
elif method in ['blockchain.headers.subscribe', 'blockchain.block.get_header']: |
|
|
|
def get_chain(self, interface, final_header): |
|
|
|
|
|
|
|
self.pending_headers.append(result) |
|
|
|
if method == 'blockchain.block.get_header': |
|
|
|
requested_headers.remove(result.get('block_height')) |
|
|
|
else: |
|
|
|
self.height = result.get('block_height') |
|
|
|
## fixme # self.interface.poke('synchronizer') |
|
|
|
|
|
|
|
self.pending_headers.sort(key=lambda x: x.get('block_height')) |
|
|
|
# print "pending headers", map(lambda x: x.get('block_height'), self.pending_headers) |
|
|
|
header = final_header |
|
|
|
chain = [ final_header ] |
|
|
|
requested_header = False |
|
|
|
|
|
|
|
while self.is_running(): |
|
|
|
|
|
|
|
if requested_header: |
|
|
|
header = self.retrieve_header(interface) |
|
|
|
if not header: return |
|
|
|
chain = [ header ] + chain |
|
|
|
requested_header = False |
|
|
|
|
|
|
|
height = header.get('block_height') |
|
|
|
previous_header = self.read_header(height -1) |
|
|
|
if not previous_header: |
|
|
|
self.request_header(interface, height - 1) |
|
|
|
requested_header = True |
|
|
|
continue |
|
|
|
|
|
|
|
# verify that it connects to my chain |
|
|
|
prev_hash = self.hash_header(previous_header) |
|
|
|
if prev_hash != header.get('prev_block_hash'): |
|
|
|
print_error("reorg") |
|
|
|
self.request_header(interface, height - 1) |
|
|
|
requested_header = True |
|
|
|
continue |
|
|
|
|
|
|
|
else: |
|
|
|
# the chain is complete |
|
|
|
return chain |
|
|
|
|
|
|
|
|
|
|
|
def verify_chain(self, chain): |
|
|
|
|
|
|
|
first_header = chain[0] |
|
|
|
prev_header = self.read_header(first_header.get('block_height') -1) |
|
|
|
|
|
|
|
for header in chain: |
|
|
|
|
|
|
|
height = header.get('block_height') |
|
|
|
|
|
|
|
prev_hash = self.hash_header(prev_header) |
|
|
|
bits, target = self.get_target(height/2016) |
|
|
|
_hash = self.hash_header(header) |
|
|
|
try: |
|
|
|
assert prev_hash == header.get('prev_block_hash') |
|
|
|
assert bits == header.get('bits') |
|
|
|
assert eval('0x'+_hash) < target |
|
|
|
except: |
|
|
|
return False |
|
|
|
|
|
|
|
prev_header = header |
|
|
|
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -184,17 +219,8 @@ class BlockchainVerifier(threading.Thread): |
|
|
|
except: |
|
|
|
# this can be caused by a reorg. |
|
|
|
print_error("verify header failed"+ repr(header)) |
|
|
|
# undo verifications |
|
|
|
with self.lock: |
|
|
|
items = self.verified_tx.items()[:] |
|
|
|
for tx_hash, item in items: |
|
|
|
tx_height, timestamp, pos = item |
|
|
|
if tx_height >= height: |
|
|
|
print_error("redoing", tx_hash) |
|
|
|
with self.lock: |
|
|
|
self.verified_tx.pop(tx_hash) |
|
|
|
if tx_hash in self.merkle_roots: |
|
|
|
self.merkle_roots.pop(tx_hash) |
|
|
|
verifier.undo_verifications() |
|
|
|
|
|
|
|
# return False to request previous header. |
|
|
|
return False |
|
|
|
|
|
|
@ -272,6 +298,7 @@ class BlockchainVerifier(threading.Thread): |
|
|
|
h = os.path.getsize(name)/80 - 1 |
|
|
|
if self.local_height != h: |
|
|
|
self.local_height = h |
|
|
|
self.height = self.local_height |
|
|
|
|
|
|
|
|
|
|
|
def read_header(self, block_height): |
|
|
@ -327,3 +354,59 @@ class BlockchainVerifier(threading.Thread): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run(self): |
|
|
|
self.start_interfaces() |
|
|
|
|
|
|
|
self.init_headers_file() |
|
|
|
self.set_local_height() |
|
|
|
print_error( "blocks:", self.local_height ) |
|
|
|
|
|
|
|
with self.lock: |
|
|
|
self.running = True |
|
|
|
|
|
|
|
while self.is_running(): |
|
|
|
|
|
|
|
i, header = self.get_new_response() |
|
|
|
|
|
|
|
height = header.get('block_height') |
|
|
|
|
|
|
|
if height > self.local_height: |
|
|
|
# get missing parts from interface (until it connects to my chain) |
|
|
|
chain = self.get_chain( i, header ) |
|
|
|
|
|
|
|
# skip that server if the result is not consistent |
|
|
|
if not chain: continue |
|
|
|
|
|
|
|
# verify the chain |
|
|
|
if self.verify_chain( chain ): |
|
|
|
print_error("height:", height, i.server) |
|
|
|
for header in chain: |
|
|
|
self.save_header(header) |
|
|
|
self.height = height |
|
|
|
else: |
|
|
|
print_error("error", i.server) |
|
|
|
# todo: dismiss that server |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
import interface, simple_config |
|
|
|
|
|
|
|
config = simple_config.SimpleConfig({'verbose':True}) |
|
|
|
|
|
|
|
i0 = interface.Interface() |
|
|
|
i0.start() |
|
|
|
|
|
|
|
bv = BlockchainVerifier(i0, config) |
|
|
|
bv.start() |
|
|
|
|
|
|
|
|
|
|
|
# listen to interfaces, forward to verifier using the queue |
|
|
|
while 1: |
|
|
|
time.sleep(1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|