From d229bb4e4d582515a802f110e8818a23da8cbb99 Mon Sep 17 00:00:00 2001 From: SomberNight Date: Fri, 16 Aug 2019 22:24:26 +0200 Subject: [PATCH] lnpeer: restore "temp save orphan channel updates" functionality needed to handle race where remote might send chan_upd too soon (before we save the short channel id for the channel after it reaches funding locked) --- electrum/channel_db.py | 41 ++++++++++++++++++++++++++++------------- electrum/lnpeer.py | 25 +++++++++++++++++-------- 2 files changed, 45 insertions(+), 21 deletions(-) diff --git a/electrum/channel_db.py b/electrum/channel_db.py index f6daf0eaf..a046db008 100644 --- a/electrum/channel_db.py +++ b/electrum/channel_db.py @@ -188,6 +188,15 @@ class Address(NamedTuple): port: int last_connected_date: int + +class CategorizedChannelUpdates(NamedTuple): + orphaned: List # no channel announcement for channel update + expired: List # update older than two weeks + deprecated: List # update older than database entry + good: List # good updates + to_delete: List # database entries to delete + + create_channel_info = """ CREATE TABLE IF NOT EXISTS channel_info ( short_channel_id VARCHAR(64), @@ -241,7 +250,7 @@ class ChannelDB(SqlDB): self._channel_updates_for_private_channels = {} # type: Dict[Tuple[bytes, bytes], dict] self.ca_verifier = LNChannelVerifier(network, self) # initialized in load_data - self._channels = {} + self._channels = {} # type: Dict[bytes, ChannelInfo] self._policies = {} self._nodes = {} self._addresses = defaultdict(set) @@ -320,12 +329,12 @@ class ChannelDB(SqlDB): if old_policy.channel_flags != new_policy.channel_flags: self.logger.info(f'channel_flags: {old_policy.channel_flags} -> {new_policy.channel_flags}') - def add_channel_updates(self, payloads, max_age=None, verify=True): - orphaned = [] # no channel announcement for channel update - expired = [] # update older than two weeks - deprecated = [] # update older than database entry - good = [] # good updates - to_delete = [] # database entries to delete + def add_channel_updates(self, payloads, max_age=None, verify=True) -> CategorizedChannelUpdates: + orphaned = [] + expired = [] + deprecated = [] + good = [] + to_delete = [] # filter orphaned and expired first known = [] now = int(time.time()) @@ -333,11 +342,11 @@ class ChannelDB(SqlDB): short_channel_id = payload['short_channel_id'] timestamp = int.from_bytes(payload['timestamp'], "big") if max_age and now - timestamp > max_age: - expired.append(short_channel_id) + expired.append(payload) continue channel_info = self._channels.get(short_channel_id) if not channel_info: - orphaned.append(short_channel_id) + orphaned.append(payload) continue flags = int.from_bytes(payload['channel_flags'], 'big') direction = flags & FLAG_DIRECTION @@ -352,7 +361,7 @@ class ChannelDB(SqlDB): key = (start_node, short_channel_id) old_policy = self._policies.get(key) if old_policy and timestamp <= old_policy.timestamp: - deprecated.append(short_channel_id) + deprecated.append(payload) continue good.append(payload) if verify: @@ -362,11 +371,17 @@ class ChannelDB(SqlDB): self.save_policy(policy) # self.update_counts() - return orphaned, expired, deprecated, good, to_delete + return CategorizedChannelUpdates( + orphaned=orphaned, + expired=expired, + deprecated=deprecated, + good=good, + to_delete=to_delete, + ) def add_channel_update(self, payload): - orphaned, expired, deprecated, good, to_delete = self.add_channel_updates([payload], verify=False) - assert len(good) == 1 + categorized_chan_upds = self.add_channel_updates([payload], verify=False) + assert len(categorized_chan_upds.good) == 1 def create_database(self): c = self.conn.cursor() diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index a2873b99d..aa8f5e07b 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -266,13 +266,22 @@ class Peer(Logger): self.channel_db.add_node_announcement(node_anns_chunk) # channel updates for chan_upds_chunk in chunks(chan_upds, 1000): - orphaned, expired, deprecated, good, to_delete = self.channel_db.add_channel_updates( + categorized_chan_upds = self.channel_db.add_channel_updates( chan_upds_chunk, max_age=self.network.lngossip.max_age) + orphaned = categorized_chan_upds.orphaned if orphaned: self.logger.info(f'adding {len(orphaned)} unknown channel ids') await self.network.lngossip.add_new_ids(orphaned) - if good: - self.logger.debug(f'on_channel_update: {len(good)}/{len(chan_upds_chunk)}') + # Save (some bounded number of) orphan channel updates for later + # as it might be for our own direct channel with this peer + # (and we might not yet know the short channel id for that) + for chan_upd_payload in orphaned: + short_channel_id = chan_upd_payload['short_channel_id'] + self.orphan_channel_updates[short_channel_id] = chan_upd_payload + while len(self.orphan_channel_updates) > 25: + self.orphan_channel_updates.popitem(last=False) + if categorized_chan_upds.good: + self.logger.debug(f'on_channel_update: {len(categorized_chan_upds.good)}/{len(chan_upds_chunk)}') # refresh gui if chan_anns or node_anns or chan_upds: self.network.lngossip.refresh_gui() @@ -1074,18 +1083,18 @@ class Peer(Logger): channel_update = (258).to_bytes(length=2, byteorder="big") + data[offset+2: offset+2+channel_update_len] message_type, payload = decode_msg(channel_update) payload['raw'] = channel_update - orphaned, expired, deprecated, good, to_delete = self.channel_db.add_channel_updates([payload]) + categorized_chan_upds = self.channel_db.add_channel_updates([payload]) blacklist = False - if good: + if categorized_chan_upds.good: self.logger.info("applied channel update on our db") - elif orphaned: + elif categorized_chan_upds.orphaned: # maybe it is a private channel (and data in invoice was outdated) self.logger.info("maybe channel update is for private channel?") start_node_id = route[sender_idx].node_id self.channel_db.add_channel_update_for_private_channel(payload, start_node_id) - elif expired: + elif categorized_chan_upds.expired: blacklist = True - elif deprecated: + elif categorized_chan_upds.deprecated: self.logger.info(f'channel update is not more recent.') blacklist = True else: