From 71afa3cc70ab22aeed8ce8b6bb19f78d29d1ec53 Mon Sep 17 00:00:00 2001 From: Janus Date: Sun, 14 Oct 2018 22:36:23 +0200 Subject: [PATCH] lnbase: split out BOLT-08 (Noise) implementation --- electrum/gui/qt/main_window.py | 4 +- electrum/lnbase.py | 240 ++++++++++++++++----------------- electrum/lnworker.py | 15 ++- 3 files changed, 132 insertions(+), 127 deletions(-) diff --git a/electrum/gui/qt/main_window.py b/electrum/gui/qt/main_window.py index 4d5da8bee..e9c3e629e 100644 --- a/electrum/gui/qt/main_window.py +++ b/electrum/gui/qt/main_window.py @@ -1854,10 +1854,10 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger): def open_channel(self, *args, **kwargs): def task(): return self.wallet.lnworker.open_channel(*args, **kwargs) - def on_success(chan): + def on_success(node_id): self.show_message('\n'.join([ _('Channel established.'), - _('Remote peer ID') + ':' + bh2u(chan.node_id), + _('Remote peer ID') + ':' + node_id, _('This channel will be usable after 3 confirmations') ])) WaitingDialog(self, _('Opening channel...'), task, on_success, self.on_error) diff --git a/electrum/lnbase.py b/electrum/lnbase.py index 3720043ca..877d62f27 100644 --- a/electrum/lnbase.py +++ b/electrum/lnbase.py @@ -266,53 +266,14 @@ def create_ephemeral_key() -> (bytes, bytes): privkey = ecc.ECPrivkey.generate_random_key() return privkey.get_secret_bytes(), privkey.get_public_key_bytes() - -class Peer(PrintError): - - def __init__(self, lnworker, host, port, pubkey, request_initial_sync=False): - self.host = host - self.port = port - self.pubkey = pubkey - self.peer_addr = LNPeerAddr(host, port, pubkey) - self.lnworker = lnworker - self.privkey = lnworker.node_keypair.privkey - self.network = lnworker.network - self.lnwatcher = lnworker.network.lnwatcher - self.channel_db = lnworker.network.channel_db - self.read_buffer = b'' - self.ping_time = 0 - self.initialized = asyncio.Future() - self.channel_accepted = defaultdict(asyncio.Queue) - self.channel_reestablished = defaultdict(asyncio.Future) - self.funding_signed = defaultdict(asyncio.Queue) - self.funding_created = defaultdict(asyncio.Queue) - self.revoke_and_ack = defaultdict(asyncio.Queue) - self.commitment_signed = defaultdict(asyncio.Queue) - self.announcement_signatures = defaultdict(asyncio.Queue) - self.closing_signed = defaultdict(asyncio.Queue) - self.payment_preimages = defaultdict(asyncio.Queue) - self.localfeatures = LnLocalFeatures(0) - if request_initial_sync: - self.localfeatures |= LnLocalFeatures.INITIAL_ROUTING_SYNC - self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_OPT - self.invoices = lnworker.invoices - self.attempted_route = {} - - @property - def channels(self): - return self.lnworker.channels_for_peer(self.pubkey) - - def diagnostic_name(self): - return 'lnbase:' + str(self.host) - - def ping_if_required(self): - if time.time() - self.ping_time > 120: - self.send_message(gen_msg('ping', num_pong_bytes=4, byteslen=4)) - self.ping_time = time.time() - - def send_message(self, msg): - message_type, payload = decode_msg(msg) - self.print_error("Sending '%s'"%message_type.upper()) +class InitiatorSession: + def __init__(self, privkey, remote_pubkey, reader, writer): + self.privkey = privkey + self.remote_pubkey = remote_pubkey + self.reader = reader + self.writer = writer + + def send_bytes(self, msg): l = len(msg).to_bytes(2, 'big') lc = aead_encrypt(self.sk, self.sn(), b'', l) c = aead_encrypt(self.sk, self.sn(), b'', msg) @@ -320,30 +281,33 @@ class Peer(PrintError): assert len(c) == len(msg) + 16 self.writer.write(lc+c) - async def read_message(self): - rn_l, rk_l = self.rn() - rn_m, rk_m = self.rn() + async def read_messages(self): + read_buffer = b'' while True: - if len(self.read_buffer) >= 18: - lc = self.read_buffer[:18] - l = aead_decrypt(rk_l, rn_l, b'', lc) - length = int.from_bytes(l, 'big') - offset = 18 + length + 16 - if len(self.read_buffer) >= offset: - c = self.read_buffer[18:offset] - self.read_buffer = self.read_buffer[offset:] - msg = aead_decrypt(rk_m, rn_m, b'', c) - return msg - try: - s = await self.reader.read(2**10) - except: - s = None - if not s: - raise LightningPeerConnectionClosed() - self.read_buffer += s + rn_l, rk_l = self.rn() + rn_m, rk_m = self.rn() + while True: + if len(read_buffer) >= 18: + lc = read_buffer[:18] + l = aead_decrypt(rk_l, rn_l, b'', lc) + length = int.from_bytes(l, 'big') + offset = 18 + length + 16 + if len(read_buffer) >= offset: + c = read_buffer[18:offset] + read_buffer = read_buffer[offset:] + msg = aead_decrypt(rk_m, rn_m, b'', c) + yield msg + break + try: + s = await self.reader.read(2**10) + except: + s = None + if not s: + raise LightningPeerConnectionClosed() + read_buffer += s async def handshake(self): - hs = HandshakeState(self.pubkey) + hs = HandshakeState(self.remote_pubkey) # Get a new ephemeral key epriv, epub = create_ephemeral_key() @@ -396,6 +360,57 @@ class Peer(PrintError): self._sn = 0 return o + +class Peer(PrintError): + + def __init__(self, lnworker, peer_addr, request_initial_sync=False): + self.initialized = asyncio.Future() + self.transport = None + self.peer_addr = peer_addr + self.lnworker = lnworker + self.privkey = lnworker.node_keypair.privkey + self.network = lnworker.network + self.lnwatcher = lnworker.network.lnwatcher + self.channel_db = lnworker.network.channel_db + self.ping_time = 0 + self.channel_accepted = defaultdict(asyncio.Queue) + self.channel_reestablished = defaultdict(asyncio.Future) + self.funding_signed = defaultdict(asyncio.Queue) + self.funding_created = defaultdict(asyncio.Queue) + self.revoke_and_ack = defaultdict(asyncio.Queue) + self.commitment_signed = defaultdict(asyncio.Queue) + self.announcement_signatures = defaultdict(asyncio.Queue) + self.closing_signed = defaultdict(asyncio.Queue) + self.payment_preimages = defaultdict(asyncio.Queue) + self.localfeatures = LnLocalFeatures(0) + if request_initial_sync: + self.localfeatures |= LnLocalFeatures.INITIAL_ROUTING_SYNC + self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_OPT + self.invoices = lnworker.invoices + self.attempted_route = {} + + def send_message(self, message_name, **kwargs): + assert type(message_name) is str + self.print_error("Sending '%s'"%message_name.upper()) + self.transport.send_bytes(gen_msg(message_name, **kwargs)) + + async def initialize(self): + await self.transport.handshake() + self.send_message("init", gflen=0, lflen=1, localfeatures=self.localfeatures) + self.initialized.set_result(True) + + @property + def channels(self): + return self.lnworker.channels_for_peer(self.peer_addr.pubkey) + + def diagnostic_name(self): + return 'lnbase:' + str(self.peer_addr.host) + + def ping_if_required(self): + if time.time() - self.ping_time > 120: + self.send_message('ping', num_pong_bytes=4, byteslen=4) + self.ping_time = time.time() + def process_message(self, message): message_type, payload = decode_msg(message) try: @@ -421,7 +436,7 @@ class Peer(PrintError): def on_ping(self, payload): l = int.from_bytes(payload['num_pong_bytes'], 'big') - self.send_message(gen_msg('pong', byteslen=l)) + self.send_message('pong', byteslen=l) def on_pong(self, payload): pass @@ -484,16 +499,6 @@ class Peer(PrintError): else: self.announcement_signatures[channel_id].put_nowait(payload) - async def initialize(self): - self.reader, self.writer = await asyncio.open_connection(self.host, self.port) - await self.handshake() - # send init - self.send_message(gen_msg("init", gflen=0, lflen=1, localfeatures=self.localfeatures)) - # read init - msg = await self.read_message() - self.process_message(msg) - self.initialized.set_result(True) - def handle_disconnect(func): async def wrapper_func(self, *args, **kwargs): try: @@ -502,7 +507,7 @@ class Peer(PrintError): self.print_error("disconnecting gracefully. {}".format(e)) finally: self.close_and_cleanup() - self.lnworker.peers.pop(self.pubkey) + self.lnworker.peers.pop(self.peer_addr.pubkey) return wrapper_func @ignore_exceptions # do not kill main_taskgroup @@ -516,10 +521,9 @@ class Peer(PrintError): return self.channel_db.add_recent_peer(self.peer_addr) # loop - while True: - self.ping_if_required() - msg = await self.read_message() + async for msg in self.transport.read_messages(): self.process_message(msg) + self.ping_if_required() def close_and_cleanup(self): try: @@ -564,7 +568,7 @@ class Peer(PrintError): # for the first commitment transaction per_commitment_secret_first = get_per_commitment_secret_from_seed(per_commitment_secret_seed, RevocationStore.START_INDEX) per_commitment_point_first = secret_to_pubkey(int.from_bytes(per_commitment_secret_first, 'big')) - msg = gen_msg( + self.send_message( "open_channel", temporary_channel_id=temp_channel_id, chain_hash=constants.net.rev_genesis_bytes(), @@ -584,7 +588,6 @@ class Peer(PrintError): channel_flags=0x00, # not willing to announce channel channel_reserve_satoshis=546 ) - self.send_message(msg) payload = await self.channel_accepted[temp_channel_id].get() if payload.get('error'): raise Exception(payload.get('error')) @@ -625,7 +628,7 @@ class Peer(PrintError): # remote commitment transaction channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_index) chan = { - "node_id": self.pubkey, + "node_id": self.peer_addr.pubkey, "channel_id": channel_id, "short_channel_id": None, "funding_outpoint": Outpoint(funding_txid, funding_index), @@ -645,11 +648,11 @@ class Peer(PrintError): m.lnwatcher = self.lnwatcher m.sweep_address = self.lnworker.sweep_address sig_64, _ = m.sign_next_commitment() - self.send_message(gen_msg("funding_created", + self.send_message("funding_created", temporary_channel_id=temp_channel_id, funding_txid=funding_txid_bytes, funding_output_index=funding_index, - signature=sig_64)) + signature=sig_64) payload = await self.funding_signed[channel_id].get() self.print_error('received funding_signed') remote_sig = payload['signature'] @@ -679,7 +682,7 @@ class Peer(PrintError): per_commitment_point_first = secret_to_pubkey(int.from_bytes(per_commitment_secret_first, 'big')) min_depth = 3 - self.send_message(gen_msg('accept_channel', + self.send_message('accept_channel', temporary_channel_id=temp_chan_id, dust_limit_satoshis=local_config.dust_limit_sat, max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat, @@ -694,7 +697,7 @@ class Peer(PrintError): delayed_payment_basepoint=local_config.delayed_basepoint.pubkey, htlc_basepoint=local_config.htlc_basepoint.pubkey, first_per_commitment_point=per_commitment_point_first, - )) + ) funding_created = await self.funding_created[temp_chan_id].get() funding_idx = int.from_bytes(funding_created['funding_output_index'], 'big') funding_txid = bh2u(funding_created['funding_txid'][::-1]) @@ -742,10 +745,10 @@ class Peer(PrintError): remote_sig = funding_created['signature'] m.receive_new_commitment(remote_sig, []) sig_64, _ = m.sign_next_commitment() - self.send_message(gen_msg('funding_signed', + self.send_message('funding_signed', channel_id=channel_id, signature=sig_64, - )) + ) m.set_state('OPENING') m.remote_commitment_to_be_revoked = m.pending_remote_commitment m.config[REMOTE] = m.config[REMOTE]._replace(ctn=0) @@ -778,11 +781,11 @@ class Peer(PrintError): return chan.set_state('REESTABLISHING') self.network.trigger_callback('channel', chan) - self.send_message(gen_msg("channel_reestablish", + self.send_message("channel_reestablish", channel_id=chan_id, next_local_commitment_number=chan.config[LOCAL].ctn+1, next_remote_revocation_number=chan.config[REMOTE].ctn - )) + ) await self.channel_reestablished[chan_id] chan.set_state('OPENING') if chan.config[LOCAL].funding_locked_received and chan.short_channel_id: @@ -799,11 +802,11 @@ class Peer(PrintError): def try_to_get_remote_to_force_close_with_their_latest(): self.print_error("trying to get remote to force close", bh2u(chan_id)) - self.send_message(gen_msg("channel_reestablish", + self.send_message("channel_reestablish", channel_id=chan_id, next_local_commitment_number=0, next_remote_revocation_number=0 - )) + ) channel_reestablish_msg = payload # compare remote ctns @@ -854,7 +857,7 @@ class Peer(PrintError): per_commitment_point_second = secret_to_pubkey(int.from_bytes( get_per_commitment_secret_from_seed(chan.config[LOCAL].per_commitment_secret_seed, per_commitment_secret_index), 'big')) # note: if funding_locked was not yet received, we might send it multiple times - self.send_message(gen_msg("funding_locked", channel_id=channel_id, next_per_commitment_point=per_commitment_point_second)) + self.send_message("funding_locked", channel_id=channel_id, next_per_commitment_point=per_commitment_point_second) if chan.config[LOCAL].funding_locked_received: self.mark_open(chan) @@ -903,7 +906,7 @@ class Peer(PrintError): node_sigs = [local_node_sig, remote_node_sig] bitcoin_sigs = [local_bitcoin_sig, remote_bitcoin_sig] - node_ids = [privkey_to_pubkey(self.privkey), self.pubkey] + node_ids = [privkey_to_pubkey(self.privkey), self.peer_addr.pubkey] bitcoin_keys = [chan.config[LOCAL].multisig_key.pubkey, chan.config[REMOTE].multisig_key.pubkey] if node_ids[0] > node_ids[1]: @@ -912,7 +915,7 @@ class Peer(PrintError): node_ids.reverse() bitcoin_keys.reverse() - channel_announcement = gen_msg("channel_announcement", + self.send_message("channel_announcement", node_signatures_1=node_sigs[0], node_signatures_2=node_sigs[1], bitcoin_signature_1=bitcoin_sigs[0], @@ -927,8 +930,6 @@ class Peer(PrintError): bitcoin_key_2=bitcoin_keys[1] ) - self.send_message(channel_announcement) - print("SENT CHANNEL ANNOUNCEMENT") def mark_open(self, chan): @@ -940,7 +941,7 @@ class Peer(PrintError): self.network.trigger_callback('channel', chan) # add channel to database pubkey_ours = self.lnworker.node_keypair.pubkey - pubkey_theirs = self.pubkey + pubkey_theirs = self.peer_addr.pubkey node_ids = [pubkey_theirs, pubkey_ours] bitcoin_keys = [chan.config[LOCAL].multisig_key.pubkey, chan.config[REMOTE].multisig_key.pubkey] sorted_node_ids = list(sorted(node_ids)) @@ -968,8 +969,8 @@ class Peer(PrintError): # peer may have sent us a channel update for the incoming direction previously # note: if we were offline when the 3rd conf happened, lnd will never send us this channel_update # see https://github.com/lightningnetwork/lnd/issues/1347 - #self.send_message(gen_msg("query_short_channel_ids", chain_hash=constants.net.rev_genesis_bytes(), - # len=9, encoded_short_ids=b'\x00'+chan.short_channel_id)) + #self.send_message("query_short_channel_ids", chain_hash=constants.net.rev_genesis_bytes(), + # len=9, encoded_short_ids=b'\x00'+chan.short_channel_id) if hasattr(chan, 'pending_channel_update_message'): self.on_channel_update(chan.pending_channel_update_message) @@ -981,7 +982,7 @@ class Peer(PrintError): chan.config[REMOTE].multisig_key.pubkey] node_ids = [privkey_to_pubkey(self.privkey), - self.pubkey] + self.peer_addr.pubkey] sorted_node_ids = list(sorted(node_ids)) if sorted_node_ids != node_ids: @@ -1002,12 +1003,12 @@ class Peer(PrintError): h = bitcoin.Hash(to_hash) bitcoin_signature = ecc.ECPrivkey(chan.config[LOCAL].multisig_key.privkey).sign(h, sig_string_from_r_and_s, get_r_and_s_from_sig_string) node_signature = ecc.ECPrivkey(self.privkey).sign(h, sig_string_from_r_and_s, get_r_and_s_from_sig_string) - self.send_message(gen_msg("announcement_signatures", + self.send_message("announcement_signatures", channel_id=chan.channel_id, short_channel_id=chan.short_channel_id, node_signature=node_signature, bitcoin_signature=bitcoin_signature - )) + ) return h, node_signature, bitcoin_signature @@ -1068,12 +1069,12 @@ class Peer(PrintError): def send_commitment(self, chan): sig_64, htlc_sigs = chan.sign_next_commitment() - self.send_message(gen_msg("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs))) + self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs)) return len(htlc_sigs) - async def update_channel(self, chan, update): + async def update_channel(self, chan, message_name, **kwargs): """ generic channel update flow """ - self.send_message(update) + self.send_message(message_name, **kwargs) self.send_commitment(chan) await self.receive_revoke(chan) await self.receive_commitment(chan) @@ -1114,9 +1115,8 @@ class Peer(PrintError): raise PaymentFailure('not enough local balance') htlc_id = chan.add_htlc(htlc) chan.onion_keys[htlc_id] = secret_key - update = gen_msg("update_add_htlc", channel_id=chan.channel_id, id=htlc_id, cltv_expiry=final_cltv_expiry_with_deltas, amount_msat=amount_msat, payment_hash=payment_hash, onion_routing_packet=onion.to_bytes()) self.attempted_route[(chan.channel_id, htlc_id)] = route - await self.update_channel(chan, update) + await self.update_channel(chan, "update_add_htlc", channel_id=chan.channel_id, id=htlc_id, cltv_expiry=final_cltv_expiry_with_deltas, amount_msat=amount_msat, payment_hash=payment_hash, onion_routing_packet=onion.to_bytes()) async def receive_revoke(self, m): revoke_and_ack_msg = await self.revoke_and_ack[m.channel_id].get() @@ -1125,10 +1125,10 @@ class Peer(PrintError): def revoke(self, m): rev, _ = m.revoke_current_commitment() self.lnworker.save_channel(m) - self.send_message(gen_msg("revoke_and_ack", + self.send_message("revoke_and_ack", channel_id=m.channel_id, per_commitment_secret=rev.per_commitment_secret, - next_per_commitment_point=rev.next_per_commitment_point)) + next_per_commitment_point=rev.next_per_commitment_point) async def receive_commitment(self, m, commitment_signed_msg=None): if commitment_signed_msg is None: @@ -1158,8 +1158,7 @@ class Peer(PrintError): self.send_commitment(chan) await self.receive_revoke(chan) chan.settle_htlc(payment_preimage, htlc_id) - fulfillment = gen_msg("update_fulfill_htlc", channel_id=channel_id, id=htlc_id, payment_preimage=payment_preimage) - await self.update_channel(chan, fulfillment) + await self.update_channel(chan, "update_fulfill_htlc", channel_id=channel_id, id=htlc_id, payment_preimage=payment_preimage) self.lnworker.save_channel(chan) def on_commitment_signed(self, payload): @@ -1234,8 +1233,7 @@ class Peer(PrintError): else: return chan.update_fee(feerate_per_kw) - update = gen_msg("update_fee", channel_id=chan.channel_id, feerate_per_kw=feerate_per_kw) - await self.update_channel(chan, update) + await self.update_channel(chan, "update_fee", channel_id=chan.channel_id, feerate_per_kw=feerate_per_kw) def current_feerate_per_kw(self): if constants.net is constants.BitcoinRegtest: @@ -1258,9 +1256,9 @@ class Peer(PrintError): raise Exception('scriptpubkey length in received shutdown message invalid: ' + str(payload['len'])) chan = self.channels[payload['channel_id']] scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address)) - self.send_message(gen_msg('shutdown', channel_id=chan.channel_id, len=len(scriptpubkey), scriptpubkey=scriptpubkey)) + self.send_message('shutdown', channel_id=chan.channel_id, len=len(scriptpubkey), scriptpubkey=scriptpubkey) signature, fee = chan.make_closing_tx(scriptpubkey, payload['scriptpubkey']) - self.send_message(gen_msg('closing_signed', channel_id=chan.channel_id, fee_satoshis=fee, signature=signature)) + self.send_message('closing_signed', channel_id=chan.channel_id, fee_satoshis=fee, signature=signature) while chan.get_state() != 'CLOSED': try: closing_signed = await asyncio.wait_for(self.closing_signed[chan.channel_id].get(), 1) @@ -1269,5 +1267,5 @@ class Peer(PrintError): else: fee = int.from_bytes(closing_signed['fee_satoshis'], 'big') signature, _ = chan.make_closing_tx(scriptpubkey, payload['scriptpubkey'], fee_sat=fee) - self.send_message(gen_msg('closing_signed', channel_id=chan.channel_id, fee_satoshis=fee, signature=signature)) + self.send_message('closing_signed', channel_id=chan.channel_id, fee_satoshis=fee, signature=signature) self.print_error('REMOTE PEER CLOSED CHANNEL') diff --git a/electrum/lnworker.py b/electrum/lnworker.py index 8155e8550..b41892ab4 100644 --- a/electrum/lnworker.py +++ b/electrum/lnworker.py @@ -16,7 +16,7 @@ from . import bitcoin from .keystore import BIP32_KeyStore from .bitcoin import sha256, COIN from .util import bh2u, bfh, PrintError, InvoiceError, resolve_dns_srv, is_ip_address, log_exceptions -from .lnbase import Peer +from .lnbase import Peer, InitiatorSession from .lnaddr import lnencode, LnAddr, lndecode from .ecc import der_sig_from_sig_string from .lnchan import Channel @@ -112,8 +112,14 @@ class LNWorker(PrintError): return self._last_tried_peer[peer_addr] = time.time() self.print_error("adding peer", peer_addr) - peer = Peer(self, host, port, node_id, request_initial_sync=self.config.get("request_initial_sync", True)) - asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(peer.main_loop()), self.network.asyncio_loop) + fut = asyncio.ensure_future(asyncio.open_connection(peer_addr.host, peer_addr.port)) + def cb(fut): + reader, writer = fut.result() + transport = InitiatorSession(self.node_keypair.privkey, node_id, reader, writer) + peer.transport = transport + asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(peer.main_loop()), self.network.asyncio_loop) + fut.add_done_callback(cb) + peer = Peer(self, peer_addr, request_initial_sync=self.config.get("request_initial_sync", True)) self.peers[node_id] = peer self.network.trigger_callback('ln_status') return peer @@ -238,7 +244,8 @@ class LNWorker(PrintError): peer = self.add_peer(host, port, node_id) coro = self._open_channel_coroutine(peer, local_amt_sat, push_amt_sat, password) f = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop) - return f.result(timeout) + chan = f.result(timeout) + return bh2u(chan.node_id) def pay(self, invoice, amount_sat=None): addr = lndecode(invoice, expected_hrp=constants.net.SEGWIT_HRP)