Browse Source

ln: restore functionality

regtest_lnd
Janus 7 years ago
committed by SomberNight
parent
commit
c9ac5d1376
No known key found for this signature in database GPG Key ID: B33B5F232C6271E9
  1. 4
      electrum/commands.py
  2. 57
      lib/lnbase.py
  3. 90
      lib/lnworker.py
  4. 2
      lib/tests/test_lnbase.py

4
electrum/commands.py

@ -777,8 +777,8 @@ class Commands:
self.wallet.lnworker.reestablish_channel() self.wallet.lnworker.reestablish_channel()
@command('wn') @command('wn')
def lnpay(): def lnpay(self, invoice):
self.wallet.lnworker.pay() self.wallet.lnworker.pay(invoice)
@command('wn') @command('wn')
def lnreceive(self): def lnreceive(self):

57
lib/lnbase.py

@ -39,10 +39,12 @@ from .lnrouter import new_onion_packet, OnionHopsDataSingle, OnionPerHop
from collections import namedtuple, defaultdict from collections import namedtuple, defaultdict
class LightningError(Exception): class LightningError(Exception):
pass pass
class LightningPeerConnectionClosed(LightningError):
pass
message_types = {} message_types = {}
def handlesingle(x, ma): def handlesingle(x, ma):
@ -566,7 +568,8 @@ def is_synced(network):
class Peer(PrintError): class Peer(PrintError):
def __init__(self, host, port, pubkey, privkey, network, channel_db, path_finder, request_initial_sync=False): def __init__(self, host, port, pubkey, privkey, network, channel_db, path_finder, channel_state, handle_channel_reestablish, request_initial_sync=False):
self.handle_channel_reestablish = handle_channel_reestablish
self.update_add_htlc_event = asyncio.Event() self.update_add_htlc_event = asyncio.Event()
self.channel_update_event = asyncio.Event() self.channel_update_event = asyncio.Event()
self.host = host self.host = host
@ -576,7 +579,6 @@ class Peer(PrintError):
self.network = network self.network = network
self.channel_db = channel_db self.channel_db = channel_db
self.path_finder = path_finder self.path_finder = path_finder
self.read_buffer = b''
self.ping_time = 0 self.ping_time = 0
self.futures = ["channel_accepted", self.futures = ["channel_accepted",
"funding_signed", "funding_signed",
@ -597,6 +599,8 @@ class Peer(PrintError):
self.initialized = asyncio.Future() self.initialized = asyncio.Future()
self.localfeatures = (0x08 if request_initial_sync else 0) self.localfeatures = (0x08 if request_initial_sync else 0)
self.unfulfilled_htlcs = [] self.unfulfilled_htlcs = []
self.channel_state = channel_state
self.nodes = {}
def diagnostic_name(self): def diagnostic_name(self):
return self.host return self.host
@ -619,21 +623,22 @@ class Peer(PrintError):
async def read_message(self): async def read_message(self):
rn_l, rk_l = self.rn() rn_l, rk_l = self.rn()
rn_m, rk_m = self.rn() rn_m, rk_m = self.rn()
read_buffer = b''
while True: while True:
s = await self.reader.read(1) s = await self.reader.read(1)
if not s: if not s:
raise Exception('connection closed') raise LightningPeerConnectionClosed()
self.read_buffer += s read_buffer += s
if len(self.read_buffer) < 18: if len(read_buffer) < 18:
continue continue
lc = self.read_buffer[:18] lc = read_buffer[:18]
l = aead_decrypt(rk_l, rn_l, b'', lc) l = aead_decrypt(rk_l, rn_l, b'', lc)
length = int.from_bytes(l, 'big') length = int.from_bytes(l, 'big')
offset = 18 + length + 16 offset = 18 + length + 16
if len(self.read_buffer) < offset: if len(read_buffer) < offset:
continue continue
c = self.read_buffer[18:offset] c = read_buffer[18:offset]
self.read_buffer = self.read_buffer[offset:] read_buffer = read_buffer[offset:]
msg = aead_decrypt(rk_m, rn_m, b'', c) msg = aead_decrypt(rk_m, rn_m, b'', c)
return msg return msg
@ -716,7 +721,7 @@ class Peer(PrintError):
if chan_id in self.channel_reestablish: if chan_id in self.channel_reestablish:
self.channel_reestablish[chan_id].set_result(payload) self.channel_reestablish[chan_id].set_result(payload)
else: else:
print("Warning: received unknown channel_reestablish") asyncio.run_coroutine_threadsafe(self.handle_channel_reestablish(chan_id, payload), self.network.asyncio_loop).result()
def on_accept_channel(self, payload): def on_accept_channel(self, payload):
temp_chan_id = payload["temporary_channel_id"] temp_chan_id = payload["temporary_channel_id"]
@ -906,6 +911,7 @@ class Peer(PrintError):
sig_64 = sign_and_get_sig_string(remote_ctx, local_config, remote_config) sig_64 = sign_and_get_sig_string(remote_ctx, local_config, remote_config)
funding_txid_bytes = bytes.fromhex(funding_txid)[::-1] funding_txid_bytes = bytes.fromhex(funding_txid)[::-1]
channel_id = int.from_bytes(funding_txid_bytes, 'big') ^ funding_index channel_id = int.from_bytes(funding_txid_bytes, 'big') ^ funding_index
self.channel_state[channel_id] = "OPENING"
self.send_message(gen_msg("funding_created", self.send_message(gen_msg("funding_created",
temporary_channel_id=temp_channel_id, temporary_channel_id=temp_channel_id,
funding_txid=funding_txid_bytes, funding_txid=funding_txid_bytes,
@ -958,6 +964,7 @@ class Peer(PrintError):
return chan return chan
async def reestablish_channel(self, chan): async def reestablish_channel(self, chan):
assert chan.channel_id not in self.channel_state
await self.initialized await self.initialized
self.send_message(gen_msg("channel_reestablish", self.send_message(gen_msg("channel_reestablish",
@ -984,9 +991,11 @@ class Peer(PrintError):
if channel_reestablish_msg["my_current_per_commitment_point"] != chan.remote_state.last_per_commitment_point: if channel_reestablish_msg["my_current_per_commitment_point"] != chan.remote_state.last_per_commitment_point:
raise Exception("Remote PCP mismatch") raise Exception("Remote PCP mismatch")
return chan
async def on_funding_locked(self): self.channel_state[chan.channel_id] = "OPEN"
async def funding_locked(self, chan):
channel_id = chan.channel_id
try: try:
short_channel_id = await self.local_funding_locked[channel_id] short_channel_id = await self.local_funding_locked[channel_id]
finally: finally:
@ -1003,6 +1012,8 @@ class Peer(PrintError):
del self.remote_funding_locked[channel_id] del self.remote_funding_locked[channel_id]
self.print_error('Done waiting for remote_funding_locked', remote_funding_locked_msg) self.print_error('Done waiting for remote_funding_locked', remote_funding_locked_msg)
self.channel_state[chan.channel_id] = "OPEN"
return chan._replace(short_channel_id=short_channel_id, remote_state=chan.remote_state._replace(next_per_commitment_point=remote_funding_locked_msg["next_per_commitment_point"])) return chan._replace(short_channel_id=short_channel_id, remote_state=chan.remote_state._replace(next_per_commitment_point=remote_funding_locked_msg["next_per_commitment_point"]))
def on_update_fail_htlc(self, payload): def on_update_fail_htlc(self, payload):
@ -1025,11 +1036,16 @@ class Peer(PrintError):
) )
) )
return last_secret, this_point, next_point return last_secret, this_point, next_point
assert self.channel_state[chan.channel_id] == "OPEN"
their_revstore = chan.remote_state.revocation_store their_revstore = chan.remote_state.revocation_store
await asyncio.sleep(1)
while not is_synced(wallet.network): while not is_synced(wallet.network):
await asyncio.sleep(1) await asyncio.sleep(1)
print("sleeping more") print("sleeping more")
if chan.channel_id in self.commitment_signed:
print("too many commitments signed")
del self.commitment_signed[chan.channel_id]
height = wallet.get_local_height() height = wallet.get_local_height()
assert amount_msat > 0, "amount_msat is not greater zero" assert amount_msat > 0, "amount_msat is not greater zero"
@ -1175,6 +1191,8 @@ class Peer(PrintError):
) )
return last_secret, this_point, next_point return last_secret, this_point, next_point
assert self.channel_state[chan.channel_id] == "OPEN"
their_revstore = chan.remote_state.revocation_store their_revstore = chan.remote_state.revocation_store
channel_id = chan.channel_id channel_id = chan.channel_id
@ -1183,8 +1201,15 @@ class Peer(PrintError):
finally: finally:
del self.commitment_signed[channel_id] del self.commitment_signed[channel_id]
assert len(self.unfulfilled_htlcs) == 1 if int.from_bytes(commitment_signed_msg["num_htlcs"], "big") < 1:
htlc = self.unfulfilled_htlcs.pop() while len(self.unfulfilled_htlcs) < 1:
print("waiting for add_update_htlc")
await asyncio.sleep(1)
else:
print("commitment signed message had htlcs")
assert len(self.unfulfilled_htlcs) == 1
htlc = self.unfulfilled_htlcs.pop(0)
htlc_id = int.from_bytes(htlc["id"], 'big') htlc_id = int.from_bytes(htlc["id"], 'big')
assert htlc_id == chan.remote_state.next_htlc_id, (htlc_id, chan.remote_state.next_htlc_id) assert htlc_id == chan.remote_state.next_htlc_id, (htlc_id, chan.remote_state.next_htlc_id)
cltv_expiry = int.from_bytes(htlc["cltv_expiry"], 'big') cltv_expiry = int.from_bytes(htlc["cltv_expiry"], 'big')

90
lib/lnworker.py

@ -10,6 +10,7 @@ import binascii
import asyncio import asyncio
from . import constants
from .bitcoin import sha256, COIN from .bitcoin import sha256, COIN
from .util import bh2u, bfh from .util import bh2u, bfh
from .constants import set_testnet, set_simnet from .constants import set_testnet, set_simnet
@ -17,7 +18,7 @@ from .simple_config import SimpleConfig
from .network import Network from .network import Network
from .storage import WalletStorage from .storage import WalletStorage
from .wallet import Wallet from .wallet import Wallet
from .lnbase import Peer, Outpoint, ChannelConfig, LocalState, RemoteState, Keypair, OnlyPubkeyKeypair, OpenChannel, ChannelConstraints, RevocationStore, aiosafe from .lnbase import Peer, Outpoint, ChannelConfig, LocalState, RemoteState, Keypair, OnlyPubkeyKeypair, OpenChannel, ChannelConstraints, RevocationStore, aiosafe, calc_short_channel_id, privkey_to_pubkey
from .lightning_payencode.lnaddr import lnencode, LnAddr, lndecode from .lightning_payencode.lnaddr import lnencode, LnAddr, lndecode
from . import lnrouter from . import lnrouter
@ -98,92 +99,119 @@ class LNWorker:
self.channel_db = lnrouter.ChannelDB() self.channel_db = lnrouter.ChannelDB()
self.path_finder = lnrouter.LNPathFinder(self.channel_db) self.path_finder = lnrouter.LNPathFinder(self.channel_db)
self.channels = wallet.storage.get("channels", {}) self.channels = [reconstruct_namedtuples(x) for x in wallet.storage.get("channels", {})]
peer_list = network.config.get('lightning_peers', node_list) peer_list = network.config.get('lightning_peers', node_list)
self.channel_state = {}
for host, port, pubkey in peer_list: for host, port, pubkey in peer_list:
self.add_peer(host, port, pubkey) self.add_peer(host, int(port), pubkey)
# wait until we see confirmations # wait until we see confirmations
self.network.register_callback(self.on_network_update, ['updated', 'verified']) # thread safe self.network.register_callback(self.on_network_update, ['updated', 'verified']) # thread safe
self.on_network_update('updated') # shortcut (don't block) if funding tx locked and verified self.on_network_update('updated') # shortcut (don't block) if funding tx locked and verified
def add_peer(self, host, port, pubkey): def add_peer(self, host, port, pubkey):
peer = Peer(host, int(port), binascii.unhexlify(pubkey), self.privkey, peer = Peer(host, int(port), binascii.unhexlify(pubkey), self.privkey,
self.network, self.channel_db, self.path_finder) self.network, self.channel_db, self.path_finder, self.channel_state, self.handle_channel_reestablish)
self.network.futures.append(asyncio.run_coroutine_threadsafe(peer.main_loop(), asyncio.get_event_loop())) self.network.futures.append(asyncio.run_coroutine_threadsafe(peer.main_loop(), asyncio.get_event_loop()))
self.peers[pubkey] = peer self.peers[bfh(pubkey)] = peer
async def handle_channel_reestablish(self, chan_id, payload):
chans = [x for x in self.channels if x.channel_id == chan_id ]
chan = chans[0]
await self.peers[chan.node_id].reestablish_channel(chan)
def save_channel(self, openchannel): def save_channel(self, openchannel):
dumped = serialize_channels([openchannel]) self.channels = [openchannel] # TODO multiple channels
dumped = serialize_channels(self.channels)
self.wallet.storage.put("channels", dumped) self.wallet.storage.put("channels", dumped)
self.wallet.storage.write() self.wallet.storage.write()
def on_network_update(self, event, *args): def on_network_update(self, event, *args):
for chan in self.channels: for chan in self.channels:
peer = self.peers[chan.node_id] peer = self.peers[chan.node_id]
conf = wallet.get_tx_height(chan.funding_outpoint.txid)[1] conf = self.wallet.get_tx_height(chan.funding_outpoint.txid)[1]
if conf >= chan.constraints.funding_txn_minimum_depth: if conf >= chan.constraints.funding_txn_minimum_depth:
block_height, tx_pos = wallet.get_txpos(chan.funding_outpoint.txid) block_height, tx_pos = self.wallet.get_txpos(chan.funding_outpoint.txid)
if tx_pos == -1: if tx_pos == -1:
self.print_error('funding tx is not yet SPV verified.. but there are ' self.print_error('funding tx is not yet SPV verified.. but there are '
'already enough confirmations (currently {})'.format(conf)) 'already enough confirmations (currently {})'.format(conf))
return return
asyncio.run_coroutine_threadsafe(self.set_local_funding_locked_result(peer, chan, block_height, txpos), asyncio.get_event_loop()) if chan.channel_id not in self.channel_state or self.channel_state[chan.channel_id] != "OPENING":
return
asyncio.run_coroutine_threadsafe(self.set_local_funding_locked_result(peer, chan, block_height, tx_pos), asyncio.get_event_loop())
async def set_local_funding_locked_result(self, peer, chan, block_height, txpos): # aiosafe because we don't wait for result
@aiosafe
async def set_local_funding_locked_result(self, peer, chan, block_height, tx_pos):
channel_id = chan.channel_id channel_id = chan.channel_id
short_channel_id = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index)
try: try:
peer.local_funding_locked[channel_id].set_result(short_channel_id) peer.local_funding_locked[channel_id].set_result(short_channel_id)
except (asyncio.InvalidStateError, KeyError) as e: except (asyncio.InvalidStateError, KeyError) as e:
# FIXME race condition if updates come in quickly, set_result might be called multiple times # FIXME race condition if updates come in quickly, set_result might be called multiple times
# or self.local_funding_locked[channel_id] might be deleted already # or self.local_funding_locked[channel_id] might be deleted already
self.print_error('local_funding_locked.set_result error for channel {}: {}'.format(channel_id, e)) self.print_error('local_funding_locked.set_result error for channel {}: {}'.format(channel_id, e))
short_channel_id = calc_short_channel_id(block_height, tx_pos, chan.funding_outpoint.output_index) openchannel = await peer.funding_locked(chan)
openchannel = await peer.on_funding_locked(openingchannel, self.wallet)
self.save_channel(openchannel) self.save_channel(openchannel)
print("CHANNEL OPENING COMPLETED")
@aiosafe # not aiosafe because we call .result() which will propagate an exception
async def _open_channel_coroutine(self, node_id, amount, push_msat, password): async def _open_channel_coroutine(self, node_id, amount, push_msat, password):
peer = self.peers[node_id] peer = self.peers[bfh(node_id)]
openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, amount, push_msat, temp_channel_id=os.urandom(32)) openingchannel = await peer.channel_establishment_flow(self.wallet, self.config, password, amount, push_msat, temp_channel_id=os.urandom(32))
self.save_channel(openingchannel) self.save_channel(openingchannel)
def open_channel(self, node_id, local_amt, push_amt, emit_function, pw): def open_channel(self, node_id, local_amt, push_amt, pw):
coro = self._open_channel_coroutine(node_id, local_amt, push_amt, None if pw == "" else pw) coro = self._open_channel_coroutine(node_id, local_amt, push_amt, None if pw == "" else pw)
asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop) return asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop).result()
#chan = fut.result() #chan = fut.result()
# https://api.lightning.community/#listchannels # https://api.lightning.community/#listchannels
#std_chan = {"chan_id": chan.channel_id} #std_chan = {"chan_id": chan.channel_id}
#emit_function({"channels": [std_chan]}) #emit_function({"channels": [std_chan]})
def get_paid(self):
coro = self._get_paid_coroutine()
return asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop).result()
def pay(self, invoice):
coro = self._pay_coroutine(invoice)
return asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop).result()
def list_channels(self): def list_channels(self):
return self.channels return serialize_channels(self.channels)
@aiosafe def reestablish_channels(self):
async def reestablish_channel(self): coro = self._reestablish_channels_coroutine()
return asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop).result()
# not aiosafe because we call .result() which will propagate an exception
async def _reestablish_channels_coroutine(self):
if self.channels is None or len(self.channels) < 1: if self.channels is None or len(self.channels) < 1:
raise Exception("Can't reestablish: No channel saved") raise Exception("Can't reestablish: No channel saved")
openchannel = self.channels[0] peer = self.peers[self.channels[0].node_id]
openchannel = reconstruct_namedtuples(openchannel) await peer.reestablish_channel(self.channels[0])
openchannel = await peer.reestablish_channel(openchannel)
self.save_channel(openchannel)
@aiosafe # not aiosafe because we call .result() which will propagate an exception
async def pay(self): async def _pay_coroutine(self, invoice):
addr = lndecode(sys.argv[6], expected_hrp="sb" if sys.argv[2] == "simnet" else "tb") openchannel = self.channels[0]
addr = lndecode(invoice, expected_hrp=constants.net.SEGWIT_HRP)
payment_hash = addr.paymenthash payment_hash = addr.paymenthash
pubkey = addr.pubkey.serialize() pubkey = addr.pubkey.serialize()
msat_amt = int(addr.amount * COIN * 1000) msat_amt = int(addr.amount * COIN * 1000)
openchannel = await peer.pay(wallet, openchannel, msat_amt, payment_hash, pubkey, addr.min_final_cltv_expiry) peer = self.peers[openchannel.node_id]
openchannel = await peer.pay(self.wallet, openchannel, msat_amt, payment_hash, pubkey, addr.min_final_cltv_expiry)
self.save_channel(openchannel) self.save_channel(openchannel)
@aiosafe # not aiosafe because we call .result() which will propagate an exception
async def get_paid(self): async def _get_paid_coroutine(self):
openchannel = self.channels[0]
payment_preimage = os.urandom(32) payment_preimage = os.urandom(32)
RHASH = sha256(payment_preimage) RHASH = sha256(payment_preimage)
expected_received_sat = 200000 expected_received_sat = 200000
expected_received_msat = expected_received_sat * 1000 expected_received_msat = expected_received_sat * 1000
pay_req = lnencode(LnAddr(RHASH, amount=1/Decimal(COIN)*expected_received_sat, tags=[('d', 'one cup of coffee')]), peer.privkey[:32]) peer = self.peers[openchannel.node_id]
pay_req = lnencode(LnAddr(RHASH, amount=1/Decimal(COIN)*expected_received_sat, tags=[('d', 'one cup of coffee')]), peer.privkey)
decoded = lndecode(pay_req, expected_hrp="sb")
assert decoded.pubkey.serialize() == privkey_to_pubkey(self.privkey)
print("payment request", pay_req) print("payment request", pay_req)
openchannel = await peer.receive_commitment_revoke_ack(openchannel, expected_received_msat, payment_preimage) openchannel = await peer.receive_commitment_revoke_ack(openchannel, expected_received_msat, payment_preimage)
self.save_channel(openchannel) self.save_channel(openchannel)

