From f801307a085b59c0d6126f5de7117c2d8df31b37 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Mon, 2 Mar 2020 15:41:50 +0100 Subject: [PATCH] move htlc_switch task to lnpeer --- electrum/lnpeer.py | 51 +++++++++++++++++++++++++++++++++ electrum/lnworker.py | 53 ----------------------------------- electrum/tests/test_lnpeer.py | 12 ++++---- 3 files changed, 57 insertions(+), 59 deletions(-) diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index b38192972..12af8fe66 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -249,6 +249,7 @@ class Peer(Logger): async def main_loop(self): async with self.taskgroup as group: await group.spawn(self._message_loop()) + await group.spawn(self.htlc_switch()) await group.spawn(self.query_gossip()) await group.spawn(self.process_gossip()) @@ -1425,3 +1426,53 @@ class Peer(Logger): # broadcast await self.network.try_broadcasting(closing_tx, 'closing') return closing_tx.txid() + + @log_exceptions + async def htlc_switch(self): + while True: + await asyncio.sleep(0.1) + for chan_id, chan in self.channels.items(): + if not chan.can_send_ctx_updates(): + continue + self.maybe_send_commitment(chan) + done = set() + unfulfilled = chan.hm.log.get('unfulfilled_htlcs', {}) + for htlc_id, (local_ctn, remote_ctn, onion_packet_hex, forwarded) in unfulfilled.items(): + if chan.get_oldest_unrevoked_ctn(LOCAL) <= local_ctn: + continue + if chan.get_oldest_unrevoked_ctn(REMOTE) <= remote_ctn: + continue + chan.logger.info(f'found unfulfilled htlc: {htlc_id}') + onion_packet = OnionPacket.from_bytes(bytes.fromhex(onion_packet_hex)) + htlc = chan.hm.log[REMOTE]['adds'][htlc_id] + payment_hash = htlc.payment_hash + processed_onion = process_onion_packet(onion_packet, associated_data=payment_hash, our_onion_private_key=self.privkey) + preimage, error = None, None + if processed_onion.are_we_final: + preimage, error = self.maybe_fulfill_htlc( + chan=chan, + htlc=htlc, + onion_packet=onion_packet, + processed_onion=processed_onion) + elif not forwarded: + next_chan, next_peer, error = self.maybe_forward_htlc( + chan=chan, + htlc=htlc, + onion_packet=onion_packet, + processed_onion=processed_onion) + if not error: + unfulfilled[htlc_id] = local_ctn, remote_ctn, onion_packet_hex, True + else: + f = self.lnworker.pending_payments[payment_hash] + if f.done(): + success, preimage, error = f.result() + if preimage: + await self.lnworker.enable_htlc_settle.wait() + self.fulfill_htlc(chan, htlc.htlc_id, preimage) + done.add(htlc_id) + if error: + self.fail_htlc(chan, htlc.htlc_id, onion_packet, error) + done.add(htlc_id) + # cleanup + for htlc_id in done: + unfulfilled.pop(htlc_id) diff --git a/electrum/lnworker.py b/electrum/lnworker.py index ea9230c41..baa34cc08 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -470,7 +470,6 @@ class LNWallet(LNWorker): self.reestablish_peers_and_channels(), self.sync_with_local_watchtower(), self.sync_with_remote_watchtower(), - self.htlc_switch(), ]: tg_coro = self.taskgroup.spawn(coro) asyncio.run_coroutine_threadsafe(tg_coro, self.network.asyncio_loop) @@ -1326,55 +1325,3 @@ class LNWallet(LNWorker): if feerate_per_kvbyte is None: feerate_per_kvbyte = FEERATE_FALLBACK_STATIC_FEE return max(253, feerate_per_kvbyte // 4) - - @log_exceptions - async def htlc_switch(self): - while True: - await asyncio.sleep(0.1) - for chan_id, chan in self.channels.items(): - if not chan.can_send_ctx_updates(): - continue - peer = self.peers[chan.node_id] - peer.maybe_send_commitment(chan) - done = set() - unfulfilled = chan.hm.log.get('unfulfilled_htlcs', {}) - for htlc_id, (local_ctn, remote_ctn, onion_packet_hex, forwarded) in unfulfilled.items(): - if chan.get_oldest_unrevoked_ctn(LOCAL) <= local_ctn: - continue - if chan.get_oldest_unrevoked_ctn(REMOTE) <= remote_ctn: - continue - chan.logger.info(f'found unfulfilled htlc: {htlc_id}') - onion_packet = OnionPacket.from_bytes(bytes.fromhex(onion_packet_hex)) - htlc = chan.hm.log[REMOTE]['adds'][htlc_id] - payment_hash = htlc.payment_hash - processed_onion = process_onion_packet(onion_packet, associated_data=payment_hash, our_onion_private_key=peer.privkey) - if processed_onion.are_we_final: - preimage, error = peer.maybe_fulfill_htlc( - chan=chan, - htlc=htlc, - onion_packet=onion_packet, - processed_onion=processed_onion) - else: - preimage, error = None, None - if not forwarded: - next_chan, next_peer, error = peer.maybe_forward_htlc( - chan=chan, - htlc=htlc, - onion_packet=onion_packet, - processed_onion=processed_onion) - if not error: - unfulfilled[htlc_id] = local_ctn, remote_ctn, onion_packet_hex, True - else: - f = self.pending_payments[payment_hash] - if f.done(): - success, preimage, error = f.result() - if preimage: - await self.enable_htlc_settle.wait() - peer.fulfill_htlc(chan, htlc.htlc_id, preimage) - done.add(htlc_id) - if error: - peer.fail_htlc(chan, htlc.htlc_id, onion_packet, error) - done.add(htlc_id) - # cleanup - for htlc_id in done: - unfulfilled.pop(htlc_id) diff --git a/electrum/tests/test_lnpeer.py b/electrum/tests/test_lnpeer.py index 6acba2c25..2c31f23db 100644 --- a/electrum/tests/test_lnpeer.py +++ b/electrum/tests/test_lnpeer.py @@ -238,7 +238,7 @@ class TestPeer(ElectrumTestCase): self.assertEqual(alice_channel.peer_state, peer_states.GOOD) self.assertEqual(bob_channel.peer_state, peer_states.GOOD) gath.cancel() - gath = asyncio.gather(reestablish(), p1._message_loop(), p2._message_loop(), LNWallet.htlc_switch(w1), LNWallet.htlc_switch(w2)) + gath = asyncio.gather(reestablish(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p1.htlc_switch()) async def f(): await gath with self.assertRaises(concurrent.futures.CancelledError): @@ -253,7 +253,7 @@ class TestPeer(ElectrumTestCase): result = await LNWallet._pay(w1, pay_req) self.assertEqual(result, True) gath.cancel() - gath = asyncio.gather(pay(), p1._message_loop(), p2._message_loop(), LNWallet.htlc_switch(w1), LNWallet.htlc_switch(w2)) + gath = asyncio.gather(pay(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch()) async def f(): await gath with self.assertRaises(concurrent.futures.CancelledError): @@ -271,7 +271,7 @@ class TestPeer(ElectrumTestCase): # wait so that pending messages are processed #await asyncio.sleep(1) gath.cancel() - gath = asyncio.gather(reestablish(), p1._message_loop(), p2._message_loop(), LNWallet.htlc_switch(w1), LNWallet.htlc_switch(w2)) + gath = asyncio.gather(reestablish(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch()) async def f(): await gath with self.assertRaises(concurrent.futures.CancelledError): @@ -285,7 +285,7 @@ class TestPeer(ElectrumTestCase): result = await LNWallet._pay(w1, pay_req) self.assertTrue(result) gath.cancel() - gath = asyncio.gather(pay(), p1._message_loop(), p2._message_loop(), LNWallet.htlc_switch(w1), LNWallet.htlc_switch(w2)) + gath = asyncio.gather(pay(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch()) async def f(): await gath with self.assertRaises(concurrent.futures.CancelledError): @@ -313,7 +313,7 @@ class TestPeer(ElectrumTestCase): async def set_settle(): await asyncio.sleep(0.1) w2.enable_htlc_settle.set() - gath = asyncio.gather(pay(), set_settle(), p1._message_loop(), p2._message_loop(), LNWallet.htlc_switch(w1), LNWallet.htlc_switch(w2)) + gath = asyncio.gather(pay(), set_settle(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch()) async def f(): await gath with self.assertRaises(concurrent.futures.CancelledError): @@ -338,7 +338,7 @@ class TestPeer(ElectrumTestCase): # AssertionError is ok since we shouldn't use old routes, and the # route finding should fail when channel is closed async def f(): - await asyncio.gather(w1._pay_to_route(route, addr), p1._message_loop(), p2._message_loop(), LNWallet.htlc_switch(w1), LNWallet.htlc_switch(w2)) + await asyncio.gather(w1._pay_to_route(route, addr), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch()) with self.assertRaises(PaymentFailure): run(f())