diff --git a/gossipd/broadcast.c b/gossipd/broadcast.c index c6af74de7..39c42db59 100644 --- a/gossipd/broadcast.c +++ b/gossipd/broadcast.c @@ -13,57 +13,49 @@ #include #include -struct queued_message { - struct broadcastable *bcast; - - /* Serialized payload */ - const u8 *payload; -}; +static void destroy_broadcast_state(struct broadcast_state *bstate) +{ + uintmap_clear(&bstate->broadcasts); +} -struct broadcast_state *new_broadcast_state(struct routing_state *rstate) +struct broadcast_state *new_broadcast_state(struct routing_state *rstate, + struct gossip_store *gs) { struct broadcast_state *bstate = tal(rstate, struct broadcast_state); uintmap_init(&bstate->broadcasts); bstate->count = 0; - bstate->gs = gossip_store_new(rstate); + bstate->gs = gs; + tal_add_destructor(bstate, destroy_broadcast_state); return bstate; } void broadcast_del(struct broadcast_state *bstate, struct broadcastable *bcast) { - const struct queued_message *q + const struct broadcastable *b = uintmap_del(&bstate->broadcasts, bcast->index); - if (q != NULL) { - assert(q->bcast == bcast); - tal_free(q); + if (b != NULL) { + assert(b == bcast); bstate->count--; broadcast_state_check(bstate, "broadcast_del"); bcast->index = 0; } } -static struct queued_message *new_queued_message(struct broadcast_state *bstate, - const u8 *payload, - struct broadcastable *bcast) +static void add_broadcast(struct broadcast_state *bstate, + struct broadcastable *bcast) { - struct queued_message *msg = tal(bstate, struct queued_message); - assert(payload); assert(bcast); assert(bcast->index); - msg->payload = payload; - msg->bcast = bcast; - if (!uintmap_add(&bstate->broadcasts, bcast->index, msg)) + if (!uintmap_add(&bstate->broadcasts, bcast->index, bcast)) abort(); bstate->count++; - return msg; } void insert_broadcast_nostore(struct broadcast_state *bstate, - const u8 *msg, struct broadcastable *bcast) { - new_queued_message(bstate, msg, bcast); + add_broadcast(bstate, bcast); broadcast_state_check(bstate, "insert_broadcast"); } @@ -84,40 +76,38 @@ void insert_broadcast(struct broadcast_state **bstate, assert(idx == bcast->index); } - insert_broadcast_nostore(*bstate, msg, bcast); + insert_broadcast_nostore(*bstate, bcast); /* If it compacts, it replaces *bstate */ gossip_store_maybe_compact((*bstate)->gs, bstate); } -const u8 *pop_first_broadcast(struct broadcast_state *bstate, - struct broadcastable **bcast) +struct broadcastable *next_broadcast_raw(struct broadcast_state *bstate, + u32 *last_index) { - u64 idx; - const u8 *msg; - struct queued_message *q = uintmap_first(&bstate->broadcasts, &idx); - if (!q) - return NULL; - - *bcast = q->bcast; - msg = q->payload; + struct broadcastable *b; + u64 idx = *last_index; - broadcast_del(bstate, *bcast); - return msg; + b = uintmap_after(&bstate->broadcasts, &idx); + if (!b) + return NULL; + /* Assert no overflow */ + *last_index = idx; + assert(*last_index == idx); + return b; } -const u8 *next_broadcast(struct broadcast_state *bstate, +const u8 *next_broadcast(const tal_t *ctx, + struct broadcast_state *bstate, u32 timestamp_min, u32 timestamp_max, u32 *last_index) { - struct queued_message *m; - u64 idx = *last_index; + struct broadcastable *b; - while ((m = uintmap_after(&bstate->broadcasts, &idx)) != NULL) { - if (m->bcast->timestamp >= timestamp_min - && m->bcast->timestamp <= timestamp_max) { - *last_index = idx; - return m->payload; + while ((b = next_broadcast_raw(bstate, last_index)) != NULL) { + if (b->timestamp >= timestamp_min + && b->timestamp <= timestamp_max) { + return gossip_store_get(ctx, bstate->gs, b->index); } } return NULL; diff --git a/gossipd/broadcast.h b/gossipd/broadcast.h index 0a5b5012e..a9dc57a07 100644 --- a/gossipd/broadcast.h +++ b/gossipd/broadcast.h @@ -11,12 +11,6 @@ struct routing_state; -struct broadcast_state { - UINTMAP(struct queued_message *) broadcasts; - size_t count; - struct gossip_store *gs; -}; - /* This is nested inside a node, chan or half_chan; rewriting the store can * cause it to change! */ struct broadcastable { @@ -26,13 +20,19 @@ struct broadcastable { u32 timestamp; }; +struct broadcast_state { + UINTMAP(struct broadcastable *) broadcasts; + size_t count; + struct gossip_store *gs; +}; + static inline void broadcastable_init(struct broadcastable *bcast) { bcast->index = 0; } -struct broadcast_state *new_broadcast_state(struct routing_state *rstate); - +struct broadcast_state *new_broadcast_state(struct routing_state *rstate, + struct gossip_store *gs); /* Append a queued message for broadcast. Must be explicitly deleted. * Also adds it to the gossip store. */ @@ -42,27 +42,27 @@ void insert_broadcast(struct broadcast_state **bstate, /* Add to broadcast, but not store: for gossip store compaction. */ void insert_broadcast_nostore(struct broadcast_state *bstate, - const u8 *msg, struct broadcastable *bcast); /* Delete a broadcast: not usually needed, since destructor does it */ void broadcast_del(struct broadcast_state *bstate, struct broadcastable *bcast); +/* Return the next broadcastable entry; doesn't load it. */ +struct broadcastable *next_broadcast_raw(struct broadcast_state *bstate, + u32 *last_index); + /* Return the broadcast with index >= *last_index, timestamp >= min and <= max * and update *last_index. * There's no broadcast with index 0. */ -const u8 *next_broadcast(struct broadcast_state *bstate, +const u8 *next_broadcast(const tal_t *ctx, + struct broadcast_state *bstate, u32 timestamp_min, u32 timestamp_max, u32 *last_index); /* index of last entry. */ u64 broadcast_final_index(const struct broadcast_state *bstate); -/* Return and remove first element: used by gossip_store_compact */ -const u8 *pop_first_broadcast(struct broadcast_state *bstate, - struct broadcastable **bcast); - /* Returns b if all OK, otherwise aborts if abortstr non-NULL, otherwise returns * NULL. */ struct broadcast_state *broadcast_state_check(struct broadcast_state *b, diff --git a/gossipd/gossip_store.c b/gossipd/gossip_store.c index 4312c07df..2a26d72cc 100644 --- a/gossipd/gossip_store.c +++ b/gossipd/gossip_store.c @@ -208,12 +208,12 @@ bool gossip_store_compact(struct gossip_store *gs, { size_t count = 0; int fd; - const u8 *msg; struct node *self; u64 len = sizeof(gs->version); struct broadcastable *bcast; struct broadcast_state *oldb = *bs; struct broadcast_state *newb; + u32 idx = 0; if (gs->disable_compaction) return false; @@ -223,7 +223,7 @@ bool gossip_store_compact(struct gossip_store *gs, "Compacting gossip_store with %zu entries, %zu of which are stale", gs->count, gs->count - oldb->count); - newb = new_broadcast_state(gs->rstate); + newb = new_broadcast_state(gs->rstate, gs); fd = open(GOSSIP_STORE_TEMP_FILENAME, O_RDWR|O_APPEND|O_CREAT, 0600); if (fd < 0) { @@ -238,14 +238,45 @@ bool gossip_store_compact(struct gossip_store *gs, goto unlink_disable; } - while ((msg = pop_first_broadcast(oldb, &bcast)) != NULL) { + /* Copy entries one at a time. */ + while ((bcast = next_broadcast_raw(oldb, &idx)) != NULL) { + beint32_t belen, becsum; + u32 msglen; + u8 *msg; + + /* FIXME: optimize both read and allocation */ + if (lseek(gs->fd, bcast->index, SEEK_SET) < 0 + || read(gs->fd, &belen, sizeof(belen)) != sizeof(belen) + || read(gs->fd, &becsum, sizeof(becsum)) != sizeof(becsum)) { + status_broken("Failed reading header from to gossip store @%u" + ": %s", + bcast->index, strerror(errno)); + goto unlink_disable; + } + + msglen = be32_to_cpu(belen); + msg = tal_arr(tmpctx, u8, sizeof(belen) + sizeof(becsum) + msglen); + memcpy(msg, &belen, sizeof(belen)); + memcpy(msg + sizeof(belen), &becsum, sizeof(becsum)); + if (read(gs->fd, msg + sizeof(belen) + sizeof(becsum), msglen) + != msglen) { + status_broken("Failed reading %u from to gossip store @%u" + ": %s", + msglen, bcast->index, strerror(errno)); + goto unlink_disable; + } + + broadcast_del(oldb, bcast); bcast->index = len; - insert_broadcast_nostore(newb, msg, bcast); - if (!gossip_store_append(fd, gs->rstate, msg, &len)) { + insert_broadcast_nostore(newb, bcast); + + if (write(fd, msg, msglen + sizeof(belen) + sizeof(becsum)) + != msglen + sizeof(belen) + sizeof(becsum)) { status_broken("Failed writing to gossip store: %s", strerror(errno)); goto unlink_disable; } + len += sizeof(belen) + sizeof(becsum) + msglen; count++; } @@ -266,8 +297,8 @@ bool gossip_store_compact(struct gossip_store *gs, } status_trace( - "Compaction completed: dropped %zu messages, new count %zu", - gs->count - count, count); + "Compaction completed: dropped %zu messages, new count %zu, len %"PRIu64, + gs->count - count, count, len); gs->count = count; gs->len = len; close(gs->fd); @@ -359,7 +390,7 @@ const u8 *gossip_store_get(const tal_t *ctx, msglen = be32_to_cpu(belen); checksum = be32_to_cpu(becsum); - msg = tal_arr(ctx, u8, msglen); + msg = tal_arr(tmpctx, u8, msglen); if (read(gs->fd, msg, msglen) != msglen) status_failed(STATUS_FAIL_INTERNAL_ERROR, "gossip_store: can't read len %u offset %"PRIu64 diff --git a/gossipd/gossipd.c b/gossipd/gossipd.c index 355486b0e..46b294df0 100644 --- a/gossipd/gossipd.c +++ b/gossipd/gossipd.c @@ -1127,13 +1127,13 @@ static void maybe_queue_gossip(struct peer *peer) * only needs to keep an index; this returns the next gossip message * which is past the previous index and within the timestamp: it * also updates `broadcast_index`. */ - next = next_broadcast(peer->daemon->rstate->broadcasts, + next = next_broadcast(NULL, peer->daemon->rstate->broadcasts, peer->gossip_timestamp_min, peer->gossip_timestamp_max, &peer->broadcast_index); if (next) { - queue_peer_msg(peer, next); + queue_peer_msg(peer, take(next)); return; } diff --git a/gossipd/routing.c b/gossipd/routing.c index f8afbdb38..54c9c3669 100644 --- a/gossipd/routing.c +++ b/gossipd/routing.c @@ -154,7 +154,7 @@ struct routing_state *new_routing_state(const tal_t *ctx, { struct routing_state *rstate = tal(ctx, struct routing_state); rstate->nodes = empty_node_map(rstate); - rstate->broadcasts = new_broadcast_state(rstate); + rstate->broadcasts = new_broadcast_state(rstate, gossip_store_new(rstate)); rstate->chainparams = chainparams; rstate->local_id = *local_id; rstate->prune_timeout = prune_timeout; @@ -1919,7 +1919,6 @@ void memleak_remove_routing_tables(struct htable *memtable, memleak_remove_htable(memtable, &rstate->nodes->raw); memleak_remove_htable(memtable, &rstate->pending_node_map->raw); - memleak_remove_uintmap(memtable, &rstate->broadcasts->broadcasts); for (n = node_map_first(rstate->nodes, &nit); n;