diff --git a/server/controller.py b/server/controller.py index e5b2898..90f3222 100644 --- a/server/controller.py +++ b/server/controller.py @@ -20,7 +20,7 @@ from functools import partial import pylru from lib.jsonrpc import JSONRPC, JSONSessionBase, RPCError -from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash +from lib.hash import double_sha256, hash_to_str, hex_str_to_hash import lib.util as util from server.block_processor import BlockProcessor from server.daemon import Daemon, DaemonError @@ -706,16 +706,13 @@ class Controller(util.LoggedClass): except DaemonError as e: raise RPCError('daemon error: {}'.format(e)) - async def new_subscription(self, address): + def new_subscription(self, address): if self.subs_room <= 0: self.subs_room = self.max_subs - self.sub_count() if self.subs_room <= 0: raise RPCError('server subscription limit {:,d} reached' .format(self.max_subs)) self.subs_room -= 1 - hashX = self.address_to_hashX(address) - status = await self.address_status(hashX) - return hashX, status async def tx_merkle(self, tx_hash, height): '''tx_hash is a hex string.''' @@ -779,21 +776,6 @@ class Controller(util.LoggedClass): for tx_hash, height in history] return conf + await self.unconfirmed_history(hashX) - async def address_status(self, hashX): - '''Returns status as 32 bytes.''' - # Note history is ordered and mempool unordered in electrum-server - # For mempool, height is -1 if unconfirmed txins, otherwise 0 - history = await self.get_history(hashX) - mempool = await self.mempool_transactions(hashX) - - status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) - for tx_hash, height in history) - status += ''.join('{}:{:d}:'.format(hex_hash, -unconfirmed) - for hex_hash, tx_fee, unconfirmed in mempool) - if status: - return sha256(status.encode()).hex() - return None - async def get_utxos(self, hashX): '''Get UTXOs asynchronously to reduce latency.''' def job(): diff --git a/server/mempool.py b/server/mempool.py index 35d177f..c7843ba 100644 --- a/server/mempool.py +++ b/server/mempool.py @@ -282,7 +282,8 @@ class MemPool(util.LoggedClass): tx_fee = (sum(v for hashX, v in txin_pairs) - sum(v for hashX, v in txout_pairs)) tx, tx_hash = deserializer(raw_tx).read_tx() - unconfirmed = any(txin.prev_hash in self.txs for txin in tx.inputs) + unconfirmed = any(hash_to_str(txin.prev_hash) in self.txs + for txin in tx.inputs) result.append((hex_hash, tx_fee, unconfirmed)) return result diff --git a/server/session.py b/server/session.py index 490fc1c..38f3783 100644 --- a/server/session.py +++ b/server/session.py @@ -11,6 +11,7 @@ import codecs import time from functools import partial +from lib.hash import sha256, hash_to_str from lib.jsonrpc import JSONSession, RPCError, JSONRPCv2 from server.daemon import DaemonError import server.version as version @@ -115,6 +116,7 @@ class ElectrumX(SessionBase): self.max_send = self.env.max_send self.max_subs = self.env.max_session_subs self.hashX_subs = {} + self.mempool_statuses = {} self.electrumx_handlers = { 'blockchain.address.subscribe': self.address_subscribe, 'blockchain.headers.subscribe': self.headers_subscribe, @@ -135,29 +137,42 @@ class ElectrumX(SessionBase): Cache is a shared cache for this update. ''' - controller = self.controller pairs = [] + changed = [] + + matches = touched.intersection(self.hashX_subs) + for hashX in matches: + address = self.hashX_subs[hashX] + status = await self.address_status(hashX) + changed.append((address, status)) if height != self.notified_height: self.notified_height = height if self.subscribe_headers: - args = (controller.electrum_header(height), ) + args = (self.controller.electrum_header(height), ) pairs.append(('blockchain.headers.subscribe', args)) if self.subscribe_height: pairs.append(('blockchain.numblocks.subscribe', (height, ))) - matches = touched.intersection(self.hashX_subs) - for hashX in matches: - address = self.hashX_subs[hashX] - status = await controller.address_status(hashX) - pairs.append(('blockchain.address.subscribe', (address, status))) - - self.send_notifications(pairs) - if matches: - es = '' if len(matches) == 1 else 'es' - self.log_info('notified of {:,d} address{}' - .format(len(matches), es)) + # Check mempool hashXs - the status is a function of the + # confirmed state of other transactions + for hashX in set(self.mempool_statuses).difference(matches): + old_status = self.mempool_statuses[hashX] + status = await self.address_status(hashX) + if status != old_status: + address = self.hashX_subs[hashX] + changed.append((address, status)) + + for address_status in changed: + pairs.append(('blockchain.address.subscribe', address_status)) + + if pairs: + self.send_notifications(pairs) + if changed: + es = '' if len(changed) == 1 else 'es' + self.log_info('notified of {:,d} address{}' + .format(len(changed), es)) def height(self): '''Return the current flushed database height.''' @@ -191,6 +206,35 @@ class ElectrumX(SessionBase): '''Return the server peers as a list of (ip, host, details) tuples.''' return self.controller.peer_mgr.on_peers_subscribe(self.is_tor()) + async def address_status(self, hashX): + '''Returns an address status. + + Status is a hex string, but must be None if there is no history. + ''' + # Note history is ordered and mempool unordered in electrum-server + # For mempool, height is -1 if unconfirmed txins, otherwise 0 + history = await self.controller.get_history(hashX) + mempool = await self.controller.mempool_transactions(hashX) + + status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) + for tx_hash, height in history) + status += ''.join('{}:{:d}:'.format(hex_hash, -unconfirmed) + for hex_hash, tx_fee, unconfirmed in mempool) + for hex_hash, tx_fee, unconfirmed in mempool: + self.log_info('UNCONFIRMED: {} {}' + .format(self.hashX_subs[hashX], unconfirmed)) + if status: + status = sha256(status.encode()).hex() + else: + status = None + + if mempool: + self.mempool_statuses[hashX] = status + else: + self.mempool_statuses.pop(hashX, None) + + return status + async def address_subscribe(self, address): '''Subscribe to an address. @@ -199,10 +243,13 @@ class ElectrumX(SessionBase): if len(self.hashX_subs) >= self.max_subs: raise RPCError('your address subscription limit {:,d} reached' .format(self.max_subs)) + # Now let the controller check its limit - hashX, status = await self.controller.new_subscription(address) + self.controller.new_subscription(address) + + hashX = self.env.coin.address_to_hashX(address) self.hashX_subs[hashX] = address - return status + return await self.address_status(hashX) def server_features(self): '''Returns a dictionary of server features.'''