Browse Source

Merge branch 'release-0.3.1'

master
Neil Booth 8 years ago
parent
commit
d89b1f56e0
  1. 6
      docs/RELEASE-NOTES
  2. 14
      server/block_processor.py
  3. 128
      server/daemon.py
  4. 31
      server/db.py
  5. 2
      server/version.py

6
docs/RELEASE-NOTES

@ -1,3 +1,9 @@
Version 0.3.1
-------------
- fixes issue #9
- save DB version in DB; warn on DB open if incompatible format
Version 0.3 Version 0.3
----------- -----------

14
server/block_processor.py

@ -241,7 +241,7 @@ class MemPool(LoggedClass):
await asyncio.sleep(0) await asyncio.sleep(0)
if initial and time.time() > next_log: if initial and time.time() > next_log:
next_log = time.time() + 10 next_log = time.time() + 20
self.logger.info('{:,d} done ({:d}%)' self.logger.info('{:,d} done ({:d}%)'
.format(n, int(n / len(new_txs) * 100))) .format(n, int(n / len(new_txs) * 100)))
@ -518,17 +518,7 @@ class BlockProcessor(server.db.DB):
self.wall_time += now - self.last_flush self.wall_time += now - self.last_flush
self.last_flush = now self.last_flush = now
self.last_flush_tx_count = self.tx_count self.last_flush_tx_count = self.tx_count
state = { self.write_state(batch)
'genesis': self.coin.GENESIS_HASH,
'height': self.db_height,
'tx_count': self.db_tx_count,
'tip': self.db_tip,
'flush_count': self.flush_count,
'utxo_flush_count': self.utxo_flush_count,
'wall_time': self.wall_time,
'first_sync': self.first_sync,
}
batch.put(b'state', repr(state).encode())
def assert_flushed(self): def assert_flushed(self):
'''Asserts state is fully flushed.''' '''Asserts state is fully flushed.'''

128
server/daemon.py

