Browse Source

More refactoring of controller

- remove the header cache - not needed for higher protocol versions
- simplify session notification; move to session manager
- move history cache to session manager
patch-2
Neil Booth 7 years ago
parent
commit
b7572ce9c0
  1. 42
      electrumx/server/controller.py
  2. 5
      electrumx/server/mempool.py
  3. 42
      electrumx/server/session.py

42
electrumx/server/controller.py

@ -45,9 +45,6 @@ class Controller(ServerBase):
self.coin = env.coin self.coin = env.coin
self.tasks = TaskSet() self.tasks = TaskSet()
self.history_cache = pylru.lrucache(256)
self.header_cache = pylru.lrucache(8)
self.cache_height = 0
self.cache_mn_height = 0 self.cache_mn_height = 0
self.mn_cache = pylru.lrucache(256) self.mn_cache = pylru.lrucache(256)
env.max_send = max(350000, env.max_send) env.max_send = max(350000, env.max_send)
@ -127,48 +124,9 @@ class Controller(ServerBase):
self.create_task(self.session_mgr.start_serving()) self.create_task(self.session_mgr.start_serving())
self.create_task(self.session_mgr.housekeeping()) self.create_task(self.session_mgr.housekeeping())
def notify_sessions(self, touched):
'''Notify sessions about height changes and touched addresses.'''
# Invalidate caches
hc = self.history_cache
for hashX in set(hc).intersection(touched):
del hc[hashX]
height = self.bp.db_height
if height != self.cache_height:
self.cache_height = height
self.header_cache.clear()
self.session_mgr.notify(height, touched)
def raw_header(self, height): def raw_header(self, height):
'''Return the binary header at the given height.''' '''Return the binary header at the given height.'''
header, n = self.bp.read_headers(height, 1) header, n = self.bp.read_headers(height, 1)
if n != 1: if n != 1:
raise RPCError(BAD_REQUEST, f'height {height:,d} out of range') raise RPCError(BAD_REQUEST, f'height {height:,d} out of range')
return header return header
def electrum_header(self, height):
'''Return the deserialized header at the given height.'''
if height not in self.header_cache:
raw_header = self.raw_header(height)
self.header_cache[height] = self.coin.electrum_header(raw_header,
height)
return self.header_cache[height]
async def get_history(self, hashX):
'''Get history asynchronously to reduce latency.'''
if hashX in self.history_cache:
return self.history_cache[hashX]
def job():
# History DoS limit. Each element of history is about 99
# bytes when encoded as JSON. This limits resource usage
# on bloated history requests, and uses a smaller divisor
# so large requests are logged before refusing them.
limit = self.env.max_send // 97
return list(self.bp.get_history(hashX, limit=limit))
history = await self.run_in_executor(job)
self.history_cache[hashX] = history
return history

5
electrumx/server/mempool.py

@ -36,6 +36,7 @@ class MemPool(object):
self.logger = class_logger(__name__, self.__class__.__name__) self.logger = class_logger(__name__, self.__class__.__name__)
self.daemon = bp.daemon self.daemon = bp.daemon
self.controller = controller self.controller = controller
self.notify_sessions = controller.session_mgr.notify_sessions
self.coin = bp.coin self.coin = bp.coin
self.db = bp self.db = bp
self.touched = set() self.touched = set()
@ -104,7 +105,7 @@ class MemPool(object):
while True: while True:
# Avoid double notifications if processing a block # Avoid double notifications if processing a block
if self.touched and not self.processing_new_block(): if self.touched and not self.processing_new_block():
self.controller.notify_sessions(self.touched) self.notify_sessions(self.touched)
self.touched.clear() self.touched.clear()
# Log progress / state # Log progress / state
@ -192,7 +193,7 @@ class MemPool(object):
# Minor race condition here with mempool processor thread # Minor race condition here with mempool processor thread
touched.update(self.touched) touched.update(self.touched)
self.touched.clear() self.touched.clear()
self.controller.notify_sessions(touched) self.notify_sessions(touched)
def processing_new_block(self): def processing_new_block(self):
'''Return True if we're processing a new block.''' '''Return True if we're processing a new block.'''

42
electrumx/server/session.py

