From 68bc9c24744005daac24174fd57599a20eedc214 Mon Sep 17 00:00:00 2001 From: bitromortac Date: Tue, 13 Jul 2021 16:44:17 +0200 Subject: [PATCH] lnworker: improve route creation - Separates the trampoline and local routing multi-part payment cases. - Ask only for splits that don't send over a single channel (those have been tried already in the single-part case). - Makes sure that create_routes_for_payment only yields partial routes that belong to a single split configuration. - Tracks trampoline fee levels on a per node basis, previously, in the case of having two channels with a trampoline forwarder, the global fee level would have increased by two levels upon first try. --- electrum/lnworker.py | 124 ++++++++++++++++++++----------- electrum/mpp_split.py | 16 +++- electrum/tests/test_lnpeer.py | 2 +- electrum/tests/test_mpp_split.py | 4 + electrum/trampoline.py | 11 +-- 5 files changed, 105 insertions(+), 52 deletions(-) diff --git a/electrum/lnworker.py b/electrum/lnworker.py index e6bc3eb3e..0e3c930dd 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -8,7 +8,7 @@ from decimal import Decimal import random import time from typing import (Optional, Sequence, Tuple, List, Set, Dict, TYPE_CHECKING, - NamedTuple, Union, Mapping, Any, Iterable, AsyncGenerator) + NamedTuple, Union, Mapping, Any, Iterable, AsyncGenerator, DefaultDict) import threading import socket import aiohttp @@ -1148,7 +1148,7 @@ class LNWallet(LNWorker): raise OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON, data=b'') self.logs[payment_hash.hex()] = log = [] - trampoline_fee_level = self.INITIAL_TRAMPOLINE_FEE_LEVEL + trampoline_fee_levels = defaultdict(lambda: self.INITIAL_TRAMPOLINE_FEE_LEVEL) # type: DefaultDict[bytes, int] use_two_trampolines = True # only used for pay to legacy amount_inflight = 0 # what we sent in htlcs (that receiver gets, without fees) @@ -1168,7 +1168,7 @@ class LNWallet(LNWorker): full_path=full_path, payment_hash=payment_hash, payment_secret=payment_secret, - trampoline_fee_level=trampoline_fee_level, + trampoline_fee_levels=trampoline_fee_levels, use_two_trampolines=use_two_trampolines, fwd_trampoline_onion=fwd_trampoline_onion ) @@ -1211,11 +1211,12 @@ class LNWallet(LNWorker): # if we get a channel update, we might retry the same route and amount route = htlc_log.route sender_idx = htlc_log.sender_idx + erring_node_id = route[sender_idx].node_id failure_msg = htlc_log.failure_msg code, data = failure_msg.code, failure_msg.data self.logger.info(f"UPDATE_FAIL_HTLC. code={repr(code)}. " f"decoded_data={failure_msg.decode_data()}. data={data.hex()!r}") - self.logger.info(f"error reported by {bh2u(route[sender_idx].node_id)}") + self.logger.info(f"error reported by {bh2u(erring_node_id)}") if code == OnionFailureCode.MPP_TIMEOUT: raise PaymentFailure(failure_msg.code_name()) # trampoline @@ -1227,7 +1228,7 @@ class LNWallet(LNWorker): if code in (OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT, OnionFailureCode.TRAMPOLINE_EXPIRY_TOO_SOON): # todo: parse the node parameters here (not returned by eclair yet) - trampoline_fee_level += 1 + trampoline_fee_levels[erring_node_id] += 1 continue elif use_two_trampolines: use_two_trampolines = False @@ -1447,7 +1448,7 @@ class LNWallet(LNWorker): invoice_features: int, payment_hash, payment_secret, - trampoline_fee_level: int, + trampoline_fee_levels: DefaultDict[bytes, int], use_two_trampolines: bool, fwd_trampoline_onion = None, full_path: LNPaymentPath = None) -> AsyncGenerator[Tuple[LNPaymentRoute, int], None]: @@ -1462,6 +1463,7 @@ class LNWallet(LNWorker): local_height = self.network.get_local_height() active_channels = [chan for chan in self.channels.values() if chan.is_active() and not chan.is_frozen_for_sending()] try: + self.logger.info("trying single-part payment") # try to send over a single channel if not self.channel_db: for chan in active_channels: @@ -1486,7 +1488,7 @@ class LNWallet(LNWorker): payment_hash=payment_hash, payment_secret=payment_secret, local_height=local_height, - trampoline_fee_level=trampoline_fee_level, + trampoline_fee_levels=trampoline_fee_levels, use_two_trampolines=use_two_trampolines) trampoline_payment_secret = os.urandom(32) trampoline_total_msat = amount_with_fees @@ -1521,58 +1523,72 @@ class LNWallet(LNWorker): ) yield route, amount_msat, final_total_msat, amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion except NoPathFound: # fall back to payment splitting + self.logger.info("no path found, trying multi-part payment") if not invoice_features.supports(LnFeatures.BASIC_MPP_OPT): raise channels_with_funds = { (chan.channel_id, chan.node_id): int(chan.available_to_spend(HTLCOwner.LOCAL)) for chan in active_channels} self.logger.info(f"channels_with_funds: {channels_with_funds}") - # for trampoline mpp payments we have to restrict ourselves to pay - # to a single node due to some incompatibility in Eclair, see: - # https://github.com/ACINQ/eclair/issues/1723 - use_singe_node = not self.channel_db and constants.net is constants.BitcoinMainnet - split_configurations = suggest_splits(amount_msat, channels_with_funds, exclude_multinode_payments=use_singe_node) - self.logger.info(f'suggest_split {amount_msat} returned {len(split_configurations)} configurations') - - for sc in split_configurations: - self.logger.info(f"trying split configuration: {sc.config.values()} rating: {sc.rating}") - try: - if not self.channel_db: - buckets = defaultdict(list) + + if not self.channel_db: + # for trampoline mpp payments we have to restrict ourselves to pay + # to a single node due to some incompatibility in Eclair, see: + # https://github.com/ACINQ/eclair/issues/1723 + use_singe_node = constants.net is constants.BitcoinMainnet + split_configurations = suggest_splits( + amount_msat, + channels_with_funds, + exclude_multinode_payments=use_singe_node, + exclude_single_part_payments=True, + # we don't split within a channel when sending to a trampoline node, + # the trampoline node will split for us + exclude_single_channel_splits=True, + ) + self.logger.info(f'suggest_split {amount_msat} returned {len(split_configurations)} configurations') + + for sc in split_configurations: + try: + self.logger.info(f"trying split configuration: {sc.config.values()} rating: {sc.rating}") + per_trampoline_channel_amounts = defaultdict(list) + # categorize by trampoline nodes for trampolin mpp construction for (chan_id, _), part_amounts_msat in sc.config.items(): chan = self.channels[chan_id] for part_amount_msat in part_amounts_msat: - buckets[chan.node_id].append((chan_id, part_amount_msat)) - for node_id, bucket in buckets.items(): - bucket_amount_msat = sum([x[1] for x in bucket]) - trampoline_onion, bucket_amount_with_fees, bucket_cltv_delta = create_trampoline_route_and_onion( - amount_msat=bucket_amount_msat, + per_trampoline_channel_amounts[chan.node_id].append((chan_id, part_amount_msat)) + # for each trampoline forwarder, construct mpp trampoline + routes = [] + for trampoline_node_id, trampoline_parts in per_trampoline_channel_amounts.items(): + per_trampoline_amount = sum([x[1] for x in trampoline_parts]) + trampoline_onion, per_trampoline_amount_with_fees, per_trampoline_cltv_delta = create_trampoline_route_and_onion( + amount_msat=per_trampoline_amount, total_msat=final_total_msat, min_cltv_expiry=min_cltv_expiry, my_pubkey=self.node_keypair.pubkey, invoice_pubkey=invoice_pubkey, invoice_features=invoice_features, - node_id=node_id, + node_id=trampoline_node_id, r_tags=r_tags, payment_hash=payment_hash, payment_secret=payment_secret, local_height=local_height, - trampoline_fee_level=trampoline_fee_level, + trampoline_fee_levels=trampoline_fee_levels, use_two_trampolines=use_two_trampolines) # node_features is only used to determine is_tlv - bucket_payment_secret = os.urandom(32) - bucket_fees = bucket_amount_with_fees - bucket_amount_msat - self.logger.info(f'bucket_fees {bucket_fees}') - for chan_id, part_amount_msat in bucket: + per_trampoline_secret = os.urandom(32) + per_trampoline_fees = per_trampoline_amount_with_fees - per_trampoline_amount + self.logger.info(f'per trampoline fees: {per_trampoline_fees}') + for chan_id, part_amount_msat in trampoline_parts: chan = self.channels[chan_id] margin = chan.available_to_spend(LOCAL, strict=True) - part_amount_msat - delta_fee = min(bucket_fees, margin) + delta_fee = min(per_trampoline_fees, margin) + # TODO: distribute trampoline fee over several channels? part_amount_msat_with_fees = part_amount_msat + delta_fee - bucket_fees -= delta_fee + per_trampoline_fees -= delta_fee route = [ RouteEdge( start_node=self.node_keypair.pubkey, - end_node=node_id, + end_node=trampoline_node_id, short_channel_id=chan.short_channel_id, fee_base_msat=0, fee_proportional_millionths=0, @@ -1580,14 +1596,32 @@ class LNWallet(LNWorker): node_features=trampoline_features) ] self.logger.info(f'adding route {part_amount_msat} {delta_fee} {margin}') - yield route, part_amount_msat_with_fees, bucket_amount_with_fees, part_amount_msat, bucket_cltv_delta, bucket_payment_secret, trampoline_onion - if bucket_fees != 0: + routes.append((route, part_amount_msat_with_fees, per_trampoline_amount_with_fees, part_amount_msat, per_trampoline_cltv_delta, per_trampoline_secret, trampoline_onion)) + if per_trampoline_fees != 0: self.logger.info('not enough margin to pay trampoline fee') raise NoPathFound() - else: - for (chan_id, _), part_amounts_msat in sc.config.items(): - for part_amount_msat in part_amounts_msat: - channel = self.channels[chan_id] + for route in routes: + yield route + return + except NoPathFound: + continue + else: + split_configurations = suggest_splits( + amount_msat, + channels_with_funds, + exclude_single_part_payments=True, + ) + # We atomically loop through a split configuration. If there was + # a failure to find a path for a single part, we give back control + # after exhausting the split configuration. + yielded_from_split_configuration = False + self.logger.info(f'suggest_split {amount_msat} returned {len(split_configurations)} configurations') + for sc in split_configurations: + self.logger.info(f"trying split configuration: {list(sc.config.values())} rating: {sc.rating}") + for (chan_id, _), part_amounts_msat in sc.config.items(): + for part_amount_msat in part_amounts_msat: + channel = self.channels[chan_id] + try: route = await run_in_thread( partial( self.create_route_for_payment, @@ -1601,12 +1635,12 @@ class LNWallet(LNWorker): ) ) yield route, part_amount_msat, final_total_msat, part_amount_msat, min_cltv_expiry, payment_secret, fwd_trampoline_onion - self.logger.info(f"found acceptable split configuration: {list(sc.config.values())} rating: {sc.rating}") - break - except NoPathFound: - continue - else: - raise NoPathFound() + yielded_from_split_configuration = True + except NoPathFound: + continue + if yielded_from_split_configuration: + return + raise NoPathFound() @profiler def create_route_for_payment( diff --git a/electrum/mpp_split.py b/electrum/mpp_split.py index b7e07c938..c3eec558f 100644 --- a/electrum/mpp_split.py +++ b/electrum/mpp_split.py @@ -86,6 +86,16 @@ def remove_single_part_configs(configs: List[SplitConfig]) -> List[SplitConfig]: return [config for config in configs if number_parts(config) != 1] +def remove_single_channel_splits(configs: List[SplitConfig]) -> List[SplitConfig]: + filtered = [] + for config in configs: + for v in config.values(): + if len(v) > 1: + continue + filtered.append(config) + return filtered + + def rate_config( config: SplitConfig, channels_with_funds: ChannelsFundsInfo) -> float: @@ -113,7 +123,8 @@ def rate_config( def suggest_splits( amount_msat: int, channels_with_funds: ChannelsFundsInfo, exclude_single_part_payments=False, - exclude_multinode_payments=False + exclude_multinode_payments=False, + exclude_single_channel_splits=False ) -> List[SplitConfigRating]: """Breaks amount_msat into smaller pieces and distributes them over the channels according to the funds they can send. @@ -172,6 +183,9 @@ def suggest_splits( if exclude_single_part_payments: configs = remove_single_part_configs(configs) + if exclude_single_channel_splits: + configs = remove_single_channel_splits(configs) + rated_configs = [SplitConfigRating( config=c, rating=rate_config(c, channels_with_funds) diff --git a/electrum/tests/test_lnpeer.py b/electrum/tests/test_lnpeer.py index cf02a5c79..de5857075 100644 --- a/electrum/tests/test_lnpeer.py +++ b/electrum/tests/test_lnpeer.py @@ -204,7 +204,7 @@ class MockLNWallet(Logger, NetworkRetryManager[LNPeerAddr]): min_cltv_expiry=decoded_invoice.get_min_final_cltv_expiry(), r_tags=decoded_invoice.get_routing_info('r'), invoice_features=decoded_invoice.get_features(), - trampoline_fee_level=0, + trampoline_fee_levels=defaultdict[int], use_two_trampolines=False, payment_hash=decoded_invoice.paymenthash, payment_secret=decoded_invoice.payment_secret, diff --git a/electrum/tests/test_mpp_split.py b/electrum/tests/test_mpp_split.py index de0580e13..702ee960f 100644 --- a/electrum/tests/test_mpp_split.py +++ b/electrum/tests/test_mpp_split.py @@ -122,3 +122,7 @@ class TestMppSplit(ElectrumTestCase): mpp_split.PART_PENALTY = 0.3 splits = mpp_split.suggest_splits(1_000_000_000, channels_with_funds, exclude_single_part_payments=False) self.assertEqual(3, len(splits[0].config[(0, 0)])) + with self.subTest(msg="exclude all single channel splits"): + mpp_split.PART_PENALTY = 0.3 + splits = mpp_split.suggest_splits(1_000_000_000, channels_with_funds, exclude_single_channel_splits=True) + self.assertEqual(1, len(splits[0].config[(0, 0)])) diff --git a/electrum/trampoline.py b/electrum/trampoline.py index 931245cf7..444fd6404 100644 --- a/electrum/trampoline.py +++ b/electrum/trampoline.py @@ -2,7 +2,7 @@ import os import bitstring import random -from typing import Mapping +from typing import Mapping, DefaultDict from .logging import get_logger, Logger from .lnutil import LnFeatures @@ -107,7 +107,7 @@ def create_trampoline_route( my_pubkey: bytes, trampoline_node_id: bytes, # the first trampoline in the path; which we are directly connected to r_tags, - trampoline_fee_level: int, + trampoline_fee_levels: DefaultDict[bytes, int], use_two_trampolines: bool) -> LNPaymentRoute: # figure out whether we can use end-to-end trampoline, or fallback to pay-to-legacy @@ -140,7 +140,8 @@ def create_trampoline_route( if pubkey == TRAMPOLINE_NODES_MAINNET['ACINQ'].pubkey: is_legacy = True use_two_trampolines = False - # fee level. the same fee is used for all trampolines + # fee level + trampoline_fee_level = trampoline_fee_levels[trampoline_node_id] if trampoline_fee_level < len(TRAMPOLINE_FEES): params = TRAMPOLINE_FEES[trampoline_fee_level] else: @@ -269,7 +270,7 @@ def create_trampoline_route_and_onion( payment_hash, payment_secret, local_height:int, - trampoline_fee_level: int, + trampoline_fee_levels: DefaultDict[bytes, int], use_two_trampolines: bool): # create route for the trampoline_onion trampoline_route = create_trampoline_route( @@ -280,7 +281,7 @@ def create_trampoline_route_and_onion( invoice_features=invoice_features, trampoline_node_id=node_id, r_tags=r_tags, - trampoline_fee_level=trampoline_fee_level, + trampoline_fee_levels=trampoline_fee_levels, use_two_trampolines=use_two_trampolines) # compute onion and fees final_cltv = local_height + min_cltv_expiry