Browse Source

gossip: dev-query-channel-range to test query_channel_range.

We keep a crappy bitmap, and finish when their replies cover
everything we asked.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ppa-0.6.1
Rusty Russell 7 years ago
parent
commit
118f099dd8
  1. 167
      gossipd/gossip.c
  2. 14
      gossipd/gossip_wire.csv
  3. 84
      lightningd/gossip_control.c
  4. 92
      tests/test_lightningd.py

167
gossipd/gossip.c

@ -234,6 +234,11 @@ struct peer {
/* How many query responses are we expecting? */
size_t num_scid_queries_outstanding;
/* Map of outstanding channel_range requests. */
u8 *query_channel_blocks;
u32 first_channel_range;
struct short_channel_id *query_channel_scids;
/* Only one of these is set: */
struct local_peer_state *local;
struct daemon_conn *remote;
@ -355,6 +360,7 @@ static struct peer *new_peer(const tal_t *ctx,
peer->scid_query_nodes = NULL;
peer->scid_query_nodes_idx = 0;
peer->num_scid_queries_outstanding = 0;
peer->query_channel_blocks = NULL;
peer->gossip_timestamp_min = 0;
peer->gossip_timestamp_max = UINT32_MAX;
@ -1081,6 +1087,97 @@ static void handle_reply_short_channel_ids_end(struct peer *peer, u8 *msg)
daemon_conn_send(&peer->daemon->master, take(msg));
}
static void handle_reply_channel_range(struct peer *peer, u8 *msg)
{
struct bitcoin_blkid chain;
u8 complete;
u32 first_blocknum, number_of_blocks;
u8 *encoded, *p;
struct short_channel_id *scids;
size_t n;
if (!fromwire_reply_channel_range(tmpctx, msg, &chain, &first_blocknum,
&number_of_blocks, &complete,
&encoded)) {
peer_error(peer, "Bad reply_channel_range %s",
tal_hex(tmpctx, msg));
return;
}
if (!structeq(&peer->daemon->rstate->chain_hash, &chain)) {
peer_error(peer, "reply_channel_range for bad chain: %s",
tal_hex(tmpctx, msg));
return;
}
if (!peer->query_channel_blocks) {
peer_error(peer, "reply_channel_range without query: %s",
tal_hex(tmpctx, msg));
return;
}
if (first_blocknum + number_of_blocks < first_blocknum) {
peer_error(peer, "reply_channel_range invalid %u+%u",
first_blocknum, number_of_blocks);
return;
}
scids = decode_short_ids(tmpctx, encoded);
if (!scids) {
peer_error(peer, "Bad reply_channel_range encoding %s",
tal_hex(tmpctx, encoded));
return;
}
n = first_blocknum - peer->first_channel_range;
if (first_blocknum < peer->first_channel_range
|| n + number_of_blocks > tal_count(peer->query_channel_blocks)) {
peer_error(peer, "reply_channel_range invalid %u+%u for query %u+%u",
first_blocknum, number_of_blocks,
peer->first_channel_range,
tal_count(peer->query_channel_blocks));
return;
}
p = memchr(peer->query_channel_blocks + n, 1, number_of_blocks);
if (p) {
peer_error(peer, "reply_channel_range %u+%u already have block %zu",
first_blocknum, number_of_blocks,
peer->first_channel_range + (p - peer->query_channel_blocks));
return;
}
/* Mark these blocks received */
memset(peer->query_channel_blocks + n, 1, number_of_blocks);
/* Add scids */
n = tal_count(peer->query_channel_scids);
tal_resize(&peer->query_channel_scids, n + tal_count(scids));
memcpy(peer->query_channel_scids + n, scids, tal_len(scids));
status_debug("peer %s reply_channel_range %u+%u (of %u+%zu) %zu scids",
type_to_string(tmpctx, struct pubkey, &peer->id),
first_blocknum, number_of_blocks,
peer->first_channel_range,
tal_count(peer->query_channel_blocks),
tal_count(scids));
/* Still more to go? */
if (memchr(peer->query_channel_blocks, 0,
tal_count(peer->query_channel_blocks)))
return;
/* All done, send reply */
msg = towire_gossip_query_channel_range_reply(NULL,
first_blocknum,
number_of_blocks,
complete,
peer->query_channel_scids);
daemon_conn_send(&peer->daemon->master, take(msg));
peer->query_channel_scids = tal_free(peer->query_channel_scids);
peer->query_channel_blocks = tal_free(peer->query_channel_blocks);
}
/* If master asks us to release peer, we attach this destructor in case it
* dies while we're waiting for it to finish IO */
static void fail_release(struct peer *peer)
@ -1176,6 +1273,10 @@ static struct io_plan *peer_msgin(struct io_conn *conn,
handle_query_channel_range(peer, msg);
return peer_next_in(conn, peer);
case WIRE_REPLY_CHANNEL_RANGE:
handle_reply_channel_range(peer, msg);
return peer_next_in(conn, peer);
case WIRE_OPEN_CHANNEL:
case WIRE_CHANNEL_REESTABLISH:
case WIRE_ACCEPT_CHANNEL:
@ -1199,10 +1300,6 @@ static struct io_plan *peer_msgin(struct io_conn *conn,
/* This will wait. */
return peer_next_in(conn, peer);
case WIRE_REPLY_CHANNEL_RANGE:
/* FIXME: Implement */
return peer_next_in(conn, peer);
}
/* BOLT #1:
@ -1612,6 +1709,10 @@ static struct io_plan *owner_msg_in(struct io_conn *conn,
handle_local_add_channel(peer->daemon->rstate, dc->msg_in);
} else if (type == WIRE_GOSSIP_LOCAL_CHANNEL_UPDATE) {
handle_local_channel_update(peer, dc->msg_in);
} else if (type == WIRE_QUERY_CHANNEL_RANGE) {
handle_query_channel_range(peer, dc->msg_in);
} else if (type == WIRE_REPLY_CHANNEL_RANGE) {
handle_reply_channel_range(peer, dc->msg_in);
} else {
status_broken("peer %s: send us unknown msg of type %s",
type_to_string(tmpctx, struct pubkey, &peer->id),
@ -2138,6 +2239,58 @@ static struct io_plan *send_timestamp_filter(struct io_conn *conn,
out:
return daemon_conn_read_next(conn, &daemon->master);
}
static struct io_plan *query_channel_range(struct io_conn *conn,
struct daemon *daemon,
const u8 *msg)
{
struct pubkey id;
u32 first_blocknum, number_of_blocks;
struct peer *peer;
if (!fromwire_gossip_query_channel_range(msg, &id, &first_blocknum,
&number_of_blocks))
master_badmsg(WIRE_GOSSIP_QUERY_SCIDS, msg);
peer = find_peer(daemon, &id);
if (!peer) {
status_broken("query_channel_range: unknown peer %s",
type_to_string(tmpctx, struct pubkey, &id));
goto fail;
}
if (!feature_offered(peer->lfeatures, LOCAL_GOSSIP_QUERIES)) {
status_broken("query_channel_range: no gossip_query support in peer %s",
type_to_string(tmpctx, struct pubkey, &id));
goto fail;
}
if (peer->query_channel_blocks) {
status_broken("query_channel_range: previous query active");
goto fail;
}
status_debug("sending query_channel_range for blocks %u+%u",
first_blocknum, number_of_blocks);
msg = towire_query_channel_range(NULL, &daemon->rstate->chain_hash,
first_blocknum, number_of_blocks);
queue_peer_msg(peer, take(msg));
peer->first_channel_range = first_blocknum;
/* This uses 8 times as much as it needs to, but it's only for dev */
peer->query_channel_blocks = tal_arrz(peer, u8, number_of_blocks);
peer->query_channel_scids = tal_arr(peer, struct short_channel_id, 0);
out:
return daemon_conn_read_next(conn, &daemon->master);
fail:
daemon_conn_send(&daemon->master,
take(towire_gossip_query_channel_range_reply(NULL,
0, 0,
false,
NULL)));
goto out;
}
#endif /* DEVELOPER */
static int make_listen_fd(int domain, void *addr, socklen_t len, bool mayfail)
@ -3361,10 +3514,15 @@ static struct io_plan *recv_req(struct io_conn *conn, struct daemon_conn *master
case WIRE_GOSSIP_SEND_TIMESTAMP_FILTER:
return send_timestamp_filter(conn, daemon, daemon->master.msg_in);
case WIRE_GOSSIP_QUERY_CHANNEL_RANGE:
return query_channel_range(conn, daemon, daemon->master.msg_in);
#else
case WIRE_GOSSIP_PING:
case WIRE_GOSSIP_QUERY_SCIDS:
case WIRE_GOSSIP_SEND_TIMESTAMP_FILTER:
case WIRE_GOSSIP_QUERY_CHANNEL_RANGE:
break;
#endif /* !DEVELOPER */
@ -3378,6 +3536,7 @@ static struct io_plan *recv_req(struct io_conn *conn, struct daemon_conn *master
case WIRE_GOSSIP_GETPEERS_REPLY:
case WIRE_GOSSIP_PING_REPLY:
case WIRE_GOSSIP_SCIDS_REPLY:
case WIRE_GOSSIP_QUERY_CHANNEL_RANGE_REPLY:
case WIRE_GOSSIP_RESOLVE_CHANNEL_REPLY:
case WIRE_GOSSIP_PEER_CONNECTED:
case WIRE_GOSSIPCTL_CONNECT_TO_PEER_RESULT:

14
gossipd/gossip_wire.csv

@ -172,6 +172,20 @@ gossip_send_timestamp_filter,,id,struct pubkey
gossip_send_timestamp_filter,,first_timestamp,u32
gossip_send_timestamp_filter,,timestamp_range,u32
# Test of query_channel_range. Master->gossipd
gossip_query_channel_range,3029
gossip_query_channel_range,,id,struct pubkey
gossip_query_channel_range,,first_blocknum,u32
gossip_query_channel_range,,number_of_blocks,u32
# Gossipd -> master
gossip_query_channel_range_reply,3129
gossip_query_channel_range_reply,,final_first_block,u32
gossip_query_channel_range_reply,,final_num_blocks,u32
gossip_query_channel_range_reply,,final_complete,bool
gossip_query_channel_range_reply,,num,u16
gossip_query_channel_range_reply,,scids,num*struct short_channel_id
# Given a short_channel_id, return the endpoints
gossip_resolve_channel_request,3009
gossip_resolve_channel_request,,channel_id,struct short_channel_id

Can't render this file because it has a wrong number of fields in line 6.

84
lightningd/gossip_control.c

@ -137,6 +137,7 @@ static unsigned gossip_msg(struct subd *gossip, const u8 *msg, const int *fds)
case WIRE_GOSSIP_ROUTING_FAILURE:
case WIRE_GOSSIP_MARK_CHANNEL_UNROUTABLE:
case WIRE_GOSSIP_QUERY_SCIDS:
case WIRE_GOSSIP_QUERY_CHANNEL_RANGE:
case WIRE_GOSSIP_SEND_TIMESTAMP_FILTER:
case WIRE_GOSSIPCTL_PEER_DISCONNECT:
case WIRE_GOSSIPCTL_PEER_IMPORTANT:
@ -150,6 +151,7 @@ static unsigned gossip_msg(struct subd *gossip, const u8 *msg, const int *fds)
case WIRE_GOSSIP_GETPEERS_REPLY:
case WIRE_GOSSIP_PING_REPLY:
case WIRE_GOSSIP_SCIDS_REPLY:
case WIRE_GOSSIP_QUERY_CHANNEL_RANGE_REPLY:
case WIRE_GOSSIP_RESOLVE_CHANNEL_REPLY:
case WIRE_GOSSIPCTL_RELEASE_PEER_REPLY:
case WIRE_GOSSIPCTL_RELEASE_PEER_REPLYFAIL:
@ -677,4 +679,86 @@ static const struct json_command dev_send_timestamp_filter = {
"Send {peerid} the timestamp filter {first} {range}"
};
AUTODATA(json_command, &dev_send_timestamp_filter);
static void json_channel_range_reply(struct subd *gossip UNUSED, const u8 *reply,
const int *fds UNUSED, struct command *cmd)
{
struct json_result *response = new_json_result(cmd);
u32 final_first_block, final_num_blocks;
bool final_complete;
struct short_channel_id *scids;
if (!fromwire_gossip_query_channel_range_reply(tmpctx, reply,
&final_first_block,
&final_num_blocks,
&final_complete,
&scids)) {
command_fail(cmd, LIGHTNINGD,
"Gossip gave bad gossip_query_channel_range_reply");
return;
}
if (final_num_blocks == 0 && final_num_blocks == 0 && !final_complete) {
command_fail(cmd, LIGHTNINGD,
"Gossip refused to query peer");
return;
}
json_object_start(response, NULL);
json_add_num(response, "final_first_block", final_first_block);
json_add_num(response, "final_num_blocks", final_num_blocks);
json_add_bool(response, "final_complete", final_complete);
json_array_start(response, "short_channel_ids");
for (size_t i = 0; i < tal_count(scids); i++)
json_add_short_channel_id(response, NULL, &scids[i]);
json_array_end(response);
json_object_end(response);
command_success(cmd, response);
}
static void json_dev_query_channel_range(struct command *cmd,
const char *buffer,
const jsmntok_t *params)
{
u8 *msg;
jsmntok_t *idtok, *firsttok, *numtok;
struct pubkey id;
u32 first, num;
if (!json_get_params(cmd, buffer, params,
"id", &idtok,
"first", &firsttok,
"num", &numtok,
NULL)) {
return;
}
if (!json_tok_pubkey(buffer, idtok, &id)) {
command_fail(cmd, JSONRPC2_INVALID_PARAMS,
"'%.*s' is not a valid id",
idtok->end - idtok->start,
buffer + idtok->start);
return;
}
if (!json_tok_number(buffer, firsttok, &first)
|| !json_tok_number(buffer, numtok, &num)) {
command_fail(cmd, JSONRPC2_INVALID_PARAMS,
"first and num must be numbers");
return;
}
/* Tell gossipd, since this is a gossip query. */
msg = towire_gossip_query_channel_range(cmd, &id, first, num);
subd_req(cmd->ld->gossip, cmd->ld->gossip,
take(msg), -1, 0, json_channel_range_reply, cmd);
command_still_pending(cmd);
}
static const struct json_command dev_query_channel_range_command = {
"dev-query-channel-range",
json_dev_query_channel_range,
"Query {peerid} for short_channel_ids for {first} block + {num} blocks"
};
AUTODATA(json_command, &dev_query_channel_range_command);
#endif /* DEVELOPER */

92
tests/test_lightningd.py

@ -2622,6 +2622,98 @@ class LightningDTests(BaseLightningDTests):
l1.daemon.wait_for_log('Got pong 1000 bytes \({}\.\.\.\)'
.format(l2.info['version']))
@unittest.skipIf(not DEVELOPER, "needs DEVELOPER=1")
def test_gossip_query_channel_range(self):
l1 = self.node_factory.get_node()
l2 = self.node_factory.get_node()
l3 = self.node_factory.get_node()
l1.rpc.connect(l2.info['id'], 'localhost', l2.port)
l2.rpc.connect(l3.info['id'], 'localhost', l3.port)
# Make public channels.
scid12 = self.fund_channel(l1, l2, 10**5)
block12 = int(scid12.split(':')[0])
scid23 = self.fund_channel(l2, l3, 10**5)
block23 = int(scid23.split(':')[0])
bitcoind.generate_block(5)
sync_blockheight([l2, l3])
# Make sure l2 has received all the gossip.
l2.daemon.wait_for_logs(['Received node_announcement for node ' + l1.info['id'],
'Received node_announcement for node ' + l3.info['id']])
# l1 asks for all channels, gets both.
ret = l1.rpc.dev_query_channel_range(id=l2.info['id'],
first=0,
num=1000000)
assert ret['final_first_block'] == 0
assert ret['final_num_blocks'] == 1000000
assert ret['final_complete']
assert len(ret['short_channel_ids']) == 2
assert ret['short_channel_ids'][0] == scid12
assert ret['short_channel_ids'][1] == scid23
# Does not include scid12
ret = l1.rpc.dev_query_channel_range(id=l2.info['id'],
first=0,
num=block12)
assert ret['final_first_block'] == 0
assert ret['final_num_blocks'] == block12
assert ret['final_complete']
assert len(ret['short_channel_ids']) == 0
# Does include scid12
ret = l1.rpc.dev_query_channel_range(id=l2.info['id'],
first=0,
num=block12 + 1)
assert ret['final_first_block'] == 0
assert ret['final_num_blocks'] == block12 + 1
assert ret['final_complete']
assert len(ret['short_channel_ids']) == 1
assert ret['short_channel_ids'][0] == scid12
# Doesn't include scid23
ret = l1.rpc.dev_query_channel_range(id=l2.info['id'],
first=0,
num=block23)
assert ret['final_first_block'] == 0
assert ret['final_num_blocks'] == block23
assert ret['final_complete']
assert len(ret['short_channel_ids']) == 1
assert ret['short_channel_ids'][0] == scid12
# Does include scid23
ret = l1.rpc.dev_query_channel_range(id=l2.info['id'],
first=block12,
num=block23 - block12 + 1)
assert ret['final_first_block'] == block12
assert ret['final_num_blocks'] == block23 - block12 + 1
assert ret['final_complete']
assert len(ret['short_channel_ids']) == 2
assert ret['short_channel_ids'][0] == scid12
assert ret['short_channel_ids'][1] == scid23
# Only includes scid23
ret = l1.rpc.dev_query_channel_range(id=l2.info['id'],
first=block23,
num=1)
assert ret['final_first_block'] == block23
assert ret['final_num_blocks'] == 1
assert ret['final_complete']
assert len(ret['short_channel_ids']) == 1
assert ret['short_channel_ids'][0] == scid23
# Past both
ret = l1.rpc.dev_query_channel_range(id=l2.info['id'],
first=block23 + 1,
num=1000000)
assert ret['final_first_block'] == block23 + 1
assert ret['final_num_blocks'] == 1000000
assert ret['final_complete']
assert len(ret['short_channel_ids']) == 0
@unittest.skipIf(not DEVELOPER, "needs DEVELOPER=1")
def test_query_short_channel_id(self):
l1 = self.node_factory.get_node(options={'log-level': 'io'})

Loading…
Cancel
Save