Browse Source

lnpeer: some rework of error/warning message handling

- rm the `_get_channel_ids` abstraction as each of its usages needs subtle differences.
  Some code duplication is preferable in this case.
- raise exceptions in `wait_for_message`, so that callers such as the GUI can show user-feedback
- on_error/on_warning were dropping messages with temp_chan_ids if they were not stored in
  `temp_id_to_id` - which was only done once the mapping was known (so the normal chan_id was known).
  To fix this, we now store temp_chan_ids into `temp_id_to_id` early.
- `schedule_force_closing` only works if the chan_id is already in `channels`

related:
https://github.com/spesmilo/electrum/pull/7645 (and related commits)

-----

example before commit:
```
D/P | lnpeer.Peer.[LNWallet, 03933884aa-3b53e4ab] | Sending OPEN_CHANNEL
D/P | lnpeer.Peer.[LNWallet, 03933884aa-3b53e4ab] | Received ERROR
I/P | lnpeer.Peer.[LNWallet, 03933884aa-3b53e4ab] | remote peer sent error [DO NOT TRUST THIS MESSAGE]: invalid funding_satoshis=10000 sat (min=400000 sat max=1500000000 sat)

E | gui.qt.main_window.[test_segwit_2] | Could not open channel
Traceback (most recent call last):
  File "...\electrum\electrum\util.py", line 1160, in wrapper
    return await func(*args, **kwargs)
  File "...\electrum\electrum\lnpeer.py", line 661, in wrapper
    return await func(self, *args, **kwargs)
  File "...\electrum\electrum\lnpeer.py", line 742, in channel_establishment_flow
    payload = await self.wait_for_message('accept_channel', temp_channel_id)  #
  File "...\electrum\electrum\lnpeer.py", line 315, in wait_for_message
    name, payload = await asyncio.wait_for(q.get(), LN_P2P_NETWORK_TIMEOUT)
  File "...\Python39\lib\asyncio\tasks.py", line 468, in wait_for
    await waiter
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "...\Python39\lib\asyncio\tasks.py", line 492, in wait_for
    fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "...\electrum\electrum\gui\qt\util.py", line 914, in run
    result = task.task()
  File "...\electrum\electrum\gui\qt\main_window.py", line 1875, in task
    return self.wallet.lnworker.open_channel(
  File "...\electrum\electrum\lnworker.py", line 1075, in open_channel
    chan, funding_tx = fut.result()
  File "...\Python39\lib\concurrent\futures\_base.py", line 445, in result
    return self.__get_result()
  File "...\Python39\lib\concurrent\futures\_base.py", line 390, in __get_result
    raise self._exception
  File "...\electrum\electrum\util.py", line 1160, in wrapper
    return await func(*args, **kwargs)
  File "...\electrum\electrum\lnworker.py", line 1006, in _open_channel_coroutine
    chan, funding_tx = await asyncio.wait_for(coro, LN_P2P_NETWORK_TIMEOUT)
  File "...\Python39\lib\asyncio\tasks.py", line 494, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
```

example after commit:
```
D/P | lnpeer.Peer.[LNWallet, 03933884aa-ff3a866f] | Sending OPEN_CHANNEL
D/P | lnpeer.Peer.[LNWallet, 03933884aa-ff3a866f] | Received ERROR
I/P | lnpeer.Peer.[LNWallet, 03933884aa-ff3a866f] | remote peer sent error [DO NOT TRUST THIS MESSAGE]: invalid funding_satoshis=10000 sat (min=400000 sat max=1500000000 sat). chan_id=124ca21fa6aa2993430ad71f465f0d44731ef87f7478e4b31327e4459b5a3988
E | lnworker.LNWallet.[test_segwit_2] | Exception in _open_channel_coroutine: GracefulDisconnect('remote peer sent error [DO NOT TRUST THIS MESSAGE]: invalid funding_satoshis=10000 sat (min=400000 sat max=1500000000 sat)')
Traceback (most recent call last):
  File "...\electrum\electrum\util.py", line 1160, in wrapper
    return await func(*args, **kwargs)
  File "...\electrum\electrum\lnworker.py", line 1006, in _open_channel_coroutine
    chan, funding_tx = await asyncio.wait_for(coro, LN_P2P_NETWORK_TIMEOUT)
  File "...\Python39\lib\asyncio\tasks.py", line 481, in wait_for
    return fut.result()
  File "...\electrum\electrum\lnpeer.py", line 673, in wrapper
    return await func(self, *args, **kwargs)
  File "...\electrum\electrum\lnpeer.py", line 755, in channel_establishment_flow
    payload = await self.wait_for_message('accept_channel', temp_channel_id)
  File "...\electrum\electrum\lnpeer.py", line 326, in wait_for_message
    raise GracefulDisconnect(
electrum.interface.GracefulDisconnect: remote peer sent error [DO NOT TRUST THIS MESSAGE]: invalid funding_satoshis=10000 sat (min=400000 sat max=1500000000 sat)
I/P | lnpeer.Peer.[LNWallet, 03933884aa-ff3a866f] | Disconnecting: GracefulDisconnect()
```
patch-4
SomberNight 3 years ago
parent
commit
a92dede490
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 1
      electrum/gui/qt/main_window.py
  2. 91
      electrum/lnpeer.py

