Browse Source

Merge branch 'develop'

master
Neil Booth 8 years ago
parent
commit
f8f5d4429f
  1. 30
      docs/ENVIRONMENT.rst
  2. 6
      electrumx_rpc.py
  3. 27
      lib/jsonrpc.py
  4. 120
      server/controller.py
  5. 72
      server/daemon.py
  6. 4
      server/env.py
  7. 26
      server/peers.py
  8. 5
      server/session.py

30
docs/ENVIRONMENT.rst

@ -215,8 +215,8 @@ raise them.
functioning Electrum clients by default will send pings roughly
every 60 seconds.
TOR
---
PEER DISCOVERY
--------------
In response to the `server.peers.subscribe` RPC call, ElectrumX will
only return peer servers that is has recently connected to and
@ -229,12 +229,36 @@ peers it will fall back to a hard-coded list.
To give incoming clients a full range of onion servers you will need
to be running a Tor proxy for ElectrumX to use.
ElectrumX will perform peer-discovery by default and announce itself
to other peers. If your server is private you may wish to disable
some of this.
* **PEER_DISCOVERY**
If not defined, or non-empty, ElectrumX will occasionally connect to
and verify its network of peer servers. Set to empty to disable
peer discovery.
* **PEER_ANNOUNCE**
Set this environemnt variable to empty to disable announcing itself.
If not defined, or non-empty, ElectrumX will announce itself to
peers.
If peer discovery is disabled this environment variable has no
effect, because ElectrumX only announces itself to peers when doing
peer discovery if it notices it is not present in the peer's
returned list.
* **TOR_PROXY_HOST**
The host where the Tor proxy is running. Defaults to *127.0.0.1*.
The host where your Tor proxy is running. Defaults to *127.0.0.1*.
If you use a hostname here rather than an IP address, you must have
Python version >= 3.5.3, Python 3.5.2 will **not** work.
If you are not running a Tor proxy just leave this environment
variable undefined.
* **TOR_PROXY_PORT**
The port on which the Tor proxy is running. If not set, ElectrumX

6
electrumx_rpc.py

@ -26,13 +26,9 @@ class RPCClient(JSONSession):
super().__init__(version=JSONRPCv2)
self.max_send = 0
self.max_buffer_size = 5*10**6
self.event = asyncio.Event()
def have_pending_items(self):
self.event.set()
async def wait_for_response(self):
await self.event.wait()
await self.items_event.wait()
await self.process_pending_items()
def send_rpc_request(self, method, params):

27
lib/jsonrpc.py

