From c02cc9bb3baa1215b7e73b84dec26161ba6fb1d3 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Fri, 27 Jul 2018 20:59:04 +0200 Subject: [PATCH] persist recent peers. implement dns seed bootstrapping. dns seeds are currently disabled though, as they always seem to return mainnet nodes. --- electrum/constants.py | 12 ++++ electrum/lnbase.py | 4 +- electrum/lnrouter.py | 36 +++++++++- electrum/lnutil.py | 18 +++++ electrum/lnworker.py | 132 +++++++++++++++++++++++++++------- electrum/tests/test_lnutil.py | 11 ++- electrum/util.py | 15 ++++ 7 files changed, 194 insertions(+), 34 deletions(-) diff --git a/electrum/constants.py b/electrum/constants.py index 7781de369..d5e5ebcab 100644 --- a/electrum/constants.py +++ b/electrum/constants.py @@ -84,6 +84,11 @@ class BitcoinMainnet(AbstractNet): } XPUB_HEADERS_INV = inv_dict(XPUB_HEADERS) BIP44_COIN_TYPE = 0 + LN_REALM_BYTE = 0 + LN_DNS_SEEDS = [ + 'nodes.lightning.directory.', + 'lseed.bitcoinstats.com.', + ] class BitcoinTestnet(AbstractNet): @@ -115,6 +120,11 @@ class BitcoinTestnet(AbstractNet): } XPUB_HEADERS_INV = inv_dict(XPUB_HEADERS) BIP44_COIN_TYPE = 1 + LN_REALM_BYTE = 1 + LN_DNS_SEEDS = [ + 'test.nodes.lightning.directory.', + 'lseed.bitcoinstats.com.', + ] class BitcoinRegtest(BitcoinTestnet): @@ -123,6 +133,7 @@ class BitcoinRegtest(BitcoinTestnet): GENESIS = "0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206" DEFAULT_SERVERS = read_json('servers_regtest.json', {}) CHECKPOINTS = [] + LN_DNS_SEEDS = [] class BitcoinSimnet(BitcoinTestnet): @@ -134,6 +145,7 @@ class BitcoinSimnet(BitcoinTestnet): GENESIS = "683e86bd5c6d110d91b94b97137ba6bfe02dbbdb8e3dff722a669b5d69d77af6" DEFAULT_SERVERS = read_json('servers_regtest.json', {}) CHECKPOINTS = [] + LN_DNS_SEEDS = [] # don't import net directly, import the module instead (so that net is singleton) diff --git a/electrum/lnbase.py b/electrum/lnbase.py index 8ce7a24ed..c266cb2f3 100644 --- a/electrum/lnbase.py +++ b/electrum/lnbase.py @@ -7,7 +7,7 @@ from collections import namedtuple, defaultdict, OrderedDict, defaultdict from .lnutil import Outpoint, ChannelConfig, LocalState, RemoteState, Keypair, OnlyPubkeyKeypair, ChannelConstraints, RevocationStore from .lnutil import sign_and_get_sig_string, funding_output_script, get_ecdh, get_per_commitment_secret_from_seed -from .lnutil import secret_to_pubkey +from .lnutil import secret_to_pubkey, LNPeerAddr from .bitcoin import COIN from ecdsa.util import sigdecode_der, sigencode_string_canonize, sigdecode_string @@ -439,7 +439,6 @@ class Peer(PrintError): def on_channel_announcement(self, payload): self.channel_db.on_channel_announcement(payload) - self.network.trigger_callback('ln_status') def on_announcement_signatures(self, payload): channel_id = payload['channel_id'] @@ -462,6 +461,7 @@ class Peer(PrintError): @aiosafe async def main_loop(self): await asyncio.wait_for(self.initialize(), 5) + self.channel_db.add_recent_peer(LNPeerAddr(self.host, self.port, self.pubkey)) # loop while True: self.ping_if_required() diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py index f87b33f9d..2af83427b 100644 --- a/electrum/lnrouter.py +++ b/electrum/lnrouter.py @@ -38,7 +38,7 @@ from .storage import JsonDB from .lnchanannverifier import LNChanAnnVerifier, verify_sig_for_channel_update from .crypto import Hash from . import ecc -from .lnutil import LN_GLOBAL_FEATURE_BITS +from .lnutil import LN_GLOBAL_FEATURE_BITS, LNPeerAddr class UnknownEvenFeatureBits(Exception): pass @@ -256,16 +256,19 @@ class NodeInfo(PrintError): class ChannelDB(JsonDB): + NUM_MAX_RECENT_PEERS = 20 + def __init__(self, network): self.network = network path = os.path.join(get_headers_dir(network.config), 'channel_db') JsonDB.__init__(self, path) - self.lock = threading.Lock() + self.lock = threading.RLock() self._id_to_channel_info = {} self._channels_for_node = defaultdict(set) # node -> set(short_channel_id) self.nodes = {} # node_id -> NodeInfo + self._recent_peers = [] self.ca_verifier = LNChanAnnVerifier(network, self) self.network.add_jobs([self.ca_verifier]) @@ -289,6 +292,11 @@ class ChannelDB(JsonDB): node_info = NodeInfo.from_json(node_info_d) node_id = bfh(node_id) self.nodes[node_id] = node_info + # recent peers + recent_peers = self.get('recent_peers', {}) + for host, port, pubkey in recent_peers: + peer = LNPeerAddr(str(host), int(port), bfh(pubkey)) + self._recent_peers.append(peer) def save_data(self): with self.lock: @@ -302,6 +310,12 @@ class ChannelDB(JsonDB): for node_id, node_info in self.nodes.items(): node_infos[bh2u(node_id)] = node_info self.put('node_infos', node_infos) + # recent peers + recent_peers = [] + for peer in self._recent_peers: + recent_peers.append( + [str(peer.host), int(peer.port), bh2u(peer.pubkey)]) + self.put('recent_peers', recent_peers) self.write() def __len__(self): @@ -320,12 +334,26 @@ class ChannelDB(JsonDB): self._id_to_channel_info[short_channel_id] = channel_info self._channels_for_node[channel_info.node_id_1].add(short_channel_id) self._channels_for_node[channel_info.node_id_2].add(short_channel_id) + self.network.trigger_callback('ln_status') + + def get_recent_peers(self): + with self.lock: + return list(self._recent_peers) + + def add_recent_peer(self, peer: LNPeerAddr): + with self.lock: + # list is ordered + if peer in self._recent_peers: + self._recent_peers.remove(peer) + self._recent_peers.insert(0, peer) + self._recent_peers = self._recent_peers[:self.NUM_MAX_RECENT_PEERS] def on_channel_announcement(self, msg_payload, trusted=False): short_channel_id = msg_payload['short_channel_id'] if short_channel_id in self._id_to_channel_info: return if constants.net.rev_genesis_bytes() != msg_payload['chain_hash']: + #self.print_error("ChanAnn has unexpected chain_hash {}".format(bh2u(msg_payload['chain_hash']))) return try: channel_info = ChannelInfo(msg_payload) @@ -365,6 +393,10 @@ class ChannelDB(JsonDB): new_node_info = NodeInfo(msg_payload) except UnknownEvenFeatureBits: return + # TODO if this message is for a new node, and if we have no associated + # channels for this node, we should ignore the message and return here, + # to mitigate DOS. but race condition: the channels we have for this + # node, might be under verification in self.ca_verifier, what then? if old_node_info and old_node_info.timestamp >= new_node_info.timestamp: return # ignore self.nodes[pubkey] = new_node_info diff --git a/electrum/lnutil.py b/electrum/lnutil.py index e55bd27b1..d90f0ae6a 100644 --- a/electrum/lnutil.py +++ b/electrum/lnutil.py @@ -7,6 +7,7 @@ from .ecc import CURVE_ORDER, sig_string_from_der_sig, ECPubkey, string_to_numbe from . import ecc, bitcoin, crypto, transaction from .transaction import opcodes from .bitcoin import push_script +from . import segwit_addr HTLC_TIMEOUT_WEIGHT = 663 HTLC_SUCCESS_WEIGHT = 703 @@ -396,3 +397,20 @@ LN_LOCAL_FEATURE_BITS_INV = inv_dict(LN_LOCAL_FEATURE_BITS) LN_GLOBAL_FEATURE_BITS = {} LN_GLOBAL_FEATURE_BITS_INV = inv_dict(LN_GLOBAL_FEATURE_BITS) + +class LNPeerAddr(namedtuple('LNPeerAddr', ['host', 'port', 'pubkey'])): + __slots__ = () + + def __str__(self): + return '{}@{}:{}'.format(bh2u(self.pubkey), self.host, self.port) + + +def get_compressed_pubkey_from_bech32(bech32_pubkey: str) -> bytes: + hrp, data_5bits = segwit_addr.bech32_decode(bech32_pubkey) + if hrp != 'ln': + raise Exception('unexpected hrp: {}'.format(hrp)) + data_8bits = segwit_addr.convertbits(data_5bits, 5, 8, False) + # pad with zeroes + COMPRESSED_PUBKEY_LENGTH = 33 + data_8bits = data_8bits + ((COMPRESSED_PUBKEY_LENGTH - len(data_8bits)) * [0]) + return bytes(data_8bits) diff --git a/electrum/lnworker.py b/electrum/lnworker.py index aa668a263..7268441fd 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -1,35 +1,39 @@ -import json -import binascii import asyncio import os from decimal import Decimal -import threading -from collections import defaultdict import random +import time +from typing import Optional, Sequence + +import dns.resolver +import dns.exception from . import constants from .bitcoin import sha256, COIN -from .util import bh2u, bfh, PrintError, InvoiceError -from .constants import set_testnet, set_simnet +from .util import bh2u, bfh, PrintError, InvoiceError, resolve_dns_srv from .lnbase import Peer, privkey_to_pubkey, aiosafe from .lnaddr import lnencode, LnAddr, lndecode from .ecc import der_sig_from_sig_string -from .transaction import Transaction from .lnhtlc import HTLCStateMachine -from .lnutil import Outpoint, calc_short_channel_id +from .lnutil import Outpoint, calc_short_channel_id, LNPeerAddr, get_compressed_pubkey_from_bech32 from .lnwatcher import LNChanCloseHandler from .i18n import _ -# hardcoded nodes -node_list = [ - ('ecdsa.net', '9735', '038370f0e7a03eded3e1d41dc081084a87f0afa1c5b22090b4f3abb391eb15d8ff'), -] + +NUM_PEERS_TARGET = 4 +PEER_RETRY_INTERVAL = 600 # seconds + +FALLBACK_NODE_LIST = ( + LNPeerAddr('ecdsa.net', 9735, bfh('038370f0e7a03eded3e1d41dc081084a87f0afa1c5b22090b4f3abb391eb15d8ff')), +) + class LNWorker(PrintError): def __init__(self, wallet, network): self.wallet = wallet self.network = network + self.channel_db = self.network.channel_db pk = wallet.storage.get('lightning_privkey') if pk is None: pk = bh2u(os.urandom(32)) @@ -43,17 +47,21 @@ class LNWorker(PrintError): self.invoices = wallet.storage.get('lightning_invoices', {}) for chan_id, chan in self.channels.items(): self.network.lnwatcher.watch_channel(chan, self.on_channel_utxos) + self._last_tried_peer = {} # LNPeerAddr -> unix timestamp # TODO peers that we have channels with should also be added now # but we don't store their IP/port yet.. also what if it changes? # need to listen for node_announcements and save the new IP/port - peer_list = self.config.get('lightning_peers', node_list) - for host, port, pubkey in peer_list: - self.add_peer(host, int(port), bfh(pubkey)) + self._add_peers_from_config() # wait until we see confirmations self.network.register_callback(self.on_network_update, ['updated', 'verified', 'fee_histogram']) # thread safe self.on_network_update('updated') # shortcut (don't block) if funding tx locked and verified self.network.futures.append(asyncio.run_coroutine_threadsafe(self.main_loop(), asyncio.get_event_loop())) + def _add_peers_from_config(self): + peer_list = self.config.get('lightning_peers', []) + for host, port, pubkey in peer_list: + self.add_peer(host, int(port), bfh(pubkey)) + def suggest_peer(self): for node_id, peer in self.peers.items(): if len(peer.channels) > 0: @@ -67,7 +75,8 @@ class LNWorker(PrintError): return {x: y for (x, y) in self.channels.items() if y.node_id == node_id} def add_peer(self, host, port, node_id): - peer = Peer(self, host, int(port), node_id, request_initial_sync=self.config.get("request_initial_sync", True)) + port = int(port) + peer = Peer(self, host, port, node_id, request_initial_sync=self.config.get("request_initial_sync", True)) self.network.futures.append(asyncio.run_coroutine_threadsafe(peer.main_loop(), asyncio.get_event_loop())) self.peers[node_id] = peer self.network.trigger_callback('ln_status') @@ -218,6 +227,80 @@ class LNWorker(PrintError): assert tx.is_complete() return self.network.broadcast_transaction(tx) + def _get_next_peers_to_try(self) -> Sequence[LNPeerAddr]: + now = time.time() + recent_peers = self.channel_db.get_recent_peers() + # maintenance for last tried times + for peer in list(self._last_tried_peer): + if now >= self._last_tried_peer[peer] + PEER_RETRY_INTERVAL: + del self._last_tried_peer[peer] + # first try from recent peers + for peer in recent_peers: + if peer in self.peers: + continue + if peer in self._last_tried_peer: + # due to maintenance above, this means we tried recently + continue + return [peer] + # try random peer from graph + all_nodes = self.channel_db.nodes + if all_nodes: + self.print_error('trying to get ln peers from channel db') + node_ids = list(all_nodes) + max_tries = min(200, len(all_nodes)) + for i in range(max_tries): + node_id = random.choice(node_ids) + node = all_nodes.get(node_id) + if node is None: continue + addresses = node.addresses + if not addresses: continue + host, port = addresses[0] + peer = LNPeerAddr(host, port, node_id) + if peer in self._last_tried_peer: + continue + self.print_error('taking random ln peer from our channel db') + return [peer] + + # TODO remove this. For some reason the dns seeds seem to ignore the realm byte + # and only return mainnet nodes. so for the time being dns seeding is disabled: + if constants.net in (constants.BitcoinTestnet, ): + return [random.choice(FALLBACK_NODE_LIST)] + else: + return [] + + # try peers from dns seed. + # return several peers to reduce the number of dns queries. + if not constants.net.LN_DNS_SEEDS: + return [] + dns_seed = random.choice(constants.net.LN_DNS_SEEDS) + self.print_error('asking dns seed "{}" for ln peers'.format(dns_seed)) + try: + # note: this might block for several seconds + # this will include bech32-encoded-pubkeys and ports + srv_answers = resolve_dns_srv('r{}.{}'.format( + constants.net.LN_REALM_BYTE, dns_seed)) + except dns.exception.DNSException as e: + return [] + random.shuffle(srv_answers) + num_peers = 2 * NUM_PEERS_TARGET + srv_answers = srv_answers[:num_peers] + # we now have pubkeys and ports but host is still needed + peers = [] + for srv_ans in srv_answers: + try: + # note: this might block for several seconds + answers = dns.resolver.query(srv_ans['host']) + except dns.exception.DNSException: + continue + else: + ln_host = str(answers[0]) + port = int(srv_ans['port']) + bech32_pubkey = srv_ans['host'].split('.')[0] + pubkey = get_compressed_pubkey_from_bech32(bech32_pubkey) + peers.append(LNPeerAddr(ln_host, port, pubkey)) + self.print_error('got {} ln peers from dns seed'.format(len(peers))) + return peers + @aiosafe async def main_loop(self): while True: @@ -226,15 +309,10 @@ class LNWorker(PrintError): if peer.exception: self.print_error("removing peer", peer.host) self.peers.pop(k) - if len(self.peers) > 3: + if len(self.peers) >= NUM_PEERS_TARGET: continue - if not self.network.channel_db.nodes: - continue - all_nodes = self.network.channel_db.nodes - node_id = random.choice(list(all_nodes)) - node = all_nodes.get(node_id) - addresses = node.addresses - if addresses: - host, port = addresses[0] - self.print_error("trying node", bh2u(node_id)) - self.add_peer(host, port, node_id) + peers = self._get_next_peers_to_try() + for peer in peers: + self._last_tried_peer[peer] = time.time() + self.print_error("trying node", peer) + self.add_peer(peer.host, peer.port, peer.pubkey) diff --git a/electrum/tests/test_lnutil.py b/electrum/tests/test_lnutil.py index a56632ccb..d11dee0f0 100644 --- a/electrum/tests/test_lnutil.py +++ b/electrum/tests/test_lnutil.py @@ -2,9 +2,10 @@ import unittest import json from electrum import bitcoin from electrum.lnutil import (RevocationStore, get_per_commitment_secret_from_seed, make_offered_htlc, - make_received_htlc, make_commitment, make_htlc_tx_witness, make_htlc_tx_output, - make_htlc_tx_inputs, secret_to_pubkey, derive_blinded_pubkey, derive_privkey, - derive_pubkey, make_htlc_tx, extract_ctn_from_tx, UnableToDeriveSecret) + make_received_htlc, make_commitment, make_htlc_tx_witness, make_htlc_tx_output, + make_htlc_tx_inputs, secret_to_pubkey, derive_blinded_pubkey, derive_privkey, + derive_pubkey, make_htlc_tx, extract_ctn_from_tx, UnableToDeriveSecret, + get_compressed_pubkey_from_bech32) from electrum.util import bh2u, bfh from electrum.transaction import Transaction @@ -675,3 +676,7 @@ class TestLNUtil(unittest.TestCase): index_of_pubkey = pubkeys.index(bh2u(remote_pubkey)) tx._inputs[0]["signatures"][index_of_pubkey] = remote_signature + "01" tx.raw = None + + def test_get_compressed_pubkey_from_bech32(self): + self.assertEqual(b'\x03\x84\xef\x87\xd9d\xa2\xaaa7=\xff\xb8\xfe=t8[}>;\n\x13\xa8e\x8eo:\xf5Mi\xb5H', + get_compressed_pubkey_from_bech32('ln1qwzwlp7evj325cfh8hlm3l3awsu9klf78v9p82r93ehn4a2ddx65s66awg5')) diff --git a/electrum/util.py b/electrum/util.py index b9232164f..33851fa22 100644 --- a/electrum/util.py +++ b/electrum/util.py @@ -46,6 +46,7 @@ import aiohttp from aiohttp_socks import SocksConnector, SocksVer from aiorpcx import TaskGroup import certifi +import dns.resolver from .i18n import _ from .logging import get_logger, Logger @@ -1174,3 +1175,17 @@ def list_enabled_bits(x: int) -> Sequence[int]: binary = bin(x)[2:] rev_bin = reversed(binary) return tuple(i for i, b in enumerate(rev_bin) if b == '1') + + +def resolve_dns_srv(host: str): + srv_records = dns.resolver.query(host, 'SRV') + # priority: prefer lower + # weight: tie breaker; prefer higher + srv_records = sorted(srv_records, key=lambda x: (x.priority, -x.weight)) + + def dict_from_srv_record(srv): + return { + 'host': str(srv.target), + 'port': srv.port, + } + return [dict_from_srv_record(srv) for srv in srv_records]