Browse Source

Merge branch 'develop'

master
Neil Booth 8 years ago
parent
commit
ead7bf1bec
  1. 38
      README.rst
  2. 10
      docs/ENV-NOTES
  3. 4
      docs/HOWTO.rst
  4. 11
      docs/RELEASE-NOTES
  5. 19
      lib/coins.py
  6. 101
      server/block_processor.py
  7. 28
      server/daemon.py
  8. 1
      server/db.py
  9. 13
      server/env.py
  10. 2
      server/irc.py
  11. 83
      server/protocol.py
  12. 2
      server/version.py

38
README.rst

@ -46,6 +46,28 @@ that could easily be reused for those alts that are reasonably
compatible with Bitcoin. Such an abstraction is also useful for compatible with Bitcoin. Such an abstraction is also useful for
testnets, of course. testnets, of course.
Features
========
- The full Electrum protocol is implemented with the exception of the
blockchain.address.get_proof RPC call, which is not used in normal
sessions and only sent from the Electrum command line.
- Efficient synchronization from Genesis. Recent hardware should
synchronize in well under 24 hours, possibly much faster for recent
CPUs or if you have an SSD. The fastest time to height 439k (mid
November 2016) reported is under 5 hours. Electrum-server would
probably take around 1 month.
- Subscription limiting both per-connection and across all connections.
- Minimal resource usage once caught up and serving clients; tracking the
transaction mempool seems to take the most memory.
- Each client is served asynchronously to all other clients and tasks,
so busy clients do not reduce responsiveness of other clients'
requests and notifications, or the processing of incoming blocks.
- Daemon failover. More than one daemon can be specified; ElectrumX
will failover round-robin style if the current one fails for any
reason.
- Coin abstraction makes compatible altcoin support easy.
Implementation Implementation
============== ==============
@ -58,7 +80,7 @@ So how does it achieve a much more compact database than Electrum
server, which is forced to prune hisory for busy addresses, and yet server, which is forced to prune hisory for busy addresses, and yet
sync roughly 2 orders of magnitude faster? sync roughly 2 orders of magnitude faster?
I believe all of the following play a part: I believe all of the following play a part::
- aggressive caching and batching of DB writes - aggressive caching and batching of DB writes
- more compact and efficient representation of UTXOs, address index, - more compact and efficient representation of UTXOs, address index,
@ -94,15 +116,15 @@ Roadmap Pre-1.0
- minor code cleanups - minor code cleanups
- at most 1 more DB format change; I will make a weak attempt to - at most 1 more DB format change; I will make a weak attempt to
retain 0.6 release's DB format if possible retain 0.6 release's DB format if possible
- provision of configurable ways to limit client connections so as to - provision of bandwidth limit controls
mitigate intentional or unintentional degradation of server response - implement simple protocol to discover peers without resorting to IRC
time to other clients. Based on IRC discussion this will likely be a
combination of address subscription and bandwidth limits.
Roadmap Post-1.0 Roadmap Post-1.0
================ ================
- Python 3.6, which has several performance improvements relevant to
ElectrumX
- UTXO root logic and implementation - UTXO root logic and implementation
- improve DB abstraction so LMDB is not penalized - improve DB abstraction so LMDB is not penalized
- investigate effects of cache defaults and DB configuration defaults - investigate effects of cache defaults and DB configuration defaults
@ -114,9 +136,9 @@ Database Format
=============== ===============
The database and metadata formats of ElectrumX are likely to change. The database and metadata formats of ElectrumX are likely to change.
Such changes will render old DBs unusable. At least until 1.0 I do Such changes will render old DBs unusable. For now I do not intend to
not intend to provide converters; moreover from-genesis sync time to provide converters as the time taken from genesis to synchronize to a
create a pristine database is quite tolerable. pristine database is quite tolerable.
Miscellany Miscellany

10
docs/ENV-NOTES

