Browse Source

Merge branch 'develop'

master 0.9.9
Neil Booth 8 years ago
parent
commit
d8e64ea2fe
  1. 6
      RELEASE-NOTES
  2. 14
      server/block_processor.py
  3. 16
      server/daemon.py
  4. 30
      server/mempool.py
  5. 7
      server/protocol.py
  6. 2
      server/version.py

6
RELEASE-NOTES

@ -1,3 +1,9 @@
version 0.9.9
-------------
- prioritize mempool processing of sent txs. Closes issue 73.
- mempool tx processing needs to handle DBError exceptions. Fixes issue 74.
version 0.9.8 version 0.9.8
------------- -------------

14
server/block_processor.py

@ -67,8 +67,13 @@ class Prefetcher(LoggedClass):
async def main_loop(self): async def main_loop(self):
'''Loop forever polling for more blocks.''' '''Loop forever polling for more blocks.'''
self.logger.info('catching up to daemon height {:,d}...' daemon_height = await self.daemon.height()
.format(await self.daemon.height())) if daemon_height > self.fetched_height:
log_msg = 'catching up to daemon height {:,d}...'
else:
log_msg = 'caught up to daemon height {:,d}'
self.logger.info(log_msg.format(daemon_height))
while True: while True:
try: try:
secs = 0 secs = 0
@ -87,10 +92,7 @@ class Prefetcher(LoggedClass):
'''Prefetch blocks unless the prefetch queue is full.''' '''Prefetch blocks unless the prefetch queue is full.'''
# Refresh the mempool after updating the daemon height, if and # Refresh the mempool after updating the daemon height, if and
# only if we've caught up # only if we've caught up
daemon_height = await self.daemon.height() daemon_height = await self.daemon.height(mempool=self.caught_up)
if self.caught_up:
await self.daemon.refresh_mempool_hashes()
cache_room = self.target_cache_size // self.ave_size cache_room = self.target_cache_size // self.ave_size
with await self.semaphore: with await self.semaphore:
# Try and catch up all blocks but limit to room in cache. # Try and catch up all blocks but limit to room in cache.

16
server/daemon.py

@ -36,7 +36,7 @@ class Daemon(util.LoggedClass):
self.urls = urls self.urls = urls
self.url_index = 0 self.url_index = 0
self._height = None self._height = None
self.mempool_hashes = set() self._mempool_hashes = set()
self.mempool_refresh_event = asyncio.Event() self.mempool_refresh_event = asyncio.Event()
# Limit concurrent RPC calls to this number. # Limit concurrent RPC calls to this number.
# See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
@ -150,10 +150,9 @@ class Daemon(util.LoggedClass):
# Convert hex string to bytes # Convert hex string to bytes
return [bytes.fromhex(block) for block in blocks] return [bytes.fromhex(block) for block in blocks]
async def refresh_mempool_hashes(self): async def mempool_hashes(self):
'''Update our record of the daemon's mempool hashes.''' '''Update our record of the daemon's mempool hashes.'''
self.mempool_hashes = set(await self._send_single('getrawmempool')) return await self._send_single('getrawmempool')
self.mempool_refresh_event.set()
async def estimatefee(self, params): async def estimatefee(self, params):
'''Return the fee estimate for the given parameters.''' '''Return the fee estimate for the given parameters.'''
@ -187,11 +186,18 @@ class Daemon(util.LoggedClass):
'''Broadcast a transaction to the network.''' '''Broadcast a transaction to the network.'''
return await self._send_single('sendrawtransaction', params) return await self._send_single('sendrawtransaction', params)
async def height(self): async def height(self, mempool=False):
'''Query the daemon for its current height.''' '''Query the daemon for its current height.'''
self._height = await self._send_single('getblockcount') self._height = await self._send_single('getblockcount')
if mempool:
self._mempool_hashes = set(await self.mempool_hashes())
self.mempool_refresh_event.set()
return self._height return self._height
def cached_mempool_hashes(self):
'''Return the cached mempool hashes.'''
return self._mempool_hashes
def cached_height(self): def cached_height(self):
'''Return the cached daemon height. '''Return the cached daemon height.

30
server/mempool.py

