|
|
@ -99,7 +99,7 @@ class Peer(Logger): |
|
|
|
# gossip uses a single queue to preserve message order |
|
|
|
self.gossip_queue = asyncio.Queue() |
|
|
|
self.ordered_message_queues = defaultdict(asyncio.Queue) # for messages that are ordered |
|
|
|
self.temp_id_to_id = {} # to forward error messages |
|
|
|
self.temp_id_to_id = {} # type: Dict[bytes, Optional[bytes]] # to forward error messages |
|
|
|
self.funding_created_sent = set() # for channels in PREOPENING |
|
|
|
self.funding_signed_sent = set() # for channels in PREOPENING |
|
|
|
self.shutdown_received = {} # chan_id -> asyncio.Future() |
|
|
@ -224,36 +224,42 @@ class Peer(Logger): |
|
|
|
if asyncio.iscoroutinefunction(f): |
|
|
|
asyncio.ensure_future(self.taskgroup.spawn(execution_result)) |
|
|
|
|
|
|
|
def _get_channel_ids(self, channel_id): |
|
|
|
# if channel_id is all zero: MUST fail all channels with the sending node. |
|
|
|
# otherwise: MUST fail the channel referred to by channel_id, if that channel is with the sending node. |
|
|
|
# if no existing channel is referred to by `channel_id: MUST ignore the message. |
|
|
|
if channel_id == bytes(32): |
|
|
|
return self.channels.keys() |
|
|
|
elif channel_id in self.temp_id_to_id: |
|
|
|
return [self.temp_id_to_id[channel_id]] |
|
|
|
elif channel_id in self.channels: |
|
|
|
return [channel_id] |
|
|
|
else: |
|
|
|
return [] |
|
|
|
|
|
|
|
def on_warning(self, payload): |
|
|
|
# TODO: we could need some reconnection logic here -> delayed reconnect |
|
|
|
self.logger.info(f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: {payload['data'].decode('ascii')}") |
|
|
|
channel_ids = self._get_channel_ids(payload.get("channel_id")) |
|
|
|
for cid in channel_ids: |
|
|
|
self.ordered_message_queues[cid].put_nowait((None, {'warning': payload['data']})) |
|
|
|
if channel_ids: |
|
|
|
raise GracefulDisconnect |
|
|
|
chan_id = payload.get("channel_id") |
|
|
|
self.logger.info(f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: " |
|
|
|
f"{payload['data'].decode('ascii')}. chan_id={chan_id.hex()}") |
|
|
|
if chan_id in self.channels: |
|
|
|
self.ordered_message_queues[chan_id].put_nowait((None, {'warning': payload['data']})) |
|
|
|
elif chan_id in self.temp_id_to_id: |
|
|
|
chan_id = self.temp_id_to_id[chan_id] or chan_id |
|
|
|
self.ordered_message_queues[chan_id].put_nowait((None, {'warning': payload['data']})) |
|
|
|
else: |
|
|
|
# if no existing channel is referred to by channel_id: |
|
|
|
# - MUST ignore the message. |
|
|
|
return |
|
|
|
raise GracefulDisconnect |
|
|
|
|
|
|
|
def on_error(self, payload): |
|
|
|
self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {payload['data'].decode('ascii')}") |
|
|
|
channel_ids = self._get_channel_ids(payload.get("channel_id")) |
|
|
|
for cid in channel_ids: |
|
|
|
self.schedule_force_closing(cid) |
|
|
|
self.ordered_message_queues[cid].put_nowait((None, {'error': payload['data']})) |
|
|
|
if channel_ids: |
|
|
|
raise GracefulDisconnect |
|
|
|
chan_id = payload.get("channel_id") |
|
|
|
self.logger.info(f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: " |
|
|
|
f"{payload['data'].decode('ascii')}. chan_id={chan_id.hex()}") |
|
|
|
if chan_id in self.channels: |
|
|
|
self.schedule_force_closing(chan_id) |
|
|
|
self.ordered_message_queues[chan_id].put_nowait((None, {'error': payload['data']})) |
|
|
|
elif chan_id in self.temp_id_to_id: |
|
|
|
chan_id = self.temp_id_to_id[chan_id] or chan_id |
|
|
|
self.ordered_message_queues[chan_id].put_nowait((None, {'error': payload['data']})) |
|
|
|
elif chan_id == bytes(32): |
|
|
|
# if channel_id is all zero: |
|
|
|
# - MUST fail all channels with the sending node. |
|
|
|
for cid in self.channels: |
|
|
|
self.schedule_force_closing(cid) |
|
|
|
self.ordered_message_queues[cid].put_nowait((None, {'error': payload['data']})) |
|
|
|
else: |
|
|
|
# if no existing channel is referred to by channel_id: |
|
|
|
# - MUST ignore the message. |
|
|
|
return |
|
|
|
raise GracefulDisconnect |
|
|
|
|
|
|
|
async def send_warning(self, channel_id: bytes, message: str = None, *, close_connection=True): |
|
|
|
"""Sends a warning and disconnects if close_connection. |
|
|
@ -298,8 +304,11 @@ class Peer(Logger): |
|
|
|
# MUST fail the channel(s) referred to by the error message: |
|
|
|
# we may violate this with force_close_channel |
|
|
|
if force_close_channel: |
|
|
|
for cid in self._get_channel_ids(channel_id): |
|
|
|
if channel_id in self.channels: |
|
|
|
self.schedule_force_closing(channel_id) |
|
|
|
elif channel_id == bytes(32): |
|
|
|
for cid in self.channels: |
|
|
|
self.schedule_force_closing(cid) |
|
|
|
raise GracefulDisconnect |
|
|
|
|
|
|
|
def on_ping(self, payload): |
|
|
@ -310,11 +319,15 @@ class Peer(Logger): |
|
|
|
pass |
|
|
|
|
|
|
|
async def wait_for_message(self, expected_name, channel_id): |
|
|
|
# errors and warnings are sent to the queue with name set to None, so that this task terminates |
|
|
|
q = self.ordered_message_queues[channel_id] |
|
|
|
name, payload = await asyncio.wait_for(q.get(), LN_P2P_NETWORK_TIMEOUT) |
|
|
|
if name is None: |
|
|
|
raise GracefulDisconnect |
|
|
|
# raise exceptions for errors/warnings, so that the caller sees them |
|
|
|
if payload.get('error'): |
|
|
|
raise GracefulDisconnect( |
|
|
|
f"remote peer sent error [DO NOT TRUST THIS MESSAGE]: {payload['error'].decode('ascii')}") |
|
|
|
elif payload.get('warning'): |
|
|
|
raise GracefulDisconnect( |
|
|
|
f"remote peer sent warning [DO NOT TRUST THIS MESSAGE]: {payload['warning'].decode('ascii')}") |
|
|
|
if name != expected_name: |
|
|
|
raise Exception(f"Received unexpected '{name}'") |
|
|
|
return payload |
|
|
@ -663,7 +676,6 @@ class Peer(Logger): |
|
|
|
self.lnworker.wallet.set_reserved_state_of_address(addr, reserved=False) |
|
|
|
return wrapper |
|
|
|
|
|
|
|
@log_exceptions |
|
|
|
@temporarily_reserve_funding_tx_change_address |
|
|
|
async def channel_establishment_flow( |
|
|
|
self, *, |
|
|
@ -714,6 +726,10 @@ class Peer(Logger): |
|
|
|
) |
|
|
|
per_commitment_point_first = secret_to_pubkey( |
|
|
|
int.from_bytes(per_commitment_secret_first, 'big')) |
|
|
|
|
|
|
|
# store the temp id now, so that it is recognized for e.g. 'error' messages |
|
|
|
# TODO: this is never cleaned up; the dict grows unbounded until disconnect |
|
|
|
self.temp_id_to_id[temp_channel_id] = None |
|
|
|
self.send_message( |
|
|
|
"open_channel", |
|
|
|
temporary_channel_id=temp_channel_id, |
|
|
@ -897,6 +913,9 @@ class Peer(Logger): |
|
|
|
push_msat = payload['push_msat'] |
|
|
|
feerate = payload['feerate_per_kw'] # note: we are not validating this |
|
|
|
temp_chan_id = payload['temporary_channel_id'] |
|
|
|
# store the temp id now, so that it is recognized for e.g. 'error' messages |
|
|
|
# TODO: this is never cleaned up; the dict grows unbounded until disconnect |
|
|
|
self.temp_id_to_id[temp_chan_id] = None |
|
|
|
|
|
|
|
open_channel_tlvs = payload.get('open_channel_tlvs') |
|
|
|
channel_type = open_channel_tlvs.get('channel_type') if open_channel_tlvs else None |
|
|
@ -1018,6 +1037,7 @@ class Peer(Logger): |
|
|
|
channel_id=channel_id, |
|
|
|
signature=sig_64, |
|
|
|
) |
|
|
|
self.temp_id_to_id[temp_chan_id] = channel_id |
|
|
|
self.funding_signed_sent.add(chan.channel_id) |
|
|
|
chan.open_with_first_pcp(payload['first_per_commitment_point'], remote_sig) |
|
|
|
chan.set_state(ChannelState.OPENING) |
|
|
@ -1040,7 +1060,10 @@ class Peer(Logger): |
|
|
|
channels_with_peer.extend(self.temp_id_to_id.values()) |
|
|
|
if channel_id not in channels_with_peer: |
|
|
|
raise ValueError(f"channel {channel_id.hex()} does not belong to this peer") |
|
|
|
self.lnworker.schedule_force_closing(channel_id) |
|
|
|
if channel_id in self.channels: |
|
|
|
self.lnworker.schedule_force_closing(channel_id) |
|
|
|
else: |
|
|
|
self.logger.warning(f"tried to force-close channel {channel_id.hex()} but it is not in self.channels yet") |
|
|
|
|
|
|
|
def on_channel_reestablish(self, chan, msg): |
|
|
|
their_next_local_ctn = msg["next_commitment_number"] |
|
|
|