Browse Source

lnworker: improve route creation

- Separates the trampoline and local routing multi-part payment cases.
- Ask only for splits that don't send over a single channel (those have
  been tried already in the single-part case).
- Makes sure that create_routes_for_payment only yields partial routes
  that belong to a single split configuration.
- Tracks trampoline fee levels on a per node basis, previously, in the
  case of having two channels with a trampoline forwarder, the global
  fee level would have increased by two levels upon first try.
patch-4
bitromortac 3 years ago
parent
commit
68bc9c2474
No known key found for this signature in database GPG Key ID: 1965063FC13BEBE2
  1. 96
      electrum/lnworker.py
  2. 16
      electrum/mpp_split.py
  3. 2
      electrum/tests/test_lnpeer.py
  4. 4
      electrum/tests/test_mpp_split.py
  5. 11
      electrum/trampoline.py

96
electrum/lnworker.py

@ -8,7 +8,7 @@ from decimal import Decimal
import random import random
import time import time
from typing import (Optional, Sequence, Tuple, List, Set, Dict, TYPE_CHECKING, from typing import (Optional, Sequence, Tuple, List, Set, Dict, TYPE_CHECKING,
NamedTuple, Union, Mapping, Any, Iterable, AsyncGenerator) NamedTuple, Union, Mapping, Any, Iterable, AsyncGenerator, DefaultDict)
import threading import threading
import socket import socket
import aiohttp import aiohttp
@ -1148,7 +1148,7 @@ class LNWallet(LNWorker):
raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON, data=b'') raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON, data=b'')
self.logs[payment_hash.hex()] = log = [] self.logs[payment_hash.hex()] = log = []
trampoline_fee_level = self.INITIAL_TRAMPOLINE_FEE_LEVEL trampoline_fee_levels = defaultdict(lambda: self.INITIAL_TRAMPOLINE_FEE_LEVEL) # type: DefaultDict[bytes, int]
use_two_trampolines = True # only used for pay to legacy use_two_trampolines = True # only used for pay to legacy
amount_inflight = 0 # what we sent in htlcs (that receiver gets, without fees) amount_inflight = 0 # what we sent in htlcs (that receiver gets, without fees)
@ -1168,7 +1168,7 @@ class LNWallet(LNWorker):
full_path=full_path, full_path=full_path,
payment_hash=payment_hash, payment_hash=payment_hash,
payment_secret=payment_secret, payment_secret=payment_secret,
trampoline_fee_level=trampoline_fee_level, trampoline_fee_levels=trampoline_fee_levels,
use_two_trampolines=use_two_trampolines, use_two_trampolines=use_two_trampolines,
fwd_trampoline_onion=fwd_trampoline_onion fwd_trampoline_onion=fwd_trampoline_onion
) )
@ -1211,11 +1211,12 @@ class LNWallet(LNWorker):
# if we get a channel update, we might retry the same route and amount # if we get a channel update, we might retry the same route and amount
route = htlc_log.route route = htlc_log.route
sender_idx = htlc_log.sender_idx sender_idx = htlc_log.sender_idx
erring_node_id = route[sender_idx].node_id
failure_msg = htlc_log.failure_msg failure_msg = htlc_log.failure_msg
code, data = failure_msg.code, failure_msg.data code, data = failure_msg.code, failure_msg.data
self.logger.info(f"UPDATE_FAIL_HTLC. code={repr(code)}. " self.logger.info(f"UPDATE_FAIL_HTLC. code={repr(code)}. "
f"decoded_data={failure_msg.decode_data()}. data={data.hex()!r}") f"decoded_data={failure_msg.decode_data()}. data={data.hex()!r}")
self.logger.info(f"error reported by {bh2u(route[sender_idx].node_id)}") self.logger.info(f"error reported by {bh2u(erring_node_id)}")
if code == OnionFailureCode.MPP_TIMEOUT: if code == OnionFailureCode.MPP_TIMEOUT:
raise PaymentFailure(failure_msg.code_name()) raise PaymentFailure(failure_msg.code_name())
# trampoline # trampoline
@ -1227,7 +1228,7 @@ class LNWallet(LNWorker):
if code in (OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT, if code in (OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT,
OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON): OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON):
# todo: parse the node parameters here (not returned by eclair yet) # todo: parse the node parameters here (not returned by eclair yet)
trampoline_fee_level += 1 trampoline_fee_levels[erring_node_id] += 1
continue continue
elif use_two_trampolines: elif use_two_trampolines:
use_two_trampolines = False use_two_trampolines = False
@ -1447,7 +1448,7 @@ class LNWallet(LNWorker):
invoice_features: int, invoice_features: int,
payment_hash, payment_hash,
payment_secret, payment_secret,
trampoline_fee_level: int, trampoline_fee_levels: DefaultDict[bytes, int],
use_two_trampolines: bool, use_two_trampolines: bool,
fwd_trampoline_onion = None, fwd_trampoline_onion = None,
full_path: LNPaymentPath = None) -> AsyncGenerator[Tuple[LNPaymentRoute, int], None]: full_path: LNPaymentPath = None) -> AsyncGenerator[Tuple[LNPaymentRoute, int], None]:
@ -1462,6 +1463,7 @@ class LNWallet(LNWorker):
local_height = self.network.get_local_height() local_height = self.network.get_local_height()
active_channels = [chan for chan in self.channels.values() if chan.is_active() and not chan.is_frozen_for_sending()] active_channels = [chan for chan in self.channels.values() if chan.is_active() and not chan.is_frozen_for_sending()]
try: try:
self.logger.info("trying single-part payment")
# try to send over a single channel # try to send over a single channel
if not self.channel_db: if not self.channel_db:
for chan in active_channels: for chan in active_channels:
@ -1486,7 +1488,7 @@ class LNWallet(LNWorker):
payment_hash=payment_hash, payment_hash=payment_hash,
payment_secret=payment_secret, payment_secret=payment_secret,
local_height=local_height, local_height=local_height,
trampoline_fee_level=trampoline_fee_level, trampoline_fee_levels=trampoline_fee_levels,
use_two_trampolines=use_two_trampolines) use_two_trampolines=use_two_trampolines)
trampoline_payment_secret = os.urandom(32) trampoline_payment_secret = os.urandom(32)
trampoline_total_msat = amount_with_fees trampoline_total_msat = amount_with_fees
@ -1521,58 +1523,72 @@ class LNWallet(LNWorker):
) )
yield route, amount_msat, final_total_msat, amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion yield route, amount_msat, final_total_msat, amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion
except NoPathFound: # fall back to payment splitting except NoPathFound: # fall back to payment splitting
self.logger.info("no path found, trying multi-part payment")
if not invoice_features.supports(LnFeatures.BASIC_MPP_OPT): if not invoice_features.supports(LnFeatures.BASIC_MPP_OPT):
raise raise
channels_with_funds = { channels_with_funds = {
(chan.channel_id, chan.node_id): int(chan.available_to_spend(HTLCOwner.LOCAL)) (chan.channel_id, chan.node_id): int(chan.available_to_spend(HTLCOwner.LOCAL))
for chan in active_channels} for chan in active_channels}
self.logger.info(f"channels_with_funds: {channels_with_funds}") self.logger.info(f"channels_with_funds: {channels_with_funds}")
if not self.channel_db:
# for trampoline mpp payments we have to restrict ourselves to pay # for trampoline mpp payments we have to restrict ourselves to pay
# to a single node due to some incompatibility in Eclair, see: # to a single node due to some incompatibility in Eclair, see:
# https://github.com/ACINQ/eclair/issues/1723 # https://github.com/ACINQ/eclair/issues/1723
use_singe_node = not self.channel_db and constants.net is constants.BitcoinMainnet use_singe_node = constants.net is constants.BitcoinMainnet
split_configurations = suggest_splits(amount_msat, channels_with_funds, exclude_multinode_payments=use_singe_node) split_configurations = suggest_splits(
amount_msat,
channels_with_funds,
exclude_multinode_payments=use_singe_node,
exclude_single_part_payments=True,
# we don't split within a channel when sending to a trampoline node,
# the trampoline node will split for us
exclude_single_channel_splits=True,
)
self.logger.info(f'suggest_split {amount_msat} returned {len(split_configurations)} configurations') self.logger.info(f'suggest_split {amount_msat} returned {len(split_configurations)} configurations')
for sc in split_configurations: for sc in split_configurations:
self.logger.info(f"trying split configuration: {sc.config.values()} rating: {sc.rating}")
try: try:
if not self.channel_db: self.logger.info(f"trying split configuration: {sc.config.values()} rating: {sc.rating}")
buckets = defaultdict(list) per_trampoline_channel_amounts = defaultdict(list)
# categorize by trampoline nodes for trampolin mpp construction
for (chan_id, _), part_amounts_msat in sc.config.items(): for (chan_id, _), part_amounts_msat in sc.config.items():
chan = self.channels[chan_id] chan = self.channels[chan_id]
for part_amount_msat in part_amounts_msat: for part_amount_msat in part_amounts_msat:
buckets[chan.node_id].append((chan_id, part_amount_msat)) per_trampoline_channel_amounts[chan.node_id].append((chan_id, part_amount_msat))
for node_id, bucket in buckets.items(): # for each trampoline forwarder, construct mpp trampoline
bucket_amount_msat = sum([x[1] for x in bucket]) routes = []
trampoline_onion, bucket_amount_with_fees, bucket_cltv_delta = create_trampoline_route_and_onion( for trampoline_node_id, trampoline_parts in per_trampoline_channel_amounts.items():
amount_msat=bucket_amount_msat, per_trampoline_amount = sum([x[1] for x in trampoline_parts])
trampoline_onion, per_trampoline_amount_with_fees, per_trampoline_cltv_delta = create_trampoline_route_and_onion(
amount_msat=per_trampoline_amount,
total_msat=final_total_msat, total_msat=final_total_msat,
min_cltv_expiry=min_cltv_expiry, min_cltv_expiry=min_cltv_expiry,
my_pubkey=self.node_keypair.pubkey, my_pubkey=self.node_keypair.pubkey,
invoice_pubkey=invoice_pubkey, invoice_pubkey=invoice_pubkey,
invoice_features=invoice_features, invoice_features=invoice_features,
node_id=node_id, node_id=trampoline_node_id,
r_tags=r_tags, r_tags=r_tags,
payment_hash=payment_hash, payment_hash=payment_hash,
payment_secret=payment_secret, payment_secret=payment_secret,
local_height=local_height, local_height=local_height,
trampoline_fee_level=trampoline_fee_level, trampoline_fee_levels=trampoline_fee_levels,
use_two_trampolines=use_two_trampolines) use_two_trampolines=use_two_trampolines)
# node_features is only used to determine is_tlv # node_features is only used to determine is_tlv
bucket_payment_secret = os.urandom(32) per_trampoline_secret = os.urandom(32)
bucket_fees = bucket_amount_with_fees - bucket_amount_msat per_trampoline_fees = per_trampoline_amount_with_fees - per_trampoline_amount
self.logger.info(f'bucket_fees {bucket_fees}') self.logger.info(f'per trampoline fees: {per_trampoline_fees}')
for chan_id, part_amount_msat in bucket: for chan_id, part_amount_msat in trampoline_parts:
chan = self.channels[chan_id] chan = self.channels[chan_id]
margin = chan.available_to_spend(LOCAL, strict=True) - part_amount_msat margin = chan.available_to_spend(LOCAL, strict=True) - part_amount_msat
delta_fee = min(bucket_fees, margin) delta_fee = min(per_trampoline_fees, margin)
# TODO: distribute trampoline fee over several channels?
part_amount_msat_with_fees = part_amount_msat + delta_fee part_amount_msat_with_fees = part_amount_msat + delta_fee
bucket_fees -= delta_fee per_trampoline_fees -= delta_fee
route = [ route = [
RouteEdge( RouteEdge(
start_node=self.node_keypair.pubkey, start_node=self.node_keypair.pubkey,
end_node=node_id, end_node=trampoline_node_id,
short_channel_id=chan.short_channel_id, short_channel_id=chan.short_channel_id,
fee_base_msat=0, fee_base_msat=0,
fee_proportional_millionths=0, fee_proportional_millionths=0,
@ -1580,14 +1596,32 @@ class LNWallet(LNWorker):
node_features=trampoline_features) node_features=trampoline_features)
] ]
self.logger.info(f'adding route {part_amount_msat} {delta_fee} {margin}') self.logger.info(f'adding route {part_amount_msat} {delta_fee} {margin}')
yield route, part_amount_msat_with_fees, bucket_amount_with_fees, part_amount_msat, bucket_cltv_delta, bucket_payment_secret, trampoline_onion routes.append((route, part_amount_msat_with_fees, per_trampoline_amount_with_fees, part_amount_msat, per_trampoline_cltv_delta, per_trampoline_secret, trampoline_onion))
if bucket_fees != 0: if per_trampoline_fees != 0:
self.logger.info('not enough margin to pay trampoline fee') self.logger.info('not enough margin to pay trampoline fee')
raise NoPathFound() raise NoPathFound()
for route in routes:
yield route
return
except NoPathFound:
continue
else: else:
split_configurations = suggest_splits(
amount_msat,
channels_with_funds,
exclude_single_part_payments=True,
)
# We atomically loop through a split configuration. If there was
# a failure to find a path for a single part, we give back control
# after exhausting the split configuration.
yielded_from_split_configuration = False
self.logger.info(f'suggest_split {amount_msat} returned {len(split_configurations)} configurations')
for sc in split_configurations:
self.logger.info(f"trying split configuration: {list(sc.config.values())} rating: {sc.rating}")
for (chan_id, _), part_amounts_msat in sc.config.items(): for (chan_id, _), part_amounts_msat in sc.config.items():
for part_amount_msat in part_amounts_msat: for part_amount_msat in part_amounts_msat:
channel = self.channels[chan_id] channel = self.channels[chan_id]
try:
route = await run_in_thread( route = await run_in_thread(
partial( partial(
self.create_route_for_payment, self.create_route_for_payment,
@ -1601,11 +1635,11 @@ class LNWallet(LNWorker):
) )
) )
yield route, part_amount_msat, final_total_msat, part_amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion yield route, part_amount_msat, final_total_msat, part_amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion
self.logger.info(f"found acceptable split configuration: {list(sc.config.values())} rating: {sc.rating}") yielded_from_split_configuration = True
break
except NoPathFound: except NoPathFound:
continue continue
else: if yielded_from_split_configuration:
return
raise NoPathFound() raise NoPathFound()
@profiler @profiler

