Browse Source

Rename get_utxos to all_utxos.

- no longer takes a limit
- runs in a thread to avoid blocking
patch-2
Neil Booth 7 years ago
parent
commit
a036a2eb3f
  1. 7
      contrib/query.py
  2. 15
      electrumx/server/chain_state.py
  3. 23
      electrumx/server/db.py
  4. 2
      electrumx/server/peers.py
  5. 4
      electrumx/server/session.py

7
contrib/query.py

@ -80,13 +80,16 @@ async def query(args):
if n is None: if n is None:
print('No history found') print('No history found')
n = None n = None
for n, utxo in enumerate(db.get_utxos(hashX, limit), start=1): utxos = await db.all_utxos(hashX)
for n, utxo in enumerate(utxos, start=1):
print(f'UTXO #{n:,d}: tx_hash {hash_to_hex_str(utxo.tx_hash)} ' print(f'UTXO #{n:,d}: tx_hash {hash_to_hex_str(utxo.tx_hash)} '
f'tx_pos {utxo.tx_pos:,d} height {utxo.height:,d} ' f'tx_pos {utxo.tx_pos:,d} height {utxo.height:,d} '
f'value {utxo.value:,d}') f'value {utxo.value:,d}')
if n == limit:
break
if n is None: if n is None:
print('No UTXOs found') print('No UTXOs found')
balance = db.get_balance(hashX) balance = sum(utxo.value for utxo in utxos)
print(f'Balance: {coin.decimal_value(balance):,f} {coin.SHORTNAME}') print(f'Balance: {coin.decimal_value(balance):,f} {coin.SHORTNAME}')

15
electrumx/server/chain_state.py

@ -27,6 +27,7 @@ class ChainState(object):
self.force_chain_reorg = self._bp.force_chain_reorg self.force_chain_reorg = self._bp.force_chain_reorg
self.tx_branch_and_root = self._bp.merkle.branch_and_root self.tx_branch_and_root = self._bp.merkle.branch_and_root
self.read_headers = self._bp.read_headers self.read_headers = self._bp.read_headers
self.all_utxos = self._bp.all_utxos
async def broadcast_transaction(self, raw_tx): async def broadcast_transaction(self, raw_tx):
return await self._daemon.sendrawtransaction([raw_tx]) return await self._daemon.sendrawtransaction([raw_tx])
@ -57,13 +58,6 @@ class ChainState(object):
return await run_in_thread(job) return await run_in_thread(job)
async def get_utxos(self, hashX):
'''Get UTXOs asynchronously to reduce latency.'''
def job():
return list(self._bp.get_utxos(hashX, limit=None))
return await run_in_thread(job)
def header_branch_and_root(self, length, height): def header_branch_and_root(self, length, height):
return self._bp.header_mc.branch_and_root(length, height) return self._bp.header_mc.branch_and_root(length, height)
@ -115,15 +109,18 @@ class ChainState(object):
if n is None: if n is None:
lines.append('No history found') lines.append('No history found')
n = None n = None
for n, utxo in enumerate(db.get_utxos(hashX, limit), start=1): utxos = await db.all_utxos(hashX)
for n, utxo in enumerate(utxos, start=1):
lines.append(f'UTXO #{n:,d}: tx_hash ' lines.append(f'UTXO #{n:,d}: tx_hash '
f'{hash_to_hex_str(utxo.tx_hash)} ' f'{hash_to_hex_str(utxo.tx_hash)} '
f'tx_pos {utxo.tx_pos:,d} height ' f'tx_pos {utxo.tx_pos:,d} height '
f'{utxo.height:,d} value {utxo.value:,d}') f'{utxo.height:,d} value {utxo.value:,d}')
if n == limit:
break
if n is None: if n is None:
lines.append('No UTXOs found') lines.append('No UTXOs found')
balance = db.get_balance(hashX) balance = sum(utxo.value for utxo in utxos)
lines.append(f'Balance: {coin.decimal_value(balance):,f} ' lines.append(f'Balance: {coin.decimal_value(balance):,f} '
f'{coin.SHORTNAME}') f'{coin.SHORTNAME}')

23
electrumx/server/db.py

