
Merge branch 'rpc' into develop

Branch: master
Neil Booth, 8 years ago, commit acfa037f6f

Changed files:
  server/server.py  (157 lines)
  server_main.py    (2 lines)

server/server.py

@@ -19,27 +19,37 @@ class Server(object):
     def __init__(self, env):
         self.env = env
         self.db = DB(env)
-        self.rpc = RPC(env)
-        self.block_cache = BlockCache(env, self.db, self.rpc)
-
-    def async_tasks(self):
-        return [
+        self.block_cache = BlockCache(env, self.db)
+        self.tasks = [
             asyncio.ensure_future(self.block_cache.catch_up()),
             asyncio.ensure_future(self.block_cache.process_cache()),
         ]
+        loop = asyncio.get_event_loop()
+        for signame in ('SIGINT', 'SIGTERM'):
+            loop.add_signal_handler(getattr(signal, signame),
+                                    partial(self.on_signal, signame))
+
+    def on_signal(self, signame):
+        logging.warning('received {} signal, preparing to shut down'
+                        .format(signame))
+        for task in self.tasks:
+            task.cancel()
+
+    def async_tasks(self):
+        return self.tasks
 
 
 class BlockCache(object):
     '''Requests blocks ahead of time from the daemon. Serves them
     to the blockchain processor.'''
 
-    def __init__(self, env, db, rpc):
+    def __init__(self, env, db):
         self.logger = logging.getLogger('BlockCache')
         self.logger.setLevel(logging.INFO)
 
         self.db = db
-        self.rpc = rpc
-        self.stop = False
+        self.rpc_url = env.rpc_url
         # Cache target size is in MB. Has little effect on sync time.
         self.cache_limit = 10
         self.daemon_height = 0
@@ -49,37 +59,26 @@ class BlockCache(object):
         self.recent_sizes = []
         self.ave_size = 0
 
-        loop = asyncio.get_event_loop()
-        for signame in ('SIGINT', 'SIGTERM'):
-            loop.add_signal_handler(getattr(signal, signame),
-                                    partial(self.on_signal, signame))
-
-    def on_signal(self, signame):
-        logging.warning('Received {} signal, preparing to shut down'
-                        .format(signame))
-        self.blocks = []
-        self.stop = True
+        self.logger.info('using RPC URL {}'.format(self.rpc_url))
 
     async def process_cache(self):
-        while not self.stop:
+        while True:
             await asyncio.sleep(1)
             while self.blocks:
                 self.db.process_block(self.blocks.pop(), self.daemon_height)
                 # Release asynchronous block fetching
                 await asyncio.sleep(0)
 
-        self.db.flush_all(self.daemon_height)
-
     async def catch_up(self):
         self.logger.info('catching up, block cache limit {:d}MB...'
                          .format(self.cache_limit))
-        while await self.maybe_prefill():
-            await asyncio.sleep(1)
-        if not self.stop:
+        try:
+            while await self.maybe_prefill():
+                await asyncio.sleep(1)
             self.logger.info('caught up to height {:d}'
                              .format(self.daemon_height))
-        self.db.flush_all(self.daemon_height)
+        finally:
+            self.db.flush_all(self.daemon_height)
 
     def cache_used(self):
@@ -96,35 +95,26 @@ class BlockCache(object):
            processing.'''
         cache_limit = self.cache_limit * 1024 * 1024
         while True:
-            if self.stop:
-                return False
             cache_used = self.cache_used()
             if cache_used > cache_limit:
                 return True
 
             # Keep going by getting a whole new cache_limit of blocks
-            self.daemon_height = await self.rpc.rpc_single('getblockcount')
+            self.daemon_height = await self.send_single('getblockcount')
             max_count = min(self.daemon_height - self.fetched_height, 4000)
             count = min(max_count, self.prefill_count(cache_limit))
-            if not count or self.stop:
+            if not count:
                 return False  # Done catching up
 
             first = self.fetched_height + 1
             param_lists = [[height] for height in range(first, first + count)]
-            hashes = await self.rpc.rpc_multi('getblockhash', param_lists)
-            if self.stop:
-                return False
+            hashes = await self.send_vector('getblockhash', param_lists)
 
             # Hashes is an array of hex strings
             param_lists = [(h, False) for h in hashes]
-            blocks = await self.rpc.rpc_multi('getblock', param_lists)
+            blocks = await self.send_vector('getblock', param_lists)
             self.fetched_height += count
-            if self.stop:
-                return False
 
             # Convert hex string to bytes and put in memoryview
             blocks = [bytes.fromhex(block) for block in blocks]
             # Reverse order and place at front of list
@@ -138,64 +128,47 @@ class BlockCache(object):
             self.recent_sizes = self.recent_sizes[excess:]
         self.ave_size = sum(self.recent_sizes) // len(self.recent_sizes)
 
-
-class RPC(object):
-
-    def __init__(self, env):
-        self.logger = logging.getLogger('RPC')
-        self.logger.setLevel(logging.INFO)
-        self.rpc_url = env.rpc_url
-        self.logger.info('using RPC URL {}'.format(self.rpc_url))
-
-    async def rpc_multi(self, method, param_lists):
-        payload = [{'method': method, 'params': param_list}
-                   for param_list in param_lists]
-        while True:
-            dresults = await self.daemon(payload)
-            errs = [dresult['error'] for dresult in dresults]
-            if not any(errs):
-                return [dresult['result'] for dresult in dresults]
-            for err in errs:
-                if err.get('code') == -28:
-                    self.logger.warning('daemon still warming up...')
-                    secs = 10
-                    break
-            else:
-                self.logger.error('daemon returned errors: {}'.format(errs))
-                secs = 0
-            self.logger.info('sleeping {:d} seconds and trying again...'
-                             .format(secs))
-            await asyncio.sleep(secs)
-
-    async def rpc_single(self, method, params=None):
+    async def send_single(self, method, params=None):
         payload = {'method': method}
         if params:
             payload['params'] = params
-        while True:
-            dresult = await self.daemon(payload)
-            err = dresult['error']
-            if not err:
-                return dresult['result']
-            if err.get('code') == -28:
-                self.logger.warning('daemon still warming up...')
-                secs = 10
-            else:
-                self.logger.error('daemon returned error: {}'.format(err))
-                secs = 0
-            self.logger.info('sleeping {:d} seconds and trying again...'
-                             .format(secs))
-            await asyncio.sleep(secs)
+        result, = await self.send((payload, ))
+        return result
 
-    async def daemon(self, payload):
+    async def send_many(self, mp_pairs):
+        payload = [{'method': method, 'params': params}
+                   for method, params in mp_pairs]
+        return await self.send(payload)
+
+    async def send_vector(self, method, params_list):
+        payload = [{'method': method, 'params': params}
+                   for params in params_list]
+        return await self.send(payload)
+
+    async def send(self, payload):
+        assert isinstance(payload, (tuple, list))
+        data = json.dumps(payload)
         while True:
             try:
-                async with aiohttp.ClientSession() as session:
-                    async with session.post(self.rpc_url,
-                                            data=json.dumps(payload)) as resp:
-                        return await resp.json()
+                async with aiohttp.request('POST', self.rpc_url,
+                                           data=data) as resp:
+                    result = await resp.json()
+            except asyncio.CancelledError:
+                raise
             except Exception as e:
-                self.logger.error('aiohttp error: {}'.format(e))
-            self.logger.info('sleeping 1 second and trying again...')
-            await asyncio.sleep(1)
+                msg = 'aiohttp error: {}'.format(e)
+                secs = 3
+            else:
+                errs = tuple(item['error'] for item in result)
+                if not any(errs):
+                    return tuple(item['result'] for item in result)
+                if any(err.get('code') == -28 for err in errs):
+                    msg = 'daemon still warming up...'
+                    secs = 10
+                else:
+                    msg = 'daemon errors: {}'.format(errs)
+                    secs = 1
+            self.logger.error('{}. Sleeping {:d}s and trying again...'
+                              .format(msg, secs))
+            await asyncio.sleep(secs)
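
The heart of this merge is the new send() method: every RPC, single or batched, now travels as a JSON array, and the retry-on-error logic lives in one place instead of being duplicated across rpc_single() and rpc_multi(). A minimal, self-contained sketch of the same pattern, assuming a local bitcoind; the RPC_URL below and its credentials are placeholders, not values from the commit:

import asyncio
import json

import aiohttp

# Placeholder URL; the real one comes from the environment (env.rpc_url)
RPC_URL = 'http://user:pass@localhost:8332/'


async def send(payload):
    '''POST a JSON-RPC batch and retry until every request in it succeeds.'''
    data = json.dumps(payload)
    while True:
        try:
            async with aiohttp.request('POST', RPC_URL, data=data) as resp:
                result = await resp.json()
        except asyncio.CancelledError:
            raise                  # let task.cancel() cut the retry loop short
        except Exception as e:
            msg, secs = 'aiohttp error: {}'.format(e), 3
        else:
            errs = tuple(item['error'] for item in result)
            if not any(errs):
                return tuple(item['result'] for item in result)
            # bitcoind answers code -28 while still loading the block index
            if any(err.get('code') == -28 for err in errs):
                msg, secs = 'daemon still warming up...', 10
            else:
                msg, secs = 'daemon errors: {}'.format(errs), 1
        print('{}. Sleeping {:d}s and trying again...'.format(msg, secs))
        await asyncio.sleep(secs)


async def demo():
    # One vectored call fetches ten block hashes in a single round trip
    payload = [{'method': 'getblockhash', 'params': [h]} for h in range(1, 11)]
    print(await send(payload))


asyncio.get_event_loop().run_until_complete(demo())

The explicit "except asyncio.CancelledError: raise" is not decorative: on the Python 3.5 of this commit's era, CancelledError still derived from Exception, so the broad handler below it would otherwise swallow a cancellation and keep retrying forever (it only moved to BaseException in Python 3.8).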

server_main.py

@@ -28,6 +28,8 @@ def main_loop():
     loop = asyncio.get_event_loop()
     try:
         loop.run_until_complete(asyncio.gather(*tasks))
+    except asyncio.CancelledError:
+        logging.warning('task cancelled; asyncio event loop closing')
     finally:
         loop.close()
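
Taken together, the two files replace the old self.stop polling with asyncio-native cancellation: the signal handler cancels the tasks, gather() propagates CancelledError out of run_until_complete(), and catch_up()'s new finally: block still flushes the DB on the way down. A minimal runnable sketch of that shutdown path, with worker() as a hypothetical stand-in for the real coroutines:

import asyncio
import logging
import signal
from functools import partial


async def worker(name):
    # Hypothetical stand-in for catch_up() / process_cache()
    while True:
        await asyncio.sleep(1)
        logging.info('{} tick'.format(name))


def on_signal(tasks, signame):
    logging.warning('received {} signal, preparing to shut down'
                    .format(signame))
    for task in tasks:
        task.cancel()


def main_loop():
    logging.basicConfig(level=logging.INFO)
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(worker(name))
             for name in ('catch_up', 'process_cache')]
    # add_signal_handler is only available on Unix event loops
    for signame in ('SIGINT', 'SIGTERM'):
        loop.add_signal_handler(getattr(signal, signame),
                                partial(on_signal, tasks, signame))
    try:
        loop.run_until_complete(asyncio.gather(*tasks))
    except asyncio.CancelledError:
        logging.warning('task cancelled; asyncio event loop closing')
    finally:
        loop.close()


main_loop()

Pressing Ctrl-C (SIGINT) cancels both tasks; any finally: blocks inside the coroutines run as the CancelledError unwinds, which is what lets the real catch_up() flush its state without an explicit stop flag.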
