Browse Source

gossipd: handle gossip_timestamp_filter message.

And initialize filter (to "never") when we negotiated LOCAL_GOSSIP_QUERIES,
and send initial filter message.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ppa-0.6.1
Rusty Russell 7 years ago
parent
commit
531c82b6ad
  1. 12
      gossipd/broadcast.c
  2. 7
      gossipd/broadcast.h
  3. 159
      gossipd/gossip.c

12
gossipd/broadcast.c

@ -51,12 +51,16 @@ void insert_broadcast(struct broadcast_state *bstate,
bstate->next_index++);
}
const u8 *next_broadcast(struct broadcast_state *bstate, u64 *last_index)
const u8 *next_broadcast(struct broadcast_state *bstate,
u32 timestamp_min, u32 timestamp_max,
u64 *last_index)
{
struct queued_message *m;
m = uintmap_after(&bstate->broadcasts, last_index);
if (m)
return m->payload;
while ((m = uintmap_after(&bstate->broadcasts, last_index)) != NULL) {
if (m->timestamp >= timestamp_min
&& m->timestamp <= timestamp_max)
return m->payload;
}
return NULL;
}

7
gossipd/broadcast.h

@ -20,7 +20,10 @@ struct broadcast_state *new_broadcast_state(tal_t *ctx);
void insert_broadcast(struct broadcast_state *bstate, const u8 *msg,
u32 timestamp);
/* Return the broadcast with index >= *last_index, and update *last_index.
/* Return the broadcast with index >= *last_index, timestamp >= min and <= max
* and update *last_index.
* There's no broadcast with index 0. */
const u8 *next_broadcast(struct broadcast_state *bstate, u64 *last_index);
const u8 *next_broadcast(struct broadcast_state *bstate,
u32 timestamp_min, u32 timestamp_max,
u64 *last_index);
#endif /* LIGHTNING_GOSSIPD_BROADCAST_H */

159
gossipd/gossip.c

