Browse Source

common/gossip_store: handle timestamp filtering.

This means we intercept the peer's gossip_timestamp_filter request
in the per-peer subdaemon itself.  The rest of the semantics are fairly
simple however.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
htlc_accepted_hook
Rusty Russell 6 years ago
parent
commit
728bb4e662
  1. 1
      closingd/Makefile
  2. 51
      common/gossip_store.c
  3. 6
      common/gossip_store.h
  4. 4
      common/per_peer_state.c
  5. 2
      common/per_peer_state.h
  6. 28
      common/read_peer_msg.c
  7. 7
      common/read_peer_msg.h
  8. 44
      gossipd/gossipd.c
  9. 4
      openingd/openingd.c
  10. 2
      tests/test_gossip.py
  11. 2
      wire/peer_wire.c

1
closingd/Makefile

@ -53,6 +53,7 @@ CLOSINGD_COMMON_OBJS := \
common/daemon_conn.o \
common/dev_disconnect.o \
common/derive_basepoints.o \
common/features.o \
common/gen_peer_status_wire.o \
common/gen_status_wire.o \
common/gossip_store.o \

51
common/gossip_store.c

@ -1,5 +1,6 @@
#include <assert.h>
#include <ccan/crc/crc.h>
#include <common/features.h>
#include <common/gossip_store.h>
#include <common/per_peer_state.h>
#include <common/status.h>
@ -9,6 +10,51 @@
#include <unistd.h>
#include <wire/gen_peer_wire.h>
/* Peer sent us gossip_timestamp_filter: remember the requested window
 * and prepare to re-stream matching gossip from the start of the store.
 *
 * @pps: per-peer state (gossip_state created here on first filter).
 * @first_timestamp: lowest timestamp the peer wants.
 * @timestamp_range: width of the window they want.
 */
void gossip_setup_timestamp_filter(struct per_peer_state *pps,
				   u32 first_timestamp,
				   u32 timestamp_range)
{
	/* First filter from this peer: allocate state and gossip sync now. */
	if (pps->gs == NULL) {
		pps->gs = tal(pps, struct gossip_state);
		pps->gs->next_gossip = time_mono();
	}

	/* BOLT #7:
	 *
	 * The receiver:
	 *   - SHOULD send all gossip messages whose `timestamp` is greater or
	 *     equal to `first_timestamp`, and less than `first_timestamp` plus
	 *     `timestamp_range`.
	 * 	- MAY wait for the next outgoing gossip flush to send these.
	 *   - SHOULD restrict future gossip messages to those whose `timestamp`
	 *     is greater or equal to `first_timestamp`, and less than
	 *     `first_timestamp` plus `timestamp_range`.
	 */
	pps->gs->timestamp_min = first_timestamp;
	pps->gs->timestamp_max = first_timestamp + timestamp_range - 1;

	/* u32 addition can wrap; clamp so max never sits below min. */
	if (pps->gs->timestamp_max < pps->gs->timestamp_min)
		pps->gs->timestamp_max = UINT32_MAX;

	/* Rewind to just past the header byte so we rescan the whole store.
	 * NOTE(review): offset 1 assumes a 1-byte store header — confirm
	 * against gossip_store's on-disk format. */
	lseek(pps->gossip_store_fd, 1, SEEK_SET);
}
/* Does this gossip message's timestamp fall inside the peer's filter?
 *
 * BOLT #7:
 *
 *   - SHOULD send all gossip messages whose `timestamp` is greater or
 *    equal to `first_timestamp`, and less than `first_timestamp` plus
 *    `timestamp_range`.
 */
static bool timestamp_filter(const struct per_peer_state *pps, u32 timestamp)
{
	/* The spec's half-open window was stored as the inclusive pair
	 * [timestamp_min, timestamp_max], so two bounds checks suffice. */
	if (timestamp < pps->gs->timestamp_min)
		return false;
	return timestamp <= pps->gs->timestamp_max;
}
u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps)
{
u8 *msg = NULL;
@ -19,7 +65,7 @@ u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps)
while (!msg) {
struct gossip_hdr hdr;
u32 msglen, checksum;
u32 msglen, checksum, timestamp;
int type;
if (read(pps->gossip_store_fd, &hdr, sizeof(hdr)) != sizeof(hdr)) {
@ -38,6 +84,7 @@ u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps)
msglen = be32_to_cpu(hdr.len);
checksum = be32_to_cpu(hdr.crc);
timestamp = be32_to_cpu(hdr.timestamp);
msg = tal_arr(ctx, u8, msglen);
if (read(pps->gossip_store_fd, msg, msglen) != msglen)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
@ -61,6 +108,8 @@ u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps)
&& type != WIRE_CHANNEL_UPDATE
&& type != WIRE_NODE_ANNOUNCEMENT)
msg = tal_free(msg);
else if (!timestamp_filter(pps, timestamp))
msg = tal_free(msg);
}
return msg;

6
common/gossip_store.h

