Browse Source

daemon: reconnect support.

To do this we keep an order counter so we know how to retransmit.  We
could simply keep old packets, but this is a little clearer for now.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ppa-0.6.1
Rusty Russell 9 years ago
parent
commit
3866d7605c
  1. 10
      Makefile
  2. 21
      daemon/htlc.c
  3. 3
      daemon/htlc.h
  4. 1
      daemon/jsonrpc.c
  5. 1
      daemon/jsonrpc.h
  6. 3
      daemon/lightningd.c
  7. 39
      daemon/packets.c
  8. 6
      daemon/packets.h
  9. 598
      daemon/peer.c
  10. 16
      daemon/peer.h
  11. 30
      daemon/test/test.sh
  12. 1
      state.c

10
Makefile

@ -198,8 +198,14 @@ daemon-test-timeout-anchor: daemon-test-different-fee-rates
daemon-test-different-fee-rates: daemon-test-normal
daemon-test-normal: daemon-test-manual-commit
daemon-test-manual-commit: daemon-test-mutual-close-with-htlcs
daemon-test-mutual-close-with-htlcs: daemon-all
daemon-test-mutual-close-with-htlcs: daemon-test-steal\ --reconnect
daemon-test-steal\ --reconnect: daemon-test-dump-onchain\ --reconnect
daemon-test-dump-onchain\ --reconnect: daemon-test-timeout-anchor\ --reconnect
daemon-test-timeout-anchor\ --reconnect: daemon-test-different-fee-rates\ --reconnect
daemon-test-different-fee-rates\ --reconnect: daemon-test-normal\ --reconnect
daemon-test-normal\ --reconnect: daemon-test-manual-commit\ --reconnect
daemon-test-manual-commit\ --reconnect: daemon-test-mutual-close-with-htlcs\ --reconnect
daemon-test-mutual-close-with-htlcs\ --reconnect: daemon-all
daemon-tests: daemon-test-steal
test-onion: test/test_onion test/onion_key

21
daemon/htlc.c

@ -157,3 +157,24 @@ void htlc_changestate(struct htlc *h,
h->state = newstate;
}
void htlc_undostate(struct htlc *h,
enum htlc_state oldstate,
enum htlc_state newstate)
{
log_debug(h->peer->log, "htlc %"PRIu64": %s->%s", h->id,
htlc_state_name(h->state), htlc_state_name(newstate));
assert(h->state == oldstate);
/* You can only return to previous state. */
assert(newstate == h->state - 1);
/* And must only be proposal, not commit. */
assert(h->state == SENT_REMOVE_HTLC || h->state == RCVD_REMOVE_HTLC);
/* You can't change sides. */
assert((htlc_state_flags(h->state)&(HTLC_LOCAL_F_OWNER|HTLC_REMOTE_F_OWNER))
== (htlc_state_flags(newstate)&(HTLC_LOCAL_F_OWNER|HTLC_REMOTE_F_OWNER)));
h->state = newstate;
}

3
daemon/htlc.h

