From d16714a1db20a4bbd8da275ba28e9efe5edfffa1 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 17 May 2015 22:54:20 +0900 Subject: [PATCH] Make the blockchain class not a thread Remove interface communication out of blockchain.py into network.py. network.py handles protocol requests for headers and chunks. blockchain.py continues to handle their analysis and verification. If an interface provides a header chain that doesn't connect, it is dismissed, as per a previous TODO comment. This removes a thread and another source of timeouts. I see no performance issues with this when truncating the blockchain. Rename 'result' to 'header' for clarity. --- lib/blockchain.py | 168 ++++++++++++---------------------------------- lib/network.py | 110 ++++++++++++++++++++++++++---- 2 files changed, 139 insertions(+), 139 deletions(-) diff --git a/lib/blockchain.py b/lib/blockchain.py index baa26c3ff..b65487426 100644 --- a/lib/blockchain.py +++ b/lib/blockchain.py @@ -17,70 +17,31 @@ # along with this program. If not, see . -import threading, time, Queue, os, sys, shutil -from util import user_dir, print_error +import os import util from bitcoin import * -class Blockchain(util.DaemonThread): - +class Blockchain(): + '''Manages blockchain headers and their verification''' def __init__(self, config, network): - util.DaemonThread.__init__(self) self.config = config self.network = network - self.lock = threading.Lock() self.headers_url = 'http://headers.electrum.org/blockchain_headers' - self.queue = Queue.Queue() self.local_height = 0 self.set_local_height() + def print_error(self, *msg): + util.print_error("[blockchain]", *msg) + def height(self): return self.local_height - def run(self): + def init(self): self.init_headers_file() self.print_error("%d blocks" % self.local_height) - while self.is_running(): - try: - result = self.queue.get(timeout=0.1) - except Queue.Empty: - continue - if not result: - continue - i, header = result - if not header: - continue - height = header.get('block_height') - if height <= self.local_height: - continue - if height > self.local_height + 50: - if not self.get_and_verify_chunks(i, header, height): - continue - if height > self.local_height: - # get missing parts from interface (until it connects to my chain) - chain = self.get_chain( i, header ) - # skip that server if the result is not consistent - if not chain: - self.print_error('e') - continue - # verify the chain - if self.verify_chain( chain ): - self.print_error("height:", height, i.server) - for header in chain: - self.save_header(header) - else: - self.print_error("error", i.server) - # todo: dismiss that server - continue - self.network.new_blockchain_height(height, i) - - self.print_error("stopped") - - def verify_chain(self, chain): - first_header = chain[0] prev_header = self.read_header(first_header.get('block_height') -1) @@ -131,7 +92,7 @@ class Blockchain(util.DaemonThread): previous_hash = _hash self.save_chunk(index, data) - self.print_error("validated chunk %d"%height) + self.print_error("validated chunk %d to height %d" % (index, height)) @@ -259,81 +220,38 @@ class Blockchain(util.DaemonThread): new_bits = c + MM * i return new_bits, new_target - - def request_header(self, i, h, queue): - self.print_error("requesting header %d from %s"%(h, i.server)) - i.send_request({'method':'blockchain.block.get_header', 'params':[h]}, queue) - - def retrieve_request(self, queue): - t = time.time() - while self.is_running(): - try: - ir = queue.get(timeout=0.1) - except Queue.Empty: - if time.time() - t > 10: - return - else: - continue - i, r = ir - result = r['result'] - return result - - def get_chain(self, interface, final_header): - - header = final_header - chain = [ final_header ] - requested_header = False - queue = Queue.Queue() - - while self.is_running(): - - if requested_header: - header = self.retrieve_request(queue) - if not header: - self.print_error('chain request timed out, giving up') - return - chain = [ header ] + chain - requested_header = False - - height = header.get('block_height') - previous_header = self.read_header(height -1) - if not previous_header: - self.request_header(interface, height - 1, queue) - requested_header = True - continue - - # verify that it connects to my chain - prev_hash = self.hash_header(previous_header) - if prev_hash != header.get('prev_block_hash'): - self.print_error("reorg") - self.request_header(interface, height - 1, queue) - requested_header = True - continue - - else: - # the chain is complete - return chain - - - def get_and_verify_chunks(self, i, header, height): - - queue = Queue.Queue() - min_index = (self.local_height + 1)/2016 - max_index = (height + 1)/2016 - n = min_index - while n < max_index + 1: - self.print_error( "Requesting chunk:", n ) - i.send_request({'method':'blockchain.block.get_chunk', 'params':[n]}, queue) - r = self.retrieve_request(queue) - if not r: - return False - try: - self.verify_chunk(n, r) - n = n + 1 - except Exception: - self.print_error('Verify chunk failed!') - n = n - 1 - if n < 0: - return False - - return True + def connect_header(self, chain, header): + '''Builds a header chain until it connects. Returns True if it has + successfully connected, False if verification failed, otherwise the + height of the next header needed.''' + chain.append(header) # Ordered by decreasing height + previous_height = header['block_height'] - 1 + previous_header = self.read_header(previous_height) + + # Missing header, request it + if not previous_header: + return previous_height + + # Does it connect to my chain? + prev_hash = self.hash_header(previous_header) + if prev_hash != header.get('prev_block_hash'): + self.print_error("reorg") + return previous_height + + # The chain is complete. Reverse to order by increasing height + chain.reverse() + if self.verify_chain(chain): + self.print_error("connected at height:", previous_height) + for header in chain: + self.save_header(header) + return True + + return False + + def connect_chunk(self, idx, chunk): + try: + self.verify_chunk(idx, chunk) + return idx + 1 + except Exception: + self.print_error('verify_chunk failed') + return idx - 1 diff --git a/lib/network.py b/lib/network.py index 64c1686fd..e023ff9bc 100644 --- a/lib/network.py +++ b/lib/network.py @@ -13,6 +13,7 @@ import util from bitcoin import * import interface from blockchain import Blockchain +from collections import deque DEFAULT_PORTS = {'t':'50001', 's':'50002', 'h':'8081', 'g':'8082'} @@ -141,6 +142,8 @@ class Network(util.DaemonThread): self.queue = Queue.Queue() self.requests_queue = pipe.send_queue self.response_queue = pipe.get_queue + # A deque of interface header requests, processed left-to-right + self.bc_requests = deque() # Server for addresses and transactions self.default_server = self.config.get('server') # Sanitize default server @@ -295,11 +298,6 @@ class Network(util.DaemonThread): for i in range(self.num_server - 1): self.start_random_interface() - def start(self): - self.running = True - self.blockchain.start() - util.DaemonThread.start(self) - def set_proxy(self, proxy): self.proxy = proxy if proxy: @@ -405,7 +403,6 @@ class Network(util.DaemonThread): # Our set of interfaces changed self.notify('interfaces') - def process_response(self, i, response): # the id comes from the daemon or the network proxy _id = response.get('id') @@ -428,6 +425,10 @@ class Network(util.DaemonThread): addr = response.get('params')[0] self.addr_responses[addr] = result self.response_queue.put(response) + elif method == 'blockchain.block.get_chunk': + self.on_get_chunk(i, response) + elif method == 'blockchain.block.get_header': + self.on_get_header(i, response) else: self.response_queue.put(response) @@ -490,10 +491,91 @@ class Network(util.DaemonThread): else: self.switch_to_interface(self.default_server) + def request_chunk(self, interface, data, idx): + interface.print_error("requesting chunk %d" % idx) + interface.send_request({'method':'blockchain.block.get_chunk', 'params':[idx]}) + data['chunk_idx'] = idx + data['req_time'] = time.time() + + def on_get_chunk(self, interface, response): + '''Handle receiving a chunk of block headers''' + if self.bc_requests: + req_if, data = self.bc_requests[0] + req_idx = data.get('chunk_idx') + # Ignore unsolicited chunks + if req_if == interface and req_idx == response['params'][0]: + idx = self.blockchain.connect_chunk(req_idx, response['result']) + # If not finished, get the next chunk + if idx < 0 or self.get_local_height() >= data['if_height']: + self.bc_requests.popleft() + else: + self.request_chunk(interface, data, idx) + + def request_header(self, interface, data, height): + interface.print_error("requesting header %d" % height) + interface.send_request({'method':'blockchain.block.get_header', 'params':[height]}) + data['header_height'] = height + data['req_time'] = time.time() + if not 'chain' in data: + data['chain'] = [] + + def on_get_header(self, interface, response): + '''Handle receiving a single block header''' + if self.bc_requests: + req_if, data = self.bc_requests[0] + req_height = data.get('header_height', -1) + # Ignore unsolicited headers + if req_if == interface and req_height == response['params'][0]: + next_height = self.blockchain.connect_header(data['chain'], response['result']) + # If not finished, get the next header + if next_height in [True, False]: + self.bc_requests.popleft() + if not next_height: + interface.print_error("header didn't connect, dismissing interface") + interface.stop() + else: + self.request_header(interface, data, next_height) + + def bc_request_headers(self, interface, data): + '''Send a request for the next header, or a chunk of them, if necessary''' + local_height, if_height = self.get_local_height(), data['if_height'] + if if_height <= local_height: + return False + elif if_height > local_height + 50: + self.request_chunk(interface, data, (local_height + 1) / 2016) + else: + self.request_header(interface, data, if_height) + return True + + def handle_bc_requests(self): + '''Work through each interface that has notified us of a new header. + Send it requests if it is ahead of our blockchain object''' + while self.bc_requests: + interface, data = self.bc_requests.popleft() + # If the connection was lost move on + if not interface.is_connected(): + continue + + req_time = data.get('req_time') + if not req_time: + # No requests sent yet. This interface has a new height. + # Request headers if it is ahead of our blockchain + if not self.bc_request_headers(interface, data): + continue + elif time.time() - req_time > 10: + interface.print_error("blockchain request timed out") + interface.stop() + continue + # Put updated request state back at head of deque + self.bc_requests.appendleft((interface, data)) + break + def run(self): + self.blockchain.init() while self.is_running(): self.check_interfaces() self.handle_requests() + self.handle_bc_requests() try: i, response = self.queue.get(timeout=0.1) except Queue.Empty: @@ -508,19 +590,19 @@ class Network(util.DaemonThread): self.stop_network() self.print_error("stopped") - def on_header(self, i, r): - result = r.get('result') - if not result: + header = r.get('result') + if not header: return - height = result.get('block_height') + height = header.get('block_height') if not height: return self.heights[i.server] = height - self.merkle_roots[i.server] = result.get('merkle_root') - self.utxo_roots[i.server] = result.get('utxo_root') - # notify blockchain about the new height - self.blockchain.queue.put((i,result)) + self.merkle_roots[i.server] = header.get('merkle_root') + self.utxo_roots[i.server] = header.get('utxo_root') + + # Queue this interface's height for asynchronous catch-up + self.bc_requests.append((i, {'if_height': height})) if i == self.interface: self.switch_lagging_interface()