Browse Source

gossipd: remove broadcast map altogether.

This clarifies things a fair bit: we simply add and remove from the
gossip_store directly.

Before this series: (--disable-developer, -Og)
    store_load_msec:20669-20902(20822.2+/-82)
    vsz_kb:439704-439712(439706+/-3.2)
    listnodes_sec:0.890000-1.000000(0.92+/-0.04)
    listchannels_sec:11.960000-13.380000(12.576+/-0.49)
    routing_sec:3.070000-5.970000(4.814+/-1.2)
    peer_write_all_sec:28.490000-30.580000(29.532+/-0.78)

After: (--disable-developer, -Og)
    store_load_msec:19722-20124(19921.6+/-1.4e+02)
    vsz_kb:288320
    listnodes_sec:0.860000-0.980000(0.912+/-0.056)
    listchannels_sec:10.790000-12.260000(11.65+/-0.5)
    routing_sec:2.540000-4.950000(4.262+/-0.88)
    peer_write_all_sec:17.570000-19.500000(18.048+/-0.73)

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
htlc_accepted_hook
Rusty Russell 6 years ago
parent
commit
3e733afb2b
  1. 8
      gossipd/Makefile
  2. 227
      gossipd/broadcast.c
  3. 61
      gossipd/broadcast.h
  4. 61
      gossipd/gossip_store.c
  5. 26
      gossipd/gossip_store.h
  6. 22
      gossipd/gossipd.c
  7. 71
      gossipd/routing.c
  8. 8
      gossipd/routing.h
  9. 1
      gossipd/test/run-bench-find_route.c
  10. 1
      gossipd/test/run-find_route-specific.c
  11. 1
      gossipd/test/run-find_route.c
  12. 1
      gossipd/test/run-overlong.c

8
gossipd/Makefile

@ -12,13 +12,13 @@ LIGHTNINGD_GOSSIP_CONTROL_SRC := gossipd/gen_gossip_wire.c
LIGHTNINGD_GOSSIP_CONTROL_OBJS := $(LIGHTNINGD_GOSSIP_CONTROL_SRC:.c=.o) LIGHTNINGD_GOSSIP_CONTROL_OBJS := $(LIGHTNINGD_GOSSIP_CONTROL_SRC:.c=.o)
# gossipd needs these: # gossipd needs these:
LIGHTNINGD_GOSSIP_HEADERS := gossipd/gen_gossip_wire.h \ LIGHTNINGD_GOSSIP_HEADERS_WSRC := gossipd/gen_gossip_wire.h \
gossipd/gen_gossip_peerd_wire.h \ gossipd/gen_gossip_peerd_wire.h \
gossipd/gen_gossip_store.h \ gossipd/gen_gossip_store.h \
gossipd/gossip_store.h \ gossipd/gossip_store.h \
gossipd/routing.h \ gossipd/routing.h
gossipd/broadcast.h LIGHTNINGD_GOSSIP_HEADERS := $(LIGHTNINGD_GOSSIP_HEADERS_WSRC) gossipd/broadcast.h
LIGHTNINGD_GOSSIP_SRC := $(LIGHTNINGD_GOSSIP_HEADERS:.h=.c) gossipd/gossipd.c LIGHTNINGD_GOSSIP_SRC := $(LIGHTNINGD_GOSSIP_HEADERS_WSRC:.h=.c) gossipd/gossipd.c
LIGHTNINGD_GOSSIP_OBJS := $(LIGHTNINGD_GOSSIP_SRC:.c=.o) LIGHTNINGD_GOSSIP_OBJS := $(LIGHTNINGD_GOSSIP_SRC:.c=.o)
# Make sure these depend on everything. # Make sure these depend on everything.

227
gossipd/broadcast.c

