
gossipd: make sure master only ever sees one active connection.

When we get a reconnection, kill the current remote peer, and wait for the
master to tell us it's dead.  Then we hand the master the new peer.

Previously, we would end up with gossipd holding multiple peers, and
the logging was really hard to interpret; I'm not completely convinced
that we did the right thing when one terminated, either.

Note that this now means we can have peers with neither ->local nor ->remote
populated, so we check that more carefully.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
Rusty Russell, commit bc4809aa85 (tag: ppa-0.6.1)
 channeld/channel.c        |   7
 gossipd/gossip.c          | 147
 lightningd/peer_control.c |  19
 tests/test_lightningd.py  |   5
 4 files changed
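
Before the per-file diffs, a note on the key invariant: a gossipd peer can now
be local (gossipd owns the socket), remote (another daemon owns it), or
neither, the new limbo state between a connection dying and the master
acknowledging it with peer_died. The sketch below is a hypothetical,
self-contained illustration of the guard pattern this patch adds to
queue_peer_msg() and wake_pkt_out(); the struct and function names are
stand-ins, not code from the patch.

#include <stdio.h>

/* Hypothetical stand-ins for gossipd's real connection types. */
struct local_conn { int fd; };   /* gossipd owns the socket itself */
struct remote_conn { int fd; };  /* another daemon (e.g. channeld) owns it */

struct peer {
	struct local_conn *local;
	struct remote_conn *remote;
	/* If both are NULL, the peer is in limbo: its connection died and
	 * we are waiting for the master to acknowledge with peer_died. */
};

static void queue_msg(struct peer *peer, const char *msg)
{
	if (peer->local)
		printf("enqueue on local connection: %s\n", msg);
	else if (peer->remote)
		printf("forward to owning daemon: %s\n", msg);
	/* else: limbo; drop silently, the peer is on its way out. */
}

int main(void)
{
	struct peer limbo = { NULL, NULL };
	queue_msg(&limbo, "channel_update");	/* a no-op, not a crash */
	return 0;
}

The patch's real functions do the same with msg_enqueue() and msg_wake(): a
limbo peer must be a silent no-op, not a NULL dereference.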

diff --git a/channeld/channel.c b/channeld/channel.c
--- a/channeld/channel.c
+++ b/channeld/channel.c
@@ -2661,11 +2661,10 @@ int main(int argc, char *argv[])
 			req_in(peer, msg);
 		} else if (FD_ISSET(GOSSIP_FD, &rfds)) {
 			msg = wire_sync_read(peer, GOSSIP_FD);
+			/* Gossipd hangs up on us to kill us when a new
+			 * connection comes in. */
 			if (!msg)
-				status_failed(STATUS_FAIL_GOSSIP_IO,
-					      "Can't read command: %s",
-					      strerror(errno));
+				peer_conn_broken(peer);
 			handle_gossip_msg(msg, &peer->cs,
 					  channeld_send_reply,
 					  channeld_io_error,

diff --git a/gossipd/gossip.c b/gossipd/gossip.c
--- a/gossipd/gossip.c
+++ b/gossipd/gossip.c
@@ -88,9 +88,12 @@ struct daemon {
 	/* Who am I? */
 	struct pubkey id;
 
-	/* Peers we have directly or indirectly */
+	/* Peers we have directly or indirectly: id is unique */
 	struct list_head peers;
 
+	/* Peers reconnecting now (waiting for current peer to die). */
+	struct list_head reconnecting;
+
 	/* Peers we are trying to reach */
 	struct list_head reaching;
@@ -228,6 +231,34 @@ static struct peer *find_peer(struct daemon *daemon, const struct pubkey *id)
 	return NULL;
 }
 
+static struct peer *find_reconnecting_peer(struct daemon *daemon,
+					   const struct pubkey *id)
+{
+	struct peer *peer;
+
+	list_for_each(&daemon->reconnecting, peer, list)
+		if (pubkey_eq(&peer->id, id))
+			return peer;
+	return NULL;
+}
+
+static void destroy_reconnecting_peer(struct peer *peer)
+{
+	list_del_from(&peer->daemon->reconnecting, &peer->list);
+	/* This is safe even if we're being destroyed because of peer->conn,
+	 * since tal_free protects against loops. */
+	io_close(peer->local->conn);
+}
+
+static void add_reconnecting_peer(struct daemon *daemon, struct peer *peer)
+{
+	/* Drop any previous connecting peer */
+	tal_free(find_reconnecting_peer(peer->daemon, &peer->id));
+
+	list_add_tail(&daemon->reconnecting, &peer->list);
+	tal_add_destructor(peer, destroy_reconnecting_peer);
+}
+
 static void destroy_addrhint(struct addrhint *a)
 {
 	list_del(&a->list);
@@ -329,7 +360,7 @@ static void queue_peer_msg(struct peer *peer, const u8 *msg TAKES)
 {
 	if (peer->local) {
 		msg_enqueue(&peer->local->peer_out, msg);
-	} else {
+	} else if (peer->remote) {
 		const u8 *send = towire_gossip_send_gossip(NULL, msg);
 		if (taken(msg))
 			tal_free(msg);
@@ -372,15 +403,44 @@ static struct io_plan *peer_close_after_error(struct io_conn *conn,
 	return io_close(conn);
 }
 
-static struct io_plan *peer_init_received(struct io_conn *conn,
-					  struct peer *peer,
-					  u8 *msg)
+/* Mutual recursion */
+static struct io_plan *peer_connected(struct io_conn *conn, struct peer *peer);
+
+static struct io_plan *retry_peer_connected(struct io_conn *conn,
+					    struct peer *peer)
 {
-	if (!fromwire_init(peer, msg, &peer->gfeatures, &peer->lfeatures)){
-		status_trace("peer %s bad fromwire_init '%s', closing",
-			     type_to_string(tmpctx, struct pubkey, &peer->id),
-			     tal_hex(tmpctx, msg));
-		return io_close(conn);
+	status_trace("peer %s: processing now old peer gone",
+		     type_to_string(tmpctx, struct pubkey, &peer->id));
+
+	/* Clean up reconnecting state, try again */
+	list_del_from(&peer->daemon->reconnecting, &peer->list);
+	tal_del_destructor(peer, destroy_reconnecting_peer);
+	return peer_connected(conn, peer);
+}
+
+static struct io_plan *peer_connected(struct io_conn *conn, struct peer *peer)
+{
+	struct peer *old_peer;
+	u8 *msg;
+
+	/* Now, is this a reconnect? */
+	old_peer = find_peer(peer->daemon, &peer->id);
+	if (old_peer) {
+		status_trace("peer %s: reconnect for %s",
+			     type_to_string(tmpctx, struct pubkey, &peer->id),
+			     old_peer->local ? "local peer" : "active peer");
+		if (!old_peer->local) {
+			/* If not already closed, close it: it will
+			 * fail, and master will peer_died to us */
+			if (old_peer->remote) {
+				daemon_conn_clear(old_peer->remote);
+				old_peer->remote = tal_free(old_peer->remote);
+			}
+			add_reconnecting_peer(peer->daemon, peer);
+			return io_wait(conn, peer, retry_peer_connected, peer);
+		}
+
+		/* Local peers can just be discarded when they reconnect */
+		tal_free(old_peer);
 	}
 
 	reached_peer(peer, conn);
@@ -399,7 +459,7 @@ static struct io_plan *peer_init_received(struct io_conn *conn,
 		= peer->daemon->rstate->broadcasts->next_index;
 
 	/* This is a full peer now; we keep it around until master says
-	 * it's dead, or reconnect. */
+	 * it's dead. */
 	peer_finalized(peer);
 
 	/* We will not have anything queued, since we're not duplex. */
@@ -410,14 +470,25 @@ static struct io_plan *peer_init_received(struct io_conn *conn,
 		return io_close(conn);
 
 	/* Start the gossip flowing. */
+	/* FIXME: This is a bit wasteful in the common case where master
+	 * simply hands it straight back to us and we restart the peer and
+	 * restart gossip broadcast... */
 	wake_pkt_out(peer);
 
 	return io_close_taken_fd(conn);
 }
 
+static struct io_plan *peer_init_received(struct io_conn *conn,
+					  struct peer *peer,
+					  u8 *msg)
+{
+	if (!fromwire_init(peer, msg, &peer->gfeatures, &peer->lfeatures)) {
+		status_trace("peer %s bad fromwire_init '%s', closing",
+			     type_to_string(tmpctx, struct pubkey, &peer->id),
+			     tal_hex(tmpctx, msg));
+		return io_close(conn);
+	}
+
+	return peer_connected(conn, peer);
+}
+
 static struct io_plan *read_init(struct io_conn *conn, struct peer *peer)
 {
 	/* BOLT #1:
@@ -715,7 +786,7 @@ static void wake_pkt_out(struct peer *peer)
 	if (peer->local)
 		/* Notify the peer-write loop */
 		msg_wake(&peer->local->peer_out);
-	else
+	else if (peer->remote)
 		/* Notify the daemon_conn-write loop */
 		msg_wake(&peer->remote->out);
 }
@@ -851,6 +922,13 @@ static struct io_plan *owner_msg_in(struct io_conn *conn,
 	return daemon_conn_read_next(conn, dc);
 }
 
+static void free_peer_remote(struct io_conn *conn, struct daemon_conn *dc)
+{
+	struct peer *peer = dc->ctx;
+
+	peer->remote = tal_free(peer->remote);
+}
+
 /* When a peer is to be owned by another daemon, we create a socket
  * pair to send/receive gossip from it */
 static bool send_peer_with_fds(struct peer *peer, const u8 *msg)
@@ -871,12 +949,15 @@ static bool send_peer_with_fds(struct peer *peer, const u8 *msg)
 	peer->local = tal_free(peer->local);
 	peer->remote = tal(peer, struct daemon_conn);
 	daemon_conn_init(peer, peer->remote, fds[0],
-			 owner_msg_in, NULL);
+			 owner_msg_in, free_peer_remote);
 	peer->remote->msg_queue_cleared_cb = nonlocal_dump_gossip;
 
 	/* Peer stays around, even though caller will close conn. */
 	tal_steal(peer->daemon, peer);
 
+	status_debug("peer %s now remote",
+		     type_to_string(tmpctx, struct pubkey, &peer->id));
+
 	daemon_conn_send(&peer->daemon->master, msg);
 	daemon_conn_send_fd(&peer->daemon->master, peer_fd);
 	daemon_conn_send_fd(&peer->daemon->master, fds[1]);
@@ -965,7 +1046,7 @@ static struct io_plan *handle_returning_peer(struct io_conn *conn,
 					     struct returning_peer *rpeer)
 {
 	struct daemon *daemon = rpeer->daemon;
-	struct peer *peer;
+	struct peer *peer, *connecting;
 
 	peer = find_peer(daemon, &rpeer->id);
 	if (!peer)
@@ -973,14 +1054,20 @@ static struct io_plan *handle_returning_peer(struct io_conn *conn,
 			      "hand_back_peer unknown peer: %s",
 			      type_to_string(tmpctx, struct pubkey, &rpeer->id));
 
-	/* Possible if there's a reconnect: ignore handed back. */
-	if (peer->local) {
-		status_trace("hand_back_peer %s: reconnected, dropping handback",
-			     type_to_string(tmpctx, struct pubkey, &rpeer->id));
+	assert(!peer->local);
+
+	/* Corner case: we got a reconnection while master was handing this
+	 * back.  We would have killed it immediately if it was local
+	 * previously, so do that now. */
+	connecting = find_reconnecting_peer(daemon, &rpeer->id);
+	if (connecting) {
+		status_trace("Forgetting handed back peer %s",
+			     type_to_string(tmpctx, struct pubkey, &peer->id));
+		tal_free(peer);
+		/* Now connecting peer can go ahead. */
+		io_wake(connecting);
+
 		close(rpeer->gossip_fd);
 		close(rpeer->peer_fd);
 		tal_free(rpeer);
 		return daemon_conn_read_next(conn, &daemon->master);
 	}
@@ -1873,17 +1960,18 @@ static struct io_plan *peer_disconnected(struct io_conn *conn,
 				      "peer_disconnected unknown peer: %s",
 				      type_to_string(tmpctx, struct pubkey, &id));
 
-	/* Possible if there's a reconnect: ignore disonnect. */
-	if (peer->local) {
-		status_trace("peer_disconnected %s: reconnected, ignoring",
-			     type_to_string(tmpctx, struct pubkey, &id));
-		return daemon_conn_read_next(conn, &daemon->master);
-	}
+	assert(!peer->local);
 
 	status_trace("Forgetting remote peer %s",
 		     type_to_string(tmpctx, struct pubkey, &peer->id));
 
 	tal_free(peer);
 
+	/* If there was a connecting peer waiting, wake it now */
+	peer = find_reconnecting_peer(daemon, &id);
+	if (peer)
+		io_wake(peer);
+
 	return daemon_conn_read_next(conn, &daemon->master);
 }
@@ -2205,6 +2293,7 @@ int main(int argc, char *argv[])
 	daemon = tal(NULL, struct daemon);
 	list_head_init(&daemon->peers);
+	list_head_init(&daemon->reconnecting);
 	list_head_init(&daemon->reaching);
 	list_head_init(&daemon->addrhints);
 	important_peerid_map_init(&daemon->important_peerids);
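
The handoff above is a wait/wake rendezvous: the new connection parks itself
on daemon->reconnecting and io_wait()s on its own peer pointer; once the
master confirms the old peer's death (peer_disconnected, or the hand-back
corner case), gossipd frees the old peer and io_wake()s the waiter, which
retries peer_connected(). A toy model of that ordering follows; it is a
hypothetical sketch with plain function calls standing in for the ccan/io
event loop, synchronous where the real code is asynchronous.

#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Toy model of the rendezvous (hypothetical names, not ccan/io):
 * one active peer per id; a reconnect parks until peer_died arrives. */
struct peer {
	const char *id;
};

struct daemon {
	struct peer *active;        /* the one peer the master knows about */
	struct peer *reconnecting;  /* parked, like io_wait() in the patch */
};

static void peer_connected(struct daemon *d, struct peer *newp)
{
	if (d->active && strcmp(d->active->id, newp->id) == 0) {
		/* Reconnect: park the newcomer until the master tells us
		 * the old peer is dead. */
		printf("reconnect for %s: parking new connection\n", newp->id);
		d->reconnecting = newp;
		return;
	}
	d->active = newp;
	printf("%s handed to master\n", newp->id);
}

/* Master acknowledges the old peer's death (peer_disconnected). */
static void peer_disconnected(struct daemon *d, const char *id)
{
	assert(d->active && strcmp(d->active->id, id) == 0);
	d->active = NULL;
	printf("forgot %s\n", id);

	if (d->reconnecting && strcmp(d->reconnecting->id, id) == 0) {
		struct peer *p = d->reconnecting;
		d->reconnecting = NULL;
		peer_connected(d, p);	/* like io_wake() + retry */
	}
}

int main(void)
{
	struct daemon d = { NULL, NULL };
	struct peer a = { "02aa" }, a2 = { "02aa" };

	peer_connected(&d, &a);		/* handed to master */
	peer_connected(&d, &a2);	/* parked behind a */
	peer_disconnected(&d, "02aa");	/* a dies; a2 proceeds */
	return 0;
}

Running it prints the handoff in order: the first connection goes to the
master, the reconnect parks, and only the acknowledgement lets the new
connection proceed, so the master never holds two live connections for the
same id.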

diff --git a/lightningd/peer_control.c b/lightningd/peer_control.c
--- a/lightningd/peer_control.c
+++ b/lightningd/peer_control.c
@@ -401,7 +401,8 @@ void channel_errmsg(struct channel *channel,
 	subd_send_fd(ld->gossip, gossip_fd);
 }
 
-/* Gossipd tells us a peer has connected */
+/* Gossipd tells us a peer has connected: it never hands us duplicates, since
+ * it holds them until we say peer_died. */
 void peer_connected(struct lightningd *ld, const u8 *msg,
 		    int peer_fd, int gossip_fd)
 {
@@ -450,11 +451,8 @@ void peer_connected(struct lightningd *ld, const u8 *msg,
 	 * channel. */
 	channel = active_channel_by_id(ld, &id, &uc);
 
-	/* Opening now? Kill it */
-	if (uc) {
-		kill_uncommitted_channel(uc, "Peer reconnected");
-		goto return_to_gossipd;
-	}
+	/* Can't be opening now, since we wouldn't have sent peer_died. */
+	assert(!uc);
 
 	if (channel) {
 		log_debug(channel->log, "Peer has reconnected, state %s",
@@ -484,9 +482,7 @@ void peer_connected(struct lightningd *ld, const u8 *msg,
 	case CHANNELD_AWAITING_LOCKIN:
 	case CHANNELD_NORMAL:
 	case CHANNELD_SHUTTING_DOWN:
-		/* Stop any existing daemon, without triggering error
-		 * on this peer. */
-		channel_set_owner(channel, NULL);
+		assert(!channel->owner);
 
 		channel->peer->addr = addr;
 		peer_start_channeld(channel, &cs,
@@ -495,9 +491,7 @@ void peer_connected(struct lightningd *ld, const u8 *msg,
 		return;
 
 	case CLOSINGD_SIGEXCHANGE:
-		/* Stop any existing daemon, without triggering error
-		 * on this peer. */
-		channel_set_owner(channel, NULL);
+		assert(!channel->owner);
 
 		channel->peer->addr = addr;
 		peer_start_closingd(channel, &cs,
@@ -508,7 +502,6 @@ void peer_connected(struct lightningd *ld, const u8 *msg,
 		abort();
 	}
 
-return_to_gossipd:
 	/* No err, all good. */
 	error = NULL;
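
Because gossipd now serializes connections per peer id, the cleanup paths
above collapse into assertions. A minimal sketch of that shift, assuming a
hypothetical on_peer_connected() helper rather than the patch's actual call
chain:

#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for lightningd's channel. */
struct channel {
	void *owner;	/* running subdaemon, if any */
};

/* With gossipd holding back duplicates until peer_died, a freshly
 * announced connection can never find a subdaemon still running, so
 * the old channel_set_owner(channel, NULL) cleanup becomes an assert. */
static void on_peer_connected(struct channel *channel)
{
	assert(!channel->owner);
	/* ...start channeld/closingd for the new connection... */
}

int main(void)
{
	struct channel c = { NULL };
	on_peer_connected(&c);
	return 0;
}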

diff --git a/tests/test_lightningd.py b/tests/test_lightningd.py
--- a/tests/test_lightningd.py
+++ b/tests/test_lightningd.py
@@ -3131,8 +3131,9 @@ class LightningDTests(BaseLightningDTests):
         # Reconnect.
         l1.rpc.connect(l2.info['id'], 'localhost', l2.info['port'])
 
-        # We should get a message about reconnecting.
-        l2.daemon.wait_for_log('Killing openingd: Peer reconnected')
+        # We should get a message about reconnecting, but order unsynced.
+        l2.daemon.wait_for_logs(['gossipd.*reconnect for active peer',
+                                 'openingd.*Error reading gossip msg'])
 
         # Should work fine.
         l1.rpc.fundchannel(l2.info['id'], 20000)