@ -375,28 +375,25 @@ class DB(object):
with self.utxo_db.write_batch() as batch: with self.utxo_db.write_batch() as batch:
self.write_utxo_state(batch) self.write_utxo_state(batch)
def get_balance(self, hashX): async def all_utxos(self, hashX):
'''Returns the confirmed balance of an address.''' '''Return all UTXOs for an address sorted in no particular order. By
return sum(utxo.value for utxo in self.get_utxos(hashX, limit=None)) default yields at most 1000 entries.
def get_utxos(self, hashX, limit=1000):
'''Generator that yields all UTXOs for an address sorted in no
particular order. By default yields at most 1000 entries.
Set limit to None to get them all.
''' '''
limit = util.resolve_limit(limit) def read_utxos():
utxos = []
utxos_append = utxos.append
s_unpack = unpack s_unpack = unpack
# Key: b'u' + address_hashX + tx_idx + tx_num # Key: b'u' + address_hashX + tx_idx + tx_num
# Value: the UTXO value as a 64-bit unsigned integer # Value: the UTXO value as a 64-bit unsigned integer
prefix = b'u' + hashX prefix = b'u' + hashX
for db_key, db_value in self.utxo_db.iterator(prefix=prefix): for db_key, db_value in self.utxo_db.iterator(prefix=prefix):
if limit == 0:
return
limit -= 1
tx_pos, tx_num = s_unpack('<HI', db_key[-6:]) tx_pos, tx_num = s_unpack('<HI', db_key[-6:])
value, = unpack('<Q', db_value) value, = unpack('<Q', db_value)
tx_hash, height = self.fs_tx_hash(tx_num) tx_hash, height = self.fs_tx_hash(tx_num)
yield UTXO(tx_num, tx_pos, tx_hash, height, value) utxos_append(UTXO(tx_num, tx_pos, tx_hash, height, value))
return utxos
return await run_in_thread(read_utxos)
async def lookup_utxos(self, prevouts): async def lookup_utxos(self, prevouts):
'''For each prevout, lookup it up in the DB and return a (hashX, '''For each prevout, lookup it up in the DB and return a (hashX,

2
electrumx/server/peers.py

@ -17,7 +17,7 @@ from collections import defaultdict, Counter
from aiorpcx import (ClientSession, SOCKSProxy, from aiorpcx import (ClientSession, SOCKSProxy,
Notification, handler_invocation, Notification, handler_invocation,
SOCKSError, RPCError, TaskTimeout, SOCKSError, RPCError, TaskTimeout,
TaskGroup, run_in_thread, ignore_after, timeout_after) TaskGroup, ignore_after, timeout_after)
from electrumx.lib.peer import Peer from electrumx.lib.peer import Peer
from electrumx.lib.util import class_logger, protocol_tuple from electrumx.lib.util import class_logger, protocol_tuple

4
electrumx/server/session.py

@ -796,7 +796,7 @@ class ElectrumX(SessionBase):
async def hashX_listunspent(self, hashX): async def hashX_listunspent(self, hashX):
'''Return the list of UTXOs of a script hash, including mempool '''Return the list of UTXOs of a script hash, including mempool
effects.''' effects.'''
utxos = await self.chain_state.get_utxos(hashX) utxos = await self.chain_state.all_utxos(hashX)
utxos = sorted(utxos) utxos = sorted(utxos)
utxos.extend(await self.mempool.unordered_UTXOs(hashX)) utxos.extend(await self.mempool.unordered_UTXOs(hashX))
spends = await self.mempool.potential_spends(hashX) spends = await self.mempool.potential_spends(hashX)
@ -853,7 +853,7 @@ class ElectrumX(SessionBase):
return await self.hashX_subscribe(hashX, address) return await self.hashX_subscribe(hashX, address)
async def get_balance(self, hashX): async def get_balance(self, hashX):
utxos = await self.chain_state.get_utxos(hashX) utxos = await self.chain_state.all_utxos(hashX)
confirmed = sum(utxo.value for utxo in utxos) confirmed = sum(utxo.value for utxo in utxos)
unconfirmed = await self.mempool.balance_delta(hashX) unconfirmed = await self.mempool.balance_delta(hashX)
return {'confirmed': confirmed, 'unconfirmed': unconfirmed} return {'confirmed': confirmed, 'unconfirmed': unconfirmed}

Loading…
Cancel
Save