Browse Source

gossipd: split peer structure to clearly separate local and remote fields.

We should also go through and use consistent nomenclature on functions which
are used with a local peer ("lpeer_xxx"?) and those with a remote peer
("rpeer_xxx"?) but this is minimal.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ppa-0.6.1
Rusty Russell 7 years ago
parent
commit
bcfbc24308
  1. 187
      gossipd/gossip.c

187
gossipd/gossip.c

@ -96,6 +96,30 @@ struct reaching {
bool succeeded;
};
/* Things we need when we're talking direct to the peer. */
struct local_peer_state {
/* Cryptostate */
struct peer_crypto_state pcs;
/* File descriptor corresponding to conn. */
int fd;
/* Our connection (and owner) */
struct io_conn *conn;
/* Waiting to send_peer_with_fds to master? */
bool return_to_master;
/* If we're exiting due to non-gossip msg, otherwise release */
u8 *nongossip_msg;
/* How many pongs are we expecting? */
size_t num_pings_outstanding;
/* Message queue for outgoing. */
struct msg_queue peer_out;
};
struct peer {
struct daemon *daemon;
@ -111,41 +135,18 @@ struct peer {
/* Feature bitmaps. */
u8 *gfeatures, *lfeatures;
/* Cryptostate */
struct peer_crypto_state pcs;
/* File descriptor corresponding to conn. */
int fd;
/* Our connection (and owner) */
struct io_conn *conn;
/* High water mark for the staggered broadcast */
u64 broadcast_index;
/* Message queue for outgoing. */
struct msg_queue peer_out;
/* Is it time to continue the staggered broadcast? */
bool gossip_sync;
/* The peer owner will use this to talk to gossipd */
struct daemon_conn owner_conn;
/* How many pongs are we expecting? */
size_t num_pings_outstanding;
/* Are we the owner of the peer? */
bool local;
/* If we die, should we reach again? */
bool reach_again;
/* Waiting to send_peer_with_fds to master? */
bool return_to_master;
/* If we're exiting due to non-gossip msg, otherwise release */
u8 *nongossip_msg;
/* Only one of these is set: */
struct local_peer_state *local;
struct daemon_conn *remote;
};
struct addrhint {
@ -198,6 +199,20 @@ static struct addrhint *find_addrhint(struct daemon *daemon,
return NULL;
}
static struct local_peer_state *
new_local_peer_state(struct peer *peer, const struct crypto_state *cs)
{
struct local_peer_state *lps = tal(peer, struct local_peer_state);
init_peer_crypto_state(peer, &lps->pcs);
lps->pcs.cs = *cs;
lps->return_to_master = false;
lps->num_pings_outstanding = 0;
msg_queue_init(&lps->peer_out, peer);
return lps;
}
static struct peer *new_peer(const tal_t *ctx,
struct daemon *daemon,
const struct pubkey *their_id,
@ -206,17 +221,13 @@ static struct peer *new_peer(const tal_t *ctx,
{
struct peer *peer = tal(ctx, struct peer);
init_peer_crypto_state(peer, &peer->pcs);
peer->pcs.cs = *cs;
peer->id = *their_id;
peer->addr = *addr;
peer->daemon = daemon;
peer->local = true;
peer->local = new_local_peer_state(peer, cs);
peer->remote = NULL;
peer->reach_again = false;
peer->return_to_master = false;
peer->num_pings_outstanding = 0;
peer->broadcast_index = 0;
msg_queue_init(&peer->peer_out, peer);
return peer;
}
@ -277,7 +288,7 @@ static void peer_error(struct peer *peer, const char *fmt, ...)
/* Send error: we'll close after writing this. */
va_start(ap, fmt);
msg_enqueue(&peer->peer_out,
msg_enqueue(&peer->local->peer_out,
take(towire_errorfmtv(peer, NULL, fmt, ap)));
va_end(ap);
}
@ -320,7 +331,7 @@ static struct io_plan *peer_init_received(struct io_conn *conn,
/* We will not have anything queued, since we're not duplex. */
msg = towire_gossip_peer_connected(peer, &peer->id, &peer->addr,
&peer->pcs.cs,
&peer->local->pcs.cs,
peer->gfeatures, peer->lfeatures);
if (!send_peer_with_fds(peer, msg))
return io_close(conn);
@ -341,7 +352,7 @@ static struct io_plan *read_init(struct io_conn *conn, struct peer *peer)
* Each node MUST wait to receive `init` before sending any other
* messages.
*/
return peer_read_message(conn, &peer->pcs, peer_init_received);
return peer_read_message(conn, &peer->local->pcs, peer_init_received);
}
/* This creates a temporary peer which is not in the list and is owner
@ -356,7 +367,7 @@ static struct io_plan *init_new_peer(struct io_conn *conn,
struct peer *peer = new_peer(conn, daemon, their_id, addr, cs);
u8 *initmsg;
peer->fd = io_conn_fd(conn);
peer->local->fd = io_conn_fd(conn);
/* BOLT #1:
*
@ -365,7 +376,8 @@ static struct io_plan *init_new_peer(struct io_conn *conn,
*/
initmsg = towire_init(peer,
daemon->globalfeatures, daemon->localfeatures);
return peer_write_message(conn, &peer->pcs, take(initmsg), read_init);
return peer_write_message(conn, &peer->local->pcs,
take(initmsg), read_init);
}
static struct io_plan *owner_msg_in(struct io_conn *conn,
@ -458,7 +470,7 @@ static void handle_ping(struct peer *peer, u8 *ping)
}
if (pong)
msg_enqueue(&peer->peer_out, take(pong));
msg_enqueue(&peer->local->peer_out, take(pong));
}
static void handle_pong(struct peer *peer, const u8 *pong)
@ -471,12 +483,12 @@ static void handle_pong(struct peer *peer, const u8 *pong)
return;
}
if (!peer->num_pings_outstanding) {
if (!peer->local->num_pings_outstanding) {
peer_error(peer, "Unexpected pong");
return;
}
peer->num_pings_outstanding--;
peer->local->num_pings_outstanding--;
daemon_conn_send(&peer->daemon->master,
take(towire_gossip_ping_reply(pong, true,
tal_len(pong))));
@ -493,24 +505,23 @@ static void fail_release(struct peer *peer)
static struct io_plan *ready_for_master(struct io_conn *conn, struct peer *peer)
{
u8 *msg;
if (peer->nongossip_msg)
if (peer->local->nongossip_msg)
msg = towire_gossip_peer_nongossip(peer, &peer->id,
&peer->addr,
&peer->pcs.cs,
&peer->local->pcs.cs,
peer->gfeatures,
peer->lfeatures,
peer->nongossip_msg);
peer->local->nongossip_msg);
else
msg = towire_gossipctl_release_peer_reply(peer,
&peer->addr,
&peer->pcs.cs,
&peer->local->pcs.cs,
peer->gfeatures,
peer->lfeatures);
if (send_peer_with_fds(peer, take(msg))) {
/* In case we set this earlier. */
tal_del_destructor(peer, fail_release);
peer->return_to_master = false;
return io_close_taken_fd(conn);
} else
return io_close(conn);
@ -523,14 +534,14 @@ static struct io_plan *peer_msgin(struct io_conn *conn,
* pass up to master */
static struct io_plan *peer_next_in(struct io_conn *conn, struct peer *peer)
{
if (peer->return_to_master) {
assert(!peer_in_started(conn, &peer->pcs));
if (!peer_out_started(conn, &peer->pcs))
if (peer->local->return_to_master) {
assert(!peer_in_started(conn, &peer->local->pcs));
if (!peer_out_started(conn, &peer->local->pcs))
return ready_for_master(conn, peer);
return io_wait(conn, peer, peer_next_in, peer);
}
return peer_read_message(conn, &peer->pcs, peer_msgin);
return peer_read_message(conn, &peer->local->pcs, peer_msgin);
}
static struct io_plan *peer_msgin(struct io_conn *conn,
@ -577,8 +588,8 @@ static struct io_plan *peer_msgin(struct io_conn *conn,
case WIRE_REVOKE_AND_ACK:
case WIRE_INIT:
/* Not our place to handle this, so we punt */
peer->return_to_master = true;
peer->nongossip_msg = tal_steal(peer, msg);
peer->local->return_to_master = true;
peer->local->nongossip_msg = tal_steal(peer, msg);
/* This will wait. */
return peer_next_in(conn, peer);
@ -609,28 +620,32 @@ static void wake_pkt_out(struct peer *peer)
new_reltimer(&peer->daemon->timers, peer,
time_from_msec(peer->daemon->broadcast_interval),
wake_pkt_out, peer);
if (peer->local)
/* Notify the peer-write loop */
msg_wake(&peer->peer_out);
msg_wake(&peer->local->peer_out);
else
/* Notify the daemon_conn-write loop */
msg_wake(&peer->owner_conn.out);
msg_wake(&peer->remote->out);
}
static struct io_plan *peer_pkt_out(struct io_conn *conn, struct peer *peer)
{
/* First priority is queued packets, if any */
const u8 *out = msg_dequeue(&peer->peer_out);
const u8 *out = msg_dequeue(&peer->local->peer_out);
if (out) {
if (is_all_channel_error(out))
return peer_write_message(conn, &peer->pcs, take(out),
return peer_write_message(conn, &peer->local->pcs,
take(out),
peer_close_after_error);
return peer_write_message(conn, &peer->pcs, take(out),
return peer_write_message(conn, &peer->local->pcs, take(out),
peer_pkt_out);
}
/* Do we want to send this peer to the master daemon? */
if (peer->return_to_master) {
assert(!peer_out_started(conn, &peer->pcs));
if (!peer_in_started(conn, &peer->pcs))
if (peer->local->return_to_master) {
assert(!peer_out_started(conn, &peer->local->pcs));
if (!peer_in_started(conn, &peer->local->pcs))
return ready_for_master(conn, peer);
return io_out_wait(conn, peer, peer_pkt_out, peer);
}
@ -643,14 +658,14 @@ static struct io_plan *peer_pkt_out(struct io_conn *conn, struct peer *peer)
&peer->broadcast_index);
if (next)
return peer_write_message(conn, &peer->pcs,
return peer_write_message(conn, &peer->local->pcs,
next->payload, peer_pkt_out);
/* Gossip is drained. Wait for next timer. */
peer->gossip_sync = false;
}
return msg_queue_wait(conn, &peer->peer_out, peer_pkt_out, peer);
return msg_queue_wait(conn, &peer->local->peer_out, peer_pkt_out, peer);
}
/* Now we're a fully-fledged peer. */
@ -704,7 +719,7 @@ static void handle_get_update(struct peer *peer, const u8 *msg)
reply:
msg = towire_gossip_get_update_reply(msg, update);
daemon_conn_send(&peer->owner_conn, take(msg));
daemon_conn_send(peer->remote, take(msg));
}
/**
@ -714,7 +729,7 @@ reply:
static struct io_plan *owner_msg_in(struct io_conn *conn,
struct daemon_conn *dc)
{
struct peer *peer = container_of(dc, struct peer, owner_conn);
struct peer *peer = dc->ctx;
u8 *msg = dc->msg_in;
int type = fromwire_peektype(msg);
@ -744,6 +759,7 @@ static void forget_peer(struct io_conn *conn, struct daemon_conn *dc)
static bool send_peer_with_fds(struct peer *peer, const u8 *msg)
{
int fds[2];
int peer_fd = peer->local->fd;
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) {
status_trace("Failed to create socketpair: %s",
@ -755,21 +771,19 @@ static bool send_peer_with_fds(struct peer *peer, const u8 *msg)
}
/* Now we talk to socket to get to peer's owner daemon. */
peer->local = false;
daemon_conn_init(peer, &peer->owner_conn, fds[0],
peer->local = tal_free(peer->local);
peer->remote = tal(peer, struct daemon_conn);
daemon_conn_init(peer, peer->remote, fds[0],
owner_msg_in, forget_peer);
peer->owner_conn.msg_queue_cleared_cb = nonlocal_dump_gossip;
peer->remote->msg_queue_cleared_cb = nonlocal_dump_gossip;
/* Peer stays around, even though caller will close conn. */
tal_steal(peer->daemon, peer);
daemon_conn_send(&peer->daemon->master, msg);
daemon_conn_send_fd(&peer->daemon->master, peer->fd);
daemon_conn_send_fd(&peer->daemon->master, peer_fd);
daemon_conn_send_fd(&peer->daemon->master, fds[1]);
/* Don't get confused: we can't use this any more. */
peer->fd = -1;
return true;
}
@ -781,19 +795,17 @@ static bool send_peer_with_fds(struct peer *peer, const u8 *msg)
static struct io_plan *nonlocal_dump_gossip(struct io_conn *conn, struct daemon_conn *dc)
{
struct queued_message *next;
struct peer *peer = container_of(dc, struct peer, owner_conn);
struct peer *peer = dc->ctx;
/* Make sure we are not connected directly */
if (peer->local)
return msg_queue_wait(conn, &peer->owner_conn.out,
daemon_conn_write_next, dc);
assert(!peer->local);
next = next_broadcast_message(peer->daemon->rstate->broadcasts,
&peer->broadcast_index);
if (!next) {
return msg_queue_wait(conn, &peer->owner_conn.out,
return msg_queue_wait(conn, &peer->remote->out,
daemon_conn_write_next, dc);
} else {
return io_write_wire(conn, next->payload, nonlocal_dump_gossip, dc);
@ -802,14 +814,15 @@ static struct io_plan *nonlocal_dump_gossip(struct io_conn *conn, struct daemon_
static struct io_plan *new_peer_got_fd(struct io_conn *conn, struct peer *peer)
{
peer->conn = io_new_conn(conn, peer->fd, peer_start_gossip, peer);
if (!peer->conn) {
peer->local->conn = io_new_conn(conn, peer->local->fd,
peer_start_gossip, peer);
if (!peer->local->conn) {
status_trace("Could not create connection for peer: %s",
strerror(errno));
tal_free(peer);
} else {
/* If conn dies, we forget peer. */
tal_steal(peer->conn, peer);
tal_steal(peer->local->conn, peer);
}
return daemon_conn_read_next(conn, &peer->daemon->master);
}
@ -860,9 +873,9 @@ static struct io_plan *handle_peer(struct io_conn *conn, struct daemon *daemon,
peer_finalized(peer);
if (tal_len(inner_msg))
msg_enqueue(&peer->peer_out, take(inner_msg));
msg_enqueue(&peer->local->peer_out, take(inner_msg));
return io_recv_fd(conn, &peer->fd, new_peer_got_fd, peer);
return io_recv_fd(conn, &peer->local->fd, new_peer_got_fd, peer);
}
static struct io_plan *release_peer(struct io_conn *conn, struct daemon *daemon,
@ -875,21 +888,21 @@ static struct io_plan *release_peer(struct io_conn *conn, struct daemon *daemon,
master_badmsg(WIRE_GOSSIPCTL_RELEASE_PEER, msg);
peer = find_peer(daemon, &id);
if (!peer || !peer->local || peer->return_to_master) {
if (!peer || !peer->local || peer->local->return_to_master) {
/* This can happen with dying peers, or reconnect */
status_trace("release_peer: peer %s %s",
type_to_string(trc, struct pubkey, &id),
!peer ? "not found"
: peer->return_to_master ? "already releasing"
: peer->local ? "already releasing"
: "not local");
msg = towire_gossipctl_release_peer_replyfail(msg);
daemon_conn_send(&daemon->master, take(msg));
} else {
peer->return_to_master = true;
peer->nongossip_msg = NULL;
peer->local->return_to_master = true;
peer->local->nongossip_msg = NULL;
/* Wake output, in case it's idle. */
msg_wake(&peer->peer_out);
msg_wake(&peer->local->peer_out);
}
return daemon_conn_read_next(conn, &daemon->master);
}
@ -1002,7 +1015,7 @@ static struct io_plan *ping_req(struct io_conn *conn, struct daemon *daemon,
if (tal_len(ping) > 65535)
status_failed(STATUS_FAIL_MASTER_IO, "Oversize ping");
msg_enqueue(&peer->peer_out, take(ping));
msg_enqueue(&peer->local->peer_out, take(ping));
status_trace("sending ping expecting %sresponse",
num_pong_bytes >= 65532 ? "no " : "");
@ -1016,7 +1029,7 @@ static struct io_plan *ping_req(struct io_conn *conn, struct daemon *daemon,
daemon_conn_send(&daemon->master,
take(towire_gossip_ping_reply(peer, true, 0)));
else
peer->num_pings_outstanding++;
peer->local->num_pings_outstanding++;
out:
return daemon_conn_read_next(conn, &daemon->master);

Loading…
Cancel
Save