Browse Source

move htlc_switch task to lnpeer

hard-fail-on-bad-server-string
ThomasV 5 years ago
parent
commit
f801307a08
  1. 51
      electrum/lnpeer.py
  2. 53
      electrum/lnworker.py
  3. 12
      electrum/tests/test_lnpeer.py

51
electrum/lnpeer.py

@ -249,6 +249,7 @@ class Peer(Logger):
async def main_loop(self):
async with self.taskgroup as group:
await group.spawn(self._message_loop())
await group.spawn(self.htlc_switch())
await group.spawn(self.query_gossip())
await group.spawn(self.process_gossip())
@ -1425,3 +1426,53 @@ class Peer(Logger):
# broadcast
await self.network.try_broadcasting(closing_tx, 'closing')
return closing_tx.txid()
@log_exceptions
async def htlc_switch(self):
while True:
await asyncio.sleep(0.1)
for chan_id, chan in self.channels.items():
if not chan.can_send_ctx_updates():
continue
self.maybe_send_commitment(chan)
done = set()
unfulfilled = chan.hm.log.get('unfulfilled_htlcs', {})
for htlc_id, (local_ctn, remote_ctn, onion_packet_hex, forwarded) in unfulfilled.items():
if chan.get_oldest_unrevoked_ctn(LOCAL) <= local_ctn:
continue
if chan.get_oldest_unrevoked_ctn(REMOTE) <= remote_ctn:
continue
chan.logger.info(f'found unfulfilled htlc: {htlc_id}')
onion_packet = OnionPacket.from_bytes(bytes.fromhex(onion_packet_hex))
htlc = chan.hm.log[REMOTE]['adds'][htlc_id]
payment_hash = htlc.payment_hash
processed_onion = process_onion_packet(onion_packet, associated_data=payment_hash, our_onion_private_key=self.privkey)
preimage, error = None, None
if processed_onion.are_we_final:
preimage, error = self.maybe_fulfill_htlc(
chan=chan,
htlc=htlc,
onion_packet=onion_packet,
processed_onion=processed_onion)
elif not forwarded:
next_chan, next_peer, error = self.maybe_forward_htlc(
chan=chan,
htlc=htlc,
onion_packet=onion_packet,
processed_onion=processed_onion)
if not error:
unfulfilled[htlc_id] = local_ctn, remote_ctn, onion_packet_hex, True
else:
f = self.lnworker.pending_payments[payment_hash]
if f.done():
success, preimage, error = f.result()
if preimage:
await self.lnworker.enable_htlc_settle.wait()
self.fulfill_htlc(chan, htlc.htlc_id, preimage)
done.add(htlc_id)
if error:
self.fail_htlc(chan, htlc.htlc_id, onion_packet, error)
done.add(htlc_id)
# cleanup
for htlc_id in done:
unfulfilled.pop(htlc_id)

53
electrum/lnworker.py

