Browse Source

Merge branch 'develop'

master
Neil Booth 8 years ago
parent
commit
1c0151a69f
  1. 8
      README.rst
  2. 2
      docs/ENVIRONMENT.rst
  3. 2
      lib/script.py
  4. 2
      query.py
  5. 18
      server/block_processor.py
  6. 126
      server/controller.py
  7. 2
      server/env.py
  8. 9
      server/mempool.py
  9. 89
      server/peers.py
  10. 36
      server/session.py
  11. 2
      server/version.py

8
README.rst

@ -135,6 +135,14 @@ version prior to the release of 1.0.
ChangeLog
=========
Version 0.10.13
---------------
* worked around asyncio issue to suppress the annoying log spew on shutdown
that makes it look like a bomb hit
* implement peer subscriptions as real subscriptions with incremental updates
* misc cleanups
Version 0.10.12
---------------

2
docs/ENVIRONMENT.rst

@ -89,7 +89,7 @@ These environment variables are optional:
* **SSL_PORT**
If set ElectrumX will serve SSL clients on **HOST**:**SSL_PORT**.
If set SSL_CERTFILE and SSL_KEYFILE must be defined and be
If set then SSL_CERTFILE and SSL_KEYFILE must be defined and be
filesystem paths to those SSL files.
* **RPC_PORT**

2
lib/script.py

@ -209,7 +209,7 @@ class Script(object):
n += dlen
ops.append(op)
except:
except Exception:
# Truncated script; e.g. tx_hash
# ebc9fa1196a59e192352d76c0f6e73167046b9d37b8302b6bb6968dfd279b767
raise ScriptError('truncated script')

2
query.py

@ -50,7 +50,7 @@ def main():
try:
limit = int(sys.argv[argc])
argc += 1
except:
except Exception:
limit = 10
for addr in sys.argv[argc:]:
print('Address: ', addr)

18
server/block_processor.py