@ -40,4 +40,10 @@ u8 *gossip_store_next(const tal_t *ctx, struct per_peer_state *pps);
void gossip_store_switch_fd(struct per_peer_state *pps,
int newfd, u64 offset_shorter);
/**
* Sets up the timestamp filter once they told us to set it.
*/
void gossip_setup_timestamp_filter(struct per_peer_state *pps,
u32 first_timestamp,
u32 timestamp_range);
#endif /* LIGHTNING_COMMON_GOSSIP_STORE_H */

4
common/per_peer_state.c

@ -48,6 +48,8 @@ void towire_gossip_state(u8 **pptr, const struct gossip_state *gs)
{
towire_u64(pptr, gs->next_gossip.ts.tv_sec);
towire_u64(pptr, gs->next_gossip.ts.tv_nsec);
towire_u32(pptr, gs->timestamp_min);
towire_u32(pptr, gs->timestamp_max);
}
void fromwire_gossip_state(const u8 **cursor, size_t *max,
@ -55,6 +57,8 @@ void fromwire_gossip_state(const u8 **cursor, size_t *max,
{
gs->next_gossip.ts.tv_sec = fromwire_u64(cursor, max);
gs->next_gossip.ts.tv_nsec = fromwire_u64(cursor, max);
gs->timestamp_min = fromwire_u32(cursor, max);
gs->timestamp_max = fromwire_u32(cursor, max);
}
void towire_per_peer_state(u8 **pptr, const struct per_peer_state *pps)

2
common/per_peer_state.h

@ -9,6 +9,8 @@
struct gossip_state {
/* Time for next gossip burst. */
struct timemono next_gossip;
/* Timestamp filtering for gossip. */
u32 timestamp_min, timestamp_max;
};
/* Things we hand between daemons to talk to peers. */

28
common/read_peer_msg.c

@ -117,16 +117,12 @@ void handle_gossip_msg(struct per_peer_state *pps, const u8 *msg TAKES)
gossip = tal_dup_arr(tmpctx, u8, msg, tal_bytelen(msg), 0);
/* Gossipd can send us gossip messages, OR errors */
if (is_msg_for_gossipd(gossip)) {
sync_crypto_write(pps, gossip);
} else if (fromwire_peektype(gossip) == WIRE_ERROR) {
if (fromwire_peektype(gossip) == WIRE_ERROR) {
status_debug("Gossipd told us to send error");
sync_crypto_write(pps, gossip);
peer_failed_connection_lost();
} else {
status_broken("Gossipd gave us bad send_gossip message %s",
tal_hex(tmpctx, gossip));
peer_failed_connection_lost();
sync_crypto_write(pps, gossip);
}
out:
@ -134,6 +130,22 @@ out:
tal_free(msg);
}
/* Intercept a gossip_timestamp_filter message from the peer.
 *
 * Returns true if @msg was a gossip_timestamp_filter (in which case the
 * filter has been installed via gossip_setup_timestamp_filter), false if
 * it was some other message the caller must handle.
 *
 * takes iff returns true
 * NOTE(review): despite the "takes" comment, nothing here frees @msg on
 * the true path — verify TAKES ownership is discharged by the caller or
 * add taken()/tal_free handling. */
bool handle_timestamp_filter(struct per_peer_state *pps, const u8 *msg TAKES)
{
	struct bitcoin_blkid chain_hash; /* FIXME: don't ignore! */
	u32 first_timestamp, timestamp_range;

	/* Not a timestamp filter message at all: tell caller to deal with it. */
	if (!fromwire_gossip_timestamp_filter(msg, &chain_hash,
					      &first_timestamp,
					      &timestamp_range)) {
		return false;
	}

	/* Record the window and restart gossip streaming from the store. */
	gossip_setup_timestamp_filter(pps, first_timestamp, timestamp_range);
	return true;
}
bool handle_peer_gossip_or_error(struct per_peer_state *pps,
const struct channel_id *channel_id,
const u8 *msg TAKES)
@ -142,7 +154,9 @@ bool handle_peer_gossip_or_error(struct per_peer_state *pps,
bool all_channels;
struct channel_id actual;
if (is_msg_for_gossipd(msg)) {
if (handle_timestamp_filter(pps, msg))
return true;
else if (is_msg_for_gossipd(msg)) {
wire_sync_write(pps->gossip_fd, msg);
/* wire_sync_write takes, so don't take again. */
return true;

7
common/read_peer_msg.h

@ -68,6 +68,13 @@ bool handle_peer_gossip_or_error(struct per_peer_state *pps,
const struct channel_id *channel_id,
const u8 *msg TAKES);
/**
* handle_timestamp_filter - deal with timestamp filter requests.
* @pps: per-peer state.
* @msg: the peer message (only taken if returns true).
*/
bool handle_timestamp_filter(struct per_peer_state *pps, const u8 *msg TAKES);
/* We got this message from gossipd: forward/quit as it asks. */
void handle_gossip_msg(struct per_peer_state *pps, const u8 *msg TAKES);
#endif /* LIGHTNING_COMMON_READ_PEER_MSG_H */

44
gossipd/gossipd.c

@ -618,17 +618,6 @@ static const u8 *handle_query_short_channel_ids(struct peer *peer, const u8 *msg
return NULL;
}
/*~ The peer can specify a timestamp range; gossip outside this range won't be
* sent any more, and we'll start streaming gossip in this range. This is
* only supposed to be used if we negotiate the `gossip_queries` in which case
* the first send triggers the first gossip to be sent.
*/
static u8 *handle_gossip_timestamp_filter(struct peer *peer, const u8 *msg)
{
/* FIXME: Move handling this msg to peer! */
return NULL;
}
/*~ When we compact the gossip store, all the broadcast indexes move.
* We simply offset everyone, which means in theory they could retransmit
* some, but that's a lesser evil than skipping some. */
@ -1510,9 +1499,6 @@ static struct io_plan *peer_msg_in(struct io_conn *conn,
case WIRE_REPLY_SHORT_CHANNEL_IDS_END:
err = handle_reply_short_channel_ids_end(peer, msg);
goto handled_relay;
case WIRE_GOSSIP_TIMESTAMP_FILTER:
err = handle_gossip_timestamp_filter(peer, msg);
goto handled_relay;
case WIRE_PING:
err = handle_ping(peer, msg);
goto handled_relay;
@ -1539,6 +1525,7 @@ static struct io_plan *peer_msg_in(struct io_conn *conn,
case WIRE_UPDATE_FEE:
case WIRE_CHANNEL_REESTABLISH:
case WIRE_ANNOUNCEMENT_SIGNATURES:
case WIRE_GOSSIP_TIMESTAMP_FILTER:
status_broken("peer %s: relayed unexpected msg of type %s",
type_to_string(tmpctx, struct node_id, &peer->id),
wire_type_name(fromwire_peektype(msg)));
@ -1654,9 +1641,36 @@ static struct io_plan *connectd_new_peer(struct io_conn *conn,
/* This sends the initial timestamp filter. */
setup_gossip_range(peer);
/* Start gossiping immediately */
/* BOLT #7:
*
* A node:
* - if the `gossip_queries` feature is negotiated:
* - MUST NOT relay any gossip messages unless explicitly requested.
*/
if (peer->gossip_queries_feature) {
gs = NULL;
} else {
/* BOLT #7:
*
* - upon receiving an `init` message with the
* `initial_routing_sync` flag set to 1:
* - SHOULD send gossip messages for all known channels and
* nodes, as if they were just received.
* - if the `initial_routing_sync` flag is set to 0, OR if the
* initial sync was completed:
* - SHOULD resume normal operation, as specified in the
* following [Rebroadcasting](#rebroadcasting) section.
*/
gs = tal(tmpctx, struct gossip_state);
gs->timestamp_min = 0;
gs->timestamp_max = UINT32_MAX;
/* If they don't want initial sync, start at end of store */
if (!peer->initial_routing_sync_feature)
lseek(gossip_store_fd, 0, SEEK_END);
gs->next_gossip = time_mono();
}
/* Reply with success, and the new fd and gossip_state. */
daemon_conn_send(daemon->connectd,

4
openingd/openingd.c

@ -376,6 +376,10 @@ static u8 *opening_negotiate_msg(const tal_t *ctx, struct state *state,
continue;
}
/* Might be a timestamp filter request: handle. */
if (handle_timestamp_filter(state->pps, msg))
continue;
/* A helper which decodes an error. */
if (is_peer_error(tmpctx, msg, &state->channel_id,
&err, &all_channels)) {

2
tests/test_gossip.py

@ -136,8 +136,6 @@ def test_announce_address(node_factory, bitcoind):
l1.daemon.wait_for_log(r"\[OUT\] 0101.*004d010102030404d202000000000000000000000000000000002607039216a8b803f3acd758aa260704e00533f3e8f2aedaa8969b3d0fa03a96e857bbb28064dca5e147e934244b9ba50230032607'")
# FIXME: Implement timestamp filtering
@pytest.mark.xfail(strict=True)
@unittest.skipIf(not DEVELOPER, "needs DEVELOPER=1")
def test_gossip_timestamp_filter(node_factory, bitcoind):
# Need full IO logging so we can see gossip (from gossipd and channeld)

2
wire/peer_wire.c

@ -46,7 +46,6 @@ bool is_msg_for_gossipd(const u8 *cursor)
case WIRE_REPLY_SHORT_CHANNEL_IDS_END:
case WIRE_QUERY_CHANNEL_RANGE:
case WIRE_REPLY_CHANNEL_RANGE:
case WIRE_GOSSIP_TIMESTAMP_FILTER:
case WIRE_PING:
case WIRE_PONG:
return true;
@ -68,6 +67,7 @@ bool is_msg_for_gossipd(const u8 *cursor)
case WIRE_UPDATE_FEE:
case WIRE_CHANNEL_REESTABLISH:
case WIRE_ANNOUNCEMENT_SIGNATURES:
case WIRE_GOSSIP_TIMESTAMP_FILTER:
break;
}
return false;

Loading…
Cancel
Save