16
electrum/mpp_split.py

@ -86,6 +86,16 @@ def remove_single_part_configs(configs: List[SplitConfig]) -> List[SplitConfig]:
return [config for config in configs if number_parts(config) != 1] return [config for config in configs if number_parts(config) != 1]
def remove_single_channel_splits(configs: List[SplitConfig]) -> List[SplitConfig]:
filtered = []
for config in configs:
for v in config.values():
if len(v) > 1:
continue
filtered.append(config)
return filtered
def rate_config( def rate_config(
config: SplitConfig, config: SplitConfig,
channels_with_funds: ChannelsFundsInfo) -> float: channels_with_funds: ChannelsFundsInfo) -> float:
@ -113,7 +123,8 @@ def rate_config(
def suggest_splits( def suggest_splits(
amount_msat: int, channels_with_funds: ChannelsFundsInfo, amount_msat: int, channels_with_funds: ChannelsFundsInfo,
exclude_single_part_payments=False, exclude_single_part_payments=False,
exclude_multinode_payments=False exclude_multinode_payments=False,
exclude_single_channel_splits=False
) -> List[SplitConfigRating]: ) -> List[SplitConfigRating]:
"""Breaks amount_msat into smaller pieces and distributes them over the """Breaks amount_msat into smaller pieces and distributes them over the
channels according to the funds they can send. channels according to the funds they can send.
@ -172,6 +183,9 @@ def suggest_splits(
if exclude_single_part_payments: if exclude_single_part_payments:
configs = remove_single_part_configs(configs) configs = remove_single_part_configs(configs)
if exclude_single_channel_splits:
configs = remove_single_channel_splits(configs)
rated_configs = [SplitConfigRating( rated_configs = [SplitConfigRating(
config=c, config=c,
rating=rate_config(c, channels_with_funds) rating=rate_config(c, channels_with_funds)

2
electrum/tests/test_lnpeer.py

@ -204,7 +204,7 @@ class MockLNWallet(Logger, NetworkRetryManager[LNPeerAddr]):
min_cltv_expiry=decoded_invoice.get_min_final_cltv_expiry(), min_cltv_expiry=decoded_invoice.get_min_final_cltv_expiry(),
r_tags=decoded_invoice.get_routing_info('r'), r_tags=decoded_invoice.get_routing_info('r'),
invoice_features=decoded_invoice.get_features(), invoice_features=decoded_invoice.get_features(),
trampoline_fee_level=0, trampoline_fee_levels=defaultdict[int],
use_two_trampolines=False, use_two_trampolines=False,
payment_hash=decoded_invoice.paymenthash, payment_hash=decoded_invoice.paymenthash,
payment_secret=decoded_invoice.payment_secret, payment_secret=decoded_invoice.payment_secret,

4
electrum/tests/test_mpp_split.py

@ -122,3 +122,7 @@ class TestMppSplit(ElectrumTestCase):
mpp_split.PART_PENALTY = 0.3 mpp_split.PART_PENALTY = 0.3
splits = mpp_split.suggest_splits(1_000_000_000, channels_with_funds, exclude_single_part_payments=False) splits = mpp_split.suggest_splits(1_000_000_000, channels_with_funds, exclude_single_part_payments=False)
self.assertEqual(3, len(splits[0].config[(0, 0)])) self.assertEqual(3, len(splits[0].config[(0, 0)]))
with self.subTest(msg="exclude all single channel splits"):
mpp_split.PART_PENALTY = 0.3
splits = mpp_split.suggest_splits(1_000_000_000, channels_with_funds, exclude_single_channel_splits=True)
self.assertEqual(1, len(splits[0].config[(0, 0)]))

11
electrum/trampoline.py

@ -2,7 +2,7 @@ import os
import bitstring import bitstring
import random import random
from typing import Mapping from typing import Mapping, DefaultDict
from .logging import get_logger, Logger from .logging import get_logger, Logger
from .lnutil import LnFeatures from .lnutil import LnFeatures
@ -107,7 +107,7 @@ def create_trampoline_route(
my_pubkey: bytes, my_pubkey: bytes,
trampoline_node_id: bytes, # the first trampoline in the path; which we are directly connected to trampoline_node_id: bytes, # the first trampoline in the path; which we are directly connected to
r_tags, r_tags,
trampoline_fee_level: int, trampoline_fee_levels: DefaultDict[bytes, int],
use_two_trampolines: bool) -> LNPaymentRoute: use_two_trampolines: bool) -> LNPaymentRoute:
# figure out whether we can use end-to-end trampoline, or fallback to pay-to-legacy # figure out whether we can use end-to-end trampoline, or fallback to pay-to-legacy
@ -140,7 +140,8 @@ def create_trampoline_route(
if pubkey == TRAMPOLINE_NODES_MAINNET['ACINQ'].pubkey: if pubkey == TRAMPOLINE_NODES_MAINNET['ACINQ'].pubkey:
is_legacy = True is_legacy = True
use_two_trampolines = False use_two_trampolines = False
# fee level. the same fee is used for all trampolines # fee level
trampoline_fee_level = trampoline_fee_levels[trampoline_node_id]
if trampoline_fee_level < len(TRAMPOLINE_FEES): if trampoline_fee_level < len(TRAMPOLINE_FEES):
params = TRAMPOLINE_FEES[trampoline_fee_level] params = TRAMPOLINE_FEES[trampoline_fee_level]
else: else:
@ -269,7 +270,7 @@ def create_trampoline_route_and_onion(
payment_hash, payment_hash,
payment_secret, payment_secret,
local_height:int, local_height:int,
trampoline_fee_level: int, trampoline_fee_levels: DefaultDict[bytes, int],
use_two_trampolines: bool): use_two_trampolines: bool):
# create route for the trampoline_onion # create route for the trampoline_onion
trampoline_route = create_trampoline_route( trampoline_route = create_trampoline_route(
@ -280,7 +281,7 @@ def create_trampoline_route_and_onion(
invoice_features=invoice_features, invoice_features=invoice_features,
trampoline_node_id=node_id, trampoline_node_id=node_id,
r_tags=r_tags, r_tags=r_tags,
trampoline_fee_level=trampoline_fee_level, trampoline_fee_levels=trampoline_fee_levels,
use_two_trampolines=use_two_trampolines) use_two_trampolines=use_two_trampolines)
# compute onion and fees # compute onion and fees
final_cltv = local_height + min_cltv_expiry final_cltv = local_height + min_cltv_expiry

Loading…
Cancel
Save