Browse Source

address synchronizer: use aiorpcx session object in network's interface,

request, fees
3.3.3.1
Janus 7 years ago
committed by SomberNight
parent
commit
9bfb5fe71f
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 78
      electrum/synchronizer.py

78
electrum/synchronizer.py

@ -32,10 +32,11 @@ import concurrent.futures
# from .bitcoin import Hash, hash_encode
from .transaction import Transaction
from .util import ThreadJob, bh2u, PrintError, aiosafe
from .util import ThreadJob, bh2u, PrintError, aiosafe, bfh, NotificationSession
from .bitcoin import address_to_scripthash
from .version import ELECTRUM_VERSION, PROTOCOL_VERSION
from .network import parse_servers
from .bitcoin import COIN
def history_status(h):
if not h:
@ -47,20 +48,6 @@ def history_status(h):
class NotificationSession(ClientSession):
def __init__(self, queue, *args, **kwargs):
super(NotificationSession, self).__init__(*args, **kwargs)
self.queue = queue
@aiosafe
async def handle_request(self, request):
if isinstance(request, Notification):
if request.method == 'blockchain.scripthash.subscribe':
args = request.args
await self.queue.put((args[0], args[1]))
class Synchronizer(PrintError):
'''The synchronizer keeps the wallet up-to-date with its set of
addresses and their transactions. It subscribes over the network
@ -79,9 +66,6 @@ class Synchronizer(PrintError):
self.add_queue = asyncio.Queue()
self.status_queue = asyncio.Queue()
async def send_version(self):
r = await self.session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])
def is_up_to_date(self):
return (not self.requested_addrs and not self.requested_histories)
@ -134,6 +118,8 @@ class Synchronizer(PrintError):
# Remove request; this allows up_to_date to be True
self.requested_histories.pop(addr)
if self.wallet.network: self.wallet.network.notify('updated')
async def request_missing_txs(self, hist):
# "hist" is a list of [tx_hash, tx_height] lists
transaction_hashes = []
@ -170,12 +156,37 @@ class Synchronizer(PrintError):
async def subscribe_to_address(self, addr):
h = address_to_scripthash(addr)
self.scripthash_to_address[h] = addr
self.session.scripthash = self.status_queue
status = await self.session.send_request('blockchain.scripthash.subscribe', [h])
await self.status_queue.put((h, status))
self.requested_addrs.remove(addr)
async def request_fee_estimates(self):
from .simple_config import FEE_ETA_TARGETS
self.wallet.network.config.requested_fee_estimates()
histogram = await self.session.send_request('mempool.get_fee_histogram')
fees = []
for i in FEE_ETA_TARGETS:
fees.append((i, await self.session.send_request('blockchain.estimatefee', [i])))
return histogram, fees
@aiosafe
async def send_subscriptions(self):
self.wallet.network.banner = await self.session.send_request('server.banner')
self.wallet.network.notify('banner')
self.wallet.network.donation_address = await self.session.send_request('server.donation_address')
self.wallet.network.irc_servers = parse_servers(await self.session.send_request('server.peers.subscribe'))
self.wallet.network.notify('servers')
histogram, fees = await self.request_fee_estimates()
self.wallet.network.config.mempool_fees = histogram
self.wallet.network.notify('fee_histogram')
for i, result in fees:
fee = int(result * COIN)
self.wallet.network.config.update_fee_estimates(i, fee)
self.print_error("fee_estimates[%d]" % i, fee)
self.wallet.network.notify('fee')
relayfee = await self.session.send_request('blockchain.relayfee')
self.wallet.network.relay_fee = int(relayfee * COIN) if relayfee is not None else None
async with TaskGroup() as group:
while True:
addr = await self.add_queue.get()
@ -189,20 +200,19 @@ class Synchronizer(PrintError):
addr = self.scripthash_to_address[h]
await group.spawn(self.on_address_status(addr, status))
@property
def session(self):
s = self.wallet.network.interface.session
assert s is not None
return s
@aiosafe
async def main(self):
conn = self.wallet.network.default_server
host, port, protocol = conn.split(':')
sslc = ssl.SSLContext(ssl.PROTOCOL_TLS) if protocol == 's' else None
async with NotificationSession(self.status_queue, host, int(port), ssl=sslc) as session:
self.session = session
await self.send_version()
self.wallet.synchronizer = self
for addr in self.wallet.get_addresses(): self.add(addr)
while True:
await asyncio.sleep(1)
self.wallet.synchronize()
up_to_date = self.is_up_to_date()
if up_to_date != self.wallet.is_up_to_date():
self.wallet.set_up_to_date(up_to_date)
self.wallet.network.trigger_callback('updated')
for addr in self.wallet.get_addresses(): self.add(addr)
while True:
await asyncio.sleep(1)
self.wallet.synchronize()
up_to_date = self.is_up_to_date()
if up_to_date != self.wallet.is_up_to_date():
self.wallet.set_up_to_date(up_to_date)
self.wallet.network.trigger_callback('updated')

Loading…
Cancel
Save