Browse Source

ln: restore channels correctly after restart

* save funding_locked_received: if a node already sent us
funding_locked, save it to avoid superfluous messages

* use Queues instead of Futures: this ensure that we don't error if we
receive two messages of the same type, and in avoids having to delete
futures in finally blocks. A queue monitor could be added to detect
queue elements that are not popped.

* request initial routing sync: since we don't store the graph yet, it
is better to request the graph from the Peer so that we can route

* channel_state cleanup: now each channel should have a state, which is
initialized to OPENING and only marked OPEN once we have verified that
the funding_tx has been mined
regtest_lnd
Janus 7 years ago
committed by SomberNight
parent
commit
44ccc39fc4
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 116
      lib/lnbase.py
  2. 60
      lib/lnworker.py

116
lib/lnbase.py

@ -273,7 +273,7 @@ ChannelConfig = namedtuple("ChannelConfig", [
"to_self_delay", "dust_limit_sat", "max_htlc_value_in_flight_msat", "max_accepted_htlcs"])
OnlyPubkeyKeypair = namedtuple("OnlyPubkeyKeypair", ["pubkey"])
RemoteState = namedtuple("RemoteState", ["ctn", "next_per_commitment_point", "amount_msat", "revocation_store", "last_per_commitment_point", "next_htlc_id"])
LocalState = namedtuple("LocalState", ["ctn", "per_commitment_secret_seed", "amount_msat", "next_htlc_id"])
LocalState = namedtuple("LocalState", ["ctn", "per_commitment_secret_seed", "amount_msat", "next_htlc_id", "funding_locked_received"])
ChannelConstraints = namedtuple("ChannelConstraints", ["feerate", "capacity", "is_initiator", "funding_txn_minimum_depth"])
OpenChannel = namedtuple("OpenChannel", ["channel_id", "short_channel_id", "funding_outpoint", "local_config", "remote_config", "remote_state", "local_state", "constraints", "node_id"])
@ -580,22 +580,12 @@ class Peer(PrintError):
self.path_finder = path_finder
self.read_buffer = b''
self.ping_time = 0
self.futures = ["channel_accepted",
"funding_signed",
"local_funding_locked",
"remote_funding_locked",
"revoke_and_ack",
"channel_reestablish",
"update_fulfill_htlc",
"commitment_signed"]
self.channel_accepted = defaultdict(asyncio.Future)
self.funding_signed = defaultdict(asyncio.Future)
self.local_funding_locked = defaultdict(asyncio.Future)
self.remote_funding_locked = defaultdict(asyncio.Future)
self.revoke_and_ack = defaultdict(asyncio.Future)
self.update_fulfill_htlc = defaultdict(asyncio.Future)
self.commitment_signed = defaultdict(asyncio.Future)
self.initialized = asyncio.Future()
self.channel_accepted = defaultdict(asyncio.Queue)
self.funding_signed = defaultdict(asyncio.Queue)
self.remote_funding_locked = defaultdict(asyncio.Queue)
self.revoke_and_ack = defaultdict(asyncio.Queue)
self.update_fulfill_htlc = defaultdict(asyncio.Queue)
self.commitment_signed = defaultdict(asyncio.Queue)
self.localfeatures = (0x08 if request_initial_sync else 0)
self.unfulfilled_htlcs = []
self.channel_state = channel_state
@ -703,11 +693,7 @@ class Peer(PrintError):
f(payload)
def on_error(self, payload):
for i in self.futures:
if payload["channel_id"] in getattr(self, i):
getattr(self, i)[payload["channel_id"]].set_exception(LightningError(payload["data"]))
return
self.print_error("no future found to resolve", payload)
self.print_error("error", payload)
def on_ping(self, payload):
l = int.from_bytes(payload['num_pong_bytes'], 'big')
@ -716,17 +702,17 @@ class Peer(PrintError):
def on_accept_channel(self, payload):
temp_chan_id = payload["temporary_channel_id"]
if temp_chan_id not in self.channel_accepted: raise Exception("Got unknown accept_channel")
self.channel_accepted[temp_chan_id].set_result(payload)
self.channel_accepted[temp_chan_id].put_nowait(payload)
def on_funding_signed(self, payload):
channel_id = int.from_bytes(payload['channel_id'], 'big')
if channel_id not in self.funding_signed: raise Exception("Got unknown funding_signed")
self.funding_signed[channel_id].set_result(payload)
self.funding_signed[channel_id].put_nowait(payload)
def on_funding_locked(self, payload):
channel_id = int.from_bytes(payload['channel_id'], 'big')
if channel_id not in self.funding_signed: print("Got unknown funding_locked", payload)
self.remote_funding_locked[channel_id].set_result(payload)
self.remote_funding_locked[channel_id].put_nowait(payload)
def on_node_announcement(self, payload):
pubkey = payload['node_id']
@ -785,8 +771,6 @@ class Peer(PrintError):
# read init
msg = await self.read_message()
self.process_message(msg)
# initialized
self.initialized.set_result(msg)
# reestablish channels
[await self.reestablish_channel(c) for c in self.channels]
# loop
@ -799,7 +783,6 @@ class Peer(PrintError):
self.writer.close()
async def channel_establishment_flow(self, wallet, config, password, funding_sat, push_msat, temp_channel_id):
await self.initialized
# see lnd/keychain/derivation.go
keyfamilymultisig = 0
keyfamilyrevocationbase = 1
@ -849,10 +832,7 @@ class Peer(PrintError):
channel_reserve_satoshis=10
)
self.send_message(msg)
try:
payload = await self.channel_accepted[temp_channel_id]
finally:
del self.channel_accepted[temp_channel_id]
payload = await self.channel_accepted[temp_channel_id].get()
remote_per_commitment_point = payload['first_per_commitment_point']
remote_config=ChannelConfig(
payment_basepoint=OnlyPubkeyKeypair(payload['payment_basepoint']),
@ -903,16 +883,12 @@ class Peer(PrintError):
sig_64 = sign_and_get_sig_string(remote_ctx, local_config, remote_config)
funding_txid_bytes = bytes.fromhex(funding_txid)[::-1]
channel_id = int.from_bytes(funding_txid_bytes, 'big') ^ funding_index
self.channel_state[channel_id] = "OPENING"
self.send_message(gen_msg("funding_created",
temporary_channel_id=temp_channel_id,
funding_txid=funding_txid_bytes,
funding_output_index=funding_index,
signature=sig_64))
try:
payload = await self.funding_signed[channel_id]
finally:
del self.funding_signed[channel_id]
payload = await self.funding_signed[channel_id].get()
self.print_error('received funding_signed')
remote_sig = payload['signature']
# verify remote signature
@ -949,14 +925,14 @@ class Peer(PrintError):
ctn = 0,
per_commitment_secret_seed=per_commitment_secret_seed,
amount_msat=local_amount,
next_htlc_id = 0
next_htlc_id = 0,
funding_locked_received = False
),
constraints=ChannelConstraints(capacity=funding_sat, feerate=local_feerate, is_initiator=True, funding_txn_minimum_depth=funding_txn_minimum_depth)
)
return chan
async def reestablish_channel(self, chan):
assert chan.channel_id not in self.channel_state
self.send_message(gen_msg("channel_reestablish",
channel_id=chan.channel_id,
next_local_commitment_number=chan.local_state.ctn+1,
@ -980,27 +956,21 @@ class Peer(PrintError):
raise Exception("expected local ctn {}, got {}".format(chan.local_state.ctn, local_ctn))
if channel_reestablish_msg["my_current_per_commitment_point"] != chan.remote_state.last_per_commitment_point:
raise Exception("Remote PCP mismatch")
self.channel_state[chan.channel_id] = "OPEN"
async def funding_locked(self, chan):
channel_id = chan.channel_id
try:
short_channel_id = await self.local_funding_locked[channel_id]
finally:
del self.local_funding_locked[channel_id]
short_channel_id = chan.short_channel_id
per_commitment_secret_index = 2**48 - 2
per_commitment_point_second = secret_to_pubkey(int.from_bytes(
get_per_commitment_secret_from_seed(chan.local_state.per_commitment_secret_seed, per_commitment_secret_index), 'big'))
self.send_message(gen_msg("funding_locked", channel_id=channel_id, next_per_commitment_point=per_commitment_point_second))
# wait until we receive funding_locked
try:
remote_funding_locked_msg = await self.remote_funding_locked[channel_id]
finally:
del self.remote_funding_locked[channel_id]
remote_funding_locked_msg = await self.remote_funding_locked[channel_id].get()
self.print_error('Done waiting for remote_funding_locked', remote_funding_locked_msg)
self.channel_state[chan.channel_id] = "OPEN"
return chan._replace(short_channel_id=short_channel_id, remote_state=chan.remote_state._replace(next_per_commitment_point=remote_funding_locked_msg["next_per_commitment_point"]))
new_remote_state = chan.remote_state._replace(next_per_commitment_point=remote_funding_locked_msg["next_per_commitment_point"])
new_local_state = chan.local_state._replace(funding_locked_received = True)
return chan._replace(short_channel_id=short_channel_id, remote_state=new_remote_state, local_state=new_local_state)
def on_update_fail_htlc(self, payload):
print("UPDATE_FAIL_HTLC", decode_onion_error(payload["reason"], self.node_keys, self.secret_key))
@ -1097,10 +1067,7 @@ class Peer(PrintError):
self.send_message(gen_msg("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=1, htlc_signature=htlc_sig))
try:
revoke_and_ack_msg = await self.revoke_and_ack[chan.channel_id]
finally:
del self.revoke_and_ack[chan.channel_id]
revoke_and_ack_msg = await self.revoke_and_ack[chan.channel_id].get()
# TODO check revoke_and_ack results
last_secret, _, next_point = derive_and_incr()
@ -1112,16 +1079,10 @@ class Peer(PrintError):
print("waiting for update_fulfill")
try:
update_fulfill_htlc_msg = await self.update_fulfill_htlc[chan.channel_id]
finally:
del self.update_fulfill_htlc[chan.channel_id]
update_fulfill_htlc_msg = await self.update_fulfill_htlc[chan.channel_id].get()
print("waiting for commitment_signed")
try:
commitment_signed_msg = await self.commitment_signed[chan.channel_id]
finally:
del self.commitment_signed[chan.channel_id]
commitment_signed_msg = await self.commitment_signed[chan.channel_id].get()
last_secret, _, next_point = derive_and_incr()
their_revstore.add_next_entry(last_secret)
@ -1139,10 +1100,7 @@ class Peer(PrintError):
self.send_message(gen_msg("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=0))
try:
revoke_and_ack_msg = await self.revoke_and_ack[chan.channel_id]
finally:
del self.revoke_and_ack[chan.channel_id]
revoke_and_ack_msg = await self.revoke_and_ack[chan.channel_id].get()
# TODO check revoke_and_ack results
return chan._replace(
@ -1182,10 +1140,7 @@ class Peer(PrintError):
their_revstore = chan.remote_state.revocation_store
channel_id = chan.channel_id
try:
commitment_signed_msg = await self.commitment_signed[channel_id]
finally:
del self.commitment_signed[channel_id]
commitment_signed_msg = await self.commitment_signed[channel_id].get()
if int.from_bytes(commitment_signed_msg["num_htlcs"], "big") < 1:
while len(self.unfulfilled_htlcs) < 1:
@ -1267,10 +1222,7 @@ class Peer(PrintError):
self.send_message(gen_msg("commitment_signed", channel_id=channel_id, signature=sig_64, num_htlcs=1, htlc_signature=htlc_sig))
try:
revoke_and_ack_msg = await self.revoke_and_ack[channel_id]
finally:
del self.revoke_and_ack[channel_id]
revoke_and_ack_msg = await self.revoke_and_ack[channel_id].get()
# TODO check revoke_and_ack_msg contents
@ -1285,17 +1237,11 @@ class Peer(PrintError):
sig_64 = sign_and_get_sig_string(bare_ctx, chan.local_config, chan.remote_config)
self.send_message(gen_msg("commitment_signed", channel_id=channel_id, signature=sig_64, num_htlcs=0))
try:
revoke_and_ack_msg = await self.revoke_and_ack[channel_id]
finally:
del self.revoke_and_ack[channel_id]
revoke_and_ack_msg = await self.revoke_and_ack[channel_id].get()
# TODO check revoke_and_ack results
try:
commitment_signed_msg = await self.commitment_signed[channel_id]
finally:
del self.commitment_signed[channel_id]
commitment_signed_msg = await self.commitment_signed[channel_id].get()
# TODO check commitment_signed results
@ -1325,11 +1271,11 @@ class Peer(PrintError):
def on_commitment_signed(self, payload):
self.print_error("commitment_signed", payload)
channel_id = int.from_bytes(payload['channel_id'], 'big')
self.commitment_signed[channel_id].set_result(payload)
self.commitment_signed[channel_id].put_nowait(payload)
def on_update_fulfill_htlc(self, payload):
channel_id = int.from_bytes(payload["channel_id"], 'big')
self.update_fulfill_htlc[channel_id].set_result(payload)
self.update_fulfill_htlc[channel_id].put_nowait(payload)
def on_update_fail_malformed_htlc(self, payload):
self.on_error(payload)
@ -1343,7 +1289,7 @@ class Peer(PrintError):
def on_revoke_and_ack(self, payload):
channel_id = int.from_bytes(payload["channel_id"], 'big')
self.revoke_and_ack[channel_id].set_result(payload)
self.revoke_and_ack[channel_id].put_nowait(payload)
def count_trailing_zeros(index):

60
lib/lnworker.py

@ -12,7 +12,7 @@ import asyncio
from . import constants
from .bitcoin import sha256, COIN
from .util import bh2u, bfh
from .util import bh2u, bfh, PrintError
from .constants import set_testnet, set_simnet
from .simple_config import SimpleConfig
from .network import Network
@ -86,7 +86,7 @@ node_list = [
class LNWorker:
class LNWorker(PrintError):
def __init__(self, wallet, network):
self.wallet = wallet
@ -100,7 +100,7 @@ class LNWorker:
self.path_finder = lnrouter.LNPathFinder(self.channel_db)
self.channels = [reconstruct_namedtuples(x) for x in wallet.storage.get("channels", {})]
peer_list = network.config.get('lightning_peers', node_list)
self.channel_state = {}
self.channel_state = {chan.channel_id: "OPENING" for chan in self.channels}
for host, port, pubkey in peer_list:
self.add_peer(host, int(port), pubkey)
# wait until we see confirmations
@ -110,18 +110,25 @@ class LNWorker:
def add_peer(self, host, port, pubkey):
node_id = bfh(pubkey)
channels = list(filter(lambda x: x.node_id == node_id, self.channels))
peer = Peer(host, int(port), node_id, self.privkey, self.network, self.channel_db, self.path_finder, self.channel_state, channels)
peer = Peer(host, int(port), node_id, self.privkey, self.network, self.channel_db, self.path_finder, self.channel_state, channels, request_initial_sync=True)
self.network.futures.append(asyncio.run_coroutine_threadsafe(peer.main_loop(), asyncio.get_event_loop()))
self.peers[node_id] = peer
def save_channel(self, openchannel):
if openchannel.channel_id not in self.channel_state:
self.channel_state[openchannel.channel_id] = "OPENING"
self.channels = [openchannel] # TODO multiple channels
dumped = serialize_channels(self.channels)
self.wallet.storage.put("channels", dumped)
self.wallet.storage.write()
def on_network_update(self, event, *args):
for chan in self.channels:
def save_short_chan_id(self, chan):
"""
Checks if the Funding TX has been mined. If it has save the short channel ID to disk and return the new OpenChannel.
If the Funding TX has not been mined, return None
"""
assert self.channel_state[chan.channel_id] == "OPENING"
peer = self.peers[chan.node_id]
conf = self.wallet.get_tx_height(chan.funding_outpoint.txid)[1]
if conf >= chan.constraints.funding_txn_minimum_depth:
@ -129,30 +136,39 @@ class LNWorker:
if tx_pos == -1:
self.print_error('funding tx is not yet SPV verified.. but there are '
'already enough confirmations (currently {})'.format(conf))
return
if chan.channel_id not in self.channel_state or self.channel_state[chan.channel_id] != "OPENING":
return
asyncio.run_coroutine_threadsafe(self.set_local_funding_locked_result(peer, chan, block_height, tx_pos), asyncio.get_event_loop())
return None
chan = chan._replace(short_channel_id = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index))
self.save_channel(chan)
return chan
return None
def on_network_update(self, event, *args):
for chan in self.channels:
if self.channel_state[chan.channel_id] == "OPEN":
continue
chan = self.save_short_chan_id(chan)
if not chan:
self.print_error("network update but funding tx is still not at sufficient depth")
continue
peer = self.peers[chan.node_id]
asyncio.run_coroutine_threadsafe(self.wait_funding_locked_and_mark_open(peer, chan), asyncio.get_event_loop())
# aiosafe because we don't wait for result
@aiosafe
async def set_local_funding_locked_result(self, peer, chan, block_height, tx_pos):
channel_id = chan.channel_id
short_channel_id = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index)
try:
peer.local_funding_locked[channel_id].set_result(short_channel_id)
except (asyncio.InvalidStateError, KeyError) as e:
# FIXME race condition if updates come in quickly, set_result might be called multiple times
# or self.local_funding_locked[channel_id] might be deleted already
self.print_error('local_funding_locked.set_result error for channel {}: {}'.format(channel_id, e))
openchannel = await peer.funding_locked(chan)
self.save_channel(openchannel)
print("CHANNEL OPENING COMPLETED")
async def wait_funding_locked_and_mark_open(self, peer, chan):
if self.channel_state[chan.channel_id] == "OPEN":
return
if not chan.local_state.funding_locked_received:
chan = await peer.funding_locked(chan)
self.save_channel(chan)
self.print_error("CHANNEL OPENING COMPLETED")
self.channel_state[chan.channel_id] = "OPEN"
# not aiosafe because we call .result() which will propagate an exception
async def _open_channel_coroutine(self, node_id, amount, push_msat, password):
peer = self.peers[bfh(node_id)]
openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, amount, push_msat, temp_channel_id=os.urandom(32))
self.print_error("SAVING OPENING CHANNEL")
self.save_channel(openingchannel)
def open_channel(self, node_id, local_amt, push_amt, pw):

Loading…
Cancel
Save