diff --git a/electrum/lnchannel.py b/electrum/lnchannel.py index 810b937b8..52a1b6d4e 100644 --- a/electrum/lnchannel.py +++ b/electrum/lnchannel.py @@ -594,6 +594,9 @@ class Channel(PrintError): feerate = self.constraints.feerate return self.make_commitment(subject, this_point, ctn, feerate, False) + def get_current_ctn(self, subject): + return self.config[subject].ctn + def total_msat(self, direction): assert type(direction) is Direction sub = LOCAL if direction == SENT else REMOTE diff --git a/electrum/lnpeer.py b/electrum/lnpeer.py index c9e9e1d61..b898568dc 100644 --- a/electrum/lnpeer.py +++ b/electrum/lnpeer.py @@ -68,8 +68,6 @@ class Peer(PrintError): self.channel_reestablished = defaultdict(asyncio.Future) self.funding_signed = defaultdict(asyncio.Queue) self.funding_created = defaultdict(asyncio.Queue) - self.revoke_and_ack = defaultdict(asyncio.Queue) - self.commitment_signed = defaultdict(asyncio.Queue) self.announcement_signatures = defaultdict(asyncio.Queue) self.closing_signed = defaultdict(asyncio.Queue) self.payment_preimages = defaultdict(asyncio.Queue) @@ -79,10 +77,11 @@ class Peer(PrintError): self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_REQ self.attempted_route = {} self.orphan_channel_updates = OrderedDict() + self.pending_updates = defaultdict(bool) def send_message(self, message_name: str, **kwargs): assert type(message_name) is str - self.print_error("Sending '%s'"%message_name.upper()) + #self.print_error("Sending '%s'"%message_name.upper()) self.transport.send_bytes(encode_msg(message_name, **kwargs)) async def initialize(self): @@ -108,7 +107,7 @@ class Peer(PrintError): try: f = getattr(self, 'on_' + message_type) except AttributeError: - self.print_error("Received '%s'" % message_type.upper(), payload) + #self.print_error("Received '%s'" % message_type.upper(), payload) return # raw message is needed to check signature if message_type=='node_announcement': @@ -122,7 +121,7 @@ class Peer(PrintError): self.print_error("error", payload["data"].decode("ascii")) chan_id = payload.get("channel_id") for d in [ self.channel_accepted, self.funding_signed, - self.funding_created, self.revoke_and_ack, self.commitment_signed, + self.funding_created, self.announcement_signatures, self.closing_signed ]: if chan_id in d: d[chan_id].put_nowait({'error':payload['data']}) @@ -749,8 +748,7 @@ class Peer(PrintError): return h, node_signature, bitcoin_signature - @log_exceptions - async def on_update_fail_htlc(self, payload): + def on_update_fail_htlc(self, payload): channel_id = payload["channel_id"] htlc_id = int.from_bytes(payload["id"], "big") key = (channel_id, htlc_id) @@ -762,7 +760,7 @@ class Peer(PrintError): self.print_error("UPDATE_FAIL_HTLC. cannot decode! attempted route is MISSING. {}".format(key)) else: try: - await self._handle_error_code_from_failed_htlc(payload["reason"], route, channel_id, htlc_id) + self._handle_error_code_from_failed_htlc(payload["reason"], route, channel_id, htlc_id) except Exception: # exceptions are suppressed as failing to handle an error code # should not block us from removing the htlc @@ -770,10 +768,15 @@ class Peer(PrintError): # process update_fail_htlc on channel chan = self.channels[channel_id] chan.receive_fail_htlc(htlc_id) - await self.receive_and_revoke(chan) + local_ctn = chan.get_current_ctn(LOCAL) + asyncio.ensure_future(self._on_update_fail_htlc(chan, htlc_id, local_ctn)) + + @log_exceptions + async def _on_update_fail_htlc(self, chan, htlc_id, local_ctn): + await self.await_local(chan, local_ctn) self.network.trigger_callback('ln_message', self.lnworker, 'Payment failed', htlc_id) - async def _handle_error_code_from_failed_htlc(self, error_reason, route: List['RouteEdge'], channel_id, htlc_id): + def _handle_error_code_from_failed_htlc(self, error_reason, route: List['RouteEdge'], channel_id, htlc_id): chan = self.channels[channel_id] failure_msg, sender_idx = decode_onion_error(error_reason, [x.node_id for x in route], @@ -814,23 +817,22 @@ class Peer(PrintError): else: self.network.path_finder.blacklist.add(short_chan_id) - def send_commitment(self, chan: Channel): + def maybe_send_commitment(self, chan: Channel): + if not self.pending_updates[chan]: + return + self.print_error('send_commitment') sig_64, htlc_sigs = chan.sign_next_commitment() self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs)) - return len(htlc_sigs) + self.pending_updates[chan] = False - async def send_and_revoke(self, chan: Channel): - """ generic channel update flow """ - self.send_commitment(chan) - await self.receive_revoke_and_ack(chan) - await self.receive_commitment(chan) - self.send_revoke_and_ack(chan) + async def await_remote(self, chan: Channel, ctn: int): + self.maybe_send_commitment(chan) + while chan.get_current_ctn(REMOTE) <= ctn: + await asyncio.sleep(0.1) - async def receive_and_revoke(self, chan: Channel): - await self.receive_commitment(chan) - self.send_revoke_and_ack(chan) - self.send_commitment(chan) - await self.receive_revoke_and_ack(chan) + async def await_local(self, chan: Channel, ctn: int): + while chan.get_current_ctn(LOCAL) <= ctn: + await asyncio.sleep(0.1) async def pay(self, route: List['RouteEdge'], chan: Channel, amount_msat: int, payment_hash: bytes, min_final_cltv_expiry: int): @@ -845,6 +847,7 @@ class Peer(PrintError): # create htlc htlc = {'amount_msat':amount_msat, 'payment_hash':payment_hash, 'cltv_expiry':cltv} htlc_id = chan.add_htlc(htlc) + remote_ctn = chan.get_current_ctn(REMOTE) chan.onion_keys[htlc_id] = secret_key self.attempted_route[(chan.channel_id, htlc_id)] = route self.print_error(f"starting payment. route: {route}") @@ -855,14 +858,10 @@ class Peer(PrintError): amount_msat=amount_msat, payment_hash=payment_hash, onion_routing_packet=onion.to_bytes()) - await self.send_and_revoke(chan) + self.pending_updates[chan] = True + await self.await_remote(chan, remote_ctn) return UpdateAddHtlc(**htlc, htlc_id=htlc_id) - async def receive_revoke_and_ack(self, chan: Channel): - revoke_and_ack_msg = await self.revoke_and_ack[chan.channel_id].get() - chan.receive_revocation(RevokeAndAck(revoke_and_ack_msg["per_commitment_secret"], revoke_and_ack_msg["next_per_commitment_point"])) - self.lnworker.save_channel(chan) - def send_revoke_and_ack(self, chan: Channel): rev, _ = chan.revoke_current_commitment() self.lnworker.save_channel(chan) @@ -871,36 +870,34 @@ class Peer(PrintError): per_commitment_secret=rev.per_commitment_secret, next_per_commitment_point=rev.next_per_commitment_point) - async def receive_commitment(self, chan: Channel, commitment_signed_msg=None): - if commitment_signed_msg is None: - commitment_signed_msg = await self.commitment_signed[chan.channel_id].get() - data = commitment_signed_msg["htlc_signature"] - htlc_sigs = [data[i:i+64] for i in range(0, len(data), 64)] - chan.receive_new_commitment(commitment_signed_msg["signature"], htlc_sigs) - return len(htlc_sigs) - def on_commitment_signed(self, payload): - self.print_error("commitment_signed", payload) + self.print_error("on_commitment_signed") channel_id = payload['channel_id'] - self.commitment_signed[channel_id].put_nowait(payload) + chan = self.channels[channel_id] + data = payload["htlc_signature"] + htlc_sigs = [data[i:i+64] for i in range(0, len(data), 64)] + chan.receive_new_commitment(payload["signature"], htlc_sigs) + self.send_revoke_and_ack(chan) - @log_exceptions - async def on_update_fulfill_htlc(self, update_fulfill_htlc_msg): + def on_update_fulfill_htlc(self, update_fulfill_htlc_msg): self.print_error("update_fulfill") chan = self.channels[update_fulfill_htlc_msg["channel_id"]] preimage = update_fulfill_htlc_msg["payment_preimage"] htlc_id = int.from_bytes(update_fulfill_htlc_msg["id"], "big") chan.receive_htlc_settle(preimage, htlc_id) - await self.receive_and_revoke(chan) + local_ctn = chan.get_current_ctn(LOCAL) + asyncio.ensure_future(self._on_update_fulfill_htlc(chan, htlc_id, preimage, local_ctn)) + + @log_exceptions + async def _on_update_fulfill_htlc(self, chan, htlc_id, preimage, local_ctn): + await self.await_local(chan, local_ctn) self.network.trigger_callback('ln_message', self.lnworker, 'Payment sent', htlc_id) - # used in lightning-integration self.payment_preimages[sha256(preimage)].put_nowait(preimage) def on_update_fail_malformed_htlc(self, payload): self.print_error("error", payload["data"].decode("ascii")) - @log_exceptions - async def on_update_add_htlc(self, payload): + def on_update_add_htlc(self, payload): # no onion routing for the moment: we assume we are the end node self.print_error('on_update_add_htlc') # check if this in our list of requests @@ -919,7 +916,12 @@ class Peer(PrintError): # add htlc htlc = {'amount_msat': amount_msat_htlc, 'payment_hash':payment_hash, 'cltv_expiry':cltv_expiry} htlc_id = chan.receive_htlc(htlc) - await self.receive_and_revoke(chan) + local_ctn = chan.get_current_ctn(LOCAL) + asyncio.ensure_future(self._on_update_add_htlc(chan, local_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion)) + + @log_exceptions + async def _on_update_add_htlc(self, chan, local_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion): + await self.await_local(chan, local_ctn) # Forward HTLC # FIXME: this is not robust to us going offline before payment is fulfilled if not processed_onion.are_we_final: @@ -936,6 +938,7 @@ class Peer(PrintError): next_amount_msat_htlc = int.from_bytes(dph.amt_to_forward, 'big') next_htlc = {'amount_msat':next_amount_msat_htlc, 'payment_hash':payment_hash, 'cltv_expiry':next_cltv_expiry} next_htlc_id = next_chan.add_htlc(next_htlc) + next_remote_ctn = next_chan.get_current_ctn(REMOTE) next_peer.send_message( "update_add_htlc", channel_id=next_chan.channel_id, @@ -945,7 +948,8 @@ class Peer(PrintError): payment_hash=payment_hash, onion_routing_packet=processed_onion.next_packet.to_bytes() ) - await next_peer.send_and_revoke(next_chan) + next_peer.pending_updates[next_chan] = True + await next_peer.await_remote(next_chan, next_remote_ctn) # wait until we get paid preimage = await next_peer.payment_preimages[payment_hash].get() # fulfill the original htlc @@ -989,29 +993,35 @@ class Peer(PrintError): async def fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes): chan.settle_htlc(preimage, htlc_id) + remote_ctn = chan.get_current_ctn(REMOTE) self.send_message("update_fulfill_htlc", channel_id=chan.channel_id, id=htlc_id, payment_preimage=preimage) - await self.send_and_revoke(chan) + self.pending_updates[chan] = True + await self.await_remote(chan, remote_ctn) self.network.trigger_callback('ln_message', self.lnworker, 'Payment received', htlc_id) async def fail_htlc(self, chan: Channel, htlc_id: int, onion_packet: OnionPacket, reason: OnionRoutingFailureMessage): self.print_error(f"failing received htlc {(bh2u(chan.channel_id), htlc_id)}. reason: {reason}") chan.fail_htlc(htlc_id) + remote_ctn = chan.get_current_ctn(REMOTE) error_packet = construct_onion_error(reason, onion_packet, our_onion_private_key=self.privkey) self.send_message("update_fail_htlc", channel_id=chan.channel_id, id=htlc_id, len=len(error_packet), reason=error_packet) - await self.send_and_revoke(chan) + self.pending_updates[chan] = True + await self.await_remote(chan, remote_ctn) def on_revoke_and_ack(self, payload): - self.print_error("got revoke_and_ack") + self.print_error("on_revoke_and_ack") channel_id = payload["channel_id"] - self.revoke_and_ack[channel_id].put_nowait(payload) + chan = self.channels[channel_id] + chan.receive_revocation(RevokeAndAck(payload["per_commitment_secret"], payload["next_per_commitment_point"])) + self.lnworker.save_channel(chan) def on_update_fee(self, payload): channel_id = payload["channel_id"] @@ -1036,10 +1046,12 @@ class Peer(PrintError): else: return chan.update_fee(feerate_per_kw, True) + remote_ctn = chan.get_current_ctn(REMOTE) self.send_message("update_fee", channel_id=chan.channel_id, feerate_per_kw=feerate_per_kw) - await self.send_and_revoke(chan) + self.pending_updates[chan] = True + await self.await_remote(chan, remote_ctn) def on_closing_signed(self, payload): chan_id = payload["channel_id"]