diff --git a/gossipd/gossip.c b/gossipd/gossip.c index 653b12c39..5aa661d09 100644 --- a/gossipd/gossip.c +++ b/gossipd/gossip.c @@ -234,6 +234,11 @@ struct peer { /* How many query responses are we expecting? */ size_t num_scid_queries_outstanding; + /* Map of outstanding channel_range requests. */ + u8 *query_channel_blocks; + u32 first_channel_range; + struct short_channel_id *query_channel_scids; + /* Only one of these is set: */ struct local_peer_state *local; struct daemon_conn *remote; @@ -355,6 +360,7 @@ static struct peer *new_peer(const tal_t *ctx, peer->scid_query_nodes = NULL; peer->scid_query_nodes_idx = 0; peer->num_scid_queries_outstanding = 0; + peer->query_channel_blocks = NULL; peer->gossip_timestamp_min = 0; peer->gossip_timestamp_max = UINT32_MAX; @@ -1081,6 +1087,97 @@ static void handle_reply_short_channel_ids_end(struct peer *peer, u8 *msg) daemon_conn_send(&peer->daemon->master, take(msg)); } +static void handle_reply_channel_range(struct peer *peer, u8 *msg) +{ + struct bitcoin_blkid chain; + u8 complete; + u32 first_blocknum, number_of_blocks; + u8 *encoded, *p; + struct short_channel_id *scids; + size_t n; + + if (!fromwire_reply_channel_range(tmpctx, msg, &chain, &first_blocknum, + &number_of_blocks, &complete, + &encoded)) { + peer_error(peer, "Bad reply_channel_range %s", + tal_hex(tmpctx, msg)); + return; + } + + if (!structeq(&peer->daemon->rstate->chain_hash, &chain)) { + peer_error(peer, "reply_channel_range for bad chain: %s", + tal_hex(tmpctx, msg)); + return; + } + + if (!peer->query_channel_blocks) { + peer_error(peer, "reply_channel_range without query: %s", + tal_hex(tmpctx, msg)); + return; + } + + if (first_blocknum + number_of_blocks < first_blocknum) { + peer_error(peer, "reply_channel_range invalid %u+%u", + first_blocknum, number_of_blocks); + return; + } + + scids = decode_short_ids(tmpctx, encoded); + if (!scids) { + peer_error(peer, "Bad reply_channel_range encoding %s", + tal_hex(tmpctx, encoded)); + return; + } + + n = first_blocknum - peer->first_channel_range; + if (first_blocknum < peer->first_channel_range + || n + number_of_blocks > tal_count(peer->query_channel_blocks)) { + peer_error(peer, "reply_channel_range invalid %u+%u for query %u+%u", + first_blocknum, number_of_blocks, + peer->first_channel_range, + tal_count(peer->query_channel_blocks)); + return; + } + + p = memchr(peer->query_channel_blocks + n, 1, number_of_blocks); + if (p) { + peer_error(peer, "reply_channel_range %u+%u already have block %zu", + first_blocknum, number_of_blocks, + peer->first_channel_range + (p - peer->query_channel_blocks)); + return; + } + + /* Mark these blocks received */ + memset(peer->query_channel_blocks + n, 1, number_of_blocks); + + /* Add scids */ + n = tal_count(peer->query_channel_scids); + tal_resize(&peer->query_channel_scids, n + tal_count(scids)); + memcpy(peer->query_channel_scids + n, scids, tal_len(scids)); + + status_debug("peer %s reply_channel_range %u+%u (of %u+%zu) %zu scids", + type_to_string(tmpctx, struct pubkey, &peer->id), + first_blocknum, number_of_blocks, + peer->first_channel_range, + tal_count(peer->query_channel_blocks), + tal_count(scids)); + + /* Still more to go? */ + if (memchr(peer->query_channel_blocks, 0, + tal_count(peer->query_channel_blocks))) + return; + + /* All done, send reply */ + msg = towire_gossip_query_channel_range_reply(NULL, + first_blocknum, + number_of_blocks, + complete, + peer->query_channel_scids); + daemon_conn_send(&peer->daemon->master, take(msg)); + peer->query_channel_scids = tal_free(peer->query_channel_scids); + peer->query_channel_blocks = tal_free(peer->query_channel_blocks); +} + /* If master asks us to release peer, we attach this destructor in case it * dies while we're waiting for it to finish IO */ static void fail_release(struct peer *peer) @@ -1176,6 +1273,10 @@ static struct io_plan *peer_msgin(struct io_conn *conn, handle_query_channel_range(peer, msg); return peer_next_in(conn, peer); + case WIRE_REPLY_CHANNEL_RANGE: + handle_reply_channel_range(peer, msg); + return peer_next_in(conn, peer); + case WIRE_OPEN_CHANNEL: case WIRE_CHANNEL_REESTABLISH: case WIRE_ACCEPT_CHANNEL: @@ -1199,10 +1300,6 @@ static struct io_plan *peer_msgin(struct io_conn *conn, /* This will wait. */ return peer_next_in(conn, peer); - - case WIRE_REPLY_CHANNEL_RANGE: - /* FIXME: Implement */ - return peer_next_in(conn, peer); } /* BOLT #1: @@ -1612,6 +1709,10 @@ static struct io_plan *owner_msg_in(struct io_conn *conn, handle_local_add_channel(peer->daemon->rstate, dc->msg_in); } else if (type == WIRE_GOSSIP_LOCAL_CHANNEL_UPDATE) { handle_local_channel_update(peer, dc->msg_in); + } else if (type == WIRE_QUERY_CHANNEL_RANGE) { + handle_query_channel_range(peer, dc->msg_in); + } else if (type == WIRE_REPLY_CHANNEL_RANGE) { + handle_reply_channel_range(peer, dc->msg_in); } else { status_broken("peer %s: send us unknown msg of type %s", type_to_string(tmpctx, struct pubkey, &peer->id), @@ -2138,6 +2239,58 @@ static struct io_plan *send_timestamp_filter(struct io_conn *conn, out: return daemon_conn_read_next(conn, &daemon->master); } + +static struct io_plan *query_channel_range(struct io_conn *conn, + struct daemon *daemon, + const u8 *msg) +{ + struct pubkey id; + u32 first_blocknum, number_of_blocks; + struct peer *peer; + + if (!fromwire_gossip_query_channel_range(msg, &id, &first_blocknum, + &number_of_blocks)) + master_badmsg(WIRE_GOSSIP_QUERY_SCIDS, msg); + + peer = find_peer(daemon, &id); + if (!peer) { + status_broken("query_channel_range: unknown peer %s", + type_to_string(tmpctx, struct pubkey, &id)); + goto fail; + } + + if (!feature_offered(peer->lfeatures, LOCAL_GOSSIP_QUERIES)) { + status_broken("query_channel_range: no gossip_query support in peer %s", + type_to_string(tmpctx, struct pubkey, &id)); + goto fail; + } + + if (peer->query_channel_blocks) { + status_broken("query_channel_range: previous query active"); + goto fail; + } + + status_debug("sending query_channel_range for blocks %u+%u", + first_blocknum, number_of_blocks); + msg = towire_query_channel_range(NULL, &daemon->rstate->chain_hash, + first_blocknum, number_of_blocks); + queue_peer_msg(peer, take(msg)); + peer->first_channel_range = first_blocknum; + /* This uses 8 times as much as it needs to, but it's only for dev */ + peer->query_channel_blocks = tal_arrz(peer, u8, number_of_blocks); + peer->query_channel_scids = tal_arr(peer, struct short_channel_id, 0); + +out: + return daemon_conn_read_next(conn, &daemon->master); + +fail: + daemon_conn_send(&daemon->master, + take(towire_gossip_query_channel_range_reply(NULL, + 0, 0, + false, + NULL))); + goto out; +} #endif /* DEVELOPER */ static int make_listen_fd(int domain, void *addr, socklen_t len, bool mayfail) @@ -3361,10 +3514,15 @@ static struct io_plan *recv_req(struct io_conn *conn, struct daemon_conn *master case WIRE_GOSSIP_SEND_TIMESTAMP_FILTER: return send_timestamp_filter(conn, daemon, daemon->master.msg_in); + + case WIRE_GOSSIP_QUERY_CHANNEL_RANGE: + return query_channel_range(conn, daemon, daemon->master.msg_in); + #else case WIRE_GOSSIP_PING: case WIRE_GOSSIP_QUERY_SCIDS: case WIRE_GOSSIP_SEND_TIMESTAMP_FILTER: + case WIRE_GOSSIP_QUERY_CHANNEL_RANGE: break; #endif /* !DEVELOPER */ @@ -3378,6 +3536,7 @@ static struct io_plan *recv_req(struct io_conn *conn, struct daemon_conn *master case WIRE_GOSSIP_GETPEERS_REPLY: case WIRE_GOSSIP_PING_REPLY: case WIRE_GOSSIP_SCIDS_REPLY: + case WIRE_GOSSIP_QUERY_CHANNEL_RANGE_REPLY: case WIRE_GOSSIP_RESOLVE_CHANNEL_REPLY: case WIRE_GOSSIP_PEER_CONNECTED: case WIRE_GOSSIPCTL_CONNECT_TO_PEER_RESULT: diff --git a/gossipd/gossip_wire.csv b/gossipd/gossip_wire.csv index d062f144a..2c4125831 100644 --- a/gossipd/gossip_wire.csv +++ b/gossipd/gossip_wire.csv @@ -172,6 +172,20 @@ gossip_send_timestamp_filter,,id,struct pubkey gossip_send_timestamp_filter,,first_timestamp,u32 gossip_send_timestamp_filter,,timestamp_range,u32 +# Test of query_channel_range. Master->gossipd +gossip_query_channel_range,3029 +gossip_query_channel_range,,id,struct pubkey +gossip_query_channel_range,,first_blocknum,u32 +gossip_query_channel_range,,number_of_blocks,u32 + +# Gossipd -> master +gossip_query_channel_range_reply,3129 +gossip_query_channel_range_reply,,final_first_block,u32 +gossip_query_channel_range_reply,,final_num_blocks,u32 +gossip_query_channel_range_reply,,final_complete,bool +gossip_query_channel_range_reply,,num,u16 +gossip_query_channel_range_reply,,scids,num*struct short_channel_id + # Given a short_channel_id, return the endpoints gossip_resolve_channel_request,3009 gossip_resolve_channel_request,,channel_id,struct short_channel_id diff --git a/lightningd/gossip_control.c b/lightningd/gossip_control.c index c1b8dd886..60c2c6890 100644 --- a/lightningd/gossip_control.c +++ b/lightningd/gossip_control.c @@ -137,6 +137,7 @@ static unsigned gossip_msg(struct subd *gossip, const u8 *msg, const int *fds) case WIRE_GOSSIP_ROUTING_FAILURE: case WIRE_GOSSIP_MARK_CHANNEL_UNROUTABLE: case WIRE_GOSSIP_QUERY_SCIDS: + case WIRE_GOSSIP_QUERY_CHANNEL_RANGE: case WIRE_GOSSIP_SEND_TIMESTAMP_FILTER: case WIRE_GOSSIPCTL_PEER_DISCONNECT: case WIRE_GOSSIPCTL_PEER_IMPORTANT: @@ -150,6 +151,7 @@ static unsigned gossip_msg(struct subd *gossip, const u8 *msg, const int *fds) case WIRE_GOSSIP_GETPEERS_REPLY: case WIRE_GOSSIP_PING_REPLY: case WIRE_GOSSIP_SCIDS_REPLY: + case WIRE_GOSSIP_QUERY_CHANNEL_RANGE_REPLY: case WIRE_GOSSIP_RESOLVE_CHANNEL_REPLY: case WIRE_GOSSIPCTL_RELEASE_PEER_REPLY: case WIRE_GOSSIPCTL_RELEASE_PEER_REPLYFAIL: @@ -677,4 +679,86 @@ static const struct json_command dev_send_timestamp_filter = { "Send {peerid} the timestamp filter {first} {range}" }; AUTODATA(json_command, &dev_send_timestamp_filter); + +static void json_channel_range_reply(struct subd *gossip UNUSED, const u8 *reply, + const int *fds UNUSED, struct command *cmd) +{ + struct json_result *response = new_json_result(cmd); + u32 final_first_block, final_num_blocks; + bool final_complete; + struct short_channel_id *scids; + + if (!fromwire_gossip_query_channel_range_reply(tmpctx, reply, + &final_first_block, + &final_num_blocks, + &final_complete, + &scids)) { + command_fail(cmd, LIGHTNINGD, + "Gossip gave bad gossip_query_channel_range_reply"); + return; + } + + if (final_num_blocks == 0 && final_num_blocks == 0 && !final_complete) { + command_fail(cmd, LIGHTNINGD, + "Gossip refused to query peer"); + return; + } + + json_object_start(response, NULL); + json_add_num(response, "final_first_block", final_first_block); + json_add_num(response, "final_num_blocks", final_num_blocks); + json_add_bool(response, "final_complete", final_complete); + json_array_start(response, "short_channel_ids"); + for (size_t i = 0; i < tal_count(scids); i++) + json_add_short_channel_id(response, NULL, &scids[i]); + json_array_end(response); + json_object_end(response); + command_success(cmd, response); +} + +static void json_dev_query_channel_range(struct command *cmd, + const char *buffer, + const jsmntok_t *params) +{ + u8 *msg; + jsmntok_t *idtok, *firsttok, *numtok; + struct pubkey id; + u32 first, num; + + if (!json_get_params(cmd, buffer, params, + "id", &idtok, + "first", &firsttok, + "num", &numtok, + NULL)) { + return; + } + + if (!json_tok_pubkey(buffer, idtok, &id)) { + command_fail(cmd, JSONRPC2_INVALID_PARAMS, + "'%.*s' is not a valid id", + idtok->end - idtok->start, + buffer + idtok->start); + return; + } + + if (!json_tok_number(buffer, firsttok, &first) + || !json_tok_number(buffer, numtok, &num)) { + command_fail(cmd, JSONRPC2_INVALID_PARAMS, + "first and num must be numbers"); + return; + } + + /* Tell gossipd, since this is a gossip query. */ + msg = towire_gossip_query_channel_range(cmd, &id, first, num); + subd_req(cmd->ld->gossip, cmd->ld->gossip, + take(msg), -1, 0, json_channel_range_reply, cmd); + command_still_pending(cmd); +} + +static const struct json_command dev_query_channel_range_command = { + "dev-query-channel-range", + json_dev_query_channel_range, + "Query {peerid} for short_channel_ids for {first} block + {num} blocks" +}; +AUTODATA(json_command, &dev_query_channel_range_command); #endif /* DEVELOPER */ diff --git a/tests/test_lightningd.py b/tests/test_lightningd.py index 8caf535b4..a948bfb7d 100644 --- a/tests/test_lightningd.py +++ b/tests/test_lightningd.py @@ -2622,6 +2622,98 @@ class LightningDTests(BaseLightningDTests): l1.daemon.wait_for_log('Got pong 1000 bytes \({}\.\.\.\)' .format(l2.info['version'])) + @unittest.skipIf(not DEVELOPER, "needs DEVELOPER=1") + def test_gossip_query_channel_range(self): + l1 = self.node_factory.get_node() + l2 = self.node_factory.get_node() + l3 = self.node_factory.get_node() + + l1.rpc.connect(l2.info['id'], 'localhost', l2.port) + l2.rpc.connect(l3.info['id'], 'localhost', l3.port) + + # Make public channels. + scid12 = self.fund_channel(l1, l2, 10**5) + block12 = int(scid12.split(':')[0]) + scid23 = self.fund_channel(l2, l3, 10**5) + block23 = int(scid23.split(':')[0]) + bitcoind.generate_block(5) + sync_blockheight([l2, l3]) + + # Make sure l2 has received all the gossip. + l2.daemon.wait_for_logs(['Received node_announcement for node ' + l1.info['id'], + 'Received node_announcement for node ' + l3.info['id']]) + + # l1 asks for all channels, gets both. + ret = l1.rpc.dev_query_channel_range(id=l2.info['id'], + first=0, + num=1000000) + + assert ret['final_first_block'] == 0 + assert ret['final_num_blocks'] == 1000000 + assert ret['final_complete'] + assert len(ret['short_channel_ids']) == 2 + assert ret['short_channel_ids'][0] == scid12 + assert ret['short_channel_ids'][1] == scid23 + + # Does not include scid12 + ret = l1.rpc.dev_query_channel_range(id=l2.info['id'], + first=0, + num=block12) + assert ret['final_first_block'] == 0 + assert ret['final_num_blocks'] == block12 + assert ret['final_complete'] + assert len(ret['short_channel_ids']) == 0 + + # Does include scid12 + ret = l1.rpc.dev_query_channel_range(id=l2.info['id'], + first=0, + num=block12 + 1) + assert ret['final_first_block'] == 0 + assert ret['final_num_blocks'] == block12 + 1 + assert ret['final_complete'] + assert len(ret['short_channel_ids']) == 1 + assert ret['short_channel_ids'][0] == scid12 + + # Doesn't include scid23 + ret = l1.rpc.dev_query_channel_range(id=l2.info['id'], + first=0, + num=block23) + assert ret['final_first_block'] == 0 + assert ret['final_num_blocks'] == block23 + assert ret['final_complete'] + assert len(ret['short_channel_ids']) == 1 + assert ret['short_channel_ids'][0] == scid12 + + # Does include scid23 + ret = l1.rpc.dev_query_channel_range(id=l2.info['id'], + first=block12, + num=block23 - block12 + 1) + assert ret['final_first_block'] == block12 + assert ret['final_num_blocks'] == block23 - block12 + 1 + assert ret['final_complete'] + assert len(ret['short_channel_ids']) == 2 + assert ret['short_channel_ids'][0] == scid12 + assert ret['short_channel_ids'][1] == scid23 + + # Only includes scid23 + ret = l1.rpc.dev_query_channel_range(id=l2.info['id'], + first=block23, + num=1) + assert ret['final_first_block'] == block23 + assert ret['final_num_blocks'] == 1 + assert ret['final_complete'] + assert len(ret['short_channel_ids']) == 1 + assert ret['short_channel_ids'][0] == scid23 + + # Past both + ret = l1.rpc.dev_query_channel_range(id=l2.info['id'], + first=block23 + 1, + num=1000000) + assert ret['final_first_block'] == block23 + 1 + assert ret['final_num_blocks'] == 1000000 + assert ret['final_complete'] + assert len(ret['short_channel_ids']) == 0 + @unittest.skipIf(not DEVELOPER, "needs DEVELOPER=1") def test_query_short_channel_id(self): l1 = self.node_factory.get_node(options={'log-level': 'io'})