@ -4,11 +4,13 @@ DB_DIRECTORY - path to the database directory (if relative, to `run` script)
USERNAME - the username the server will run as if using `run` script USERNAME - the username the server will run as if using `run` script
ELECTRUMX - path to the electrumx_server.py script (if relative, ELECTRUMX - path to the electrumx_server.py script (if relative,
to `run` script) to `run` script)
DAEMON_URL - the URL used to connect to the daemon. Should be of the form DAEMON_URL - A comma-separated list of daemon URLS. If more than one is
provided ElectrumX will failover to the next when one stops
working. The generic form is:
http://username:password@hostname:port/ http://username:password@hostname:port/
Alternatively you can specify DAEMON_USERNAME, DAEMON_PASSWORD, The leading 'http://' is optional, as is the trailing
DAEMON_HOST and DAEMON_PORT. DAEMON_PORT is optional and slash. The ':port' part is also optional and will default
will default appropriately for COIN. to the standard RPC port for COIN if omitted.
The other environment variables are all optional and will adopt The other environment variables are all optional and will adopt
sensible defaults if not specified. sensible defaults if not specified.

4
docs/HOWTO.rst

@ -28,9 +28,9 @@ for someone used to either.
When building the database form the genesis block, ElectrumX has to When building the database form the genesis block, ElectrumX has to
flush large quantities of data to disk and to leveldb. You will have flush large quantities of data to disk and to leveldb. You will have
a much nicer experience if the database directory is on an SSD than on a much nicer experience if the database directory is on an SSD than on
an HDD. Currently to around height 434,000 of the Bitcoin blockchain an HDD. Currently to around height 439,800 of the Bitcoin blockchain
the final size of the leveldb database, and other ElectrumX file the final size of the leveldb database, and other ElectrumX file
metadata comes to just over 17GB. Leveldb needs a bit more for brief metadata comes to just over 18GB. Leveldb needs a bit more for brief
periods, and the block chain is only getting longer, so I would periods, and the block chain is only getting longer, so I would
recommend having at least 30-40GB free space. recommend having at least 30-40GB free space.

11
docs/RELEASE-NOTES

@ -1,3 +1,14 @@
version 0.7
-----------
- daemon failover is now supported; see docs/ENV-NOTES. As a result,
DAEMON_URL must now be supplied and DAEMON_USERNAME, DAEMON_PASSWORD,
DAEMON_HOST and DAEMON_PORT are no longer used.
- fixed a bug introduced in 0.6 series where some client header requests
would fail
- fully asynchronous mempool handling; blocks can be processed and clients
notified whilst the mempool is still being processed
version 0.6.3 version 0.6.3
------------- -------------

19
lib/coins.py