@ -1,227 +0,0 @@
#include <bitcoin/block.h>
#include <bitcoin/pubkey.h>
#include <bitcoin/short_channel_id.h>
#include <ccan/crypto/siphash24/siphash24.h>
#include <ccan/htable/htable_type.h>
#include <ccan/mem/mem.h>
#include <common/memleak.h>
#include <common/pseudorand.h>
#include <common/status.h>
#include <common/type_to_string.h>
#include <errno.h>
#include <gossipd/broadcast.h>
#include <gossipd/gossip_store.h>
#include <wire/gen_peer_wire.h>
static void destroy_broadcast_state(struct broadcast_state *bstate)
{
uintmap_clear(&bstate->broadcasts);
}
struct broadcast_state *new_broadcast_state(struct routing_state *rstate,
struct gossip_store *gs,
struct list_head *peers)
{
struct broadcast_state *bstate = tal(rstate, struct broadcast_state);
uintmap_init(&bstate->broadcasts);
bstate->count = 0;
bstate->gs = gs;
bstate->peers = peers;
tal_add_destructor(bstate, destroy_broadcast_state);
return bstate;
}
void broadcast_del(struct broadcast_state *bstate,
struct broadcastable *bcast)
{
const struct broadcastable *b
= uintmap_del(&bstate->broadcasts, bcast->index);
if (b != NULL) {
assert(b == bcast);
bstate->count--;
broadcast_state_check(bstate, "broadcast_del");
}
bcast->index = 0;
}
static void add_broadcast(struct broadcast_state *bstate,
struct broadcastable *bcast)
{
assert(bcast);
assert(bcast->index);
if (!uintmap_add(&bstate->broadcasts, bcast->index, bcast))
abort();
bstate->count++;
}
void insert_broadcast_nostore(struct broadcast_state *bstate,
struct broadcastable *bcast)
{
add_broadcast(bstate, bcast);
broadcast_state_check(bstate, "insert_broadcast");
}
void insert_broadcast(struct broadcast_state **bstate,
const u8 *msg,
const u8 *addendum,
struct broadcastable *bcast)
{
u32 offset;
assert(bcast->timestamp);
/* If we're loading from the store, we already have index */
if (!bcast->index) {
u64 idx;
bcast->index = idx = gossip_store_add((*bstate)->gs, msg,
bcast->timestamp,
addendum);
if (!idx)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Could not add to gossip store: %s",
strerror(errno));
/* We assume we can fit in 32 bits for now! */
assert(idx == bcast->index);
}
insert_broadcast_nostore(*bstate, bcast);
/* If it compacts, it replaces *bstate */
if (gossip_store_maybe_compact((*bstate)->gs, bstate, &offset))
update_peers_broadcast_index((*bstate)->peers, offset);
}
struct broadcastable *next_broadcast_raw(struct broadcast_state *bstate,
u32 *last_index)
{
struct broadcastable *b;
u64 idx = *last_index;
b = uintmap_after(&bstate->broadcasts, &idx);
if (!b)
return NULL;
/* Assert no overflow */
*last_index = idx;
assert(*last_index == idx);
return b;
}
struct broadcastable *next_broadcast(struct broadcast_state *bstate,
u32 timestamp_min, u32 timestamp_max,
u32 *last_index)
{
struct broadcastable *b;
while ((b = next_broadcast_raw(bstate, last_index)) != NULL) {
if (b->timestamp >= timestamp_min
&& b->timestamp <= timestamp_max) {
return b;
}
}
return NULL;
}
u64 broadcast_final_index(const struct broadcast_state *bstate)
{
u64 idx;
if (!uintmap_last(&bstate->broadcasts, &idx))
return 0;
return idx;
}
#ifdef PEDANTIC
static const struct pubkey *
pubkey_keyof(const struct pubkey *pk)
{
return pk;
}
static size_t pubkey_hash(const struct pubkey *id)
{
return siphash24(siphash_seed(), id, sizeof(*id));
}
HTABLE_DEFINE_TYPE(struct pubkey,
pubkey_keyof,
pubkey_hash,
pubkey_eq,
pubkey_set);
static void *corrupt(const char *abortstr, const char *problem,
const struct short_channel_id *scid,
const struct pubkey *node_id)
{
status_broken("Gossip corrupt %s %s: %s",
problem, abortstr ? abortstr : "",
scid ? type_to_string(tmpctx,
struct short_channel_id,
scid)
: type_to_string(tmpctx, struct pubkey, node_id));
if (abortstr)
abort();
return NULL;
}
struct broadcast_state *broadcast_state_check(struct broadcast_state *b,
const char *abortstr)
{
secp256k1_ecdsa_signature sig;
const u8 *msg;
u8 *features, *addresses, color[3], alias[32];
struct bitcoin_blkid chain_hash;
struct short_channel_id scid;
struct pubkey node_id_1, node_id_2, bitcoin_key;
u32 timestamp, fees;
u16 flags, expiry;
u32 index = 0;
u64 htlc_minimum_msat;
struct pubkey_set pubkeys;
struct broadcastable *bcast;
/* We actually only need a set, not a map. */
UINTMAP(u64 *) channels;
pubkey_set_init(&pubkeys);
uintmap_init(&channels);
while ((bcast = next_broadcast_raw(b, &index)) != NULL) {
msg = gossip_store_get(b, b->gs, b->index);
if (fromwire_channel_announcement(tmpctx, msg, &sig, &sig, &sig,
&sig, &features, &chain_hash,
&scid, &node_id_1, &node_id_2,
&bitcoin_key, &bitcoin_key)) {
if (!uintmap_add(&channels, scid.u64, &index))
return corrupt(abortstr, "announced twice",
&scid, NULL);
pubkey_set_add(&pubkeys, &node_id_1);
pubkey_set_add(&pubkeys, &node_id_2);
} else if (fromwire_channel_update(msg, &sig, &chain_hash,
&scid, &timestamp, &flags,
&expiry, &htlc_minimum_msat,
&fees, &fees)) {
if (!uintmap_get(&channels, scid.u64))
return corrupt(abortstr,
"updated before announce",
&scid, NULL);
} else if (fromwire_node_announcement(tmpctx, msg,
&sig, &features,
&timestamp,
&node_id_1, color, alias,
&addresses))
if (!pubkey_set_get(&pubkeys, &node_id_1))
return corrupt(abortstr,
"node announced before channel",
NULL, &node_id_1);
}
pubkey_set_clear(&pubkeys);
uintmap_clear(&channels);
return b;
}
#else
struct broadcast_state *broadcast_state_check(struct broadcast_state *b,
const char *abortstr UNUSED)
{
return b;
}
#endif

61
gossipd/broadcast.h