@ -105,6 +105,9 @@ static inline enum channel_side htlc_channel_side(const struct htlc *h)
return THEIRS;
}
void htlc_undostate(struct htlc *h,
enum htlc_state oldstate, enum htlc_state newstate);
/* htlc_map: ID -> htlc mapping. */
static inline u64 htlc_key(const struct htlc *h)
{

1
daemon/jsonrpc.c

@ -263,6 +263,7 @@ static const struct json_command *cmdlist[] = {
&mocktime_command,
&crash_command,
&disconnect_command,
&reconnect_command,
&signcommit_command,
&output_command,
&add_route_command,

1
daemon/jsonrpc.h

@ -66,6 +66,7 @@ extern const struct json_command commit_command;
extern const struct json_command mocktime_command;
extern const struct json_command close_command;
extern const struct json_command newaddr_command;
extern const struct json_command reconnect_command;
extern const struct json_command disconnect_command;
extern const struct json_command signcommit_command;
extern const struct json_command output_command;

3
daemon/lightningd.c

@ -344,6 +344,9 @@ int main(int argc, char *argv[])
log_info(dstate->base_log, "Hello world!");
/* If we loaded peers from database, reconnect now. */
reconnect_peers(dstate);
for (;;) {
struct timer *expired;
void *v = io_loop(&dstate->timers, &expired);

39
daemon/packets.c

@ -60,7 +60,8 @@ static void queue_raw_pkt(struct peer *peer, Pkt *pkt)
tal_resize(&peer->outpkt, n+1);
peer->outpkt[n] = pkt;
log_debug(peer->log, "Queued pkt %s", pkt_name(pkt->pkt_case));
log_debug(peer->log, "Queued pkt %s (order=%"PRIu64")",
pkt_name(pkt->pkt_case), peer->order_counter);
/* In case it was waiting for output. */
io_wake(peer);
@ -178,14 +179,13 @@ void queue_pkt_htlc_fail(struct peer *peer, struct htlc *htlc)
}
/* OK, we're sending a signature for their pending changes. */
void queue_pkt_commit(struct peer *peer)
void queue_pkt_commit(struct peer *peer, const struct bitcoin_signature *sig)
{
UpdateCommit *u = tal(peer, UpdateCommit);
/* Now send message */
update_commit__init(u);
u->sig = signature_to_proto(u, peer->dstate->secpctx,
&peer->remote.commit->sig->sig);
u->sig = signature_to_proto(u, peer->dstate->secpctx, &sig->sig);
queue_pkt(peer, PKT__PKT_UPDATE_COMMIT, u);
}
@ -202,9 +202,7 @@ void queue_pkt_revocation(struct peer *peer,
update_revocation__init(u);
u->revocation_preimage = sha256_to_proto(u, preimage);
u->next_revocation_hash
= sha256_to_proto(u, &peer->local.next_revocation_hash);
u->next_revocation_hash = sha256_to_proto(u, next_hash);
queue_pkt(peer, PKT__PKT_UPDATE_REVOCATION, u);
}
@ -222,6 +220,14 @@ Pkt *pkt_err(struct peer *peer, const char *msg, ...)
return make_pkt(peer, PKT__PKT_ERROR, e);
}
Pkt *pkt_reconnect(struct peer *peer, u64 ack)
{
Reconnect *r = tal(peer, Reconnect);
reconnect__init(r);
r->ack = ack;
return make_pkt(peer, PKT__PKT_RECONNECT, r);
}
void queue_pkt_err(struct peer *peer, Pkt *err)
{
queue_raw_pkt(peer, err);
@ -440,7 +446,8 @@ Pkt *accept_pkt_htlc_fail(struct peer *peer, const Pkt *pkt, struct htlc **h)
return NULL;
}
Pkt *accept_pkt_htlc_fulfill(struct peer *peer, const Pkt *pkt, struct htlc **h)
Pkt *accept_pkt_htlc_fulfill(struct peer *peer, const Pkt *pkt, struct htlc **h,
bool *was_already_fulfilled)
{
const UpdateFulfillHtlc *f = pkt->update_fulfill_htlc;
struct sha256 rhash;
@ -458,8 +465,12 @@ Pkt *accept_pkt_htlc_fulfill(struct peer *peer, const Pkt *pkt, struct htlc **h)
if (!structeq(&rhash, &(*h)->rhash))
return pkt_err(peer, "Invalid r for %"PRIu64, f->id);
assert(!(*h)->r);
(*h)->r = tal_dup(*h, struct rval, &r);
if ((*h)->r) {
*was_already_fulfilled = true;
} else {
*was_already_fulfilled = false;
(*h)->r = tal_dup(*h, struct rval, &r);
}
return NULL;
}
@ -489,8 +500,11 @@ Pkt *accept_pkt_revocation(struct peer *peer, const Pkt *pkt)
* transaction, and MUST fail if it does not.
*/
sha256(&h, &preimage, sizeof(preimage));
if (!structeq(&h, peer->their_prev_revocation_hash))
if (!structeq(&h, peer->their_prev_revocation_hash)) {
log_unusual(peer->log, "Incorrect preimage for %"PRIu64,
peer->remote.commit->commit_num - 1);
return pkt_err(peer, "complete preimage incorrect");
}
// save revocation preimages in shachain
if (!shachain_add_hash(&peer->their_preimages,
@ -499,6 +513,9 @@ Pkt *accept_pkt_revocation(struct peer *peer, const Pkt *pkt)
&preimage))
return pkt_err(peer, "preimage not next in shachain");
log_debug(peer->log, "Got revocation preimage %"PRIu64,
peer->remote.commit->commit_num - 1);
/* Clear the previous revocation hash. */
peer->their_prev_revocation_hash
= tal_free(peer->their_prev_revocation_hash);

6
daemon/packets.h

@ -17,7 +17,7 @@ void queue_pkt_open_complete(struct peer *peer);
void queue_pkt_htlc_add(struct peer *peer, struct htlc *htlc);
void queue_pkt_htlc_fulfill(struct peer *peer, struct htlc *htlc);
void queue_pkt_htlc_fail(struct peer *peer, struct htlc *htlc);
void queue_pkt_commit(struct peer *peer);
void queue_pkt_commit(struct peer *peer, const struct bitcoin_signature *sig);
void queue_pkt_revocation(struct peer *peer,
const struct sha256 *preimage,
const struct sha256 *next_hash);
@ -25,6 +25,7 @@ void queue_pkt_close_shutdown(struct peer *peer);
void queue_pkt_close_signature(struct peer *peer);
Pkt *pkt_err(struct peer *peer, const char *msg, ...);
Pkt *pkt_reconnect(struct peer *peer, u64 ack);
void queue_pkt_err(struct peer *peer, Pkt *err);
Pkt *pkt_err_unexpected(struct peer *peer, const Pkt *pkt);
@ -44,7 +45,8 @@ Pkt *accept_pkt_htlc_add(struct peer *peer, const Pkt *pkt, struct htlc **h);
Pkt *accept_pkt_htlc_fail(struct peer *peer, const Pkt *pkt, struct htlc **h);
Pkt *accept_pkt_htlc_fulfill(struct peer *peer, const Pkt *pkt, struct htlc **h);
Pkt *accept_pkt_htlc_fulfill(struct peer *peer, const Pkt *pkt, struct htlc **h,
bool *was_already_fulfilled);
Pkt *accept_pkt_update_accept(struct peer *peer, const Pkt *pkt);

598
daemon/peer.c

@ -42,6 +42,7 @@
#include <ccan/tal/tal.h>
#include <errno.h>
#include <inttypes.h>
#include <netdb.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <sys/socket.h>
@ -371,13 +372,17 @@ static void route_htlc_onwards(struct peer *peer,
struct htlc *htlc,
u64 msatoshis,
const BitcoinPubkey *pb_id,
const u8 *rest_of_route)
const u8 *rest_of_route,
const struct peer *only_dest)
{
struct pubkey id;
struct peer *next;
log_debug_struct(peer->log, "Forwarding HTLC %s", struct sha256, &htlc->rhash);
log_add(peer->log, " (id %"PRIu64")", htlc->id);
if (!only_dest) {
log_debug_struct(peer->log, "Forwarding HTLC %s",
struct sha256, &htlc->rhash);
log_add(peer->log, " (id %"PRIu64")", htlc->id);
}
if (!proto_to_pubkey(peer->dstate->secpctx, pb_id, &id)) {
log_unusual(peer->log,
@ -396,6 +401,9 @@ static void route_htlc_onwards(struct peer *peer,
return;
}
if (only_dest && next != only_dest)
return;
/* Offered fee must be sufficient. */
if (htlc->msatoshis - msatoshis < connection_fee(next->nc, msatoshis)) {
log_unusual(peer->log,
@ -420,7 +428,8 @@ static void route_htlc_onwards(struct peer *peer,
}
}
static void their_htlc_added(struct peer *peer, struct htlc *htlc)
static void their_htlc_added(struct peer *peer, struct htlc *htlc,
struct peer *only_dest)
{
RouteStep *step;
const u8 *rest_of_route;
@ -461,6 +470,8 @@ static void their_htlc_added(struct peer *peer, struct htlc *htlc)
switch (step->next_case) {
case ROUTE_STEP__NEXT_END:
if (only_dest)
return;
payment = find_payment(peer->dstate, &htlc->rhash);
if (!payment) {
log_unusual(peer->log, "No payment for HTLC %"PRIu64,
@ -492,7 +503,7 @@ static void their_htlc_added(struct peer *peer, struct htlc *htlc)
case ROUTE_STEP__NEXT_BITCOIN:
route_htlc_onwards(peer, htlc, step->amount, step->bitcoin,
rest_of_route);
rest_of_route, only_dest);
goto free_rest;
default:
log_info(peer->log, "Unknown step type %u", step->next_case);
@ -524,6 +535,28 @@ static void our_htlc_fulfilled(struct peer *peer, struct htlc *htlc,
}
}
/* peer has come back online: re-send any we have to send to them. */
static void retry_all_routing(struct peer *restarted_peer)
{
struct peer *peer;
list_for_each(&restarted_peer->dstate->peers, peer, list) {
struct htlc_map_iter it;
struct htlc *h;
if (peer == restarted_peer)
continue;
for (h = htlc_map_first(&peer->htlcs, &it);
h;
h = htlc_map_next(&peer->htlcs, &it)) {
if (h->state != RCVD_ADD_ACK_REVOCATION)
continue;
their_htlc_added(peer, h, restarted_peer);
}
}
}
static void adjust_cstate_side(struct channel_state *cstate,
struct htlc *h,
enum htlc_state old, enum htlc_state new,
@ -582,7 +615,7 @@ static void check_both_committed(struct peer *peer, struct htlc *h)
our_htlc_failed(peer, h);
break;
case RCVD_ADD_ACK_REVOCATION:
their_htlc_added(peer, h);
their_htlc_added(peer, h, NULL);
break;
default:
break;
@ -686,6 +719,7 @@ static bool closing_pkt_in(struct peer *peer, const Pkt *pkt)
peer->closing.their_sig = tal_dup(peer,
struct bitcoin_signature, &theirsig);
peer->closing.their_fee = c->close_fee;
peer->closing.sigs_in++;
if (peer->closing.our_fee != peer->closing.their_fee) {
/* BOLT #2:
@ -709,6 +743,7 @@ static bool closing_pkt_in(struct peer *peer, const Pkt *pkt)
log_info(peer->log, "accept_pkt_close_sig: we change to %"PRIu64,
peer->closing.our_fee);
peer->closing.closing_order = peer->order_counter++;
queue_pkt_close_signature(peer);
}
@ -814,6 +849,7 @@ static Pkt *handle_pkt_commit(struct peer *peer, const Pkt *pkt)
if (peer_uncommitted_changes(peer))
remote_changes_pending(peer);
peer->local.commit->order = peer->order_counter++;
queue_pkt_revocation(peer, &preimage, &peer->local.next_revocation_hash);
return NULL;
}
@ -868,13 +904,16 @@ static Pkt *handle_pkt_htlc_fulfill(struct peer *peer, const Pkt *pkt)
{
struct htlc *htlc;
Pkt *err;
bool was_already_fulfilled;
err = accept_pkt_htlc_fulfill(peer, pkt, &htlc);
/* Reconnect may mean HTLC was already fulfilled. That's OK. */
err = accept_pkt_htlc_fulfill(peer, pkt, &htlc, &was_already_fulfilled);
if (err)
return err;
/* We can relay this upstream immediately. */
our_htlc_fulfilled(peer, htlc, htlc->r);
if (!was_already_fulfilled)
our_htlc_fulfilled(peer, htlc, htlc->r);
/* BOLT #2:
*
@ -1014,6 +1053,7 @@ static bool shutdown_pkt_in(struct peer *peer, const Pkt *pkt)
if (!committed_to_htlcs(peer)) {
set_peer_state(peer, STATE_MUTUAL_CLOSING, __func__);
peer_calculate_close_fee(peer);
peer->closing.closing_order = peer->order_counter++;
queue_pkt_close_signature(peer);
}
@ -1038,6 +1078,7 @@ static void peer_start_shutdown(struct peer *peer)
* A node SHOULD send a `close_shutdown` (if it has
* not already) after receiving `close_shutdown`.
*/
peer->closing.shutdown_order = peer->order_counter++;
queue_pkt_close_shutdown(peer);
}
@ -1046,6 +1087,7 @@ static void peer_start_shutdown(struct peer *peer)
&& !committed_to_htlcs(peer)) {
set_peer_state(peer, STATE_MUTUAL_CLOSING, __func__);
peer_calculate_close_fee(peer);
peer->closing.closing_order = peer->order_counter++;
queue_pkt_close_signature(peer);
}
}
@ -1393,9 +1435,18 @@ static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer)
out = peer->outpkt[0];
memmove(peer->outpkt, peer->outpkt + 1, (sizeof(*peer->outpkt)*(n-1)));
tal_resize(&peer->outpkt, n-1);
log_debug(peer->log, "pkt_out: writing %s", pkt_name(out->pkt_case));
return peer_write_packet(conn, peer, out, pkt_out);
}
static void clear_output_queue(struct peer *peer)
{
size_t i, n = tal_count(peer->outpkt);
for (i = 0; i < n; i++)
tal_free(peer->outpkt[i]);
tal_resize(&peer->outpkt, 0);
}
static struct io_plan *pkt_in(struct io_conn *conn, struct peer *peer)
{
bool keep_going;
@ -1421,6 +1472,254 @@ static struct io_plan *pkt_in(struct io_conn *conn, struct peer *peer)
return peer_close(conn, peer);
}
/*
* This only works because we send one update at a time, and they can't
* ask for it again if they've already sent the `update_revocation` acking it.
*/
static void retransmit_updates(struct peer *peer)
{
struct htlc_map_iter it;
struct htlc *h;
/* BOLT #2:
*
* A node MAY simply retransmit messages which are identical to the
* previous transmission. */
for (h = htlc_map_first(&peer->htlcs, &it);
h;
h = htlc_map_next(&peer->htlcs, &it)) {
switch (h->state) {
case SENT_ADD_COMMIT:
log_debug(peer->log, "Retransmitting add HTLC %"PRIu64,
h->id);
queue_pkt_htlc_add(peer, h);
break;
case SENT_REMOVE_COMMIT:
log_debug(peer->log, "Retransmitting %s HTLC %"PRIu64,
h->r ? "fulfill" : "fail", h->id);
if (h->r)
queue_pkt_htlc_fulfill(peer, h);
else
queue_pkt_htlc_fail(peer, h);
break;
default:
break;
}
}
}
/* FIXME: Maybe it would be neater to remember all pay commands, and simply
* re-run them after reconnect if they didn't get committed. */
static void resend_local_requests(struct peer *peer)
{
struct htlc_map_iter it;
struct htlc *h;
for (h = htlc_map_first(&peer->htlcs, &it);
h;
h = htlc_map_next(&peer->htlcs, &it)) {
switch (h->state) {
case SENT_ADD_HTLC:
/* We removed everything which was routed. */
assert(!h->src);
log_debug(peer->log, "Re-sending local add HTLC %"PRIu64,
h->id);
queue_pkt_htlc_add(peer, h);
remote_changes_pending(peer);
break;
case SENT_REMOVE_HTLC:
/* We removed everything which was routed. */
assert(!h->src);
log_debug(peer->log, "Re-sending local %s HTLC %"PRIu64,
h->r ? "fulfill" : "fail", h->id);
if (h->r)
queue_pkt_htlc_fulfill(peer, h);
else
queue_pkt_htlc_fail(peer, h);
remote_changes_pending(peer);
break;
default:
break;
}
}
}
/* BOLT #2:
*
* On disconnection, a node MUST reverse any uncommitted changes sent by the
* other side (ie. `update_add_htlc`, `update_fee`, `update_fail_htlc` and
* `update_fulfill_htlc` for which no `update_commit` has been received). A
* node SHOULD retain the `r` value from the `update_fulfill_htlc`, however.
*/
static void forget_uncommitted_changes(struct peer *peer)
{
struct htlc *h;
struct htlc_map_iter it;
bool retry;
if (!peer->remote.commit || !peer->remote.commit->cstate)
return;
log_debug(peer->log, "Forgetting uncommitted");
log_debug(peer->log, "LOCAL: changing from (us) %u/%u and (them) %u/%u to %u/%u and %u/%u",
peer->local.staging_cstate->side[OURS].pay_msat,
peer->local.staging_cstate->side[OURS].fee_msat,
peer->local.staging_cstate->side[THEIRS].pay_msat,
peer->local.staging_cstate->side[THEIRS].fee_msat,
peer->local.commit->cstate->side[OURS].pay_msat,
peer->local.commit->cstate->side[OURS].fee_msat,
peer->local.commit->cstate->side[THEIRS].pay_msat,
peer->local.commit->cstate->side[THEIRS].fee_msat);
log_debug(peer->log, "REMOTE: changing from (us) %u/%u and (them) %u/%u to %u/%u and %u/%u",
peer->remote.staging_cstate->side[OURS].pay_msat,
peer->remote.staging_cstate->side[OURS].fee_msat,
peer->remote.staging_cstate->side[THEIRS].pay_msat,
peer->remote.staging_cstate->side[THEIRS].fee_msat,
peer->remote.commit->cstate->side[OURS].pay_msat,
peer->remote.commit->cstate->side[OURS].fee_msat,
peer->remote.commit->cstate->side[THEIRS].pay_msat,
peer->remote.commit->cstate->side[THEIRS].fee_msat);
tal_free(peer->local.staging_cstate);
tal_free(peer->remote.staging_cstate);
peer->local.staging_cstate
= copy_cstate(peer, peer->local.commit->cstate);
peer->remote.staging_cstate
= copy_cstate(peer, peer->remote.commit->cstate);
/* We forget everything we're routing, and re-send. This
* works for the reload-from-database case as well as the
* normal reconnect. */
again:
retry = false;
for (h = htlc_map_first(&peer->htlcs, &it);
h;
h = htlc_map_next(&peer->htlcs, &it)) {
switch (h->state) {
case SENT_ADD_HTLC:
/* FIXME: re-submit these after connect, instead? */
/* Keep local adds. */
if (!h->src) {
if (!cstate_add_htlc(peer->remote.staging_cstate, h))
fatal("Could not add HTLC?");
break;
}
case RCVD_ADD_HTLC:
log_debug(peer->log, "Forgetting %s %"PRIu64,
htlc_state_name(h->state), h->id);
/* May miss some due to delete reorg. */
tal_free(h);
retry = true;
break;
case RCVD_REMOVE_HTLC:
log_debug(peer->log, "Undoing %s %"PRIu64,
htlc_state_name(h->state), h->id);
htlc_undostate(h, RCVD_REMOVE_HTLC,
SENT_ADD_ACK_REVOCATION);
break;
case SENT_REMOVE_HTLC:
/* Keep local removes. */
/* FIXME: re-submit these after connect, instead? */
if (!h->src) {
if (h->r) {
cstate_fulfill_htlc(peer->remote.staging_cstate,
h);
} else {
cstate_fail_htlc(peer->remote.staging_cstate, h);
}
break;
}
log_debug(peer->log, "Undoing %s %"PRIu64,
htlc_state_name(h->state), h->id);
htlc_undostate(h, SENT_REMOVE_HTLC,
RCVD_ADD_ACK_REVOCATION);
break;
default:
break;
}
}
if (retry)
goto again;
}
static void retransmit_pkts(struct peer *peer, u64 ack)
{
log_debug(peer->log, "Our order counter is %"PRIu64", their ack %"PRIu64,
peer->order_counter, ack);
if (ack > peer->order_counter) {
log_unusual(peer->log, "reconnect ack %"PRIu64" > %"PRIu64,
ack, peer->order_counter);
peer_comms_err(peer, pkt_err(peer, "invalid ack"));
return;
}
log_debug(peer->log, "They acked %"PRIu64", remote=%"PRIu64" local=%"PRIu64,
ack, peer->remote.commit ? peer->remote.commit->order : 0,
peer->local.commit ? peer->local.commit->order : 0);
/* BOLT #2:
*
* A node MAY assume that only one of each type of message need be
* retransmitted. A node SHOULD retransmit the last of each message
* type which was not counted by the `ack` field.
*/
while (ack < peer->order_counter) {
if (peer->remote.commit && ack == peer->remote.commit->order) {
if (!peer->remote.commit->sig) {
log_broken(peer->log, "No sig for commit order %"
PRIu64, ack);
peer_comms_err(peer,
pkt_err(peer, "invalid ack"));
return;
}
/* BOLT #2:
*
* Before retransmitting `update_commit`, the node
* MUST send appropriate `update_add_htlc`,
* `update_fee`, `update_fail_htlc` or
* `update_fulfill_htlc` messages (the other node will
* have forgotten them, as required above).
*/
retransmit_updates(peer);
queue_pkt_commit(peer, peer->remote.commit->sig);
} else if (peer->local.commit
&& ack == peer->local.commit->order) {
/* Re-transmit revocation. */
struct sha256 preimage, next;
u64 commit_num = peer->local.commit->commit_num - 1;
/* Make sure we don't revoke current commit! */
assert(commit_num < peer->local.commit->commit_num);
peer_get_revocation_preimage(peer, commit_num,&preimage);
peer_get_revocation_hash(peer, commit_num + 2, &next);
log_debug(peer->log, "Re-sending revocation %"PRIu64,
commit_num);
queue_pkt_revocation(peer, &preimage, &next);
} else if (ack == peer->closing.shutdown_order) {
log_debug(peer->log, "Re-sending shutdown");
queue_pkt_close_shutdown(peer);
} else if (ack == peer->closing.closing_order) {
log_debug(peer->log, "Re-sending closing order");
queue_pkt_close_signature(peer);
} else {
log_broken(peer->log, "Can't rexmit %"PRIu64
" when local commit %"PRIu64" and remote %"PRIu64,
ack,
peer->local.commit ? peer->local.commit->order : (u64)-2ULL,
peer->remote.commit ? peer->remote.commit->order : (u64)-2ULL);
peer_comms_err(peer, pkt_err(peer, "invalid ack"));
return;
}
ack++;
}
resend_local_requests(peer);
/* We might need to re-propose HTLCs which were from other peers. */
retry_all_routing(peer);
}
/* Crypto is on, we are live. */
static struct io_plan *peer_crypto_on(struct io_conn *conn, struct peer *peer)
{
@ -1432,6 +1731,8 @@ static struct io_plan *peer_crypto_on(struct io_conn *conn, struct peer *peer)
state_event(peer, peer->local.offer_anchor, NULL);
assert(!peer->connected);
peer->connected = true;
return io_duplex(conn,
peer_read_packet(conn, peer, pkt_in),
pkt_out(conn, peer));
@ -1444,12 +1745,15 @@ static void destroy_peer(struct peer *peer)
list_del_from(&peer->dstate->peers, &peer->list);
}
static void try_reconnect(struct peer *peer);
static void peer_disconnect(struct io_conn *conn, struct peer *peer)
{
log_info(peer->log, "Disconnected");
/* No longer connected. */
peer->conn = NULL;
peer->connected = false;
/* Not even set up yet? Simply free.*/
if (peer->state == STATE_INIT) {
@ -1464,10 +1768,13 @@ static void peer_disconnect(struct io_conn *conn, struct peer *peer)
}
/* This is an unexpected close. */
if (!state_is_onchain(peer->state) && !state_is_error(peer->state)) {
/* FIXME: Try to reconnect. */
set_peer_state(peer, STATE_ERR_BREAKDOWN, "peer_disconnect");
peer_breakdown(peer);
if (state_can_io(peer->state)) {
clear_output_queue(peer);
forget_uncommitted_changes(peer);
/* FIXME: We could try connecting back to them even if
* they initiated the original? */
if (peer->we_connected)
try_reconnect(peer);
}
}
@ -1489,7 +1796,8 @@ static void do_commit(struct peer *peer, struct command *jsoncmd)
return;
}
log_debug(peer->log, "do_commit: sending commit command");
log_debug(peer->log, "do_commit: sending commit command %"PRIu64,
peer->remote.commit->commit_num + 1);
assert(state_can_commit(peer->state));
assert(!peer->commit_jsoncmd);
@ -1540,7 +1848,8 @@ static void do_commit(struct peer *peer, struct command *jsoncmd)
peer_add_their_commit(peer, &ci->txid, ci->commit_num);
queue_pkt_commit(peer);
peer->remote.commit->order = peer->order_counter++;
queue_pkt_commit(peer, ci->sig);
if (peer->state == STATE_SHUTDOWN) {
set_peer_state(peer, STATE_SHUTDOWN_COMMITTING, __func__);
} else {
@ -1553,11 +1862,11 @@ static void try_commit(struct peer *peer)
{
peer->commit_timer = NULL;
if (state_can_commit(peer->state))
if (state_can_commit(peer->state) && peer->connected)
do_commit(peer, NULL);
else {
/* FIXME: try again when we receive revocation, rather
* than using timer! */
/* FIXME: try again when we receive revocation /
* reconnect, rather than using timer! */
log_debug(peer->log, "try_commit: state=%s, re-queueing timer",
state_name(peer->state));
@ -1572,14 +1881,66 @@ struct commit_info *new_commit_info(const tal_t *ctx, u64 commit_num)
ci->tx = NULL;
ci->cstate = NULL;
ci->sig = NULL;
ci->order = -1ULL;
return ci;
}
static bool peer_getaddr(struct peer *peer,
int fd, int addr_type, int addr_protocol)
{
peer->addr.type = addr_type;
peer->addr.protocol = addr_protocol;
peer->addr.addrlen = sizeof(peer->addr.saddr);
if (getpeername(fd, &peer->addr.saddr.s, &peer->addr.addrlen) != 0) {
log_broken(peer->dstate->base_log,
"Could not get address for peer: %s",
strerror(errno));
return false;
}
return true;
}
static bool peer_reconnected(struct peer *peer,
struct io_conn *conn,
int addr_type, int addr_protocol,
struct io_data *iod,
const struct pubkey *id)
{
char *prefix;
assert(structeq(peer->id, id));
peer->io_data = tal_steal(peer, iod);
/* FIXME: Attach IO logging for this peer. */
if (!peer_getaddr(peer, io_conn_fd(conn), addr_type, addr_protocol))
return false;
/* If we free peer, conn should be closed, but can't be freed
* immediately so don't make peer a parent. */
peer->conn = conn;
io_set_finish(conn, peer_disconnect, peer);
prefix = tal_fmt(peer, "%s%s:%s:",
log_prefix(peer->dstate->base_log),
peer->we_connected ? "out" : "in",
netaddr_name(peer, &peer->addr));
if (peer->log) {
log_info(peer->log, "Reconnected as %s", prefix);
set_log_prefix(peer->log, prefix);
} else {
peer->log = new_log(peer, peer->dstate->log_record,
"%s", prefix);
}
tal_free(prefix);
return true;
}
static struct peer *new_peer(struct lightningd_state *dstate,
struct io_conn *conn,
int addr_type, int addr_protocol,
enum state_input offer_anchor,
const char *in_or_out)
bool we_connected)
{
struct peer *peer = tal(dstate, struct peer);
@ -1590,6 +1951,8 @@ static struct peer *new_peer(struct lightningd_state *dstate,
list_add(&dstate->peers, &peer->list);
peer->state = STATE_INIT;
peer->we_connected = we_connected;
peer->connected = false;
peer->id = NULL;
peer->dstate = dstate;
peer->addr.type = addr_type;
@ -1603,11 +1966,15 @@ static struct peer *new_peer(struct lightningd_state *dstate,
list_head_init(&peer->pay_commands);
list_head_init(&peer->their_commits);
peer->anchor.ok_depth = -1;
peer->order_counter = 0;
peer->their_commitsigs = 0;
peer->cur_commit.watch = NULL;
peer->closing.their_sig = NULL;
peer->closing.our_script = NULL;
peer->closing.their_script = NULL;
peer->closing.shutdown_order = -1ULL;
peer->closing.closing_order = -1ULL;
peer->closing.sigs_in = 0;
peer->onchain.tx = NULL;
peer->onchain.resolved = NULL;
peer->onchain.htlcs = NULL;
@ -1653,7 +2020,8 @@ static struct peer *new_peer(struct lightningd_state *dstate,
}
peer->log = new_log(peer, dstate->log_record, "%s%s:%s:",
log_prefix(dstate->base_log), in_or_out,
log_prefix(dstate->base_log),
we_connected ? "out" : "in",
netaddr_name(peer, &peer->addr));
log_debug(peer->log, "Using fee rate %"PRIu64,
@ -1661,8 +2029,6 @@ static struct peer *new_peer(struct lightningd_state *dstate,
return peer;
}
/* Unused for the moment. */
#if 0
static u64 peer_commitsigs_received(struct peer *peer)
{
return peer->their_commitsigs;
@ -1673,7 +2039,6 @@ static u64 peer_revocations_received(struct peer *peer)
/* How many preimages we've received. */
return -peer->their_preimages.min_index;
}
#endif
static void htlc_destroy(struct htlc *htlc)
{
@ -1720,6 +2085,78 @@ struct htlc *peer_new_htlc(struct peer *peer,
return h;
}
static struct io_plan *reconnect_pkt_in(struct io_conn *conn, struct peer *peer)
{
if (peer->inpkt->pkt_case != PKT__PKT_RECONNECT) {
peer_received_unexpected_pkt(peer, peer->inpkt);
return pkt_out(conn, peer);
}
/* Send any packets they missed. */
retransmit_pkts(peer, peer->inpkt->reconnect->ack);
/* We let the conversation go this far in case they missed the
* close packets. But now we can close if we're done. */
if (!state_can_io(peer->state)) {
log_debug(peer->log, "State %s, closing immediately",
state_name(peer->state));
return io_close(conn);
}
/* Back into normal mode. */
assert(!peer->connected);
peer->connected = true;
return io_duplex(conn,
peer_read_packet(conn, peer, pkt_in),
pkt_out(conn, peer));
}
static struct io_plan *read_reconnect_pkt(struct io_conn *conn,
struct peer *peer)
{
return peer_read_packet(conn, peer, reconnect_pkt_in);
}
static struct io_plan *crypto_on_reconnect(struct io_conn *conn,
struct lightningd_state *dstate,
struct io_data *iod,
const struct pubkey *id,
struct peer *peer)
{
u64 sigs, revokes, shutdown, closing;
/* Setup peer->conn and peer->io_data */
if (!peer_reconnected(peer, conn, peer->addr.type,
peer->addr.protocol, iod, id))
return io_close(conn);
sigs = peer_commitsigs_received(peer);
revokes = peer_revocations_received(peer);
shutdown = peer->closing.their_script ? 1 : 0;
closing = peer->closing.sigs_in;
log_debug(peer->log,
"Reconnecting with ack %"PRIu64" sigs + %"PRIu64" revokes"
" + %"PRIu64" shutdown + %"PRIu64" closing",
sigs, revokes, shutdown, closing);
/* BOLT #2:
*
* A node reconnecting after receiving or sending an `open_channel`
* message SHOULD send a `reconnect` message on the new connection
* immediately after it has validated the `authenticate` message. */
/* BOLT #2:
*
* A node MUST set the `ack` field in the `reconnect` message to the
* the sum of previously-processed messages of types
* `open_commit_sig`, `update_commit`, `update_revocation`,
* `close_shutdown` and `close_signature`. */
return peer_write_packet(conn, peer,
pkt_reconnect(peer, sigs + revokes
+ shutdown + closing),
read_reconnect_pkt);
}
static struct io_plan *crypto_on_out(struct io_conn *conn,
struct lightningd_state *dstate,
struct io_data *iod,
@ -1728,7 +2165,7 @@ static struct io_plan *crypto_on_out(struct io_conn *conn,
{
/* Initiator currently funds channel */
struct peer *peer = new_peer(dstate, conn, SOCK_STREAM, IPPROTO_TCP,
CMD_OPEN_WITH_ANCHOR, "out");
CMD_OPEN_WITH_ANCHOR, true);
if (!peer) {
command_fail(connect->cmd, "Failed to make peer for %s:%s",
connect->name, connect->port);
@ -1758,9 +2195,29 @@ static struct io_plan *crypto_on_in(struct io_conn *conn,
const struct pubkey *id,
void *unused)
{
struct peer *peer;
/* BOLT #2:
*
* A node MUST handle continuing a previous channel on a new encrypted
* transport. */
peer = find_peer(dstate, id);
if (peer) {
/* Close any existing connection, without side effects. */
if (peer->conn) {
log_debug(peer->log, "Reconnect: closing old conn %p for new conn %p",
peer->conn, conn);
io_set_finish(peer->conn, NULL, NULL);
io_close(peer->conn);
peer->conn = NULL;
peer->connected = false;
}
return crypto_on_reconnect(conn, dstate, iod, id, peer);
}
/* Initiator currently funds channel */
struct peer *peer = new_peer(dstate, conn, SOCK_STREAM, IPPROTO_TCP,
CMD_OPEN_WITHOUT_ANCHOR, "in");
peer = new_peer(dstate, conn, SOCK_STREAM, IPPROTO_TCP,
CMD_OPEN_WITHOUT_ANCHOR, false);
if (!peer)
return io_close(conn);
@ -3089,6 +3546,59 @@ bool setup_first_commit(struct peer *peer)
return true;
}
static struct io_plan *peer_reconnect(struct io_conn *conn, struct peer *peer)
{
/* FIXME: log incoming address. */
log_debug(peer->log, "Reconnected, doing crypto...");
assert(peer->id);
return peer_crypto_setup(conn, peer->dstate,
peer->id, crypto_on_reconnect, peer);
}
/* FIXME: Do timeouts and backoff */
static void reconnect_failed(struct io_conn *conn, struct peer *peer)
{
log_broken(peer->log, "reconnecting gave %s", strerror(errno));
set_peer_state(peer, STATE_ERR_BREAKDOWN, "try_reconnect");
peer_breakdown(peer);
}
static struct io_plan *init_conn(struct io_conn *conn, struct peer *peer)
{
struct addrinfo a;
netaddr_to_addrinfo(&a, &peer->addr);
return io_connect(conn, &a, peer_reconnect, peer);
}
static void try_reconnect(struct peer *peer)
{
int fd = socket(peer->addr.saddr.s.sa_family, peer->addr.type,
peer->addr.protocol);
if (fd < 0) {
log_broken(peer->log, "try_reconnect: failed to create socket: %s",
strerror(errno));
reconnect_failed(NULL, peer);
return;
}
assert(!peer->conn);
peer->conn = io_new_conn(peer->dstate, fd, init_conn, peer);
log_debug(peer->log, "Trying to reconnect...");
io_set_finish(peer->conn, reconnect_failed, peer);
}
void reconnect_peers(struct lightningd_state *dstate)
{
struct peer *peer;
list_for_each(&dstate->peers, peer, list) {
if (peer->we_connected)
try_reconnect(peer);
}
}
static void json_add_abstime(struct json_result *response,
const char *id,
const struct abs_locktime *t)
@ -3544,6 +4054,35 @@ static void json_disconnect(struct command *cmd,
command_success(cmd, null_response(cmd));
}
static void json_reconnect(struct command *cmd,
const char *buffer, const jsmntok_t *params)
{
struct peer *peer;
jsmntok_t *peeridtok;
if (!json_get_params(buffer, params,
"peerid", &peeridtok,
NULL)) {
command_fail(cmd, "Need peerid");
return;
}
peer = find_peer_json(cmd->dstate, buffer, peeridtok);
if (!peer) {
command_fail(cmd, "Could not find peer with that peerid");
return;
}
if (!peer->conn) {
command_fail(cmd, "Peer is already disconnected");
return;
}
/* Should reconnect on its own. */
io_close(peer->conn);
command_success(cmd, null_response(cmd));
}
static void json_signcommit(struct command *cmd,
const char *buffer, const jsmntok_t *params)
{
@ -3635,7 +4174,14 @@ const struct json_command output_command = {
const struct json_command disconnect_command = {
"dev-disconnect",
json_disconnect,
"Force a disconned with peer {peerid}",
"Force a disconnect with peer {peerid}",
"Returns an empty result on success"
};
const struct json_command reconnect_command = {
"dev-reconnect",
json_reconnect,
"Force a reconnect with peer {peerid}",
"Returns an empty result on success"
};

16
daemon/peer.h

@ -50,6 +50,8 @@ struct commit_info {
struct channel_state *cstate;
/* Other side's signature for last commit tx (if known) */
struct bitcoin_signature *sig;
/* Order which commit was sent (theirs) / revocation was sent (ours) */
u64 order;
};
struct peer_visible_state {
@ -89,6 +91,12 @@ struct peer {
/* Network connection. */
struct io_conn *conn;
/* Did we connect to it? (Otherwise, they connected to us) */
bool we_connected;
/* Are we connected now? (Crypto handshake completed). */
bool connected;
/* If we're doing a commit, this is the command which triggered it */
struct command *commit_jsoncmd;
@ -104,6 +112,9 @@ struct peer {
/* Their ID. */
struct pubkey *id;
/* Order counter for transmission of revocations/commitments. */
u64 order_counter;
/* Current received packet. */
Pkt *inpkt;
@ -153,6 +164,10 @@ struct peer {
u64 their_fee;
/* scriptPubKey we/they want for closing. */
u8 *our_script, *their_script;
/* Last sent (in case we need to retransmit) */
u64 shutdown_order, closing_order;
/* How many closing sigs have we receieved? */
u32 sigs_in;
} closing;
/* If we're closing on-chain */
@ -248,5 +263,6 @@ void peer_open_complete(struct peer *peer, const char *problem);
struct bitcoin_tx *peer_create_close_tx(struct peer *peer, u64 fee);
void reconnect_peers(struct lightningd_state *dstate);
void cleanup_peers(struct lightningd_state *dstate);
#endif /* LIGHTNING_DAEMON_PEER_H */

30
daemon/test/test.sh

@ -7,6 +7,10 @@ cd `git rev-parse --show-toplevel`/daemon/test
scripts/setup.sh
# Bash variables for in-depth debugging.
#set -vx
#export PS4='+(${BASH_SOURCE}:${LINENO}): ${FUNCNAME[0]:+${FUNCNAME[0]}(): }'
DIR1=/tmp/lightning.$$.1
DIR2=/tmp/lightning.$$.2
DIR3=/tmp/lightning.$$.3
@ -67,6 +71,9 @@ while [ $# != 0 ]; do
;;
x"--normal")
;;
x"--reconnect")
RECONNECT=1
;;
x"--crash")
CRASH_ON_FAIL=1
;;
@ -97,6 +104,24 @@ lcli1()
echo $LCLI1 "$@" >&2
fi
$LCLI1 "$@"
STATUS=$?
if [ -n "$DO_RECONNECT" ]; then
case "$1" in
# Don't restart on every get* command.
get*)
;;
dev-mocktime*)
;;
stop)
;;
*)
[ -z "$VERBOSE" ] || echo RECONNECTING >&2
$LCLI1 dev-reconnect $ID2 >/dev/null
sleep 1
;;
esac
fi
return $STATUS
}
lcli2()
@ -348,6 +373,8 @@ lcli1 connect localhost $PORT2 $TX
check_peerstate lcli1 STATE_OPEN_WAITING_OURANCHOR
check_peerstate lcli2 STATE_OPEN_WAITING_THEIRANCHOR
DO_RECONNECT=$RECONNECT
if [ -n "$TIMEOUT_ANCHOR" ]; then
# Check anchor emitted, not mined deep enough.
check_tx_spend lcli1
@ -741,6 +768,7 @@ B_AMOUNT=$(($B_AMOUNT - $HTLC_AMOUNT))
check_status $A_AMOUNT $A_FEE "" $B_AMOUNT $B_FEE ""
# Now, test making more changes before receiving commit reply.
DO_RECONNECT=""
lcli2 dev-output $ID1 false
HTLCID=`lcli1 newhtlc $ID2 $HTLC_AMOUNT $EXPIRY $RHASH | extract_id`
@ -766,6 +794,8 @@ lcli2 dev-output $ID1 true
[ ! -n "$MANUALCOMMIT" ] || lcli1 commit $ID2
[ ! -n "$MANUALCOMMIT" ] || lcli2 commit $ID1
DO_RECONNECT=$RECONNECT
# Both sides should be committed to htlcs
# We open-code check_status here: HTLCs could be in either order.
check_balance_single lcli1 $(($A_AMOUNT - $HTLC_AMOUNT*2 - $EXTRA_FEE)) $(($A_FEE + $EXTRA_FEE)) $(($B_AMOUNT - $EXTRA_FEE)) $(($B_FEE + $EXTRA_FEE))

1
state.c

@ -148,6 +148,7 @@ enum state state(struct peer *peer,
peer_add_their_commit(peer, &peer->remote.commit->txid,
peer->remote.commit->commit_num);
peer->remote.commit->order = peer->order_counter++;
queue_pkt_open_commit_sig(peer);
peer_watch_anchor(peer,
peer->local.mindepth,

Loading…
Cancel
Save