
Merge branch 'release-0.1'

Neil Booth committed 9 years ago
commit 281d9dacef (branch: master, tag: 0.1)
16 changed files (lines changed in parentheses):

  1. HOWTO.rst (7)
  2. RELEASE-NOTES (10)
  3. electrumx_rpc.py (8)
  4. electrumx_server.py (0)
  5. lib/__init__.py (0)
  6. samples/scripts/NOTES (3)
  7. samples/scripts/env/ELECTRUMX (1)
  8. samples/scripts/env/SERVER_MAIN (1)
  9. samples/systemd-unit (2)
  10. server/block_processor.py (55)
  11. server/controller.py (2)
  12. server/daemon.py (152)
  13. server/env.py (2)
  14. server/protocol.py (5)
  15. server/version.py (2)
  16. setup.py (24)

HOWTO.rst (7)

@@ -53,8 +53,8 @@ Check out the code from Github::
   git clone https://github.com/kyuupichan/electrumx.git
   cd electrumx
-I have not yet created a setup.py, so for now I suggest you run
-the code from the source tree or a copy of it.
+You can install with setup.py, or run the code from the source tree or
+a copy of it.
 You should create a standard user account to run the server under;
 your own is probably adequate unless paranoid.  The paranoid might
@@ -275,5 +275,4 @@ After flush-to-disk you may see an aiohttp error; this is the daemon
 timing out the connection while the disk flush was in progress.  This
 is harmless.
-The ETA is just a guide and can be quite volatile, particularly around
-flushes.  It is too optimistic initially.
+The ETA is just a guide and can be quite volatile around flushes.

RELEASE-NOTES (10)

@@ -1,3 +1,13 @@
+Version 0.1
+------------
+- added setup.py, experimental.  Because of this, server_main.py was
+  renamed electrumx_server.py, and the SERVER_MAIN environment variable
+  was renamed to ELECTRUMX.  The sample run script was updated to match.
+- improvements to logging of daemon connection issues
+- removal of old reorg test code
+- hopefully more accurate sync ETA
+
 Version 0.07
 ------------

electrumx_rpc.py (8)

@@ -43,14 +43,16 @@ def main():
     parser = argparse.ArgumentParser('Send electrumx an RPC command')
     parser.add_argument('-p', '--port', metavar='port_num', type=int,
                         help='RPC port number')
-    parser.add_argument('command', nargs='*', default=[],
+    parser.add_argument('command', nargs=1, default=[],
                         help='command to send')
+    parser.add_argument('param', nargs='*', default=[],
+                        help='params to send')
     args = parser.parse_args()
 
     if args.port is None:
         args.port = int(environ.get('ELECTRUMX_RPC_PORT', 8000))
-    payload = {'method': args.command[0], 'params': args.command[1:]}
+    payload = {'method': args.command[0], 'params': args.param}
     loop = asyncio.get_event_loop()
     proto_factory = partial(RPCClient, loop)
@@ -60,7 +62,7 @@ def main():
         protocol.send(payload)
         loop.run_forever()
     except OSError:
-        print('error connecting - is ElectrumX running?')
+        print('error connecting - is ElectrumX catching up or not running?')
     finally:
         loop.close()
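
The RPC client now splits the command name from its parameters: `command` takes exactly one word and anything following becomes `param`. A minimal standalone sketch of the new parsing (the `getinfo` command name here is only an illustration):

    import argparse

    # Mirrors the new electrumx_rpc.py argument handling.
    parser = argparse.ArgumentParser('Send electrumx an RPC command')
    parser.add_argument('-p', '--port', metavar='port_num', type=int,
                        help='RPC port number')
    parser.add_argument('command', nargs=1, help='command to send')
    parser.add_argument('param', nargs='*', default=[],
                        help='params to send')

    args = parser.parse_args(['getinfo', 'arg1'])  # 'getinfo' is illustrative
    # nargs=1 yields a one-element list, hence command[0].
    payload = {'method': args.command[0], 'params': args.param}
    print(payload)  # {'method': 'getinfo', 'params': ['arg1']}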

server_main.py → electrumx_server.py (0)

lib/__init__.py (0)

samples/scripts/NOTES (3)

@@ -2,7 +2,8 @@ The following environment variables are required:
 DB_DIRECTORY - path to the database directory (if relative, to `run` script)
 USERNAME - the username the server will run as if using `run` script
-SERVER_MAIN - path to the server_main.py script (if relative, to `run` script)
+ELECTRUMX - path to the electrumx_server.py script (if relative,
+            to `run` script)
 DAEMON_URL - the URL used to connect to the daemon.  Should be of the form
     http://username:password@hostname:port/
 Alternatively you can specify DAEMON_USERNAME, DAEMON_PASSWORD,
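
The sample `run` script reads these variables from an env directory, one file per variable, as the env/ELECTRUMX entry below shows. A hedged sketch of that lookup, assuming the daemontools-style envdir layout used by samples/scripts (`read_envdir` is an illustrative helper, not part of the codebase):

    import os

    def read_envdir(path):
        '''Read a daemontools-style env directory: each file name is a
        variable, its stripped contents the value.'''
        env = {}
        for name in os.listdir(path):
            with open(os.path.join(path, name)) as f:
                env[name] = f.read().strip()
        return env

    # e.g. read_envdir('samples/scripts/env')['ELECTRUMX']
    # -> '/path/to/electrumx_server.py'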

samples/scripts/env/ELECTRUMX (1)

@@ -0,0 +1 @@
+/path/to/electrumx_server.py

samples/scripts/env/SERVER_MAIN (1)

@@ -1 +0,0 @@
-/path/to/repos/electrumx/server_main.py

samples/systemd-unit (2)

@@ -4,7 +4,7 @@ After=network.target
 [Service]
 EnvironmentFile=/etc/electrumx.conf
-ExecStart=/home/electrumx/electrumx/server_main.py
+ExecStart=/usr/local/bin/electrumx_server.py
 User=electrumx
 
 [Install]

server/block_processor.py (55)

@@ -81,7 +81,7 @@ class Prefetcher(LoggedClass):
     async def start(self):
         '''Loop forever polling for more blocks.'''
-        self.logger.info('starting prefetch loop...')
+        self.logger.info('starting daemon poll loop...')
         while True:
             try:
                 if await self._caught_up():
@@ -89,7 +89,7 @@ class Prefetcher(LoggedClass):
                 else:
                     await asyncio.sleep(0)
             except DaemonError as e:
-                self.logger.info('ignoring daemon errors: {}'.format(e))
+                self.logger.info('ignoring daemon error: {}'.format(e))
 
     async def _caught_up(self):
         '''Poll for new blocks and mempool state.
@@ -167,6 +167,7 @@ class MemPool(LoggedClass):
         self.txs = {}
         self.hash168s = defaultdict(set)  # None can be a key
         self.bp = bp
+        self.count = 0
 
     async def update(self, hex_hashes):
         '''Update state given the current mempool to the passed set of hashes.
@@ -177,7 +178,7 @@ class MemPool(LoggedClass):
         hex_hashes = set(hex_hashes)
         touched = set()
-        if not self.txs:
+        if self.count == 0:
             self.logger.info('initial fetch of {:,d} daemon mempool txs'
                              .format(len(hex_hashes)))
@@ -190,9 +191,6 @@ class MemPool(LoggedClass):
             for hash168 in hash168s:
                 self.hash168s[hash168].remove(hex_hash)
             touched.update(hash168s)
-        if gone:
-            self.logger.info('{:,d} entries removed from mempool'
-                             .format(len(gone)))
 
         # Get the raw transactions for the new hashes.  Ignore the
         # ones the daemon no longer has (it will return None).  Put
@@ -251,8 +249,10 @@ class MemPool(LoggedClass):
                 self.hash168s[hash168].add(hex_hash)
                 touched.add(hash168)
 
-        self.logger.info('{:,d} entries in mempool for {:,d} addresses'
-                         .format(len(self.txs), len(self.hash168s)))
+        if self.count % 20 == 0:
+            self.logger.info('{:,d} entries in mempool for {:,d} addresses'
+                             .format(len(self.txs), len(self.hash168s)))
+        self.count += 1
 
         # Might include a None
         return touched
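
The mempool summary line is now gated on an update counter instead of printing on every poll. A minimal standalone sketch of the pattern (the class name is illustrative, not from the codebase):

    class ThrottledLog:
        '''Log a mempool summary only on every 20th update.'''
        def __init__(self, log):
            self.log = log
            self.count = 0

        def update(self, txs, hash168s):
            if self.count % 20 == 0:
                self.log('{:,d} entries in mempool for {:,d} addresses'
                         .format(len(txs), len(hash168s)))
            self.count += 1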
@@ -261,7 +261,7 @@ class MemPool(LoggedClass):
         '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
         entries for the hash168.
 
-        unconfirmed is True if any txin is confirmed.
+        unconfirmed is True if any txin is unconfirmed.
         '''
         for hex_hash in self.hash168s[hash168]:
             txin_pairs, txout_pairs, unconfirmed = self.txs[hex_hash]
@@ -322,6 +322,8 @@ class BlockProcessor(LoggedClass):
         self.height = self.db_height
         self.tip = self.db_tip
 
+        self.daemon.debug_set_height(self.height)
+
         # Caches to be flushed later.  Headers and tx_hashes have one
         # entry per block
         self.history = defaultdict(partial(array.array, 'I'))
@@ -354,11 +356,8 @@ class BlockProcessor(LoggedClass):
         self.clean_db()
 
-    def coros(self, force_backup=False):
-        if force_backup:
-            return [self.force_chain_reorg(True), self.prefetcher.start()]
-        else:
-            return [self.start(), self.prefetcher.start()]
+    def coros(self):
+        return [self.start(), self.prefetcher.start()]
 
     async def start(self):
         '''External entry point for block processing.
@@ -402,19 +401,13 @@ class BlockProcessor(LoggedClass):
             await self.on_update(self.height, self.touched)
             self.touched = set()
 
-    async def force_chain_reorg(self, to_genesis):
-        try:
-            await self.handle_chain_reorg(to_genesis)
-        finally:
-            self.flush(True)
-
-    async def handle_chain_reorg(self, to_genesis=False):
+    async def handle_chain_reorg(self):
         # First get all state on disk
         self.logger.info('chain reorg detected')
         self.flush(True)
         self.logger.info('finding common height...')
-        hashes = await self.reorg_hashes(to_genesis)
+        hashes = await self.reorg_hashes()
         # Reverse and convert to hex strings.
         hashes = [hash_to_str(hash) for hash in reversed(hashes)]
         for hex_hashes in chunks(hashes, 50):
@@ -425,7 +418,7 @@ class BlockProcessor(LoggedClass):
         await self.prefetcher.clear(self.height)
         self.logger.info('prefetcher reset')
 
-    async def reorg_hashes(self, to_genesis):
+    async def reorg_hashes(self):
         '''Return the list of hashes to back up because of a reorg.
 
         The hashes are returned in order of increasing height.'''
@@ -443,7 +436,7 @@ class BlockProcessor(LoggedClass):
             hex_hashes = [hash_to_str(hash) for hash in hashes]
             d_hex_hashes = await self.daemon.block_hex_hashes(start, count)
             n = match_pos(hex_hashes, d_hex_hashes)
-            if n >= 0 and not to_genesis:
+            if n >= 0:
                 start += n + 1
                 break
             count = min(count * 2, start)
@@ -609,8 +602,8 @@ class BlockProcessor(LoggedClass):
         # Catch-up stats
         if show_stats:
             daemon_height = self.daemon.cached_height()
-            txs_per_sec = int(self.tx_count / self.wall_time)
-            this_txs_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
+            tx_per_sec = int(self.tx_count / self.wall_time)
+            this_tx_per_sec = 1 + int(tx_diff / (self.last_flush - last_flush))
             if self.height > self.coin.TX_COUNT_HEIGHT:
                 tx_est = (daemon_height - self.height) * self.coin.TX_PER_BLOCK
             else:
@@ -618,12 +611,16 @@ class BlockProcessor(LoggedClass):
                           * self.coin.TX_PER_BLOCK
                           + (self.coin.TX_COUNT - self.tx_count))
 
+            # Damp the enthusiasm
+            realism = 2.0 - 0.9 * self.height / self.coin.TX_COUNT_HEIGHT
+            tx_est *= max(realism, 1.0)
+
             self.logger.info('tx/sec since genesis: {:,d}, '
                              'since last flush: {:,d}'
-                             .format(txs_per_sec, this_txs_per_sec))
+                             .format(tx_per_sec, this_tx_per_sec))
             self.logger.info('sync time: {}  ETA: {}'
                              .format(formatted_time(self.wall_time),
-                                     formatted_time(tx_est / this_txs_per_sec)))
+                                     formatted_time(tx_est / this_tx_per_sec)))
 
     def flush_history(self, batch):
         self.logger.info('flushing history')
@@ -854,7 +851,7 @@ class BlockProcessor(LoggedClass):
         '''Generate (hex_hash, tx_fee, unconfirmed) tuples for mempool
         entries for the hash168.
 
-        unconfirmed is True if any txin is confirmed.
+        unconfirmed is True if any txin is unconfirmed.
         '''
         return self.mempool.transactions(hash168)
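
The "damp the enthusiasm" factor is what makes the new ETA less optimistic early in a sync: the remaining-transaction estimate is scaled by a multiplier that starts near 2.0 at genesis, falls linearly to 1.1 at TX_COUNT_HEIGHT, and never drops below 1.0. A worked example (the TX_COUNT_HEIGHT value is illustrative; the real one is a per-coin constant):

    TX_COUNT_HEIGHT = 420000  # illustrative; a per-coin constant in reality

    def damp(tx_est, height):
        realism = 2.0 - 0.9 * height / TX_COUNT_HEIGHT
        return tx_est * max(realism, 1.0)

    print(damp(1000000, 0))                    # 2000000.0: doubled at genesis
    print(damp(1000000, TX_COUNT_HEIGHT))      # ~1100000: mild damping
    print(damp(1000000, 2 * TX_COUNT_HEIGHT))  # 1000000.0: no damping past it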

server/controller.py (2)

@@ -34,7 +34,7 @@ class Controller(LoggedClass):
         self.loop = loop
         self.env = env
         self.coin = env.coin
-        self.daemon = Daemon(env.daemon_url)
+        self.daemon = Daemon(env.daemon_url, env.debug)
         self.block_processor = BlockProcessor(env, self.daemon,
                                               on_update=self.on_update)
         JSONRPC.init(self.block_processor, self.daemon, self.coin,

server/daemon.py (152)

@@ -17,80 +17,125 @@ import lib.util as util
 class DaemonError(Exception):
-    '''Raised when the daemon returns an error in its results that
-    cannot be remedied by retrying.'''
+    '''Raised when the daemon returns an error in its results.'''
+
+
+class DaemonWarmingUpError(DaemonError):
+    '''Raised when the daemon returns an error in its results.'''
 
 
 class Daemon(util.LoggedClass):
     '''Handles connections to a daemon at the given URL.'''
 
-    def __init__(self, url):
+    WARMING_UP = -28
+
+    def __init__(self, url, debug):
         super().__init__()
         self.url = url
         self._height = None
         self.logger.info('connecting to daemon at URL {}'.format(url))
+        self.debug_caught_up = 'caught_up' in debug
+
+    def debug_set_height(self, height):
+        if self.debug_caught_up:
+            self.logger.info('pretending to have caught up to height {}'
+                             .format(height))
+            self._height = height
+
+    async def post(self, data):
+        '''Send data to the daemon and handle the response.'''
+        async with aiohttp.post(self.url, data=data) as resp:
+            result = await resp.json()
+
+        if isinstance(result, list):
+            errs = tuple(item['error'] for item in result)
+            if not any(errs):
+                return tuple(item['result'] for item in result)
+            if any(err.get('code') == self.WARMING_UP for err in errs if err):
+                raise DaemonWarmingUpError
+            raise DaemonError(errs)
+        else:
+            err = result['error']
+            if not err:
+                return result['result']
+            if err.get('code') == self.WARMING_UP:
+                raise DaemonWarmingUpError
+            raise DaemonError(err)
+
+    async def send(self, payload):
+        '''Send a payload to be converted to JSON.
+
+        Handles temporary connection issues.  Daemon response errors
+        are raised through DaemonError.
+        '''
+        data = json.dumps(payload)
+        secs = 1
+        prior_msg = None
+        while True:
+            try:
+                result = await self.post(data)
+                if prior_msg:
+                    self.logger.info('connection successfully restored')
+                return result
+            except asyncio.TimeoutError:
+                msg = 'timeout error'
+            except aiohttp.ClientHttpProcessingError:
+                msg = 'HTTP error'
+            except aiohttp.ServerDisconnectedError:
+                msg = 'daemon disconnected'
+            except aiohttp.ClientConnectionError:
+                msg = 'connection problem - is your daemon running?'
+            except DaemonWarmingUpError:
+                msg = 'daemon is still warming up'
+
+            if msg != prior_msg or count == 10:
+                self.logger.error('{}.  Retrying between sleeps...'
+                                  .format(msg))
+                prior_msg = msg
+                count = 0
+            await asyncio.sleep(secs)
+            count += 1
+            secs = min(16, secs * 2)
 
     async def send_single(self, method, params=None):
+        '''Send a single request to the daemon.'''
         payload = {'method': method}
         if params:
             payload['params'] = params
-        result, = await self.send((payload, ))
-        return result
+        return await self.send(payload)
 
-    async def send_many(self, mp_pairs):
-        if mp_pairs:
-            payload = [{'method': method, 'params': params}
-                       for method, params in mp_pairs]
+    async def send_many(self, mp_iterable):
+        '''Send several requests at once.
+
+        The results are returned as a tuple.'''
+        payload = tuple({'method': m, 'params': p} for m, p in mp_iterable)
+        if payload:
             return await self.send(payload)
-        return []
+        return ()
 
-    async def send_vector(self, method, params_list):
-        if params_list:
-            payload = [{'method': method, 'params': params}
-                       for params in params_list]
-            return await self.send(payload)
-        return []
-
-    async def send(self, payload):
-        assert isinstance(payload, (tuple, list))
-        data = json.dumps(payload)
-        while True:
-            try:
-                async with aiohttp.post(self.url, data=data) as resp:
-                    result = await resp.json()
-            except asyncio.CancelledError:
-                raise
-            except Exception as e:
-                msg = 'aiohttp error: {}'.format(e)
-                secs = 3
-            else:
-                errs = tuple(item['error'] for item in result)
-                if not any(errs):
-                    return tuple(item['result'] for item in result)
-                if any(err.get('code') == -28 for err in errs):
-                    msg = 'daemon still warming up.'
-                    secs = 30
-                else:
-                    raise DaemonError(errs)
-            self.logger.error('{}.  Sleeping {:d}s and trying again...'
-                              .format(msg, secs))
-            await asyncio.sleep(secs)
+    async def send_vector(self, method, params_iterable):
+        '''Send several requests of the same method.
+
+        The results are returned as a tuple.'''
+        return await self.send_many((method, params)
+                                    for params in params_iterable)
 
     async def block_hex_hashes(self, first, count):
         '''Return the hex hashes of count block starting at height first.'''
-        param_lists = [[height] for height in range(first, first + count)]
-        return await self.send_vector('getblockhash', param_lists)
+        params_iterable = ((h, ) for h in range(first, first + count))
+        return await self.send_vector('getblockhash', params_iterable)
 
     async def raw_blocks(self, hex_hashes):
         '''Return the raw binary blocks with the given hex hashes.'''
-        param_lists = [(h, False) for h in hex_hashes]
-        blocks = await self.send_vector('getblock', param_lists)
+        params_iterable = ((h, False) for h in hex_hashes)
+        blocks = await self.send_vector('getblock', params_iterable)
         # Convert hex string to bytes
-        return [bytes.fromhex(block) for block in blocks]
+        return tuple(bytes.fromhex(block) for block in blocks)
 
     async def mempool_hashes(self):
         '''Return the hashes of the txs in the daemon's mempool.'''
+        if self.debug_caught_up:
+            return []
         return await self.send_single('getrawmempool')
 
     async def estimatefee(self, params):
@@ -111,14 +156,10 @@ class Daemon(util.LoggedClass):
         '''Return the serialized raw transactions with the given hashes.
 
         Breaks large requests up.  Yields after each sub request.'''
-        param_lists = tuple((hex_hash, 0) for hex_hash in hex_hashes)
-        raw_txs = []
-        for chunk in util.chunks(param_lists, 10000):
-            txs = await self.send_vector('getrawtransaction', chunk)
-            # Convert hex strings to bytes
-            raw_txs.append(tuple(bytes.fromhex(tx) for tx in txs))
-            await asyncio.sleep(0)
-        return sum(raw_txs, ())
+        params_iterable = ((hex_hash, 0) for hex_hash in hex_hashes)
+        txs = await self.send_vector('getrawtransaction', params_iterable)
+        # Convert hex strings to bytes
+        return tuple(bytes.fromhex(tx) for tx in txs)
 
     async def sendrawtransaction(self, params):
         '''Broadcast a transaction to the network.'''
@@ -126,7 +167,8 @@ class Daemon(util.LoggedClass):
     async def height(self):
         '''Query the daemon for its current height.'''
-        self._height = await self.send_single('getblockcount')
+        if not self.debug_caught_up:
+            self._height = await self.send_single('getblockcount')
         return self._height
 
     def cached_height(self):
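
The rewritten send() replaces the old fixed 3s/30s sleeps with exponential backoff capped at 16 seconds, and logs only when the failure reason changes or on every tenth retry. A standalone sketch of that policy (`post` and `log` stand in for Daemon.post and the logger):

    import asyncio

    async def send_with_retry(post, log):
        '''Retry post() forever: back off 1, 2, 4, ... capped at 16s,
        logging only on a new error message or every 10th retry.'''
        secs = 1
        count = 0
        prior_msg = None
        while True:
            try:
                result = await post()
                if prior_msg:
                    log('connection successfully restored')
                return result
            except Exception as e:  # the real code catches specific errors
                msg = str(e)
            if msg != prior_msg or count == 10:
                log('{}.  Retrying between sleeps...'.format(msg))
                prior_msg = msg
                count = 0
            await asyncio.sleep(secs)
            count += 1
            secs = min(16, secs * 2)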

server/env.py (2)

@@ -43,6 +43,8 @@ class Env(LoggedClass):
         # The electrum client takes the empty string as unspecified
         self.donation_address = self.default('DONATION_ADDRESS', '')
         self.db_engine = self.default('DB_ENGINE', 'leveldb')
+        self.debug = self.default('DEBUG', '')
+        self.debug = [item.lower() for item in self.debug.split()]
 
     def default(self, envvar, default):
         return environ.get(envvar, default)
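
DEBUG is parsed as a whitespace-separated, lowercased list of flags; setting it to 'caught_up' makes the daemon wrapper pretend the server is already synced (see debug_set_height in server/daemon.py above). The parsing, extracted so it can be tried directly:

    from os import environ

    # Same parsing as Env: DEBUG='Caught_Up other' -> ['caught_up', 'other']
    debug = [item.lower() for item in environ.get('DEBUG', '').split()]
    debug_caught_up = 'caught_up' in debug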

server/protocol.py (5)

@@ -77,7 +77,7 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
         try:
             message = json.loads(message.decode())
         except Exception as e:
-            self.logger.info('error decoding JSON message'.format(e))
+            self.logger.info('error decoding JSON message: {}'.format(e))
         else:
             self.ADD_JOB(self.request_handler(message))
@@ -404,8 +404,7 @@ class ElectrumX(JSONRPC):
             self.logger.info('sent tx: {}'.format(tx_hash))
             return tx_hash
         except DaemonError as e:
-            errors = e.args[0]
-            error = errors[0]
+            error = e.args[0]
             message = error['message']
             self.logger.info('sendrawtransaction: {}'.format(message))
             if 'non-mandatory-script-verify-flag' in message:
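
With single requests no longer wrapped in batches, a failed sendrawtransaction raises DaemonError carrying one JSON-RPC error dict rather than a list of them, so the handler reads e.args[0] directly. A tiny self-contained illustration (the error payload is made up):

    class DaemonError(Exception):
        '''Stand-in for server.daemon.DaemonError.'''

    try:
        raise DaemonError({'code': -26,
                           'message': 'non-mandatory-script-verify-flag'})
    except DaemonError as e:
        error = e.args[0]
        print(error['message'])  # -> non-mandatory-script-verify-flag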

server/version.py (2)

@@ -1 +1 @@
-VERSION = "ElectrumX 0.07"
+VERSION = "ElectrumX 0.1"

setup.py (24)

@@ -0,0 +1,24 @@
+import setuptools
+from server.version import VERSION
+
+setuptools.setup(
+    name='electrumx',
+    version=VERSION.split()[-1],
+    scripts=['electrumx_server.py', 'electrumx_rpc.py'],
+    python_requires='>=3.5',
+    install_requires=['plyvel', 'aiohttp >= 1'],
+    packages=setuptools.find_packages(),
+    description='ElectrumX Server',
+    author='Neil Booth',
+    author_email='kyuupichan@gmail.com',
+    license='MIT Licence',
+    url='https://github.com/kyuupichan/electrumx/',
+    long_description='Server implementation for the Electrum wallet',
+    classifiers=[
+        'Development Status :: 3 - Alpha',
+        'Topic :: Internet',
+        'License :: OSI Approved :: MIT License',
+        'Operating System :: Unix',
+    ],
+)
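
Note how the package version is derived: VERSION.split()[-1] peels the number off the human-readable string in server/version.py, so setup.py needs no editing on release. For instance:

    # VERSION as defined in server/version.py after this commit.
    VERSION = "ElectrumX 0.1"
    print(VERSION.split()[-1])  # -> 0.1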