@ -14,6 +14,7 @@ necessary for appropriate handling.
from decimal import Decimal from decimal import Decimal
from functools import partial from functools import partial
import inspect import inspect
import re
import struct import struct
import sys import sys
@ -34,6 +35,7 @@ class Coin(object):
# Not sure if these are coin-specific # Not sure if these are coin-specific
HEADER_LEN = 80 HEADER_LEN = 80
DEFAULT_RPC_PORT = 8332 DEFAULT_RPC_PORT = 8332
RPC_URL_REGEX = re.compile('.+@[^:]+(:[0-9]+)?')
VALUE_PER_COIN = 100000000 VALUE_PER_COIN = 100000000
CHUNK_SIZE=2016 CHUNK_SIZE=2016
STRANGE_VERBYTE = 0xff STRANGE_VERBYTE = 0xff
@ -50,6 +52,23 @@ class Coin(object):
raise CoinError('unknown coin {} and network {} combination' raise CoinError('unknown coin {} and network {} combination'
.format(name, net)) .format(name, net))
@classmethod
def sanitize_url(cls, url):
# Remove surrounding ws and trailing /s
url = url.strip().rstrip('/')
match = cls.RPC_URL_REGEX.match(url)
if not match:
raise CoinError('invalid daemon URL: "{}"'.format(url))
if match.groups()[0] is None:
url += ':{:d}'.format(cls.DEFAULT_RPC_PORT)
if not url.startswith('http://'):
url = 'http://' + url
return url + '/'
@classmethod
def daemon_urls(cls, urls):
return [cls.sanitize_url(url) for url in urls.split(',')]
@cachedproperty @cachedproperty
def hash168_handlers(cls): def hash168_handlers(cls):
return ScriptPubKey.PayToHandlers( return ScriptPubKey.PayToHandlers(

101
server/block_processor.py

@ -43,8 +43,8 @@ class Prefetcher(LoggedClass):
self.semaphore = asyncio.Semaphore() self.semaphore = asyncio.Semaphore()
self.queue = asyncio.Queue() self.queue = asyncio.Queue()
self.queue_size = 0 self.queue_size = 0
self.caught_up = False
self.fetched_height = height self.fetched_height = height
self.mempool_hashes = []
# Target cache size. Has little effect on sync time. # Target cache size. Has little effect on sync time.
self.target_cache_size = 10 * 1024 * 1024 self.target_cache_size = 10 * 1024 * 1024
# First fetch to be 10 blocks # First fetch to be 10 blocks
@ -64,13 +64,14 @@ class Prefetcher(LoggedClass):
self.fetched_height = height self.fetched_height = height
async def get_blocks(self): async def get_blocks(self):
'''Returns a list of prefetched blocks and the mempool.''' '''Blocking function that returns prefetched blocks.
blocks, height, size = await self.queue.get()
The returned result empty just once - when the prefetcher
has caught up with the daemon.
'''
blocks, size = await self.queue.get()
self.queue_size -= size self.queue_size -= size
if height == self.daemon.cached_height(): return blocks
return blocks, self.mempool_hashes
else:
return blocks, None
async def main_loop(self): async def main_loop(self):
'''Loop forever polling for more blocks.''' '''Loop forever polling for more blocks.'''
@ -78,39 +79,19 @@ class Prefetcher(LoggedClass):
.format(await self.daemon.height())) .format(await self.daemon.height()))
while True: while True:
try: try:
if await self._caught_up(): with await self.semaphore:
await asyncio.sleep(5) await self._prefetch()
else: await asyncio.sleep(5 if self.caught_up else 0)
await asyncio.sleep(0)
except DaemonError as e: except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e)) self.logger.info('ignoring daemon error: {}'.format(e))
except asyncio.CancelledError: except asyncio.CancelledError:
break break
async def _caught_up(self):
'''Poll for new blocks and mempool state.
Mempool is only queried if caught up with daemon.'''
with await self.semaphore:
blocks, size = await self._prefetch()
self.fetched_height += len(blocks)
caught_up = self.fetched_height == self.daemon.cached_height()
if caught_up:
self.mempool_hashes = await self.daemon.mempool_hashes()
# Wake up block processor if we have something
if blocks or caught_up:
self.queue.put_nowait((blocks, self.fetched_height, size))
self.queue_size += size
return caught_up
async def _prefetch(self): async def _prefetch(self):
'''Prefetch blocks unless the prefetch queue is full.''' '''Prefetch blocks unless the prefetch queue is full.'''
if self.queue_size >= self.target_cache_size: if self.queue_size >= self.target_cache_size:
return [], 0 return
caught_up = self.daemon.cached_height() == self.fetched_height
daemon_height = await self.daemon.height() daemon_height = await self.daemon.height()
cache_room = self.target_cache_size // self.ave_size cache_room = self.target_cache_size // self.ave_size
@ -119,15 +100,18 @@ class Prefetcher(LoggedClass):
count = min(daemon_height - self.fetched_height, cache_room) count = min(daemon_height - self.fetched_height, cache_room)
count = min(4000, max(count, 0)) count = min(4000, max(count, 0))
if not count: if not count:
return [], 0 # Indicate when we have caught up for the first time only
if not self.caught_up:
self.caught_up = True
self.queue.put_nowait(([], 0))
return
first = self.fetched_height + 1 first = self.fetched_height + 1
hex_hashes = await self.daemon.block_hex_hashes(first, count) hex_hashes = await self.daemon.block_hex_hashes(first, count)
if caught_up: if self.caught_up:
self.logger.info('new block height {:,d} hash {}' self.logger.info('new block height {:,d} hash {}'
.format(first + count - 1, hex_hashes[-1])) .format(first + count - 1, hex_hashes[-1]))
blocks = await self.daemon.raw_blocks(hex_hashes) blocks = await self.daemon.raw_blocks(hex_hashes)
size = sum(len(block) for block in blocks) size = sum(len(block) for block in blocks)
# Update our recent average block size estimate # Update our recent average block size estimate
@ -136,7 +120,9 @@ class Prefetcher(LoggedClass):
else: else:
self.ave_size = (size + (10 - count) * self.ave_size) // 10 self.ave_size = (size + (10 - count) * self.ave_size) // 10
return blocks, size self.fetched_height += len(blocks)
self.queue.put_nowait((blocks, size))
self.queue_size += size
class ChainReorg(Exception): class ChainReorg(Exception):
@ -160,8 +146,9 @@ class BlockProcessor(server.db.DB):
self.tip = self.db_tip self.tip = self.db_tip
self.tx_count = self.db_tx_count self.tx_count = self.db_tx_count
self.daemon = Daemon(env.daemon_url, env.debug) self.daemon = Daemon(self.coin.daemon_urls(env.daemon_url), env.debug)
self.daemon.debug_set_height(self.height) self.daemon.debug_set_height(self.height)
self.caught_up = False
self.touched = set() self.touched = set()
self.futures = [] self.futures = []
@ -223,41 +210,51 @@ class BlockProcessor(server.db.DB):
await asyncio.sleep(0) await asyncio.sleep(0)
async def _wait_for_update(self): async def _wait_for_update(self):
'''Wait for the prefetcher to deliver blocks or a mempool update. '''Wait for the prefetcher to deliver blocks.
Blocks are only processed in the forward direction. The Blocks are only processed in the forward direction.
prefetcher only provides a non-None mempool when caught up.
''' '''
blocks, mempool_hashes = await self.prefetcher.get_blocks() blocks = await self.prefetcher.get_blocks()
if not blocks:
await self.first_caught_up()
return
'''Strip the unspendable genesis coinbase.''' '''Strip the unspendable genesis coinbase.'''
if self.height == -1: if self.height == -1:
blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1) blocks[0] = blocks[0][:self.coin.HEADER_LEN] + bytes(1)
caught_up = mempool_hashes is not None
try: try:
for block in blocks: for block in blocks:
self.advance_block(block, caught_up) self.advance_block(block, self.caught_up)
if not caught_up and time.time() > self.next_cache_check:
self.check_cache_size()
self.next_cache_check = time.time() + 60
await asyncio.sleep(0) # Yield await asyncio.sleep(0) # Yield
if caught_up:
await self.caught_up(mempool_hashes)
self.touched = set()
except ChainReorg: except ChainReorg:
await self.handle_chain_reorg() await self.handle_chain_reorg()
async def caught_up(self, mempool_hashes): if self.caught_up:
# Flush everything as queries are performed on the DB and
# not in-memory.
self.flush(True)
self.notify(self.touched)
elif time.time() > self.next_cache_check:
self.check_cache_size()
self.next_cache_check = time.time() + 60
self.touched = set()
async def first_caught_up(self):
'''Called after each deamon poll if caught up.''' '''Called after each deamon poll if caught up.'''
# Caught up to daemon height. Flush everything as queries self.caught_up = True
# are performed on the DB and not in-memory.
if self.first_sync: if self.first_sync:
self.first_sync = False self.first_sync = False
self.logger.info('{} synced to height {:,d}. DB version:' self.logger.info('{} synced to height {:,d}. DB version:'
.format(VERSION, self.height, self.db_version)) .format(VERSION, self.height, self.db_version))
self.flush(True) self.flush(True)
def notify(self, touched):
'''Called with list of touched addresses by new blocks.
Only called for blocks found after first_caught_up is called.
Intended to be overridden in derived classes.'''
async def handle_chain_reorg(self): async def handle_chain_reorg(self):
# First get all state on disk # First get all state on disk
self.logger.info('chain reorg detected') self.logger.info('chain reorg detected')
@ -828,7 +825,7 @@ class BlockProcessor(server.db.DB):
def read_headers(self, start, count): def read_headers(self, start, count):
# Read some from disk # Read some from disk
disk_count = min(count, self.fs_height + 1 - start) disk_count = min(count, max(0, self.fs_height + 1 - start))
result = self.fs_read_headers(start, disk_count) result = self.fs_read_headers(start, disk_count)
count -= disk_count count -= disk_count
start += disk_count start += disk_count

