diff --git a/gossipd/broadcast.c b/gossipd/broadcast.c index c5d118243..8fc02eefb 100644 --- a/gossipd/broadcast.c +++ b/gossipd/broadcast.c @@ -27,13 +27,13 @@ void queue_broadcast(struct broadcast_state *bstate, const u8 *payload) { struct queued_message *msg; - u64 index = 0; + u64 index; + /* Remove any tag&type collisions */ - while (true) { - msg = next_broadcast_message(bstate, &index); - if (msg == NULL) - break; - else if (msg->type == type && memcmp(msg->tag, tag, tal_count(tag)) == 0) { + for (msg = uintmap_first(&bstate->broadcasts, &index); + msg; + msg = uintmap_after(&bstate->broadcasts, &index)) { + if (msg->type == type && memcmp(msg->tag, tag, tal_count(tag)) == 0) { uintmap_del(&bstate->broadcasts, index); tal_free(msg); } @@ -45,7 +45,7 @@ void queue_broadcast(struct broadcast_state *bstate, bstate->next_index += 1; } -struct queued_message *next_broadcast_message(struct broadcast_state *bstate, u64 *last_index) +struct queued_message *next_broadcast_message(struct broadcast_state *bstate, u64 last_index) { - return uintmap_after(&bstate->broadcasts, last_index); + return uintmap_after(&bstate->broadcasts, &last_index); } diff --git a/gossipd/broadcast.h b/gossipd/broadcast.h index 80545c4f6..baa561d4f 100644 --- a/gossipd/broadcast.h +++ b/gossipd/broadcast.h @@ -35,6 +35,6 @@ void queue_broadcast(struct broadcast_state *bstate, const u8 *tag, const u8 *payload); -struct queued_message *next_broadcast_message(struct broadcast_state *bstate, u64 *last_index); +struct queued_message *next_broadcast_message(struct broadcast_state *bstate, u64 last_index); #endif /* LIGHTNING_LIGHTNINGD_GOSSIP_BROADCAST_H */ diff --git a/gossipd/gossip.c b/gossipd/gossip.c index ab38d880b..e51532e4f 100644 --- a/gossipd/gossip.c +++ b/gossipd/gossip.c @@ -629,6 +629,17 @@ static void wake_pkt_out(struct peer *peer) msg_wake(&peer->remote->out); } +/* Mutual recursion. */ +static struct io_plan *peer_pkt_out(struct io_conn *conn, struct peer *peer); + +static struct io_plan *local_gossip_broadcast_done(struct io_conn *conn, + struct peer *peer) +{ + status_trace("%s", __func__); + peer->broadcast_index++; + return peer_pkt_out(conn, peer); +} + static struct io_plan *peer_pkt_out(struct io_conn *conn, struct peer *peer) { /* First priority is queued packets, if any */ @@ -655,11 +666,12 @@ static struct io_plan *peer_pkt_out(struct io_conn *conn, struct peer *peer) struct queued_message *next; next = next_broadcast_message(peer->daemon->rstate->broadcasts, - &peer->broadcast_index); + peer->broadcast_index); if (next) return peer_write_message(conn, &peer->local->pcs, - next->payload, peer_pkt_out); + next->payload, + local_gossip_broadcast_done); /* Gossip is drained. Wait for next timer. */ peer->gossip_sync = false; @@ -787,6 +799,16 @@ static bool send_peer_with_fds(struct peer *peer, const u8 *msg) return true; } +static struct io_plan *nonlocal_gossip_broadcast_done(struct io_conn *conn, + struct daemon_conn *dc) +{ + struct peer *peer = dc->ctx; + + status_trace("%s", __func__); + peer->broadcast_index++; + return nonlocal_dump_gossip(conn, dc); +} + /** * nonlocal_dump_gossip - catch the nonlocal peer up with the latest gossip. * @@ -802,13 +824,14 @@ static struct io_plan *nonlocal_dump_gossip(struct io_conn *conn, struct daemon_ assert(!peer->local); next = next_broadcast_message(peer->daemon->rstate->broadcasts, - &peer->broadcast_index); + peer->broadcast_index); if (!next) { return msg_queue_wait(conn, &peer->remote->out, daemon_conn_write_next, dc); } else { - return io_write_wire(conn, next->payload, nonlocal_dump_gossip, dc); + return io_write_wire(conn, next->payload, + nonlocal_gossip_broadcast_done, dc); } }