Browse Source

Refactor LNPeer in order to support HTLC forwarding:

1. Do not perform channel updates in coroutines, because they would get executed in random order.
 2. After applying channel updates, wait only for the relevant commitment (local or remote) and not for both, because local and remote might be out of sync (BOLT 2).
 3. When waiting for a commitment, wait until a given ctn has been reached, because a queue cannot be shared by several coroutines
regtest_lnd
ThomasV 6 years ago
committed by SomberNight
parent
commit
6dd84c238e
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 3
      electrum/lnchannel.py
  2. 116
      electrum/lnpeer.py

3
electrum/lnchannel.py

@ -594,6 +594,9 @@ class Channel(PrintError):
feerate = self.constraints.feerate
return self.make_commitment(subject, this_point, ctn, feerate, False)
def get_current_ctn(self, subject):
return self.config[subject].ctn
def total_msat(self, direction):
assert type(direction) is Direction
sub = LOCAL if direction == SENT else REMOTE

116
electrum/lnpeer.py

@ -68,8 +68,6 @@ class Peer(PrintError):
self.channel_reestablished = defaultdict(asyncio.Future)
self.funding_signed = defaultdict(asyncio.Queue)
self.funding_created = defaultdict(asyncio.Queue)
self.revoke_and_ack = defaultdict(asyncio.Queue)
self.commitment_signed = defaultdict(asyncio.Queue)
self.announcement_signatures = defaultdict(asyncio.Queue)
self.closing_signed = defaultdict(asyncio.Queue)
self.payment_preimages = defaultdict(asyncio.Queue)
@ -79,10 +77,11 @@ class Peer(PrintError):
self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_REQ
self.attempted_route = {}
self.orphan_channel_updates = OrderedDict()
self.pending_updates = defaultdict(bool)
def send_message(self, message_name: str, **kwargs):
assert type(message_name) is str
self.print_error("Sending '%s'"%message_name.upper())
#self.print_error("Sending '%s'"%message_name.upper())
self.transport.send_bytes(encode_msg(message_name, **kwargs))
async def initialize(self):
@ -108,7 +107,7 @@ class Peer(PrintError):
try:
f = getattr(self, 'on_' + message_type)
except AttributeError:
self.print_error("Received '%s'" % message_type.upper(), payload)
#self.print_error("Received '%s'" % message_type.upper(), payload)
return
# raw message is needed to check signature
if message_type=='node_announcement':
@ -122,7 +121,7 @@ class Peer(PrintError):
self.print_error("error", payload["data"].decode("ascii"))
chan_id = payload.get("channel_id")
for d in [ self.channel_accepted, self.funding_signed,
self.funding_created, self.revoke_and_ack, self.commitment_signed,
self.funding_created,
self.announcement_signatures, self.closing_signed ]:
if chan_id in d:
d[chan_id].put_nowait({'error':payload['data']})
@ -749,8 +748,7 @@ class Peer(PrintError):
return h, node_signature, bitcoin_signature
@log_exceptions
async def on_update_fail_htlc(self, payload):
def on_update_fail_htlc(self, payload):
channel_id = payload["channel_id"]
htlc_id = int.from_bytes(payload["id"], "big")
key = (channel_id, htlc_id)
@ -762,7 +760,7 @@ class Peer(PrintError):
self.print_error("UPDATE_FAIL_HTLC. cannot decode! attempted route is MISSING. {}".format(key))
else:
try:
await self._handle_error_code_from_failed_htlc(payload["reason"], route, channel_id, htlc_id)
self._handle_error_code_from_failed_htlc(payload["reason"], route, channel_id, htlc_id)
except Exception:
# exceptions are suppressed as failing to handle an error code
# should not block us from removing the htlc
@ -770,10 +768,15 @@ class Peer(PrintError):
# process update_fail_htlc on channel
chan = self.channels[channel_id]
chan.receive_fail_htlc(htlc_id)
await self.receive_and_revoke(chan)
local_ctn = chan.get_current_ctn(LOCAL)
asyncio.ensure_future(self._on_update_fail_htlc(chan, htlc_id, local_ctn))
@log_exceptions
async def _on_update_fail_htlc(self, chan, htlc_id, local_ctn):
await self.await_local(chan, local_ctn)
self.network.trigger_callback('ln_message', self.lnworker, 'Payment failed', htlc_id)
async def _handle_error_code_from_failed_htlc(self, error_reason, route: List['RouteEdge'], channel_id, htlc_id):
def _handle_error_code_from_failed_htlc(self, error_reason, route: List['RouteEdge'], channel_id, htlc_id):
chan = self.channels[channel_id]
failure_msg, sender_idx = decode_onion_error(error_reason,
[x.node_id for x in route],
@ -814,23 +817,22 @@ class Peer(PrintError):
else:
self.network.path_finder.blacklist.add(short_chan_id)
def send_commitment(self, chan: Channel):
def maybe_send_commitment(self, chan: Channel):
if not self.pending_updates[chan]:
return
self.print_error('send_commitment')
sig_64, htlc_sigs = chan.sign_next_commitment()
self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs))
return len(htlc_sigs)
self.pending_updates[chan] = False
async def send_and_revoke(self, chan: Channel):
""" generic channel update flow """
self.send_commitment(chan)
await self.receive_revoke_and_ack(chan)
await self.receive_commitment(chan)
self.send_revoke_and_ack(chan)
async def await_remote(self, chan: Channel, ctn: int):
self.maybe_send_commitment(chan)
while chan.get_current_ctn(REMOTE) <= ctn:
await asyncio.sleep(0.1)
async def receive_and_revoke(self, chan: Channel):
await self.receive_commitment(chan)
self.send_revoke_and_ack(chan)
self.send_commitment(chan)
await self.receive_revoke_and_ack(chan)
async def await_local(self, chan: Channel, ctn: int):
while chan.get_current_ctn(LOCAL) <= ctn:
await asyncio.sleep(0.1)
async def pay(self, route: List['RouteEdge'], chan: Channel, amount_msat: int,
payment_hash: bytes, min_final_cltv_expiry: int):
@ -845,6 +847,7 @@ class Peer(PrintError):
# create htlc
htlc = {'amount_msat':amount_msat, 'payment_hash':payment_hash, 'cltv_expiry':cltv}
htlc_id = chan.add_htlc(htlc)
remote_ctn = chan.get_current_ctn(REMOTE)
chan.onion_keys[htlc_id] = secret_key
self.attempted_route[(chan.channel_id, htlc_id)] = route
self.print_error(f"starting payment. route: {route}")
@ -855,14 +858,10 @@ class Peer(PrintError):
amount_msat=amount_msat,
payment_hash=payment_hash,
onion_routing_packet=onion.to_bytes())
await self.send_and_revoke(chan)
self.pending_updates[chan] = True
await self.await_remote(chan, remote_ctn)
return UpdateAddHtlc(**htlc, htlc_id=htlc_id)
async def receive_revoke_and_ack(self, chan: Channel):
revoke_and_ack_msg = await self.revoke_and_ack[chan.channel_id].get()
chan.receive_revocation(RevokeAndAck(revoke_and_ack_msg["per_commitment_secret"], revoke_and_ack_msg["next_per_commitment_point"]))
self.lnworker.save_channel(chan)
def send_revoke_and_ack(self, chan: Channel):
rev, _ = chan.revoke_current_commitment()
self.lnworker.save_channel(chan)
@ -871,36 +870,34 @@ class Peer(PrintError):
per_commitment_secret=rev.per_commitment_secret,
next_per_commitment_point=rev.next_per_commitment_point)
async def receive_commitment(self, chan: Channel, commitment_signed_msg=None):
if commitment_signed_msg is None:
commitment_signed_msg = await self.commitment_signed[chan.channel_id].get()
data = commitment_signed_msg["htlc_signature"]
htlc_sigs = [data[i:i+64] for i in range(0, len(data), 64)]
chan.receive_new_commitment(commitment_signed_msg["signature"], htlc_sigs)
return len(htlc_sigs)
def on_commitment_signed(self, payload):
self.print_error("commitment_signed", payload)
self.print_error("on_commitment_signed")
channel_id = payload['channel_id']
self.commitment_signed[channel_id].put_nowait(payload)
chan = self.channels[channel_id]
data = payload["htlc_signature"]
htlc_sigs = [data[i:i+64] for i in range(0, len(data), 64)]
chan.receive_new_commitment(payload["signature"], htlc_sigs)
self.send_revoke_and_ack(chan)
@log_exceptions
async def on_update_fulfill_htlc(self, update_fulfill_htlc_msg):
def on_update_fulfill_htlc(self, update_fulfill_htlc_msg):
self.print_error("update_fulfill")
chan = self.channels[update_fulfill_htlc_msg["channel_id"]]
preimage = update_fulfill_htlc_msg["payment_preimage"]
htlc_id = int.from_bytes(update_fulfill_htlc_msg["id"], "big")
chan.receive_htlc_settle(preimage, htlc_id)
await self.receive_and_revoke(chan)
local_ctn = chan.get_current_ctn(LOCAL)
asyncio.ensure_future(self._on_update_fulfill_htlc(chan, htlc_id, preimage, local_ctn))
@log_exceptions
async def _on_update_fulfill_htlc(self, chan, htlc_id, preimage, local_ctn):
await self.await_local(chan, local_ctn)
self.network.trigger_callback('ln_message', self.lnworker, 'Payment sent', htlc_id)
# used in lightning-integration
self.payment_preimages[sha256(preimage)].put_nowait(preimage)
def on_update_fail_malformed_htlc(self, payload):
self.print_error("error", payload["data"].decode("ascii"))
@log_exceptions
async def on_update_add_htlc(self, payload):
def on_update_add_htlc(self, payload):
# no onion routing for the moment: we assume we are the end node
self.print_error('on_update_add_htlc')
# check if this in our list of requests
@ -919,7 +916,12 @@ class Peer(PrintError):
# add htlc
htlc = {'amount_msat': amount_msat_htlc, 'payment_hash':payment_hash, 'cltv_expiry':cltv_expiry}
htlc_id = chan.receive_htlc(htlc)
await self.receive_and_revoke(chan)
local_ctn = chan.get_current_ctn(LOCAL)
asyncio.ensure_future(self._on_update_add_htlc(chan, local_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion))
@log_exceptions
async def _on_update_add_htlc(self, chan, local_ctn, htlc_id, htlc, payment_hash, cltv_expiry, amount_msat_htlc, processed_onion):
await self.await_local(chan, local_ctn)
# Forward HTLC
# FIXME: this is not robust to us going offline before payment is fulfilled
if not processed_onion.are_we_final:
@ -936,6 +938,7 @@ class Peer(PrintError):
next_amount_msat_htlc = int.from_bytes(dph.amt_to_forward, 'big')
next_htlc = {'amount_msat':next_amount_msat_htlc, 'payment_hash':payment_hash, 'cltv_expiry':next_cltv_expiry}
next_htlc_id = next_chan.add_htlc(next_htlc)
next_remote_ctn = next_chan.get_current_ctn(REMOTE)
next_peer.send_message(
"update_add_htlc",
channel_id=next_chan.channel_id,
@ -945,7 +948,8 @@ class Peer(PrintError):
payment_hash=payment_hash,
onion_routing_packet=processed_onion.next_packet.to_bytes()
)
await next_peer.send_and_revoke(next_chan)
next_peer.pending_updates[next_chan] = True
await next_peer.await_remote(next_chan, next_remote_ctn)
# wait until we get paid
preimage = await next_peer.payment_preimages[payment_hash].get()
# fulfill the original htlc
@ -989,29 +993,35 @@ class Peer(PrintError):
async def fulfill_htlc(self, chan: Channel, htlc_id: int, preimage: bytes):
chan.settle_htlc(preimage, htlc_id)
remote_ctn = chan.get_current_ctn(REMOTE)
self.send_message("update_fulfill_htlc",
channel_id=chan.channel_id,
id=htlc_id,
payment_preimage=preimage)
await self.send_and_revoke(chan)
self.pending_updates[chan] = True
await self.await_remote(chan, remote_ctn)
self.network.trigger_callback('ln_message', self.lnworker, 'Payment received', htlc_id)
async def fail_htlc(self, chan: Channel, htlc_id: int, onion_packet: OnionPacket,
reason: OnionRoutingFailureMessage):
self.print_error(f"failing received htlc {(bh2u(chan.channel_id), htlc_id)}. reason: {reason}")
chan.fail_htlc(htlc_id)
remote_ctn = chan.get_current_ctn(REMOTE)
error_packet = construct_onion_error(reason, onion_packet, our_onion_private_key=self.privkey)
self.send_message("update_fail_htlc",
channel_id=chan.channel_id,
id=htlc_id,
len=len(error_packet),
reason=error_packet)
await self.send_and_revoke(chan)
self.pending_updates[chan] = True
await self.await_remote(chan, remote_ctn)
def on_revoke_and_ack(self, payload):
self.print_error("got revoke_and_ack")
self.print_error("on_revoke_and_ack")
channel_id = payload["channel_id"]
self.revoke_and_ack[channel_id].put_nowait(payload)
chan = self.channels[channel_id]
chan.receive_revocation(RevokeAndAck(payload["per_commitment_secret"], payload["next_per_commitment_point"]))
self.lnworker.save_channel(chan)
def on_update_fee(self, payload):
channel_id = payload["channel_id"]
@ -1036,10 +1046,12 @@ class Peer(PrintError):
else:
return
chan.update_fee(feerate_per_kw, True)
remote_ctn = chan.get_current_ctn(REMOTE)
self.send_message("update_fee",
channel_id=chan.channel_id,
feerate_per_kw=feerate_per_kw)
await self.send_and_revoke(chan)
self.pending_updates[chan] = True
await self.await_remote(chan, remote_ctn)
def on_closing_signed(self, payload):
chan_id = payload["channel_id"]

Loading…
Cancel
Save