Browse Source

lightningd: format JSON directly into json connection membuf.

My test case is a mainnet gossip store with 22107 channels, and
time to do `lightning-cli listchannels`:

Before: `lightning-cli listchannels` DEVELOPER=0
	real	0m1.303000-1.324000(1.3114+/-0.0091)s

After:
	real	0m0.629000-0.695000(0.64985+/-0.019)s

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ppa-0.6.2rc1
Rusty Russell 6 years ago
parent
commit
c403415caa
  1. 62
      lightningd/json.c
  2. 15
      lightningd/json.h
  3. 185
      lightningd/jsonrpc.c
  4. 8
      lightningd/jsonrpc.h
  5. 1
      lightningd/test/Makefile
  6. 32
      lightningd/test/run-jsonrpc.c
  7. 6
      lightningd/test/run-param.c
  8. 8
      tests/test_closing.py
  9. 6
      tests/test_pay.py

62
lightningd/json.c

@ -491,31 +491,33 @@ struct json_result {
/* True if we haven't yet put an element in current wrapping */
bool empty;
/* tal_count() of this is strlen() + 1 */
char *s;
/* The command we're attached to */
struct command *cmd;
};
static void result_append(struct json_result *res, const char *str)
{
size_t len = tal_count(res->s) - 1;
struct json_connection *jcon = res->cmd->jcon;
tal_resize(&res->s, len + strlen(str) + 1);
strcpy(res->s + len, str);
/* Don't do anything if they're disconnected. */
if (!jcon)
return;
jcon_append(jcon, str);
}
static void PRINTF_FMT(2,3)
result_append_fmt(struct json_result *res, const char *fmt, ...)
{
size_t len = tal_count(res->s) - 1, fmtlen;
struct json_connection *jcon = res->cmd->jcon;
va_list ap;
va_start(ap, fmt);
fmtlen = vsnprintf(NULL, 0, fmt, ap);
va_end(ap);
/* Don't do anything if they're disconnected. */
if (!jcon)
return;
tal_resize(&res->s, len + fmtlen + 1);
va_start(ap, fmt);
vsprintf(res->s + len, fmt, ap);
jcon_append_vfmt(jcon, fmt, ap);
va_end(ap);
}
@ -707,7 +709,7 @@ static struct json_result *new_json_stream(struct command *cmd)
{
struct json_result *r = tal(cmd, struct json_result);
r->s = tal_strdup(r, "");
r->cmd = cmd;
#if DEVELOPER
r->wrapping = tal_arr(r, jsmntype_t, 0);
#endif
@ -721,27 +723,33 @@ static struct json_result *new_json_stream(struct command *cmd)
struct json_result *json_stream_success(struct command *cmd)
{
cmd->failcode = 0;
return new_json_stream(cmd);
struct json_result *r;
r = new_json_stream(cmd);
result_append(r, "\"result\" : ");
return r;
}
struct json_result *json_stream_fail(struct command *cmd,
int code,
const char *errmsg)
struct json_result *json_stream_fail_nodata(struct command *cmd,
int code,
const char *errmsg)
{
struct json_result *r = new_json_stream(cmd);
assert(code);
assert(errmsg);
cmd->failcode = code;
cmd->errmsg = tal_strdup(cmd, errmsg);
return new_json_stream(cmd);
result_append_fmt(r, " \"error\" : "
"{ \"code\" : %d,"
" \"message\" : \"%s\"", code, errmsg);
return r;
}
const char *json_result_string(const struct json_result *result)
struct json_result *json_stream_fail(struct command *cmd,
int code,
const char *errmsg)
{
#if DEVELOPER
assert(tal_count(result->wrapping) == 0);
#endif
assert(result->indent == 0);
assert(tal_count(result->s) == strlen(result->s) + 1);
return result->s;
struct json_result *r = json_stream_fail_nodata(cmd, code, errmsg);
result_append(r, ", \"data\" : ");
return r;
}

15
lightningd/json.h

@ -159,7 +159,7 @@ bool json_tok_tok(struct command *cmd, const char *name,
const jsmntok_t **out);
/* Creating JSON strings */
/* Creating JSON output */
/* '"fieldname" : [ ' or '[ ' if fieldname is NULL */
void json_array_start(struct json_result *ptr, const char *fieldname);
@ -192,6 +192,18 @@ struct json_result *json_stream_fail(struct command *cmd,
int code,
const char *errmsg);
/**
* json_stream_fail_nodata - start streaming a failed json result.
* @cmd: the command we're running.
* @code: the error code from lightningd/jsonrpc_errors.h
* @errmsg: the error string.
*
* This is used by command_fail(), which doesn't add any JSON data.
*/
struct json_result *json_stream_fail_nodata(struct command *cmd,
int code,
const char *errmsg);
/* '"fieldname" : "value"' or '"value"' if fieldname is NULL. Turns
* any non-printable chars into JSON escapes, but leaves existing escapes alone.
*/
@ -227,5 +239,4 @@ void json_add_hex_talarr(struct json_result *result,
const tal_t *data);
void json_add_object(struct json_result *result, ...);
const char *json_result_string(const struct json_result *result);
#endif /* LIGHTNING_LIGHTNINGD_JSON_H */

185
lightningd/jsonrpc.c

@ -55,7 +55,7 @@ static void destroy_jcon(struct json_connection *jcon)
}
/* FIXME: This, or something prettier (io_replan?) belong in ccan/io! */
static UNNEEDED void adjust_io_write(struct io_conn *conn, ptrdiff_t delta)
static void adjust_io_write(struct io_conn *conn, ptrdiff_t delta)
{
conn->plan[IO_OUT].arg.u1.cp += delta;
}
@ -283,74 +283,55 @@ static const struct json_command *find_cmd(const char *buffer,
return NULL;
}
static void json_done(struct json_connection *jcon,
struct command *cmd,
const char *json TAKES)
/* Make sure jcon->outbuf has room for len */
static void json_connection_mkroom(struct json_connection *jcon, size_t len)
{
/* Queue for writing, and wake writer. */
size_t len = strlen(json);
char *p = membuf_add(&jcon->outbuf, len);
memcpy(p, json, len);
if (taken(json))
tal_free(json);
io_wake(jcon);
ptrdiff_t delta = membuf_prepare_space(&jcon->outbuf, len);
/* Can be NULL if we failed to parse! */
assert(jcon->command == cmd);
jcon->command = tal_free(cmd);
/* If io_write is in progress, we shift it to point to new buffer pos */
if (io_lock_taken(jcon->lock))
adjust_io_write(jcon->conn, delta);
}
static void connection_complete_ok(struct json_connection *jcon,
struct command *cmd,
const char *id,
const struct json_result *result)
void jcon_append(struct json_connection *jcon, const char *str)
{
assert(id != NULL);
assert(result != NULL);
if (cmd->ok)
*(cmd->ok) = true;
size_t len = strlen(str);
json_connection_mkroom(jcon, len);
memcpy(membuf_add(&jcon->outbuf, len), str, len);
/* This JSON is simple enough that we build manually */
json_done(jcon, cmd, take(tal_fmt(jcon,
"{ \"jsonrpc\": \"2.0\", "
"\"result\" : %s,"
" \"id\" : %s }\n",
json_result_string(result), id)));
/* Wake writer. */
io_wake(jcon);
}
static void connection_complete_error(struct json_connection *jcon,
struct command *cmd,
const char *id,
const char *errmsg,
int code,
const struct json_result *data)
void jcon_append_vfmt(struct json_connection *jcon, const char *fmt, va_list ap)
{
struct json_escaped *esc;
const char *data_str;
size_t fmtlen;
va_list ap2;
esc = json_escape(tmpctx, errmsg);
if (data)
data_str = tal_fmt(tmpctx, ", \"data\" : %s",
json_result_string(data));
else
data_str = "";
/* Make a copy in case we need it below. */
va_copy(ap2, ap);
assert(id != NULL);
/* Try printing in place first. */
fmtlen = vsnprintf(membuf_space(&jcon->outbuf),
membuf_num_space(&jcon->outbuf), fmt, ap);
/* cmd *can be* NULL if we failed to parse! */
if (cmd && cmd->ok)
*(cmd->ok) = false;
/* Horrible subtlety: vsnprintf *will* NUL terminate, even if it means
* chopping off the last character. So if fmtlen ==
* membuf_num_space(&jcon->outbuf), the result was truncated! */
if (fmtlen < membuf_num_space(&jcon->outbuf)) {
membuf_added(&jcon->outbuf, fmtlen);
} else {
/* Make room for NUL terminator, even though we don't want it */
json_connection_mkroom(jcon, fmtlen + 1);
vsprintf(membuf_space(&jcon->outbuf), fmt, ap2);
membuf_added(&jcon->outbuf, fmtlen);
}
json_done(jcon, cmd, take(tal_fmt(tmpctx,
"{ \"jsonrpc\": \"2.0\", "
" \"error\" : "
"{ \"code\" : %d,"
" \"message\" : \"%s\"%s },"
" \"id\" : %s }\n",
code,
esc->s,
data_str,
id)));
va_end(ap2);
/* Wake writer. */
io_wake(jcon);
}
struct json_result *null_response(struct command *cmd)
@ -363,57 +344,56 @@ struct json_result *null_response(struct command *cmd)
return response;
}
void command_success(struct command *cmd, struct json_result *result)
/* This can be called directly on shutdown, even with unfinished cmd */
static void destroy_command(struct command *cmd)
{
struct json_connection *jcon = cmd->jcon;
if (!jcon) {
if (!cmd->jcon) {
log_debug(cmd->ld->log,
"Command returned result after jcon close");
tal_free(cmd);
return;
}
assert(jcon->command == cmd);
connection_complete_ok(jcon, cmd, cmd->id, result);
assert(cmd->jcon->command == cmd);
cmd->jcon->command = NULL;
}
static void command_fail_generic(struct command *cmd,
int code,
const struct json_result *data,
const char *error)
/* FIXME: Remove result arg here! */
void command_success(struct command *cmd, struct json_result *result)
{
struct json_connection *jcon = cmd->jcon;
if (!jcon) {
log_debug(cmd->ld->log,
"%s: Command failed after jcon close",
cmd->json_cmd->name);
tal_free(cmd);
return;
}
/* cmd->json_cmd can be NULL, if we're failing for command not found! */
log_debug(jcon->log, "Failing %s: %s",
cmd->json_cmd ? cmd->json_cmd->name : "invalid cmd",
error);
assert(jcon->command == cmd);
connection_complete_error(jcon, cmd, cmd->id, error, code, data);
assert(cmd);
assert(cmd->have_json_stream);
if (cmd->jcon)
jcon_append(cmd->jcon, " }\n");
if (cmd->ok)
*(cmd->ok) = true;
tal_free(cmd);
}
/* FIXME: Remove result arg here! */
void command_failed(struct command *cmd, struct json_result *result)
{
assert(cmd->failcode != 0);
command_fail_generic(cmd, cmd->failcode, result, cmd->errmsg);
assert(cmd->have_json_stream);
/* Have to close error */
if (cmd->jcon)
jcon_append(cmd->jcon, " } }\n");
if (cmd->ok)
*(cmd->ok) = false;
tal_free(cmd);
}
void PRINTF_FMT(3, 4) command_fail(struct command *cmd, int code,
const char *fmt, ...)
{
const char *errmsg;
struct json_result *r;
va_list ap;
va_start(ap, fmt);
command_fail_generic(cmd, code, NULL, tal_vfmt(cmd, fmt, ap));
errmsg = tal_vfmt(cmd, fmt, ap);
va_end(ap);
r = json_stream_fail_nodata(cmd, code, errmsg);
command_failed(cmd, r);
}
void command_still_pending(struct command *cmd)
@ -423,12 +403,23 @@ void command_still_pending(struct command *cmd)
cmd->pending = true;
}
static void jcon_start(struct json_connection *jcon, const char *id)
{
jcon_append(jcon, "{ \"jsonrpc\": \"2.0\", \"id\" : ");
jcon_append(jcon, id);
jcon_append(jcon, ", ");
}
static void json_command_malformed(struct json_connection *jcon,
const char *id,
const char *error)
{
return connection_complete_error(jcon, NULL, id, error,
JSONRPC2_INVALID_REQUEST, NULL);
jcon_start(jcon, id);
jcon_append(jcon,
tal_fmt(tmpctx, " \"error\" : "
"{ \"code\" : %d,"
" \"message\" : \"%s\" } }\n",
JSONRPC2_INVALID_REQUEST, error));
}
/* Returns true if command already completed. */
@ -470,6 +461,10 @@ static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[])
c->mode = CMD_NORMAL;
c->ok = NULL;
jcon->command = c;
tal_add_destructor(c, destroy_command);
/* Write start of response: rest will be appended directly. */
jcon_start(jcon, c->id);
if (!method || !params) {
command_fail(c, JSONRPC2_INVALID_REQUEST,
@ -513,12 +508,18 @@ static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[])
/* Mutual recursion */
static struct io_plan *locked_write_json(struct io_conn *conn,
struct json_connection *jcon);
static struct io_plan *write_json(struct io_conn *conn,
struct json_connection *jcon);
static struct io_plan *write_json_done(struct io_conn *conn,
struct json_connection *jcon)
{
membuf_consume(&jcon->outbuf, jcon->out_amount);
/* If we have more to write, do it now. */
if (membuf_num_elems(&jcon->outbuf))
return write_json(conn, jcon);
if (jcon->stop) {
log_unusual(jcon->log, "JSON-RPC shutdown");
/* Return us to toplevel lightningd.c */
@ -526,8 +527,10 @@ static struct io_plan *write_json_done(struct io_conn *conn,
return io_close(conn);
}
/* Wake reader. */
io_wake(conn);
/* If command is done and we've output everything, wake read_json
* for next command. */
if (!jcon->command)
io_wake(conn);
io_lock_release(jcon->lock);
/* Wait for more output. */

8
lightningd/jsonrpc.h

@ -7,6 +7,7 @@
#include <ccan/membuf/membuf.h>
#include <common/io_lock.h>
#include <common/json.h>
#include <stdarg.h>
struct bitcoin_txid;
struct wireaddr;
@ -40,10 +41,6 @@ struct command {
bool *ok;
/* Have we started a json stream already? For debugging. */
bool have_json_stream;
/* FIXME: Temporary. */
int failcode;
const char *errmsg;
};
struct json_connection {
@ -97,6 +94,9 @@ void PRINTF_FMT(3, 4) command_fail(struct command *cmd, int code,
/* Mainly for documentation, that we plan to close this later. */
void command_still_pending(struct command *cmd);
/* Low level jcon routines. */
void jcon_append(struct json_connection *jcon, const char *str);
void jcon_append_vfmt(struct json_connection *jcon, const char *fmt, va_list ap);
/* For initialization */
void setup_jsonrpc(struct lightningd *ld, const char *rpc_filename);

1
lightningd/test/Makefile

@ -11,6 +11,7 @@ LIGHTNINGD_TEST_COMMON_OBJS := \
common/bech32.o \
common/daemon_conn.o \
common/htlc_state.o \
common/io_lock.o \
common/json.o \
common/pseudorand.o \
common/memleak.o \

32
lightningd/test/run-jsonrpc.c

@ -27,18 +27,6 @@ u32 get_block_height(const struct chain_topology *topo UNNEEDED)
/* Generated stub for get_chainparams */
const struct chainparams *get_chainparams(const struct lightningd *ld UNNEEDED)
{ fprintf(stderr, "get_chainparams called!\n"); abort(); }
/* Generated stub for io_lock_acquire_out_ */
struct io_plan *io_lock_acquire_out_(struct io_conn *conn UNNEEDED, struct io_lock *lock UNNEEDED,
struct io_plan *(*next)(struct io_conn * UNNEEDED,
void *) UNNEEDED,
void *arg UNNEEDED)
{ fprintf(stderr, "io_lock_acquire_out_ called!\n"); abort(); }
/* Generated stub for io_lock_new */
struct io_lock *io_lock_new(const tal_t *ctx UNNEEDED)
{ fprintf(stderr, "io_lock_new called!\n"); abort(); }
/* Generated stub for io_lock_release */
void io_lock_release(struct io_lock *lock UNNEEDED)
{ fprintf(stderr, "io_lock_release called!\n"); abort(); }
/* Generated stub for json_feerate_estimate */
bool json_feerate_estimate(struct command *cmd UNNEEDED,
u32 **feerate_per_kw UNNEEDED, enum feerate feerate UNNEEDED)
@ -71,6 +59,7 @@ bool deprecated_apis;
static int test_json_filter(void)
{
struct command *cmd = talz(NULL, struct command);
struct json_connection *jcon = talz(cmd, struct json_connection);
struct json_result *result = json_stream_success(cmd);
jsmntok_t *toks;
const jsmntok_t *x;
@ -79,6 +68,12 @@ static int test_json_filter(void)
char *badstr = tal_arr(result, char, 256);
const char *str;
/* We need to initialize membuf so we can gather results. */
cmd->jcon = jcon;
jcon->lock = io_lock_new(jcon);
membuf_init(&jcon->outbuf,
tal_arr(cmd, char, 64), 64, membuf_tal_realloc);
/* Fill with junk, and nul-terminate (256 -> 0) */
for (i = 1; i < 257; i++)
badstr[i-1] = i;
@ -88,7 +83,8 @@ static int test_json_filter(void)
json_object_end(result);
/* Parse back in, make sure nothing crazy. */
str = json_result_string(result);
str = tal_strndup(cmd, membuf_elems(&jcon->outbuf),
membuf_num_elems(&jcon->outbuf));
toks = json_parse_input(str, strlen(str), &valid);
assert(valid);
@ -117,18 +113,26 @@ static void test_json_escape(void)
for (i = 1; i < 256; i++) {
char badstr[2];
struct command *cmd = talz(NULL, struct command);
struct json_connection *jcon = talz(cmd, struct json_connection);
struct json_result *result = json_stream_success(cmd);
struct json_escaped *esc;
badstr[0] = i;
badstr[1] = 0;
/* We need to initialize membuf so we can gather results. */
cmd->jcon = jcon;
jcon->lock = io_lock_new(jcon);
membuf_init(&jcon->outbuf,
tal_arr(cmd, char, 64), 64, membuf_tal_realloc);
json_object_start(result, NULL);
esc = json_escape(NULL, badstr);
json_add_escaped_string(result, "x", take(esc));
json_object_end(result);
const char *str = json_result_string(result);
const char *str = tal_strndup(cmd, membuf_elems(&jcon->outbuf),
membuf_num_elems(&jcon->outbuf));
if (i == '\\' || i == '"'
|| i == '\n' || i == '\r' || i == '\b'
|| i == '\t' || i == '\f')

6
lightningd/test/run-param.c

@ -40,6 +40,12 @@ const char *feerate_name(enum feerate feerate UNNEEDED)
/* Generated stub for fmt_wireaddr_without_port */
char *fmt_wireaddr_without_port(const tal_t *ctx UNNEEDED, const struct wireaddr *a UNNEEDED)
{ fprintf(stderr, "fmt_wireaddr_without_port called!\n"); abort(); }
/* Generated stub for jcon_append */
void jcon_append(struct json_connection *jcon UNNEEDED, const char *str UNNEEDED)
{ fprintf(stderr, "jcon_append called!\n"); abort(); }
/* Generated stub for jcon_append_vfmt */
void jcon_append_vfmt(struct json_connection *jcon UNNEEDED, const char *fmt UNNEEDED, va_list ap UNNEEDED)
{ fprintf(stderr, "jcon_append_vfmt called!\n"); abort(); }
/* Generated stub for json_feerate_estimate */
bool json_feerate_estimate(struct command *cmd UNNEEDED,
u32 **feerate_per_kw UNNEEDED, enum feerate feerate UNNEEDED)

8
tests/test_closing.py

@ -602,11 +602,9 @@ def test_onchain_dust_out(node_factory, bitcoind, executor):
bitcoind.generate_block(3)
# It should fail.
with pytest.raises(RpcError):
with pytest.raises(RpcError, match=r'WIRE_PERMANENT_CHANNEL_FAILURE: missing in commitment tx'):
payfuture.result(5)
l1.daemon.wait_for_log('WIRE_PERMANENT_CHANNEL_FAILURE: missing in commitment tx')
# Retry payment, this should fail (and, as a side-effect, tickle a
# bug).
with pytest.raises(RpcError, match=r'WIRE_UNKNOWN_NEXT_PEER'):
@ -682,11 +680,9 @@ def test_onchain_timeout(node_factory, bitcoind, executor):
bitcoind.generate_block(3)
# It should fail.
with pytest.raises(RpcError):
with pytest.raises(RpcError, match=r'WIRE_PERMANENT_CHANNEL_FAILURE: timed out'):
payfuture.result(5)
l1.daemon.wait_for_log('WIRE_PERMANENT_CHANNEL_FAILURE: timed out')
# 2 later, l1 spends HTLC (5 blocks total).
bitcoind.generate_block(2)
l1.wait_for_onchaind_broadcast('OUR_DELAYED_RETURN_TO_WALLET',

6
tests/test_pay.py

@ -100,9 +100,8 @@ def test_pay_disconnect(node_factory, bitcoind):
wait_for(lambda: [c['active'] for c in l1.rpc.listchannels()['channels']] == [False, False])
# Can't pay while its offline.
with pytest.raises(RpcError):
with pytest.raises(RpcError, match=r'failed: WIRE_TEMPORARY_CHANNEL_FAILURE \(First peer not ready\)'):
l1.rpc.sendpay(route, rhash)
l1.daemon.wait_for_log('failed: WIRE_TEMPORARY_CHANNEL_FAILURE \\(First peer not ready\\)')
l2.start()
l1.daemon.wait_for_log('peer_out WIRE_CHANNEL_REESTABLISH')
@ -114,10 +113,9 @@ def test_pay_disconnect(node_factory, bitcoind):
l1.daemon.wait_for_log(r'Peer permanent failure in CHANNELD_NORMAL: lightning_channeld: received ERROR channel .*: update_fee \d+ outside range 1875-75000')
# Should fail due to permenant channel fail
with pytest.raises(RpcError):
with pytest.raises(RpcError, match=r'failed: WIRE_UNKNOWN_NEXT_PEER \(First peer not ready\)'):
l1.rpc.sendpay(route, rhash)
l1.daemon.wait_for_log('failed: WIRE_UNKNOWN_NEXT_PEER \\(First peer not ready\\)')
assert not l1.daemon.is_in_log('Payment is still in progress')
# After it sees block, someone should close channel.

Loading…
Cancel
Save