28
server/daemon.py

@ -27,11 +27,15 @@ class Daemon(util.LoggedClass):
class DaemonWarmingUpError(Exception): class DaemonWarmingUpError(Exception):
'''Raised when the daemon returns an error in its results.''' '''Raised when the daemon returns an error in its results.'''
def __init__(self, url, debug): def __init__(self, urls, debug):
super().__init__() super().__init__()
self.url = url if not urls:
raise DaemonError('no daemon URLs provided')
for url in urls:
self.logger.info('daemon at {}'.format(self.logged_url(url)))
self.urls = urls
self.url_index = 0
self._height = None self._height = None
self.logger.info('connecting at URL {}'.format(url))
self.debug_caught_up = 'caught_up' in debug self.debug_caught_up = 'caught_up' in debug
# Limit concurrent RPC calls to this number. # Limit concurrent RPC calls to this number.
# See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16 # See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
@ -64,10 +68,12 @@ class Daemon(util.LoggedClass):
data = json.dumps(payload) data = json.dumps(payload)
secs = 1 secs = 1
max_secs = 16
while True: while True:
try: try:
async with self.workqueue_semaphore: async with self.workqueue_semaphore:
async with aiohttp.post(self.url, data=data) as resp: url = self.urls[self.url_index]
async with aiohttp.post(url, data=data) as resp:
result = processor(await resp.json()) result = processor(await resp.json())
if self.prior_msg: if self.prior_msg:
self.logger.info('connection restored') self.logger.info('connection restored')
@ -86,8 +92,18 @@ class Daemon(util.LoggedClass):
raise raise
except Exception as e: except Exception as e:
log_error('request gave unexpected error: {}.'.format(e)) log_error('request gave unexpected error: {}.'.format(e))
await asyncio.sleep(secs) if secs >= max_secs and len(self.urls) > 1:
secs = min(16, secs * 2) self.url_index = (self.url_index + 1) % len(self.urls)
logged_url = self.logged_url(self.urls[self.url_index])
self.logger.info('failing over to {}'.format(logged_url))
secs = 1
else:
await asyncio.sleep(secs)
secs = min(16, secs * 2)
def logged_url(self, url):
'''The host and port part, for logging.'''
return url[url.rindex('@') + 1:]
async def _send_single(self, method, params=None): async def _send_single(self, method, params=None):
'''Send a single request to the daemon.''' '''Send a single request to the daemon.'''