@ -11,7 +11,6 @@ import asyncio
import itertools import itertools
import time import time
from collections import defaultdict from collections import defaultdict
from functools import partial
from lib.hash import hash_to_str, hex_str_to_hash from lib.hash import hash_to_str, hex_str_to_hash
from lib.tx import Deserializer from lib.tx import Deserializer
@ -40,10 +39,16 @@ class MemPool(util.LoggedClass):
self.db = db self.db = db
self.touched = set() self.touched = set()
self.touched_event = asyncio.Event() self.touched_event = asyncio.Event()
self.prioritized = set()
self.stop = False self.stop = False
self.txs = {} self.txs = {}
self.hash168s = defaultdict(set) # None can be a key self.hash168s = defaultdict(set) # None can be a key
def prioritize(self, tx_hash):
'''Prioritize processing the given hash. This is important during
initial mempool sync.'''
self.prioritized.add(tx_hash)
def resync_daemon_hashes(self, unprocessed, unfetched): def resync_daemon_hashes(self, unprocessed, unfetched):
'''Re-sync self.txs with the list of hashes in the daemon's mempool. '''Re-sync self.txs with the list of hashes in the daemon's mempool.
@ -54,7 +59,7 @@ class MemPool(util.LoggedClass):
hash168s = self.hash168s hash168s = self.hash168s
touched = self.touched touched = self.touched
hashes = self.daemon.mempool_hashes hashes = self.daemon.cached_mempool_hashes()
gone = set(txs).difference(hashes) gone = set(txs).difference(hashes)
for hex_hash in gone: for hex_hash in gone:
unfetched.discard(hex_hash) unfetched.discard(hex_hash)
@ -105,6 +110,7 @@ class MemPool(util.LoggedClass):
self.logger.info('{:,d} txs touching {:,d} addresses' self.logger.info('{:,d} txs touching {:,d} addresses'
.format(len(txs), len(self.hash168s))) .format(len(txs), len(self.hash168s)))
next_log = now + 150 next_log = now + 150
self.prioritized.clear()
await self.daemon.mempool_refresh_event.wait() await self.daemon.mempool_refresh_event.wait()
self.resync_daemon_hashes(unprocessed, unfetched) self.resync_daemon_hashes(unprocessed, unfetched)
@ -137,6 +143,11 @@ class MemPool(util.LoggedClass):
nonlocal pending nonlocal pending
raw_txs = {} raw_txs = {}
for hex_hash in self.prioritized:
if hex_hash in unprocessed:
raw_txs[hex_hash] = unprocessed.pop(hex_hash)
while unprocessed and len(raw_txs) < limit: while unprocessed and len(raw_txs) < limit:
hex_hash, raw_tx = unprocessed.popitem() hex_hash, raw_tx = unprocessed.popitem()
raw_txs[hex_hash] = raw_tx raw_txs[hex_hash] = raw_tx
@ -147,9 +158,9 @@ class MemPool(util.LoggedClass):
deferred = pending deferred = pending
pending = [] pending = []
process_raw_txs = partial(self.process_raw_txs, raw_txs, deferred) def job():
result, deferred = ( return self.process_raw_txs(raw_txs, deferred)
await loop.run_in_executor(None, process_raw_txs)) result, deferred = await loop.run_in_executor(None, job)
pending.extend(deferred) pending.extend(deferred)
hash168s = self.hash168s hash168s = self.hash168s
@ -231,10 +242,11 @@ class MemPool(util.LoggedClass):
elif not mempool_missing: elif not mempool_missing:
prev_hash = hex_str_to_hash(prev_hex_hash) prev_hash = hex_str_to_hash(prev_hex_hash)
txin_pairs.append(db_utxo_lookup(prev_hash, prev_idx)) txin_pairs.append(db_utxo_lookup(prev_hash, prev_idx))
except self.db.MissingUTXOError: except (self.db.MissingUTXOError, self.db.DBError):
# This typically happens just after the daemon has # DBError can happen when flushing a newly processed
# accepted a new block and the new mempool has deps on # block. MissingUTXOError typically happens just
# new txs in that block. # after the daemon has accepted a new block and the
# new mempool has deps on new txs in that block.
continue continue
if mempool_missing: if mempool_missing:

7
server/protocol.py

@ -104,6 +104,11 @@ class ServerManager(util.LoggedClass):
''' '''
return self.mempool.value(hash168) return self.mempool.value(hash168)
def sent_tx(self, tx_hash):
'''Call when a TX is sent. Tells mempool to prioritize it.'''
self.txs_sent += 1
self.mempool.prioritize(tx_hash)
def setup_bands(self): def setup_bands(self):
bands = [] bands = []
limit = self.env.bandwidth_limit limit = self.env.bandwidth_limit
@ -898,8 +903,8 @@ class ElectrumX(Session):
try: try:
tx_hash = await self.daemon.sendrawtransaction(params) tx_hash = await self.daemon.sendrawtransaction(params)
self.txs_sent += 1 self.txs_sent += 1
self.manager.txs_sent += 1
self.log_info('sent tx: {}'.format(tx_hash)) self.log_info('sent tx: {}'.format(tx_hash))
self.manager.sent_tx(tx_hash)
return tx_hash return tx_hash
except DaemonError as e: except DaemonError as e:
error = e.args[0] error = e.args[0]

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.9.8" VERSION = "ElectrumX 0.9.9"

Loading…
Cancel
Save