Browse Source

lnworker: async gen create_routes_for_payments

patch-4
bitromortac 4 years ago
parent
commit
591a36fb3b
No known key found for this signature in database GPG Key ID: 1965063FC13BEBE2
  1. 82
      electrum/lnworker.py
  2. 39
      electrum/tests/test_lnpeer.py

82
electrum/lnworker.py

@ -8,7 +8,7 @@ from decimal import Decimal
import random import random
import time import time
from typing import (Optional, Sequence, Tuple, List, Set, Dict, TYPE_CHECKING, from typing import (Optional, Sequence, Tuple, List, Set, Dict, TYPE_CHECKING,
NamedTuple, Union, Mapping, Any, Iterable) NamedTuple, Union, Mapping, Any, Iterable, AsyncGenerator)
import threading import threading
import socket import socket
import aiohttp import aiohttp
@ -1073,20 +1073,6 @@ class LNWallet(LNWorker):
if chan.short_channel_id == short_channel_id: if chan.short_channel_id == short_channel_id:
return chan return chan
def create_routes_from_invoice(self, amount_msat: int, decoded_invoice: LnAddr, *, full_path=None):
return self.create_routes_for_payment(
amount_msat=amount_msat,
final_total_msat=amount_msat,
invoice_pubkey=decoded_invoice.pubkey.serialize(),
min_cltv_expiry=decoded_invoice.get_min_final_cltv_expiry(),
r_tags=decoded_invoice.get_routing_info('r'),
invoice_features=decoded_invoice.get_features(),
trampoline_fee_level=0,
use_two_trampolines=False,
payment_hash=decoded_invoice.paymenthash,
payment_secret=decoded_invoice.payment_secret,
full_path=full_path)
@log_exceptions @log_exceptions
async def pay_invoice( async def pay_invoice(
self, invoice: str, *, self, invoice: str, *,
@ -1173,8 +1159,7 @@ class LNWallet(LNWorker):
# 1. create a set of routes for remaining amount. # 1. create a set of routes for remaining amount.
# note: path-finding runs in a separate thread so that we don't block the asyncio loop # note: path-finding runs in a separate thread so that we don't block the asyncio loop
# graph updates might occur during the computation # graph updates might occur during the computation
routes = await run_in_thread(partial( routes = self.create_routes_for_payment(
self.create_routes_for_payment,
amount_msat=amount_to_send, amount_msat=amount_to_send,
final_total_msat=amount_to_pay, final_total_msat=amount_to_pay,
invoice_pubkey=node_pubkey, invoice_pubkey=node_pubkey,
@ -1186,9 +1171,10 @@ class LNWallet(LNWorker):
payment_secret=payment_secret, payment_secret=payment_secret,
trampoline_fee_level=trampoline_fee_level, trampoline_fee_level=trampoline_fee_level,
use_two_trampolines=use_two_trampolines, use_two_trampolines=use_two_trampolines,
fwd_trampoline_onion=fwd_trampoline_onion)) fwd_trampoline_onion=fwd_trampoline_onion
)
# 2. send htlcs # 2. send htlcs
for route, amount_msat, total_msat, amount_receiver_msat, cltv_delta, bucket_payment_secret, trampoline_onion in routes: async for route, amount_msat, total_msat, amount_receiver_msat, cltv_delta, bucket_payment_secret, trampoline_onion in routes:
amount_inflight += amount_receiver_msat amount_inflight += amount_receiver_msat
if amount_inflight > amount_to_pay: # safety belts if amount_inflight > amount_to_pay: # safety belts
raise Exception(f"amount_inflight={amount_inflight} > amount_to_pay={amount_to_pay}") raise Exception(f"amount_inflight={amount_inflight} > amount_to_pay={amount_to_pay}")
@ -1432,8 +1418,7 @@ class LNWallet(LNWorker):
else: else:
return random.choice(list(hardcoded_trampoline_nodes().values())).pubkey return random.choice(list(hardcoded_trampoline_nodes().values())).pubkey
@profiler async def create_routes_for_payment(
def create_routes_for_payment(
self, *, self, *,
amount_msat: int, # part of payment amount we want routes for now amount_msat: int, # part of payment amount we want routes for now
final_total_msat: int, # total payment amount final receiver will get final_total_msat: int, # total payment amount final receiver will get
@ -1446,7 +1431,7 @@ class LNWallet(LNWorker):
trampoline_fee_level: int, trampoline_fee_level: int,
use_two_trampolines: bool, use_two_trampolines: bool,
fwd_trampoline_onion = None, fwd_trampoline_onion = None,
full_path: LNPaymentPath = None) -> Sequence[Tuple[LNPaymentRoute, int]]: full_path: LNPaymentPath = None) -> AsyncGenerator[Tuple[LNPaymentRoute, int], None]:
"""Creates multiple routes for splitting a payment over the available """Creates multiple routes for splitting a payment over the available
private channels. private channels.
@ -1502,20 +1487,24 @@ class LNWallet(LNWorker):
cltv_expiry_delta=0, cltv_expiry_delta=0,
node_features=trampoline_features) node_features=trampoline_features)
] ]
routes = [(route, amount_with_fees, trampoline_total_msat, amount_msat, cltv_delta, trampoline_payment_secret, trampoline_onion)] yield route, amount_with_fees, trampoline_total_msat, amount_msat, cltv_delta, trampoline_payment_secret, trampoline_onion
break break
else: else:
raise NoPathFound() raise NoPathFound()
else: else:
route = self.create_route_for_payment( route = await run_in_thread(
amount_msat=amount_msat, partial(
invoice_pubkey=invoice_pubkey, self.create_route_for_payment,
min_cltv_expiry=min_cltv_expiry, amount_msat=amount_msat,
r_tags=r_tags, invoice_pubkey=invoice_pubkey,
invoice_features=invoice_features, min_cltv_expiry=min_cltv_expiry,
channels=active_channels, r_tags=r_tags,
full_path=full_path) invoice_features=invoice_features,
routes = [(route, amount_msat, final_total_msat, amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion)] channels=active_channels,
full_path=full_path
)
)
yield route, amount_msat, final_total_msat, amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion
except NoPathFound: except NoPathFound:
if not invoice_features.supports(LnFeatures.BASIC_MPP_OPT): if not invoice_features.supports(LnFeatures.BASIC_MPP_OPT):
raise raise
@ -1532,7 +1521,6 @@ class LNWallet(LNWorker):
for s in split_configurations: for s in split_configurations:
self.logger.info(f"trying split configuration: {s[0].values()} rating: {s[1]}") self.logger.info(f"trying split configuration: {s[0].values()} rating: {s[1]}")
routes = []
try: try:
if not self.channel_db: if not self.channel_db:
buckets = defaultdict(list) buckets = defaultdict(list)
@ -1577,7 +1565,7 @@ class LNWallet(LNWorker):
node_features=trampoline_features) node_features=trampoline_features)
] ]
self.logger.info(f'adding route {part_amount_msat} {delta_fee} {margin}') self.logger.info(f'adding route {part_amount_msat} {delta_fee} {margin}')
routes.append((route, part_amount_msat_with_fees, bucket_amount_with_fees, part_amount_msat, bucket_cltv_delta, bucket_payment_secret, trampoline_onion)) yield route, part_amount_msat_with_fees, bucket_amount_with_fees, part_amount_msat, bucket_cltv_delta, bucket_payment_secret, trampoline_onion
if bucket_fees != 0: if bucket_fees != 0:
self.logger.info('not enough margin to pay trampoline fee') self.logger.info('not enough margin to pay trampoline fee')
raise NoPathFound() raise NoPathFound()
@ -1585,23 +1573,27 @@ class LNWallet(LNWorker):
for (chan_id, _), part_amount_msat in s[0].items(): for (chan_id, _), part_amount_msat in s[0].items():
if part_amount_msat: if part_amount_msat:
channel = self.channels[chan_id] channel = self.channels[chan_id]
route = self.create_route_for_payment( route = await run_in_thread(
amount_msat=part_amount_msat, partial(
invoice_pubkey=invoice_pubkey, self.create_route_for_payment,
min_cltv_expiry=min_cltv_expiry, amount_msat=part_amount_msat,
r_tags=r_tags, invoice_pubkey=invoice_pubkey,
invoice_features=invoice_features, min_cltv_expiry=min_cltv_expiry,
channels=[channel], r_tags=r_tags,
full_path=None) invoice_features=invoice_features,
routes.append((route, part_amount_msat, final_total_msat, part_amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion)) channels=[channel],
full_path=None
)
)
yield route, part_amount_msat, final_total_msat, part_amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion
self.logger.info(f"found acceptable split configuration: {list(s[0].values())} rating: {s[1]}") self.logger.info(f"found acceptable split configuration: {list(s[0].values())} rating: {s[1]}")
break break
except NoPathFound: except NoPathFound:
continue continue
else: else:
raise NoPathFound() raise NoPathFound()
return routes
@profiler
def create_route_for_payment( def create_route_for_payment(
self, *, self, *,
amount_msat: int, amount_msat: int,
@ -1610,7 +1602,7 @@ class LNWallet(LNWorker):
r_tags, r_tags,
invoice_features: int, invoice_features: int,
channels: List[Channel], channels: List[Channel],
full_path: Optional[LNPaymentPath]) -> Tuple[LNPaymentRoute, int]: full_path: Optional[LNPaymentPath]) -> LNPaymentRoute:
scid_to_my_channels = { scid_to_my_channels = {
chan.short_channel_id: chan for chan in channels chan.short_channel_id: chan for chan in channels

39
electrum/tests/test_lnpeer.py

@ -192,6 +192,20 @@ class MockLNWallet(Logger, NetworkRetryManager[LNPeerAddr]):
self.channel_db.stop() self.channel_db.stop()
await self.channel_db.stopped_event.wait() await self.channel_db.stopped_event.wait()
async def create_routes_from_invoice(self, amount_msat: int, decoded_invoice: LnAddr, *, full_path=None):
return [r async for r in self.create_routes_for_payment(
amount_msat=amount_msat,
final_total_msat=amount_msat,
invoice_pubkey=decoded_invoice.pubkey.serialize(),
min_cltv_expiry=decoded_invoice.get_min_final_cltv_expiry(),
r_tags=decoded_invoice.get_routing_info('r'),
invoice_features=decoded_invoice.get_features(),
trampoline_fee_level=0,
use_two_trampolines=False,
payment_hash=decoded_invoice.paymenthash,
payment_secret=decoded_invoice.payment_secret,
full_path=full_path)]
get_payments = LNWallet.get_payments get_payments = LNWallet.get_payments
get_payment_info = LNWallet.get_payment_info get_payment_info = LNWallet.get_payment_info
save_payment_info = LNWallet.save_payment_info save_payment_info = LNWallet.save_payment_info
@ -206,7 +220,6 @@ class MockLNWallet(Logger, NetworkRetryManager[LNPeerAddr]):
get_preimage = LNWallet.get_preimage get_preimage = LNWallet.get_preimage
create_route_for_payment = LNWallet.create_route_for_payment create_route_for_payment = LNWallet.create_route_for_payment
create_routes_for_payment = LNWallet.create_routes_for_payment create_routes_for_payment = LNWallet.create_routes_for_payment
create_routes_from_invoice = LNWallet.create_routes_from_invoice
_check_invoice = staticmethod(LNWallet._check_invoice) _check_invoice = staticmethod(LNWallet._check_invoice)
pay_to_route = LNWallet.pay_to_route pay_to_route = LNWallet.pay_to_route
pay_to_node = LNWallet.pay_to_node pay_to_node = LNWallet.pay_to_node
@ -598,7 +611,7 @@ class TestPeer(TestCaseForTestnet):
q2 = w2.sent_htlcs[lnaddr1.paymenthash] q2 = w2.sent_htlcs[lnaddr1.paymenthash]
# alice sends htlc BUT NOT COMMITMENT_SIGNED # alice sends htlc BUT NOT COMMITMENT_SIGNED
p1.maybe_send_commitment = lambda x: None p1.maybe_send_commitment = lambda x: None
route1 = w1.create_routes_from_invoice(lnaddr2.get_amount_msat(), decoded_invoice=lnaddr2)[0][0] route1 = (await w1.create_routes_from_invoice(lnaddr2.get_amount_msat(), decoded_invoice=lnaddr2))[0][0]
amount_msat = lnaddr2.get_amount_msat() amount_msat = lnaddr2.get_amount_msat()
await w1.pay_to_route( await w1.pay_to_route(
route=route1, route=route1,
@ -612,7 +625,7 @@ class TestPeer(TestCaseForTestnet):
p1.maybe_send_commitment = _maybe_send_commitment1 p1.maybe_send_commitment = _maybe_send_commitment1
# bob sends htlc BUT NOT COMMITMENT_SIGNED # bob sends htlc BUT NOT COMMITMENT_SIGNED
p2.maybe_send_commitment = lambda x: None p2.maybe_send_commitment = lambda x: None
route2 = w2.create_routes_from_invoice(lnaddr1.get_amount_msat(), decoded_invoice=lnaddr1)[0][0] route2 = (await w2.create_routes_from_invoice(lnaddr1.get_amount_msat(), decoded_invoice=lnaddr1))[0][0]
amount_msat = lnaddr1.get_amount_msat() amount_msat = lnaddr1.get_amount_msat()
await w2.pay_to_route( await w2.pay_to_route(
route=route2, route=route2,
@ -982,14 +995,14 @@ class TestPeer(TestCaseForTestnet):
await asyncio.wait_for(p1.initialized, 1) await asyncio.wait_for(p1.initialized, 1)
await asyncio.wait_for(p2.initialized, 1) await asyncio.wait_for(p2.initialized, 1)
# alice sends htlc # alice sends htlc
route, amount_msat = w1.create_routes_from_invoice(lnaddr.get_amount_msat(), decoded_invoice=lnaddr)[0][0:2] route, amount_msat = (await w1.create_routes_from_invoice(lnaddr.get_amount_msat(), decoded_invoice=lnaddr))[0][0:2]
htlc = p1.pay(route=route, p1.pay(route=route,
chan=alice_channel, chan=alice_channel,
amount_msat=lnaddr.get_amount_msat(), amount_msat=lnaddr.get_amount_msat(),
total_msat=lnaddr.get_amount_msat(), total_msat=lnaddr.get_amount_msat(),
payment_hash=lnaddr.paymenthash, payment_hash=lnaddr.paymenthash,
min_final_cltv_expiry=lnaddr.get_min_final_cltv_expiry(), min_final_cltv_expiry=lnaddr.get_min_final_cltv_expiry(),
payment_secret=lnaddr.payment_secret) payment_secret=lnaddr.payment_secret)
# alice closes # alice closes
await p1.close_channel(alice_channel.channel_id) await p1.close_channel(alice_channel.channel_id)
gath.cancel() gath.cancel()
@ -1078,7 +1091,7 @@ class TestPeer(TestCaseForTestnet):
lnaddr, pay_req = run(self.prepare_invoice(w2)) lnaddr, pay_req = run(self.prepare_invoice(w2))
lnaddr = w1._check_invoice(pay_req) lnaddr = w1._check_invoice(pay_req)
route, amount_msat = w1.create_routes_from_invoice(lnaddr.get_amount_msat(), decoded_invoice=lnaddr)[0][0:2] route, amount_msat = run(w1.create_routes_from_invoice(lnaddr.get_amount_msat(), decoded_invoice=lnaddr))[0][0:2]
assert amount_msat == lnaddr.get_amount_msat() assert amount_msat == lnaddr.get_amount_msat()
run(w1.force_close_channel(alice_channel.channel_id)) run(w1.force_close_channel(alice_channel.channel_id))
@ -1086,7 +1099,7 @@ class TestPeer(TestCaseForTestnet):
assert q1.qsize() == 1 assert q1.qsize() == 1
with self.assertRaises(NoPathFound) as e: with self.assertRaises(NoPathFound) as e:
w1.create_routes_from_invoice(lnaddr.get_amount_msat(), decoded_invoice=lnaddr) run(w1.create_routes_from_invoice(lnaddr.get_amount_msat(), decoded_invoice=lnaddr))
peer = w1.peers[route[0].node_id] peer = w1.peers[route[0].node_id]
# AssertionError is ok since we shouldn't use old routes, and the # AssertionError is ok since we shouldn't use old routes, and the

Loading…
Cancel
Save