diff --git a/electrum/synchronizer.py b/electrum/synchronizer.py index 6cc3b7bdf..f65793132 100644 --- a/electrum/synchronizer.py +++ b/electrum/synchronizer.py @@ -32,10 +32,11 @@ import concurrent.futures # from .bitcoin import Hash, hash_encode from .transaction import Transaction -from .util import ThreadJob, bh2u, PrintError, aiosafe +from .util import ThreadJob, bh2u, PrintError, aiosafe, bfh, NotificationSession from .bitcoin import address_to_scripthash from .version import ELECTRUM_VERSION, PROTOCOL_VERSION - +from .network import parse_servers +from .bitcoin import COIN def history_status(h): if not h: @@ -47,20 +48,6 @@ def history_status(h): -class NotificationSession(ClientSession): - - def __init__(self, queue, *args, **kwargs): - super(NotificationSession, self).__init__(*args, **kwargs) - self.queue = queue - - @aiosafe - async def handle_request(self, request): - if isinstance(request, Notification): - if request.method == 'blockchain.scripthash.subscribe': - args = request.args - await self.queue.put((args[0], args[1])) - - class Synchronizer(PrintError): '''The synchronizer keeps the wallet up-to-date with its set of addresses and their transactions. It subscribes over the network @@ -79,9 +66,6 @@ class Synchronizer(PrintError): self.add_queue = asyncio.Queue() self.status_queue = asyncio.Queue() - async def send_version(self): - r = await self.session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION]) - def is_up_to_date(self): return (not self.requested_addrs and not self.requested_histories) @@ -134,6 +118,8 @@ class Synchronizer(PrintError): # Remove request; this allows up_to_date to be True self.requested_histories.pop(addr) + if self.wallet.network: self.wallet.network.notify('updated') + async def request_missing_txs(self, hist): # "hist" is a list of [tx_hash, tx_height] lists transaction_hashes = [] @@ -170,12 +156,37 @@ class Synchronizer(PrintError): async def subscribe_to_address(self, addr): h = address_to_scripthash(addr) self.scripthash_to_address[h] = addr + self.session.scripthash = self.status_queue status = await self.session.send_request('blockchain.scripthash.subscribe', [h]) await self.status_queue.put((h, status)) self.requested_addrs.remove(addr) + async def request_fee_estimates(self): + from .simple_config import FEE_ETA_TARGETS + self.wallet.network.config.requested_fee_estimates() + histogram = await self.session.send_request('mempool.get_fee_histogram') + fees = [] + for i in FEE_ETA_TARGETS: + fees.append((i, await self.session.send_request('blockchain.estimatefee', [i]))) + return histogram, fees + @aiosafe async def send_subscriptions(self): + self.wallet.network.banner = await self.session.send_request('server.banner') + self.wallet.network.notify('banner') + self.wallet.network.donation_address = await self.session.send_request('server.donation_address') + self.wallet.network.irc_servers = parse_servers(await self.session.send_request('server.peers.subscribe')) + self.wallet.network.notify('servers') + histogram, fees = await self.request_fee_estimates() + self.wallet.network.config.mempool_fees = histogram + self.wallet.network.notify('fee_histogram') + for i, result in fees: + fee = int(result * COIN) + self.wallet.network.config.update_fee_estimates(i, fee) + self.print_error("fee_estimates[%d]" % i, fee) + self.wallet.network.notify('fee') + relayfee = await self.session.send_request('blockchain.relayfee') + self.wallet.network.relay_fee = int(relayfee * COIN) if relayfee is not None else None async with TaskGroup() as group: while True: addr = await self.add_queue.get() @@ -189,20 +200,19 @@ class Synchronizer(PrintError): addr = self.scripthash_to_address[h] await group.spawn(self.on_address_status(addr, status)) + @property + def session(self): + s = self.wallet.network.interface.session + assert s is not None + return s + @aiosafe async def main(self): - conn = self.wallet.network.default_server - host, port, protocol = conn.split(':') - sslc = ssl.SSLContext(ssl.PROTOCOL_TLS) if protocol == 's' else None - async with NotificationSession(self.status_queue, host, int(port), ssl=sslc) as session: - self.session = session - await self.send_version() - self.wallet.synchronizer = self - for addr in self.wallet.get_addresses(): self.add(addr) - while True: - await asyncio.sleep(1) - self.wallet.synchronize() - up_to_date = self.is_up_to_date() - if up_to_date != self.wallet.is_up_to_date(): - self.wallet.set_up_to_date(up_to_date) - self.wallet.network.trigger_callback('updated') + for addr in self.wallet.get_addresses(): self.add(addr) + while True: + await asyncio.sleep(1) + self.wallet.synchronize() + up_to_date = self.is_up_to_date() + if up_to_date != self.wallet.is_up_to_date(): + self.wallet.set_up_to_date(up_to_date) + self.wallet.network.trigger_callback('updated')