diff --git a/HOWTO.rst b/HOWTO.rst index 63c67cc..fc0997c 100644 --- a/HOWTO.rst +++ b/HOWTO.rst @@ -53,8 +53,8 @@ Check out the code from Github:: git clone https://github.com/kyuupichan/electrumx.git cd electrumx -I have not yet created a setup.py, so for now I suggest you run -the code from the source tree or a copy of it. +You can install with setup.py, or run the code from the source tree or +a copy of it. You should create a standard user account to run the server under; your own is probably adequate unless paranoid. The paranoid might @@ -275,5 +275,4 @@ After flush-to-disk you may see an aiohttp error; this is the daemon timing out the connection while the disk flush was in progress. This is harmless. -The ETA is just a guide and can be quite volatile, particularly around -flushes. It is too optimistic initially. +The ETA is just a guide and can be quite volatile around flushes. diff --git a/RELEASE-NOTES b/RELEASE-NOTES index f4af83c..4e763a5 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,3 +1,13 @@ +Version 0.1 +------------ + +- added setup.py, experimental. Because of this server_main.py renamed + electrumx_server.py, and SERVER_MAIN environment variable was renamed + to ELECTRUMX. The sample run script was updated to match. +- improvements to logging of daemon connection issues +- removal of old reorg test code +- hopefully more accurate sync ETA + Version 0.07 ------------ diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 2ce4dee..03e25c1 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -43,14 +43,16 @@ def main(): parser = argparse.ArgumentParser('Send electrumx an RPC command' ) parser.add_argument('-p', '--port', metavar='port_num', type=int, help='RPC port number') - parser.add_argument('command', nargs='*', default=[], + parser.add_argument('command', nargs=1, default=[], help='command to send') + parser.add_argument('param', nargs='*', default=[], + help='params to send') args = parser.parse_args() if args.port is None: args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000)) - payload = {'method': args.command[0], 'params': args.command[1:]} + payload = {'method': args.command[0], 'params': args.param} loop = asyncio.get_event_loop() proto_factory = partial(RPCClient, loop) @@ -60,7 +62,7 @@ def main(): protocol.send(payload) loop.run_forever() except OSError: - print('error connecting - is ElectrumX running?') + print('error connecting - is ElectrumX catching up or not running?') finally: loop.close() diff --git a/server_main.py b/electrumx_server.py similarity index 100% rename from server_main.py rename to electrumx_server.py diff --git a/lib/__init__.py b/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/samples/scripts/NOTES b/samples/scripts/NOTES index e56c4ec..4b86bde 100644 --- a/samples/scripts/NOTES +++ b/samples/scripts/NOTES @@ -2,7 +2,8 @@ The following environment variables are required: DB_DIRECTORY - path to the database directory (if relative, to `run` script) USERNAME - the username the server will run as if using `run` script -SERVER_MAIN - path to the server_main.py script (if relative, to `run` script) +ELECTRUMX - path to the electrumx_server.py script (if relative, + to `run` script) DAEMON_URL - the URL used to connect to the daemon. Should be of the form http://username:password@hostname:port/ Alternatively you can specify DAEMON_USERNAME, DAEMON_PASSWORD, diff --git a/samples/scripts/env/ELECTRUMX b/samples/scripts/env/ELECTRUMX new file mode 100644 index 0000000..3de0637 --- /dev/null +++ b/samples/scripts/env/ELECTRUMX @@ -0,0 +1 @@ +/path/to/electrumx_server.py diff --git a/samples/scripts/env/SERVER_MAIN b/samples/scripts/env/SERVER_MAIN deleted file mode 100644 index 6aee931..0000000 --- a/samples/scripts/env/SERVER_MAIN +++ /dev/null @@ -1 +0,0 @@ -/path/to/repos/electrumx/server_main.py diff --git a/samples/systemd-unit b/samples/systemd-unit index 94b7d47..46107f3 100644 --- a/samples/systemd-unit +++ b/samples/systemd-unit @@ -4,7 +4,7 @@ After=network.target [Service] EnvironmentFile=/etc/electrumx.conf -ExecStart=/home/electrumx/electrumx/server_main.py +ExecStart=/usr/local/bin/electrumx_server.py User=electrumx [Install] diff --git a/server/block_processor.py b/server/block_processor.py index 6a64d15..4ea5d55 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -81,7 +81,7 @@ class Prefetcher(LoggedClass): async def start(self): '''Loop forever polling for more blocks.''' - self.logger.info('starting prefetch loop...') + self.logger.info('starting daemon poll loop...') while True: try: if await self._caught_up(): @@ -89,7 +89,7 @@ class Prefetcher(LoggedClass): else: await asyncio.sleep(0) except DaemonError as e: - self.logger.info('ignoring daemon errors: {}'.format(e)) + self.logger.info('ignoring daemon error: {}'.format(e)) async def _caught_up(self): '''Poll for new blocks and mempool state. @@ -167,6 +167,7 @@ class MemPool(LoggedClass): self.txs = {} self.hash168s = defaultdict(set) # None can be a key self.bp = bp + self.count = 0 async def update(self, hex_hashes): '''Update state given the current mempool to the passed set of hashes. @@ -177,7 +178,7 @@ class MemPool(LoggedClass): hex_hashes = set(hex_hashes) touched = set() - if not self.txs: + if self.count == 0: self.logger.info('initial fetch of {:,d} daemon mempool txs' .format(len(hex_hashes))) @@ -190,9 +191,6 @@ class MemPool(LoggedClass): for hash168 in hash168s: self.hash168s[hash168].remove(hex_hash) touched.update(hash168s) - if gone: - self.logger.info('{:,d} entries removed from mempool' - .format(len(gone))) # Get the raw transactions for the new hashes. Ignore the # ones the daemon no longer has (it will return None). Put @@ -251,8 +249,10 @@ class MemPool(LoggedClass): self.hash168s[hash168].add(hex_hash) touched.add(hash168) - self.logger.info('{:,d} entries in mempool for {:,d} addresses' - .format(len(self.txs), len(self.hash168s))) + if self.count % 20 == 0: + self.logger.info('{:,d} entries in mempool for {:,d} addresses' + .format(len(self.txs), len(self.hash168s))) + self.count += 1 # Might include a None return touched @@ -261,7 +261,7 @@ class MemPool(LoggedClass): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool entries for the hash168. - unconfirmed is True if any txin is confirmed. + unconfirmed is True if any txin is unconfirmed. ''' for hex_hash in self.hash168s[hash168]: txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash] @@ -322,6 +322,8 @@ class BlockProcessor(LoggedClass): self.height = self.db_height self.tip = self.db_tip + self.daemon.debug_set_height(self.height) + # Caches to be flushed later. Headers and tx_hashes have one # entry per block self.history = defaultdict(partial(array.array, 'I')) @@ -354,11 +356,8 @@ class BlockProcessor(LoggedClass): self.clean_db() - def coros(self, force_backup=False): - if force_backup: - return [self.force_chain_reorg(True), self.prefetcher.start()] - else: - return [self.start(), self.prefetcher.start()] + def coros(self): + return [self.start(), self.prefetcher.start()] async def start(self): '''External entry point for block processing. @@ -402,19 +401,13 @@ class BlockProcessor(LoggedClass): await self.on_update(self.height, self.touched) self.touched = set() - async def force_chain_reorg(self, to_genesis): - try: - await self.handle_chain_reorg(to_genesis) - finally: - self.flush(True) - - async def handle_chain_reorg(self, to_genesis=False): + async def handle_chain_reorg(self): # First get all state on disk self.logger.info('chain reorg detected') self.flush(True) self.logger.info('finding common height...') - hashes = await self.reorg_hashes(to_genesis) + hashes = await self.reorg_hashes() # Reverse and convert to hex strings. hashes = [hash_to_str(hash) for hash in reversed(hashes)] for hex_hashes in chunks(hashes, 50): @@ -425,7 +418,7 @@ class BlockProcessor(LoggedClass): await self.prefetcher.clear(self.height) self.logger.info('prefetcher reset') - async def reorg_hashes(self, to_genesis): + async def reorg_hashes(self): '''Return the list of hashes to back up beacuse of a reorg. The hashes are returned in order of increasing height.''' @@ -443,7 +436,7 @@ class BlockProcessor(LoggedClass): hex_hashes = [hash_to_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) n = match_pos(hex_hashes, d_hex_hashes) - if n >= 0 and not to_genesis: + if n >= 0: start += n + 1 break count = min(count * 2, start) @@ -609,8 +602,8 @@ class BlockProcessor(LoggedClass): # Catch-up stats if show_stats: daemon_height = self.daemon.cached_height() - txs_per_sec = int(self.tx_count / self.wall_time) - this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) + tx_per_sec = int(self.tx_count / self.wall_time) + this_tx_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) if self.height > self.coin.TX_COUNT_HEIGHT: tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK else: @@ -618,12 +611,16 @@ class BlockProcessor(LoggedClass): * self.coin.TX_PER_BLOCK + (self.coin.TX_COUNT - self.tx_count)) + # Damp the enthusiasm + realism = 2.0 - 0.9 * self.height / self.coin.TX_COUNT_HEIGHT + tx_est *= max(realism, 1.0) + self.logger.info('tx/sec since genesis: {:,d}, ' 'since last flush: {:,d}' - .format(txs_per_sec, this_txs_per_sec)) + .format(tx_per_sec, this_tx_per_sec)) self.logger.info('sync time: {} ETA: {}' .format(formatted_time(self.wall_time), - formatted_time(tx_est / this_txs_per_sec))) + formatted_time(tx_est / this_tx_per_sec))) def flush_history(self, batch): self.logger.info('flushing history') @@ -854,7 +851,7 @@ class BlockProcessor(LoggedClass): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool entries for the hash168. - unconfirmed is True if any txin is confirmed. + unconfirmed is True if any txin is unconfirmed. ''' return self.mempool.transactions(hash168) diff --git a/server/controller.py b/server/controller.py index cbbfaf1..cd159b3 100644 --- a/server/controller.py +++ b/server/controller.py @@ -34,7 +34,7 @@ class Controller(LoggedClass): self.loop = loop self.env = env self.coin = env.coin - self.daemon = Daemon(env.daemon_url) + self.daemon = Daemon(env.daemon_url, env.debug) self.block_processor = BlockProcessor(env, self.daemon, on_update=self.on_update) JSONRPC.init(self.block_processor, self.daemon, self.coin, diff --git a/server/daemon.py b/server/daemon.py index e2406d1..f321873 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -17,80 +17,125 @@ import lib.util as util class DaemonError(Exception): - '''Raised when the daemon returns an error in its results that - cannot be remedied by retrying.''' + '''Raised when the daemon returns an error in its results.''' + + +class DaemonWarmingUpError(DaemonError): + '''Raised when the daemon returns an error in its results.''' class Daemon(util.LoggedClass): '''Handles connections to a daemon at the given URL.''' - def __init__(self, url): + WARMING_UP = -28 + + def __init__(self, url, debug): super().__init__() self.url = url self._height = None self.logger.info('connecting to daemon at URL {}'.format(url)) + self.debug_caught_up = 'caught_up' in debug + + def debug_set_height(self, height): + if self.debug_caught_up: + self.logger.info('pretending to have caught up to height {}' + .format(height)) + self._height = height + + async def post(self, data): + '''Send data to the daemon and handle the response.''' + async with aiohttp.post(self.url, data=data) as resp: + result = await resp.json() + + if isinstance(result, list): + errs = tuple(item['error'] for item in result) + if not any(errs): + return tuple(item['result'] for item in result) + if any(err.get('code') == self.WARMING_UP for err in errs if err): + raise DaemonWarmingUpError + raise DaemonError(errs) + else: + err = result['error'] + if not err: + return result['result'] + if err.get('code') == self.WARMING_UP: + raise DaemonWarmingUpError + raise DaemonError(err) + + async def send(self, payload): + '''Send a payload to be converted to JSON. + + Handles temporary connection issues. Daemon reponse errors + are raise through DaemonError. + ''' + data = json.dumps(payload) + secs = 1 + prior_msg = None + while True: + try: + result = await self.post(data) + if prior_msg: + self.logger.info('connection successfully restored') + return result + except asyncio.TimeoutError: + msg = 'timeout error' + except aiohttp.ClientHttpProcessingError: + msg = 'HTTP error' + except aiohttp.ServerDisconnectedError: + msg = 'daemon disconnected' + except aiohttp.ClientConnectionError: + msg = 'connection problem - is your daemon running?' + except DaemonWarmingUpError: + msg = 'daemon is still warming up' + + if msg != prior_msg or count == 10: + self.logger.error('{}. Retrying between sleeps...' + .format(msg)) + prior_msg = msg + count = 0 + await asyncio.sleep(secs) + count += 1 + secs = min(16, secs * 2) async def send_single(self, method, params=None): + '''Send a single request to the daemon.''' payload = {'method': method} if params: payload['params'] = params - result, = await self.send((payload, )) - return result + return await self.send(payload) - async def send_many(self, mp_pairs): - if mp_pairs: - payload = [{'method': method, 'params': params} - for method, params in mp_pairs] - return await self.send(payload) - return [] + async def send_many(self, mp_iterable): + '''Send several requests at once. - async def send_vector(self, method, params_list): - if params_list: - payload = [{'method': method, 'params': params} - for params in params_list] + The results are returned as a tuple.''' + payload = tuple({'method': m, 'params': p} for m, p in mp_iterable) + if payload: return await self.send(payload) - return [] + return () - async def send(self, payload): - assert isinstance(payload, (tuple, list)) - data = json.dumps(payload) - while True: - try: - async with aiohttp.post(self.url, data=data) as resp: - result = await resp.json() - except asyncio.CancelledError: - raise - except Exception as e: - msg = 'aiohttp error: {}'.format(e) - secs = 3 - else: - errs = tuple(item['error'] for item in result) - if not any(errs): - return tuple(item['result'] for item in result) - if any(err.get('code') == -28 for err in errs): - msg = 'daemon still warming up.' - secs = 30 - else: - raise DaemonError(errs) - - self.logger.error('{}. Sleeping {:d}s and trying again...' - .format(msg, secs)) - await asyncio.sleep(secs) + async def send_vector(self, method, params_iterable): + '''Send several requests of the same method. + + The results are returned as a tuple.''' + return await self.send_many((method, params) + for params in params_iterable) async def block_hex_hashes(self, first, count): '''Return the hex hashes of count block starting at height first.''' - param_lists = [[height] for height in range(first, first + count)] - return await self.send_vector('getblockhash', param_lists) + params_iterable = ((h, ) for h in range(first, first + count)) + return await self.send_vector('getblockhash', params_iterable) async def raw_blocks(self, hex_hashes): '''Return the raw binary blocks with the given hex hashes.''' - param_lists = [(h, False) for h in hex_hashes] - blocks = await self.send_vector('getblock', param_lists) + params_iterable = ((h, False) for h in hex_hashes) + blocks = await self.send_vector('getblock', params_iterable) # Convert hex string to bytes - return [bytes.fromhex(block) for block in blocks] + return tuple(bytes.fromhex(block) for block in blocks) async def mempool_hashes(self): '''Return the hashes of the txs in the daemon's mempool.''' + if self.debug_caught_up: + return [] return await self.send_single('getrawmempool') async def estimatefee(self, params): @@ -111,14 +156,10 @@ class Daemon(util.LoggedClass): '''Return the serialized raw transactions with the given hashes. Breaks large requests up. Yields after each sub request.''' - param_lists = tuple((hex_hash, 0) for hex_hash in hex_hashes) - raw_txs = [] - for chunk in util.chunks(param_lists, 10000): - txs = await self.send_vector('getrawtransaction', chunk) - # Convert hex strings to bytes - raw_txs.append(tuple(bytes.fromhex(tx) for tx in txs)) - await asyncio.sleep(0) - return sum(raw_txs, ()) + params_iterable = ((hex_hash, 0) for hex_hash in hex_hashes) + txs = await self.send_vector('getrawtransaction', params_iterable) + # Convert hex strings to bytes + return tuple(bytes.fromhex(tx) for tx in txs) async def sendrawtransaction(self, params): '''Broadcast a transaction to the network.''' @@ -126,7 +167,8 @@ class Daemon(util.LoggedClass): async def height(self): '''Query the daemon for its current height.''' - self._height = await self.send_single('getblockcount') + if not self.debug_caught_up: + self._height = await self.send_single('getblockcount') return self._height def cached_height(self): diff --git a/server/env.py b/server/env.py index 21fa8ae..403a277 100644 --- a/server/env.py +++ b/server/env.py @@ -43,6 +43,8 @@ class Env(LoggedClass): # The electrum client takes the empty string as unspecified self.donation_address = self.default('DONATION_ADDRESS', '') self.db_engine = self.default('DB_ENGINE', 'leveldb') + self.debug = self.default('DEBUG', '') + self.debug = [item.lower() for item in self.debug.split()] def default(self, envvar, default): return environ.get(envvar, default) diff --git a/server/protocol.py b/server/protocol.py index 69105e2..a2ec055 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -77,7 +77,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): try: message = json.loads(message.decode()) except Exception as e: - self.logger.info('error decoding JSON message'.format(e)) + self.logger.info('error decoding JSON message: {}'.format(e)) else: self.ADD_JOB(self.request_handler(message)) @@ -404,8 +404,7 @@ class ElectrumX(JSONRPC): self.logger.info('sent tx: {}'.format(tx_hash)) return tx_hash except DaemonError as e: - errors = e.args[0] - error = errors[0] + error = e.args[0] message = error['message'] self.logger.info('sendrawtransaction: {}'.format(message)) if 'non-mandatory-script-verify-flag' in message: diff --git a/server/version.py b/server/version.py index 74c926d..d8049bf 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.07" +VERSION = "ElectrumX 0.1" diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..827e807 --- /dev/null +++ b/setup.py @@ -0,0 +1,24 @@ +import setuptools +from server.version import VERSION + + +setuptools.setup( + name='electrumx', + version=VERSION.split()[-1], + scripts=['electrumx_server.py', 'electrumx_rpc.py'], + python_requires='>=3.5', + install_requires=['plyvel', 'aiohttp >= 1'], + packages=setuptools.find_packages(), + description='ElectrumX Server', + author='Neil Booth', + author_email='kyuupichan@gmail.com', + license='MIT Licence', + url='https://github.com/kyuupichan/electrumx/', + long_description='Server implementation for the Electrum wallet', + classifiers=[ + 'Development Status :: 3 - Alpha', + 'Topic :: Internet', + 'License :: OSI Approved :: MIT License', + 'Operating System :: Unix', + ], +)