@ -45,94 +45,95 @@ class Daemon(util.LoggedClass):
.format(height)) .format(height))
self._height = height self._height = height
async def post(self, data): async def _send(self, payload, processor):
'''Send data to the daemon and handle the response.'''
async with self.workqueue_semaphore:
async with aiohttp.post(self.url, data=data) as resp:
result = await resp.json()
if isinstance(result, list):
errs = [item['error'] for item in result]
if not any(errs):
return [item['result'] for item in result]
if any(err.get('code') == self.WARMING_UP for err in errs if err):
raise DaemonWarmingUpError
raise DaemonError(errs)
else:
err = result['error']
if not err:
return result['result']
if err.get('code') == self.WARMING_UP:
raise DaemonWarmingUpError
raise DaemonError(err)
async def send(self, payload):
'''Send a payload to be converted to JSON. '''Send a payload to be converted to JSON.
Handles temporary connection issues. Daemon reponse errors Handles temporary connection issues. Daemon reponse errors
are raise through DaemonError. are raise through DaemonError.
''' '''
prior_msg = None
skip_count = None
def log_error(msg, skip_once=False):
if skip_once and skip_count is None:
skip_count = 1
if msg != prior_msg or skip_count == 0:
skip_count = 10
prior_msg = msg
self.logger.error('{}. Retrying between sleeps...'
.format(msg))
skip_count -= 1
data = json.dumps(payload) data = json.dumps(payload)
secs = 1 secs = 1
prior_msg = None
while True: while True:
try: try:
result = await self.post(data) async with self.workqueue_semaphore:
async with aiohttp.post(self.url, data=data) as resp:
result = processor(await resp.json())
if prior_msg: if prior_msg:
self.logger.info('connection successfully restored') self.logger.info('connection restored')
return result return result
except (asyncio.CancelledError, DaemonError):
raise
except asyncio.TimeoutError: except asyncio.TimeoutError:
msg = 'timeout error' log_error('timeout error', skip_once=True)
except aiohttp.ClientHttpProcessingError: except aiohttp.ClientHttpProcessingError:
msg = 'HTTP error' log_error('HTTP error', skip_once=True)
except aiohttp.ServerDisconnectedError: except aiohttp.ServerDisconnectedError:
msg = 'disconnected' log_error('disconnected', skip_once=True)
except aiohttp.ClientConnectionError: except aiohttp.ClientConnectionError:
msg = 'connection problem - is your daemon running?' log_error('connection problem - is your daemon running?')
except DaemonWarmingUpError: except DaemonWarmingUpError:
msg = 'still starting up checking blocks...' log_error('still starting up checking blocks...')
except (asyncio.CancelledError, DaemonError):
raise
except Exception as e: except Exception as e:
msg = ('request gave unexpected error: {}'.format(e)) log_error('request gave unexpected error: {}'.format(e))
if msg != prior_msg or count == 10:
self.logger.error('{}. Retrying between sleeps...'
.format(msg))
prior_msg = msg
count = 0
await asyncio.sleep(secs) await asyncio.sleep(secs)
count += 1
secs = min(16, secs * 2) secs = min(16, secs * 2)
async def send_single(self, method, params=None): async def _send_single(self, method, params=None):
'''Send a single request to the daemon.''' '''Send a single request to the daemon.'''
def processor(result):
err = result['error']
if not err:
return result['result']
if err.get('code') == self.WARMING_UP:
raise DaemonWarmingUpError
raise DaemonError(err)
payload = {'method': method} payload = {'method': method}
if params: if params:
payload['params'] = params payload['params'] = params
return await self.send(payload) return await self._send(payload, processor)
async def _send_vector(self, method, params_iterable, replace_errs=False):
'''Send several requests of the same method.
async def send_many(self, mp_iterable): The result will be an array of the same length as params_iterable.
'''Send several requests at once.''' If replace_errs is true, any item with an error is returned as None,
payload = [{'method': m, 'params': p} for m, p in mp_iterable] othewise an exception is raised.'''
def processor(result):
errs = [item['error'] for item in result if item['error']]
if not errs or replace_errs:
return [item['result'] for item in result]
if any(err.get('code') == self.WARMING_UP for err in errs):
raise DaemonWarmingUpError
raise DaemonError(errs)
payload = [{'method': method, 'params': p} for p in params_iterable]
if payload: if payload:
return await self.send(payload) return await self._send(payload, processor)
return [] return []
async def send_vector(self, method, params_iterable):
'''Send several requests of the same method.'''
return await self.send_many((method, params)
for params in params_iterable)
async def block_hex_hashes(self, first, count): async def block_hex_hashes(self, first, count):
'''Return the hex hashes of count block starting at height first.''' '''Return the hex hashes of count block starting at height first.'''
params_iterable = ((h, ) for h in range(first, first + count)) params_iterable = ((h, ) for h in range(first, first + count))
return await self.send_vector('getblockhash', params_iterable) return await self._send_vector('getblockhash', params_iterable)
async def raw_blocks(self, hex_hashes): async def raw_blocks(self, hex_hashes):
'''Return the raw binary blocks with the given hex hashes.''' '''Return the raw binary blocks with the given hex hashes.'''
params_iterable = ((h, False) for h in hex_hashes) params_iterable = ((h, False) for h in hex_hashes)
blocks = await self.send_vector('getblock', params_iterable) blocks = await self._send_vector('getblock', params_iterable)
# Convert hex string to bytes # Convert hex string to bytes
return [bytes.fromhex(block) for block in blocks] return [bytes.fromhex(block) for block in blocks]
@ -140,39 +141,40 @@ class Daemon(util.LoggedClass):
'''Return the hashes of the txs in the daemon's mempool.''' '''Return the hashes of the txs in the daemon's mempool.'''
if self.debug_caught_up: if self.debug_caught_up:
return [] return []
return await self.send_single('getrawmempool') return await self._send_single('getrawmempool')
async def estimatefee(self, params): async def estimatefee(self, params):
'''Return the fee estimate for the given parameters.''' '''Return the fee estimate for the given parameters.'''
return await self.send_single('estimatefee', params) return await self._send_single('estimatefee', params)
async def relayfee(self): async def relayfee(self):
'''The minimum fee a low-priority tx must pay in order to be accepted '''The minimum fee a low-priority tx must pay in order to be accepted
to the daemon's memory pool.''' to the daemon's memory pool.'''
net_info = await self.send_single('getnetworkinfo') net_info = await self._send_single('getnetworkinfo')
return net_info['relayfee'] return net_info['relayfee']
async def getrawtransaction(self, hex_hash): async def getrawtransaction(self, hex_hash):
'''Return the serialized raw transaction with the given hash.''' '''Return the serialized raw transaction with the given hash.'''
return await self.send_single('getrawtransaction', (hex_hash, 0)) return await self._send_single('getrawtransaction', (hex_hash, 0))
async def getrawtransactions(self, hex_hashes): async def getrawtransactions(self, hex_hashes, replace_errs=True):
'''Return the serialized raw transactions with the given hashes. '''Return the serialized raw transactions with the given hashes.
Breaks large requests up. Yields after each sub request.''' Replaces errors with None by default.'''
params_iterable = ((hex_hash, 0) for hex_hash in hex_hashes) params_iterable = ((hex_hash, 0) for hex_hash in hex_hashes)
txs = await self.send_vector('getrawtransaction', params_iterable) txs = await self._send_vector('getrawtransaction', params_iterable,
replace_errs=replace_errs)
# Convert hex strings to bytes # Convert hex strings to bytes
return [bytes.fromhex(tx) for tx in txs] return [bytes.fromhex(tx) if tx else None for tx in txs]
async def sendrawtransaction(self, params): async def sendrawtransaction(self, params):
'''Broadcast a transaction to the network.''' '''Broadcast a transaction to the network.'''
return await self.send_single('sendrawtransaction', params) return await self._send_single('sendrawtransaction', params)
async def height(self): async def height(self):
'''Query the daemon for its current height.''' '''Query the daemon for its current height.'''
if not self.debug_caught_up: if not self.debug_caught_up:
self._height = await self.send_single('getblockcount') self._height = await self._send_single('getblockcount')
return self._height return self._height
def cached_height(self): def cached_height(self):

