Browse Source

lnworker: make add_peer async

regtest_lnd
SomberNight 6 years ago
parent
commit
cc066f9f1c
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 27
      electrum/lnworker.py

27
electrum/lnworker.py

@ -90,7 +90,10 @@ class LNWorker(PrintError):
def _add_peers_from_config(self):
peer_list = self.config.get('lightning_peers', [])
for host, port, pubkey in peer_list:
self.add_peer(host, int(port), bfh(pubkey))
asyncio.run_coroutine_threadsafe(
self.add_peer(host, int(port), bfh(pubkey)),
self.network.asyncio_loop)
def suggest_peer(self):
for node_id, peer in self.peers.items():
@ -105,20 +108,20 @@ class LNWorker(PrintError):
with self.lock:
return {x: y for (x, y) in self.channels.items() if y.node_id == node_id}
def add_peer(self, host, port, node_id):
async def add_peer(self, host, port, node_id):
port = int(port)
peer_addr = LNPeerAddr(host, port, node_id)
if node_id in self.peers:
return
self._last_tried_peer[peer_addr] = time.time()
self.print_error("adding peer", peer_addr)
peer = Peer(self, peer_addr, request_initial_sync=self.config.get("request_initial_sync", True))
async def _init_peer():
reader, writer = await asyncio.open_connection(peer_addr.host, peer_addr.port)
transport = LNTransport(self.node_keypair.privkey, node_id, reader, writer)
peer.transport = transport
await self.network.main_taskgroup.spawn(peer.main_loop())
asyncio.ensure_future(_init_peer())
peer = Peer(self, peer_addr, request_initial_sync=self.config.get("request_initial_sync", True))
self.peers[node_id] = peer
self.network.trigger_callback('ln_status')
return peer
@ -240,7 +243,9 @@ class LNWorker(PrintError):
socket.getaddrinfo(host, int(port))
except socket.gaierror:
raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)'))
peer = self.add_peer(host, port, node_id)
peer_future = asyncio.run_coroutine_threadsafe(self.add_peer(host, port, node_id),
self.network.asyncio_loop)
peer = peer_future.result(timeout)
coro = self._open_channel_coroutine(peer, local_amt_sat, push_amt_sat, password)
f = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
chan = f.result(timeout)
@ -452,14 +457,14 @@ class LNWorker(PrintError):
self.print_error('got {} ln peers from dns seed'.format(len(peers)))
return peers
def reestablish_peers_and_channels(self):
def reestablish_peer_for_given_channel():
async def reestablish_peers_and_channels(self):
async def reestablish_peer_for_given_channel():
# try last good address first
peer = self.channel_db.get_last_good_address(chan.node_id)
if peer:
last_tried = self._last_tried_peer.get(peer, 0)
if last_tried + PEER_RETRY_INTERVAL_FOR_CHANNELS < now:
self.add_peer(peer.host, peer.port, peer.pubkey)
await self.add_peer(peer.host, peer.port, peer.pubkey)
return
# try random address for node_id
node_info = self.channel_db.nodes.get(chan.node_id, None)
@ -470,7 +475,7 @@ class LNWorker(PrintError):
peer = LNPeerAddr(host, port, chan.node_id)
last_tried = self._last_tried_peer.get(peer, 0)
if last_tried + PEER_RETRY_INTERVAL_FOR_CHANNELS < now:
self.add_peer(host, port, chan.node_id)
await self.add_peer(host, port, chan.node_id)
with self.lock:
channels = list(self.channels.values())
@ -480,7 +485,7 @@ class LNWorker(PrintError):
continue
peer = self.peers.get(chan.node_id, None)
if peer is None:
reestablish_peer_for_given_channel()
await reestablish_peer_for_given_channel()
else:
coro = peer.reestablish_channel(chan)
asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
@ -491,11 +496,11 @@ class LNWorker(PrintError):
while True:
await asyncio.sleep(1)
now = time.time()
self.reestablish_peers_and_channels()
await self.reestablish_peers_and_channels()
if len(self.peers) >= NUM_PEERS_TARGET:
continue
peers = self._get_next_peers_to_try()
for peer in peers:
last_tried = self._last_tried_peer.get(peer, 0)
if last_tried + PEER_RETRY_INTERVAL < now:
self.add_peer(peer.host, peer.port, peer.pubkey)
await self.add_peer(peer.host, peer.port, peer.pubkey)

Loading…
Cancel
Save