|
|
@ -15,7 +15,9 @@ import ssl |
|
|
|
import time |
|
|
|
import traceback |
|
|
|
from collections import defaultdict, namedtuple |
|
|
|
from functools import partial |
|
|
|
from functools import partial, lru_cache |
|
|
|
|
|
|
|
import pylru |
|
|
|
|
|
|
|
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash |
|
|
|
from lib.jsonrpc import JSONRPC, json_notification_payload |
|
|
@ -229,6 +231,7 @@ class ServerManager(util.LoggedClass): |
|
|
|
self.max_subs = env.max_subs |
|
|
|
self.subscription_count = 0 |
|
|
|
self.next_stale_check = 0 |
|
|
|
self.history_cache = pylru.lrucache(512) |
|
|
|
self.futures = [] |
|
|
|
env.max_send = max(350000, env.max_send) |
|
|
|
self.logger.info('session timeout: {:,d} seconds' |
|
|
@ -329,6 +332,25 @@ class ServerManager(util.LoggedClass): |
|
|
|
self.logger.info(json.dumps(self.server_summary())) |
|
|
|
self.next_log_sessions = time.time() + self.env.log_sessions |
|
|
|
|
|
|
|
async def async_get_history(self, hash168): |
|
|
|
if hash168 in self.history_cache: |
|
|
|
return self.history_cache[hash168] |
|
|
|
|
|
|
|
# History DoS limit. Each element of history is about 99 |
|
|
|
# bytes when encoded as JSON. This limits resource usage on |
|
|
|
# bloated history requests, and uses a smaller divisor so |
|
|
|
# large requests are logged before refusing them. |
|
|
|
limit = self.env.max_send // 97 |
|
|
|
# Python 3.6: use async generators; update callers |
|
|
|
history = [] |
|
|
|
for item in self.bp.get_history(hash168, limit=limit): |
|
|
|
history.append(item) |
|
|
|
if len(history) % 100 == 0: |
|
|
|
await asyncio.sleep(0) |
|
|
|
|
|
|
|
self.history_cache[hash168] = history |
|
|
|
return history |
|
|
|
|
|
|
|
async def shutdown(self): |
|
|
|
'''Call to shutdown the servers. Returns when done.''' |
|
|
|
self.bp.shutdown() |
|
|
@ -351,8 +373,13 @@ class ServerManager(util.LoggedClass): |
|
|
|
self.logger.info('server listening sockets closed, waiting ' |
|
|
|
'{:d} seconds for socket cleanup'.format(secs)) |
|
|
|
limit = time.time() + secs |
|
|
|
while self.sessions and time.time() < limit: |
|
|
|
await asyncio.sleep(4) |
|
|
|
while self.sessions: |
|
|
|
if time.time() < limit: |
|
|
|
await asyncio.sleep(4) |
|
|
|
else: |
|
|
|
for session in list(self.sessions): |
|
|
|
self.close_session(session, hard=True) |
|
|
|
await asyncio.sleep(0) |
|
|
|
self.logger.info('{:,d} sessions remaining' |
|
|
|
.format(len(self.sessions))) |
|
|
|
|
|
|
@ -368,23 +395,26 @@ class ServerManager(util.LoggedClass): |
|
|
|
self.close_session(session) |
|
|
|
|
|
|
|
def remove_session(self, session): |
|
|
|
self.subscription_count -= session.sub_count() |
|
|
|
future = self.sessions.pop(session) |
|
|
|
future.cancel() |
|
|
|
# It might have been forcefully removed earlier by close_session() |
|
|
|
if session in self.sessions: |
|
|
|
self.subscription_count -= session.sub_count() |
|
|
|
future = self.sessions.pop(session) |
|
|
|
future.cancel() |
|
|
|
|
|
|
|
def close_session(self, session): |
|
|
|
def close_session(self, session, hard=False): |
|
|
|
'''Close the session's transport and cancel its future.''' |
|
|
|
session.transport.close() |
|
|
|
self.sessions[session].cancel() |
|
|
|
return '{:d} disconnected'.format(session.id_) |
|
|
|
if hard: |
|
|
|
self.remove_session(session) |
|
|
|
socket = session.transport.get_extra_info('socket') |
|
|
|
socket.close() |
|
|
|
return 'disconnected {:d}'.format(session.id_) |
|
|
|
|
|
|
|
def toggle_logging(self, session): |
|
|
|
'''Close the session's transport and cancel its future.''' |
|
|
|
'''Toggle logging of the session.''' |
|
|
|
session.log_me = not session.log_me |
|
|
|
if session.log_me: |
|
|
|
return 'logging {:d}'.format(session.id_) |
|
|
|
else: |
|
|
|
return 'not logging {:d}'.format(session.id_) |
|
|
|
return 'log {:d}: {}'.format(session.id_, session.log_me) |
|
|
|
|
|
|
|
def clear_stale_sessions(self): |
|
|
|
'''Cut off sessions that haven't done anything for 10 minutes.''' |
|
|
@ -395,10 +425,10 @@ class ServerManager(util.LoggedClass): |
|
|
|
stale = [session for session in self.sessions |
|
|
|
if session.last_recv < cutoff] |
|
|
|
for session in stale: |
|
|
|
self.close_session(session) |
|
|
|
self.close_session(session, hard=True) |
|
|
|
if stale: |
|
|
|
self.logger.info('dropped {:,d} stale connections' |
|
|
|
.format(len(stale))) |
|
|
|
self.logger.info('dropped stale connections {}' |
|
|
|
.format([session.id_ for session in stale])) |
|
|
|
|
|
|
|
def new_subscription(self): |
|
|
|
if self.subscription_count >= self.max_subs: |
|
|
@ -716,7 +746,7 @@ class ElectrumX(Session): |
|
|
|
'''Returns status as 32 bytes.''' |
|
|
|
# Note history is ordered and mempool unordered in electrum-server |
|
|
|
# For mempool, height is -1 if unconfirmed txins, otherwise 0 |
|
|
|
history = await self.async_get_history(hash168) |
|
|
|
history = await self.manager.async_get_history(hash168) |
|
|
|
mempool = self.manager.mempool_transactions(hash168) |
|
|
|
|
|
|
|
status = ''.join('{}:{:d}:'.format(hash_to_str(tx_hash), height) |
|
|
@ -761,7 +791,7 @@ class ElectrumX(Session): |
|
|
|
|
|
|
|
async def get_history(self, hash168): |
|
|
|
# Note history is ordered but unconfirmed is unordered in e-s |
|
|
|
history = await self.async_get_history(hash168) |
|
|
|
history = await self.manager.async_get_history(hash168) |
|
|
|
conf = [{'tx_hash': hash_to_str(tx_hash), 'height': height} |
|
|
|
for tx_hash, height in history] |
|
|
|
|
|
|
@ -775,20 +805,6 @@ class ElectrumX(Session): |
|
|
|
count = min(next_height - start_height, chunk_size) |
|
|
|
return self.bp.read_headers(start_height, count).hex() |
|
|
|
|
|
|
|
async def async_get_history(self, hash168): |
|
|
|
# History DoS limit. Each element of history is about 99 |
|
|
|
# bytes when encoded as JSON. This limits resource usage on |
|
|
|
# bloated history requests, and uses a smaller divisor so |
|
|
|
# large requests are logged before refusing them. |
|
|
|
limit = self.max_send // 97 |
|
|
|
# Python 3.6: use async generators; update callers |
|
|
|
history = [] |
|
|
|
for item in self.bp.get_history(hash168, limit=limit): |
|
|
|
history.append(item) |
|
|
|
if len(history) % 100 == 0: |
|
|
|
await asyncio.sleep(0) |
|
|
|
return history |
|
|
|
|
|
|
|
async def get_utxos(self, hash168): |
|
|
|
# Python 3.6: use async generators; update callers |
|
|
|
utxos = [] |
|
|
|