Browse Source

Prioritize mempool processing of sent txs

Closes #73
master
Neil Booth 8 years ago
parent
commit
5c80b96d0f
  1. 2
      server/block_processor.py
  2. 19
      server/mempool.py
  3. 7
      server/protocol.py

2
server/block_processor.py

@ -68,7 +68,7 @@ class Prefetcher(LoggedClass):
async def main_loop(self):
'''Loop forever polling for more blocks.'''
daemon_height = await self.daemon.height()
if daemon_height > height:
if daemon_height > self.fetched_height:
log_msg = 'catching up to daemon height {:,d}...'
else:
log_msg = 'caught up to daemon height {:,d}'

19
server/mempool.py

@ -11,7 +11,6 @@ import asyncio
import itertools
import time
from collections import defaultdict
from functools import partial
from lib.hash import hash_to_str, hex_str_to_hash
from lib.tx import Deserializer
@ -40,10 +39,16 @@ class MemPool(util.LoggedClass):
self.db = db
self.touched = set()
self.touched_event = asyncio.Event()
self.prioritized = set()
self.stop = False
self.txs = {}
self.hash168s = defaultdict(set) # None can be a key
def prioritize(self, tx_hash):
'''Prioritize processing the given hash. This is important during
initial mempool sync.'''
self.prioritized.add(tx_hash)
def resync_daemon_hashes(self, unprocessed, unfetched):
'''Re-sync self.txs with the list of hashes in the daemon's mempool.
@ -105,6 +110,7 @@ class MemPool(util.LoggedClass):
self.logger.info('{:,d} txs touching {:,d} addresses'
.format(len(txs), len(self.hash168s)))
next_log = now + 150
self.prioritized.clear()
await self.daemon.mempool_refresh_event.wait()
self.resync_daemon_hashes(unprocessed, unfetched)
@ -137,6 +143,11 @@ class MemPool(util.LoggedClass):
nonlocal pending
raw_txs = {}
for hex_hash in self.prioritized:
if hex_hash in unprocessed:
raw_txs[hex_hash] = unprocessed.pop(hex_hash)
while unprocessed and len(raw_txs) < limit:
hex_hash, raw_tx = unprocessed.popitem()
raw_txs[hex_hash] = raw_tx
@ -147,9 +158,9 @@ class MemPool(util.LoggedClass):
deferred = pending
pending = []
process_raw_txs = partial(self.process_raw_txs, raw_txs, deferred)
result, deferred = (
await loop.run_in_executor(None, process_raw_txs))
def job():
return self.process_raw_txs(raw_txs, deferred)
result, deferred = await loop.run_in_executor(None, job)
pending.extend(deferred)
hash168s = self.hash168s

7
server/protocol.py

@ -104,6 +104,11 @@ class ServerManager(util.LoggedClass):
'''
return self.mempool.value(hash168)
def sent_tx(self, tx_hash):
'''Call when a TX is sent. Tells mempool to prioritize it.'''
self.txs_sent += 1
self.mempool.prioritize(tx_hash)
def setup_bands(self):
bands = []
limit = self.env.bandwidth_limit
@ -898,8 +903,8 @@ class ElectrumX(Session):
try:
tx_hash = await self.daemon.sendrawtransaction(params)
self.txs_sent += 1
self.manager.txs_sent += 1
self.log_info('sent tx: {}'.format(tx_hash))
self.manager.sent_tx(tx_hash)
return tx_hash
except DaemonError as e:
error = e.args[0]

Loading…
Cancel
Save