From 9fbbc8bfdb35d0c4607ea6ca7fe6bfd2dddbbdc8 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 5 Nov 2016 15:59:42 +0900 Subject: [PATCH 01/12] Clean up daemon interface. --- server/block_processor.py | 2 +- server/daemon.py | 90 ++++++++++++++++++++++----------------- server/protocol.py | 3 +- 3 files changed, 52 insertions(+), 43 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 6a64d15..ec9891e 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -89,7 +89,7 @@ class Prefetcher(LoggedClass): else: await asyncio.sleep(0) except DaemonError as e: - self.logger.info('ignoring daemon errors: {}'.format(e)) + self.logger.info('ignoring daemon error: {}'.format(e)) async def _caught_up(self): '''Poll for new blocks and mempool state. diff --git a/server/daemon.py b/server/daemon.py index e2406d1..b745076 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -24,70 +24,80 @@ class DaemonError(Exception): class Daemon(util.LoggedClass): '''Handles connections to a daemon at the given URL.''' + WARMING_UP = -28 + def __init__(self, url): super().__init__() self.url = url self._height = None self.logger.info('connecting to daemon at URL {}'.format(url)) - async def send_single(self, method, params=None): - payload = {'method': method} - if params: - payload['params'] = params - result, = await self.send((payload, )) - return result - - async def send_many(self, mp_pairs): - if mp_pairs: - payload = [{'method': method, 'params': params} - for method, params in mp_pairs] - return await self.send(payload) - return [] - - async def send_vector(self, method, params_list): - if params_list: - payload = [{'method': method, 'params': params} - for params in params_list] - return await self.send(payload) - return [] + @classmethod + def is_warming_up(cls, err): + if not isinstance(err, list): + err = [err] + return any(elt.get('code') == cls.WARMING_UP for elt in err) async def send(self, payload): - assert 
isinstance(payload, (tuple, list)) + '''Send a payload to be converted to JSON.''' data = json.dumps(payload) + secs = 1 while True: try: async with aiohttp.post(self.url, data=data) as resp: result = await resp.json() - except asyncio.CancelledError: - raise - except Exception as e: - msg = 'aiohttp error: {}'.format(e) - secs = 3 - else: - errs = tuple(item['error'] for item in result) - if not any(errs): - return tuple(item['result'] for item in result) - if any(err.get('code') == -28 for err in errs): - msg = 'daemon still warming up.' - secs = 30 - else: - raise DaemonError(errs) + if not self.is_warming_up(result): + return result + msg = 'daemon is still warming up' + except aiohttp.DisconnectedError as e: + msg = '{}: {}'.format(e.__class__.__name__, e) + secs = min(180, secs * 2) self.logger.error('{}. Sleeping {:d}s and trying again...' .format(msg, secs)) await asyncio.sleep(secs) + async def send_single(self, method, params=None): + '''Send a single request to the daemon.''' + payload = {'method': method} + if params: + payload['params'] = params + item = await self.send(payload) + if item['error']: + raise DaemonError(item['error']) + return item['result'] + + async def send_many(self, mp_iterable): + '''Send several requests at once. + + The results are returned as a tuple.''' + payload = tuple({'method': m, 'params': p} for m, p in mp_iterable) + if payload: + items = await self.send(payload) + errs = tuple(item['error'] for item in items) + if any(errs): + raise DaemonError(errs) + return tuple(item['result'] for item in items) + return () + + async def send_vector(self, method, params_iterable): + '''Send several requests of the same method. 
+ + The results are returned as a tuple.''' + return await self.send_many((method, params) + for params in params_iterable) + async def block_hex_hashes(self, first, count): '''Return the hex hashes of count block starting at height first.''' - param_lists = [[height] for height in range(first, first + count)] - return await self.send_vector('getblockhash', param_lists) + params_iterable = ((h, ) for h in range(first, first + count)) + return await self.send_vector('getblockhash', params_iterable) async def raw_blocks(self, hex_hashes): '''Return the raw binary blocks with the given hex hashes.''' - param_lists = [(h, False) for h in hex_hashes] - blocks = await self.send_vector('getblock', param_lists) + params_iterable = ((h, False) for h in hex_hashes) + blocks = await self.send_vector('getblock', params_iterable) # Convert hex string to bytes - return [bytes.fromhex(block) for block in blocks] + return tuple(bytes.fromhex(block) for block in blocks) async def mempool_hashes(self): '''Return the hashes of the txs in the daemon's mempool.''' diff --git a/server/protocol.py b/server/protocol.py index 69105e2..5c960d1 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -404,8 +404,7 @@ class ElectrumX(JSONRPC): self.logger.info('sent tx: {}'.format(tx_hash)) return tx_hash except DaemonError as e: - errors = e.args[0] - error = errors[0] + error = e.args[0] message = error['message'] self.logger.info('sendrawtransaction: {}'.format(message)) if 'non-mandatory-script-verify-flag' in message: From f86d91862ecb5dbcdb9f6f24cb9289b3c5c12630 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 5 Nov 2016 16:03:11 +0900 Subject: [PATCH 02/12] Fix typos --- server/block_processor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index ec9891e..a9da67e 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -81,7 +81,7 @@ class Prefetcher(LoggedClass): async def 
start(self): '''Loop forever polling for more blocks.''' - self.logger.info('starting prefetch loop...') + self.logger.info('starting daemon poll loop...') while True: try: if await self._caught_up(): @@ -261,7 +261,7 @@ class MemPool(LoggedClass): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool entries for the hash168. - unconfirmed is True if any txin is confirmed. + unconfirmed is True if any txin is unconfirmed. ''' for hex_hash in self.hash168s[hash168]: txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash] @@ -854,7 +854,7 @@ class BlockProcessor(LoggedClass): '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool entries for the hash168. - unconfirmed is True if any txin is confirmed. + unconfirmed is True if any txin is unconfirmed. ''' return self.mempool.transactions(hash168) From 6946863879e6cc129ac0c64520fcba3cb15d9ef4 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 5 Nov 2016 18:00:03 +0900 Subject: [PATCH 03/12] Add setup.py Fix electrum_rpc.py Rename server_main.py to electrumx_server.py --- electrumx_rpc.py | 8 +++++--- server_main.py => electrumx_server.py | 0 lib/__init__.py | 0 samples/scripts/NOTES | 3 ++- samples/scripts/env/ELECTRUMX | 1 + samples/scripts/env/SERVER_MAIN | 1 - samples/systemd-unit | 2 +- setup.py | 24 ++++++++++++++++++++++++ 8 files changed, 33 insertions(+), 6 deletions(-) rename server_main.py => electrumx_server.py (100%) create mode 100644 lib/__init__.py create mode 100644 samples/scripts/env/ELECTRUMX delete mode 100644 samples/scripts/env/SERVER_MAIN create mode 100644 setup.py diff --git a/electrumx_rpc.py b/electrumx_rpc.py index 2ce4dee..03e25c1 100755 --- a/electrumx_rpc.py +++ b/electrumx_rpc.py @@ -43,14 +43,16 @@ def main(): parser = argparse.ArgumentParser('Send electrumx an RPC command' ) parser.add_argument('-p', '--port', metavar='port_num', type=int, help='RPC port number') - parser.add_argument('command', nargs='*', default=[], + parser.add_argument('command', nargs=1, 
default=[], help='command to send') + parser.add_argument('param', nargs='*', default=[], + help='params to send') args = parser.parse_args() if args.port is None: args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000)) - payload = {'method': args.command[0], 'params': args.command[1:]} + payload = {'method': args.command[0], 'params': args.param} loop = asyncio.get_event_loop() proto_factory = partial(RPCClient, loop) @@ -60,7 +62,7 @@ def main(): protocol.send(payload) loop.run_forever() except OSError: - print('error connecting - is ElectrumX running?') + print('error connecting - is ElectrumX catching up or not running?') finally: loop.close() diff --git a/server_main.py b/electrumx_server.py similarity index 100% rename from server_main.py rename to electrumx_server.py diff --git a/lib/__init__.py b/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/samples/scripts/NOTES b/samples/scripts/NOTES index e56c4ec..4b86bde 100644 --- a/samples/scripts/NOTES +++ b/samples/scripts/NOTES @@ -2,7 +2,8 @@ The following environment variables are required: DB_DIRECTORY - path to the database directory (if relative, to `run` script) USERNAME - the username the server will run as if using `run` script -SERVER_MAIN - path to the server_main.py script (if relative, to `run` script) +ELECTRUMX - path to the electrumx_server.py script (if relative, + to `run` script) DAEMON_URL - the URL used to connect to the daemon. 
Should be of the form http://username:password@hostname:port/ Alternatively you can specify DAEMON_USERNAME, DAEMON_PASSWORD, diff --git a/samples/scripts/env/ELECTRUMX b/samples/scripts/env/ELECTRUMX new file mode 100644 index 0000000..3de0637 --- /dev/null +++ b/samples/scripts/env/ELECTRUMX @@ -0,0 +1 @@ +/path/to/electrumx_server.py diff --git a/samples/scripts/env/SERVER_MAIN b/samples/scripts/env/SERVER_MAIN deleted file mode 100644 index 6aee931..0000000 --- a/samples/scripts/env/SERVER_MAIN +++ /dev/null @@ -1 +0,0 @@ -/path/to/repos/electrumx/server_main.py diff --git a/samples/systemd-unit b/samples/systemd-unit index 94b7d47..46107f3 100644 --- a/samples/systemd-unit +++ b/samples/systemd-unit @@ -4,7 +4,7 @@ After=network.target [Service] EnvironmentFile=/etc/electrumx.conf -ExecStart=/home/electrumx/electrumx/server_main.py +ExecStart=/usr/local/bin/electrumx_server.py User=electrumx [Install] diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..827e807 --- /dev/null +++ b/setup.py @@ -0,0 +1,24 @@ +import setuptools +from server.version import VERSION + + +setuptools.setup( + name='electrumx', + version=VERSION.split()[-1], + scripts=['electrumx_server.py', 'electrumx_rpc.py'], + python_requires='>=3.5', + install_requires=['plyvel', 'aiohttp >= 1'], + packages=setuptools.find_packages(), + description='ElectrumX Server', + author='Neil Booth', + author_email='kyuupichan@gmail.com', + license='MIT Licence', + url='https://github.com/kyuupichan/electrumx/', + long_description='Server implementation for the Electrum wallet', + classifiers=[ + 'Development Status :: 3 - Alpha', + 'Topic :: Internet', + 'License :: OSI Approved :: MIT License', + 'Operating System :: Unix', + ], +) From 5a76c963e906764d7e73abfb2edeacd8c0fc06af Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 5 Nov 2016 18:02:04 +0900 Subject: [PATCH 04/12] Update HOWTO --- HOWTO.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/HOWTO.rst 
b/HOWTO.rst index 63c67cc..ebb9f03 100644 --- a/HOWTO.rst +++ b/HOWTO.rst @@ -53,8 +53,8 @@ Check out the code from Github:: git clone https://github.com/kyuupichan/electrumx.git cd electrumx -I have not yet created a setup.py, so for now I suggest you run -the code from the source tree or a copy of it. +You can install with setup.py, or run the code from the source tree or +a copy of it. You should create a standard user account to run the server under; your own is probably adequate unless paranoid. The paranoid might From 28e7b95412283e46b6511fb54f0da78c914af5c6 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 5 Nov 2016 18:05:46 +0900 Subject: [PATCH 05/12] Bump version, update RELEASE_NOTES --- RELEASE-NOTES | 5 +++++ server/version.py | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/RELEASE-NOTES b/RELEASE-NOTES index f4af83c..b50ef9b 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,3 +1,8 @@ +Version 0.1 +------------ + +- added setup.py, experimental + Version 0.07 ------------ diff --git a/server/version.py b/server/version.py index 74c926d..d8049bf 100644 --- a/server/version.py +++ b/server/version.py @@ -1 +1 @@ -VERSION = "ElectrumX 0.07" +VERSION = "ElectrumX 0.1" From 26dc1021f6530a12d8aa92d8aff9da631ad2a840 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 5 Nov 2016 18:37:40 +0900 Subject: [PATCH 06/12] Catch TimeoutError --- server/daemon.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/daemon.py b/server/daemon.py index b745076..27c0a3b 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -49,6 +49,8 @@ class Daemon(util.LoggedClass): if not self.is_warming_up(result): return result msg = 'daemon is still warming up' + except asyncio.TimeoutError: + msg = 'timeout error' except aiohttp.DisconnectedError as e: msg = '{}: {}'.format(e.__class__.__name__, e) From 312aaf4736951a594f56a51ac63c30ed56a5d619 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 5 Nov 2016 18:15:09 +0900 Subject: [PATCH 07/12] 
Attempt to improve the ETA --- HOWTO.rst | 3 +-- server/block_processor.py | 12 ++++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/HOWTO.rst b/HOWTO.rst index ebb9f03..fc0997c 100644 --- a/HOWTO.rst +++ b/HOWTO.rst @@ -275,5 +275,4 @@ After flush-to-disk you may see an aiohttp error; this is the daemon timing out the connection while the disk flush was in progress. This is harmless. -The ETA is just a guide and can be quite volatile, particularly around -flushes. It is too optimistic initially. +The ETA is just a guide and can be quite volatile around flushes. diff --git a/server/block_processor.py b/server/block_processor.py index a9da67e..8180c06 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -609,8 +609,8 @@ class BlockProcessor(LoggedClass): # Catch-up stats if show_stats: daemon_height = self.daemon.cached_height() - txs_per_sec = int(self.tx_count / self.wall_time) - this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) + tx_per_sec = int(self.tx_count / self.wall_time) + this_tx_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush)) if self.height > self.coin.TX_COUNT_HEIGHT: tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK else: @@ -618,12 +618,16 @@ class BlockProcessor(LoggedClass): * self.coin.TX_PER_BLOCK + (self.coin.TX_COUNT - self.tx_count)) + # Damp the enthusiasm + realism = 2.0 - 0.9 * self.height / self.coin.TX_COUNT_HEIGHT + tx_est *= max(realism, 1.0) + self.logger.info('tx/sec since genesis: {:,d}, ' 'since last flush: {:,d}' - .format(txs_per_sec, this_txs_per_sec)) + .format(tx_per_sec, this_tx_per_sec)) self.logger.info('sync time: {} ETA: {}' .format(formatted_time(self.wall_time), - formatted_time(tx_est / this_txs_per_sec))) + formatted_time(tx_est / this_tx_per_sec))) def flush_history(self, batch): self.logger.info('flushing history') From 00e9c5a31dad0a3dd80a842128e2a65399396860 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 5 Nov 2016 21:26:52 
+0900 Subject: [PATCH 08/12] Add daemon debugging caught-up facility --- server/block_processor.py | 6 +++++- server/controller.py | 2 +- server/daemon.py | 14 ++++++++++++-- server/env.py | 2 ++ server/protocol.py | 2 +- 5 files changed, 21 insertions(+), 5 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 8180c06..7660b88 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -167,6 +167,7 @@ class MemPool(LoggedClass): self.txs = {} self.hash168s = defaultdict(set) # None can be a key self.bp = bp + self.initial = True async def update(self, hex_hashes): '''Update state given the current mempool to the passed set of hashes. @@ -177,7 +178,8 @@ class MemPool(LoggedClass): hex_hashes = set(hex_hashes) touched = set() - if not self.txs: + if self.initial: + self.initial = False self.logger.info('initial fetch of {:,d} daemon mempool txs' .format(len(hex_hashes))) @@ -322,6 +324,8 @@ class BlockProcessor(LoggedClass): self.height = self.db_height self.tip = self.db_tip + self.daemon.debug_set_height(self.height) + # Caches to be flushed later. 
Headers and tx_hashes have one # entry per block self.history = defaultdict(partial(array.array, 'I')) diff --git a/server/controller.py b/server/controller.py index cbbfaf1..cd159b3 100644 --- a/server/controller.py +++ b/server/controller.py @@ -34,7 +34,7 @@ class Controller(LoggedClass): self.loop = loop self.env = env self.coin = env.coin - self.daemon = Daemon(env.daemon_url) + self.daemon = Daemon(env.daemon_url, env.debug) self.block_processor = BlockProcessor(env, self.daemon, on_update=self.on_update) JSONRPC.init(self.block_processor, self.daemon, self.coin, diff --git a/server/daemon.py b/server/daemon.py index 27c0a3b..a4682df 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -26,11 +26,18 @@ class Daemon(util.LoggedClass): WARMING_UP = -28 - def __init__(self, url): + def __init__(self, url, debug): super().__init__() self.url = url self._height = None self.logger.info('connecting to daemon at URL {}'.format(url)) + self.debug_caught_up = 'caught_up' in debug + + def debug_set_height(self, height): + if self.debug_caught_up: + self.logger.info('pretending to have caught up to height {}' + .format(height)) + self._height = height @classmethod def is_warming_up(cls, err): @@ -103,6 +110,8 @@ class Daemon(util.LoggedClass): async def mempool_hashes(self): '''Return the hashes of the txs in the daemon's mempool.''' + if self.debug_caught_up: + return [] return await self.send_single('getrawmempool') async def estimatefee(self, params): @@ -138,7 +147,8 @@ class Daemon(util.LoggedClass): async def height(self): '''Query the daemon for its current height.''' - self._height = await self.send_single('getblockcount') + if not self.debug_caught_up: + self._height = await self.send_single('getblockcount') return self._height def cached_height(self): diff --git a/server/env.py b/server/env.py index 21fa8ae..403a277 100644 --- a/server/env.py +++ b/server/env.py @@ -43,6 +43,8 @@ class Env(LoggedClass): # The electrum client takes the empty string as 
unspecified self.donation_address = self.default('DONATION_ADDRESS', '') self.db_engine = self.default('DB_ENGINE', 'leveldb') + self.debug = self.default('DEBUG', '') + self.debug = [item.lower() for item in self.debug.split()] def default(self, envvar, default): return environ.get(envvar, default) diff --git a/server/protocol.py b/server/protocol.py index 5c960d1..a2ec055 100644 --- a/server/protocol.py +++ b/server/protocol.py @@ -77,7 +77,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass): try: message = json.loads(message.decode()) except Exception as e: - self.logger.info('error decoding JSON message'.format(e)) + self.logger.info('error decoding JSON message: {}'.format(e)) else: self.ADD_JOB(self.request_handler(message)) From 1b589d3d1f82dfdcae31c25fd78c747168574750 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sat, 5 Nov 2016 21:49:05 +0900 Subject: [PATCH 09/12] Remove old reorg forcing code --- server/block_processor.py | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 7660b88..34391b3 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -358,11 +358,8 @@ class BlockProcessor(LoggedClass): self.clean_db() - def coros(self, force_backup=False): - if force_backup: - return [self.force_chain_reorg(True), self.prefetcher.start()] - else: - return [self.start(), self.prefetcher.start()] + def coros(self): + return [self.start(), self.prefetcher.start()] async def start(self): '''External entry point for block processing. 
@@ -406,19 +403,13 @@ class BlockProcessor(LoggedClass): await self.on_update(self.height, self.touched) self.touched = set() - async def force_chain_reorg(self, to_genesis): - try: - await self.handle_chain_reorg(to_genesis) - finally: - self.flush(True) - - async def handle_chain_reorg(self, to_genesis=False): + async def handle_chain_reorg(self): # First get all state on disk self.logger.info('chain reorg detected') self.flush(True) self.logger.info('finding common height...') - hashes = await self.reorg_hashes(to_genesis) + hashes = await self.reorg_hashes() # Reverse and convert to hex strings. hashes = [hash_to_str(hash) for hash in reversed(hashes)] for hex_hashes in chunks(hashes, 50): @@ -429,7 +420,7 @@ class BlockProcessor(LoggedClass): await self.prefetcher.clear(self.height) self.logger.info('prefetcher reset') - async def reorg_hashes(self, to_genesis): + async def reorg_hashes(self): '''Return the list of hashes to back up beacuse of a reorg. The hashes are returned in order of increasing height.''' @@ -447,7 +438,7 @@ class BlockProcessor(LoggedClass): hex_hashes = [hash_to_str(hash) for hash in hashes] d_hex_hashes = await self.daemon.block_hex_hashes(start, count) n = match_pos(hex_hashes, d_hex_hashes) - if n >= 0 and not to_genesis: + if n >= 0: start += n + 1 break count = min(count * 2, start) From 18efa67f1d5e773d3e03a7658d5e51640b4be28a Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 6 Nov 2016 07:26:06 +0900 Subject: [PATCH 10/12] Various daemon improvements --- server/block_processor.py | 14 +++---- server/daemon.py | 78 ++++++++++++++++++++++----------------- 2 files changed, 51 insertions(+), 41 deletions(-) diff --git a/server/block_processor.py b/server/block_processor.py index 34391b3..4ea5d55 100644 --- a/server/block_processor.py +++ b/server/block_processor.py @@ -167,7 +167,7 @@ class MemPool(LoggedClass): self.txs = {} self.hash168s = defaultdict(set) # None can be a key self.bp = bp - self.initial = True + self.count = 0 
async def update(self, hex_hashes): '''Update state given the current mempool to the passed set of hashes. @@ -178,8 +178,7 @@ hex_hashes = set(hex_hashes) touched = set() - if self.initial: - self.initial = False + if self.count == 0: self.logger.info('initial fetch of {:,d} daemon mempool txs' .format(len(hex_hashes))) @@ -192,9 +191,6 @@ for hash168 in hash168s: self.hash168s[hash168].remove(hex_hash) touched.update(hash168s) - if gone: - self.logger.info('{:,d} entries removed from mempool' - .format(len(gone))) # Get the raw transactions for the new hashes. Ignore the # ones the daemon no longer has (it will return None). Put @@ -253,8 +249,10 @@ self.hash168s[hash168].add(hex_hash) touched.add(hash168) - self.logger.info('{:,d} entries in mempool for {:,d} addresses' - .format(len(self.txs), len(self.hash168s))) + if self.count % 20 == 0: + self.logger.info('{:,d} entries in mempool for {:,d} addresses' - .format(len(self.txs), len(self.hash168s))) + self.count += 1 # Might include a None return touched diff --git a/server/daemon.py b/server/daemon.py index a4682df..6f29142 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -17,8 +17,11 @@ import lib.util as util class DaemonError(Exception): - '''Raised when the daemon returns an error in its results that - cannot be remedied by retrying.''' + '''Raised when the daemon returns an error in its results.''' + + +class DaemonWarmingUpError(DaemonError): + '''Raised when the daemon is warming up and cannot yet serve requests.''' class Daemon(util.LoggedClass): @@ -39,42 +42,59 @@ class Daemon(util.LoggedClass): .format(height)) self._height = height - @classmethod - def is_warming_up(cls, err): - if not isinstance(err, list): - err = [err] - return any(elt.get('code') == cls.WARMING_UP for elt in err) + async def post(self, data): + '''Send data to the daemon and handle the response.''' + async with aiohttp.post(self.url, data=data) as 
resp: + result = await resp.json() + + if isinstance(result, list): + errs = tuple(item['error'] for item in result) + if not any(errs): + return tuple(item['result'] for item in result) + if any(err.get('code') == self.WARMING_UP for err in errs if err): + raise DaemonWarmingUpError + raise DaemonError(errs) + else: + err = result['error'] + if not err: + return result['result'] + if err.get('code') == self.WARMING_UP: + raise DaemonWarmingUpError + raise DaemonError(err) async def send(self, payload): - '''Send a payload to be converted to JSON.''' + '''Send a payload to be converted to JSON. + + Handles temporary connection issues. Daemon response errors + are raised through DaemonError. + ''' data = json.dumps(payload) secs = 1 while True: try: - async with aiohttp.post(self.url, data=data) as resp: - result = await resp.json() - if not self.is_warming_up(result): - return result - msg = 'daemon is still warming up' + return await self.post(data) except asyncio.TimeoutError: msg = 'timeout error' - except aiohttp.DisconnectedError as e: - msg = '{}: {}'.format(e.__class__.__name__, e) + except aiohttp.ClientHttpProcessingError: + msg = 'HTTP error' + except aiohttp.ServerDisconnectedError: + msg = 'daemon disconnected' + except aiohttp.ClientConnectionError: + msg = 'connection problem - is your daemon running?' + except DaemonWarmingUpError: + msg = 'daemon is still warming up' - secs = min(180, secs * 2) self.logger.error('{}. Sleeping {:d}s and trying again...' .format(msg, secs)) await asyncio.sleep(secs) + secs = min(180, secs * 2) async def send_single(self, method, params=None): '''Send a single request to the daemon.''' payload = {'method': method} if params: payload['params'] = params - item = await self.send(payload) - if item['error']: - raise DaemonError(item['error']) - return item['result'] + return await self.send(payload) async def send_many(self, mp_iterable): '''Send several requests at once. 
@@ -82,11 +102,7 @@ class Daemon(util.LoggedClass): The results are returned as a tuple.''' payload = tuple({'method': m, 'params': p} for m, p in mp_iterable) if payload: - items = await self.send(payload) - errs = tuple(item['error'] for item in items) - if any(errs): - raise DaemonError(errs) - return tuple(item['result'] for item in items) + return await self.send(payload) return () async def send_vector(self, method, params_iterable): @@ -132,14 +148,10 @@ class Daemon(util.LoggedClass): '''Return the serialized raw transactions with the given hashes. Breaks large requests up. Yields after each sub request.''' - param_lists = tuple((hex_hash, 0) for hex_hash in hex_hashes) - raw_txs = [] - for chunk in util.chunks(param_lists, 10000): - txs = await self.send_vector('getrawtransaction', chunk) - # Convert hex strings to bytes - raw_txs.append(tuple(bytes.fromhex(tx) for tx in txs)) - await asyncio.sleep(0) - return sum(raw_txs, ()) + params_iterable = ((hex_hash, 0) for hex_hash in hex_hashes) + txs = await self.send_vector('getrawtransaction', params_iterable) + # Convert hex strings to bytes + return tuple(bytes.fromhex(tx) for tx in txs) async def sendrawtransaction(self, params): '''Broadcast a transaction to the network.''' From db29121d588bff0f935b21a314098873159c625d Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 6 Nov 2016 08:36:11 +0900 Subject: [PATCH 11/12] Improved daemon logging --- server/daemon.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/server/daemon.py b/server/daemon.py index 6f29142..2df7b27 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -70,9 +70,13 @@ class Daemon(util.LoggedClass): ''' data = json.dumps(payload) secs = 1 + prior_msg = None while True: try: - return await self.post(data) + result = await self.post(data) + if prior_msg: + self.logger.error('connection successfully restored') + return result except asyncio.TimeoutError: msg = 'timeout error' except 
aiohttp.ClientHttpProcessingError: @@ -84,10 +88,14 @@ class Daemon(util.LoggedClass): except DaemonWarmingUpError: msg = 'daemon is still warming up' - self.logger.error('{}. Sleeping {:d}s and trying again...' - .format(msg, secs)) + if msg != prior_msg or count == 10: + self.logger.error('{}. Retrying between sleeps...' + .format(msg)) + prior_msg = msg + count = 0 await asyncio.sleep(secs) - secs = min(180, secs * 2) + count += 1 + secs = min(16, secs * 2) async def send_single(self, method, params=None): '''Send a single request to the daemon.''' From 48f0d5456c045b39ed5e8ff54e845a14eaa38ca2 Mon Sep 17 00:00:00 2001 From: Neil Booth Date: Sun, 6 Nov 2016 08:48:38 +0900 Subject: [PATCH 12/12] Release preparation --- RELEASE-NOTES | 7 ++++++- server/daemon.py | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/RELEASE-NOTES b/RELEASE-NOTES index b50ef9b..4e763a5 100644 --- a/RELEASE-NOTES +++ b/RELEASE-NOTES @@ -1,7 +1,12 @@ Version 0.1 ------------ -- added setup.py, experimental +- added setup.py, experimental. Because of this server_main.py renamed + electrumx_server.py, and SERVER_MAIN environment variable was renamed + to ELECTRUMX. The sample run script was updated to match. +- improvements to logging of daemon connection issues +- removal of old reorg test code +- hopefully more accurate sync ETA Version 0.07 ------------ diff --git a/server/daemon.py b/server/daemon.py index 2df7b27..f321873 100644 --- a/server/daemon.py +++ b/server/daemon.py @@ -75,7 +75,7 @@ class Daemon(util.LoggedClass): try: result = await self.post(data) if prior_msg: - self.logger.error('connection successfully restored') + self.logger.info('connection successfully restored') return result except asyncio.TimeoutError: msg = 'timeout error'