Browse Source

aiorpcx: replace network loop with asyncio and try to maintain ten sessions

3.3.3.1
Janus 7 years ago
committed by SomberNight
parent
commit
97ea0fc439
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 4
      electrum/address_synchronizer.py
  2. 4
      electrum/daemon.py
  3. 2
      electrum/gui/qt/main_window.py
  4. 85
      electrum/network.py
  5. 13
      electrum/util.py

4
electrum/address_synchronizer.py

@ -139,14 +139,14 @@ class AddressSynchronizer(PrintError):
if self.network is not None:
self.verifier = SPV(self.network, self)
self.synchronizer = Synchronizer(self, network)
network.add_jobs([self.verifier, self.synchronizer])
#network.add_jobs([self.verifier, self.synchronizer])
else:
self.verifier = None
self.synchronizer = None
def stop_threads(self):
if self.network:
self.network.remove_jobs([self.synchronizer, self.verifier])
#self.network.remove_jobs([self.synchronizer, self.verifier])
self.synchronizer.release()
self.synchronizer = None
self.verifier = None

4
electrum/daemon.py

@ -128,8 +128,8 @@ class Daemon(DaemonThread):
self.network = Network(config)
self.network.start()
self.fx = FxThread(config, self.network)
if self.network:
self.network.add_jobs([self.fx])
#if self.network:
# self.network.add_jobs([self.fx])
self.gui = None
self.wallets = {}
# Setup JSONRPC server

2
electrum/gui/qt/main_window.py

@ -712,7 +712,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, PrintError):
if not self.wallet:
return
if self.network is None or not self.network.is_running():
if self.network is None:
text = _("Offline")
icon = QIcon(":icons/status_disconnected.png")

85
electrum/network.py

@ -39,20 +39,42 @@ import dns.resolver
import socks
from . import util
from .util import print_error
from .util import print_error, PrintError
from . import bitcoin
from .bitcoin import COIN
from . import constants
from .interface import Connection, Interface
from . import blockchain
from .version import ELECTRUM_VERSION, PROTOCOL_VERSION
from .i18n import _
from .blockchain import InvalidHeader
import aiorpcx, asyncio, ssl
import concurrent.futures
NODES_RETRY_INTERVAL = 60
SERVER_RETRY_INTERVAL = 10
class Interface(PrintError):
@util.aiosafe
async def run(self):
self.host, self.port, self.protocol = self.server.split(':')
async with aiorpcx.ClientSession(self.host, self.port) as session:
ver = await session.send_request('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])
print(ver)
def __init__(self, server):
self.exception = None
self.server = server
self.fut = asyncio.get_event_loop().create_task(self.run())
def has_timed_out(self):
return self.fut.done()
def queue_request(self, method, params, msg_id):
pass
def close(self):
self.fut.cancel()
def parse_servers(result):
""" parse servers list into dict format"""
@ -162,7 +184,7 @@ def serialize_server(host, port, protocol):
return str(':'.join([host, port, protocol]))
class Network(util.DaemonThread):
class Network(PrintError):
"""The Network class manages a set of connections to remote electrum
servers, each connected socket is handled by an Interface() object.
Connections are initiated by a Connection() thread which stops once
@ -179,7 +201,6 @@ class Network(util.DaemonThread):
def __init__(self, config=None):
if config is None:
config = {} # Do not use mutables as default values!
util.DaemonThread.__init__(self)
self.config = SimpleConfig(config) if isinstance(config, dict) else config
self.num_server = 10 if not self.config.get('oneserver') else 0
self.blockchains = blockchain.read_blockchains(self.config) # note: needs self.blockchains_lock
@ -244,6 +265,7 @@ class Network(util.DaemonThread):
self.socket_queue = queue.Queue()
self.start_network(deserialize_server(self.default_server)[2],
deserialize_proxy(self.config.get('proxy')))
self.asyncio_loop = asyncio.get_event_loop()
def with_interface_lock(func):
def func_wrapper(self, *args, **kwargs):
@ -424,7 +446,7 @@ class Network(util.DaemonThread):
self.print_error("connecting to %s as new interface" % server)
self.set_status('connecting')
self.connecting.add(server)
Connection(server, self.socket_queue, self.config.path)
self.socket_queue.put(server)
def start_random_interface(self):
with self.interface_lock:
@ -781,10 +803,10 @@ class Network(util.DaemonThread):
if b.catch_up == server:
b.catch_up = None
def new_interface(self, server, socket):
def new_interface(self, server):
# todo: get tip first, then decide which checkpoint to use.
self.add_recent_server(server)
interface = Interface(server, socket)
interface = Interface(server)
interface.blockchain = None
interface.tip_header = None
interface.tip = 0
@ -804,12 +826,12 @@ class Network(util.DaemonThread):
'''Socket maintenance.'''
# Responses to connection attempts?
while not self.socket_queue.empty():
server, socket = self.socket_queue.get()
server = self.socket_queue.get()
if server in self.connecting:
self.connecting.remove(server)
if socket:
self.new_interface(server, socket)
self.new_interface(server)
else:
self.connection_down(server)
@ -1078,16 +1100,15 @@ class Network(util.DaemonThread):
with b.lock:
b.update_size()
def run(self):
def _run(self):
self.init_headers_file()
while self.is_running():
self.maintain_sockets()
self.wait_on_sockets()
self.maintain_requests()
self.run_jobs() # Synchronizer and Verifier
self.process_pending_sends()
self.stop_network()
self.on_stop()
these = [self.maintain_sessions()]
these = [self.asyncio_loop.create_task(x) for x in these]
self.gat = asyncio.gather(*these)
try:
self.asyncio_loop.run_until_complete(self.gat)
except concurrent.futures.CancelledError:
pass
def on_notify_header(self, interface, header_dict):
try:
@ -1321,3 +1342,31 @@ class Network(util.DaemonThread):
@classmethod
def max_checkpoint(cls):
return max(0, len(constants.net.CHECKPOINTS) * 2016 - 1)
def start(self):
self.fut = threading.Thread(target=self._run)
self.fut.start()
def stop(self):
async def stop():
self.gat.cancel()
asyncio.run_coroutine_threadsafe(stop(), self.asyncio_loop)
def join(self):
return self.fut.join(1)
async def maintain_sessions(self):
while True:
while self.socket_queue.qsize() > 0:
server = self.socket_queue.get()
self.new_interface(server)
remove = []
for k, i in self.interfaces.items():
if i.has_timed_out():
remove.append(k)
for k in remove:
self.connection_down(k)
for i in range(self.num_server - len(self.interfaces)):
self.start_random_interface()
self.notify('updated')
await asyncio.sleep(1)

13
electrum/util.py

@ -925,6 +925,19 @@ def make_dir(path, allow_symlink=True):
os.mkdir(path)
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
def aiosafe(f):
# save exception in object.
# f must be a method of a PrintError instance.
# aiosafe calls should not be nested
async def f2(*args, **kwargs):
self = args[0]
try:
return await f(*args, **kwargs)
except BaseException as e:
self.print_error("Exception in", f.__name__, ":", e.__class__.__name__, str(e))
traceback.print_exc(file=sys.stderr)
self.exception = e
return f2
TxMinedStatus = NamedTuple("TxMinedStatus", [("height", int),
("conf", int),

Loading…
Cancel
Save