@ -13,6 +13,7 @@ import util
from bitcoin import *
import interface
from blockchain import Blockchain
from collections import deque
DEFAULT_PORTS = { ' t ' : ' 50001 ' , ' s ' : ' 50002 ' , ' h ' : ' 8081 ' , ' g ' : ' 8082 ' }
@ -141,6 +142,8 @@ class Network(util.DaemonThread):
self . queue = Queue . Queue ( )
self . requests_queue = pipe . send_queue
self . response_queue = pipe . get_queue
# A deque of interface header requests, processed left-to-right
self . bc_requests = deque ( )
# Server for addresses and transactions
self . default_server = self . config . get ( ' server ' )
# Sanitize default server
@ -295,11 +298,6 @@ class Network(util.DaemonThread):
for i in range ( self . num_server - 1 ) :
self . start_random_interface ( )
def start ( self ) :
self . running = True
self . blockchain . start ( )
util . DaemonThread . start ( self )
def set_proxy ( self , proxy ) :
self . proxy = proxy
if proxy :
@ -405,7 +403,6 @@ class Network(util.DaemonThread):
# Our set of interfaces changed
self . notify ( ' interfaces ' )
def process_response ( self , i , response ) :
# the id comes from the daemon or the network proxy
_id = response . get ( ' id ' )
@ -428,6 +425,10 @@ class Network(util.DaemonThread):
addr = response . get ( ' params ' ) [ 0 ]
self . addr_responses [ addr ] = result
self . response_queue . put ( response )
elif method == ' blockchain.block.get_chunk ' :
self . on_get_chunk ( i , response )
elif method == ' blockchain.block.get_header ' :
self . on_get_header ( i , response )
else :
self . response_queue . put ( response )
@ -490,10 +491,91 @@ class Network(util.DaemonThread):
else :
self . switch_to_interface ( self . default_server )
def request_chunk ( self , interface , data , idx ) :
interface . print_error ( " requesting chunk %d " % idx )
interface . send_request ( { ' method ' : ' blockchain.block.get_chunk ' , ' params ' : [ idx ] } )
data [ ' chunk_idx ' ] = idx
data [ ' req_time ' ] = time . time ( )
def on_get_chunk ( self , interface , response ) :
''' Handle receiving a chunk of block headers '''
if self . bc_requests :
req_if , data = self . bc_requests [ 0 ]
req_idx = data . get ( ' chunk_idx ' )
# Ignore unsolicited chunks
if req_if == interface and req_idx == response [ ' params ' ] [ 0 ] :
idx = self . blockchain . connect_chunk ( req_idx , response [ ' result ' ] )
# If not finished, get the next chunk
if idx < 0 or self . get_local_height ( ) > = data [ ' if_height ' ] :
self . bc_requests . popleft ( )
else :
self . request_chunk ( interface , data , idx )
def request_header ( self , interface , data , height ) :
interface . print_error ( " requesting header %d " % height )
interface . send_request ( { ' method ' : ' blockchain.block.get_header ' , ' params ' : [ height ] } )
data [ ' header_height ' ] = height
data [ ' req_time ' ] = time . time ( )
if not ' chain ' in data :
data [ ' chain ' ] = [ ]
def on_get_header ( self , interface , response ) :
''' Handle receiving a single block header '''
if self . bc_requests :
req_if , data = self . bc_requests [ 0 ]
req_height = data . get ( ' header_height ' , - 1 )
# Ignore unsolicited headers
if req_if == interface and req_height == response [ ' params ' ] [ 0 ] :
next_height = self . blockchain . connect_header ( data [ ' chain ' ] , response [ ' result ' ] )
# If not finished, get the next header
if next_height in [ True , False ] :
self . bc_requests . popleft ( )
if not next_height :
interface . print_error ( " header didn ' t connect, dismissing interface " )
interface . stop ( )
else :
self . request_header ( interface , data , next_height )
def bc_request_headers ( self , interface , data ) :
''' Send a request for the next header, or a chunk of them, if necessary '''
local_height , if_height = self . get_local_height ( ) , data [ ' if_height ' ]
if if_height < = local_height :
return False
elif if_height > local_height + 50 :
self . request_chunk ( interface , data , ( local_height + 1 ) / 2016 )
else :
self . request_header ( interface , data , if_height )
return True
def handle_bc_requests ( self ) :
''' Work through each interface that has notified us of a new header.
Send it requests if it is ahead of our blockchain object '''
while self . bc_requests :
interface , data = self . bc_requests . popleft ( )
# If the connection was lost move on
if not interface . is_connected ( ) :
continue
req_time = data . get ( ' req_time ' )
if not req_time :
# No requests sent yet. This interface has a new height.
# Request headers if it is ahead of our blockchain
if not self . bc_request_headers ( interface , data ) :
continue
elif time . time ( ) - req_time > 10 :
interface . print_error ( " blockchain request timed out " )
interface . stop ( )
continue
# Put updated request state back at head of deque
self . bc_requests . appendleft ( ( interface , data ) )
break
def run ( self ) :
self . blockchain . init ( )
while self . is_running ( ) :
self . check_interfaces ( )
self . handle_requests ( )
self . handle_bc_requests ( )
try :
i , response = self . queue . get ( timeout = 0.1 )
except Queue . Empty :
@ -508,19 +590,19 @@ class Network(util.DaemonThread):
self . stop_network ( )
self . print_error ( " stopped " )
def on_header ( self , i , r ) :
result = r . get ( ' result ' )
if not result :
header = r . get ( ' result ' )
if not header :
return
height = result . get ( ' block_height ' )
height = header . get ( ' block_height ' )
if not height :
return
self . heights [ i . server ] = height
self . merkle_roots [ i . server ] = result . get ( ' merkle_root ' )
self . utxo_roots [ i . server ] = result . get ( ' utxo_root ' )
# notify blockchain about the new height
self . blockchain . queue . put ( ( i , result ) )
self . merkle_roots [ i . server ] = header . get ( ' merkle_root ' )
self . utxo_roots [ i . server ] = header . get ( ' utxo_root ' )
# Queue this interface's height for asynchronous catch-up
self . bc_requests . append ( ( i , { ' if_height ' : height } ) )
if i == self . interface :
self . switch_lagging_interface ( )