@ -24,7 +24,7 @@
# SOFTWARE.
# SOFTWARE.
import asyncio
import asyncio
import hashlib
import hashlib
from typing import Dict , List , TYPE_CHECKING , Tuple
from typing import Dict , List , TYPE_CHECKING , Tuple , Set
from collections import defaultdict
from collections import defaultdict
import logging
import logging
@ -35,7 +35,7 @@ from .transaction import Transaction, PartialTransaction
from . util import bh2u , make_aiohttp_session , NetworkJobOnDefaultServer , random_shuffled_copy
from . util import bh2u , make_aiohttp_session , NetworkJobOnDefaultServer , random_shuffled_copy
from . bitcoin import address_to_scripthash , is_address
from . bitcoin import address_to_scripthash , is_address
from . logging import Logger
from . logging import Logger
from . interface import GracefulDisconnect
from . interface import GracefulDisconnect , NetworkTimeout
if TYPE_CHECKING :
if TYPE_CHECKING :
from . network import Network
from . network import Network
@ -153,6 +153,7 @@ class Synchronizer(SynchronizerBase):
super ( ) . _reset ( )
super ( ) . _reset ( )
self . requested_tx = { }
self . requested_tx = { }
self . requested_histories = set ( )
self . requested_histories = set ( )
self . _stale_histories = dict ( ) # type: Dict[str, asyncio.Task]
def diagnostic_name ( self ) :
def diagnostic_name ( self ) :
return self . wallet . diagnostic_name ( )
return self . wallet . diagnostic_name ( )
@ -160,34 +161,43 @@ class Synchronizer(SynchronizerBase):
def is_up_to_date ( self ) :
def is_up_to_date ( self ) :
return ( not self . requested_addrs
return ( not self . requested_addrs
and not self . requested_histories
and not self . requested_histories
and not self . requested_tx )
and not self . requested_tx
and not self . _stale_histories )
async def _on_address_status ( self , addr , status ) :
async def _on_address_status ( self , addr , status ) :
history = self . wallet . db . get_addr_history ( addr )
history = self . wallet . db . get_addr_history ( addr )
if history_status ( history ) == status :
if history_status ( history ) == status :
return
return
# No point in requesting history twice for the same announced status.
# However if we got announced a new status, we should request history again:
if ( addr , status ) in self . requested_histories :
if ( addr , status ) in self . requested_histories :
return
return
# request address history
# request address history
self . requested_histories . add ( ( addr , status ) )
self . requested_histories . add ( ( addr , status ) )
self . _stale_histories . pop ( addr , asyncio . Future ( ) ) . cancel ( )
h = address_to_scripthash ( addr )
h = address_to_scripthash ( addr )
self . _requests_sent + = 1
self . _requests_sent + = 1
async with self . _network_request_semaphore :
async with self . _network_request_semaphore :
result = await self . interface . get_history_for_scripthash ( h )
result = await self . interface . get_history_for_scripthash ( h )
self . _requests_answered + = 1
self . _requests_answered + = 1
self . logger . info ( f " receiving history { addr } { len ( result ) } " )
self . logger . info ( f " receiving history { addr } { len ( result ) } " )
hashes = set ( map ( lambda item : item [ ' tx_hash ' ] , result ) )
hist = list ( map ( lambda item : ( item [ ' tx_hash ' ] , item [ ' height ' ] ) , result ) )
hist = list ( map ( lambda item : ( item [ ' tx_hash ' ] , item [ ' height ' ] ) , result ) )
# tx_fees
# tx_fees
tx_fees = [ ( item [ ' tx_hash ' ] , item . get ( ' fee ' ) ) for item in result ]
tx_fees = [ ( item [ ' tx_hash ' ] , item . get ( ' fee ' ) ) for item in result ]
tx_fees = dict ( filter ( lambda x : x [ 1 ] is not None , tx_fees ) )
tx_fees = dict ( filter ( lambda x : x [ 1 ] is not None , tx_fees ) )
# Check that txids are unique
if len ( hashes ) != len ( result ) :
self . logger . info ( f " error: server history has non-unique txids: { addr } " )
# Check that the status corresponds to what was announced
# Check that the status corresponds to what was announced
elif history_status ( hist ) != status :
if history_status ( hist ) != status :
self . logger . info ( f " error: status mismatch: { addr } " )
# could happen naturally if history changed between getting status and history (race)
self . logger . info ( f " error: status mismatch: { addr } . we ' ll wait a bit for status update. " )
# The server is supposed to send a new status notification, which will trigger a new
# get_history. We shall wait a bit for this to happen, otherwise we disconnect.
async def disconnect_if_still_stale ( ) :
timeout = self . network . get_network_timeout_seconds ( NetworkTimeout . Generic )
await asyncio . sleep ( timeout )
raise SynchronizerFailure ( f " timeout reached waiting for addr { addr } : history still stale " )
self . _stale_histories [ addr ] = await self . taskgroup . spawn ( disconnect_if_still_stale )
else :
else :
self . _stale_histories . pop ( addr , asyncio . Future ( ) ) . cancel ( )
# Store received history
# Store received history
self . wallet . receive_history_callback ( addr , hist , tx_fees )
self . wallet . receive_history_callback ( addr , hist , tx_fees )
# Request transactions we don't have
# Request transactions we don't have