31
server/db.py

@ -29,6 +29,8 @@ class DB(LoggedClass):
it was shutdown uncleanly. it was shutdown uncleanly.
''' '''
VERSIONS = [0]
class MissingUTXOError(Exception): class MissingUTXOError(Exception):
'''Raised if a mempool tx input UTXO couldn't be found.''' '''Raised if a mempool tx input UTXO couldn't be found.'''
@ -53,7 +55,7 @@ class DB(LoggedClass):
else: else:
self.logger.info('successfully opened {} database {}' self.logger.info('successfully opened {} database {}'
.format(env.db_engine, db_name)) .format(env.db_engine, db_name))
self.init_state_from_db() self.read_state()
create = self.db_height == -1 create = self.db_height == -1
self.headers_file = self.open_file('headers', create) self.headers_file = self.open_file('headers', create)
@ -70,7 +72,7 @@ class DB(LoggedClass):
else: else:
assert self.db_tx_count == 0 assert self.db_tx_count == 0
def init_state_from_db(self): def read_state(self):
if self.db.is_new: if self.db.is_new:
self.db_height = -1 self.db_height = -1
self.db_tx_count = 0 self.db_tx_count = 0
@ -81,7 +83,15 @@ class DB(LoggedClass):
self.first_sync = True self.first_sync = True
else: else:
state = self.db.get(b'state') state = self.db.get(b'state')
if state:
state = ast.literal_eval(state.decode()) state = ast.literal_eval(state.decode())
if not isinstance(state, dict):
raise self.DBError('failed reading state from DB')
db_version = state.get('db_version', 0)
if db_version not in self.VERSIONS:
raise self.DBError('your DB version is {} but this software '
'only handles versions {}'
.format(db_version, self.VERSIONS))
if state['genesis'] != self.coin.GENESIS_HASH: if state['genesis'] != self.coin.GENESIS_HASH:
raise self.DBError('DB genesis hash {} does not match coin {}' raise self.DBError('DB genesis hash {} does not match coin {}'
.format(state['genesis_hash'], .format(state['genesis_hash'],
@ -92,7 +102,22 @@ class DB(LoggedClass):
self.flush_count = state['flush_count'] self.flush_count = state['flush_count']
self.utxo_flush_count = state['utxo_flush_count'] self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time'] self.wall_time = state['wall_time']
self.first_sync = state.get('first_sync', True) self.first_sync = state['first_sync']
def write_state(self, batch):
'''Write chain state to the batch.'''
state = {
'genesis': self.coin.GENESIS_HASH,
'height': self.db_height,
'tx_count': self.db_tx_count,
'tip': self.db_tip,
'flush_count': self.flush_count,
'utxo_flush_count': self.utxo_flush_count,
'wall_time': self.wall_time,
'first_sync': self.first_sync,
'db_version': max(self.VERSIONS),
}
batch.put(b'state', repr(state).encode())
def open_file(self, filename, create=False): def open_file(self, filename, create=False):
'''Open the file name. Return its handle.''' '''Open the file name. Return its handle.'''

2
server/version.py

@ -1 +1 @@
VERSION = "ElectrumX 0.3" VERSION = "ElectrumX 0.3.1"

Loading…
Cancel
Save