|
|
@ -13,6 +13,7 @@ import util |
|
|
|
from bitcoin import * |
|
|
|
import interface |
|
|
|
from blockchain import Blockchain |
|
|
|
from collections import deque |
|
|
|
|
|
|
|
DEFAULT_PORTS = {'t':'50001', 's':'50002', 'h':'8081', 'g':'8082'} |
|
|
|
|
|
|
@ -141,6 +142,8 @@ class Network(util.DaemonThread): |
|
|
|
self.queue = Queue.Queue() |
|
|
|
self.requests_queue = pipe.send_queue |
|
|
|
self.response_queue = pipe.get_queue |
|
|
|
# A deque of interface header requests, processed left-to-right |
|
|
|
self.bc_requests = deque() |
|
|
|
# Server for addresses and transactions |
|
|
|
self.default_server = self.config.get('server') |
|
|
|
# Sanitize default server |
|
|
@ -295,11 +298,6 @@ class Network(util.DaemonThread): |
|
|
|
for i in range(self.num_server - 1): |
|
|
|
self.start_random_interface() |
|
|
|
|
|
|
|
def start(self): |
|
|
|
self.running = True |
|
|
|
self.blockchain.start() |
|
|
|
util.DaemonThread.start(self) |
|
|
|
|
|
|
|
def set_proxy(self, proxy): |
|
|
|
self.proxy = proxy |
|
|
|
if proxy: |
|
|
@ -412,7 +410,6 @@ class Network(util.DaemonThread): |
|
|
|
# Our set of interfaces changed |
|
|
|
self.notify('interfaces') |
|
|
|
|
|
|
|
|
|
|
|
def process_response(self, i, response): |
|
|
|
# the id comes from the daemon or the network proxy |
|
|
|
_id = response.get('id') |
|
|
@ -435,6 +432,10 @@ class Network(util.DaemonThread): |
|
|
|
addr = response.get('params')[0] |
|
|
|
self.addr_responses[addr] = result |
|
|
|
self.response_queue.put(response) |
|
|
|
elif method == 'blockchain.block.get_chunk': |
|
|
|
self.on_get_chunk(i, response) |
|
|
|
elif method == 'blockchain.block.get_header': |
|
|
|
self.on_get_header(i, response) |
|
|
|
else: |
|
|
|
self.response_queue.put(response) |
|
|
|
|
|
|
@ -508,10 +509,91 @@ class Network(util.DaemonThread): |
|
|
|
else: |
|
|
|
self.switch_to_interface(self.default_server) |
|
|
|
|
|
|
|
def request_chunk(self, interface, data, idx): |
|
|
|
interface.print_error("requesting chunk %d" % idx) |
|
|
|
interface.send_request({'method':'blockchain.block.get_chunk', 'params':[idx]}) |
|
|
|
data['chunk_idx'] = idx |
|
|
|
data['req_time'] = time.time() |
|
|
|
|
|
|
|
def on_get_chunk(self, interface, response): |
|
|
|
'''Handle receiving a chunk of block headers''' |
|
|
|
if self.bc_requests: |
|
|
|
req_if, data = self.bc_requests[0] |
|
|
|
req_idx = data.get('chunk_idx') |
|
|
|
# Ignore unsolicited chunks |
|
|
|
if req_if == interface and req_idx == response['params'][0]: |
|
|
|
idx = self.blockchain.connect_chunk(req_idx, response['result']) |
|
|
|
# If not finished, get the next chunk |
|
|
|
if idx < 0 or self.get_local_height() >= data['if_height']: |
|
|
|
self.bc_requests.popleft() |
|
|
|
else: |
|
|
|
self.request_chunk(interface, data, idx) |
|
|
|
|
|
|
|
def request_header(self, interface, data, height): |
|
|
|
interface.print_error("requesting header %d" % height) |
|
|
|
interface.send_request({'method':'blockchain.block.get_header', 'params':[height]}) |
|
|
|
data['header_height'] = height |
|
|
|
data['req_time'] = time.time() |
|
|
|
if not 'chain' in data: |
|
|
|
data['chain'] = [] |
|
|
|
|
|
|
|
def on_get_header(self, interface, response): |
|
|
|
'''Handle receiving a single block header''' |
|
|
|
if self.bc_requests: |
|
|
|
req_if, data = self.bc_requests[0] |
|
|
|
req_height = data.get('header_height', -1) |
|
|
|
# Ignore unsolicited headers |
|
|
|
if req_if == interface and req_height == response['params'][0]: |
|
|
|
next_height = self.blockchain.connect_header(data['chain'], response['result']) |
|
|
|
# If not finished, get the next header |
|
|
|
if next_height in [True, False]: |
|
|
|
self.bc_requests.popleft() |
|
|
|
if not next_height: |
|
|
|
interface.print_error("header didn't connect, dismissing interface") |
|
|
|
interface.stop() |
|
|
|
else: |
|
|
|
self.request_header(interface, data, next_height) |
|
|
|
|
|
|
|
def bc_request_headers(self, interface, data): |
|
|
|
'''Send a request for the next header, or a chunk of them, if necessary''' |
|
|
|
local_height, if_height = self.get_local_height(), data['if_height'] |
|
|
|
if if_height <= local_height: |
|
|
|
return False |
|
|
|
elif if_height > local_height + 50: |
|
|
|
self.request_chunk(interface, data, (local_height + 1) / 2016) |
|
|
|
else: |
|
|
|
self.request_header(interface, data, if_height) |
|
|
|
return True |
|
|
|
|
|
|
|
def handle_bc_requests(self): |
|
|
|
'''Work through each interface that has notified us of a new header. |
|
|
|
Send it requests if it is ahead of our blockchain object''' |
|
|
|
while self.bc_requests: |
|
|
|
interface, data = self.bc_requests.popleft() |
|
|
|
# If the connection was lost move on |
|
|
|
if not interface.is_connected(): |
|
|
|
continue |
|
|
|
|
|
|
|
req_time = data.get('req_time') |
|
|
|
if not req_time: |
|
|
|
# No requests sent yet. This interface has a new height. |
|
|
|
# Request headers if it is ahead of our blockchain |
|
|
|
if not self.bc_request_headers(interface, data): |
|
|
|
continue |
|
|
|
elif time.time() - req_time > 10: |
|
|
|
interface.print_error("blockchain request timed out") |
|
|
|
interface.stop() |
|
|
|
continue |
|
|
|
# Put updated request state back at head of deque |
|
|
|
self.bc_requests.appendleft((interface, data)) |
|
|
|
break |
|
|
|
|
|
|
|
def run(self): |
|
|
|
self.blockchain.init() |
|
|
|
while self.is_running(): |
|
|
|
self.check_interfaces() |
|
|
|
self.handle_requests() |
|
|
|
self.handle_bc_requests() |
|
|
|
try: |
|
|
|
i, response = self.queue.get(timeout=0.1) |
|
|
|
except Queue.Empty: |
|
|
@ -526,19 +608,19 @@ class Network(util.DaemonThread): |
|
|
|
self.stop_network() |
|
|
|
self.print_error("stopped") |
|
|
|
|
|
|
|
|
|
|
|
def on_header(self, i, r): |
|
|
|
result = r.get('result') |
|
|
|
if not result: |
|
|
|
header = r.get('result') |
|
|
|
if not header: |
|
|
|
return |
|
|
|
height = result.get('block_height') |
|
|
|
height = header.get('block_height') |
|
|
|
if not height: |
|
|
|
return |
|
|
|
self.heights[i.server] = height |
|
|
|
self.merkle_roots[i.server] = result.get('merkle_root') |
|
|
|
self.utxo_roots[i.server] = result.get('utxo_root') |
|
|
|
# notify blockchain about the new height |
|
|
|
self.blockchain.queue.put((i,result)) |
|
|
|
self.merkle_roots[i.server] = header.get('merkle_root') |
|
|
|
self.utxo_roots[i.server] = header.get('utxo_root') |
|
|
|
|
|
|
|
# Queue this interface's height for asynchronous catch-up |
|
|
|
self.bc_requests.append((i, {'if_height': height})) |
|
|
|
|
|
|
|
if i == self.interface: |
|
|
|
self.switch_lagging_interface() |
|
|
|