You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

1936 lines
96 KiB

#!/usr/bin/env python3
#
# Copyright (C) 2018 The Electrum developers
# Distributed under the MIT software license, see the accompanying
# file LICENCE or http://www.opensource.org/licenses/mit-license.php
import zlib
from collections import OrderedDict, defaultdict
import asyncio
import os
import time
from typing import Tuple, Dict, TYPE_CHECKING, Optional, Union
from datetime import datetime
import aiorpcx
from .crypto import sha256, sha256d
from . import bitcoin, util
from . import ecc
from .ecc import sig_string_from_r_and_s, der_sig_from_sig_string
from . import constants
from .util import (bh2u, bfh, log_exceptions, ignore_exceptions, chunks, SilentTaskGroup,
UnrelatedTransactionException)
from . import transaction
from .transaction import PartialTxOutput, match_script_against_template
from .logging import Logger
from .lnonion import (new_onion_packet, OnionFailureCode, calc_hops_data_for_payment,
process_onion_packet, OnionPacket, construct_onion_error, OnionRoutingFailure,
ProcessedOnionPacket, UnsupportedOnionPacketVersion, InvalidOnionMac, InvalidOnionPubkey,
OnionFailureCodeMetaFlag)
from .lnchannel import Channel, RevokeAndAck, RemoteCtnTooFarInFuture, ChannelState, PeerState
from . import lnutil
from .lnutil import (Outpoint, LocalConfig, RECEIVED, UpdateAddHtlc,
RemoteConfig, OnlyPubkeyKeypair, ChannelConstraints, RevocationStore,
funding_output_script, get_per_commitment_secret_from_seed,
secret_to_pubkey, PaymentFailure, LnFeatures,
LOCAL, REMOTE, HTLCOwner,
ln_compare_features, privkey_to_pubkey, MIN_FINAL_CLTV_EXPIRY_ACCEPTED,
LightningPeerConnectionClosed, HandshakeFailed,
RemoteMisbehaving, ShortChannelID,
IncompatibleLightningFeatures, derive_payment_secret_from_payment_preimage,
LN_MAX_FUNDING_SAT, calc_fees_for_commitment_tx,
UpfrontShutdownScriptViolation)
from .lnutil import FeeUpdate, channel_id_from_funding_tx
from .lntransport import LNTransport, LNTransportBase
from .lnmsg import encode_msg, decode_msg
from .interface import GracefulDisconnect
from .lnrouter import fee_for_edge_msat
from .lnutil import ln_dummy_address
from .json_db import StoredDict
if TYPE_CHECKING:
from .lnworker import LNGossip, LNWallet, LNBackups
from .lnrouter import LNPaymentRoute
from .transaction import PartialTransaction
LN_P2P_NETWORK_TIMEOUT = 20
class Peer(Logger):
LOGGING_SHORTCUT = 'P'
def __init__(
self,
lnworker: Union['LNGossip', 'LNWallet', 'LNBackups'],
pubkey: bytes,
transport: LNTransportBase
):
self._sent_init = False # type: bool
self._received_init = False # type: bool
self.initialized = asyncio.Future()
self.querying = asyncio.Event()
self.transport = transport
self.pubkey = pubkey # remote pubkey
self.lnworker = lnworker
self.privkey = self.transport.privkey # local privkey
self.features = self.lnworker.features # type: LnFeatures
self.their_features = LnFeatures(0) # type: LnFeatures
self.node_ids = [self.pubkey, privkey_to_pubkey(self.privkey)]
assert self.node_ids[0] != self.node_ids[1]
self.network = lnworker.network
self.ping_time = 0
self.reply_channel_range = asyncio.Queue()
# gossip uses a single queue to preserve message order
self.gossip_queue = asyncio.Queue()
self.ordered_messages = ['accept_channel', 'funding_signed', 'funding_created', 'accept_channel', 'channel_reestablish', 'closing_signed']
self.ordered_message_queues = defaultdict(asyncio.Queue) # for messsage that are ordered
self.temp_id_to_id = {} # to forward error messages
self.funding_created_sent = set() # for channels in PREOPENING
self.funding_signed_sent = set() # for channels in PREOPENING
self.shutdown_received = {} # chan_id -> asyncio.Future()
self.announcement_signatures = defaultdict(asyncio.Queue)
self.orphan_channel_updates = OrderedDict()
Logger.__init__(self)
self.taskgroup = SilentTaskGroup()
def send_message(self, message_name: str, **kwargs):
assert type(message_name) is str
self.logger.debug(f"Sending {message_name.upper()}")
if message_name.upper() != "INIT" and not self.is_initialized():
raise Exception("tried to send message before we are initialized")
raw_msg = encode_msg(message_name, **kwargs)
self._store_raw_msg_if_local_update(raw_msg, message_name=message_name, channel_id=kwargs.get("channel_id"))
self.transport.send_bytes(raw_msg)
def _store_raw_msg_if_local_update(self, raw_msg: bytes, *, message_name: str, channel_id: Optional[bytes]):
is_commitment_signed = message_name == "commitment_signed"
if not (message_name.startswith("update_") or is_commitment_signed):
return
assert channel_id
chan = self.get_channel_by_id(channel_id)
if not chan:
raise Exception(f"channel {channel_id.hex()} not found for peer {self.pubkey.hex()}")
chan.hm.store_local_update_raw_msg(raw_msg, is_commitment_signed=is_commitment_signed)
if is_commitment_signed:
# saving now, to ensure replaying updates works (in case of channel reestablishment)
self.lnworker.save_channel(chan)
def maybe_set_initialized(self):
if self.initialized.done():
return
if self._sent_init and self._received_init:
self.initialized.set_result(True)
def is_initialized(self) -> bool:
return (self.initialized.done()
and not self.initialized.cancelled()
and self.initialized.exception() is None
and self.initialized.result() is True)
async def initialize(self):
if isinstance(self.transport, LNTransport):
await self.transport.handshake()
features = self.features.for_init_message()
b = int.bit_length(features)
flen = b // 8 + int(bool(b % 8))
self.send_message(
"init", gflen=0, flen=flen,
features=features,
init_tlvs={
'networks':
{'chains': constants.net.rev_genesis_bytes()}
})
self._sent_init = True
self.maybe_set_initialized()
@property
def channels(self) -> Dict[bytes, Channel]:
return self.lnworker.channels_for_peer(self.pubkey)
def get_channel_by_id(self, channel_id: bytes) -> Optional[Channel]:
# note: this is faster than self.channels.get(channel_id)
chan = self.lnworker.get_channel_by_id(channel_id)
if not chan:
return None
if chan.node_id != self.pubkey:
return None
return chan
def diagnostic_name(self):
return self.lnworker.__class__.__name__ + ', ' + self.transport.name()
def ping_if_required(self):
if time.time() - self.ping_time > 120:
self.send_message('ping', num_pong_bytes=4, byteslen=4)
self.ping_time = time.time()
def process_message(self, message):
message_type, payload = decode_msg(message)
# only process INIT if we are a backup
from .lnworker import LNBackups
if isinstance(self.lnworker, LNBackups) and message_type != 'init':
return
if message_type in self.ordered_messages:
chan_id = payload.get('channel_id') or payload["temporary_channel_id"]
self.ordered_message_queues[chan_id].put_nowait((message_type, payload))
else:
if message_type != 'error' and 'channel_id' in payload:
chan = self.get_channel_by_id(payload['channel_id'])
if chan is None:
raise Exception('Got unknown '+ message_type)
args = (chan, payload)
else:
args = (payload,)
try:
f = getattr(self, 'on_' + message_type)
except AttributeError:
#self.logger.info("Received '%s'" % message_type.upper(), payload)
return
# raw message is needed to check signature
if message_type in ['node_announcement', 'channel_announcement', 'channel_update']:
payload['raw'] = message
execution_result = f(*args)
if asyncio.iscoroutinefunction(f):
asyncio.ensure_future(self.taskgroup.spawn(execution_result))
def on_error(self, payload):
self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {payload['data'].decode('ascii')}")
chan_id = payload.get("channel_id")
if chan_id in self.temp_id_to_id:
chan_id = self.temp_id_to_id[chan_id]
self.ordered_message_queues[chan_id].put_nowait((None, {'error':payload['data']}))
def on_ping(self, payload):
l = payload['num_pong_bytes']
self.send_message('pong', byteslen=l)
def on_pong(self, payload):
pass
async def wait_for_message(self, expected_name, channel_id):
q = self.ordered_message_queues[channel_id]
name, payload = await asyncio.wait_for(q.get(), LN_P2P_NETWORK_TIMEOUT)
if payload.get('error'):
raise Exception('Remote peer reported error [DO NOT TRUST THIS MESSAGE]: ' + repr(payload.get('error')))
if name != expected_name:
raise Exception(f"Received unexpected '{name}'")
return payload
def on_init(self, payload):
if self._received_init:
self.logger.info("ALREADY INITIALIZED BUT RECEIVED INIT")
return
self.their_features = LnFeatures(int.from_bytes(payload['features'], byteorder="big"))
their_globalfeatures = int.from_bytes(payload['globalfeatures'], byteorder="big")
self.their_features |= their_globalfeatures
# check transitive dependencies for received features
if not self.their_features.validate_transitive_dependencies():
raise GracefulDisconnect("remote did not set all dependencies for the features they sent")
# check if features are compatible, and set self.features to what we negotiated
try:
self.features = ln_compare_features(self.features, self.their_features)
except IncompatibleLightningFeatures as e:
self.initialized.set_exception(e)
raise GracefulDisconnect(f"{str(e)}")
# check that they are on the same chain as us, if provided
their_networks = payload["init_tlvs"].get("networks")
if their_networks:
their_chains = list(chunks(their_networks["chains"], 32))
if constants.net.rev_genesis_bytes() not in their_chains:
raise GracefulDisconnect(f"no common chain found with remote. (they sent: {their_chains})")
# all checks passed
self.lnworker.on_peer_successfully_established(self)
self._received_init = True
self.maybe_set_initialized()
def on_node_announcement(self, payload):
if self.lnworker.channel_db:
self.gossip_queue.put_nowait(('node_announcement', payload))
def on_channel_announcement(self, payload):
if self.lnworker.channel_db:
self.gossip_queue.put_nowait(('channel_announcement', payload))
def on_channel_update(self, payload):
self.maybe_save_remote_update(payload)
if self.lnworker.channel_db:
self.gossip_queue.put_nowait(('channel_update', payload))
def maybe_save_remote_update(self, payload):
if not self.channels:
return
for chan in self.channels.values():
if chan.short_channel_id == payload['short_channel_id']:
chan.set_remote_update(payload['raw'])
self.logger.info("saved remote_update")
break
else:
# Save (some bounded number of) orphan channel updates for later
# as it might be for our own direct channel with this peer
# (and we might not yet know the short channel id for that)
# Background: this code is here to deal with a bug in LND,
# see https://github.com/lightningnetwork/lnd/issues/3651
# and https://github.com/lightningnetwork/lightning-rfc/pull/657
# This code assumes gossip_queries is set. BOLT7: "if the
# gossip_queries feature is negotiated, [a node] MUST NOT
# send gossip it did not generate itself"
short_channel_id = ShortChannelID(payload['short_channel_id'])
self.logger.info(f'received orphan channel update {short_channel_id}')
self.orphan_channel_updates[short_channel_id] = payload
while len(self.orphan_channel_updates) > 25:
self.orphan_channel_updates.popitem(last=False)
def on_announcement_signatures(self, chan: Channel, payload):
if chan.config[LOCAL].was_announced:
h, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan)
else:
self.announcement_signatures[chan.channel_id].put_nowait(payload)
def handle_disconnect(func):
async def wrapper_func(self, *args, **kwargs):
try:
return await func(self, *args, **kwargs)
except GracefulDisconnect as e:
self.logger.log(e.log_level, f"Disconnecting: {repr(e)}")
except (LightningPeerConnectionClosed, IncompatibleLightningFeatures,
aiorpcx.socks.SOCKSError) as e:
self.logger.info(f"Disconnecting: {repr(e)}")
finally:
self.close_and_cleanup()
return wrapper_func
@ignore_exceptions # do not kill outer taskgroup
@log_exceptions
@handle_disconnect
async def main_loop(self):
async with self.taskgroup as group:
await group.spawn(self._message_loop())
await group.spawn(self.htlc_switch())
await group.spawn(self.query_gossip())
await group.spawn(self.process_gossip())
async def process_gossip(self):
while True:
await asyncio.sleep(5)
if not self.network.lngossip:
continue
chan_anns = []
chan_upds = []
node_anns = []
while True:
name, payload = await self.gossip_queue.get()
if name == 'channel_announcement':
chan_anns.append(payload)
elif name == 'channel_update':
chan_upds.append(payload)
elif name == 'node_announcement':
node_anns.append(payload)
else:
raise Exception('unknown message')
if self.gossip_queue.empty():
break
# verify in peer's TaskGroup so that we fail the connection
self.verify_channel_announcements(chan_anns)
self.verify_node_announcements(node_anns)
if self.network.lngossip:
await self.network.lngossip.process_gossip(chan_anns, node_anns, chan_upds)
def verify_channel_announcements(self, chan_anns):
for payload in chan_anns:
h = sha256d(payload['raw'][2+256:])
pubkeys = [payload['node_id_1'], payload['node_id_2'], payload['bitcoin_key_1'], payload['bitcoin_key_2']]
sigs = [payload['node_signature_1'], payload['node_signature_2'], payload['bitcoin_signature_1'], payload['bitcoin_signature_2']]
for pubkey, sig in zip(pubkeys, sigs):
if not ecc.verify_signature(pubkey, sig, h):
raise Exception('signature failed')
def verify_node_announcements(self, node_anns):
for payload in node_anns:
pubkey = payload['node_id']
signature = payload['signature']
h = sha256d(payload['raw'][66:])
if not ecc.verify_signature(pubkey, signature, h):
raise Exception('signature failed')
async def query_gossip(self):
try:
await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
except Exception as e:
raise GracefulDisconnect(f"Failed to initialize: {e!r}") from e
if self.lnworker == self.lnworker.network.lngossip:
try:
ids, complete = await asyncio.wait_for(self.get_channel_range(), LN_P2P_NETWORK_TIMEOUT)
except asyncio.TimeoutError as e:
raise GracefulDisconnect("query_channel_range timed out") from e
self.logger.info('Received {} channel ids. (complete: {})'.format(len(ids), complete))
await self.lnworker.add_new_ids(ids)
while True:
todo = self.lnworker.get_ids_to_query()
if not todo:
await asyncio.sleep(1)
continue
await self.get_short_channel_ids(todo)
async def get_channel_range(self):
first_block = constants.net.BLOCK_HEIGHT_FIRST_LIGHTNING_CHANNELS
num_blocks = self.lnworker.network.get_local_height() - first_block
self.query_channel_range(first_block, num_blocks)
intervals = []
ids = set()
# note: implementations behave differently...
# "sane implementation that follows BOLT-07" example:
# query_channel_range. <<< first_block 497000, num_blocks 79038
# on_reply_channel_range. >>> first_block 497000, num_blocks 39516, num_ids 4648, complete True
# on_reply_channel_range. >>> first_block 536516, num_blocks 19758, num_ids 5734, complete True
# on_reply_channel_range. >>> first_block 556274, num_blocks 9879, num_ids 13712, complete True
# on_reply_channel_range. >>> first_block 566153, num_blocks 9885, num_ids 18114, complete True
# lnd example:
# query_channel_range. <<< first_block 497000, num_blocks 79038
# on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
# on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
# on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
# on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 8000, complete False
# on_reply_channel_range. >>> first_block 497000, num_blocks 79038, num_ids 5344, complete True
while True:
index, num, complete, _ids = await self.reply_channel_range.get()
ids.update(_ids)
intervals.append((index, index+num))
intervals.sort()
while len(intervals) > 1:
a,b = intervals[0]
c,d = intervals[1]
if not (a <= c and a <= b and c <= d):
raise Exception(f"insane reply_channel_range intervals {(a,b,c,d)}")
if b >= c:
intervals = [(a,d)] + intervals[2:]
else:
break
if len(intervals) == 1 and complete:
a, b = intervals[0]
if a <= first_block and b >= first_block + num_blocks:
break
return ids, complete
def request_gossip(self, timestamp=0):
if timestamp == 0:
self.logger.info('requesting whole channel graph')
else:
self.logger.info(f'requesting channel graph since {datetime.fromtimestamp(timestamp).ctime()}')
self.send_message(
'gossip_timestamp_filter',
chain_hash=constants.net.rev_genesis_bytes(),
first_timestamp=timestamp,
timestamp_range=b'\xff'*4)
def query_channel_range(self, first_block, num_blocks):
self.logger.info(f'query channel range {first_block} {num_blocks}')
self.send_message(
'query_channel_range',
chain_hash=constants.net.rev_genesis_bytes(),
first_blocknum=first_block,
number_of_blocks=num_blocks)
def decode_short_ids(self, encoded):
if encoded[0] == 0:
decoded = encoded[1:]
elif encoded[0] == 1:
decoded = zlib.decompress(encoded[1:])
else:
raise Exception(f'decode_short_ids: unexpected first byte: {encoded[0]}')
ids = [decoded[i:i+8] for i in range(0, len(decoded), 8)]
return ids
def on_reply_channel_range(self, payload):
first = payload['first_blocknum']
num = payload['number_of_blocks']
complete = bool(int.from_bytes(payload['complete'], 'big'))
encoded = payload['encoded_short_ids']
ids = self.decode_short_ids(encoded)
#self.logger.info(f"on_reply_channel_range. >>> first_block {first}, num_blocks {num}, num_ids {len(ids)}, complete {repr(payload['complete'])}")
self.reply_channel_range.put_nowait((first, num, complete, ids))
async def get_short_channel_ids(self, ids):
self.logger.info(f'Querying {len(ids)} short_channel_ids')
assert not self.querying.is_set()
self.query_short_channel_ids(ids)
await self.querying.wait()
self.querying.clear()
def query_short_channel_ids(self, ids, compressed=True):
ids = sorted(ids)
s = b''.join(ids)
encoded = zlib.compress(s) if compressed else s
prefix = b'\x01' if compressed else b'\x00'
self.send_message(
'query_short_channel_ids',
chain_hash=constants.net.rev_genesis_bytes(),
len=1+len(encoded),
encoded_short_ids=prefix+encoded)
async def _message_loop(self):
try:
await asyncio.wait_for(self.initialize(), LN_P2P_NETWORK_TIMEOUT)
except (OSError, asyncio.TimeoutError, HandshakeFailed) as e:
raise GracefulDisconnect(f'initialize failed: {repr(e)}') from e
async for msg in self.transport.read_messages():
self.process_message(msg)
await asyncio.sleep(.01)
def on_reply_short_channel_ids_end(self, payload):
self.querying.set()
def close_and_cleanup(self):
try:
if self.transport:
self.transport.close()
except:
pass
self.lnworker.peer_closed(self)
def is_static_remotekey(self):
return self.features.supports(LnFeatures.OPTION_STATIC_REMOTEKEY_OPT)
def is_upfront_shutdown_script(self):
return self.features.supports(LnFeatures.OPTION_UPFRONT_SHUTDOWN_SCRIPT_OPT)
def upfront_shutdown_script_from_payload(self, payload, msg_identifier: str) -> Optional[bytes]:
if msg_identifier not in ['accept', 'open']:
raise ValueError("msg_identifier must be either 'accept' or 'open'")
uss_tlv = payload[msg_identifier + '_channel_tlvs'].get(
'upfront_shutdown_script')
if uss_tlv and self.is_upfront_shutdown_script():
upfront_shutdown_script = uss_tlv['shutdown_scriptpubkey']
else:
upfront_shutdown_script = b''
self.logger.info(f"upfront shutdown script received: {upfront_shutdown_script}")
return upfront_shutdown_script
def make_local_config(self, funding_sat: int, push_msat: int, initiator: HTLCOwner) -> LocalConfig:
channel_seed = os.urandom(32)
initial_msat = funding_sat * 1000 - push_msat if initiator == LOCAL else push_msat
static_remotekey = None
# sending empty bytes as the upfront_shutdown_script will give us the
# flexibility to decide an address at closing time
upfront_shutdown_script = b''
if self.is_static_remotekey():
wallet = self.lnworker.wallet
assert wallet.txin_type == 'p2wpkh'
addr = wallet.get_new_sweep_address_for_channel()
static_remotekey = bfh(wallet.get_public_key(addr))
else:
static_remotekey = None
dust_limit_sat = bitcoin.DUST_LIMIT_DEFAULT_SAT_LEGACY
reserve_sat = max(funding_sat // 100, dust_limit_sat)
local_config = LocalConfig.from_seed(
channel_seed=channel_seed,
static_remotekey=static_remotekey,
upfront_shutdown_script=upfront_shutdown_script,
to_self_delay=self.network.config.get('lightning_to_self_delay', 7 * 144),
dust_limit_sat=dust_limit_sat,
max_htlc_value_in_flight_msat=funding_sat * 1000,
max_accepted_htlcs=5,
initial_msat=initial_msat,
reserve_sat=reserve_sat,
funding_locked_received=False,
was_announced=False,
current_commitment_signature=None,
current_htlc_signatures=b'',
htlc_minimum_msat=1,
)
local_config.validate_params(funding_sat=funding_sat)
return local_config
def temporarily_reserve_funding_tx_change_address(func):
# During the channel open flow, if we initiated, we might have used a change address
# of ours in the funding tx. The funding tx is not part of the wallet history
# at that point yet, but we should already consider this change address as 'used'.
async def wrapper(self: 'Peer', *args, **kwargs):
funding_tx = kwargs['funding_tx'] # type: PartialTransaction
wallet = self.lnworker.wallet
change_addresses = [txout.address for txout in funding_tx.outputs()
if wallet.is_change(txout.address)]
for addr in change_addresses:
wallet.set_reserved_state_of_address(addr, reserved=True)
try:
return await func(self, *args, **kwargs)
finally:
for addr in change_addresses:
self.lnworker.wallet.set_reserved_state_of_address(addr, reserved=False)
return wrapper
@log_exceptions
@temporarily_reserve_funding_tx_change_address
async def channel_establishment_flow(
self, *,
funding_tx: 'PartialTransaction',
funding_sat: int,
push_msat: int,
temp_channel_id: bytes
) -> Tuple[Channel, 'PartialTransaction']:
"""Implements the channel opening flow.
-> open_channel message
<- accept_channel message
-> funding_created message
<- funding_signed message
Channel configurations are initialized in this method.
"""
# will raise if init fails
await asyncio.wait_for(self.initialized, LN_P2P_NETWORK_TIMEOUT)
# trampoline is not yet in features
if not self.lnworker.channel_db and not self.lnworker.is_trampoline_peer(self.pubkey):
raise Exception('Not a trampoline node: ' + str(self.their_features))
feerate = self.lnworker.current_feerate_per_kw()
local_config = self.make_local_config(funding_sat, push_msat, LOCAL)
if funding_sat > LN_MAX_FUNDING_SAT:
raise Exception(
f"MUST set funding_satoshis to less than 2^24 satoshi. "
f"{funding_sat} sat > {LN_MAX_FUNDING_SAT}")
if push_msat > 1000 * funding_sat:
raise Exception(
f"MUST set push_msat to equal or less than 1000 * funding_satoshis: "
f"{push_msat} msat > {1000 * funding_sat} msat")
if funding_sat < lnutil.MIN_FUNDING_SAT:
raise Exception(f"funding_sat too low: {funding_sat} < {lnutil.MIN_FUNDING_SAT}")
# for the first commitment transaction
per_commitment_secret_first = get_per_commitment_secret_from_seed(
local_config.per_commitment_secret_seed,
RevocationStore.START_INDEX
)
per_commitment_point_first = secret_to_pubkey(
int.from_bytes(per_commitment_secret_first, 'big'))
self.send_message(
"open_channel",
temporary_channel_id=temp_channel_id,
chain_hash=constants.net.rev_genesis_bytes(),
funding_satoshis=funding_sat,
push_msat=push_msat,
dust_limit_satoshis=local_config.dust_limit_sat,
feerate_per_kw=feerate,
max_accepted_htlcs=local_config.max_accepted_htlcs,
funding_pubkey=local_config.multisig_key.pubkey,
revocation_basepoint=local_config.revocation_basepoint.pubkey,
htlc_basepoint=local_config.htlc_basepoint.pubkey,
payment_basepoint=local_config.payment_basepoint.pubkey,
delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
first_per_commitment_point=per_commitment_point_first,
to_self_delay=local_config.to_self_delay,
max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
channel_flags=0x00, # not willing to announce channel
channel_reserve_satoshis=local_config.reserve_sat,
htlc_minimum_msat=local_config.htlc_minimum_msat,
open_channel_tlvs={
'upfront_shutdown_script':
{'shutdown_scriptpubkey': local_config.upfront_shutdown_script}
}
)
# <- accept_channel
payload = await self.wait_for_message('accept_channel', temp_channel_id)
remote_per_commitment_point = payload['first_per_commitment_point']
funding_txn_minimum_depth = payload['minimum_depth']
if funding_txn_minimum_depth <= 0:
raise Exception(f"minimum depth too low, {funding_txn_minimum_depth}")
if funding_txn_minimum_depth > 30:
raise Exception(f"minimum depth too high, {funding_txn_minimum_depth}")
upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
payload, 'accept')
remote_config = RemoteConfig(
payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
multisig_key=OnlyPubkeyKeypair(payload["funding_pubkey"]),
htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
to_self_delay=payload['to_self_delay'],
dust_limit_sat=payload['dust_limit_satoshis'],
max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
max_accepted_htlcs=payload["max_accepted_htlcs"],
initial_msat=push_msat,
reserve_sat=payload["channel_reserve_satoshis"],
htlc_minimum_msat=payload['htlc_minimum_msat'],
next_per_commitment_point=remote_per_commitment_point,
current_per_commitment_point=None,
upfront_shutdown_script=upfront_shutdown_script
)
remote_config.validate_params(funding_sat=funding_sat)
# if channel_reserve_satoshis is less than dust_limit_satoshis within the open_channel message:
# MUST reject the channel.
if remote_config.reserve_sat < local_config.dust_limit_sat:
raise Exception("violated constraint: remote_config.reserve_sat < local_config.dust_limit_sat")
# if channel_reserve_satoshis from the open_channel message is less than dust_limit_satoshis:
# MUST reject the channel.
if local_config.reserve_sat < remote_config.dust_limit_sat:
raise Exception("violated constraint: local_config.reserve_sat < remote_config.dust_limit_sat")
# -> funding created
# replace dummy output in funding tx
redeem_script = funding_output_script(local_config, remote_config)
funding_address = bitcoin.redeem_script_to_address('p2wsh', redeem_script)
funding_output = PartialTxOutput.from_address_and_value(funding_address, funding_sat)
dummy_output = PartialTxOutput.from_address_and_value(ln_dummy_address(), funding_sat)
funding_tx.outputs().remove(dummy_output)
funding_tx.add_outputs([funding_output])
funding_tx.set_rbf(False)
if not funding_tx.is_segwit():
raise Exception('Funding transaction is not segwit')
funding_txid = funding_tx.txid()
assert funding_txid
funding_index = funding_tx.outputs().index(funding_output)
# build remote commitment transaction
channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_index)
outpoint = Outpoint(funding_txid, funding_index)
constraints = ChannelConstraints(
capacity=funding_sat,
is_initiator=True,
funding_txn_minimum_depth=funding_txn_minimum_depth
)
chan_dict = self.create_channel_storage(
channel_id, outpoint, local_config, remote_config, constraints)
chan = Channel(
chan_dict,
sweep_address=self.lnworker.sweep_address,
lnworker=self.lnworker,
initial_feerate=feerate
)
chan.storage['funding_inputs'] = [txin.prevout.to_json() for txin in funding_tx.inputs()]
if isinstance(self.transport, LNTransport):
chan.add_or_update_peer_addr(self.transport.peer_addr)
sig_64, _ = chan.sign_next_commitment()
self.temp_id_to_id[temp_channel_id] = channel_id
self.send_message("funding_created",
temporary_channel_id=temp_channel_id,
funding_txid=funding_txid_bytes,
funding_output_index=funding_index,
signature=sig_64)
self.funding_created_sent.add(channel_id)
# <- funding signed
payload = await self.wait_for_message('funding_signed', channel_id)
self.logger.info('received funding_signed')
remote_sig = payload['signature']
chan.receive_new_commitment(remote_sig, [])
chan.open_with_first_pcp(remote_per_commitment_point, remote_sig)
chan.set_state(ChannelState.OPENING)
self.lnworker.add_new_channel(chan)
return chan, funding_tx
def create_channel_storage(self, channel_id, outpoint, local_config, remote_config, constraints):
chan_dict = {
"node_id": self.pubkey.hex(),
"channel_id": channel_id.hex(),
"short_channel_id": None,
"funding_outpoint": outpoint,
"remote_config": remote_config,
"local_config": local_config,
"constraints": constraints,
"remote_update": None,
"state": ChannelState.PREOPENING.name,
'onion_keys': {},
'data_loss_protect_remote_pcp': {},
"log": {},
"revocation_store": {},
"static_remotekey_enabled": self.is_static_remotekey(), # stored because it cannot be "downgraded", per BOLT2
}
return StoredDict(chan_dict, self.lnworker.db if self.lnworker else None, [])
async def on_open_channel(self, payload):
"""Implements the channel acceptance flow.
<- open_channel message
-> accept_channel message
<- funding_created message
-> funding_signed message
Channel configurations are initialized in this method.
"""
# <- open_channel
if payload['chain_hash'] != constants.net.rev_genesis_bytes():
raise Exception('wrong chain_hash')
funding_sat = payload['funding_satoshis']
push_msat = payload['push_msat']
feerate = payload['feerate_per_kw'] # note: we are not validating this
temp_chan_id = payload['temporary_channel_id']
local_config = self.make_local_config(funding_sat, push_msat, REMOTE)
if funding_sat > LN_MAX_FUNDING_SAT:
raise Exception(
f"MUST set funding_satoshis to less than 2^24 satoshi. "
f"{funding_sat} sat > {LN_MAX_FUNDING_SAT}")
if push_msat > 1000 * funding_sat:
raise Exception(
f"MUST set push_msat to equal or less than 1000 * funding_satoshis: "
f"{push_msat} msat > {1000 * funding_sat} msat")
if funding_sat < lnutil.MIN_FUNDING_SAT:
raise Exception(f"funding_sat too low: {funding_sat} < {lnutil.MIN_FUNDING_SAT}")
upfront_shutdown_script = self.upfront_shutdown_script_from_payload(
payload, 'open')
remote_config = RemoteConfig(
payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
multisig_key=OnlyPubkeyKeypair(payload['funding_pubkey']),
htlc_basepoint=OnlyPubkeyKeypair(payload['htlc_basepoint']),
delayed_basepoint=OnlyPubkeyKeypair(payload['delayed_payment_basepoint']),
revocation_basepoint=OnlyPubkeyKeypair(payload['revocation_basepoint']),
to_self_delay=payload['to_self_delay'],
dust_limit_sat=payload['dust_limit_satoshis'],
max_htlc_value_in_flight_msat=payload['max_htlc_value_in_flight_msat'],
max_accepted_htlcs=payload['max_accepted_htlcs'],
initial_msat=funding_sat * 1000 - push_msat,
reserve_sat=payload['channel_reserve_satoshis'],
htlc_minimum_msat=payload['htlc_minimum_msat'],
next_per_commitment_point=payload['first_per_commitment_point'],
current_per_commitment_point=None,
upfront_shutdown_script=upfront_shutdown_script,
)
remote_config.validate_params(funding_sat=funding_sat)
# The receiving node MUST fail the channel if:
# the funder's amount for the initial commitment transaction is not
# sufficient for full fee payment.
if remote_config.initial_msat < calc_fees_for_commitment_tx(
num_htlcs=0,
feerate=feerate,
is_local_initiator=False)[REMOTE]:
raise Exception(
"the funder's amount for the initial commitment transaction "
"is not sufficient for full fee payment")
# The receiving node MUST fail the channel if:
# both to_local and to_remote amounts for the initial commitment transaction are
# less than or equal to channel_reserve_satoshis (see BOLT 3).
if (local_config.initial_msat <= 1000 * payload['channel_reserve_satoshis']
and remote_config.initial_msat <= 1000 * payload['channel_reserve_satoshis']):
raise Exception(
"both to_local and to_remote amounts for the initial commitment "
"transaction are less than or equal to channel_reserve_satoshis")
# note: we ignore payload['channel_flags'], which e.g. contains 'announce_channel'.
# Notably if the remote sets 'announce_channel' to True, we will ignore that too,
# but we will not play along with actually announcing the channel (so we keep it private).
# -> accept channel
# for the first commitment transaction
per_commitment_secret_first = get_per_commitment_secret_from_seed(
local_config.per_commitment_secret_seed,
RevocationStore.START_INDEX
)
per_commitment_point_first = secret_to_pubkey(
int.from_bytes(per_commitment_secret_first, 'big'))
min_depth = 3
self.send_message(
'accept_channel',
temporary_channel_id=temp_chan_id,
dust_limit_satoshis=local_config.dust_limit_sat,
max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
channel_reserve_satoshis=local_config.reserve_sat,
htlc_minimum_msat=local_config.htlc_minimum_msat,
minimum_depth=min_depth,
to_self_delay=local_config.to_self_delay,
max_accepted_htlcs=local_config.max_accepted_htlcs,
funding_pubkey=local_config.multisig_key.pubkey,
revocation_basepoint=local_config.revocation_basepoint.pubkey,
payment_basepoint=local_config.payment_basepoint.pubkey,
delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
htlc_basepoint=local_config.htlc_basepoint.pubkey,
first_per_commitment_point=per_commitment_point_first,
accept_channel_tlvs={
'upfront_shutdown_script':
{'shutdown_scriptpubkey': local_config.upfront_shutdown_script}
}
)
# <- funding created
funding_created = await self.wait_for_message('funding_created', temp_chan_id)
# -> funding signed
funding_idx = funding_created['funding_output_index']
funding_txid = bh2u(funding_created['funding_txid'][::-1])
channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_idx)
constraints = ChannelConstraints(
capacity=funding_sat,
is_initiator=False,
funding_txn_minimum_depth=min_depth
)
outpoint = Outpoint(funding_txid, funding_idx)
chan_dict = self.create_channel_storage(
channel_id, outpoint, local_config, remote_config, constraints)
chan = Channel(
chan_dict,
sweep_address=self.lnworker.sweep_address,
lnworker=self.lnworker,
initial_feerate=feerate
)
chan.storage['init_timestamp'] = int(time.time())
if isinstance(self.transport, LNTransport):
chan.add_or_update_peer_addr(self.transport.peer_addr)
remote_sig = funding_created['signature']
chan.receive_new_commitment(remote_sig, [])
sig_64, _ = chan.sign_next_commitment()
self.send_message('funding_signed',
channel_id=channel_id,
signature=sig_64,
)
self.funding_signed_sent.add(chan.channel_id)
chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig)
chan.set_state(ChannelState.OPENING)
self.lnworker.add_new_channel(chan)
async def trigger_force_close(self, channel_id: bytes):
await self.initialized
latest_point = secret_to_pubkey(42) # we need a valid point (BOLT2)
self.send_message(
"channel_reestablish",
channel_id=channel_id,
next_commitment_number=0,
next_revocation_number=0,
your_last_per_commitment_secret=0,
my_current_per_commitment_point=latest_point)
async def reestablish_channel(self, chan: Channel):
await self.initialized
chan_id = chan.channel_id
assert ChannelState.PREOPENING < chan.get_state() < ChannelState.FORCE_CLOSING
if chan.peer_state != PeerState.DISCONNECTED:
self.logger.info(f'reestablish_channel was called but channel {chan.get_id_for_log()} '
f'already in peer_state {chan.peer_state!r}')
return
chan.peer_state = PeerState.REESTABLISHING
util.trigger_callback('channel', self.lnworker.wallet, chan)
# BOLT-02: "A node [...] upon disconnection [...] MUST reverse any uncommitted updates sent by the other side"
chan.hm.discard_unsigned_remote_updates()
# ctns
oldest_unrevoked_local_ctn = chan.get_oldest_unrevoked_ctn(LOCAL)
latest_local_ctn = chan.get_latest_ctn(LOCAL)
next_local_ctn = chan.get_next_ctn(LOCAL)
oldest_unrevoked_remote_ctn = chan.get_oldest_unrevoked_ctn(REMOTE)
latest_remote_ctn = chan.get_latest_ctn(REMOTE)
next_remote_ctn = chan.get_next_ctn(REMOTE)
assert self.features.supports(LnFeatures.OPTION_DATA_LOSS_PROTECT_OPT)
# send message
if chan.is_static_remotekey_enabled():
latest_secret, latest_point = chan.get_secret_and_point(LOCAL, 0)
else:
latest_secret, latest_point = chan.get_secret_and_point(LOCAL, latest_local_ctn)
if oldest_unrevoked_remote_ctn == 0:
last_rev_secret = 0
else:
last_rev_index = oldest_unrevoked_remote_ctn - 1
last_rev_secret = chan.revocation_store.retrieve_secret(RevocationStore.START_INDEX - last_rev_index)
self.send_message(
"channel_reestablish",
channel_id=chan_id,
next_commitment_number=next_local_ctn,
next_revocation_number=oldest_unrevoked_remote_ctn,
your_last_per_commitment_secret=last_rev_secret,
my_current_per_commitment_point=latest_point)
self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): sent channel_reestablish with '
f'(next_local_ctn={next_local_ctn}, '
f'oldest_unrevoked_remote_ctn={oldest_unrevoked_remote_ctn})')
while True:
try:
msg = await self.wait_for_message('channel_reestablish', chan_id)
break
except asyncio.TimeoutError:
self.logger.info('waiting to receive channel_reestablish...')
continue
their_next_local_ctn = msg["next_commitment_number"]
their_oldest_unrevoked_remote_ctn = msg["next_revocation_number"]
their_local_pcp = msg.get("my_current_per_commitment_point")
their_claim_of_our_last_per_commitment_secret = msg.get("your_last_per_commitment_secret")
self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): received channel_reestablish with '
f'(their_next_local_ctn={their_next_local_ctn}, '
f'their_oldest_unrevoked_remote_ctn={their_oldest_unrevoked_remote_ctn})')
# sanity checks of received values
if their_next_local_ctn < 0:
raise RemoteMisbehaving(f"channel reestablish: their_next_local_ctn < 0")
if their_oldest_unrevoked_remote_ctn < 0:
raise RemoteMisbehaving(f"channel reestablish: their_oldest_unrevoked_remote_ctn < 0")
# Replay un-acked local updates (including commitment_signed) byte-for-byte.
# If we have sent them a commitment signature that they "lost" (due to disconnect),
# we need to make sure we replay the same local updates, as otherwise they could
# end up with two (or more) signed valid commitment transactions at the same ctn.
# Multiple valid ctxs at the same ctn is a major headache for pre-signing spending txns,
# e.g. for watchtowers, hence we must ensure these ctxs coincide.
# We replay the local updates even if they were not yet committed.
unacked = chan.hm.get_unacked_local_updates()
n_replayed_msgs = 0
for ctn, messages in unacked.items():
if ctn < their_next_local_ctn:
# They claim to have received these messages and the corresponding
# commitment_signed, hence we must not replay them.
continue
for raw_upd_msg in messages:
self.transport.send_bytes(raw_upd_msg)
n_replayed_msgs += 1
self.logger.info(f'channel_reestablish ({chan.get_id_for_log()}): replayed {n_replayed_msgs} unacked messages')
we_are_ahead = False
they_are_ahead = False
# compare remote ctns
if next_remote_ctn != their_next_local_ctn:
if their_next_local_ctn == latest_remote_ctn and chan.hm.is_revack_pending(REMOTE):
# We replayed the local updates (see above), which should have contained a commitment_signed
# (due to is_revack_pending being true), and this should have remedied this situation.
pass
else:
self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): "
f"expected remote ctn {next_remote_ctn}, got {their_next_local_ctn}")
if their_next_local_ctn < next_remote_ctn:
we_are_ahead = True
else:
they_are_ahead = True
# compare local ctns
if oldest_unrevoked_local_ctn != their_oldest_unrevoked_remote_ctn:
if oldest_unrevoked_local_ctn - 1 == their_oldest_unrevoked_remote_ctn:
# A node:
# if next_revocation_number is equal to the commitment number of the last revoke_and_ack
# the receiving node sent, AND the receiving node hasn't already received a closing_signed:
# MUST re-send the revoke_and_ack.
last_secret, last_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn - 1)
next_secret, next_point = chan.get_secret_and_point(LOCAL, oldest_unrevoked_local_ctn + 1)
self.send_message(
"revoke_and_ack",
channel_id=chan.channel_id,
per_commitment_secret=last_secret,
next_per_commitment_point=next_point)
else:
self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): "
f"expected local ctn {oldest_unrevoked_local_ctn}, got {their_oldest_unrevoked_remote_ctn}")
if their_oldest_unrevoked_remote_ctn < oldest_unrevoked_local_ctn:
we_are_ahead = True
else:
they_are_ahead = True
# option_data_loss_protect
def are_datalossprotect_fields_valid() -> bool:
if their_local_pcp is None or their_claim_of_our_last_per_commitment_secret is None:
return False
if their_oldest_unrevoked_remote_ctn > 0:
our_pcs, __ = chan.get_secret_and_point(LOCAL, their_oldest_unrevoked_remote_ctn - 1)
else:
assert their_oldest_unrevoked_remote_ctn == 0
our_pcs = bytes(32)
if our_pcs != their_claim_of_our_last_per_commitment_secret:
self.logger.error(f"channel_reestablish ({chan.get_id_for_log()}): "
f"(DLP) local PCS mismatch: {bh2u(our_pcs)} != {bh2u(their_claim_of_our_last_per_commitment_secret)}")
return False
if chan.is_static_remotekey_enabled():
return True
try:
__, our_remote_pcp = chan.get_secret_and_point(REMOTE, their_next_local_ctn - 1)
except RemoteCtnTooFarInFuture:
pass
else:
if our_remote_pcp != their_local_pcp:
self.logger.error(f"channel_reestablish ({chan.get_id_for_log()}): "
f"(DLP) remote PCP mismatch: {bh2u(our_remote_pcp)} != {bh2u(their_local_pcp)}")
return False
return True
if not are_datalossprotect_fields_valid():
raise RemoteMisbehaving("channel_reestablish: data loss protect fields invalid")
if they_are_ahead:
self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): "
f"remote is ahead of us! They should force-close. Remote PCP: {bh2u(their_local_pcp)}")
# data_loss_protect_remote_pcp is used in lnsweep
chan.set_data_loss_protect_remote_pcp(their_next_local_ctn - 1, their_local_pcp)
self.lnworker.save_channel(chan)
chan.peer_state = PeerState.BAD
return
elif we_are_ahead:
self.logger.warning(f"channel_reestablish ({chan.get_id_for_log()}): we are ahead of remote! trying to force-close.")
await self.lnworker.try_force_closing(chan_id)
return
chan.peer_state = PeerState.GOOD
if chan.is_funded() and their_next_local_ctn == next_local_ctn == 1:
self.send_funding_locked(chan)
# checks done
if chan.is_funded() and chan.config[LOCAL].funding_locked_received:
self.mark_open(chan)
util.trigger_callback('channel', self.lnworker.wallet, chan)
# if we have sent a previous shutdown, it must be retransmitted (Bolt2)
if chan.get_state() == ChannelState.SHUTDOWN:
await self.send_shutdown(chan)
def send_funding_locked(self, chan: Channel):
channel_id = chan.channel_id
per_commitment_secret_index = RevocationStore.START_INDEX - 1
per_commitment_point_second = secret_to_pubkey(int.from_bytes(
get_per_commitment_secret_from_seed(chan.config[LOCAL].per_commitment_secret_seed, per_commitment_secret_index), 'big'))
# note: if funding_locked was not yet received, we might send it multiple times
self.send_message("funding_locked", channel_id=channel_id, next_per_commitment_point=per_commitment_point_second)
if chan.is_funded() and chan.config[LOCAL].funding_locked_received:
self.mark_open(chan)
def on_funding_locked(self, chan: Channel, payload):
self.logger.info(f"on_funding_locked. channel: {bh2u(chan.channel_id)}")
if not chan.config[LOCAL].funding_locked_received:
their_next_point = payload["next_per_commitment_point"]
chan.config[REMOTE].next_per_commitment_point = their_next_point
chan.config[LOCAL].funding_locked_received = True
self.lnworker.save_channel(chan)
if chan.is_funded():
self.mark_open(chan)
def on_network_update(self, chan: Channel, funding_tx_depth: int):
"""
Only called when the channel is OPEN.
Runs on the Network thread.
"""
if not chan.config[LOCAL].was_announced and funding_tx_depth >= 6:
# don't announce our channels
# FIXME should this be a field in chan.local_state maybe?
return
chan.config[LOCAL].was_announced = True
self.lnworker.save_channel(chan)
coro = self.handle_announcements(chan)
asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
@log_exceptions
async def handle_announcements(self, chan: Channel):
h, local_node_sig, local_bitcoin_sig = self.send_announcement_signatures(chan)
announcement_signatures_msg = await self.announcement_signatures[chan.channel_id].get()
remote_node_sig = announcement_signatures_msg["node_signature"]
remote_bitcoin_sig = announcement_signatures_msg["bitcoin_signature"]
if not ecc.verify_signature(chan.config[REMOTE].multisig_key.pubkey, remote_bitcoin_sig, h):
raise Exception("bitcoin_sig invalid in announcement_signatures")
if not ecc.verify_signature(self.pubkey, remote_node_sig, h):
raise Exception("node_sig invalid in announcement_signatures")
node_sigs = [remote_node_sig, local_node_sig]
bitcoin_sigs = [remote_bitcoin_sig, local_bitcoin_sig]
bitcoin_keys = [chan.config[REMOTE].multisig_key.pubkey, chan.config[LOCAL].multisig_key.pubkey]
if self.node_ids[0] > self.node_ids[1]:
node_sigs.reverse()
bitcoin_sigs.reverse()
node_ids = list(reversed(self.node_ids))
bitcoin_keys.reverse()
else:
node_ids = self.node_ids
self.send_message("channel_announcement",
node_signatures_1=node_sigs[0],
node_signatures_2=node_sigs[1],
bitcoin_signature_1=bitcoin_sigs[0],
bitcoin_signature_2=bitcoin_sigs[1],
len=0,
#features not set (defaults to zeros)
chain_hash=constants.net.rev_genesis_bytes(),
short_channel_id=chan.short_channel_id,
node_id_1=node_ids[0],
node_id_2=node_ids[1],
bitcoin_key_1=bitcoin_keys[0],
bitcoin_key_2=bitcoin_keys[1]
)
def mark_open(self, chan: Channel):
assert chan.is_funded()
# only allow state transition from "FUNDED" to "OPEN"
old_state = chan.get_state()
if old_state == ChannelState.OPEN:
return
if old_state != ChannelState.FUNDED:
self.logger.info(f"cannot mark open ({chan.get_id_for_log()}), current state: {repr(old_state)}")
return
assert chan.config[LOCAL].funding_locked_received
chan.set_state(ChannelState.OPEN)
util.trigger_callback('channel', self.lnworker.wallet, chan)
# peer may have sent us a channel update for the incoming direction previously
pending_channel_update = self.orphan_channel_updates.get(chan.short_channel_id)
if pending_channel_update:
chan.set_remote_update(pending_channel_update['raw'])
self.logger.info(f"CHANNEL OPENING COMPLETED ({chan.get_id_for_log()})")
forwarding_enabled = self.network.config.get('lightning_forward_payments', False)
if forwarding_enabled:
# send channel_update of outgoing edge to peer,
# so that channel can be used to to receive payments
self.logger.info(f"sending channel update for outgoing edge ({chan.get_id_for_log()})")
chan_upd = chan.get_outgoing_gossip_channel_update()
self.transport.send_bytes(chan_upd)
def send_announcement_signatures(self, chan: Channel):
chan_ann = chan.construct_channel_announcement_without_sigs()
preimage = chan_ann[256+2:]
msg_hash = sha256d(preimage)
bitcoin_signature = ecc.ECPrivkey(chan.config[LOCAL].multisig_key.privkey).sign(msg_hash, sig_string_from_r_and_s)
node_signature = ecc.ECPrivkey(self.privkey).sign(msg_hash, sig_string_from_r_and_s)
self.send_message("announcement_signatures",
channel_id=chan.channel_id,
short_channel_id=chan.short_channel_id,
node_signature=node_signature,
bitcoin_signature=bitcoin_signature
)
return msg_hash, node_signature, bitcoin_signature
def on_update_fail_htlc(self, chan: Channel, payload):
htlc_id = payload["id"]
reason = payload["reason"]
self.logger.info(f"on_update_fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
chan.receive_fail_htlc(htlc_id, error_bytes=reason) # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
self.maybe_send_commitment(chan)
def maybe_send_commitment(self, chan: Channel):
# REMOTE should revoke first before we can sign a new ctx
if chan.hm.is_revack_pending(REMOTE):
return
# if there are no changes, we will not (and must not) send a new commitment
if not chan.has_pending_changes(REMOTE):
return
self.logger.info(f'send_commitment. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(REMOTE)}.')
sig_64, htlc_sigs = chan.sign_next_commitment()
self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs))
def pay(self, *,
route: 'LNPaymentRoute',
chan: Channel,
amount_msat: int,
total_msat: int,
payment_hash: bytes,
min_final_cltv_expiry: int,
payment_secret: bytes = None,
trampoline_onion=None) -> UpdateAddHtlc:
assert amount_msat > 0, "amount_msat is not greater zero"
assert len(route) > 0
if not chan.can_send_update_add_htlc():
raise PaymentFailure("Channel cannot send update_add_htlc")
# add features learned during "init" for direct neighbour:
route[0].node_features |= self.features
local_height = self.network.get_local_height()
final_cltv = local_height + min_final_cltv_expiry
hops_data, amount_msat, cltv = calc_hops_data_for_payment(
route,
amount_msat,
final_cltv,
total_msat=total_msat,
payment_secret=payment_secret)
num_hops = len(hops_data)
self.logger.info(f"lnpeer.pay len(route)={len(route)}")
for i in range(len(route)):
self.logger.info(f" {i}: edge={route[i].short_channel_id} hop_data={hops_data[i]!r}")
assert final_cltv <= cltv, (final_cltv, cltv)
session_key = os.urandom(32) # session_key
# if we are forwarding a trampoline payment, add trampoline onion
if trampoline_onion:
self.logger.info(f'adding trampoline onion to final payload')
trampoline_payload = hops_data[num_hops-2].payload
trampoline_payload["trampoline_onion_packet"] = {
"version": trampoline_onion.version,
"public_key": trampoline_onion.public_key,
"hops_data": trampoline_onion.hops_data,
"hmac": trampoline_onion.hmac
}
# create onion packet
payment_path_pubkeys = [x.node_id for x in route]
onion = new_onion_packet(payment_path_pubkeys, session_key, hops_data, associated_data=payment_hash) # must use another sessionkey
self.logger.info(f"starting payment. len(route)={len(hops_data)}.")
# create htlc
if cltv > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE:
raise PaymentFailure(f"htlc expiry too far into future. (in {cltv-local_height} blocks)")
htlc = UpdateAddHtlc(amount_msat=amount_msat, payment_hash=payment_hash, cltv_expiry=cltv, timestamp=int(time.time()))
htlc = chan.add_htlc(htlc)
chan.set_onion_key(htlc.htlc_id, session_key) # should it be the outer onion secret?
self.logger.info(f"starting payment. htlc: {htlc}")
self.send_message(
"update_add_htlc",
channel_id=chan.channel_id,
id=htlc.htlc_id,
cltv_expiry=htlc.cltv_expiry,
amount_msat=htlc.amount_msat,
payment_hash=htlc.payment_hash,
onion_routing_packet=onion.to_bytes())
self.maybe_send_commitment(chan)
return htlc
def send_revoke_and_ack(self, chan: Channel):
self.logger.info(f'send_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(LOCAL)}')
rev = chan.revoke_current_commitment()
self.lnworker.save_channel(chan)
self.send_message("revoke_and_ack",
channel_id=chan.channel_id,
per_commitment_secret=rev.per_commitment_secret,
next_per_commitment_point=rev.next_per_commitment_point)
self.maybe_send_commitment(chan)
def on_commitment_signed(self, chan: Channel, payload):
if chan.peer_state == PeerState.BAD:
return
self.logger.info(f'on_commitment_signed. chan {chan.short_channel_id}. ctn: {chan.get_next_ctn(LOCAL)}.')
# make sure there were changes to the ctx, otherwise the remote peer is misbehaving
if not chan.has_pending_changes(LOCAL):
# TODO if feerate changed A->B->A; so there were updates but the value is identical,
# then it might be legal to send a commitment_signature
# see https://github.com/lightningnetwork/lightning-rfc/pull/618
raise RemoteMisbehaving('received commitment_signed without pending changes')
# REMOTE should wait until we have revoked
if chan.hm.is_revack_pending(LOCAL):
raise RemoteMisbehaving('received commitment_signed before we revoked previous ctx')
data = payload["htlc_signature"]
htlc_sigs = list(chunks(data, 64))
chan.receive_new_commitment(payload["signature"], htlc_sigs)
self.send_revoke_and_ack(chan)
def on_update_fulfill_htlc(self, chan: Channel, payload):
preimage = payload["payment_preimage"]
payment_hash = sha256(preimage)
htlc_id = payload["id"]
self.logger.info(f"on_update_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
chan.receive_htlc_settle(preimage, htlc_id) # TODO handle exc and maybe fail channel (e.g. bad htlc_id)
self.lnworker.save_preimage(payment_hash, preimage)
self.maybe_send_commitment(chan)
def on_update_fail_malformed_htlc(self, chan: Channel, payload):
htlc_id = payload["id"]
failure_code = payload["failure_code"]
self.logger.info(f"on_update_fail_malformed_htlc. chan {chan.get_id_for_log()}. "
f"htlc_id {htlc_id}. failure_code={failure_code}")
if failure_code & OnionFailureCodeMetaFlag.BADONION == 0:
asyncio.ensure_future(self.lnworker.try_force_closing(chan.channel_id))
raise RemoteMisbehaving(f"received update_fail_malformed_htlc with unexpected failure code: {failure_code}")
reason = OnionRoutingFailure(code=failure_code, data=payload["sha256_of_onion"])
chan.receive_fail_htlc(htlc_id, error_bytes=None, reason=reason)
self.maybe_send_commitment(chan)
def on_update_add_htlc(self, chan: Channel, payload):
payment_hash = payload["payment_hash"]
htlc_id = payload["id"]
self.logger.info(f"on_update_add_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
cltv_expiry = payload["cltv_expiry"]
amount_msat_htlc = payload["amount_msat"]
onion_packet = payload["onion_routing_packet"]
if chan.get_state() != ChannelState.OPEN:
raise RemoteMisbehaving(f"received update_add_htlc while chan.get_state() != OPEN. state was {chan.get_state()!r}")
if cltv_expiry > bitcoin.NLOCKTIME_BLOCKHEIGHT_MAX:
asyncio.ensure_future(self.lnworker.try_force_closing(chan.channel_id))
raise RemoteMisbehaving(f"received update_add_htlc with cltv_expiry > BLOCKHEIGHT_MAX. value was {cltv_expiry}")
# add htlc
htlc = UpdateAddHtlc(
amount_msat=amount_msat_htlc,
payment_hash=payment_hash,
cltv_expiry=cltv_expiry,
timestamp=int(time.time()),
htlc_id=htlc_id)
chan.receive_htlc(htlc, onion_packet)
util.trigger_callback('htlc_added', chan, htlc, RECEIVED)
def maybe_forward_htlc(
self,
*,
htlc: UpdateAddHtlc,
processed_onion: ProcessedOnionPacket,
) -> Tuple[bytes, int]:
# Forward HTLC
# FIXME: there are critical safety checks MISSING here
forwarding_enabled = self.network.config.get('lightning_forward_payments', False)
if not forwarding_enabled:
self.logger.info(f"forwarding is disabled. failing htlc.")
raise OnionRoutingFailure(code=OnionFailureCode.PERMANENT_CHANNEL_FAILURE, data=b'')
chain = self.network.blockchain()
if chain.is_tip_stale():
raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
try:
next_chan_scid = processed_onion.hop_data.payload["short_channel_id"]["short_channel_id"]
except:
raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
next_chan = self.lnworker.get_channel_by_short_id(next_chan_scid)
local_height = chain.height()
if next_chan is None:
self.logger.info(f"cannot forward htlc. cannot find next_chan {next_chan_scid}")
raise OnionRoutingFailure(code=OnionFailureCode.UNKNOWN_NEXT_PEER, data=b'')
outgoing_chan_upd = next_chan.get_outgoing_gossip_channel_update()[2:]
outgoing_chan_upd_len = len(outgoing_chan_upd).to_bytes(2, byteorder="big")
if not next_chan.can_send_update_add_htlc():
self.logger.info(f"cannot forward htlc. next_chan {next_chan_scid} cannot send ctx updates. "
f"chan state {next_chan.get_state()!r}, peer state: {next_chan.peer_state!r}")
data = outgoing_chan_upd_len + outgoing_chan_upd
raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=data)
try:
next_cltv_expiry = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"]
except:
raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
if htlc.cltv_expiry - next_cltv_expiry < next_chan.forwarding_cltv_expiry_delta:
data = htlc.cltv_expiry.to_bytes(4, byteorder="big") + outgoing_chan_upd_len + outgoing_chan_upd
raise OnionRoutingFailure(code=OnionFailureCode.INCORRECT_CLTV_EXPIRY, data=data)
if htlc.cltv_expiry - lnutil.MIN_FINAL_CLTV_EXPIRY_ACCEPTED <= local_height \
or next_cltv_expiry <= local_height:
data = outgoing_chan_upd_len + outgoing_chan_upd
raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_SOON, data=data)
if max(htlc.cltv_expiry, next_cltv_expiry) > local_height + lnutil.NBLOCK_CLTV_EXPIRY_TOO_FAR_INTO_FUTURE:
raise OnionRoutingFailure(code=OnionFailureCode.EXPIRY_TOO_FAR, data=b'')
try:
next_amount_msat_htlc = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"]
except:
raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
forwarding_fees = fee_for_edge_msat(
forwarded_amount_msat=next_amount_msat_htlc,
fee_base_msat=next_chan.forwarding_fee_base_msat,
fee_proportional_millionths=next_chan.forwarding_fee_proportional_millionths)
if htlc.amount_msat - next_amount_msat_htlc < forwarding_fees:
data = next_amount_msat_htlc.to_bytes(8, byteorder="big") + outgoing_chan_upd_len + outgoing_chan_upd
raise OnionRoutingFailure(code=OnionFailureCode.FEE_INSUFFICIENT, data=data)
self.logger.info(f'forwarding htlc to {next_chan.node_id}')
next_htlc = UpdateAddHtlc(
amount_msat=next_amount_msat_htlc,
payment_hash=htlc.payment_hash,
cltv_expiry=next_cltv_expiry,
timestamp=int(time.time()))
next_htlc = next_chan.add_htlc(next_htlc)
next_peer = self.lnworker.peers[next_chan.node_id]
try:
next_peer.send_message(
"update_add_htlc",
channel_id=next_chan.channel_id,
id=next_htlc.htlc_id,
cltv_expiry=next_cltv_expiry,
amount_msat=next_amount_msat_htlc,
payment_hash=next_htlc.payment_hash,
onion_routing_packet=processed_onion.next_packet.to_bytes()
)
except BaseException as e:
self.logger.info(f"failed to forward htlc: error sending message. {e}")
data = outgoing_chan_upd_len + outgoing_chan_upd
raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_CHANNEL_FAILURE, data=data)
return next_chan_scid, next_htlc.htlc_id
def maybe_forward_trampoline(
self, *,
chan: Channel,
htlc: UpdateAddHtlc,
trampoline_onion: ProcessedOnionPacket):
payload = trampoline_onion.hop_data.payload
payment_hash = htlc.payment_hash
try:
outgoing_node_id = payload["outgoing_node_id"]["outgoing_node_id"]
payment_secret = payload["payment_data"]["payment_secret"]
amt_to_forward = payload["amt_to_forward"]["amt_to_forward"]
cltv_from_onion = payload["outgoing_cltv_value"]["outgoing_cltv_value"]
if "invoice_features" in payload:
self.logger.info('forward_trampoline: legacy')
next_trampoline_onion = None
invoice_features = payload["invoice_features"]["invoice_features"]
invoice_routing_info = payload["invoice_routing_info"]["invoice_routing_info"]
else:
self.logger.info('forward_trampoline: end-to-end')
invoice_features = 0
next_trampoline_onion = trampoline_onion.next_packet
except Exception as e:
self.logger.exception('')
raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
trampoline_cltv_delta = htlc.cltv_expiry - cltv_from_onion
trampoline_fee = htlc.amount_msat - amt_to_forward
@log_exceptions
async def forward_trampoline_payment():
try:
await self.lnworker.pay_to_node(
node_pubkey=outgoing_node_id,
payment_hash=payment_hash,
payment_secret=payment_secret,
amount_to_pay=amt_to_forward,
min_cltv_expiry=cltv_from_onion,
r_tags=[],
t_tags=[],
invoice_features=invoice_features,
trampoline_onion=next_trampoline_onion,
trampoline_fee=trampoline_fee,
trampoline_cltv_delta=trampoline_cltv_delta,
attempts=1)
except OnionRoutingFailure as e:
# FIXME: cannot use payment_hash as key
self.lnworker.trampoline_forwarding_failures[payment_hash] = e
except PaymentFailure as e:
# FIXME: adapt the error code
error_reason = OnionRoutingFailure(code=OnionFailureCode.TRAMPOLINE_FEE_INSUFFICIENT, data=b'')
self.lnworker.trampoline_forwarding_failures[payment_hash] = error_reason
asyncio.ensure_future(forward_trampoline_payment())
def maybe_fulfill_htlc(
self, *,
chan: Channel,
htlc: UpdateAddHtlc,
processed_onion: ProcessedOnionPacket,
is_trampoline: bool = False) -> Optional[bytes]:
"""As a final recipient of an HTLC, decide if we should fulfill it.
Returns the preimage if yes, or None.
"""
def log_fail_reason(reason: str):
self.logger.info(f"maybe_fulfill_htlc. will FAIL HTLC: chan {chan.short_channel_id}. "
f"{reason}. htlc={str(htlc)}. onion_payload={processed_onion.hop_data.payload}")
try:
amt_to_forward = processed_onion.hop_data.payload["amt_to_forward"]["amt_to_forward"]
except:
log_fail_reason(f"'amt_to_forward' missing from onion")
raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
# Check that our blockchain tip is sufficiently recent so that we have an approx idea of the height.
# We should not release the preimage for an HTLC that its sender could already time out as
# then they might try to force-close and it becomes a race.
chain = self.network.blockchain()
if chain.is_tip_stale():
log_fail_reason(f"our chain tip is stale")
raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
local_height = chain.height()
exc_incorrect_or_unknown_pd = OnionRoutingFailure(
code=OnionFailureCode.INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS,
data=amt_to_forward.to_bytes(8, byteorder="big") + local_height.to_bytes(4, byteorder="big"))
if local_height + MIN_FINAL_CLTV_EXPIRY_ACCEPTED > htlc.cltv_expiry:
log_fail_reason(f"htlc.cltv_expiry is unreasonably close")
raise exc_incorrect_or_unknown_pd
try:
cltv_from_onion = processed_onion.hop_data.payload["outgoing_cltv_value"]["outgoing_cltv_value"]
except:
log_fail_reason(f"'outgoing_cltv_value' missing from onion")
raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_PAYLOAD, data=b'\x00\x00\x00')
if not is_trampoline:
if cltv_from_onion != htlc.cltv_expiry:
log_fail_reason(f"cltv_from_onion != htlc.cltv_expiry")
raise OnionRoutingFailure(
code=OnionFailureCode.FINAL_INCORRECT_CLTV_EXPIRY,
data=htlc.cltv_expiry.to_bytes(4, byteorder="big"))
try:
total_msat = processed_onion.hop_data.payload["payment_data"]["total_msat"]
except:
total_msat = amt_to_forward # fall back to "amt_to_forward"
if not is_trampoline and amt_to_forward != htlc.amount_msat:
log_fail_reason(f"amt_to_forward != htlc.amount_msat")
raise OnionRoutingFailure(
code=OnionFailureCode.FINAL_INCORRECT_HTLC_AMOUNT,
data=htlc.amount_msat.to_bytes(8, byteorder="big"))
# if there is a trampoline_onion, perform the above checks on it
if processed_onion.trampoline_onion_packet:
return
info = self.lnworker.get_payment_info(htlc.payment_hash)
if info is None:
log_fail_reason(f"no payment_info found for RHASH {htlc.payment_hash.hex()}")
raise exc_incorrect_or_unknown_pd
preimage = self.lnworker.get_preimage(htlc.payment_hash)
try:
payment_secret_from_onion = processed_onion.hop_data.payload["payment_data"]["payment_secret"]
except:
if total_msat > amt_to_forward:
# payment_secret is required for MPP
log_fail_reason(f"'payment_secret' missing from onion")
raise exc_incorrect_or_unknown_pd
# TODO fail here if invoice has set PAYMENT_SECRET_REQ
else:
if payment_secret_from_onion != derive_payment_secret_from_payment_preimage(preimage):
log_fail_reason(f'incorrect payment secret {payment_secret_from_onion.hex()} != {derive_payment_secret_from_payment_preimage(preimage).hex()}')
raise exc_incorrect_or_unknown_pd
invoice_msat = info.amount_msat
if not (invoice_msat is None or invoice_msat <= total_msat <= 2 * invoice_msat):
log_fail_reason(f"total_msat={total_msat} too different from invoice_msat={invoice_msat}")
raise exc_incorrect_or_unknown_pd
mpp_status = self.lnworker.add_received_htlc(chan.short_channel_id, htlc, total_msat)
if mpp_status is True:
self.logger.info(f"maybe_fulfill_htlc. will FULFILL HTLC: chan {chan.short_channel_id}. htlc={str(htlc)}")
return preimage
elif mpp_status is False:
log_fail_reason(f"MPP_TIMEOUT")
raise OnionRoutingFailure(code=OnionFailureCode.MPP_TIMEOUT, data=b'')
else:
return None
def fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes):
self.logger.info(f"_fulfill_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}")
assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
assert chan.hm.is_add_htlc_irrevocably_committed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id)
chan.settle_htlc(preimage, htlc_id)
self.send_message("update_fulfill_htlc",
channel_id=chan.channel_id,
id=htlc_id,
payment_preimage=preimage)
def fail_htlc(self, *, chan: Channel, htlc_id: int, error_bytes: bytes):
self.logger.info(f"fail_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
chan.fail_htlc(htlc_id)
self.send_message(
"update_fail_htlc",
channel_id=chan.channel_id,
id=htlc_id,
len=len(error_bytes),
reason=error_bytes)
def fail_malformed_htlc(self, *, chan: Channel, htlc_id: int, reason: OnionRoutingFailure):
self.logger.info(f"fail_malformed_htlc. chan {chan.short_channel_id}. htlc_id {htlc_id}.")
assert chan.can_send_ctx_updates(), f"cannot send updates: {chan.short_channel_id}"
chan.fail_htlc(htlc_id)
if not (reason.code & OnionFailureCodeMetaFlag.BADONION and len(reason.data) == 32):
raise Exception(f"unexpected reason when sending 'update_fail_malformed_htlc': {reason!r}")
self.send_message(
"update_fail_malformed_htlc",
channel_id=chan.channel_id,
id=htlc_id,
sha256_of_onion=reason.data,
failure_code=reason.code)
def on_revoke_and_ack(self, chan: Channel, payload):
if chan.peer_state == PeerState.BAD:
return
self.logger.info(f'on_revoke_and_ack. chan {chan.short_channel_id}. ctn: {chan.get_oldest_unrevoked_ctn(REMOTE)}')
rev = RevokeAndAck(payload["per_commitment_secret"], payload["next_per_commitment_point"])
chan.receive_revocation(rev)
self.lnworker.save_channel(chan)
self.maybe_send_commitment(chan)
def on_update_fee(self, chan: Channel, payload):
feerate = payload["feerate_per_kw"]
chan.update_fee(feerate, False)
async def maybe_update_fee(self, chan: Channel):
"""
called when our fee estimates change
"""
if not chan.can_send_ctx_updates():
return
feerate_per_kw = self.lnworker.current_feerate_per_kw()
if not chan.constraints.is_initiator:
if constants.net is not constants.BitcoinRegtest:
chan_feerate = chan.get_latest_feerate(LOCAL)
ratio = chan_feerate / feerate_per_kw
if ratio < 0.5:
# Note that we trust the Electrum server about fee rates
# Thus, automated force-closing might not be a good idea
# Maybe we should display something in the GUI instead
self.logger.warning(
f"({chan.get_id_for_log()}) feerate is {chan_feerate} sat/kw, "
f"current recommended feerate is {feerate_per_kw} sat/kw, consider force closing!")
return
chan_fee = chan.get_next_feerate(REMOTE)
if feerate_per_kw < chan_fee / 2:
self.logger.info("FEES HAVE FALLEN")
elif feerate_per_kw > chan_fee * 2:
self.logger.info("FEES HAVE RISEN")
else:
return
self.logger.info(f"(chan: {chan.get_id_for_log()}) current pending feerate {chan_fee}. "
f"new feerate {feerate_per_kw}")
chan.update_fee(feerate_per_kw, True)
self.send_message(
"update_fee",
channel_id=chan.channel_id,
feerate_per_kw=feerate_per_kw)
self.maybe_send_commitment(chan)
@log_exceptions
async def close_channel(self, chan_id: bytes):
chan = self.channels[chan_id]
self.shutdown_received[chan_id] = asyncio.Future()
await self.send_shutdown(chan)
payload = await self.shutdown_received[chan_id]
txid = await self._shutdown(chan, payload, is_local=True)
self.logger.info(f'({chan.get_id_for_log()}) Channel closed {txid}')
return txid
async def on_shutdown(self, chan: Channel, payload):
their_scriptpubkey = payload['scriptpubkey']
their_upfront_scriptpubkey = chan.config[REMOTE].upfront_shutdown_script
# BOLT-02 check if they use the upfront shutdown script they advertized
if their_upfront_scriptpubkey:
if not (their_scriptpubkey == their_upfront_scriptpubkey):
raise UpfrontShutdownScriptViolation("remote didn't use upfront shutdown script it commited to in channel opening")
# BOLT-02 restrict the scriptpubkey to some templates:
if not (match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_WITNESS_V0)
or match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_P2SH)
or match_script_against_template(their_scriptpubkey, transaction.SCRIPTPUBKEY_TEMPLATE_P2PKH)):
raise Exception(f'scriptpubkey in received shutdown message does not conform to any template: {their_scriptpubkey.hex()}')
chan_id = chan.channel_id
if chan_id in self.shutdown_received:
self.shutdown_received[chan_id].set_result(payload)
else:
chan = self.channels[chan_id]
await self.send_shutdown(chan)
txid = await self._shutdown(chan, payload, is_local=False)
self.logger.info(f'({chan.get_id_for_log()}) Channel closed by remote peer {txid}')
def can_send_shutdown(self, chan: Channel):
if chan.get_state() >= ChannelState.OPENING:
return True
if chan.constraints.is_initiator and chan.channel_id in self.funding_created_sent:
return True
if not chan.constraints.is_initiator and chan.channel_id in self.funding_signed_sent:
return True
return False
async def send_shutdown(self, chan: Channel):
if not self.can_send_shutdown(chan):
raise Exception('cannot send shutdown')
if chan.config[LOCAL].upfront_shutdown_script:
scriptpubkey = chan.config[LOCAL].upfront_shutdown_script
else:
scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address))
assert scriptpubkey
# wait until no more pending updates (bolt2)
chan.set_can_send_ctx_updates(False)
while chan.has_pending_changes(REMOTE):
await asyncio.sleep(0.1)
self.send_message('shutdown', channel_id=chan.channel_id, len=len(scriptpubkey), scriptpubkey=scriptpubkey)
chan.set_state(ChannelState.SHUTDOWN)
# can fullfill or fail htlcs. cannot add htlcs, because state != OPEN
chan.set_can_send_ctx_updates(True)
@log_exceptions
async def _shutdown(self, chan: Channel, payload, *, is_local: bool):
# wait until no HTLCs remain in either commitment transaction
while len(chan.hm.htlcs(LOCAL)) + len(chan.hm.htlcs(REMOTE)) > 0:
self.logger.info(f'(chan: {chan.short_channel_id}) waiting for htlcs to settle...')
await asyncio.sleep(1)
# if no HTLCs remain, we must not send updates
chan.set_can_send_ctx_updates(False)
their_scriptpubkey = payload['scriptpubkey']
if chan.config[LOCAL].upfront_shutdown_script:
our_scriptpubkey = chan.config[LOCAL].upfront_shutdown_script
else:
our_scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address))
assert our_scriptpubkey
# estimate fee of closing tx
our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=0)
fee_rate = self.network.config.fee_per_kb()
our_fee = fee_rate * closing_tx.estimated_size() // 1000
# BOLT2: The sending node MUST set fee less than or equal to the base fee of the final ctx
max_fee = chan.get_latest_fee(LOCAL if is_local else REMOTE)
our_fee = min(our_fee, max_fee)
drop_remote = False
def send_closing_signed():
our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=our_fee, drop_remote=drop_remote)
self.send_message('closing_signed', channel_id=chan.channel_id, fee_satoshis=our_fee, signature=our_sig)
def verify_signature(tx, sig):
their_pubkey = chan.config[REMOTE].multisig_key.pubkey
preimage_hex = tx.serialize_preimage(0)
pre_hash = sha256d(bfh(preimage_hex))
return ecc.verify_signature(their_pubkey, sig, pre_hash)
# the funder sends the first 'closing_signed' message
if chan.constraints.is_initiator:
send_closing_signed()
# negotiate fee
while True:
# FIXME: the remote SHOULD send closing_signed, but some don't.
cs_payload = await self.wait_for_message('closing_signed', chan.channel_id)
their_fee = cs_payload['fee_satoshis']
if their_fee > max_fee:
raise Exception(f'the proposed fee exceeds the base fee of the latest commitment transaction {is_local, their_fee, max_fee}')
their_sig = cs_payload['signature']
# verify their sig: they might have dropped their output
our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=False)
if verify_signature(closing_tx, their_sig):
drop_remote = False
else:
our_sig, closing_tx = chan.make_closing_tx(our_scriptpubkey, their_scriptpubkey, fee_sat=their_fee, drop_remote=True)
if verify_signature(closing_tx, their_sig):
drop_remote = True
else:
raise Exception('failed to verify their signature')
# Agree if difference is lower or equal to one (see below)
if abs(our_fee - their_fee) < 2:
our_fee = their_fee
break
# this will be "strictly between" (as in BOLT2) previous values because of the above
our_fee = (our_fee + their_fee) // 2
# another round
send_closing_signed()
# the non-funder replies
if not chan.constraints.is_initiator:
send_closing_signed()
# add signatures
closing_tx.add_signature_to_txin(
txin_idx=0,
signing_pubkey=chan.config[LOCAL].multisig_key.pubkey.hex(),
sig=bh2u(der_sig_from_sig_string(our_sig) + b'\x01'))
closing_tx.add_signature_to_txin(
txin_idx=0,
signing_pubkey=chan.config[REMOTE].multisig_key.pubkey.hex(),
sig=bh2u(der_sig_from_sig_string(their_sig) + b'\x01'))
# save local transaction and set state
try:
self.lnworker.wallet.add_transaction(closing_tx)
except UnrelatedTransactionException:
pass # this can happen if (~all the balance goes to REMOTE)
chan.set_state(ChannelState.CLOSING)
# broadcast
await self.network.try_broadcasting(closing_tx, 'closing')
return closing_tx.txid()
async def htlc_switch(self):
await self.initialized
while True:
await asyncio.sleep(0.1)
self.ping_if_required()
for chan_id, chan in self.channels.items():
if not chan.can_send_ctx_updates():
continue
self.maybe_send_commitment(chan)
done = set()
unfulfilled = chan.hm.log.get('unfulfilled_htlcs', {})
for htlc_id, (local_ctn, remote_ctn, onion_packet_hex, forwarding_info) in unfulfilled.items():
if not chan.hm.is_add_htlc_irrevocably_committed_yet(htlc_proposer=REMOTE, htlc_id=htlc_id):
continue
htlc = chan.hm.get_htlc_by_id(REMOTE, htlc_id)
error_reason = None # type: Optional[OnionRoutingFailure]
error_bytes = None # type: Optional[bytes]
preimage = None
fw_info = None
onion_packet_bytes = bytes.fromhex(onion_packet_hex)
onion_packet = None
try:
onion_packet = OnionPacket.from_bytes(onion_packet_bytes)
except OnionRoutingFailure as e:
error_reason = e
else:
try:
preimage, fw_info, error_bytes = self.process_unfulfilled_htlc(
chan=chan,
htlc=htlc,
forwarding_info=forwarding_info,
onion_packet_bytes=onion_packet_bytes,
onion_packet=onion_packet)
except OnionRoutingFailure as e:
error_bytes = construct_onion_error(e, onion_packet, our_onion_private_key=self.privkey)
if fw_info:
unfulfilled[htlc_id] = local_ctn, remote_ctn, onion_packet_hex, fw_info
elif preimage or error_reason or error_bytes:
if preimage:
await self.lnworker.enable_htlc_settle.wait()
self.fulfill_htlc(chan, htlc.htlc_id, preimage)
elif error_bytes:
self.fail_htlc(
chan=chan,
htlc_id=htlc.htlc_id,
error_bytes=error_bytes)
else:
self.fail_malformed_htlc(
chan=chan,
htlc_id=htlc.htlc_id,
reason=error_reason)
done.add(htlc_id)
# cleanup
for htlc_id in done:
unfulfilled.pop(htlc_id)
def process_unfulfilled_htlc(
self,
*,
chan: Channel,
htlc: UpdateAddHtlc,
forwarding_info: Tuple[str, int],
onion_packet_bytes: bytes,
onion_packet: OnionPacket,
) -> Tuple[Optional[bytes], Union[bool, None, Tuple[str, int]], Optional[bytes]]:
"""
returns either preimage or fw_info or error_bytes or (None, None, None)
raise an OnionRoutingFailure if we need to fail the htlc
"""
payment_hash = htlc.payment_hash
processed_onion = self.process_onion_packet(
onion_packet,
payment_hash=payment_hash,
onion_packet_bytes=onion_packet_bytes)
if processed_onion.are_we_final:
preimage = self.maybe_fulfill_htlc(
chan=chan,
htlc=htlc,
processed_onion=processed_onion)
# trampoline forwarding
if not preimage and processed_onion.trampoline_onion_packet:
if not forwarding_info:
trampoline_onion = self.process_onion_packet(
processed_onion.trampoline_onion_packet,
payment_hash=htlc.payment_hash,
onion_packet_bytes=onion_packet_bytes,
is_trampoline=True)
if trampoline_onion.are_we_final:
preimage = self.maybe_fulfill_htlc(
chan=chan,
htlc=htlc,
processed_onion=trampoline_onion,
is_trampoline=True)
else:
self.maybe_forward_trampoline(
chan=chan,
htlc=htlc,
trampoline_onion=trampoline_onion)
# return True so that this code gets executed only once
return None, True, None
else:
preimage = self.lnworker.get_preimage(payment_hash)
error_reason = self.lnworker.trampoline_forwarding_failures.pop(payment_hash, None)
if error_reason:
self.logger.info(f'trampoline forwarding failure {error_reason}')
raise error_reason
elif not forwarding_info:
next_chan_id, next_htlc_id = self.maybe_forward_htlc(
htlc=htlc,
processed_onion=processed_onion)
fw_info = (next_chan_id.hex(), next_htlc_id)
return None, fw_info, None
else:
preimage = self.lnworker.get_preimage(payment_hash)
next_chan_id_hex, htlc_id = forwarding_info
next_chan = self.lnworker.get_channel_by_short_id(bytes.fromhex(next_chan_id_hex))
if next_chan:
error_bytes, error_reason = next_chan.pop_fail_htlc_reason(htlc_id)
if error_bytes:
return None, None, error_bytes
if error_reason:
raise error_reason
if preimage:
return preimage, None, None
return None, None, None
def process_onion_packet(
self,
onion_packet: OnionPacket,
*,
payment_hash: bytes,
onion_packet_bytes: bytes,
is_trampoline: bool = False,
) -> ProcessedOnionPacket:
failure_data = sha256(onion_packet_bytes)
try:
processed_onion = process_onion_packet(
onion_packet,
associated_data=payment_hash,
our_onion_private_key=self.privkey,
is_trampoline=is_trampoline)
except UnsupportedOnionPacketVersion:
raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
except InvalidOnionPubkey:
raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_KEY, data=failure_data)
except InvalidOnionMac:
raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_HMAC, data=failure_data)
except Exception as e:
self.logger.info(f"error processing onion packet: {e!r}")
raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
if self.network.config.get('test_fail_malformed_htlc'):
raise OnionRoutingFailure(code=OnionFailureCode.INVALID_ONION_VERSION, data=failure_data)
if self.network.config.get('test_fail_htlcs_with_temp_node_failure'):
raise OnionRoutingFailure(code=OnionFailureCode.TEMPORARY_NODE_FAILURE, data=b'')
return processed_onion