diff --git a/lib/coins.py b/lib/coins.py index 9a1d10c..81ba382 100644 --- a/lib/coins.py +++ b/lib/coins.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016, Neil Booth +# Copyright (c) 2016-2017, Neil Booth # # All rights reserved. # @@ -13,13 +13,14 @@ necessary for appropriate handling. from decimal import Decimal from functools import partial +from hashlib import sha256 import inspect import re import struct import sys -from lib.hash import Base58, hash160, double_sha256, hash_to_str -from lib.script import ScriptPubKey, Script +from lib.hash import Base58, hash160, ripemd160, double_sha256, hash_to_str +from lib.script import ScriptPubKey from lib.tx import Deserializer from lib.util import cachedproperty, subclasses @@ -37,9 +38,9 @@ class Coin(object): RPC_URL_REGEX = re.compile('.+@[^:]+(:[0-9]+)?') VALUE_PER_COIN = 100000000 CHUNK_SIZE=2016 - STRANGE_VERBYTE = 0xff IRC_SERVER = "irc.freenode.net" IRC_PORT = 6667 + HASHX_LEN = 11 @classmethod def lookup_coin_class(cls, name, net): @@ -70,20 +71,28 @@ class Coin(object): def daemon_urls(cls, urls): return [cls.sanitize_url(url) for url in urls.split(',')] + @classmethod + def hashX_from_script(cls, script): + '''Returns a hashX from a script.''' + script = ScriptPubKey.hashX_script(script) + if script is None: + return None + return sha256(script).digest()[:cls.HASHX_LEN] + @cachedproperty - def hash168_handlers(cls): + def address_handlers(cls): return ScriptPubKey.PayToHandlers( - address = cls.P2PKH_hash168_from_hash160, - script_hash = cls.P2SH_hash168_from_hash160, - pubkey = cls.P2PKH_hash168_from_pubkey, - unspendable = cls.hash168_from_unspendable, - strange = cls.hash168_from_strange, + address = cls.P2PKH_address_from_hash160, + script_hash = cls.P2SH_address_from_hash160, + pubkey = cls.P2PKH_address_from_pubkey, + unspendable = lambda : None, + strange = lambda script: None, ) @classmethod - def hash168_from_script(cls): - '''Returns a function that is passed a script to return a hash168.''' - return partial(ScriptPubKey.pay_to, cls.hash168_handlers) + def address_from_script(cls, script): + '''Given a pk_script, return the adddress it pays to, or None.''' + return ScriptPubKey.pay_to(cls.address_handlers, script) @staticmethod def lookup_xverbytes(verbytes): @@ -97,65 +106,26 @@ class Coin(object): raise CoinError('version bytes unrecognised') @classmethod - def address_to_hash168(cls, addr): - '''Return a 21-byte hash given an address. - - This is the hash160 prefixed by the address version byte. - ''' - result = Base58.decode_check(addr) - if len(result) != 21: - raise CoinError('invalid address: {}'.format(addr)) - return result - - @classmethod - def hash168_to_address(cls, hash168): - '''Return an address given a 21-byte hash.''' - return Base58.encode_check(hash168) - - @classmethod - def hash168_from_unspendable(cls): - '''Return a hash168 for an unspendable script.''' - return None - - @classmethod - def hash168_from_strange(cls, script): - '''Return a hash168 for a strange script.''' - return bytes([cls.STRANGE_VERBYTE]) + hash160(script) - - @classmethod - def P2PKH_hash168_from_hash160(cls, hash160): - '''Return a hash168 if hash160 is 160 bits otherwise None.''' - if len(hash160) == 20: - return bytes([cls.P2PKH_VERBYTE]) + hash160 - return None - - @classmethod - def P2PKH_hash168_from_pubkey(cls, pubkey): - return cls.P2PKH_hash168_from_hash160(hash160(pubkey)) + def address_to_hashX(cls, address): + '''Return a hashX given a coin address.''' + return cls.hashX_from_script(cls.pay_to_address_script(address)) @classmethod def P2PKH_address_from_hash160(cls, hash160): '''Return a P2PKH address given a public key.''' assert len(hash160) == 20 - return Base58.encode_check(cls.P2PKH_hash168_from_hash160(hash160)) + return Base58.encode_check(bytes([cls.P2PKH_VERBYTE]) + hash160) @classmethod def P2PKH_address_from_pubkey(cls, pubkey): '''Return a coin address given a public key.''' return cls.P2PKH_address_from_hash160(hash160(pubkey)) - @classmethod - def P2SH_hash168_from_hash160(cls, hash160): - '''Return a hash168 if hash160 is 160 bits otherwise None.''' - if len(hash160) == 20: - return bytes([cls.P2SH_VERBYTE]) + hash160 - return None - @classmethod def P2SH_address_from_hash160(cls, hash160): '''Return a coin address given a hash160.''' assert len(hash160) == 20 - return Base58.encode_check(cls.P2SH_hash168_from_hash160(hash160)) + return Base58.encode_check(bytes([cls.P2SH_VERBYTE]) + hash160) @classmethod def multisig_address(cls, m, pubkeys): @@ -195,7 +165,7 @@ class Coin(object): if len(raw) == 21: verbyte, hash_bytes = raw[0], raw[1:] - if verbyte == cls.P2PKH_VERYBYTE: + if verbyte == cls.P2PKH_VERBYTE: return ScriptPubKey.P2PKH_script(hash_bytes) if verbyte == cls.P2SH_VERBYTE: return ScriptPubKey.P2SH_script(hash_bytes) @@ -262,9 +232,9 @@ class Bitcoin(Coin): WIF_BYTE = 0x80 GENESIS_HASH=(b'000000000019d6689c085ae165831e93' b'4ff763ae46a2a6c172b3f1b60a8ce26f') - TX_COUNT = 142791895 - TX_COUNT_HEIGHT = 420976 - TX_PER_BLOCK = 1600 + TX_COUNT = 156335304 + TX_COUNT_HEIGHT = 429972 + TX_PER_BLOCK = 1800 IRC_PREFIX = "E_" IRC_CHANNEL = "#electrum" RPC_PORT = 8332 diff --git a/lib/jsonrpc.py b/lib/jsonrpc.py index 7301076..ccf46e1 100644 --- a/lib/jsonrpc.py +++ b/lib/jsonrpc.py @@ -290,8 +290,8 @@ class JSONRPC(asyncio.Protocol, LoggedClass): except TypeError: msg = 'JSON encoding failure: {}'.format(payload) self.log_error(msg) - return self.json_error(msg, self.INTERNAL_ERROR, - self.payload_id(payload)) + return self.send_json_error(msg, self.INTERNAL_ERROR, + self.payload_id(payload)) self.check_oversized_request(len(binary)) self.send_count += 1 diff --git a/lib/script.py b/lib/script.py index 6391c81..4c2b65a 100644 --- a/lib/script.py +++ b/lib/script.py @@ -12,6 +12,7 @@ import struct from collections import namedtuple from lib.enum import Enumeration +from lib.hash import hash160 class ScriptError(Exception): @@ -82,6 +83,27 @@ class ScriptPubKey(object): PayToHandlers = namedtuple('PayToHandlers', 'address script_hash pubkey ' 'unspendable strange') + @classmethod + def hashX_script(cls, script): + '''Return None if the script is provably unspendable. Return a + pay-to-pubkey-hash script if it is pay-to-pubkey, otherwise + return script. + ''' + if script: + op = script[0] + if op == OpCodes.OP_RETURN: + return None + if op <= OpCodes.OP_PUSHDATA4: + try: + ops = Script.get_ops(script) + except ScriptError: + pass + else: + if _match_ops(ops, cls.TO_PUBKEY_OPS): + pubkey = ops[0][1] + script = ScriptPubKey.P2PKH_script(hash160(pubkey)) + return script + @classmethod def pay_to(cls, handlers, script): '''Parse a script, invoke the appropriate handler and diff --git a/query.py b/query.py index 671b91d..05ae277 100755 --- a/query.py +++ b/query.py @@ -26,15 +26,17 @@ def count_entries(db): utxos += 1 print("UTXO count:", utxos) - hash168 = 0 + hashX = 0 for key in db.iterator(prefix=b'h', include_value=False): - hash168 += 1 - print("Hash168 count:", hash168) + hashX += 1 + print("HashX count:", hashX) hist = 0 - for key in db.iterator(prefix=b'H', include_value=False): + hist_len = 0 + for key, value in db.iterator(prefix=b'H'): hist += 1 - print("History rows:", hist) + hist_len += len(value) // 4 + print("History rows {:,d} entries {:,d}", hist, hist_len) def main(): @@ -52,27 +54,20 @@ def main(): limit = 10 for addr in sys.argv[argc:]: print('Address: ', addr) - hash168 = coin.address_to_hash168(addr) - - hist = 0 - hist_len = 0 - for key, value in bp.db.iterator(prefix=b'H'+hash168): - hist += 1 - hist_len += len(value) // 4 - print("History: {:,d} rows with {:,d} entries".format(hist, hist_len)) + hashX = coin.address_to_hashX(addr) n = None - for n, (tx_hash, height) in enumerate(bp.get_history(hash168, limit)): + for n, (tx_hash, height) in enumerate(bp.get_history(hashX, limit)): print('History #{:d}: hash: {} height: {:d}' .format(n + 1, hash_to_str(tx_hash), height)) n = None - for n, utxo in enumerate(bp.get_utxos(hash168, limit)): + for n, utxo in enumerate(bp.get_utxos(hashX, limit)): print('UTXOs #{:d}: hash: {} pos: {:d} height: {:d} value: {:d}' .format(n + 1, hash_to_str(utxo.tx_hash), utxo.tx_pos, utxo.height, utxo.value)) if n is None: print('No UTXOs') - balance = bp.get_balance(hash168) + balance = bp.get_balance(hashX) print('Balance: {} {}'.format(coin.decimal_value(balance), coin.SHORTNAME)) diff --git a/server/block_processor.py b/server/block_processor.py index 8faa7a8..40b2324 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -172,8 +172,7 @@ class BlockProcessor(server.db.DB): self.caught_up_event = asyncio.Event() # Meta - self.utxo_MB = env.utxo_MB - self.hist_MB = env.hist_MB + self.cache_MB = env.cache_MB self.next_cache_check = 0 # Headers and tx_hashes have one entry per block @@ -194,10 +193,8 @@ class BlockProcessor(server.db.DB): # Log state if self.first_sync: - self.logger.info('flushing UTXO cache at {:,d} MB' - .format(self.utxo_MB)) - self.logger.info('flushing history cache at {:,d} MB' - .format(self.hist_MB)) + self.logger.info('flushing DB cache at {:,d} MB' + .format(self.cache_MB)) async def main_loop(self, touched): '''Main loop for block processing.''' @@ -242,7 +239,7 @@ class BlockProcessor(server.db.DB): touched.clear() if time.time() > self.next_cache_check: self.check_cache_size() - self.next_cache_check = time.time() + 60 + self.next_cache_check = time.time() + 30 if not self.first_sync: s = '' if len(blocks) == 1 else 's' @@ -391,8 +388,8 @@ class BlockProcessor(server.db.DB): flush_id = pack('>H', self.flush_count) - for hash168, hist in self.history.items(): - key = b'H' + hash168 + flush_id + for hashX, hist in self.history.items(): + key = b'H' + hashX + flush_id batch.put(key, hist.tobytes()) if self.first_sync: @@ -415,10 +412,10 @@ class BlockProcessor(server.db.DB): self.tx_hashes = [] self.headers = [] - def backup_flush(self, hash168s): + def backup_flush(self, hashXs): '''Like flush() but when backing up. All UTXOs are flushed. - hash168s - sequence of hash168s which were touched by backing + hashXs - sequence of hashXs which were touched by backing up. Searched for history entries to remove after the backup height. ''' @@ -430,7 +427,7 @@ class BlockProcessor(server.db.DB): with self.db.write_batch() as batch: # Flush state last as it reads the wall time. - self.backup_history(batch, hash168s) + self.backup_history(batch, hashXs) self.flush_utxos(batch) self.flush_state(batch) @@ -444,10 +441,10 @@ class BlockProcessor(server.db.DB): self.last_flush - flush_start, self.height, self.tx_count)) - def backup_history(self, batch, hash168s): + def backup_history(self, batch, hashXs): nremoves = 0 - for hash168 in sorted(hash168s): - prefix = b'H' + hash168 + for hashX in sorted(hashXs): + prefix = b'H' + hashX deletes = [] puts = {} for key, hist in self.db.iterator(prefix=prefix, reverse=True): @@ -472,17 +469,15 @@ class BlockProcessor(server.db.DB): assert not self.tx_hashes self.logger.info('backing up removed {:,d} history entries from ' - '{:,d} addresses'.format(nremoves, len(hash168s))) + '{:,d} addresses'.format(nremoves, len(hashXs))) def check_cache_size(self): '''Flush a cache if it gets too big.''' # Good average estimates based on traversal of subobjects and - # requesting size from Python (see deep_getsizeof). For - # whatever reason Python O/S mem usage is typically +30% or - # more, so we scale our already bloated object sizes. - one_MB = int(1048576 / 1.3) - utxo_cache_size = len(self.utxo_cache) * 187 - db_deletes_size = len(self.db_deletes) * 61 + # requesting size from Python (see deep_getsizeof). + one_MB = 1000*1000 + utxo_cache_size = len(self.utxo_cache) * 205 + db_deletes_size = len(self.db_deletes) * 57 hist_cache_size = len(self.history) * 180 + self.history_size * 4 tx_hash_size = (self.tx_count - self.fs_tx_count) * 74 utxo_MB = (db_deletes_size + utxo_cache_size) // one_MB @@ -493,9 +488,10 @@ class BlockProcessor(server.db.DB): .format(self.height, self.daemon.cached_height(), utxo_MB, hist_MB)) - # Flush if a cache is too big - if utxo_MB >= self.utxo_MB or hist_MB >= self.hist_MB: - self.flush(utxo_MB >= self.utxo_MB) + # Flush history if it takes up over 20% of cache memory. + # Flush UTXOs once they take up 80% of cache memory. + if utxo_MB + hist_MB >= self.cache_MB or hist_MB >= self.cache_MB // 5: + self.flush(utxo_MB >= self.cache_MB * 4 // 5) def fs_advance_block(self, header, tx_hashes, txs): '''Update unflushed FS state for a new block.''' @@ -525,15 +521,15 @@ class BlockProcessor(server.db.DB): history = self.history history_size = self.history_size tx_num = self.tx_count - script_hash168 = self.coin.hash168_from_script() + script_hashX = self.coin.hashX_from_script s_pack = pack put_utxo = self.utxo_cache.__setitem__ spend_utxo = self.spend_utxo undo_info_append = undo_info.append for tx, tx_hash in zip(txs, tx_hashes): - hash168s = set() - add_hash168 = hash168s.add + hashXs = set() + add_hashX = hashXs.add tx_numb = s_pack(' 1: @@ -715,15 +713,15 @@ class BlockProcessor(server.db.DB): assert hash is not None # Should always be found continue - # Key: b'u' + address_hash168 + tx_idx + tx_num + # Key: b'u' + address_hashX + tx_idx + tx_num # Value: the UTXO value as a 64-bit unsigned integer - udb_key = b'u' + hash168 + hdb_key[-6:] + udb_key = b'u' + hashX + hdb_key[-6:] utxo_value_packed = self.db.get(udb_key) if utxo_value_packed: # Remove both entries for this UTXO self.db_deletes.append(hdb_key) self.db_deletes.append(udb_key) - return hash168 + tx_num_packed + utxo_value_packed + return hashX + tx_num_packed + utxo_value_packed raise ChainError('UTXO {} / {:,d} not found in "h" table' .format(hash_to_str(tx_hash), tx_idx)) @@ -743,11 +741,11 @@ class BlockProcessor(server.db.DB): batch_put = batch.put for cache_key, cache_value in self.utxo_cache.items(): - # suffix = tx_num + tx_idx - hash168 = cache_value[:21] - suffix = cache_key[-2:] + cache_value[21:25] - batch_put(b'h' + cache_key[:4] + suffix, hash168) - batch_put(b'u' + hash168 + suffix, cache_value[25:]) + # suffix = tx_idx + tx_num + hashX = cache_value[:-12] + suffix = cache_key[-2:] + cache_value[-12:-8] + batch_put(b'h' + cache_key[:4] + suffix, hashX) + batch_put(b'u' + hashX + suffix, cache_value[-8:]) if self.first_sync: self.logger.info('flushed {:,d} blocks with {:,d} txs, {:,d} UTXO ' diff --git a/server/controller.py b/server/controller.py index 1a4b06a..3febb59 100644 --- a/server/controller.py +++ b/server/controller.py @@ -73,20 +73,20 @@ class Controller(util.LoggedClass): env.max_send = max(350000, env.max_send) self.setup_bands() - async def mempool_transactions(self, hash168): + async def mempool_transactions(self, hashX): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool - entries for the hash168. + entries for the hashX. unconfirmed is True if any txin is unconfirmed. ''' - return await self.mempool.transactions(hash168) + return await self.mempool.transactions(hashX) - def mempool_value(self, hash168): - '''Return the unconfirmed amount in the mempool for hash168. + def mempool_value(self, hashX): + '''Return the unconfirmed amount in the mempool for hashX. Can be positive or negative. ''' - return self.mempool.value(hash168) + return self.mempool.value(hashX) def sent_tx(self, tx_hash): '''Call when a TX is sent. Tells mempool to prioritize it.''' @@ -255,8 +255,8 @@ class Controller(util.LoggedClass): # Invalidate caches hc = self.history_cache - for hash168 in set(hc).intersection(touched): - del hc[hash168] + for hashX in set(hc).intersection(touched): + del hc[hashX] if self.bp.db_height != self.height: self.height = self.bp.db_height self.header_cache.clear() @@ -286,10 +286,10 @@ class Controller(util.LoggedClass): self.header_cache[height] = header return header - async def async_get_history(self, hash168): + async def async_get_history(self, hashX): '''Get history asynchronously to reduce latency.''' - if hash168 in self.history_cache: - return self.history_cache[hash168] + if hashX in self.history_cache: + return self.history_cache[hashX] def job(): # History DoS limit. Each element of history is about 99 @@ -297,11 +297,11 @@ class Controller(util.LoggedClass): # on bloated history requests, and uses a smaller divisor # so large requests are logged before refusing them. limit = self.env.max_send // 97 - return list(self.bp.get_history(hash168, limit=limit)) + return list(self.bp.get_history(hashX, limit=limit)) loop = asyncio.get_event_loop() history = await loop.run_in_executor(None, job) - self.history_cache[hash168] = history + self.history_cache[hashX] = history return history async def shutdown(self): diff --git a/server/db.py b/server/db.py index b5ec12d..143d4e9 100644 --- a/server/db.py +++ b/server/db.py @@ -31,7 +31,7 @@ class DB(LoggedClass): it was shutdown uncleanly. ''' - DB_VERSIONS = [3] + DB_VERSIONS = [4] class MissingUTXOError(Exception): '''Raised if a mempool tx input UTXO couldn't be found.''' @@ -319,7 +319,7 @@ class DB(LoggedClass): assert isinstance(limit, int) and limit >= 0 return limit - def get_history(self, hash168, limit=1000): + def get_history(self, hashX, limit=1000): '''Generator that returns an unpruned, sorted list of (tx_hash, height) tuples of confirmed transactions that touched the address, earliest in the blockchain first. Includes both spending and @@ -327,7 +327,7 @@ class DB(LoggedClass): Set limit to None to get them all. ''' limit = self._resolve_limit(limit) - prefix = b'H' + hash168 + prefix = b'H' + hashX for key, hist in self.db.iterator(prefix=prefix): a = array.array('I') a.frombytes(hist) @@ -337,20 +337,20 @@ class DB(LoggedClass): yield self.fs_tx_hash(tx_num) limit -= 1 - def get_balance(self, hash168): + def get_balance(self, hashX): '''Returns the confirmed balance of an address.''' - return sum(utxo.value for utxo in self.get_utxos(hash168, limit=None)) + return sum(utxo.value for utxo in self.get_utxos(hashX, limit=None)) - def get_utxos(self, hash168, limit=1000): + def get_utxos(self, hashX, limit=1000): '''Generator that yields all UTXOs for an address sorted in no particular order. By default yields at most 1000 entries. Set limit to None to get them all. ''' limit = self._resolve_limit(limit) s_unpack = unpack - # Key: b'u' + address_hash168 + tx_idx + tx_num + # Key: b'u' + address_hashX + tx_idx + tx_num # Value: the UTXO value as a 64-bit unsigned integer - prefix = b'u' + hash168 + prefix = b'u' + hashX for db_key, db_value in self.db.iterator(prefix=prefix): if limit == 0: return @@ -360,56 +360,43 @@ class DB(LoggedClass): tx_hash, height = self.fs_tx_hash(tx_num) yield UTXO(tx_num, tx_pos, tx_hash, height, value) - def get_utxo_hash168(self, tx_hash, index): - '''Returns the hash168 for a UTXO. - - Used only for electrum client command-line requests. - ''' - hash168 = None - if 0 <= index <= 65535: - idx_packed = pack(' (txin_pairs, txout_pairs) - hash168 -> set of all tx hashes in which the hash168 appears + hashX -> set of all tx hashes in which the hashX appears - A pair is a (hash168, value) tuple. tx hashes are hex strings. + A pair is a (hashX, value) tuple. tx hashes are hex strings. ''' def __init__(self, daemon, coin, db): @@ -42,7 +42,7 @@ class MemPool(util.LoggedClass): self.prioritized = set() self.stop = False self.txs = {} - self.hash168s = defaultdict(set) # None can be a key + self.hashXs = defaultdict(set) # None can be a key def prioritize(self, tx_hash): '''Prioritize processing the given hash. This is important during @@ -56,7 +56,7 @@ class MemPool(util.LoggedClass): unfetched. Add new ones to unprocessed. ''' txs = self.txs - hash168s = self.hash168s + hashXs = self.hashXs touched = self.touched hashes = self.daemon.cached_mempool_hashes() @@ -67,13 +67,13 @@ class MemPool(util.LoggedClass): item = txs.pop(hex_hash) if item: txin_pairs, txout_pairs = item - tx_hash168s = set(hash168 for hash168, value in txin_pairs) - tx_hash168s.update(hash168 for hash168, value in txout_pairs) - for hash168 in tx_hash168s: - hash168s[hash168].remove(hex_hash) - if not hash168s[hash168]: - del hash168s[hash168] - touched.update(tx_hash168s) + tx_hashXs = set(hashX for hashX, value in txin_pairs) + tx_hashXs.update(hashX for hashX, value in txout_pairs) + for hashX in tx_hashXs: + hashXs[hashX].remove(hex_hash) + if not hashXs[hashX]: + del hashXs[hashX] + touched.update(tx_hashXs) new = hashes.difference(txs) unfetched.update(new) @@ -114,7 +114,7 @@ class MemPool(util.LoggedClass): now = time.time() if now >= next_log and loops: self.logger.info('{:,d} txs touching {:,d} addresses' - .format(len(txs), len(self.hash168s))) + .format(len(txs), len(self.hashXs))) next_log = now + 150 try: @@ -168,14 +168,14 @@ class MemPool(util.LoggedClass): result, deferred = await loop.run_in_executor(None, job) pending.extend(deferred) - hash168s = self.hash168s + hashXs = self.hashXs touched = self.touched for hex_hash, in_out_pairs in result.items(): if hex_hash in txs: txs[hex_hash] = in_out_pairs - for hash168, value in itertools.chain(*in_out_pairs): - touched.add(hash168) - hash168s[hash168].add(hex_hash) + for hashX, value in itertools.chain(*in_out_pairs): + touched.add(hashX) + hashXs[hashX].add(hex_hash) return process @@ -199,7 +199,7 @@ class MemPool(util.LoggedClass): variables it doesn't own. Atomic reads of self.txs that do not depend on the result remaining the same are fine. ''' - script_hash168 = self.coin.hash168_from_script() + script_hashX = self.coin.hashX_from_script db_utxo_lookup = self.db.db_utxo_lookup txs = self.txs @@ -209,8 +209,8 @@ class MemPool(util.LoggedClass): continue tx = Deserializer(raw_tx).read_tx() - # Convert the tx outputs into (hash168, value) pairs - txout_pairs = [(script_hash168(txout.pk_script), txout.value) + # Convert the tx outputs into (hashX, value) pairs + txout_pairs = [(script_hashX(txout.pk_script), txout.value) for txout in tx.outputs] # Convert the tx inputs to ([prev_hex_hash, prev_idx) pairs @@ -261,17 +261,17 @@ class MemPool(util.LoggedClass): return result, deferred - async def transactions(self, hash168): + async def transactions(self, hashX): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool - entries for the hash168. + entries for the hashX. unconfirmed is True if any txin is unconfirmed. ''' - # hash168s is a defaultdict - if not hash168 in self.hash168s: + # hashXs is a defaultdict + if not hashX in self.hashXs: return [] - hex_hashes = self.hash168s[hash168] + hex_hashes = self.hashXs[hashX] raw_txs = await self.daemon.getrawtransactions(hex_hashes) result = [] for hex_hash, raw_tx in zip(hex_hashes, raw_txs): @@ -279,23 +279,23 @@ class MemPool(util.LoggedClass): if not item or not raw_tx: continue txin_pairs, txout_pairs = item - tx_fee = (sum(v for hash168, v in txin_pairs) - - sum(v for hash168, v in txout_pairs)) + tx_fee = (sum(v for hashX, v in txin_pairs) + - sum(v for hashX, v in txout_pairs)) tx = Deserializer(raw_tx).read_tx() unconfirmed = any(txin.prev_hash in self.txs for txin in tx.inputs) result.append((hex_hash, tx_fee, unconfirmed)) return result - def value(self, hash168): - '''Return the unconfirmed amount in the mempool for hash168. + def value(self, hashX): + '''Return the unconfirmed amount in the mempool for hashX. Can be positive or negative. ''' value = 0 - # hash168s is a defaultdict - if hash168 in self.hash168s: - for hex_hash in self.hash168s[hash168]: + # hashXs is a defaultdict + if hashX in self.hashXs: + for hex_hash in self.hashXs[hashX]: txin_pairs, txout_pairs = self.txs[hex_hash] - value -= sum(v for h168, v in txin_pairs if h168 == hash168) - value += sum(v for h168, v in txout_pairs if h168 == hash168) + value -= sum(v for h168, v in txin_pairs if h168 == hashX) + value += sum(v for h168, v in txout_pairs if h168 == hashX) return value diff --git a/server/protocol.py b/server/protocol.py index bd3d783..1b0516f 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -14,6 +14,7 @@ import traceback from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.jsonrpc import JSONRPC +from lib.tx import Deserializer from server.daemon import DaemonError from server.version import VERSION @@ -138,17 +139,17 @@ class Session(JSONRPC): raise self.RPCError('parameter should be a transaction hash: {}' .format(param)) - def param_to_hash168(self, param): + def param_to_hashX(self, param): if isinstance(param, str): try: - return self.coin.address_to_hash168(param) + return self.coin.address_to_hashX(param) except: pass raise self.RPCError('param {} is not a valid address'.format(param)) - def params_to_hash168(self, params): + def params_to_hashX(self, params): if len(params) == 1: - return self.param_to_hash168(params[0]) + return self.param_to_hashX(params[0]) raise self.RPCError('params {} should contain a single address' .format(params)) @@ -162,7 +163,7 @@ class ElectrumX(Session): self.subscribe_height = False self.notified_height = None self.max_subs = self.env.max_session_subs - self.hash168s = set() + self.hashX_subs = {} rpcs = [ ('blockchain', 'address.get_balance address.get_history address.get_mempool ' @@ -179,7 +180,7 @@ class ElectrumX(Session): for suffix in suffixes.split()} def sub_count(self): - return len(self.hash168s) + return len(self.hashX_subs) async def notify(self, height, touched): '''Notify the client about changes in height and touched addresses. @@ -202,11 +203,10 @@ class ElectrumX(Session): ) self.encode_and_send_payload(payload) - hash168_to_address = self.coin.hash168_to_address - matches = self.hash168s.intersection(touched) - for hash168 in matches: - address = hash168_to_address(hash168) - status = await self.address_status(hash168) + matches = touched.intersection(self.hashX_subs) + for hashX in matches: + address = self.hashX_subs[hashX] + status = await self.address_status(hashX) payload = self.notification_payload( 'blockchain.address.subscribe', (address, status)) self.encode_and_send_payload(payload) @@ -222,12 +222,12 @@ class ElectrumX(Session): '''Used as response to a headers subscription request.''' return self.manager.electrum_header(self.height()) - async def address_status(self, hash168): + async def address_status(self, hashX): '''Returns status as 32 bytes.''' # Note history is ordered and mempool unordered in electrum-server # For mempool, height is -1 if unconfirmed txins, otherwise 0 - history = await self.manager.async_get_history(hash168) - mempool = await self.manager.mempool_transactions(hash168) + history = await self.manager.async_get_history(hashX) + mempool = await self.manager.mempool_transactions(hashX) status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) for tx_hash, height in history) @@ -262,20 +262,20 @@ class ElectrumX(Session): return {"block_height": height, "merkle": merkle_branch, "pos": pos} - async def unconfirmed_history(self, hash168): + async def unconfirmed_history(self, hashX): # Note unconfirmed history is unordered in electrum-server # Height is -1 if unconfirmed txins, otherwise 0 - mempool = await self.manager.mempool_transactions(hash168) + mempool = await self.manager.mempool_transactions(hashX) return [{'tx_hash': tx_hash, 'height': -unconfirmed, 'fee': fee} for tx_hash, fee, unconfirmed in mempool] - async def get_history(self, hash168): + async def get_history(self, hashX): # Note history is ordered but unconfirmed is unordered in e-s - history = await self.manager.async_get_history(hash168) + history = await self.manager.async_get_history(hashX) conf = [{'tx_hash': hash_to_str(tx_hash), 'height': height} for tx_hash, height in history] - return conf + await self.unconfirmed_history(hash168) + return conf + await self.unconfirmed_history(hashX) def get_chunk(self, index): '''Return header chunk as hex. Index is a non-negative integer.''' @@ -285,55 +285,55 @@ class ElectrumX(Session): count = min(next_height - start_height, chunk_size) return self.bp.read_headers(start_height, count).hex() - async def get_utxos(self, hash168): + async def get_utxos(self, hashX): '''Get UTXOs asynchronously to reduce latency.''' def job(): - return list(self.bp.get_utxos(hash168, limit=None)) + return list(self.bp.get_utxos(hashX, limit=None)) loop = asyncio.get_event_loop() return await loop.run_in_executor(None, job) - async def get_balance(self, hash168): - utxos = await self.get_utxos(hash168) + async def get_balance(self, hashX): + utxos = await self.get_utxos(hashX) confirmed = sum(utxo.value for utxo in utxos) - unconfirmed = self.manager.mempool_value(hash168) + unconfirmed = self.manager.mempool_value(hashX) return {'confirmed': confirmed, 'unconfirmed': unconfirmed} - async def list_unspent(self, hash168): + async def list_unspent(self, hashX): return [{'tx_hash': hash_to_str(utxo.tx_hash), 'tx_pos': utxo.tx_pos, 'height': utxo.height, 'value': utxo.value} - for utxo in sorted(await self.get_utxos(hash168))] + for utxo in sorted(await self.get_utxos(hashX))] # --- blockchain commands async def address_get_balance(self, params): - hash168 = self.params_to_hash168(params) - return await self.get_balance(hash168) + hashX = self.params_to_hashX(params) + return await self.get_balance(hashX) async def address_get_history(self, params): - hash168 = self.params_to_hash168(params) - return await self.get_history(hash168) + hashX = self.params_to_hashX(params) + return await self.get_history(hashX) async def address_get_mempool(self, params): - hash168 = self.params_to_hash168(params) - return await self.unconfirmed_history(hash168) + hashX = self.params_to_hashX(params) + return await self.unconfirmed_history(hashX) async def address_get_proof(self, params): - hash168 = self.params_to_hash168(params) + hashX = self.params_to_hashX(params) raise self.RPCError('get_proof is not yet implemented') async def address_listunspent(self, params): - hash168 = self.params_to_hash168(params) - return await self.list_unspent(hash168) + hashX = self.params_to_hashX(params) + return await self.list_unspent(hashX) async def address_subscribe(self, params): - hash168 = self.params_to_hash168(params) - if len(self.hash168s) >= self.max_subs: + hashX = self.params_to_hashX(params) + if len(self.hashX_subs) >= self.max_subs: raise self.RPCError('your address subscription limit {:,d} reached' .format(self.max_subs)) - result = await self.address_status(hash168) + result = await self.address_status(hashX) # add_subscription can raise so call it before adding self.manager.new_subscription() - self.hash168s.add(hash168) + self.hashX_subs[hashX] = params[0] return result async def block_get_chunk(self, params): @@ -414,14 +414,23 @@ class ElectrumX(Session): 'and height') async def utxo_get_address(self, params): + '''Returns the address for a TXO. + + Used only for electrum client command-line requests. We no + longer index by address, so need to request the raw + transaction. So it works for any TXO not just UTXOs. + ''' if len(params) == 2: tx_hash = self.param_to_tx_hash(params[0]) index = self.param_to_non_negative_integer(params[1]) - tx_hash = hex_str_to_hash(tx_hash) - hash168 = self.bp.get_utxo_hash168(tx_hash, index) - if hash168: - return self.coin.hash168_to_address(hash168) - return None + raw_tx = await self.daemon_request('getrawtransaction', tx_hash) + if not raw_tx: + return None + raw_tx = bytes.fromhex(raw_tx) + tx = Deserializer(raw_tx).read_tx() + if index >= len(tx.outputs): + return None + return self.coin.address_from_script(tx.outputs[index].pk_script) raise self.RPCError('params should contain a transaction hash ' 'and index')