@ -18,6 +18,7 @@ import time
from collections import defaultdict from collections import defaultdict
from functools import partial from functools import partial
import pylru
from aiorpcx import ServerSession, JSONRPCAutoDetect, RPCError from aiorpcx import ServerSession, JSONRPCAutoDetect, RPCError
import electrumx import electrumx
@ -111,6 +112,7 @@ class SessionManager(object):
self.state = self.CATCHING_UP self.state = self.CATCHING_UP
self.txs_sent = 0 self.txs_sent = 0
self.start_time = time.time() self.start_time = time.time()
self.history_cache = pylru.lrucache(256)
# Cache some idea of room to avoid recounting on each subscription # Cache some idea of room to avoid recounting on each subscription
self.subs_room = 0 self.subs_room = 0
# Event triggered when electrumx is listening for incoming requests. # Event triggered when electrumx is listening for incoming requests.
@ -400,7 +402,14 @@ class SessionManager(object):
'''The number of connections that we've sent something to.''' '''The number of connections that we've sent something to.'''
return len(self.sessions) return len(self.sessions)
def notify(self, height, touched): def notify_sessions(self, touched):
'''Notify sessions about height changes and touched addresses.'''
height = self.controller.bp.db_height
# Invalidate caches
hc = self.history_cache
for hashX in set(hc).intersection(touched):
del hc[hashX]
# Height notifications are synchronous. Those sessions with # Height notifications are synchronous. Those sessions with
# touched addresses are scheduled for asynchronous completion # touched addresses are scheduled for asynchronous completion
create_task = self.controller.create_task create_task = self.controller.create_task
@ -411,6 +420,24 @@ class SessionManager(object):
if session_touched is not None: if session_touched is not None:
create_task(session.notify_async(session_touched)) create_task(session.notify_async(session_touched))
async def get_history(self, hashX):
'''Get history asynchronously to reduce latency.'''
if hashX in self.history_cache:
return self.history_cache[hashX]
controller = self.controller
def job():
# History DoS limit. Each element of history is about 99
# bytes when encoded as JSON. This limits resource usage
# on bloated history requests, and uses a smaller divisor
# so large requests are logged before refusing them.
limit = self.env.max_send // 97
return list(controller.bp.get_history(hashX, limit=limit))
history = await controller.run_in_executor(job)
self.history_cache[hashX] = history
return history
async def housekeeping(self): async def housekeeping(self):
'''Regular housekeeping checks.''' '''Regular housekeeping checks.'''
n = 0 n = 0
@ -672,12 +699,17 @@ class ElectrumX(SessionBase):
return value return value
raise RPCError(BAD_REQUEST, f'{value} should be a boolean value') raise RPCError(BAD_REQUEST, f'{value} should be a boolean value')
def electrum_header(self, height):
'''Return the deserialized header at the given height.'''
raw_header = self.controller.raw_header(height)
return self.coin.electrum_header(raw_header, height)
def subscribe_headers_result(self, height): def subscribe_headers_result(self, height):
'''The result of a header subscription for the given height.''' '''The result of a header subscription for the given height.'''
if self.subscribe_headers_raw: if self.subscribe_headers_raw:
raw_header = self.controller.raw_header(height) raw_header = self.controller.raw_header(height)
return {'hex': raw_header.hex(), 'height': height} return {'hex': raw_header.hex(), 'height': height}
return self.controller.electrum_header(height) return self.electrum_header(height)
def _headers_subscribe(self, raw): def _headers_subscribe(self, raw):
'''Subscribe to get headers of new blocks.''' '''Subscribe to get headers of new blocks.'''
@ -714,7 +746,7 @@ class ElectrumX(SessionBase):
''' '''
# Note history is ordered and mempool unordered in electrum-server # Note history is ordered and mempool unordered in electrum-server
# For mempool, height is -1 if unconfirmed txins, otherwise 0 # For mempool, height is -1 if unconfirmed txins, otherwise 0
history = await self.controller.get_history(hashX) history = await self.session_mgr.get_history(hashX)
mempool = await self.controller.mempool_transactions(hashX) mempool = await self.controller.mempool_transactions(hashX)
status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height) status = ''.join('{}:{:d}:'.format(hash_to_hex_str(tx_hash), height)
@ -819,7 +851,7 @@ class ElectrumX(SessionBase):
async def confirmed_and_unconfirmed_history(self, hashX): async def confirmed_and_unconfirmed_history(self, hashX):
# Note history is ordered but unconfirmed is unordered in e-s # Note history is ordered but unconfirmed is unordered in e-s
history = await self.controller.get_history(hashX) history = await self.session_mgr.get_history(hashX)
conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height} conf = [{'tx_hash': hash_to_hex_str(tx_hash), 'height': height}
for tx_hash, height in history] for tx_hash, height in history]
return conf + await self.unconfirmed_history(hashX) return conf + await self.unconfirmed_history(hashX)
@ -915,7 +947,7 @@ class ElectrumX(SessionBase):
height: the header's height''' height: the header's height'''
height = non_negative_integer(height) height = non_negative_integer(height)
return self.controller.electrum_header(height) return self.electrum_header(height)
def is_tor(self): def is_tor(self):
'''Try to detect if the connection is to a tor hidden service we are '''Try to detect if the connection is to a tor hidden service we are

Loading…
Cancel
Save