diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index 695c50496..37c283f4c 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -42,6 +42,7 @@ from .lnutil import (Outpoint, LocalConfig, RECEIVED, UpdateAddHtlc, MAXIMUM_REMOTE_TO_SELF_DELAY_ACCEPTED, RemoteMisbehaving) from .lntransport import LNTransport, LNTransportBase from .lnmsg import encode_msg, decode_msg +from .lnverifier import verify_sig_for_channel_update if TYPE_CHECKING: from .lnworker import LNWorker @@ -216,34 +217,67 @@ class Peer(Logger): @log_exceptions async def process_gossip(self): # verify in peer's TaskGroup so that we fail the connection - # forward to channel_db.gossip_queue while True: - name, payload = await self.gossip_queue.get() - if name == 'node_announcement': - self.verify_node_announcement(payload) - elif name == 'channel_announcement': - self.verify_channel_announcement(payload) - elif name == 'channel_update': - pass - else: - raise Exception('unknown message') - self.channel_db.gossip_queue.put_nowait((name, payload)) - - def verify_node_announcement(self, payload): - pubkey = payload['node_id'] - signature = payload['signature'] - h = sha256d(payload['raw'][66:]) - if not ecc.verify_signature(pubkey, signature, h): - raise Exception('signature failed') - - def verify_channel_announcement(self, payload): - h = sha256d(payload['raw'][2+256:]) - pubkeys = [payload['node_id_1'], payload['node_id_2'], payload['bitcoin_key_1'], payload['bitcoin_key_2']] - sigs = [payload['node_signature_1'], payload['node_signature_2'], payload['bitcoin_signature_1'], payload['bitcoin_signature_2']] - for pubkey, sig in zip(pubkeys, sigs): - if not ecc.verify_signature(pubkey, sig, h): + await asyncio.sleep(5) + chan_anns = [] + chan_upds = [] + node_anns = [] + while True: + name, payload = await self.gossip_queue.get() + if name == 'channel_announcement': + chan_anns.append(payload) + elif name == 'channel_update': + chan_upds.append(payload) + elif name == 'node_announcement': + node_anns.append(payload) + else: + raise Exception('unknown message') + if self.gossip_queue.empty(): + break + # channel announcements + self.verify_channel_announcements(chan_anns) + self.channel_db.on_channel_announcement(chan_anns) + # node announcements + self.verify_node_announcements(node_anns) + self.channel_db.on_node_announcement(node_anns) + # channel updates + good, bad = self.channel_db.filter_channel_updates(chan_upds) + if bad: + self.logger.info(f'adding {len(bad)} unknown channel ids') + self.network.lngossip.add_new_ids(bad) + self.verify_channel_updates(good) + self.channel_db.on_channel_update(good) + # refresh gui + known = self.channel_db.num_channels + unknown = len(self.network.lngossip.unknown_ids) + self.logger.info(f'Channels: {known} of {known+unknown}') + self.network.trigger_callback('ln_status') + + def verify_channel_announcements(self, chan_anns): + for payload in chan_anns: + h = sha256d(payload['raw'][2+256:]) + pubkeys = [payload['node_id_1'], payload['node_id_2'], payload['bitcoin_key_1'], payload['bitcoin_key_2']] + sigs = [payload['node_signature_1'], payload['node_signature_2'], payload['bitcoin_signature_1'], payload['bitcoin_signature_2']] + for pubkey, sig in zip(pubkeys, sigs): + if not ecc.verify_signature(pubkey, sig, h): + raise Exception('signature failed') + + def verify_node_announcements(self, node_anns): + for payload in node_anns: + pubkey = payload['node_id'] + signature = payload['signature'] + h = sha256d(payload['raw'][66:]) + if not ecc.verify_signature(pubkey, signature, h): raise Exception('signature failed') + def verify_channel_updates(self, chan_upds): + for payload in chan_upds: + short_channel_id = payload['short_channel_id'] + if constants.net.rev_genesis_bytes() != payload['chain_hash']: + raise Exception('wrong chain hash') + if not verify_sig_for_channel_update(payload, payload['node_id']): + raise BaseException('verify error') + @log_exceptions async def query_gossip(self): await asyncio.wait_for(self.initialized.wait(), 10) @@ -851,7 +885,7 @@ class Peer(Logger): # only inject outgoing direction: channel_flags = b'\x00' if node_ids[0] == privkey_to_pubkey(self.privkey) else b'\x01' now = int(time.time()) - self.channel_db.on_channel_update( + self.channel_db.add_channel_update( { "short_channel_id": chan.short_channel_id, 'channel_flags': channel_flags, @@ -861,8 +895,7 @@ class Peer(Logger): 'fee_proportional_millionths': b'\x01', 'chain_hash': constants.net.rev_genesis_bytes(), 'timestamp': now.to_bytes(4, byteorder="big") - }, - trusted=True) + }) # peer may have sent us a channel update for the incoming direction previously # note: if we were offline when the 3rd conf happened, lnd will never send us this channel_update # see https://github.com/lightningnetwork/lnd/issues/1347 diff --git a/electrum/lnrouter.py b/electrum/lnrouter.py index 41d3885a4..080d7a370 100644 --- a/electrum/lnrouter.py +++ b/electrum/lnrouter.py @@ -35,7 +35,6 @@ from collections import defaultdict from typing import Sequence, List, Tuple, Optional, Dict, NamedTuple, TYPE_CHECKING, Set import binascii import base64 -import asyncio from sqlalchemy import Column, ForeignKey, Integer, String, Boolean from sqlalchemy.orm.query import Query @@ -224,7 +223,6 @@ class ChannelDB(SqlDB): self._channel_updates_for_private_channels = {} # type: Dict[Tuple[bytes, bytes], dict] self.ca_verifier = LNChannelVerifier(network, self) self.update_counts() - self.gossip_queue = asyncio.Queue() @sql def update_counts(self): @@ -358,27 +356,46 @@ class ChannelDB(SqlDB): return r.max_timestamp or 0 @sql - @profiler - def on_channel_update(self, msg_payloads, trusted=False): - if type(msg_payloads) is dict: - msg_payloads = [msg_payloads] + def get_info_for_updates(self, msg_payloads): short_channel_ids = [msg_payload['short_channel_id'].hex() for msg_payload in msg_payloads] channel_infos_list = self.DBSession.query(ChannelInfo).filter(ChannelInfo.short_channel_id.in_(short_channel_ids)).all() channel_infos = {bfh(x.short_channel_id): x for x in channel_infos_list} - new_policies = {} - for msg_payload in msg_payloads: - short_channel_id = msg_payload['short_channel_id'] - if constants.net.rev_genesis_bytes() != msg_payload['chain_hash']: - continue + return channel_infos + + @profiler + def filter_channel_updates(self, payloads): + # add 'node_id' to payload + channel_infos = self.get_info_for_updates(payloads) + known = [] + unknown = [] + for payload in payloads: + short_channel_id = payload['short_channel_id'] channel_info = channel_infos.get(short_channel_id) if not channel_info: + unknown.append(short_channel_id) continue - flags = int.from_bytes(msg_payload['channel_flags'], 'big') + flags = int.from_bytes(payload['channel_flags'], 'big') direction = flags & FLAG_DIRECTION - node_id = channel_info.node1_id if direction == 0 else channel_info.node2_id - if not trusted and not verify_sig_for_channel_update(msg_payload, bytes.fromhex(node_id)): - continue - short_channel_id = channel_info.short_channel_id + node_id = bfh(channel_info.node1_id if direction == 0 else channel_info.node2_id) + payload['node_id'] = node_id + known.append(payload) + return known, unknown + + def add_channel_update(self, payload): + # called in tests/test_lnrouter + good, bad = self.filter_channel_updates([payload]) + assert len(bad) == 0 + self.on_channel_update(good) + + @sql + @profiler + def on_channel_update(self, msg_payloads): + if type(msg_payloads) is dict: + msg_payloads = [msg_payloads] + new_policies = {} + for msg_payload in msg_payloads: + short_channel_id = msg_payload['short_channel_id'].hex() + node_id = msg_payload['node_id'].hex() new_policy = Policy.from_msg(msg_payload, node_id, short_channel_id) #self.logger.info(f'on_channel_update {datetime.fromtimestamp(new_policy.timestamp).ctime()}') old_policy = self.DBSession.query(Policy).filter_by(short_channel_id=short_channel_id, start_node=node_id).one_or_none() diff --git a/electrum/lnworker.py b/electrum/lnworker.py index 6bb0ea4a9..5aa469d12 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -244,13 +244,12 @@ class LNGossip(LNWorker): def start_network(self, network: 'Network'): super().start_network(network) - asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(self.process_gossip()), self.network.asyncio_loop) def add_new_ids(self, ids): #if complete: # self.channel_db.purge_unknown_channels(ids) known = self.channel_db.compare_channels(ids) - new = ids - set(known) + new = set(ids) - set(known) self.unknown_ids.update(new) def get_ids_to_query(self): @@ -259,34 +258,6 @@ class LNGossip(LNWorker): self.unknown_ids = set(l[N:]) return l[0:N] - @log_exceptions - async def process_gossip(self): - while True: - await asyncio.sleep(5) - chan_anns = [] - chan_upds = [] - node_anns = [] - while True: - name, payload = await self.channel_db.gossip_queue.get() - if name == 'channel_announcement': - chan_anns.append(payload) - elif name == 'channel_update': - chan_upds.append(payload) - elif name == 'node_announcement': - node_anns.append(payload) - else: - raise Exception('unknown message') - if self.channel_db.gossip_queue.empty(): - break - self.channel_db.on_channel_announcement(chan_anns) - self.channel_db.on_channel_update(chan_upds) - self.channel_db.on_node_announcement(node_anns) - # refresh gui - known = self.channel_db.num_channels - unknown = len(self.unknown_ids) - self.logger.info(f'Channels: {known} of {known+unknown}') - self.network.trigger_callback('ln_status') - def peer_closed(self, peer): self.peers.pop(peer.pubkey) diff --git a/electrum/tests/test_lnrouter.py b/electrum/tests/test_lnrouter.py index 86d10a7ba..0e7387f95 100644 --- a/electrum/tests/test_lnrouter.py +++ b/electrum/tests/test_lnrouter.py @@ -91,18 +91,18 @@ class Test_LNRouter(TestCaseForTestnet): 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'len': b'\x00\x00', 'features': b''}, trusted=True) o = lambda i: i.to_bytes(8, "big") - cdb.on_channel_update({'short_channel_id': bfh('0000000000000001'), 'message_flags': b'\x00', 'channel_flags': b'\x00', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}, trusted=True) - cdb.on_channel_update({'short_channel_id': bfh('0000000000000001'), 'message_flags': b'\x00', 'channel_flags': b'\x01', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}, trusted=True) - cdb.on_channel_update({'short_channel_id': bfh('0000000000000002'), 'message_flags': b'\x00', 'channel_flags': b'\x00', 'cltv_expiry_delta': o(99), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}, trusted=True) - cdb.on_channel_update({'short_channel_id': bfh('0000000000000002'), 'message_flags': b'\x00', 'channel_flags': b'\x01', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}, trusted=True) - cdb.on_channel_update({'short_channel_id': bfh('0000000000000003'), 'message_flags': b'\x00', 'channel_flags': b'\x01', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}, trusted=True) - cdb.on_channel_update({'short_channel_id': bfh('0000000000000003'), 'message_flags': b'\x00', 'channel_flags': b'\x00', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}, trusted=True) - cdb.on_channel_update({'short_channel_id': bfh('0000000000000004'), 'message_flags': b'\x00', 'channel_flags': b'\x01', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}, trusted=True) - cdb.on_channel_update({'short_channel_id': bfh('0000000000000004'), 'message_flags': b'\x00', 'channel_flags': b'\x00', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}, trusted=True) - cdb.on_channel_update({'short_channel_id': bfh('0000000000000005'), 'message_flags': b'\x00', 'channel_flags': b'\x01', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}, trusted=True) - cdb.on_channel_update({'short_channel_id': bfh('0000000000000005'), 'message_flags': b'\x00', 'channel_flags': b'\x00', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(999), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}, trusted=True) - cdb.on_channel_update({'short_channel_id': bfh('0000000000000006'), 'message_flags': b'\x00', 'channel_flags': b'\x00', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(99999999), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}, trusted=True) - cdb.on_channel_update({'short_channel_id': bfh('0000000000000006'), 'message_flags': b'\x00', 'channel_flags': b'\x01', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}, trusted=True) + cdb.add_channel_update({'short_channel_id': bfh('0000000000000001'), 'message_flags': b'\x00', 'channel_flags': b'\x00', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}) + cdb.add_channel_update({'short_channel_id': bfh('0000000000000001'), 'message_flags': b'\x00', 'channel_flags': b'\x01', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}) + cdb.add_channel_update({'short_channel_id': bfh('0000000000000002'), 'message_flags': b'\x00', 'channel_flags': b'\x00', 'cltv_expiry_delta': o(99), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}) + cdb.add_channel_update({'short_channel_id': bfh('0000000000000002'), 'message_flags': b'\x00', 'channel_flags': b'\x01', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}) + cdb.add_channel_update({'short_channel_id': bfh('0000000000000003'), 'message_flags': b'\x00', 'channel_flags': b'\x01', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}) + cdb.add_channel_update({'short_channel_id': bfh('0000000000000003'), 'message_flags': b'\x00', 'channel_flags': b'\x00', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}) + cdb.add_channel_update({'short_channel_id': bfh('0000000000000004'), 'message_flags': b'\x00', 'channel_flags': b'\x01', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}) + cdb.add_channel_update({'short_channel_id': bfh('0000000000000004'), 'message_flags': b'\x00', 'channel_flags': b'\x00', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}) + cdb.add_channel_update({'short_channel_id': bfh('0000000000000005'), 'message_flags': b'\x00', 'channel_flags': b'\x01', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}) + cdb.add_channel_update({'short_channel_id': bfh('0000000000000005'), 'message_flags': b'\x00', 'channel_flags': b'\x00', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(999), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}) + cdb.add_channel_update({'short_channel_id': bfh('0000000000000006'), 'message_flags': b'\x00', 'channel_flags': b'\x00', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(99999999), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}) + cdb.add_channel_update({'short_channel_id': bfh('0000000000000006'), 'message_flags': b'\x00', 'channel_flags': b'\x01', 'cltv_expiry_delta': o(10), 'htlc_minimum_msat': o(250), 'fee_base_msat': o(100), 'fee_proportional_millionths': o(150), 'chain_hash': bfh('43497fd7f826957108f4a30fd9cec3aeba79972084e90ead01ea330900000000'), 'timestamp': b'\x00\x00\x00\x00'}) path = path_finder.find_path_for_payment(b'\x02aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', b'\x02eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee', 100000) self.assertEqual([(b'\x02bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb', b'\x00\x00\x00\x00\x00\x00\x00\x03'), (b'\x02cccccccccccccccccccccccccccccccc', b'\x00\x00\x00\x00\x00\x00\x00\x01'),