1
server/db.py

@ -143,6 +143,7 @@ class DB(LoggedClass):
raise raise
def fs_read_headers(self, start, count): def fs_read_headers(self, start, count):
'''Requires count >= 0.'''
# Read some from disk # Read some from disk
disk_count = min(count, self.db_height + 1 - start) disk_count = min(count, self.db_height + 1 - start)
if start < 0 or count < 0 or disk_count != count: if start < 0 or count < 0 or disk_count != count:

13
server/env.py

@ -30,7 +30,7 @@ class Env(LoggedClass):
self.hist_MB = self.integer('HIST_MB', 300) self.hist_MB = self.integer('HIST_MB', 300)
self.host = self.default('HOST', 'localhost') self.host = self.default('HOST', 'localhost')
self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT) self.reorg_limit = self.integer('REORG_LIMIT', self.coin.REORG_LIMIT)
self.daemon_url = self.build_daemon_url() self.daemon_url = self.required('DAEMON_URL')
# Server stuff # Server stuff
self.tcp_port = self.integer('TCP_PORT', None) self.tcp_port = self.integer('TCP_PORT', None)
self.ssl_port = self.integer('SSL_PORT', None) self.ssl_port = self.integer('SSL_PORT', None)
@ -74,14 +74,3 @@ class Env(LoggedClass):
except: except:
raise self.Error('cannot convert envvar {} value {} to an integer' raise self.Error('cannot convert envvar {} value {} to an integer'
.format(envvar, value)) .format(envvar, value))
def build_daemon_url(self):
daemon_url = environ.get('DAEMON_URL')
if not daemon_url:
username = self.required('DAEMON_USERNAME')
password = self.required('DAEMON_PASSWORD')
host = self.required('DAEMON_HOST')
port = self.default('DAEMON_PORT', self.coin.DEFAULT_RPC_PORT)
daemon_url = ('http://{}:{}@{}:{}/'
.format(username, password, host, port))
return daemon_url