1
electrum/gui/qt/main_window.py

@ -1880,6 +1880,7 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
password=password)
def on_failure(exc_info):
type_, e, traceback = exc_info
#self.logger.error("Could not open channel", exc_info=exc_info)
self.show_error(_('Could not open channel: {}').format(repr(e)))
WaitingDialog(self, _('Opening channel...'), task, self.on_open_channel_success, on_failure)

91
electrum/lnpeer.py

@ -99,7 +99,7 @@ class Peer(Logger):
# gossip uses a single queue to preserve message order
self.gossip_queue = asyncio.Queue()
self.ordered_message_queues = defaultdict(asyncio.Queue) # for messages that are ordered
self.temp_id_to_id = {} # to forward error messages
self.temp_id_to_id = {} # type: Dict[bytes, Optional[bytes]] # to forward error messages
self.funding_created_sent = set() # for channels in PREOPENING
self.funding_signed_sent = set() # for channels in PREOPENING
self.shutdown_received = {} # chan_id -> asyncio.Future()
@ -224,36 +224,42 @@ class Peer(Logger):
if asyncio.iscoroutinefunction(f):
asyncio.ensure_future(self.taskgroup.spawn(execution_result))
def _get_channel_ids(self, channel_id):
# if channel_id is all zero: MUST fail all channels with the sending node.
# otherwise: MUST fail the channel referred to by channel_id, if that channel is with the sending node.
# if no existing channel is referred to by `channel_id: MUST ignore the message.
if channel_id == bytes(32):
return self.channels.keys()
elif channel_id in self.temp_id_to_id:
return [self.temp_id_to_id[channel_id]]
elif channel_id in self.channels:
return [channel_id]
else:
return []
def on_warning(self, payload):
# TODO: we could need some reconnection logic here -> delayed reconnect
self.logger.info(f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: {payload['data'].decode('ascii')}")
channel_ids = self._get_channel_ids(payload.get("channel_id"))
for cid in channel_ids:
self.ordered_message_queues[cid].put_nowait((None, {'warning': payload['data']}))
if channel_ids:
raise GracefulDisconnect
chan_id = payload.get("channel_id")
self.logger.info(f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: "
f"{payload['data'].decode('ascii')}. chan_id={chan_id.hex()}")
if chan_id in self.channels:
self.ordered_message_queues[chan_id].put_nowait((None, {'warning': payload['data']}))
elif chan_id in self.temp_id_to_id:
chan_id = self.temp_id_to_id[chan_id] or chan_id
self.ordered_message_queues[chan_id].put_nowait((None, {'warning': payload['data']}))
else:
# if no existing channel is referred to by channel_id:
# - MUST ignore the message.
return
raise GracefulDisconnect
def on_error(self, payload):
self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {payload['data'].decode('ascii')}")
channel_ids = self._get_channel_ids(payload.get("channel_id"))
for cid in channel_ids:
self.schedule_force_closing(cid)
self.ordered_message_queues[cid].put_nowait((None, {'error': payload['data']}))
if channel_ids:
raise GracefulDisconnect
chan_id = payload.get("channel_id")
self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: "
f"{payload['data'].decode('ascii')}. chan_id={chan_id.hex()}")
if chan_id in self.channels:
self.schedule_force_closing(chan_id)
self.ordered_message_queues[chan_id].put_nowait((None, {'error': payload['data']}))
elif chan_id in self.temp_id_to_id:
chan_id = self.temp_id_to_id[chan_id] or chan_id
self.ordered_message_queues[chan_id].put_nowait((None, {'error': payload['data']}))
elif chan_id == bytes(32):
# if channel_id is all zero:
# - MUST fail all channels with the sending node.
for cid in self.channels:
self.schedule_force_closing(cid)
self.ordered_message_queues[cid].put_nowait((None, {'error': payload['data']}))
else:
# if no existing channel is referred to by channel_id:
# - MUST ignore the message.
return
raise GracefulDisconnect
async def send_warning(self, channel_id: bytes, message: str = None, *, close_connection=True):
"""Sends a warning and disconnects if close_connection.
@ -298,8 +304,11 @@ class Peer(Logger):
# MUST fail the channel(s) referred to by the error message:
# we may violate this with force_close_channel
if force_close_channel:
for cid in self._get_channel_ids(channel_id):
if channel_id in self.channels:
self.schedule_force_closing(channel_id)
elif channel_id == bytes(32):
for cid in self.channels:
self.schedule_force_closing(cid)
raise GracefulDisconnect
def on_ping(self, payload):
@ -310,11 +319,15 @@ class Peer(Logger):
pass
async def wait_for_message(self, expected_name, channel_id):
# errors and warnings are sent to the queue with name set to None, so that this task terminates
q = self.ordered_message_queues[channel_id]
name, payload = await asyncio.wait_for(q.get(), LN_P2P_NETWORK_TIMEOUT)
if name is None:
raise GracefulDisconnect
# raise exceptions for errors/warnings, so that the caller sees them
if payload.get('error'):
raise GracefulDisconnect(
f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {payload['error'].decode('ascii')}")
elif payload.get('warning'):
raise GracefulDisconnect(
f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: {payload['warning'].decode('ascii')}")
if name != expected_name:
raise Exception(f"Received unexpected '{name}'")
return payload
@ -663,7 +676,6 @@ class Peer(Logger):
self.lnworker.wallet.set_reserved_state_of_address(addr, reserved=False)
return wrapper
@log_exceptions
@temporarily_reserve_funding_tx_change_address
async def channel_establishment_flow(
self, *,
@ -714,6 +726,10 @@ class Peer(Logger):
)
per_commitment_point_first = secret_to_pubkey(
int.from_bytes(per_commitment_secret_first, 'big'))
# store the temp id now, so that it is recognized for e.g. 'error' messages
# TODO: this is never cleaned up; the dict grows unbounded until disconnect
self.temp_id_to_id[temp_channel_id] = None
self.send_message(
"open_channel",
temporary_channel_id=temp_channel_id,
@ -897,6 +913,9 @@ class Peer(Logger):
push_msat = payload['push_msat']
feerate = payload['feerate_per_kw'] # note: we are not validating this
temp_chan_id = payload['temporary_channel_id']
# store the temp id now, so that it is recognized for e.g. 'error' messages
# TODO: this is never cleaned up; the dict grows unbounded until disconnect
self.temp_id_to_id[temp_chan_id] = None
open_channel_tlvs = payload.get('open_channel_tlvs')
channel_type = open_channel_tlvs.get('channel_type') if open_channel_tlvs else None
@ -1018,6 +1037,7 @@ class Peer(Logger):
channel_id=channel_id,
signature=sig_64,
)
self.temp_id_to_id[temp_chan_id] = channel_id
self.funding_signed_sent.add(chan.channel_id)
chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig)
chan.set_state(ChannelState.OPENING)
@ -1040,7 +1060,10 @@ class Peer(Logger):
channels_with_peer.extend(self.temp_id_to_id.values())
if channel_id not in channels_with_peer:
raise ValueError(f"channel {channel_id.hex()} does not belong to this peer")
self.lnworker.schedule_force_closing(channel_id)
if channel_id in self.channels:
self.lnworker.schedule_force_closing(channel_id)
else:
self.logger.warning(f"tried to force-close channel {channel_id.hex()} but it is not in self.channels yet")
def on_channel_reestablish(self, chan, msg):
their_next_local_ctn = msg["next_commitment_number"]

Loading…
Cancel
Save