@ -2,15 +2,10 @@
#define LIGHTNING_GOSSIPD_BROADCAST_H #define LIGHTNING_GOSSIPD_BROADCAST_H
#include "config.h" #include "config.h"
#include <ccan/intmap/intmap.h>
#include <ccan/list/list.h> #include <ccan/list/list.h>
#include <ccan/short_types/short_types.h> #include <ccan/short_types/short_types.h>
#include <ccan/tal/tal.h> #include <ccan/tal/tal.h>
/* Common functionality to implement staggered broadcasts with replacement. */
struct routing_state;
/* This is nested inside a node, chan or half_chan; rewriting the store can /* This is nested inside a node, chan or half_chan; rewriting the store can
* cause it to change! */ * cause it to change! */
struct broadcastable { struct broadcastable {
@ -20,64 +15,8 @@ struct broadcastable {
u32 timestamp; u32 timestamp;
}; };
struct broadcast_state {
UINTMAP(struct broadcastable *) broadcasts;
size_t count;
struct gossip_store *gs;
struct list_head *peers;
};
static inline void broadcastable_init(struct broadcastable *bcast) static inline void broadcastable_init(struct broadcastable *bcast)
{ {
bcast->index = 0; bcast->index = 0;
} }
struct broadcast_state *new_broadcast_state(struct routing_state *rstate,
struct gossip_store *gs,
struct list_head *peers);
/**
* Append a queued message for broadcast.
* @bstate: the broadcast state, will be replaced if we compact the store.
* @msg: the message to append.
* @addendum: an extra message (for GOSSIP_STORE_CHANNEL_AMOUNT after announce)
* @bcast: broadcast location.
*
* If @bcast.index is 0, it is written into the store and set.
* @bcast.timestamp must be set, and non-zero.
*/
void insert_broadcast(struct broadcast_state **bstate,
const u8 *msg,
const u8 *addendum,
struct broadcastable *bcast);
/* Add to broadcast, but not store: for gossip store compaction. */
void insert_broadcast_nostore(struct broadcast_state *bstate,
struct broadcastable *bcast);
/* Delete a broadcast: not usually needed, since destructor does it */
void broadcast_del(struct broadcast_state *bstate,
struct broadcastable *bcast);
/* Return the next broadcastable entry; doesn't load it. */
struct broadcastable *next_broadcast_raw(struct broadcast_state *bstate,
u32 *last_index);
/* Return the broadcast with index >= *last_index, timestamp >= min and <= max
* and update *last_index.
* There's no broadcast with index 0. */
struct broadcastable *next_broadcast(struct broadcast_state *bstate,
u32 timestamp_min, u32 timestamp_max,
u32 *last_index);
/* index of last entry. */
u64 broadcast_final_index(const struct broadcast_state *bstate);
/* Returns b if all OK, otherwise aborts if abortstr non-NULL, otherwise returns
* NULL. */
struct broadcast_state *broadcast_state_check(struct broadcast_state *b,
const char *abortstr);
/* Callback for after we compacted the store */
void update_peers_broadcast_index(struct list_head *peers, u32 offset);
#endif /* LIGHTNING_GOSSIPD_BROADCAST_H */ #endif /* LIGHTNING_GOSSIPD_BROADCAST_H */

61
gossipd/gossip_store.c

@ -41,6 +41,9 @@ struct gossip_store {
* should it be needed */ * should it be needed */
struct routing_state *rstate; struct routing_state *rstate;
/* This is daemon->peers for handling to update_peers_broadcast_index */
struct list_head *peers;
/* Disable compaction if we encounter an error during a prior /* Disable compaction if we encounter an error during a prior
* compaction */ * compaction */
bool disable_compaction; bool disable_compaction;
@ -73,7 +76,8 @@ static bool append_msg(int fd, const u8 *msg, u32 timestamp, u64 *len)
return writev(fd, iov, ARRAY_SIZE(iov)) == sizeof(hdr) + msglen; return writev(fd, iov, ARRAY_SIZE(iov)) == sizeof(hdr) + msglen;
} }
struct gossip_store *gossip_store_new(struct routing_state *rstate) struct gossip_store *gossip_store_new(struct routing_state *rstate,
struct list_head *peers)
{ {
struct gossip_store *gs = tal(rstate, struct gossip_store); struct gossip_store *gs = tal(rstate, struct gossip_store);
gs->count = gs->deleted = 0; gs->count = gs->deleted = 0;
@ -82,6 +86,7 @@ struct gossip_store *gossip_store_new(struct routing_state *rstate)
gs->rstate = rstate; gs->rstate = rstate;
gs->disable_compaction = false; gs->disable_compaction = false;
gs->len = sizeof(gs->version); gs->len = sizeof(gs->version);
gs->peers = peers;
tal_add_destructor(gs, gossip_store_destroy); tal_add_destructor(gs, gossip_store_destroy);
@ -184,8 +189,6 @@ HTABLE_DEFINE_TYPE(struct offset_map,
offset_map_key, hash_offset, offset_map_eq, offmap); offset_map_key, hash_offset, offset_map_eq, offmap);
static void move_broadcast(struct offmap *offmap, static void move_broadcast(struct offmap *offmap,
struct broadcast_state *oldb,
struct broadcast_state *newb,
struct broadcastable *bcast, struct broadcastable *bcast,
const char *what) const char *what)
{ {
@ -199,9 +202,7 @@ static void move_broadcast(struct offmap *offmap,
status_failed(STATUS_FAIL_INTERNAL_ERROR, status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Could not relocate %s at offset %u", "Could not relocate %s at offset %u",
what, bcast->index); what, bcast->index);
broadcast_del(oldb, bcast);
bcast->index = omap->to; bcast->index = omap->to;
insert_broadcast_nostore(newb, bcast);
offmap_del(offmap, omap); offmap_del(offmap, omap);
} }
@ -215,19 +216,12 @@ static void destroy_offmap(struct offmap *offmap)
* *
* Creates a new file, writes all the updates from the `broadcast_state`, and * Creates a new file, writes all the updates from the `broadcast_state`, and
* then atomically swaps the files. * then atomically swaps the files.
*
* Returns the amount of shrinkage in @offset on success, otherwise @offset
* is unchanged.
*/ */
bool gossip_store_compact(struct gossip_store *gs, bool gossip_store_compact(struct gossip_store *gs)
struct broadcast_state **bs,
u32 *offset)
{ {
size_t count = 0, deleted = 0; size_t count = 0, deleted = 0;
int fd; int fd;
u64 off, len = sizeof(gs->version), idx; u64 off, len = sizeof(gs->version), idx;
struct broadcast_state *oldb = *bs;
struct broadcast_state *newb;
struct offmap *offmap; struct offmap *offmap;
struct gossip_hdr hdr; struct gossip_hdr hdr;
struct offmap_iter oit; struct offmap_iter oit;
@ -237,7 +231,6 @@ bool gossip_store_compact(struct gossip_store *gs,
if (gs->disable_compaction) if (gs->disable_compaction)
return false; return false;
assert(oldb);
status_trace( status_trace(
"Compacting gossip_store with %zu entries, %zu of which are stale", "Compacting gossip_store with %zu entries, %zu of which are stale",
gs->count, gs->deleted); gs->count, gs->deleted);
@ -294,26 +287,21 @@ bool gossip_store_compact(struct gossip_store *gs,
off += wlen; off += wlen;
} }
/* OK, now we've written file successfully, we can remap broadcast. */ /* OK, now we've written file successfully, we can move broadcasts. */
newb = new_broadcast_state(gs->rstate, gs, oldb->peers);
/* Remap node announcements. */ /* Remap node announcements. */
for (struct node *n = node_map_first(gs->rstate->nodes, &nit); for (struct node *n = node_map_first(gs->rstate->nodes, &nit);
n; n;
n = node_map_next(gs->rstate->nodes, &nit)) { n = node_map_next(gs->rstate->nodes, &nit)) {
move_broadcast(offmap, oldb, newb, &n->bcast, "node_announce"); move_broadcast(offmap, &n->bcast, "node_announce");
} }
/* Remap channel announcements and updates */ /* Remap channel announcements and updates */
for (struct chan *c = uintmap_first(&gs->rstate->chanmap, &idx); for (struct chan *c = uintmap_first(&gs->rstate->chanmap, &idx);
c; c;
c = uintmap_after(&gs->rstate->chanmap, &idx)) { c = uintmap_after(&gs->rstate->chanmap, &idx)) {
move_broadcast(offmap, oldb, newb, &c->bcast, move_broadcast(offmap, &c->bcast, "channel_announce");
"channel_announce"); move_broadcast(offmap, &c->half[0].bcast, "channel_update");
move_broadcast(offmap, oldb, newb, &c->half[0].bcast, move_broadcast(offmap, &c->half[1].bcast, "channel_update");
"channel_update");
move_broadcast(offmap, oldb, newb, &c->half[1].bcast,
"channel_update");
} }
/* That should be everything. */ /* That should be everything. */
@ -347,13 +335,12 @@ bool gossip_store_compact(struct gossip_store *gs,
deleted, count, len); deleted, count, len);
gs->count = count; gs->count = count;
gs->deleted = 0; gs->deleted = 0;
*offset = gs->len - len; off = gs->len - len;
gs->len = len; gs->len = len;
close(gs->fd); close(gs->fd);
gs->fd = fd; gs->fd = fd;
tal_free(oldb); update_peers_broadcast_index(gs->peers, off);
*bs = newb;
return true; return true;
unlink_disable: unlink_disable:
@ -362,25 +349,20 @@ disable:
status_trace("Encountered an error while compacting, disabling " status_trace("Encountered an error while compacting, disabling "
"future compactions."); "future compactions.");
gs->disable_compaction = true; gs->disable_compaction = true;
tal_free(newb);
return false; return false;
} }
bool gossip_store_maybe_compact(struct gossip_store *gs, static void gossip_store_maybe_compact(struct gossip_store *gs)
struct broadcast_state **bs,
u32 *offset)
{ {
*offset = 0;
/* Don't compact while loading! */ /* Don't compact while loading! */
if (!gs->writable) if (!gs->writable)
return false; return;
if (gs->count < 1000) if (gs->count < 1000)
return false; return;
if (gs->deleted < gs->count / 4) if (gs->deleted < gs->count / 4)
return false; return;
return gossip_store_compact(gs, bs, offset); gossip_store_compact(gs);
} }
u64 gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg, u64 gossip_store_add(struct gossip_store *gs, const u8 *gossip_msg,
@ -458,6 +440,11 @@ void gossip_store_delete(struct gossip_store *gs,
bcast->index, strerror(errno)); bcast->index, strerror(errno));
fcntl(gs->fd, F_SETFL, flags); fcntl(gs->fd, F_SETFL, flags);
gs->deleted++; gs->deleted++;
/* Reset index. */
bcast->index = 0;
gossip_store_maybe_compact(gs);
} }
const u8 *gossip_store_get(const tal_t *ctx, const u8 *gossip_store_get(const tal_t *ctx,

26
gossipd/gossip_store.h

@ -12,11 +12,11 @@
* gossip_store -- On-disk storage related information * gossip_store -- On-disk storage related information
*/ */
struct broadcast_state;
struct gossip_store; struct gossip_store;
struct routing_state; struct routing_state;
struct gossip_store *gossip_store_new(struct routing_state *rstate); struct gossip_store *gossip_store_new(struct routing_state *rstate,
struct list_head *peers);
/** /**
* Load the initial gossip store, if any. * Load the initial gossip store, if any.
@ -65,23 +65,8 @@ const u8 *gossip_store_get_private_update(const tal_t *ctx,
struct gossip_store *gs, struct gossip_store *gs,
u64 offset); u64 offset);
/** /* Exposed for dev-compact-gossip-store to force compaction. */
* If we need to compact the gossip store, do so. bool gossip_store_compact(struct gossip_store *gs);
* @gs: the gossip store.
* @bs: a pointer to the broadcast state: replaced if we compact it.
* @offset: the change in the store, if any.
*
* If return value is true, caller must update peers.
*/
bool gossip_store_maybe_compact(struct gossip_store *gs,
struct broadcast_state **bs,
u32 *offset);
/* Expose for dev-compact-gossip-store to force compaction. */
bool gossip_store_compact(struct gossip_store *gs,
struct broadcast_state **bs,
u32 *offset);
/** /**
* Get a readonly fd for the gossip_store. * Get a readonly fd for the gossip_store.
@ -91,4 +76,7 @@ bool gossip_store_compact(struct gossip_store *gs,
*/ */
int gossip_store_readonly_fd(struct gossip_store *gs); int gossip_store_readonly_fd(struct gossip_store *gs);
/* Callback inside gossipd when store is compacted */
void update_peers_broadcast_index(struct list_head *peers, u32 offset);
#endif /* LIGHTNING_GOSSIPD_GOSSIP_STORE_H */ #endif /* LIGHTNING_GOSSIPD_GOSSIP_STORE_H */

22
gossipd/gossipd.c

@ -220,7 +220,7 @@ static void queue_peer_msg(struct peer *peer, const u8 *msg TAKES)
static void queue_peer_from_store(struct peer *peer, static void queue_peer_from_store(struct peer *peer,
const struct broadcastable *bcast) const struct broadcastable *bcast)
{ {
struct gossip_store *gs = peer->daemon->rstate->broadcasts->gs; struct gossip_store *gs = peer->daemon->rstate->gs;
queue_peer_msg(peer, take(gossip_store_get(NULL, gs, bcast->index))); queue_peer_msg(peer, take(gossip_store_get(NULL, gs, bcast->index)));
} }
@ -421,8 +421,7 @@ static bool get_node_announcement(const tal_t *ctx,
if (!n->bcast.index) if (!n->bcast.index)
return false; return false;
msg = gossip_store_get(tmpctx, daemon->rstate->broadcasts->gs, msg = gossip_store_get(tmpctx, daemon->rstate->gs, n->bcast.index);
n->bcast.index);
/* Note: validity of node_id is already checked. */ /* Note: validity of node_id is already checked. */
if (!fromwire_node_announcement(ctx, msg, if (!fromwire_node_announcement(ctx, msg,
@ -631,7 +630,7 @@ void update_peers_broadcast_index(struct list_head *peers, u32 offset)
* new store. We also tell them how much this is shrunk, so * new store. We also tell them how much this is shrunk, so
* they can (approximately) tell where to start in the new store. * they can (approximately) tell where to start in the new store.
*/ */
gs_fd = gossip_store_readonly_fd(peer->daemon->rstate->broadcasts->gs); gs_fd = gossip_store_readonly_fd(peer->daemon->rstate->gs);
if (gs_fd < 0) { if (gs_fd < 0) {
status_broken("Can't get read-only gossip store fd:" status_broken("Can't get read-only gossip store fd:"
" killing peer"); " killing peer");
@ -1357,7 +1356,7 @@ static bool handle_get_update(struct peer *peer, const u8 *msg)
if (!is_halfchan_defined(&chan->half[direction])) if (!is_halfchan_defined(&chan->half[direction]))
update = NULL; update = NULL;
else else
update = gossip_store_get(tmpctx, rstate->broadcasts->gs, update = gossip_store_get(tmpctx, rstate->gs,
chan->half[direction].bcast.index); chan->half[direction].bcast.index);
out: out:
status_trace("peer %s schanid %s: %s update", status_trace("peer %s schanid %s: %s update",
@ -1591,7 +1590,7 @@ static struct io_plan *connectd_new_peer(struct io_conn *conn,
return io_close(conn); return io_close(conn);
} }
gossip_store_fd = gossip_store_readonly_fd(daemon->rstate->broadcasts->gs);; gossip_store_fd = gossip_store_readonly_fd(daemon->rstate->gs);;
if (gossip_store_fd < 0) { if (gossip_store_fd < 0) {
status_broken("Failed to get readonly store fd: %s", status_broken("Failed to get readonly store fd: %s",
strerror(errno)); strerror(errno));
@ -1860,7 +1859,7 @@ static struct io_plan *gossip_init(struct io_conn *conn,
dev_gossip_time); dev_gossip_time);
/* Load stored gossip messages */ /* Load stored gossip messages */
gossip_store_load(daemon->rstate, daemon->rstate->broadcasts->gs); gossip_store_load(daemon->rstate, daemon->rstate->gs);
/* Now disable all local channels, they can't be connected yet. */ /* Now disable all local channels, they can't be connected yet. */
gossip_disable_local_channels(daemon); gossip_disable_local_channels(daemon);
@ -2484,14 +2483,7 @@ static struct io_plan *dev_compact_store(struct io_conn *conn,
struct daemon *daemon, struct daemon *daemon,
const u8 *msg) const u8 *msg)
{ {
u32 offset; bool done = gossip_store_compact(daemon->rstate->gs);
bool done = gossip_store_compact(daemon->rstate->broadcasts->gs,
&daemon->rstate->broadcasts,
&offset);
/* Peers keep an offset into where they are with gossip. */
if (done)
update_peers_broadcast_index(&daemon->peers, offset);
daemon_conn_send(daemon->master, daemon_conn_send(daemon->master,
take(towire_gossip_dev_compact_store_reply(NULL, take(towire_gossip_dev_compact_store_reply(NULL,

71
gossipd/routing.c

@ -174,8 +174,7 @@ struct routing_state *new_routing_state(const tal_t *ctx,
{ {
struct routing_state *rstate = tal(ctx, struct routing_state); struct routing_state *rstate = tal(ctx, struct routing_state);
rstate->nodes = new_node_map(rstate); rstate->nodes = new_node_map(rstate);
rstate->broadcasts rstate->gs = gossip_store_new(rstate, peers);
= new_broadcast_state(rstate, gossip_store_new(rstate), peers);
rstate->chainparams = chainparams; rstate->chainparams = chainparams;
rstate->local_id = *local_id; rstate->local_id = *local_id;
rstate->prune_timeout = prune_timeout; rstate->prune_timeout = prune_timeout;
@ -227,9 +226,6 @@ static void destroy_node(struct node *node, struct routing_state *rstate)
struct chan *c; struct chan *c;
node_map_del(rstate->nodes, node); node_map_del(rstate->nodes, node);
/* Safe even if never placed in broadcast map */
broadcast_del(rstate->broadcasts, &node->bcast);
/* These remove themselves from chans[]. */ /* These remove themselves from chans[]. */
while ((c = first_chan(node, &i)) != NULL) while ((c = first_chan(node, &i)) != NULL)
free_chan(rstate, c); free_chan(rstate, c);
@ -330,7 +326,7 @@ static void remove_chan_from_node(struct routing_state *rstate,
/* Last channel? Simply delete node (and associated announce) */ /* Last channel? Simply delete node (and associated announce) */
if (num_chans == 0) { if (num_chans == 0) {
gossip_store_delete(rstate->broadcasts->gs, gossip_store_delete(rstate->gs,
&node->bcast, &node->bcast,
WIRE_NODE_ANNOUNCEMENT); WIRE_NODE_ANNOUNCEMENT);
tal_free(node); tal_free(node);
@ -342,26 +338,26 @@ static void remove_chan_from_node(struct routing_state *rstate,
/* Removed only public channel? Remove node announcement. */ /* Removed only public channel? Remove node announcement. */
if (!node_has_broadcastable_channels(node)) { if (!node_has_broadcastable_channels(node)) {
gossip_store_delete(rstate->broadcasts->gs, gossip_store_delete(rstate->gs,
&node->bcast, &node->bcast,
WIRE_NODE_ANNOUNCEMENT); WIRE_NODE_ANNOUNCEMENT);
broadcast_del(rstate->broadcasts, &node->bcast);
} else if (node_announce_predates_channels(node)) { } else if (node_announce_predates_channels(node)) {
const u8 *announce; const u8 *announce;
announce = gossip_store_get(tmpctx, rstate->broadcasts->gs, announce = gossip_store_get(tmpctx, rstate->gs,
node->bcast.index); node->bcast.index);
/* node announcement predates all channel announcements? /* node announcement predates all channel announcements?
* Move to end (we could, in theory, move to just past next * Move to end (we could, in theory, move to just past next
* channel_announce, but we don't care that much about spurious * channel_announce, but we don't care that much about spurious
* retransmissions in this corner case */ * retransmissions in this corner case */
gossip_store_delete(rstate->broadcasts->gs, gossip_store_delete(rstate->gs,
&node->bcast, &node->bcast,
WIRE_NODE_ANNOUNCEMENT); WIRE_NODE_ANNOUNCEMENT);
broadcast_del(rstate->broadcasts, &node->bcast); node->bcast.index = gossip_store_add(rstate->gs,
insert_broadcast(&rstate->broadcasts, announce, NULL, announce,
&node->bcast); node->bcast.timestamp,
NULL);
} }
} }
@ -372,11 +368,6 @@ void free_chan(struct routing_state *rstate, struct chan *chan)
remove_chan_from_node(rstate, chan->nodes[0], chan); remove_chan_from_node(rstate, chan->nodes[0], chan);
remove_chan_from_node(rstate, chan->nodes[1], chan); remove_chan_from_node(rstate, chan->nodes[1], chan);
/* Safe even if never placed in map */
broadcast_del(rstate->broadcasts, &chan->bcast);
broadcast_del(rstate->broadcasts, &chan->half[0].bcast);
broadcast_del(rstate->broadcasts, &chan->half[1].bcast);
uintmap_del(&rstate->chanmap, chan->scid.u64); uintmap_del(&rstate->chanmap, chan->scid.u64);
/* Remove from local_disabled_map if it's there. */ /* Remove from local_disabled_map if it's there. */
@ -1361,9 +1352,13 @@ static void add_channel_announce_to_broadcast(struct routing_state *rstate,
chan->bcast.timestamp = timestamp; chan->bcast.timestamp = timestamp;
/* 0, unless we're loading from store */ /* 0, unless we're loading from store */
if (index)
chan->bcast.index = index; chan->bcast.index = index;
insert_broadcast(&rstate->broadcasts, channel_announce, addendum, else
&chan->bcast); chan->bcast.index = gossip_store_add(rstate->gs,
channel_announce,
chan->bcast.timestamp,
addendum);
rstate->local_channel_announced |= is_local_channel(rstate, chan); rstate->local_channel_announced |= is_local_channel(rstate, chan);
} }
@ -1409,12 +1404,12 @@ bool routing_add_channel_announcement(struct routing_state *rstate,
if (chan->half[0].bcast.index) if (chan->half[0].bcast.index)
private_updates[0] private_updates[0]
= gossip_store_get_private_update(NULL, = gossip_store_get_private_update(NULL,
rstate->broadcasts->gs, rstate->gs,
chan->half[0].bcast.index); chan->half[0].bcast.index);
if (chan->half[1].bcast.index) if (chan->half[1].bcast.index)
private_updates[1] private_updates[1]
= gossip_store_get_private_update(NULL, = gossip_store_get_private_update(NULL,
rstate->broadcasts->gs, rstate->gs,
chan->half[1].bcast.index); chan->half[1].bcast.index);
} }
@ -1843,12 +1838,11 @@ bool routing_add_channel_update(struct routing_state *rstate,
/* Safe even if was never added, but if it's a private channel it /* Safe even if was never added, but if it's a private channel it
* would be a WIRE_GOSSIP_STORE_PRIVATE_UPDATE. */ * would be a WIRE_GOSSIP_STORE_PRIVATE_UPDATE. */
gossip_store_delete(rstate->broadcasts->gs, gossip_store_delete(rstate->gs,
&chan->half[direction].bcast, &chan->half[direction].bcast,
is_chan_public(chan) is_chan_public(chan)
? WIRE_CHANNEL_UPDATE ? WIRE_CHANNEL_UPDATE
: WIRE_GOSSIP_STORE_PRIVATE_UPDATE); : WIRE_GOSSIP_STORE_PRIVATE_UPDATE);
broadcast_del(rstate->broadcasts, &chan->half[direction].bcast);
/* BOLT #7: /* BOLT #7:
* - MUST consider the `timestamp` of the `channel_announcement` to be * - MUST consider the `timestamp` of the `channel_announcement` to be
@ -1869,7 +1863,7 @@ bool routing_add_channel_update(struct routing_state *rstate,
assert(is_local_channel(rstate, chan)); assert(is_local_channel(rstate, chan));
if (!index) { if (!index) {
hc->bcast.index hc->bcast.index
= gossip_store_add_private_update(rstate->broadcasts->gs, = gossip_store_add_private_update(rstate->gs,
update); update);
} else } else
hc->bcast.index = index; hc->bcast.index = index;
@ -1877,11 +1871,13 @@ bool routing_add_channel_update(struct routing_state *rstate,
} }
/* If we're loading from store, this means we don't re-add to store. */ /* If we're loading from store, this means we don't re-add to store. */
if (index)
chan->half[direction].bcast.index = index; chan->half[direction].bcast.index = index;
else
insert_broadcast(&rstate->broadcasts, chan->half[direction].bcast.index
update, NULL, = gossip_store_add(rstate->gs, update,
&chan->half[direction].bcast); chan->half[direction].bcast.timestamp,
NULL);
if (uc) { if (uc) {
/* If we were waiting for these nodes to appear (or gain a /* If we were waiting for these nodes to appear (or gain a
@ -1924,11 +1920,11 @@ void remove_channel_from_store(struct routing_state *rstate,
} }
/* If these aren't in the store, these are noops. */ /* If these aren't in the store, these are noops. */
gossip_store_delete(rstate->broadcasts->gs, gossip_store_delete(rstate->gs,
&chan->bcast, announcment_type); &chan->bcast, announcment_type);
gossip_store_delete(rstate->broadcasts->gs, gossip_store_delete(rstate->gs,
&chan->half[0].bcast, update_type); &chan->half[0].bcast, update_type);
gossip_store_delete(rstate->broadcasts->gs, gossip_store_delete(rstate->gs,
&chan->half[1].bcast, update_type); &chan->half[1].bcast, update_type);
} }
@ -2141,14 +2137,17 @@ bool routing_add_node_announcement(struct routing_state *rstate,
} }
/* Harmless if it was never added */ /* Harmless if it was never added */
gossip_store_delete(rstate->broadcasts->gs, gossip_store_delete(rstate->gs,
&node->bcast, &node->bcast,
WIRE_NODE_ANNOUNCEMENT); WIRE_NODE_ANNOUNCEMENT);
broadcast_del(rstate->broadcasts, &node->bcast);
node->bcast.timestamp = timestamp; node->bcast.timestamp = timestamp;
if (index)
node->bcast.index = index; node->bcast.index = index;
insert_broadcast(&rstate->broadcasts, msg, NULL, &node->bcast); else
node->bcast.index
= gossip_store_add(rstate->gs, msg,
node->bcast.timestamp, NULL);
return true; return true;
} }
@ -2517,7 +2516,7 @@ bool handle_local_add_channel(struct routing_state *rstate,
/* Create new (unannounced) channel */ /* Create new (unannounced) channel */
chan = new_chan(rstate, &scid, &rstate->local_id, &remote_node_id, sat); chan = new_chan(rstate, &scid, &rstate->local_id, &remote_node_id, sat);
if (!index) if (!index)
index = gossip_store_add(rstate->broadcasts->gs, msg, 0, NULL); index = gossip_store_add(rstate->gs, msg, 0, NULL);
chan->bcast.index = index; chan->bcast.index = index;
return true; return true;
} }

8
gossipd/routing.h

@ -4,6 +4,7 @@
#include <bitcoin/pubkey.h> #include <bitcoin/pubkey.h>
#include <ccan/crypto/siphash24/siphash24.h> #include <ccan/crypto/siphash24/siphash24.h>
#include <ccan/htable/htable_type.h> #include <ccan/htable/htable_type.h>
#include <ccan/intmap/intmap.h>
#include <ccan/time/time.h> #include <ccan/time/time.h>
#include <common/amount.h> #include <common/amount.h>
#include <common/node_id.h> #include <common/node_id.h>
@ -13,6 +14,8 @@
#include <wire/gen_onion_wire.h> #include <wire/gen_onion_wire.h>
#include <wire/wire.h> #include <wire/wire.h>
struct routing_state;
struct half_chan { struct half_chan {
/* millisatoshi. */ /* millisatoshi. */
u32 base_fee; u32 base_fee;
@ -218,8 +221,8 @@ struct routing_state {
/* channel_announcement which are pending short_channel_id lookup */ /* channel_announcement which are pending short_channel_id lookup */
struct pending_cannouncement_map pending_cannouncements; struct pending_cannouncement_map pending_cannouncements;
/* Broadcast map, and access to gossip store */ /* Gossip store */
struct broadcast_state *broadcasts; struct gossip_store *gs;
/* Our own ID so we can identify local channels */ /* Our own ID so we can identify local channels */
struct node_id local_id; struct node_id local_id;
@ -429,4 +432,5 @@ struct wireaddr *read_addresses(const tal_t *ctx, const u8 *ser);
/* Remove channel from store: announcement and any updates. */ /* Remove channel from store: announcement and any updates. */
void remove_channel_from_store(struct routing_state *rstate, void remove_channel_from_store(struct routing_state *rstate,
struct chan *chan); struct chan *chan);
#endif /* LIGHTNING_GOSSIPD_ROUTING_H */ #endif /* LIGHTNING_GOSSIPD_ROUTING_H */

1
gossipd/test/run-bench-find_route.c

@ -14,7 +14,6 @@
#include "../routing.c" #include "../routing.c"
#include "../gossip_store.c" #include "../gossip_store.c"
#include "../broadcast.c"
void status_fmt(enum log_level level UNUSED, const char *fmt, ...) void status_fmt(enum log_level level UNUSED, const char *fmt, ...)
{ {

1
gossipd/test/run-find_route-specific.c

@ -13,7 +13,6 @@
#include "../routing.c" #include "../routing.c"
#include "../gossip_store.c" #include "../gossip_store.c"
#include "../broadcast.c"
/* AUTOGENERATED MOCKS START */ /* AUTOGENERATED MOCKS START */
/* Generated stub for fromwire_channel_announcement */ /* Generated stub for fromwire_channel_announcement */

1
gossipd/test/run-find_route.c

@ -1,6 +1,5 @@
#include "../routing.c" #include "../routing.c"
#include "../gossip_store.c" #include "../gossip_store.c"
#include "../broadcast.c"
#include <stdio.h> #include <stdio.h>
void status_fmt(enum log_level level UNUSED, const char *fmt, ...) void status_fmt(enum log_level level UNUSED, const char *fmt, ...)

1
gossipd/test/run-overlong.c

@ -1,6 +1,5 @@
#include "../routing.c" #include "../routing.c"
#include "../gossip_store.c" #include "../gossip_store.c"
#include "../broadcast.c"
#include <stdio.h> #include <stdio.h>
void status_fmt(enum log_level level UNUSED, const char *fmt, ...) void status_fmt(enum log_level level UNUSED, const char *fmt, ...)

Loading…
Cancel
Save