Browse Source

Cache headers.

master
Neil Booth 8 years ago
parent
commit
cbb1e504cc
  1. 94
      lib/jsonrpc.py
  2. 60
      server/protocol.py

94
lib/jsonrpc.py

@ -15,6 +15,53 @@ import time
from lib.util import LoggedClass from lib.util import LoggedClass
class SingleRequest(object):
'''An object that represents a single request.'''
def __init__(self, session, payload):
self.payload = payload
self.session = session
async def process(self):
'''Asynchronously handle the JSON request.'''
binary = await self.session.process_single_payload(self.payload)
if binary:
self.session._send_bytes(binary)
class BatchRequest(object):
'''An object that represents a batch request and its processing state.
Batches are processed in parts chunks.
'''
CUHNK_SIZE = 3
def __init__(self, session, payload):
self.session = session
self.payload = payload
self.done = 0
self.parts = []
async def process(self):
'''Asynchronously handle the JSON batch according to the JSON 2.0
spec.'''
for n in range(self.CHUNK_SIZE):
if self.done >= len(self.payload):
if self.parts:
binary = b'[' + b', '.join(self.parts) + b']'
self.session._send_bytes(binary)
return
item = self.payload[self.done]
part = await self.session.process_single_payload(item)
if part:
self.parts.append(part)
self.done += 1
# Re-enqueue to continue the rest later
self.session.enqueue_request(self)
return b''
class JSONRPC(asyncio.Protocol, LoggedClass): class JSONRPC(asyncio.Protocol, LoggedClass):
'''Manages a JSONRPC connection. '''Manages a JSONRPC connection.
@ -53,48 +100,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.msg = msg self.msg = msg
self.code = code self.code = code
class SingleRequest(object):
'''An object that represents a single request.'''
def __init__(self, session, payload):
self.payload = payload
self.session = session
async def process(self):
'''Asynchronously handle the JSON request.'''
binary = await self.session.process_single_payload(self.payload)
if binary:
self.session._send_bytes(binary)
class BatchRequest(object):
'''An object that represents a batch request and its processing
state.'''
def __init__(self, session, payload):
self.session = session
self.payload = payload
self.done = 0
self.parts = []
async def process(self):
'''Asynchronously handle the JSON batch according to the JSON 2.0
spec.'''
if not self.payload:
raise JSONRPC.RPCError('empty batch', self.INVALID_REQUEST)
for n in range(self.session.batch_limit):
if self.done >= len(self.payload):
if self.parts:
binary = b'[' + b', '.join(self.parts) + b']'
self.session._send_bytes(binary)
return
item = self.payload[self.done]
part = await self.session.process_single_payload(item)
if part:
self.parts.append(part)
self.done += 1
# Re-enqueue to continue the rest later
self.session.enqueue_request(self)
return b''
@classmethod @classmethod
def request_payload(cls, method, id_, params=None): def request_payload(cls, method, id_, params=None):
payload = {'jsonrpc': '2.0', 'id': id_, 'method': method} payload = {'jsonrpc': '2.0', 'id': id_, 'method': method}
@ -129,7 +134,6 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
self.bandwidth_interval = 3600 self.bandwidth_interval = 3600
self.bandwidth_used = 0 self.bandwidth_used = 0
self.bandwidth_limit = 5000000 self.bandwidth_limit = 5000000
self.batch_limit = 4
self.transport = None self.transport = None
# Parts of an incomplete JSON line. We buffer them until # Parts of an incomplete JSON line. We buffer them until
# getting a newline. # getting a newline.
@ -239,9 +243,9 @@ class JSONRPC(asyncio.Protocol, LoggedClass):
if not message: if not message:
self.send_json_error('empty batch', self.INVALID_REQUEST) self.send_json_error('empty batch', self.INVALID_REQUEST)
return return
request = self.BatchRequest(self, message) request = BatchRequest(self, message)
else: else:
request = self.SingleRequest(self, message) request = SingleRequest(self, message)
'''Queue the request for asynchronous handling.''' '''Queue the request for asynchronous handling.'''
self.enqueue_request(request) self.enqueue_request(request)

60
server/protocol.py