@ -138,9 +138,10 @@ class BlockProcessor(server.db.DB):
Coordinate backing up in case of chain reorganisations.
'''
def __init__(self, env, daemon):
def __init__(self, env, controller, daemon):
super().__init__(env)
self.daemon = daemon
self.controller = controller
# These are our state as we move ahead of DB state
self.fs_height = self.db_height
@ -190,6 +191,7 @@ class BlockProcessor(server.db.DB):
async def main_loop(self):
'''Main loop for block processing.'''
self.controller.ensure_future(self.prefetcher.main_loop())
await self.prefetcher.reset_height()
while True:
@ -205,16 +207,11 @@ class BlockProcessor(server.db.DB):
self.logger.info('flushing state to DB for a clean shutdown...')
self.flush(True)
async def executor(self, func, *args, **kwargs):
'''Run func taking args in the executor.'''
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, partial(func, *args, **kwargs))
async def first_caught_up(self):
'''Called when first caught up to daemon after starting.'''
# Flush everything with updated first_sync->False state.
self.first_sync = False
await self.executor(self.flush, True)
await self.controller.run_in_executor(self.flush, True)
if self.utxo_db.for_sync:
self.logger.info('{} synced to height {:,d}'
.format(VERSION, self.height))
@ -240,7 +237,8 @@ class BlockProcessor(server.db.DB):
if hprevs == chain:
start = time.time()
await self.executor(self.advance_blocks, blocks, headers)
await self.controller.run_in_executor(self.advance_blocks,
blocks, headers)
if not self.first_sync:
s = '' if len(blocks) == 1 else 's'
self.logger.info('processed {:,d} block{} in {:.1f}s'
@ -277,14 +275,14 @@ class BlockProcessor(server.db.DB):
self.logger.info('chain reorg detected')
else:
self.logger.info('faking a reorg of {:,d} blocks'.format(count))
await self.executor(self.flush, True)
await self.controller.run_in_executor(self.flush, True)
hashes = await self.reorg_hashes(count)
# Reverse and convert to hex strings.
hashes = [hash_to_str(hash) for hash in reversed(hashes)]
for hex_hashes in chunks(hashes, 50):
blocks = await self.daemon.raw_blocks(hex_hashes)
await self.executor(self.backup_blocks, blocks)
await self.controller.run_in_executor(self.backup_blocks, blocks)
await self.prefetcher.reset_height()
async def reorg_hashes(self, count):

126
server/controller.py

@ -11,6 +11,8 @@ import json
import os
import ssl
import time
import traceback
import warnings
from bisect import bisect_left
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
@ -49,9 +51,9 @@ class Controller(util.LoggedClass):
self.start_time = time.time()
self.coin = env.coin
self.daemon = Daemon(env.coin.daemon_urls(env.daemon_url))
self.bp = BlockProcessor(env, self.daemon)
self.mempool = MemPool(self.bp)
self.peers = PeerManager(env)
self.bp = BlockProcessor(env, self, self.daemon)
self.mempool = MemPool(self.bp, self)
self.peers = PeerManager(env, self)
self.env = env
self.servers = {}
# Map of session to the key of its list in self.groups
@ -63,6 +65,7 @@ class Controller(util.LoggedClass):
self.max_sessions = env.max_sessions
self.low_watermark = self.max_sessions * 19 // 20
self.max_subs = env.max_subs
self.futures = set()
# Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0
self.next_stale_check = 0
@ -88,12 +91,10 @@ class Controller(util.LoggedClass):
('server',
'banner donation_address'),
]
handlers = {'.'.join([prefix, suffix]):
getattr(self, suffix.replace('.', '_'))
for prefix, suffixes in rpcs
for suffix in suffixes.split()}
handlers['server.peers.subscribe'] = self.peers.subscribe
self.electrumx_handlers = handlers
self.electrumx_handlers = {'.'.join([prefix, suffix]):
getattr(self, suffix.replace('.', '_'))
for prefix, suffixes in rpcs
for suffix in suffixes.split()}
async def mempool_transactions(self, hashX):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
@ -199,43 +200,64 @@ class Controller(util.LoggedClass):
if session.items:
self.enqueue_session(session)
def initiate_shutdown(self):
'''Call this function to start the shutdown process.'''
self.shutdown_event.set()
async def run_in_executor(self, func, *args):
'''Wait whilst running func in the executor.'''
return await self.loop.run_in_executor(None, func, *args)
def schedule_executor(self, func, *args):
'''Schedule running func in the executor, return a task.'''
return self.ensure_future(self.run_in_executor(func, *args))
def ensure_future(self, coro):
'''Schedule the coro to be run.'''
future = asyncio.ensure_future(coro)
future.add_done_callback(self.on_future_done)
self.futures.add(future)
return future
def on_future_done(self, future):
'''Collect the result of a future after removing it from our set.'''
self.futures.remove(future)
try:
future.result()
except asyncio.CancelledError:
pass
except Exception:
self.log_error(traceback.format_exc())
async def wait_for_bp_catchup(self):
'''Called when the block processor catches up.'''
await self.bp.caught_up_event.wait()
self.logger.info('block processor has caught up')
self.ensure_future(self.peers.main_loop())
self.ensure_future(self.start_servers())
self.ensure_future(self.mempool.main_loop())
self.ensure_future(self.enqueue_delayed_sessions())
self.ensure_future(self.notify())
for n in range(4):
self.ensure_future(self.serve_requests())
async def main_loop(self):
'''Controller main loop.'''
def add_future(coro):
futures.append(asyncio.ensure_future(coro))
async def await_bp_catchup():
'''Wait for the block processor to catch up.
Then start the servers and the peer manager.
'''
await self.bp.caught_up_event.wait()
self.logger.info('block processor has caught up')
add_future(self.peers.main_loop())
add_future(self.start_servers())
add_future(self.mempool.main_loop())
add_future(self.enqueue_delayed_sessions())
add_future(self.notify())
for n in range(4):
add_future(self.serve_requests())
futures = []
add_future(self.bp.main_loop())
add_future(self.bp.prefetcher.main_loop())
add_future(await_bp_catchup())
# Perform a clean shutdown when this event is signalled.
await self.shutdown_event.wait()
self.ensure_future(self.bp.main_loop())
self.ensure_future(self.wait_for_bp_catchup())
# Shut down cleanly after waiting for shutdown to be signalled
await self.shutdown_event.wait()
self.logger.info('shutting down')
await self.shutdown(futures)
await self.shutdown()
# Avoid log spew on shutdown for partially opened SSL sockets
try:
del asyncio.sslproto._SSLProtocolTransport.__del__
except Exception:
pass
self.logger.info('shutdown complete')
async def shutdown(self, futures):
def initiate_shutdown(self):
'''Call this function to start the shutdown process.'''
self.shutdown_event.set()
async def shutdown(self):
'''Perform the shutdown sequence.'''
self.state = self.SHUTTING_DOWN
@ -244,13 +266,13 @@ class Controller(util.LoggedClass):
for session in self.sessions:
self.close_session(session)
# Cancel the futures
for future in futures:
# Cancel pending futures
for future in self.futures:
future.cancel()
# Wait for all futures to finish
while any(not future.done() for future in futures):
await asyncio.sleep(1)
while not all (future.done() for future in self.futures):
await asyncio.sleep(0.1)
# Finally shut down the block processor and executor
self.bp.shutdown(self.executor)
@ -334,6 +356,11 @@ class Controller(util.LoggedClass):
for session in sessions:
await session.notify(self.bp.db_height, touched)
def notify_peers(self, updates):
'''Notify of peer updates.'''
for session in self.sessions:
session.notify_peers(updates)
def electrum_header(self, height):
'''Return the binary header at the given height.'''
if not 0 <= height <= self.bp.db_height:
@ -525,7 +552,7 @@ class Controller(util.LoggedClass):
def lookup_session(self, session_id):
try:
session_id = int(session_id)
except:
except Exception:
pass
else:
for session in self.sessions:
@ -581,7 +608,7 @@ class Controller(util.LoggedClass):
def rpc_peers(self):
'''Return a list of server peers, currently taken from IRC.'''
return self.peers.peer_list()
return self.peers.peer_dict()
def rpc_reorg(self, count=3):
'''Force a reorg of the given number of blocks.
@ -599,7 +626,7 @@ class Controller(util.LoggedClass):
if isinstance(address, str):
try:
return self.coin.address_to_hashX(address)
except:
except Exception:
pass
raise RPCError('{} is not a valid address'.format(address))
@ -694,8 +721,7 @@ class Controller(util.LoggedClass):
limit = self.env.max_send // 97
return list(self.bp.get_history(hashX, limit=limit))
loop = asyncio.get_event_loop()
history = await loop.run_in_executor(None, job)
history = await self.run_in_executor(job)
self.history_cache[hashX] = history
return history
@ -725,8 +751,8 @@ class Controller(util.LoggedClass):
'''Get UTXOs asynchronously to reduce latency.'''
def job():
return list(self.bp.get_utxos(hashX, limit=None))
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, job)
return await self.run_in_executor(job)
def get_chunk(self, index):
'''Return header chunk as hex. Index is a non-negative integer.'''