2
lib/tests/test_lnbase.py

@ -256,7 +256,7 @@ class Test_LNBase(unittest.TestCase):
def test_find_path_for_payment(self): def test_find_path_for_payment(self):
channel_db = lnrouter.ChannelDB() channel_db = lnrouter.ChannelDB()
path_finder = lnrouter.LNPathFinder(channel_db) path_finder = lnrouter.LNPathFinder(channel_db)
p = Peer('', 0, 'a', bitcoin.sha256('privkeyseed'), None, channel_db, path_finder) p = Peer('', 0, 'a', bitcoin.sha256('privkeyseed'), None, channel_db, path_finder, {}, lambda x, y: None)
p.on_channel_announcement({'node_id_1': b'b', 'node_id_2': b'c', 'short_channel_id': bfh('0000000000000001')}) p.on_channel_announcement({'node_id_1': b'b', 'node_id_2': b'c', 'short_channel_id': bfh('0000000000000001')})
p.on_channel_announcement({'node_id_1': b'b', 'node_id_2': b'e', 'short_channel_id': bfh('0000000000000002')}) p.on_channel_announcement({'node_id_1': b'b', 'node_id_2': b'e', 'short_channel_id': bfh('0000000000000002')})
p.on_channel_announcement({'node_id_1': b'a', 'node_id_2': b'b', 'short_channel_id': bfh('0000000000000003')}) p.on_channel_announcement({'node_id_1': b'a', 'node_id_2': b'b', 'short_channel_id': bfh('0000000000000003')})

Loading…
Cancel
Save