@ -258,7 +258,7 @@ class JSONSessionBase(util.LoggedClass):
from empty
'''
_next_session_id = 0
_pending_reqs = {}
_pending_reqs = {} # Outgoing requests waiting for a response
@classmethod
def next_session_id(cls):
@ -320,6 +320,7 @@ class JSONSessionBase(util.LoggedClass):
self.pause = False
# Handling of incoming items
self.items = collections.deque()
self.items_event = asyncio.Event()
self.batch_results = []
# Handling of outgoing requests
self.next_request_id = 0
@ -461,10 +462,8 @@ class JSONSessionBase(util.LoggedClass):
self.send_error('empty batch', JSONRPC.INVALID_REQUEST)
return
# Incoming items get queued for later asynchronous processing.
if not self.items:
self.have_pending_items()
self.items.append(payload)
self.items_event.set()
async def process_batch(self, batch, count):
'''Processes count items from the batch according to the JSON 2.0
@ -626,6 +625,9 @@ class JSONSessionBase(util.LoggedClass):
if binary:
self.send_binary(binary)
if not self.items:
self.items_event.clear()
def count_pending_items(self):
'''Counts the number of pending items.'''
return sum(len(item) if isinstance(item, list) else 1
@ -716,15 +718,6 @@ class JSONSessionBase(util.LoggedClass):
# App layer
def have_pending_items(self):
'''Called to indicate there are items pending to be processed
asynchronously by calling process_pending_items.
This is *not* called every time an item is added, just when
there were previously none and now there is at least one.
'''
raise NotImplementedError
def using_bandwidth(self, amount):
'''Called as bandwidth is consumed.
@ -749,8 +742,12 @@ class JSONSession(JSONSessionBase, asyncio.Protocol):
'''A JSONSessionBase instance specialized for use with
asyncio.protocol to implement the transport layer.
Derived classes must provide have_pending_items() and may want to
override the request and notification handlers.
The app should await on items_event, which is set when unprocessed
incoming items remain and cleared when the queue is empty, and
then arrange to call process_pending_items asynchronously.
Derived classes may want to override the request and notification
handlers.
'''
def __init__(self, version=JSONRPCCompat):

120
server/controller.py

@ -69,9 +69,6 @@ class Controller(util.LoggedClass):
self.next_stale_check = 0
self.history_cache = pylru.lrucache(256)
self.header_cache = pylru.lrucache(8)
self.queue = asyncio.PriorityQueue()
self.delayed_sessions = []
self.next_queue_id = 0
self.cache_height = 0
env.max_send = max(350000, env.max_send)
self.setup_bands()
@ -136,67 +133,6 @@ class Controller(util.LoggedClass):
def is_deprioritized(self, session):
return self.session_priority(session) > self.BANDS
async def enqueue_delayed_sessions(self):
while True:
now = time.time()
keep = []
for pair in self.delayed_sessions:
timeout, item = pair
priority, queue_id, session = item
if not session.pause and timeout <= now:
self.queue.put_nowait(item)
else:
keep.append(pair)
self.delayed_sessions = keep
# If paused and session count has fallen, start listening again
if (len(self.sessions) <= self.low_watermark
and self.state == self.PAUSED):
await self.start_external_servers()
# Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions:
if self.next_log_sessions:
data = self.session_data(for_log=True)
for line in Controller.sessions_text_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self.getinfo()))
self.next_log_sessions = time.time() + self.env.log_sessions
await asyncio.sleep(1)
def enqueue_session(self, session):
# Might have disconnected whilst waiting
if session not in self.sessions:
return
priority = self.session_priority(session)
item = (priority, self.next_queue_id, session)
self.next_queue_id += 1
excess = max(0, priority - self.BANDS)
if excess != session.last_delay:
session.last_delay = excess
if excess:
session.log_info('high bandwidth use, deprioritizing by '
'delaying responses {:d}s'.format(excess))
else:
session.log_info('stopped delaying responses')
delay = max(int(session.pause), excess)
if delay:
self.delayed_sessions.append((time.time() + delay, item))
else:
self.queue.put_nowait(item)
async def serve_requests(self):
'''Asynchronously run through the task queue.'''
while True:
priority_, id_, session = await self.queue.get()
if session in self.sessions:
await session.process_pending_items()
# Re-enqueue the session if stuff is left
if session.items:
self.enqueue_session(session)
async def run_in_executor(self, func, *args):
'''Wait whilst running func in the executor.'''
return await self.loop.run_in_executor(None, func, *args)
@ -225,11 +161,30 @@ class Controller(util.LoggedClass):
except Exception:
self.log_error(traceback.format_exc())
async def check_request_timeouts(self):
'''Regularly check pending JSON requests for timeouts.'''
async def housekeeping(self):
'''Regular housekeeping checks.'''
n = 0
while True:
await asyncio.sleep(30)
n += 1
await asyncio.sleep(15)
JSONSessionBase.timeout_check()
if n % 10 == 0:
self.clear_stale_sessions()
# Start listening for incoming connections if paused and
# session count has fallen
if (self.state == self.PAUSED and
len(self.sessions) <= self.low_watermark):
await self.start_external_servers()
# Periodically log sessions
if self.env.log_sessions and time.time() > self.next_log_sessions:
if self.next_log_sessions:
data = self.session_data(for_log=True)
for line in Controller.sessions_text_lines(data):
self.logger.info(line)
self.logger.info(json.dumps(self.getinfo()))
self.next_log_sessions = time.time() + self.env.log_sessions
async def wait_for_bp_catchup(self):
'''Called when the block processor catches up.'''
@ -237,12 +192,9 @@ class Controller(util.LoggedClass):
self.logger.info('block processor has caught up')
self.ensure_future(self.peer_mgr.main_loop())
self.ensure_future(self.start_servers())
self.ensure_future(self.check_request_timeouts())
self.ensure_future(self.housekeeping())
self.ensure_future(self.mempool.main_loop())
self.ensure_future(self.enqueue_delayed_sessions())
self.ensure_future(self.notify())
for n in range(4):
self.ensure_future(self.serve_requests())
async def main_loop(self):
'''Controller main loop.'''
@ -379,11 +331,28 @@ class Controller(util.LoggedClass):
self.header_cache[height] = header
return header
def session_delay(self, session):
priority = self.session_priority(session)
excess = max(0, priority - self.BANDS)
if excess != session.last_delay:
session.last_delay = excess
if excess:
session.log_info('high bandwidth use, deprioritizing by '
'delaying responses {:d}s'.format(excess))
else:
session.log_info('stopped delaying responses')
return max(int(session.pause), excess)
async def process_items(self, session):
'''Waits for incoming session items and processes them.'''
while True:
await session.items_event.wait()
await asyncio.sleep(self.session_delay(session))
if not session.pause:
await session.process_pending_items()
def add_session(self, session):
now = time.time()
if now > self.next_stale_check:
self.next_stale_check = now + 300
self.clear_stale_sessions()
session.items_future = self.ensure_future(self.process_items(session))
gid = int(session.start_time - self.start_time) // 900
self.groups[gid].append(session)
self.sessions[session] = gid
@ -400,6 +369,7 @@ class Controller(util.LoggedClass):
def remove_session(self, session):
'''Remove a session from our sessions list if there.'''
session.items_future.cancel()
if session in self.sessions:
gid = self.sessions.pop(session)
assert gid in self.groups

72
server/daemon.py

@ -10,6 +10,7 @@ daemon.'''
import asyncio
import json
import time
import traceback
import aiohttp
@ -38,6 +39,8 @@ class Daemon(util.LoggedClass):
# Limit concurrent RPC calls to this number.
# See DEFAULT_HTTP_WORKQUEUE in bitcoind, which is typically 16
self.workqueue_semaphore = asyncio.Semaphore(value=10)
self.down = False
self.last_error_time = 0
def set_urls(self, urls):
'''Set the URLS to the given list, and switch to the first one.'''
@ -65,48 +68,56 @@ class Daemon(util.LoggedClass):
return True
return False
async def _send_data(self, data):
async with self.workqueue_semaphore:
async with aiohttp.ClientSession() as session:
async with session.post(self.url(), data=data) as resp:
# If bitcoind can't find a tx, for some reason
# it returns 500 but fills out the JSON.
# Should still return 200 IMO.
if resp.status in (200, 500):
return await resp.json()
return (resp.status, resp.reason)
async def _send(self, payload, processor):
'''Send a payload to be converted to JSON.
Handles temporary connection issues. Daemon reponse errors
are raise through DaemonError.
'''
self.prior_msg = None
self.skip_count = None
def log_error(msg, skip_once=False):
if skip_once and self.skip_count is None:
self.skip_count = 1
if msg != self.prior_msg or self.skip_count == 0:
self.skip_count = 10
self.prior_msg = msg
self.logger.error('{} Retrying between sleeps...'
.format(msg))
self.skip_count -= 1
def log_error(error):
self.down = True
now = time.time()
prior_time = self.last_error_time
if now - prior_time > 60:
self.last_error_time = now
if prior_time and self.failover():
secs = 0
else:
self.logger.error('{} Retrying occasionally...'
.format(error))
data = json.dumps(payload)
secs = 1
max_secs = 16
max_secs = 4
while True:
try:
async with self.workqueue_semaphore:
async with aiohttp.post(self.url(), data=data) as resp:
# If bitcoind can't find a tx, for some reason
# it returns 500 but fills out the JSON.
# Should still return 200 IMO.
if resp.status in (200, 500):
if self.prior_msg:
self.logger.info('connection restored')
result = processor(await resp.json())
return result
result = await self._send_data(data)
if not isinstance(result, tuple):
result = processor(result)
if self.down:
self.down = False
self.last_error_time = 0
self.logger.info('connection restored')
return result
log_error('HTTP error code {:d}: {}'
.format(resp.status, resp.reason))
.format(result[0], result[1]))
except asyncio.TimeoutError:
log_error('timeout error.', skip_once=True)
log_error('timeout error.')
except aiohttp.ClientHttpProcessingError:
log_error('HTTP error.', skip_once=True)
log_error('HTTP error.')
except aiohttp.ServerDisconnectedError:
log_error('disconnected.', skip_once=True)
log_error('disconnected.')
except aiohttp.ClientConnectionError:
log_error('connection problem - is your daemon running?')
except self.DaemonWarmingUpError:
@ -116,11 +127,8 @@ class Daemon(util.LoggedClass):
except Exception:
self.log_error(traceback.format_exc())
if secs >= max_secs and self.failover():
secs = 1
else:
await asyncio.sleep(secs)
secs = min(max_secs, secs * 2)
await asyncio.sleep(secs)
secs = min(max_secs, secs * 2, 1)
def logged_url(self, url=None):
'''The host and port part, for logging.'''

