Browse Source

gossipd: hand back peer, don't hand a new peer.

All peers come from gossipd, and maintain an fd to talk to it.  Sometimes
we hand the peer back, but to avoid a race, we always recreated it.

The race was that a daemon closed the gossip_fd, which made gossipd
forget the peer, then master handed the peer back to gossipd.  We stop
the race by never closing the gossipfd, but hand it back to gossipd
for closing.

Now gossipd has to accept two fds, but the handling of peers is far
clearer.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ppa-0.6.1
Rusty Russell 7 years ago
parent
commit
67aa95c194
  1. 109
      gossipd/gossip.c
  2. 17
      gossipd/gossip_wire.csv
  3. 2
      lightningd/gossip_control.c
  4. 25
      lightningd/peer_control.c
  5. 3
      lightningd/peer_control.h
  6. 1
      openingd/opening.c
  7. 30
      tests/test_lightningd.py

109
gossipd/gossip.c

@ -827,55 +827,80 @@ static struct io_plan *new_peer_got_fd(struct io_conn *conn, struct peer *peer)
return daemon_conn_read_next(conn, &peer->daemon->master);
}
/* Read and close fd */
static struct io_plan *discard_peer_fd(struct io_conn *conn, int *fd)
{
struct daemon *daemon = tal_parent(fd);
close(*fd);
tal_free(fd);
return daemon_conn_read_next(conn, &daemon->master);
}
/* This lets us read the fds in before handling anything. */
struct returning_peer {
struct daemon *daemon;
struct pubkey id;
struct crypto_state cs;
u8 *inner_msg;
int peer_fd, gossip_fd;
};
static struct io_plan *handle_peer(struct io_conn *conn, struct daemon *daemon,
const u8 *msg)
static struct io_plan *handle_returning_peer(struct io_conn *conn,
struct returning_peer *rpeer)
{
struct daemon *daemon = rpeer->daemon;
struct peer *peer;
struct crypto_state cs;
struct pubkey id;
struct wireaddr addr;
u8 *gfeatures, *lfeatures;
u8 *inner_msg;
if (!fromwire_gossipctl_handle_peer(msg, msg, NULL, &id, &addr, &cs,
&gfeatures, &lfeatures, &inner_msg))
master_badmsg(WIRE_GOSSIPCTL_HANDLE_PEER, msg);
peer = find_peer(daemon, &rpeer->id);
if (!peer)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"hand_back_peer unknown peer: %s",
type_to_string(trc, struct pubkey, &rpeer->id));
/* If it already exists locally, that's probably a reconnect:
* drop this one. If it exists as remote, replace with this.*/
peer = find_peer(daemon, &id);
if (peer) {
if (peer->local) {
int *fd = tal(daemon, int);
status_trace("handle_peer %s: duplicate, dropping",
type_to_string(trc, struct pubkey, &id));
return io_recv_fd(conn, fd, discard_peer_fd, fd);
}
status_trace("handle_peer %s: found remote duplicate, dropping",
type_to_string(trc, struct pubkey, &id));
tal_free(peer);
/* We don't need the gossip_fd. We could drain it, so no gossip msgs
* are missed, but that seems overkill. */
close(rpeer->gossip_fd);
/* Possible if there's a reconnect: ignore handed back. */
if (peer->local) {
status_trace("hand_back_peer %s: reconnected, dropping handback",
type_to_string(trc, struct pubkey, &rpeer->id));
close(rpeer->peer_fd);
tal_free(rpeer);
return daemon_conn_read_next(conn, &daemon->master);
}
status_trace("handle_peer %s: new peer",
type_to_string(trc, struct pubkey, &id));
peer = new_peer(daemon, daemon, &id, &addr, &cs);
peer->gfeatures = tal_steal(peer, gfeatures);
peer->lfeatures = tal_steal(peer, lfeatures);
peer_finalized(peer);
status_trace("hand_back_peer %s: now local again",
type_to_string(trc, struct pubkey, &rpeer->id));
/* Now we talk to peer directly again. */
daemon_conn_clear(peer->remote);
peer->remote = tal_free(peer->remote);
peer->local = new_local_peer_state(peer, &rpeer->cs);
peer->local->fd = rpeer->peer_fd;
/* If they told us to send a message, queue it now */
if (tal_len(rpeer->inner_msg))
msg_enqueue(&peer->local->peer_out, take(rpeer->inner_msg));
tal_free(rpeer);
return new_peer_got_fd(conn, peer);
}
static struct io_plan *read_returning_gossipfd(struct io_conn *conn,
struct returning_peer *rpeer)
{
return io_recv_fd(conn, &rpeer->gossip_fd,
handle_returning_peer, rpeer);
}
static struct io_plan *hand_back_peer(struct io_conn *conn,
struct daemon *daemon,
const u8 *msg)
{
struct returning_peer *rpeer = tal(daemon, struct returning_peer);
if (tal_len(inner_msg))
msg_enqueue(&peer->local->peer_out, take(inner_msg));
rpeer->daemon = daemon;
if (!fromwire_gossipctl_hand_back_peer(msg, msg, NULL,
&rpeer->id, &rpeer->cs,
&rpeer->inner_msg))
master_badmsg(WIRE_GOSSIPCTL_HAND_BACK_PEER, msg);
return io_recv_fd(conn, &peer->local->fd, new_peer_got_fd, peer);
return io_recv_fd(conn, &rpeer->peer_fd,
read_returning_gossipfd, rpeer);
}
static struct io_plan *release_peer(struct io_conn *conn, struct daemon *daemon,
@ -1483,8 +1508,8 @@ static struct io_plan *recv_req(struct io_conn *conn, struct daemon_conn *master
handle_forwarded_msg(conn, daemon, daemon->master.msg_in);
return daemon_conn_read_next(conn, &daemon->master);
case WIRE_GOSSIPCTL_HANDLE_PEER:
return handle_peer(conn, daemon, master->msg_in);
case WIRE_GOSSIPCTL_HAND_BACK_PEER:
return hand_back_peer(conn, daemon, master->msg_in);
case WIRE_GOSSIPCTL_REACH_PEER:
return reach_peer(conn, daemon, master->msg_in);

17
gossipd/gossip_wire.csv

@ -64,17 +64,12 @@ gossipctl_release_peer_reply,,lfeatures,lflen*u8
# Gossipd -> master: reply to gossip_release_peer if we couldn't find the peer.
gossipctl_release_peer_replyfail,3204
# Gossipd -> master: take over peer, with optional msg. (+peer fd)
gossipctl_handle_peer,3013
gossipctl_handle_peer,,id,struct pubkey
gossipctl_handle_peer,,addr,struct wireaddr
gossipctl_handle_peer,,crypto_state,struct crypto_state
gossipctl_handle_peer,,gflen,u16
gossipctl_handle_peer,,gfeatures,gflen*u8
gossipctl_handle_peer,,lflen,u16
gossipctl_handle_peer,,lfeatures,lflen*u8
gossipctl_handle_peer,,len,u16
gossipctl_handle_peer,,msg,len*u8
# Gossipd -> master: take back peer, with optional msg. (+peer fd, +gossip fd)
gossipctl_hand_back_peer,3013
gossipctl_hand_back_peer,,id,struct pubkey
gossipctl_hand_back_peer,,crypto_state,struct crypto_state
gossipctl_hand_back_peer,,len,u16
gossipctl_hand_back_peer,,msg,len*u8
# Pass JSON-RPC getnodes call through
gossip_getnodes_request,3005

Can't render this file because it has a wrong number of fields in line 5.

2
lightningd/gossip_control.c

@ -66,7 +66,7 @@ static unsigned gossip_msg(struct subd *gossip, const u8 *msg, const int *fds)
case WIRE_GOSSIP_RESOLVE_CHANNEL_REQUEST:
case WIRE_GOSSIP_FORWARDED_MSG:
case WIRE_GOSSIPCTL_REACH_PEER:
case WIRE_GOSSIPCTL_HANDLE_PEER:
case WIRE_GOSSIPCTL_HAND_BACK_PEER:
case WIRE_GOSSIPCTL_RELEASE_PEER:
case WIRE_GOSSIPCTL_PEER_ADDRHINT:
case WIRE_GOSSIP_GET_UPDATE:

25
lightningd/peer_control.c

@ -640,11 +640,10 @@ void peer_connected(struct lightningd *ld, const u8 *msg,
return_to_gossipd:
/* Otherwise, we hand back to gossipd, to continue. */
msg = towire_gossipctl_handle_peer(msg, &id, &addr, &cs,
gfeatures, lfeatures, NULL);
msg = towire_gossipctl_hand_back_peer(msg, &id, &cs, NULL);
subd_send_msg(ld->gossip, take(msg));
subd_send_fd(ld->gossip, peer_fd);
close(gossip_fd);
subd_send_fd(ld->gossip, gossip_fd);
/* If we were waiting for connection, we succeeded. */
connect_succeeded(ld, &id);
@ -653,11 +652,10 @@ return_to_gossipd:
send_error:
/* Hand back to gossipd, with an error packet. */
connect_failed(ld, &id, sanitize_error(msg, error, NULL));
msg = towire_gossipctl_handle_peer(msg, &id, &addr, &cs,
gfeatures, lfeatures, error);
msg = towire_gossipctl_hand_back_peer(msg, &id, &cs, error);
subd_send_msg(ld->gossip, take(msg));
subd_send_fd(ld->gossip, peer_fd);
close(gossip_fd);
subd_send_fd(ld->gossip, gossip_fd);
}
void peer_sent_nongossip(struct lightningd *ld,
@ -704,11 +702,10 @@ void peer_sent_nongossip(struct lightningd *ld,
send_error:
/* Hand back to gossipd, with an error packet. */
connect_failed(ld, id, sanitize_error(error, error, NULL));
msg = towire_gossipctl_handle_peer(error, id, addr, cs,
gfeatures, lfeatures, error);
msg = towire_gossipctl_hand_back_peer(ld, id, cs, error);
subd_send_msg(ld->gossip, take(msg));
subd_send_fd(ld->gossip, peer_fd);
close(gossip_fd);
subd_send_fd(ld->gossip, gossip_fd);
tal_free(error);
}
@ -2393,9 +2390,9 @@ static unsigned int opening_negotiation_failed(struct subd *openingd,
u8 *err;
const char *why;
/* We need the peer fd. */
/* We need the peer fd and gossip fd. */
if (tal_count(fds) == 0)
return 1;
return 2;
if (!fromwire_opening_negotiation_failed(msg, msg, NULL, &cs, &err)) {
peer_internal_error(peer,
@ -2404,12 +2401,10 @@ static unsigned int opening_negotiation_failed(struct subd *openingd,
return 0;
}
/* FIXME: Should we save addr in peer, or should gossipd remember it? */
msg = towire_gossipctl_handle_peer(msg, &peer->id, NULL, &cs,
peer->gfeatures, peer->lfeatures,
NULL);
msg = towire_gossipctl_hand_back_peer(msg, &peer->id, &cs, NULL);
subd_send_msg(openingd->ld->gossip, take(msg));
subd_send_fd(openingd->ld->gossip, fds[0]);
subd_send_fd(openingd->ld->gossip, fds[1]);
why = tal_strndup(peer, (const char *)err, tal_len(err));
log_unusual(peer->log, "Opening negotiation failed: %s", why);

3
lightningd/peer_control.h

@ -26,9 +26,6 @@ struct peer {
/* ID of peer */
struct pubkey id;
/* Global and local features bitfields. */
const u8 *gfeatures, *lfeatures;
/* Error message (iff in error state) */
u8 *error;

1
openingd/opening.c

@ -92,6 +92,7 @@ static void negotiation_failed(struct state *state, bool send_error,
(const u8 *)errmsg);
wire_sync_write(REQ_FD, msg);
fdpass_send(REQ_FD, PEER_FD);
fdpass_send(REQ_FD, GOSSIP_FD);
tal_free(state);
exit(0);

30
tests/test_lightningd.py

@ -221,8 +221,8 @@ class LightningDTests(BaseLightningDTests):
assert ret['id'] == l2.info['id']
l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER')
l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER')
l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER')
l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER')
return l1,l2
# Returns the short channel-id: <blocknum>:<txnum>:<outnum>
@ -342,8 +342,8 @@ class LightningDTests(BaseLightningDTests):
assert l2.rpc.getpeer(l1.info['id'])['state'] == 'GOSSIPING'
# Both gossipds will have them as new peers once handed back.
l1.daemon.wait_for_log('handle_peer {}: new peer'.format(l2.info['id']))
l2.daemon.wait_for_log('handle_peer {}: new peer'.format(l1.info['id']))
l1.daemon.wait_for_log('hand_back_peer {}: now local again'.format(l2.info['id']))
l2.daemon.wait_for_log('hand_back_peer {}: now local again'.format(l1.info['id']))
def test_balance(self):
l1,l2 = self.connect()
@ -709,8 +709,8 @@ class LightningDTests(BaseLightningDTests):
assert ret['id'] == l2.info['id']
l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER')
l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER')
l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER')
l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER')
addr = l1.rpc.newaddr()['address']
txid = l1.bitcoin.rpc.sendtoaddress(addr, 10**6 / 10**8 + 0.01)
@ -1406,7 +1406,7 @@ class LightningDTests(BaseLightningDTests):
assert ret['id'] == l3.info['id']
l3.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER')
l3.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER')
self.fund_channel(l1, l2, 10**6)
self.fund_channel(l2, l3, 10**6)
@ -1497,14 +1497,14 @@ class LightningDTests(BaseLightningDTests):
ret = l1.rpc.connect(l2.info['id'], 'localhost', l2.info['port'])
assert ret['id'] == l2.info['id']
l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER')
l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER')
l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER')
l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER')
ret = l2.rpc.connect(l3.info['id'], 'localhost', l3.info['port'])
assert ret['id'] == l3.info['id']
l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER')
l3.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER')
l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER')
l3.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER')
c1 = self.fund_channel(l1, l2, 10**6)
c2 = self.fund_channel(l2, l3, 10**6)
@ -1596,14 +1596,14 @@ class LightningDTests(BaseLightningDTests):
ret = l1.rpc.connect(l2.info['id'], 'localhost', l2.info['port'])
assert ret['id'] == l2.info['id']
l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER')
l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER')
l1.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER')
l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER')
ret = l2.rpc.connect(l3.info['id'], 'localhost', l3.info['port'])
assert ret['id'] == l3.info['id']
l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER')
l3.daemon.wait_for_log('WIRE_GOSSIPCTL_HANDLE_PEER')
l2.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER')
l3.daemon.wait_for_log('WIRE_GOSSIPCTL_HAND_BACK_PEER')
c1 = self.fund_channel(l1, l2, 10**6)
c2 = self.fund_channel(l2, l3, 10**6)

Loading…
Cancel
Save