diff --git a/electrum/lnworker.py b/electrum/lnworker.py index 888a3a361..51a04a4ca 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -556,28 +556,30 @@ class LNGossip(LNWorker): # and disconnect only from that peer await self.channel_db.data_loaded.wait() self.logger.debug(f'process_gossip {len(chan_anns)} {len(node_anns)} {len(chan_upds)}') - # note: data processed in chunks to avoid taking sql lock for too long # channel announcements - for payload in chan_anns: - self.channel_db.verify_channel_announcement(payload) - for chan_anns_chunk in chunks(chan_anns, 300): - self.channel_db.add_channel_announcements(chan_anns_chunk) + def process_chan_anns(): + for payload in chan_anns: + self.channel_db.verify_channel_announcement(payload) + self.channel_db.add_channel_announcements(chan_anns) + await run_in_thread(process_chan_anns) # node announcements - for payload in node_anns: - self.channel_db.verify_node_announcement(payload) - for node_anns_chunk in chunks(node_anns, 100): - self.channel_db.add_node_announcements(node_anns_chunk) + def process_node_anns(): + for payload in node_anns: + self.channel_db.verify_node_announcement(payload) + self.channel_db.add_node_announcements(node_anns) + await run_in_thread(process_node_anns) # channel updates - for chan_upds_chunk in chunks(chan_upds, 1000): - categorized_chan_upds = self.channel_db.add_channel_updates( - chan_upds_chunk, max_age=self.max_age) - orphaned = categorized_chan_upds.orphaned - if orphaned: - self.logger.info(f'adding {len(orphaned)} unknown channel ids') - orphaned_ids = [c['short_channel_id'] for c in orphaned] - await self.add_new_ids(orphaned_ids) - if categorized_chan_upds.good: - self.logger.debug(f'on_channel_update: {len(categorized_chan_upds.good)}/{len(chan_upds_chunk)}') + categorized_chan_upds = await run_in_thread(partial( + self.channel_db.add_channel_updates, + chan_upds, + max_age=self.max_age)) + orphaned = categorized_chan_upds.orphaned + if orphaned: + self.logger.info(f'adding {len(orphaned)} unknown channel ids') + orphaned_ids = [c['short_channel_id'] for c in orphaned] + await self.add_new_ids(orphaned_ids) + if categorized_chan_upds.good: + self.logger.debug(f'on_channel_update: {len(categorized_chan_upds.good)}/{len(chan_upds)}') class LNWallet(LNWorker): diff --git a/electrum/sql_db.py b/electrum/sql_db.py index c6928a87f..dbfc4cb3c 100644 --- a/electrum/sql_db.py +++ b/electrum/sql_db.py @@ -13,7 +13,7 @@ def sql(func): """wrapper for sql methods""" def wrapper(self: 'SqlDB', *args, **kwargs): assert threading.currentThread() != self.sql_thread - f = asyncio.Future() + f = self.asyncio_loop.create_future() self.db_requests.put((f, func, args, kwargs)) return f return wrapper