@ -470,7 +470,6 @@ class LNWallet(LNWorker):
self.reestablish_peers_and_channels(),
self.sync_with_local_watchtower(),
self.sync_with_remote_watchtower(),
self.htlc_switch(),
]:
tg_coro = self.taskgroup.spawn(coro)
asyncio.run_coroutine_threadsafe(tg_coro, self.network.asyncio_loop)
@ -1326,55 +1325,3 @@ class LNWallet(LNWorker):
if feerate_per_kvbyte is None:
feerate_per_kvbyte = FEERATE_FALLBACK_STATIC_FEE
return max(253, feerate_per_kvbyte // 4)
@log_exceptions
async def htlc_switch(self):
while True:
await asyncio.sleep(0.1)
for chan_id, chan in self.channels.items():
if not chan.can_send_ctx_updates():
continue
peer = self.peers[chan.node_id]
peer.maybe_send_commitment(chan)
done = set()
unfulfilled = chan.hm.log.get('unfulfilled_htlcs', {})
for htlc_id, (local_ctn, remote_ctn, onion_packet_hex, forwarded) in unfulfilled.items():
if chan.get_oldest_unrevoked_ctn(LOCAL) <= local_ctn:
continue
if chan.get_oldest_unrevoked_ctn(REMOTE) <= remote_ctn:
continue
chan.logger.info(f'found unfulfilled htlc: {htlc_id}')
onion_packet = OnionPacket.from_bytes(bytes.fromhex(onion_packet_hex))
htlc = chan.hm.log[REMOTE]['adds'][htlc_id]
payment_hash = htlc.payment_hash
processed_onion = process_onion_packet(onion_packet, associated_data=payment_hash, our_onion_private_key=peer.privkey)
if processed_onion.are_we_final:
preimage, error = peer.maybe_fulfill_htlc(
chan=chan,
htlc=htlc,
onion_packet=onion_packet,
processed_onion=processed_onion)
else:
preimage, error = None, None
if not forwarded:
next_chan, next_peer, error = peer.maybe_forward_htlc(
chan=chan,
htlc=htlc,
onion_packet=onion_packet,
processed_onion=processed_onion)
if not error:
unfulfilled[htlc_id] = local_ctn, remote_ctn, onion_packet_hex, True
else:
f = self.pending_payments[payment_hash]
if f.done():
success, preimage, error = f.result()
if preimage:
await self.enable_htlc_settle.wait()
peer.fulfill_htlc(chan, htlc.htlc_id, preimage)
done.add(htlc_id)
if error:
peer.fail_htlc(chan, htlc.htlc_id, onion_packet, error)
done.add(htlc_id)
# cleanup
for htlc_id in done:
unfulfilled.pop(htlc_id)

12
electrum/tests/test_lnpeer.py

@ -238,7 +238,7 @@ class TestPeer(ElectrumTestCase):
self.assertEqual(alice_channel.peer_state, peer_states.GOOD)
self.assertEqual(bob_channel.peer_state, peer_states.GOOD)
gath.cancel()
gath = asyncio.gather(reestablish(), p1._message_loop(), p2._message_loop(), LNWallet.htlc_switch(w1), LNWallet.htlc_switch(w2))
gath = asyncio.gather(reestablish(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p1.htlc_switch())
async def f():
await gath
with self.assertRaises(concurrent.futures.CancelledError):
@ -253,7 +253,7 @@ class TestPeer(ElectrumTestCase):
result = await LNWallet._pay(w1, pay_req)
self.assertEqual(result, True)
gath.cancel()
gath = asyncio.gather(pay(), p1._message_loop(), p2._message_loop(), LNWallet.htlc_switch(w1), LNWallet.htlc_switch(w2))
gath = asyncio.gather(pay(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())
async def f():
await gath
with self.assertRaises(concurrent.futures.CancelledError):
@ -271,7 +271,7 @@ class TestPeer(ElectrumTestCase):
# wait so that pending messages are processed
#await asyncio.sleep(1)
gath.cancel()
gath = asyncio.gather(reestablish(), p1._message_loop(), p2._message_loop(), LNWallet.htlc_switch(w1), LNWallet.htlc_switch(w2))
gath = asyncio.gather(reestablish(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())
async def f():
await gath
with self.assertRaises(concurrent.futures.CancelledError):
@ -285,7 +285,7 @@ class TestPeer(ElectrumTestCase):
result = await LNWallet._pay(w1, pay_req)
self.assertTrue(result)
gath.cancel()
gath = asyncio.gather(pay(), p1._message_loop(), p2._message_loop(), LNWallet.htlc_switch(w1), LNWallet.htlc_switch(w2))
gath = asyncio.gather(pay(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())
async def f():
await gath
with self.assertRaises(concurrent.futures.CancelledError):
@ -313,7 +313,7 @@ class TestPeer(ElectrumTestCase):
async def set_settle():
await asyncio.sleep(0.1)
w2.enable_htlc_settle.set()
gath = asyncio.gather(pay(), set_settle(), p1._message_loop(), p2._message_loop(), LNWallet.htlc_switch(w1), LNWallet.htlc_switch(w2))
gath = asyncio.gather(pay(), set_settle(), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())
async def f():
await gath
with self.assertRaises(concurrent.futures.CancelledError):
@ -338,7 +338,7 @@ class TestPeer(ElectrumTestCase):
# AssertionError is ok since we shouldn't use old routes, and the
# route finding should fail when channel is closed
async def f():
await asyncio.gather(w1._pay_to_route(route, addr), p1._message_loop(), p2._message_loop(), LNWallet.htlc_switch(w1), LNWallet.htlc_switch(w2))
await asyncio.gather(w1._pay_to_route(route, addr), p1._message_loop(), p2._message_loop(), p1.htlc_switch(), p2.htlc_switch())
with self.assertRaises(PaymentFailure):
run(f())

Loading…
Cancel
Save