|
|
@ -246,7 +246,13 @@ class ServerManager(LoggedClass): |
|
|
|
self.servers = [] |
|
|
|
self.irc = IRC(env) |
|
|
|
self.sessions = {} |
|
|
|
self.max_subs = env.max_subs |
|
|
|
self.subscription_count = 0 |
|
|
|
self.futures = [] # At present just the IRC future, if any |
|
|
|
self.logger.info('max subscriptions across all sessions: {:,d}' |
|
|
|
.format(self.max_subs)) |
|
|
|
self.logger.info('max subscriptions per session: {:,d}' |
|
|
|
.format(env.max_session_subs)) |
|
|
|
|
|
|
|
async def start_server(self, kind, *args, **kw_args): |
|
|
|
loop = asyncio.get_event_loop() |
|
|
@ -318,9 +324,17 @@ class ServerManager(LoggedClass): |
|
|
|
self.sessions[session] = asyncio.ensure_future(coro) |
|
|
|
|
|
|
|
def remove_session(self, session): |
|
|
|
if isinstance(session, ElectrumX): |
|
|
|
self.subscription_count -= len(session.hash168s) |
|
|
|
future = self.sessions.pop(session) |
|
|
|
future.cancel() |
|
|
|
|
|
|
|
def new_subscription(self): |
|
|
|
if self.subscription_count >= self.max_subs: |
|
|
|
raise JSONRPC.RPCError('server subscription limit {:,d} reached' |
|
|
|
.format(self.max_subs)) |
|
|
|
self.subscription_count += 1 |
|
|
|
|
|
|
|
def irc_peers(self): |
|
|
|
return self.irc.peers |
|
|
|
|
|
|
@ -330,18 +344,19 @@ class ServerManager(LoggedClass): |
|
|
|
total = len(self.sessions) |
|
|
|
return {'active': active, 'inert': total - active, 'total': total} |
|
|
|
|
|
|
|
def address_count(self): |
|
|
|
return sum(len(session.hash168s) for session in self.sessions |
|
|
|
if isinstance(session, ElectrumX)) |
|
|
|
|
|
|
|
async def rpc_getinfo(self, params): |
|
|
|
'''The RPC 'getinfo' call.''' |
|
|
|
# FIXME: remove later |
|
|
|
indep_count = sum(len(session.hash168s) for session in self.sessions |
|
|
|
if isinstance(session, ElectrumX)) |
|
|
|
if indep_count != self.subscription_count: |
|
|
|
self.logger.error('sub count {:,d} but session total {:,d}' |
|
|
|
.format(self.subscription_count, indep_count)) |
|
|
|
return { |
|
|
|
'blocks': self.bp.height, |
|
|
|
'peers': len(self.irc.peers), |
|
|
|
'sessions': self.session_count(), |
|
|
|
'watched': self.address_count(), |
|
|
|
'cached': 0, |
|
|
|
'watched': self.subscription_count, |
|
|
|
} |
|
|
|
|
|
|
|
async def rpc_sessions(self, params): |
|
|
@ -503,6 +518,7 @@ class ElectrumX(Session): |
|
|
|
self.subscribe_headers = False |
|
|
|
self.subscribe_height = False |
|
|
|
self.notified_height = None |
|
|
|
self.max_subs = self.env.max_session_subs |
|
|
|
self.hash168s = set() |
|
|
|
rpcs = [ |
|
|
|
('blockchain', |
|
|
@ -689,8 +705,14 @@ class ElectrumX(Session): |
|
|
|
|
|
|
|
async def address_subscribe(self, params): |
|
|
|
hash168 = self.extract_hash168(params) |
|
|
|
if len(self.hash168s) >= self.max_subs: |
|
|
|
raise self.RPCError('your address subscription limit {:,d} reached' |
|
|
|
.format(self.max_subs)) |
|
|
|
result = await self.address_status(hash168) |
|
|
|
# add_subscription can raise so call it before adding |
|
|
|
self.manager.new_subscription() |
|
|
|
self.hash168s.add(hash168) |
|
|
|
return await self.address_status(hash168) |
|
|
|
return result |
|
|
|
|
|
|
|
async def block_get_chunk(self, params): |
|
|
|
index = self.extract_non_negative_integer(params) |
|
|
|