2
server/irc.py

@ -55,6 +55,8 @@ class IRC(LoggedClass):
await self.join() await self.join()
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
except Exception as e:
self.logger.error(str(e))
async def join(self): async def join(self):
import irc.client as irc_client import irc.client as irc_client

83
server/protocol.py

@ -38,16 +38,17 @@ class BlockServer(BlockProcessor):
super().__init__(env) super().__init__(env)
self.server_mgr = ServerManager(self, env) self.server_mgr = ServerManager(self, env)
self.mempool = MemPool(self) self.mempool = MemPool(self)
self.caught_up_yet = False
async def caught_up(self, mempool_hashes): async def first_caught_up(self):
# Call the base class to flush before doing anything else. # Call the base class to flush and log first
await super().caught_up(mempool_hashes) await super().first_caught_up()
if not self.caught_up_yet: await self.server_mgr.start_servers()
await self.server_mgr.start_servers() self.futures.append(self.mempool.start())
self.caught_up_yet = True
self.touched.update(await self.mempool.update(mempool_hashes)) def notify(self, touched):
self.server_mgr.notify(self.height, self.touched) '''Called when addresses are touched by new blocks or mempool
updates.'''
self.server_mgr.notify(self.height, touched)
def on_cancel(self): def on_cancel(self):
'''Called when the main loop is cancelled.''' '''Called when the main loop is cancelled.'''
@ -97,13 +98,29 @@ class MemPool(LoggedClass):
self.bp = bp self.bp = bp
self.count = -1 self.count = -1
async def update(self, hex_hashes): def start(self):
'''Starts the mempool synchronization mainloop. Return a future.'''
return asyncio.ensure_future(self.main_loop())
async def main_loop(self):
'''Asynchronously maintain mempool status with daemon.'''
self.logger.info('maintaining state with daemon...')
while True:
try:
await self.update()
await asyncio.sleep(5)
except DaemonError as e:
self.logger.info('ignoring daemon error: {}'.format(e))
except asyncio.CancelledError:
break
async def update(self):
'''Update state given the current mempool to the passed set of hashes. '''Update state given the current mempool to the passed set of hashes.
Remove transactions that are no longer in our mempool. Remove transactions that are no longer in our mempool.
Request new transactions we don't have then add to our mempool. Request new transactions we don't have then add to our mempool.
''' '''
hex_hashes = set(hex_hashes) hex_hashes = set(await self.bp.daemon.mempool_hashes())
touched = set() touched = set()
missing_utxos = [] missing_utxos = []
@ -210,8 +227,7 @@ class MemPool(LoggedClass):
self.logger.info('{:,d} txs touching {:,d} addresses' self.logger.info('{:,d} txs touching {:,d} addresses'
.format(len(self.txs), len(self.hash168s))) .format(len(self.txs), len(self.hash168s)))
# Might include a None self.bp.notify(touched)
return touched
def transactions(self, hash168): def transactions(self, hash168):
'''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
@ -295,13 +311,13 @@ class ServerManager(LoggedClass):
await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc) await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc)
if env.irc: if env.irc:
self.logger.info('starting IRC coroutine')
self.irc_future = asyncio.ensure_future(self.irc.start()) self.irc_future = asyncio.ensure_future(self.irc.start())
else: else:
self.logger.info('IRC disabled') self.logger.info('IRC disabled')
def notify(self, height, touched): def notify(self, height, touched):
'''Notify sessions about height changes and touched addresses.''' '''Notify sessions about height changes and touched addresses.'''
self.logger.info('{:,d} addresses touched'.format(len(touched)))
cache = {} cache = {}
for session in self.sessions: for session in self.sessions:
if isinstance(session, ElectrumX): if isinstance(session, ElectrumX):
@ -310,39 +326,45 @@ class ServerManager(LoggedClass):
def stop(self): def stop(self):
'''Close listening servers.''' '''Close listening servers.'''
self.logger.info('cleanly closing client sessions, please wait...')
for server in self.servers: for server in self.servers:
server.close() server.close()
if self.irc_future: if self.irc_future:
self.irc_future.cancel() self.irc_future.cancel()
if self.sessions:
self.logger.info('cleanly closing client sessions, please wait...')
for session in self.sessions: for session in self.sessions:
session.transport.close() self.close_session(session)
async def wait_shutdown(self): async def wait_shutdown(self):
# Wait for servers to close # Wait for servers to close
for server in self.servers: for server in self.servers:
await server.wait_closed() await server.wait_closed()
# Just in case a connection came in
await asyncio.sleep(0)
self.servers = [] self.servers = []
self.logger.info('server listening sockets closed')
limit = time.time() + 10 secs = 60
self.logger.info('server listening sockets closed, waiting '
'{:d} seconds for socket cleanup'.format(secs))
limit = time.time() + secs
while self.sessions and time.time() < limit: while self.sessions and time.time() < limit:
await asyncio.sleep(4)
self.logger.info('{:,d} sessions remaining' self.logger.info('{:,d} sessions remaining'
.format(len(self.sessions))) .format(len(self.sessions)))
await asyncio.sleep(2)
if self.sessions:
self.logger.info('forcibly closing {:,d} stragglers'
.format(len(self.sessions)))
for future in self.sessions.values():
future.cancel()
await asyncio.sleep(0)
def add_session(self, session): def add_session(self, session):
assert self.servers
assert session not in self.sessions
coro = session.serve_requests() coro = session.serve_requests()
self.sessions[session] = asyncio.ensure_future(coro) future = asyncio.ensure_future(coro)
self.sessions[session] = future
self.logger.info('connection from {}, {:,d} total'
.format(session.peername(), len(self.sessions)))
# Some connections are acknowledged after the servers are closed
if not self.servers:
self.close_session(session)
def close_session(self, session):
'''Close the session's transport and cancel its future.'''
session.transport.close()
self.sessions[session].cancel()
def remove_session(self, session): def remove_session(self, session):
self.subscription_count -= session.sub_count() self.subscription_count -= session.sub_count()
@ -418,7 +440,6 @@ class Session(JSONRPC):
def connection_made(self, transport): def connection_made(self, transport):
'''Handle an incoming client connection.''' '''Handle an incoming client connection.'''
super().connection_made(transport) super().connection_made(transport)
self.logger.info('connection from {}'.format(self.peername()))
self.manager.add_session(self) self.manager.add_session(self)
def connection_lost(self, exc): def connection_lost(self, exc):

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.6.3" VERSION = "ElectrumX 0.7"

Loading…
Cancel
Save