Browse Source

Report unconfirmed parents correctly.

Also, send a notification to the client if the unconfirmed status
of any parent changes.

Fixes #129
master
Neil Booth 8 years ago
parent
commit
f3cdd97ff9
  1. 22
      server/controller.py
  2. 3
      server/mempool.py
  3. 77
      server/session.py

22
server/controller.py

@ -20,7 +20,7 @@ from functools import partial
import pylru
from lib.jsonrpc import JSONRPC, JSONSessionBase, RPCError
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
from lib.hash import double_sha256, hash_to_str, hex_str_to_hash
import lib.util as util
from server.block_processor import BlockProcessor
from server.daemon import Daemon, DaemonError
@ -706,16 +706,13 @@ class Controller(util.LoggedClass):
except DaemonError as e:
raise RPCError('daemon error: {}'.format(e))
async def new_subscription(self, address):
def new_subscription(self, address):
if self.subs_room <= 0:
self.subs_room = self.max_subs - self.sub_count()
if self.subs_room <= 0:
raise RPCError('server subscription limit {:,d} reached'
.format(self.max_subs))
self.subs_room -= 1
hashX = self.address_to_hashX(address)
status = await self.address_status(hashX)
return hashX, status
async def tx_merkle(self, tx_hash, height):
'''tx_hash is a hex string.'''
@ -779,21 +776,6 @@ class Controller(util.LoggedClass):
for tx_hash, height in history]
return conf + await self.unconfirmed_history(hashX)
async def address_status(self, hashX):
'''Returns status as 32 bytes.'''
# Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0
history = await self.get_history(hashX)
mempool = await self.mempool_transactions(hashX)
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
for tx_hash, height in history)
status += ''.join('{}:{:d}:'.format(hex_hash, -unconfirmed)
for hex_hash, tx_fee, unconfirmed in mempool)
if status:
return sha256(status.encode()).hex()
return None
async def get_utxos(self, hashX):
'''Get UTXOs asynchronously to reduce latency.'''
def job():

3
server/mempool.py

@ -282,7 +282,8 @@ class MemPool(util.LoggedClass):
tx_fee = (sum(v for hashX, v in txin_pairs) -
sum(v for hashX, v in txout_pairs))
tx, tx_hash = deserializer(raw_tx).read_tx()
unconfirmed = any(txin.prev_hash in self.txs for txin in tx.inputs)
unconfirmed = any(hash_to_str(txin.prev_hash) in self.txs
for txin in tx.inputs)
result.append((hex_hash, tx_fee, unconfirmed))
return result

77
server/session.py

@ -11,6 +11,7 @@ import codecs
import time
from functools import partial
from lib.hash import sha256, hash_to_str
from lib.jsonrpc import JSONSession, RPCError, JSONRPCv2
from server.daemon import DaemonError
import server.version as version
@ -115,6 +116,7 @@ class ElectrumX(SessionBase):
self.max_send = self.env.max_send
self.max_subs = self.env.max_session_subs
self.hashX_subs = {}
self.mempool_statuses = {}
self.electrumx_handlers = {
'blockchain.address.subscribe': self.address_subscribe,
'blockchain.headers.subscribe': self.headers_subscribe,
@ -135,29 +137,42 @@ class ElectrumX(SessionBase):
Cache is a shared cache for this update.
'''
controller = self.controller
pairs = []
changed = []
matches = touched.intersection(self.hashX_subs)
for hashX in matches:
address = self.hashX_subs[hashX]
status = await self.address_status(hashX)
changed.append((address, status))
if height != self.notified_height:
self.notified_height = height
if self.subscribe_headers:
args = (controller.electrum_header(height), )
args = (self.controller.electrum_header(height), )
pairs.append(('blockchain.headers.subscribe', args))
if self.subscribe_height:
pairs.append(('blockchain.numblocks.subscribe', (height, )))
matches = touched.intersection(self.hashX_subs)
for hashX in matches:
address = self.hashX_subs[hashX]
status = await controller.address_status(hashX)
pairs.append(('blockchain.address.subscribe', (address, status)))
self.send_notifications(pairs)
if matches:
es = '' if len(matches) == 1 else 'es'
self.log_info('notified of {:,d} address{}'
.format(len(matches), es))
# Check mempool hashXs - the status is a function of the
# confirmed state of other transactions
for hashX in set(self.mempool_statuses).difference(matches):
old_status = self.mempool_statuses[hashX]
status = await self.address_status(hashX)
if status != old_status:
address = self.hashX_subs[hashX]
changed.append((address, status))
for address_status in changed:
pairs.append(('blockchain.address.subscribe', address_status))
if pairs:
self.send_notifications(pairs)
if changed:
es = '' if len(changed) == 1 else 'es'
self.log_info('notified of {:,d} address{}'
.format(len(changed), es))
def height(self):
'''Return the current flushed database height.'''
@ -191,6 +206,35 @@ class ElectrumX(SessionBase):
'''Return the server peers as a list of (ip, host, details) tuples.'''
return self.controller.peer_mgr.on_peers_subscribe(self.is_tor())
async def address_status(self, hashX):
'''Returns an address status.
Status is a hex string, but must be None if there is no history.
'''
# Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0
history = await self.controller.get_history(hashX)
mempool = await self.controller.mempool_transactions(hashX)
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height)
for tx_hash, height in history)
status += ''.join('{}:{:d}:'.format(hex_hash, -unconfirmed)
for hex_hash, tx_fee, unconfirmed in mempool)
for hex_hash, tx_fee, unconfirmed in mempool:
self.log_info('UNCONFIRMED: {} {}'
.format(self.hashX_subs[hashX], unconfirmed))
if status:
status = sha256(status.encode()).hex()
else:
status = None
if mempool:
self.mempool_statuses[hashX] = status
else:
self.mempool_statuses.pop(hashX, None)
return status
async def address_subscribe(self, address):
'''Subscribe to an address.
@ -199,10 +243,13 @@ class ElectrumX(SessionBase):
if len(self.hashX_subs) >= self.max_subs:
raise RPCError('your address subscription limit {:,d} reached'
.format(self.max_subs))
# Now let the controller check its limit
hashX, status = await self.controller.new_subscription(address)
self.controller.new_subscription(address)
hashX = self.env.coin.address_to_hashX(address)
self.hashX_subs[hashX] = address
return status
return await self.address_status(hashX)
def server_features(self):
'''Returns a dictionary of server features.'''

Loading…
Cancel
Save