@ -218,8 +218,13 @@ class ServerManager(util.LoggedClass):
MgrTask = namedtuple('MgrTask', 'session task') MgrTask = namedtuple('MgrTask', 'session task')
class NotificationRequest(object):
def __init__(self, fn_call):
self.process = fn_call
def __init__(self, env): def __init__(self, env):
super().__init__() super().__init__()
self.start = time.time()
self.bp = BlockProcessor(self, env) self.bp = BlockProcessor(self, env)
self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self) self.mempool = MemPool(self.bp.daemon, env.coin, self.bp, self)
self.irc = IRC(env) self.irc = IRC(env)
@ -231,7 +236,9 @@ class ServerManager(util.LoggedClass):
self.max_subs = env.max_subs self.max_subs = env.max_subs
self.subscription_count = 0 self.subscription_count = 0
self.next_stale_check = 0 self.next_stale_check = 0
self.history_cache = pylru.lrucache(128) self.history_cache = pylru.lrucache(256)
self.header_cache = pylru.lrucache(8)
self.height = 0
self.futures = [] self.futures = []
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
self.logger.info('session timeout: {:,d} seconds' self.logger.info('session timeout: {:,d} seconds'
@ -316,21 +323,19 @@ class ServerManager(util.LoggedClass):
sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile) sslc.load_cert_chain(env.ssl_certfile, keyfile=env.ssl_keyfile)
await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc) await self.start_server('SSL', env.host, env.ssl_port, ssl=sslc)
class NotificationRequest(object):
def __init__(self, fn_call):
self.process = fn_call
def notify(self, touched): def notify(self, touched):
'''Notify sessions about height changes and touched addresses.''' '''Notify sessions about height changes and touched addresses.'''
# Remove invalidated history cache # Invalidate caches
hc = self.history_cache hc = self.history_cache
for hash168 in set(hc).intersection(touched): for hash168 in set(hc).intersection(touched):
del hc[hash168] del hc[hash168]
cache = {} if self.bp.db_height != self.height:
self.height = self.bp.db_height
self.header_cache.clear()
for session in self.sessions: for session in self.sessions:
if isinstance(session, ElectrumX): if isinstance(session, ElectrumX):
fn_call = partial(session.notify, self.bp.db_height, touched, fn_call = partial(session.notify, self.bp.db_height, touched)
cache)
session.enqueue_request(self.NotificationRequest(fn_call)) session.enqueue_request(self.NotificationRequest(fn_call))
# Periodically log sessions # Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions: if self.env.log_sessions and time.time() > self.next_log_sessions:
@ -340,6 +345,17 @@ class ServerManager(util.LoggedClass):
self.logger.info(json.dumps(self.server_summary())) self.logger.info(json.dumps(self.server_summary()))
self.next_log_sessions = time.time() + self.env.log_sessions self.next_log_sessions = time.time() + self.env.log_sessions
def electrum_header(self, height):
'''Return the binary header at the given height.'''
if not 0 <= height <= self.bp.db_height:
raise self.RPCError('height {:,d} out of range'.format(height))
if height in self.header_cache:
return self.header_cache[height]
header = self.bp.read_headers(height, 1)
header = self.env.coin.electrum_header(header, height)
self.header_cache[height] = header
return header
async def async_get_history(self, hash168): async def async_get_history(self, hash168):
if hash168 in self.history_cache: if hash168 in self.history_cache:
return self.history_cache[hash168] return self.history_cache[hash168]
@ -561,6 +577,7 @@ class Session(JSONRPC):
self.max_send = env.max_send self.max_send = env.max_send
self.bandwidth_limit = env.bandwidth_limit self.bandwidth_limit = env.bandwidth_limit
self.txs_sent = 0 self.txs_sent = 0
self.bucket = int(self.start - self.manager.start) // 60
def is_closing(self): def is_closing(self):
'''True if this session is closing.''' '''True if this session is closing.'''
@ -676,7 +693,7 @@ class ElectrumX(Session):
def sub_count(self): def sub_count(self):
return len(self.hash168s) return len(self.hash168s)
async def notify(self, height, touched, cache): async def notify(self, height, touched):
'''Notify the client about changes in height and touched addresses. '''Notify the client about changes in height and touched addresses.
Cache is a shared cache for this update. Cache is a shared cache for this update.
@ -684,13 +701,11 @@ class ElectrumX(Session):
if height != self.notified_height: if height != self.notified_height:
self.notified_height = height self.notified_height = height
if self.subscribe_headers: if self.subscribe_headers:
key = 'headers_payload' payload = self.notification_payload(
if key not in cache: 'blockchain.headers.subscribe',
cache[key] = self.notification_payload( (self.manager.electrum_header(height), ),
'blockchain.headers.subscribe', )
(self.electrum_header(height), ), self.encode_and_send_payload(payload)
)
self.encode_and_send_payload(cache[key])
if self.subscribe_height: if self.subscribe_height:
payload = self.notification_payload( payload = self.notification_payload(
@ -717,14 +732,7 @@ class ElectrumX(Session):
def current_electrum_header(self): def current_electrum_header(self):
'''Used as response to a headers subscription request.''' '''Used as response to a headers subscription request.'''
return self.electrum_header(self.height()) return self.manager.electrum_header(self.height())
def electrum_header(self, height):
'''Return the binary header at the given height.'''
if not 0 <= height <= self.height():
raise self.RPCError('height {:,d} out of range'.format(height))
header = self.bp.read_headers(height, 1)
return self.coin.electrum_header(header, height)
async def address_status(self, hash168): async def address_status(self, hash168):
'''Returns status as 32 bytes.''' '''Returns status as 32 bytes.'''
@ -848,7 +856,7 @@ class ElectrumX(Session):
async def block_get_header(self, params): async def block_get_header(self, params):
height = self.params_to_non_negative_integer(params) height = self.params_to_non_negative_integer(params)
return self.electrum_header(height) return self.manager.electrum_header(height)
async def estimatefee(self, params): async def estimatefee(self, params):
return await self.daemon_request('estimatefee', params) return await self.daemon_request('estimatefee', params)

Loading…
Cancel
Save