Browse Source

Merge branch 'daemon' into develop

master
Neil Booth 8 years ago
parent
commit
13bf8b8427
  1. 132
      server/daemon.py

132
server/daemon.py

@ -45,94 +45,95 @@ class Daemon(util.LoggedClass):
.format(height))
self._height = height
async def post(self, data):
'''Send data to the daemon and handle the response.'''
async with self.workqueue_semaphore:
async with aiohttp.post(self.url, data=data) as resp:
result = await resp.json()
if isinstance(result, list):
errs = [item['error'] for item in result]
if not any(errs):
return [item['result'] for item in result]
if any(err.get('code') == self.WARMING_UP for err in errs if err):
raise DaemonWarmingUpError
raise DaemonError(errs)
else:
err = result['error']
if not err:
return result['result']
if err.get('code') == self.WARMING_UP:
raise DaemonWarmingUpError
raise DaemonError(err)
async def send(self, payload):
async def _send(self, payload, processor):
'''Send a payload to be converted to JSON.
Handles temporary connection issues. Daemon reponse errors
are raise through DaemonError.
'''
prior_msg = None
skip_count = None
def log_error(msg, skip_once=False):
if skip_once and skip_count is None:
skip_count = 1
if msg != prior_msg or skip_count == 0:
skip_count = 10
prior_msg = msg
self.logger.error('{}. Retrying between sleeps...'
.format(msg))
skip_count -= 1
data = json.dumps(payload)
secs = 1
prior_msg = None
while True:
try:
result = await self.post(data)
if prior_msg:
self.logger.info('connection successfully restored')
return result
async with self.workqueue_semaphore:
async with aiohttp.post(self.url, data=data) as resp:
result = processor(await resp.json())
if prior_msg:
self.logger.info('connection restored')
return result
except (asyncio.CancelledError, DaemonError):
raise
except asyncio.TimeoutError:
msg = 'timeout error'
log_error('timeout error', skip_once=True)
except aiohttp.ClientHttpProcessingError:
msg = 'HTTP error'
log_error('HTTP error', skip_once=True)
except aiohttp.ServerDisconnectedError:
msg = 'disconnected'
log_error('disconnected', skip_once=True)
except aiohttp.ClientConnectionError:
msg = 'connection problem - is your daemon running?'
log_error('connection problem - is your daemon running?')
except DaemonWarmingUpError:
msg = 'still starting up checking blocks...'
except (asyncio.CancelledError, DaemonError):
raise
log_error('still starting up checking blocks...')
except Exception as e:
msg = ('request gave unexpected error: {}'.format(e))
if msg != prior_msg or count == 10:
self.logger.error('{}. Retrying between sleeps...'
.format(msg))
prior_msg = msg
count = 0
log_error('request gave unexpected error: {}'.format(e))
await asyncio.sleep(secs)
count += 1
secs = min(16, secs * 2)
async def send_single(self, method, params=None):
async def _send_single(self, method, params=None):
'''Send a single request to the daemon.'''
def processor(result):
err = result['error']
if not err:
return result['result']
if err.get('code') == self.WARMING_UP:
raise DaemonWarmingUpError
raise DaemonError(err)
payload = {'method': method}
if params:
payload['params'] = params
return await self.send(payload)
return await self._send(payload, processor)
async def send_many(self, mp_iterable):
'''Send several requests at once.'''
payload = [{'method': m, 'params': p} for m, p in mp_iterable]
async def _send_vector(self, method, params_iterable, replace_errs=False):
'''Send several requests of the same method.
The result will be an array of the same length as params_iterable.
If replace_errs is true, any item with an error is returned as None,
othewise an exception is raised.'''
def processor(result):
errs = [item['error'] for item in result if item['error']]
if not errs or replace_errs:
return [item['result'] for item in result]
if any(err.get('code') == self.WARMING_UP for err in errs):
raise DaemonWarmingUpError
raise DaemonError(errs)
payload = [{'method': method, 'params': p} for p in params_iterable]
if payload:
return await self.send(payload)
return await self._send(payload, processor)
return []
async def send_vector(self, method, params_iterable):
'''Send several requests of the same method.'''
return await self.send_many((method, params)
for params in params_iterable)
async def block_hex_hashes(self, first, count):
'''Return the hex hashes of count block starting at height first.'''
params_iterable = ((h, ) for h in range(first, first + count))
return await self.send_vector('getblockhash', params_iterable)
return await self._send_vector('getblockhash', params_iterable)
async def raw_blocks(self, hex_hashes):
'''Return the raw binary blocks with the given hex hashes.'''
params_iterable = ((h, False) for h in hex_hashes)
blocks = await self.send_vector('getblock', params_iterable)
blocks = await self._send_vector('getblock', params_iterable)
# Convert hex string to bytes
return [bytes.fromhex(block) for block in blocks]
@ -140,39 +141,40 @@ class Daemon(util.LoggedClass):
'''Return the hashes of the txs in the daemon's mempool.'''
if self.debug_caught_up:
return []
return await self.send_single('getrawmempool')
return await self._send_single('getrawmempool')
async def estimatefee(self, params):
'''Return the fee estimate for the given parameters.'''
return await self.send_single('estimatefee', params)
return await self._send_single('estimatefee', params)
async def relayfee(self):
'''The minimum fee a low-priority tx must pay in order to be accepted
to the daemon's memory pool.'''
net_info = await self.send_single('getnetworkinfo')
net_info = await self._send_single('getnetworkinfo')
return net_info['relayfee']
async def getrawtransaction(self, hex_hash):
'''Return the serialized raw transaction with the given hash.'''
return await self.send_single('getrawtransaction', (hex_hash, 0))
return await self._send_single('getrawtransaction', (hex_hash, 0))
async def getrawtransactions(self, hex_hashes):
async def getrawtransactions(self, hex_hashes, replace_errs=True):
'''Return the serialized raw transactions with the given hashes.
Breaks large requests up. Yields after each sub request.'''
Replaces errors with None by default.'''
params_iterable = ((hex_hash, 0) for hex_hash in hex_hashes)
txs = await self.send_vector('getrawtransaction', params_iterable)
txs = await self._send_vector('getrawtransaction', params_iterable,
replace_errs=replace_errs)
# Convert hex strings to bytes
return [bytes.fromhex(tx) for tx in txs]
return [bytes.fromhex(tx) if tx else None for tx in txs]
async def sendrawtransaction(self, params):
'''Broadcast a transaction to the network.'''
return await self.send_single('sendrawtransaction', params)
return await self._send_single('sendrawtransaction', params)
async def height(self):
'''Query the daemon for its current height.'''
if not self.debug_caught_up:
self._height = await self.send_single('getblockcount')
self._height = await self._send_single('getblockcount')
return self._height
def cached_height(self):

Loading…
Cancel
Save