@ -217,6 +217,9 @@ struct peer {
/* High water mark for the staggered broadcast */
u64 broadcast_index;
/* Timestamp range to filter gossip by */
u32 gossip_timestamp_min, gossip_timestamp_max;
/* Are there outstanding queries on short_channel_ids? */
const struct short_channel_id *scid_queries;
size_t scid_query_idx;
@ -352,6 +355,8 @@ static struct peer *new_peer(const tal_t *ctx,
peer->scid_query_nodes = NULL;
peer->scid_query_nodes_idx = 0;
peer->num_scid_queries_outstanding = 0;
peer->gossip_timestamp_min = 0;
peer->gossip_timestamp_max = UINT32_MAX;
return peer;
}
@ -493,6 +498,25 @@ static struct io_plan *retry_peer_connected(struct io_conn *conn,
return peer_connected(conn, peer);
}
static void setup_gossip_range(struct peer *peer)
{
bool gossip_queries;
u8 *msg;
gossip_queries = feature_offered(peer->lfeatures, LOCAL_GOSSIP_QUERIES)
&& feature_offered(peer->daemon->localfeatures,
LOCAL_GOSSIP_QUERIES);
if (!gossip_queries)
return;
/* Tell it to start gossip! (And give us everything!) */
msg = towire_gossip_timestamp_filter(peer,
&peer->daemon->rstate->chain_hash,
0, UINT32_MAX);
queue_peer_msg(peer, take(msg));
}
static struct io_plan *peer_connected(struct io_conn *conn, struct peer *peer)
{
struct peer *old_peer;
@ -523,16 +547,34 @@ static struct io_plan *peer_connected(struct io_conn *conn, struct peer *peer)
/* BOLT #7:
*
* Upon receiving an `init` message with the `initial_routing_sync`
* flag set the node sends `channel_announcement`s, `channel_update`s
* and `node_announcement`s for all known channels and nodes as if
* they were just received.
* - if the `gossip_queries` feature is negotiated:
* - MUST NOT relay any gossip messages unless explicitly requested.
*/
if (feature_offered(peer->lfeatures, LOCAL_INITIAL_ROUTING_SYNC))
peer->broadcast_index = 0;
else
peer->broadcast_index
= peer->daemon->rstate->broadcasts->next_index;
if (feature_offered(peer->lfeatures, LOCAL_GOSSIP_QUERIES)
&& feature_offered(peer->daemon->localfeatures, LOCAL_GOSSIP_QUERIES)) {
peer->broadcast_index = UINT64_MAX;
/* Nothing in this range */
peer->gossip_timestamp_min = UINT32_MAX;
peer->gossip_timestamp_max = 0;
} else {
/* BOLT #7:
*
* - upon receiving an `init` message with the
* `initial_routing_sync` flag set to 1:
* - SHOULD send `channel_announcement`s, `channel_update`s
* and `node_announcement`s for all known channels and
* nodes, as if they were just received.
* - if the `initial_routing_sync` flag is set to 0, OR if the
* initial sync was completed:
* - SHOULD resume normal operation, as specified in the
* following [Rebroadcasting](#rebroadcasting) section.
*/
if (feature_offered(peer->lfeatures, LOCAL_INITIAL_ROUTING_SYNC))
peer->broadcast_index = 0;
else
peer->broadcast_index
= peer->daemon->rstate->broadcasts->next_index;
}
/* This is a full peer now; we keep it around until master says
* it's dead. */
@ -548,6 +590,7 @@ static struct io_plan *peer_connected(struct io_conn *conn, struct peer *peer)
/* Start the gossip flowing. */
wake_pkt_out(peer);
setup_gossip_range(peer);
return io_close_taken_fd(conn);
}
@ -821,6 +864,40 @@ static void handle_query_short_channel_ids(struct peer *peer, u8 *msg)
msg_wake(&peer->remote->out);
}
static void handle_gossip_timestamp_filter(struct peer *peer, u8 *msg)
{
struct bitcoin_blkid chain_hash;
u32 first_timestamp, timestamp_range;
if (!fromwire_gossip_timestamp_filter(msg, &chain_hash,
&first_timestamp,
&timestamp_range)) {
peer_error(peer, "Bad gossip_timestamp_filter %s",
tal_hex(tmpctx, msg));
return;
}
if (!structeq(&peer->daemon->rstate->chain_hash, &chain_hash)) {
status_trace("%s sent gossip_timestamp_filter chainhash %s",
type_to_string(tmpctx, struct pubkey, &peer->id),
type_to_string(tmpctx, struct bitcoin_blkid,
&chain_hash));
return;
}
/* First time, start gossip sync immediately. */
if (peer->gossip_timestamp_min > peer->gossip_timestamp_max)
wake_pkt_out(peer);
/* FIXME: We don't index by timestamp, so this forces a brute
* search! */
peer->gossip_timestamp_min = first_timestamp;
peer->gossip_timestamp_max = first_timestamp + timestamp_range - 1;
if (peer->gossip_timestamp_max < peer->gossip_timestamp_min)
peer->gossip_timestamp_max = UINT32_MAX;
peer->broadcast_index = 0;
}
static void handle_ping(struct peer *peer, u8 *ping)
{
u8 *pong;
@ -963,6 +1040,10 @@ static struct io_plan *peer_msgin(struct io_conn *conn,
handle_reply_short_channel_ids_end(peer, msg);
return peer_next_in(conn, peer);
case WIRE_GOSSIP_TIMESTAMP_FILTER:
handle_gossip_timestamp_filter(peer, msg);
return peer_next_in(conn, peer);
case WIRE_OPEN_CHANNEL:
case WIRE_CHANNEL_REESTABLISH:
case WIRE_ACCEPT_CHANNEL:
@ -989,7 +1070,6 @@ static struct io_plan *peer_msgin(struct io_conn *conn,
case WIRE_QUERY_CHANNEL_RANGE:
case WIRE_REPLY_CHANNEL_RANGE:
case WIRE_GOSSIP_TIMESTAMP_FILTER:
/* FIXME: Implement */
return peer_next_in(conn, peer);
}
@ -1161,6 +1241,29 @@ static bool create_next_scid_reply(struct peer *peer)
return sent;
}
/* If we're supposed to be sending gossip, do so now. */
static bool maybe_queue_gossip(struct peer *peer)
{
const u8 *next;
if (!peer->gossip_sync)
return false;
next = next_broadcast(peer->daemon->rstate->broadcasts,
peer->gossip_timestamp_min,
peer->gossip_timestamp_max,
&peer->broadcast_index);
if (next) {
queue_peer_msg(peer, next);
return true;
}
/* Gossip is drained. Wait for next timer. */
peer->gossip_sync = false;
return false;
}
static struct io_plan *peer_pkt_out(struct io_conn *conn, struct peer *peer)
{
/* First priority is queued packets, if any */
@ -1183,20 +1286,8 @@ again:
return ready_for_master(conn, peer);
} else if (create_next_scid_reply(peer)) {
goto again;
} else if (peer->gossip_sync) {
/* If we're supposed to be sending gossip, do so now. */
const u8 *next;
next = next_broadcast(peer->daemon->rstate->broadcasts,
&peer->broadcast_index);
if (next)
return peer_write_message(conn, &peer->local->pcs,
next,
peer_pkt_out);
/* Gossip is drained. Wait for next timer. */
peer->gossip_sync = false;
} else if (maybe_queue_gossip(peer)) {
goto again;
}
return msg_queue_wait(conn, &peer->local->peer_out, peer_pkt_out, peer);
@ -1397,6 +1488,8 @@ static struct io_plan *owner_msg_in(struct io_conn *conn,
handle_query_short_channel_ids(peer, dc->msg_in);
} else if (type == WIRE_REPLY_SHORT_CHANNEL_IDS_END) {
handle_reply_short_channel_ids_end(peer, dc->msg_in);
} else if (type == WIRE_GOSSIP_TIMESTAMP_FILTER) {
handle_gossip_timestamp_filter(peer, dc->msg_in);
} else if (type == WIRE_GOSSIP_GET_UPDATE) {
handle_get_update(peer, dc->msg_in);
} else if (type == WIRE_GOSSIP_LOCAL_ADD_CHANNEL) {
@ -1465,7 +1558,6 @@ static bool send_peer_with_fds(struct peer *peer, const u8 *msg)
*/
static bool nonlocal_dump_gossip(struct io_conn *conn, struct daemon_conn *dc)
{
const u8 *next;
struct peer *peer = dc->ctx;
/* Make sure we are not connected directly */
@ -1475,21 +1567,8 @@ static bool nonlocal_dump_gossip(struct io_conn *conn, struct daemon_conn *dc)
if (create_next_scid_reply(peer))
return true;
/* Nothing to do if we're not gossiping */
if (!peer->gossip_sync)
return false;
next = next_broadcast(peer->daemon->rstate->broadcasts,
&peer->broadcast_index);
if (!next) {
peer->gossip_sync = false;
return false;
} else {
u8 *msg = towire_gossip_send_gossip(NULL, next);
daemon_conn_send(peer->remote, take(msg));
return true;
}
/* Otherwise queue any gossip we want to send */
return maybe_queue_gossip(peer);
}
static struct io_plan *new_peer_got_fd(struct io_conn *conn, struct peer *peer)

Loading…
Cancel
Save