2
server/env.py

@ -83,7 +83,7 @@ class Env(LoggedClass):
return default
try:
return int(value)
except:
except Exception:
raise self.Error('cannot convert envvar {} value {} to an integer'
.format(envvar, value))

9
server/mempool.py

@ -31,9 +31,10 @@ class MemPool(util.LoggedClass):
A pair is a (hashX, value) tuple. tx hashes are hex strings.
'''
def __init__(self, bp):
def __init__(self, bp, controller):
super().__init__()
self.daemon = bp.daemon
self.controller = controller
self.coin = bp.coin
self.db = bp
self.touched = bp.touched
@ -139,7 +140,6 @@ class MemPool(util.LoggedClass):
break
def async_process_some(self, unfetched, limit):
loop = asyncio.get_event_loop()
pending = []
txs = self.txs
@ -162,9 +162,8 @@ class MemPool(util.LoggedClass):
deferred = pending
pending = []
def job():
return self.process_raw_txs(raw_txs, deferred)
result, deferred = await loop.run_in_executor(None, job)
result, deferred = await self.controller.run_in_executor \
(self.process_raw_txs, raw_txs, deferred)
pending.extend(deferred)
hashXs = self.hashXs

89
server/peers.py

@ -8,10 +8,9 @@
'''Peer management.'''
import asyncio
import itertools
import socket
import traceback
from collections import namedtuple
from functools import partial
import lib.util as util
from server.irc import IRC
@ -30,30 +29,32 @@ class PeerManager(util.LoggedClass):
VERSION = '1.0'
DEFAULT_PORTS = {'t': 50001, 's': 50002}
def __init__(self, env):
def __init__(self, env, controller):
super().__init__()
self.env = env
self.loop = asyncio.get_event_loop()
self.controller = controller
self.irc = IRC(env, self)
self.futures = set()
self.identities = []
self.pruning = None
self._identities = []
# Keyed by nick
self.irc_peers = {}
self.updated_nicks = set()
# We can have a Tor identity inaddition to a normal one
self.identities.append(NetIdentity(env.report_host,
env.report_tcp_port,
env.report_ssl_port,
''))
self._identities.append(self.identity(env.report_host,
env.report_tcp_port,
env.report_ssl_port,
''))
if env.report_host_tor.endswith('.onion'):
self.identities.append(NetIdentity(env.report_host_tor,
env.report_tcp_port_tor,
env.report_ssl_port_tor,
'_tor'))
self._identities.append(self.identity(env.report_host_tor,
env.report_tcp_port_tor,
env.report_ssl_port_tor,
'_tor'))
async def executor(self, func, *args, **kwargs):
'''Run func taking args in the executor.'''
await self.loop.run_in_executor(None, partial(func, *args, **kwargs))
@classmethod
def identity(self, host, tcp_port, ssl_port, suffix):
'''Returns a NetIdentity object. Unpublished ports are None.'''
return NetIdentity(host, tcp_port or None, ssl_port or None, suffix)
@classmethod
def real_name(cls, identity):
@ -70,38 +71,29 @@ class PeerManager(util.LoggedClass):
ssl = port_text('s', identity.ssl_port)
return '{} v{}{}{}'.format(identity.host, cls.VERSION, tcp, ssl)
def ensure_future(self, coro):
'''Convert a coro into a future and add it to our pending list
to be waited for.'''
self.futures.add(asyncio.ensure_future(coro))
def identities(self):
'''Return a list of network identities of this server.'''
return self._identities
async def refresh_peer_subs(self):
for n in itertools.count():
await asyncio.sleep(60)
updates = [self.irc_peers[nick] for nick in self.updated_nicks
if nick in self.irc_peers]
if updates:
self.controller.notify_peers(updates)
self.updated_nicks.clear()
def start_irc(self):
'''Start up the IRC connections if enabled.'''
async def main_loop(self):
'''Not a loop for now...'''
self.controller.ensure_future(self.refresh_peer_subs())
if self.env.irc:
name_pairs = [(self.real_name(identity), identity.nick_suffix)
for identity in self.identities]
self.ensure_future(self.irc.start(name_pairs))
for identity in self._identities]
self.controller.ensure_future(self.irc.start(name_pairs))
else:
self.logger.info('IRC is disabled')
async def main_loop(self):
'''Start and then enter the main loop.'''
self.start_irc()
try:
while True:
await asyncio.sleep(10)
done = [future for future in self.futures if future.done()]
self.futures.difference_update(done)
for future in done:
try:
future.result()
except:
self.log_error(traceback.format_exc())
finally:
for future in self.futures:
future.cancel()
def dns_lookup_peer(self, nick, hostname, details):
try:
ip_addr = None
@ -110,6 +102,7 @@ class PeerManager(util.LoggedClass):
except socket.error:
pass # IPv6?
ip_addr = ip_addr or hostname
self.updated_nicks.add(nick)
self.irc_peers[nick] = IRCPeer(ip_addr, hostname, details)
self.logger.info('new IRC peer {} at {} ({})'
.format(nick, hostname, details))
@ -119,7 +112,7 @@ class PeerManager(util.LoggedClass):
def add_irc_peer(self, *args):
'''Schedule DNS lookup of peer.'''
self.ensure_future(self.executor(self.dns_lookup_peer, *args))
self.controller.schedule_executor(self.dns_lookup_peer, *args)
def remove_irc_peer(self, nick):
'''Remove a peer from our IRC peers map.'''
@ -129,11 +122,9 @@ class PeerManager(util.LoggedClass):
def count(self):
return len(self.irc_peers)
def peer_list(self):
def peer_dict(self):
return self.irc_peers
def subscribe(self):
'''Returns the server peers as a list of (ip, host, details) tuples.
Despite the name this is not currently treated as a subscription.'''
def peer_list(self):
'''Returns the server peers as a list of (ip, host, details) tuples.'''
return list(self.irc_peers.values())

36
server/session.py

@ -7,8 +7,6 @@
'''Classes for local RPC server and remote client TCP/SSL servers.'''
import asyncio
import time
import traceback
from functools import partial
@ -106,6 +104,7 @@ class ElectrumX(SessionBase):
super().__init__(*args, **kwargs)
self.subscribe_headers = False
self.subscribe_height = False
self.subscribe_peers = False
self.notified_height = None
self.max_send = self.env.max_send
self.max_subs = self.env.max_session_subs
@ -115,6 +114,8 @@ class ElectrumX(SessionBase):
'blockchain.headers.subscribe': self.headers_subscribe,
'blockchain.numblocks.subscribe': self.numblocks_subscribe,
'blockchain.transaction.broadcast': self.transaction_broadcast,
'server.features': self.server_features,
'server.peers.subscribe': self.peers_subscribe,
'server.version': self.server_version,
}
@ -168,6 +169,23 @@ class ElectrumX(SessionBase):
self.subscribe_height = True
return self.height()
def peers_subscribe(self, incremental=False):
'''Returns the server peers as a list of (ip, host, details) tuples.
If incremental is False there is no subscription. If True the
remote side will receive notifications of new or modified
peers (peers that disappeared are not notified).
'''
self.subscribe_peers = incremental
return self.controller.peers.peer_list()
def notify_peers(self, updates):
'''Notify of peer updates. Updates are sent as a list in the same
format as the subscription reply, as the first parameter.
'''
if self.subscribe_peers:
self.send_notification('server.peers.subscribe', [updates])
async def address_subscribe(self, address):
'''Subscribe to an address.
@ -181,6 +199,20 @@ class ElectrumX(SessionBase):
self.hashX_subs[hashX] = address
return status
def server_features(self):
'''Returns a dictionary of server features.'''
peers = self.controller.peers
hosts = {identity.host: {
'tcp_port': identity.tcp_port,
'ssl_port': identity.ssl_port,
'pruning': peers.pruning,
'version': peers.VERSION,
} for identity in self.controller.peers.identities()}
return {
'hosts': hosts,
}
def server_version(self, client_name=None, protocol_version=None):
'''Returns the server version as a string.

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.10.12"
VERSION = "ElectrumX 0.10.13"

Loading…
Cancel
Save