4
server/env.py

@ -48,7 +48,9 @@ class Env(LoggedClass):
self.banner_file)
self.anon_logs = self.default('ANON_LOGS', False)
self.log_sessions = self.integer('LOG_SESSIONS', 3600)
# Tor proxy
# Peer discovery
self.peer_discovery = bool(self.default('PEER_DISCOVERY', True))
self.peer_announce = bool(self.default('PEER_ANNOUNCE', True))
# Python 3.5.3 - revert back to localhost?
self.tor_proxy_host = self.default('TOR_PROXY_HOST', '127.0.0.1')
self.tor_proxy_port = self.integer('TOR_PROXY_PORT', None)

26
server/peers.py

@ -59,13 +59,16 @@ class PeerSession(JSONSession):
self.failed = False
self.log_prefix = '[{}] '.format(self.peer)
def have_pending_items(self):
self.peer_mgr.ensure_future(self.process_pending_items())
async def wait_on_items(self):
while True:
await self.items_event.wait()
await self.process_pending_items()
def connection_made(self, transport):
'''Handle an incoming client connection.'''
super().connection_made(transport)
self.log_prefix = '[{}] '.format(str(self.peer)[:25])
self.future = self.peer_mgr.ensure_future(self.wait_on_items())
# Update IP address
if not self.peer.is_tor:
@ -82,6 +85,7 @@ class PeerSession(JSONSession):
def connection_lost(self, exc):
'''Handle disconnection.'''
super().connection_lost(exc)
self.future.cancel()
self.peer_mgr.connection_lost(self)
def on_peers_subscribe(self, result, error):
@ -108,13 +112,16 @@ class PeerSession(JSONSession):
return
self.peer_mgr.add_peers(peers)
if not self.peer_mgr.env.peer_announce:
return
# Announce ourself if not present
my = self.peer_mgr.myself
for peer in my.matches(peers):
if peer.tcp_port == my.tcp_port and peer.ssl_port == my.ssl_port:
return
# Announce ourself if not present
self.log_info('registering with server.add_peer')
self.log_info('registering ourself with server.add_peer')
self.send_request(self.on_add_peer, 'server.add_peer', [my.features])
def on_add_peer(self, result, error):
@ -306,8 +313,8 @@ class PeerManager(util.LoggedClass):
'''Returns the server peers as a list of (ip, host, details) tuples.
We return all peers we've connected to in the last day.
Additionally, if we don't have onion routing, we return up to
three randomly selected onion servers.
Additionally, if we don't have onion routing, we return a few
hard-coded onion servers.
'''
cutoff = time.time() - STALE_SECS
recent = [peer for peer in self.peers
@ -403,6 +410,11 @@ class PeerManager(util.LoggedClass):
3) Retrying old peers at regular intervals.
'''
self.connect_to_irc()
if not self.env.peer_discovery:
self.logger.info('peer discovery is disabled')
return
self.logger.info('beginning peer discovery')
try:
while True:
timeout = self.loop.call_later(WAKEUP_SECS,

5
server/session.py

@ -47,11 +47,6 @@ class SessionBase(JSONSession):
self.bw_used = 0
self.peer_added = False
def have_pending_items(self):
'''Called each time the pending item queue goes from empty to having
one item.'''
self.controller.enqueue_session(self)
def close_connection(self):
'''Call this to close the connection.'''
self.close_time = time.time()

Loading…
Cancel
Save