|
|
@ -78,7 +78,8 @@ class Peer(PrintError): |
|
|
|
self.payment_preimages = defaultdict(asyncio.Queue) |
|
|
|
self.localfeatures = LnLocalFeatures(0) |
|
|
|
self.localfeatures |= LnLocalFeatures.GOSSIP_QUERIES_REQ |
|
|
|
self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_REQ |
|
|
|
#self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_REQ |
|
|
|
#self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_OPT |
|
|
|
self.attempted_route = {} |
|
|
|
self.orphan_channel_updates = OrderedDict() |
|
|
|
self.sent_commitment_for_ctn_last = defaultdict(lambda: None) # type: Dict[Channel, Optional[int]] |
|
|
@ -162,8 +163,9 @@ class Peer(PrintError): |
|
|
|
return |
|
|
|
# if they required some even flag we don't have, they will close themselves |
|
|
|
# but if we require an even flag they don't have, we close |
|
|
|
self.their_localfeatures = int.from_bytes(payload['localfeatures'], byteorder="big") |
|
|
|
our_flags = set(list_enabled_bits(self.localfeatures)) |
|
|
|
their_flags = set(list_enabled_bits(int.from_bytes(payload['localfeatures'], byteorder="big"))) |
|
|
|
their_flags = set(list_enabled_bits(self.their_localfeatures)) |
|
|
|
for flag in our_flags: |
|
|
|
if flag not in their_flags and get_ln_flag_pair_of_bit(flag) not in their_flags: |
|
|
|
# they don't have this feature we wanted :( |
|
|
@ -200,7 +202,6 @@ class Peer(PrintError): |
|
|
|
self.print_error("Disconnecting: {}".format(e)) |
|
|
|
finally: |
|
|
|
self.close_and_cleanup() |
|
|
|
self.lnworker.peers.pop(self.pubkey) |
|
|
|
return wrapper_func |
|
|
|
|
|
|
|
@ignore_exceptions # do not kill main_taskgroup |
|
|
@ -209,6 +210,7 @@ class Peer(PrintError): |
|
|
|
async with aiorpcx.TaskGroup() as group: |
|
|
|
await group.spawn(self._gossip_loop()) |
|
|
|
await group.spawn(self._message_loop()) |
|
|
|
# kill group if the peer times out |
|
|
|
await group.spawn(asyncio.wait_for(self.initialized.wait(), 10)) |
|
|
|
|
|
|
|
@log_exceptions |
|
|
@ -235,16 +237,17 @@ class Peer(PrintError): |
|
|
|
if self.chan_upds: |
|
|
|
self.channel_db.on_channel_update(self.chan_upds) |
|
|
|
self.chan_upds = [] |
|
|
|
need_to_get = sorted(self.channel_db.missing_short_chan_ids()) |
|
|
|
if need_to_get and not self.receiving_channels: |
|
|
|
self.print_error('missing', len(need_to_get), 'channels') |
|
|
|
zlibencoded = zlib.compress(bfh(''.join(need_to_get[0:100]))) |
|
|
|
self.send_message( |
|
|
|
'query_short_channel_ids', |
|
|
|
chain_hash=constants.net.rev_genesis_bytes(), |
|
|
|
len=1+len(zlibencoded), |
|
|
|
encoded_short_ids=b'\x01' + zlibencoded) |
|
|
|
self.receiving_channels = True |
|
|
|
# todo: enable when db is fixed |
|
|
|
#need_to_get = sorted(self.channel_db.missing_short_chan_ids()) |
|
|
|
#if need_to_get and not self.receiving_channels: |
|
|
|
# self.print_error('missing', len(need_to_get), 'channels') |
|
|
|
# zlibencoded = zlib.compress(bfh(''.join(need_to_get[0:100]))) |
|
|
|
# self.send_message( |
|
|
|
# 'query_short_channel_ids', |
|
|
|
# chain_hash=constants.net.rev_genesis_bytes(), |
|
|
|
# len=1+len(zlibencoded), |
|
|
|
# encoded_short_ids=b'\x01' + zlibencoded) |
|
|
|
# self.receiving_channels = True |
|
|
|
|
|
|
|
async def _message_loop(self): |
|
|
|
try: |
|
|
@ -270,6 +273,7 @@ class Peer(PrintError): |
|
|
|
if chan.get_state() != 'FORCE_CLOSING': |
|
|
|
chan.set_state('DISCONNECTED') |
|
|
|
self.network.trigger_callback('channel', chan) |
|
|
|
self.lnworker.peers.pop(self.pubkey) |
|
|
|
|
|
|
|
def make_local_config(self, funding_sat: int, push_msat: int, initiator: HTLCOwner) -> LocalConfig: |
|
|
|
# key derivation |
|
|
@ -337,7 +341,7 @@ class Peer(PrintError): |
|
|
|
channel_reserve_satoshis=local_config.reserve_sat, |
|
|
|
htlc_minimum_msat=1, |
|
|
|
) |
|
|
|
payload = await asyncio.wait_for(self.channel_accepted[temp_channel_id].get(), 20) |
|
|
|
payload = await asyncio.wait_for(self.channel_accepted[temp_channel_id].get(), 5) |
|
|
|
if payload.get('error'): |
|
|
|
raise Exception('Remote Lightning peer reported error: ' + repr(payload.get('error'))) |
|
|
|
remote_per_commitment_point = payload['first_per_commitment_point'] |
|
|
@ -412,12 +416,12 @@ class Peer(PrintError): |
|
|
|
funding_txid=funding_txid_bytes, |
|
|
|
funding_output_index=funding_index, |
|
|
|
signature=sig_64) |
|
|
|
payload = await asyncio.wait_for(self.funding_signed[channel_id].get(), 1) |
|
|
|
payload = await asyncio.wait_for(self.funding_signed[channel_id].get(), 5) |
|
|
|
self.print_error('received funding_signed') |
|
|
|
remote_sig = payload['signature'] |
|
|
|
chan.receive_new_commitment(remote_sig, []) |
|
|
|
# broadcast funding tx |
|
|
|
await asyncio.wait_for(self.network.broadcast_transaction(funding_tx), 1) |
|
|
|
await asyncio.wait_for(self.network.broadcast_transaction(funding_tx), 5) |
|
|
|
chan.open_with_first_pcp(remote_per_commitment_point, remote_sig) |
|
|
|
chan.set_remote_commitment() |
|
|
|
chan.set_local_commitment(chan.current_commitment(LOCAL)) |
|
|
@ -530,10 +534,10 @@ class Peer(PrintError): |
|
|
|
|
|
|
|
def on_channel_reestablish(self, payload): |
|
|
|
chan_id = payload["channel_id"] |
|
|
|
self.print_error("Received channel_reestablish", bh2u(chan_id), payload) |
|
|
|
chan = self.channels.get(chan_id) |
|
|
|
if not chan: |
|
|
|
raise Exception('Unknown channel_reestablish ' + bh2u(chan_id)) |
|
|
|
self.print_error("Received unknown channel_reestablish", bh2u(chan_id), payload) |
|
|
|
raise Exception('Unknown channel_reestablish') |
|
|
|
self.channel_reestablished[chan_id].set_result(payload) |
|
|
|
|
|
|
|
def try_to_get_remote_to_force_close_with_their_latest(chan_id): |
|
|
@ -555,24 +559,31 @@ class Peer(PrintError): |
|
|
|
return |
|
|
|
chan.set_state('REESTABLISHING') |
|
|
|
self.network.trigger_callback('channel', chan) |
|
|
|
# compute data_loss_protect fields |
|
|
|
current_remote_ctn = chan.config[REMOTE].ctn |
|
|
|
if current_remote_ctn == 0: |
|
|
|
last_rev_secret = 0 |
|
|
|
else: |
|
|
|
revocation_store = chan.config[REMOTE].revocation_store |
|
|
|
last_rev_index = current_remote_ctn - 1 |
|
|
|
last_rev_secret = revocation_store.retrieve_secret(RevocationStore.START_INDEX - last_rev_index) |
|
|
|
last_secret, last_point = chan.local_points(offset=-1) |
|
|
|
# send message |
|
|
|
self.send_message( |
|
|
|
"channel_reestablish", |
|
|
|
channel_id=chan_id, |
|
|
|
next_local_commitment_number=chan.config[LOCAL].ctn+1, |
|
|
|
next_remote_revocation_number=current_remote_ctn, |
|
|
|
your_last_per_commitment_secret=last_rev_secret, |
|
|
|
my_current_per_commitment_point=last_point |
|
|
|
) |
|
|
|
if self.their_localfeatures & LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_REQ: |
|
|
|
self.print_error('peer requires data loss protect') |
|
|
|
if current_remote_ctn == 0: |
|
|
|
last_rev_secret = 0 |
|
|
|
else: |
|
|
|
revocation_store = chan.config[REMOTE].revocation_store |
|
|
|
last_rev_index = current_remote_ctn - 1 |
|
|
|
last_rev_secret = revocation_store.retrieve_secret(RevocationStore.START_INDEX - last_rev_index) |
|
|
|
last_secret, last_point = chan.local_points(offset=0) |
|
|
|
self.send_message( |
|
|
|
"channel_reestablish", |
|
|
|
channel_id=chan_id, |
|
|
|
next_local_commitment_number=chan.config[LOCAL].ctn+1, |
|
|
|
next_remote_revocation_number=current_remote_ctn, |
|
|
|
your_last_per_commitment_secret=last_rev_secret, |
|
|
|
my_current_per_commitment_point=last_point) |
|
|
|
else: |
|
|
|
self.send_message( |
|
|
|
"channel_reestablish", |
|
|
|
channel_id=chan_id, |
|
|
|
next_local_commitment_number=chan.config[LOCAL].ctn+1, |
|
|
|
next_remote_revocation_number=current_remote_ctn) |
|
|
|
|
|
|
|
channel_reestablish_msg = await self.channel_reestablished[chan_id] |
|
|
|
chan.set_state('OPENING') |
|
|
|
# compare remote ctns |
|
|
|