From c403415caad7b324299cc2d3c25fc73d7fc9494c Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Fri, 19 Oct 2018 11:47:48 +1030 Subject: [PATCH] lightningd: format JSON directly into json connection membuf. My test case is a mainnet gossip store with 22107 channels, and time to do `lightning-cli listchannels`: Before: `lightning-cli listchannels` DEVELOPER=0 real 0m1.303000-1.324000(1.3114+/-0.0091)s After: real 0m0.629000-0.695000(0.64985+/-0.019)s Signed-off-by: Rusty Russell --- lightningd/json.c | 62 +++++++----- lightningd/json.h | 15 ++- lightningd/jsonrpc.c | 185 +++++++++++++++++----------------- lightningd/jsonrpc.h | 8 +- lightningd/test/Makefile | 1 + lightningd/test/run-jsonrpc.c | 32 +++--- lightningd/test/run-param.c | 6 ++ tests/test_closing.py | 8 +- tests/test_pay.py | 6 +- 9 files changed, 175 insertions(+), 148 deletions(-) diff --git a/lightningd/json.c b/lightningd/json.c index 3ea5417de..ef9030916 100644 --- a/lightningd/json.c +++ b/lightningd/json.c @@ -491,31 +491,33 @@ struct json_result { /* True if we haven't yet put an element in current wrapping */ bool empty; - /* tal_count() of this is strlen() + 1 */ - char *s; + /* The command we're attached to */ + struct command *cmd; }; static void result_append(struct json_result *res, const char *str) { - size_t len = tal_count(res->s) - 1; + struct json_connection *jcon = res->cmd->jcon; - tal_resize(&res->s, len + strlen(str) + 1); - strcpy(res->s + len, str); + /* Don't do anything if they're disconnected. */ + if (!jcon) + return; + + jcon_append(jcon, str); } static void PRINTF_FMT(2,3) result_append_fmt(struct json_result *res, const char *fmt, ...) { - size_t len = tal_count(res->s) - 1, fmtlen; + struct json_connection *jcon = res->cmd->jcon; va_list ap; - va_start(ap, fmt); - fmtlen = vsnprintf(NULL, 0, fmt, ap); - va_end(ap); + /* Don't do anything if they're disconnected. */ + if (!jcon) + return; - tal_resize(&res->s, len + fmtlen + 1); va_start(ap, fmt); - vsprintf(res->s + len, fmt, ap); + jcon_append_vfmt(jcon, fmt, ap); va_end(ap); } @@ -707,7 +709,7 @@ static struct json_result *new_json_stream(struct command *cmd) { struct json_result *r = tal(cmd, struct json_result); - r->s = tal_strdup(r, ""); + r->cmd = cmd; #if DEVELOPER r->wrapping = tal_arr(r, jsmntype_t, 0); #endif @@ -721,27 +723,33 @@ static struct json_result *new_json_stream(struct command *cmd) struct json_result *json_stream_success(struct command *cmd) { - cmd->failcode = 0; - return new_json_stream(cmd); + struct json_result *r; + r = new_json_stream(cmd); + result_append(r, "\"result\" : "); + return r; } -struct json_result *json_stream_fail(struct command *cmd, - int code, - const char *errmsg) +struct json_result *json_stream_fail_nodata(struct command *cmd, + int code, + const char *errmsg) { + struct json_result *r = new_json_stream(cmd); + assert(code); assert(errmsg); - cmd->failcode = code; - cmd->errmsg = tal_strdup(cmd, errmsg); - return new_json_stream(cmd); + + result_append_fmt(r, " \"error\" : " + "{ \"code\" : %d," + " \"message\" : \"%s\"", code, errmsg); + return r; } -const char *json_result_string(const struct json_result *result) +struct json_result *json_stream_fail(struct command *cmd, + int code, + const char *errmsg) { -#if DEVELOPER - assert(tal_count(result->wrapping) == 0); -#endif - assert(result->indent == 0); - assert(tal_count(result->s) == strlen(result->s) + 1); - return result->s; + struct json_result *r = json_stream_fail_nodata(cmd, code, errmsg); + + result_append(r, ", \"data\" : "); + return r; } diff --git a/lightningd/json.h b/lightningd/json.h index 66135b3a0..b6a8e0fc2 100644 --- a/lightningd/json.h +++ b/lightningd/json.h @@ -159,7 +159,7 @@ bool json_tok_tok(struct command *cmd, const char *name, const jsmntok_t **out); -/* Creating JSON strings */ +/* Creating JSON output */ /* '"fieldname" : [ ' or '[ ' if fieldname is NULL */ void json_array_start(struct json_result *ptr, const char *fieldname); @@ -192,6 +192,18 @@ struct json_result *json_stream_fail(struct command *cmd, int code, const char *errmsg); +/** + * json_stream_fail_nodata - start streaming a failed json result. + * @cmd: the command we're running. + * @code: the error code from lightningd/jsonrpc_errors.h + * @errmsg: the error string. + * + * This is used by command_fail(), which doesn't add any JSON data. + */ +struct json_result *json_stream_fail_nodata(struct command *cmd, + int code, + const char *errmsg); + /* '"fieldname" : "value"' or '"value"' if fieldname is NULL. Turns * any non-printable chars into JSON escapes, but leaves existing escapes alone. */ @@ -227,5 +239,4 @@ void json_add_hex_talarr(struct json_result *result, const tal_t *data); void json_add_object(struct json_result *result, ...); -const char *json_result_string(const struct json_result *result); #endif /* LIGHTNING_LIGHTNINGD_JSON_H */ diff --git a/lightningd/jsonrpc.c b/lightningd/jsonrpc.c index 207d308a6..dc71254c8 100644 --- a/lightningd/jsonrpc.c +++ b/lightningd/jsonrpc.c @@ -55,7 +55,7 @@ static void destroy_jcon(struct json_connection *jcon) } /* FIXME: This, or something prettier (io_replan?) belong in ccan/io! */ -static UNNEEDED void adjust_io_write(struct io_conn *conn, ptrdiff_t delta) +static void adjust_io_write(struct io_conn *conn, ptrdiff_t delta) { conn->plan[IO_OUT].arg.u1.cp += delta; } @@ -283,74 +283,55 @@ static const struct json_command *find_cmd(const char *buffer, return NULL; } -static void json_done(struct json_connection *jcon, - struct command *cmd, - const char *json TAKES) +/* Make sure jcon->outbuf has room for len */ +static void json_connection_mkroom(struct json_connection *jcon, size_t len) { - /* Queue for writing, and wake writer. */ - size_t len = strlen(json); - char *p = membuf_add(&jcon->outbuf, len); - memcpy(p, json, len); - if (taken(json)) - tal_free(json); - io_wake(jcon); + ptrdiff_t delta = membuf_prepare_space(&jcon->outbuf, len); - /* Can be NULL if we failed to parse! */ - assert(jcon->command == cmd); - jcon->command = tal_free(cmd); + /* If io_write is in progress, we shift it to point to new buffer pos */ + if (io_lock_taken(jcon->lock)) + adjust_io_write(jcon->conn, delta); } -static void connection_complete_ok(struct json_connection *jcon, - struct command *cmd, - const char *id, - const struct json_result *result) +void jcon_append(struct json_connection *jcon, const char *str) { - assert(id != NULL); - assert(result != NULL); - if (cmd->ok) - *(cmd->ok) = true; + size_t len = strlen(str); + + json_connection_mkroom(jcon, len); + memcpy(membuf_add(&jcon->outbuf, len), str, len); - /* This JSON is simple enough that we build manually */ - json_done(jcon, cmd, take(tal_fmt(jcon, - "{ \"jsonrpc\": \"2.0\", " - "\"result\" : %s," - " \"id\" : %s }\n", - json_result_string(result), id))); + /* Wake writer. */ + io_wake(jcon); } -static void connection_complete_error(struct json_connection *jcon, - struct command *cmd, - const char *id, - const char *errmsg, - int code, - const struct json_result *data) +void jcon_append_vfmt(struct json_connection *jcon, const char *fmt, va_list ap) { - struct json_escaped *esc; - const char *data_str; + size_t fmtlen; + va_list ap2; - esc = json_escape(tmpctx, errmsg); - if (data) - data_str = tal_fmt(tmpctx, ", \"data\" : %s", - json_result_string(data)); - else - data_str = ""; + /* Make a copy in case we need it below. */ + va_copy(ap2, ap); - assert(id != NULL); + /* Try printing in place first. */ + fmtlen = vsnprintf(membuf_space(&jcon->outbuf), + membuf_num_space(&jcon->outbuf), fmt, ap); - /* cmd *can be* NULL if we failed to parse! */ - if (cmd && cmd->ok) - *(cmd->ok) = false; + /* Horrible subtlety: vsnprintf *will* NUL terminate, even if it means + * chopping off the last character. So if fmtlen == + * membuf_num_space(&jcon->outbuf), the result was truncated! */ + if (fmtlen < membuf_num_space(&jcon->outbuf)) { + membuf_added(&jcon->outbuf, fmtlen); + } else { + /* Make room for NUL terminator, even though we don't want it */ + json_connection_mkroom(jcon, fmtlen + 1); + vsprintf(membuf_space(&jcon->outbuf), fmt, ap2); + membuf_added(&jcon->outbuf, fmtlen); + } - json_done(jcon, cmd, take(tal_fmt(tmpctx, - "{ \"jsonrpc\": \"2.0\", " - " \"error\" : " - "{ \"code\" : %d," - " \"message\" : \"%s\"%s }," - " \"id\" : %s }\n", - code, - esc->s, - data_str, - id))); + va_end(ap2); + + /* Wake writer. */ + io_wake(jcon); } struct json_result *null_response(struct command *cmd) @@ -363,57 +344,56 @@ struct json_result *null_response(struct command *cmd) return response; } -void command_success(struct command *cmd, struct json_result *result) +/* This can be called directly on shutdown, even with unfinished cmd */ +static void destroy_command(struct command *cmd) { - struct json_connection *jcon = cmd->jcon; - - if (!jcon) { + if (!cmd->jcon) { log_debug(cmd->ld->log, "Command returned result after jcon close"); - tal_free(cmd); return; } - assert(jcon->command == cmd); - connection_complete_ok(jcon, cmd, cmd->id, result); + + assert(cmd->jcon->command == cmd); + cmd->jcon->command = NULL; } -static void command_fail_generic(struct command *cmd, - int code, - const struct json_result *data, - const char *error) +/* FIXME: Remove result arg here! */ +void command_success(struct command *cmd, struct json_result *result) { - struct json_connection *jcon = cmd->jcon; - - if (!jcon) { - log_debug(cmd->ld->log, - "%s: Command failed after jcon close", - cmd->json_cmd->name); - tal_free(cmd); - return; - } - - /* cmd->json_cmd can be NULL, if we're failing for command not found! */ - log_debug(jcon->log, "Failing %s: %s", - cmd->json_cmd ? cmd->json_cmd->name : "invalid cmd", - error); - - assert(jcon->command == cmd); - connection_complete_error(jcon, cmd, cmd->id, error, code, data); + assert(cmd); + assert(cmd->have_json_stream); + if (cmd->jcon) + jcon_append(cmd->jcon, " }\n"); + if (cmd->ok) + *(cmd->ok) = true; + tal_free(cmd); } +/* FIXME: Remove result arg here! */ void command_failed(struct command *cmd, struct json_result *result) { - assert(cmd->failcode != 0); - command_fail_generic(cmd, cmd->failcode, result, cmd->errmsg); + assert(cmd->have_json_stream); + /* Have to close error */ + if (cmd->jcon) + jcon_append(cmd->jcon, " } }\n"); + if (cmd->ok) + *(cmd->ok) = false; + tal_free(cmd); } void PRINTF_FMT(3, 4) command_fail(struct command *cmd, int code, const char *fmt, ...) { + const char *errmsg; + struct json_result *r; va_list ap; + va_start(ap, fmt); - command_fail_generic(cmd, code, NULL, tal_vfmt(cmd, fmt, ap)); + errmsg = tal_vfmt(cmd, fmt, ap); va_end(ap); + r = json_stream_fail_nodata(cmd, code, errmsg); + + command_failed(cmd, r); } void command_still_pending(struct command *cmd) @@ -423,12 +403,23 @@ void command_still_pending(struct command *cmd) cmd->pending = true; } +static void jcon_start(struct json_connection *jcon, const char *id) +{ + jcon_append(jcon, "{ \"jsonrpc\": \"2.0\", \"id\" : "); + jcon_append(jcon, id); + jcon_append(jcon, ", "); +} + static void json_command_malformed(struct json_connection *jcon, const char *id, const char *error) { - return connection_complete_error(jcon, NULL, id, error, - JSONRPC2_INVALID_REQUEST, NULL); + jcon_start(jcon, id); + jcon_append(jcon, + tal_fmt(tmpctx, " \"error\" : " + "{ \"code\" : %d," + " \"message\" : \"%s\" } }\n", + JSONRPC2_INVALID_REQUEST, error)); } /* Returns true if command already completed. */ @@ -470,6 +461,10 @@ static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[]) c->mode = CMD_NORMAL; c->ok = NULL; jcon->command = c; + tal_add_destructor(c, destroy_command); + + /* Write start of response: rest will be appended directly. */ + jcon_start(jcon, c->id); if (!method || !params) { command_fail(c, JSONRPC2_INVALID_REQUEST, @@ -513,12 +508,18 @@ static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[]) /* Mutual recursion */ static struct io_plan *locked_write_json(struct io_conn *conn, struct json_connection *jcon); +static struct io_plan *write_json(struct io_conn *conn, + struct json_connection *jcon); static struct io_plan *write_json_done(struct io_conn *conn, struct json_connection *jcon) { membuf_consume(&jcon->outbuf, jcon->out_amount); + /* If we have more to write, do it now. */ + if (membuf_num_elems(&jcon->outbuf)) + return write_json(conn, jcon); + if (jcon->stop) { log_unusual(jcon->log, "JSON-RPC shutdown"); /* Return us to toplevel lightningd.c */ @@ -526,8 +527,10 @@ static struct io_plan *write_json_done(struct io_conn *conn, return io_close(conn); } - /* Wake reader. */ - io_wake(conn); + /* If command is done and we've output everything, wake read_json + * for next command. */ + if (!jcon->command) + io_wake(conn); io_lock_release(jcon->lock); /* Wait for more output. */ diff --git a/lightningd/jsonrpc.h b/lightningd/jsonrpc.h index c297df99a..a48dfb1d8 100644 --- a/lightningd/jsonrpc.h +++ b/lightningd/jsonrpc.h @@ -7,6 +7,7 @@ #include #include #include +#include struct bitcoin_txid; struct wireaddr; @@ -40,10 +41,6 @@ struct command { bool *ok; /* Have we started a json stream already? For debugging. */ bool have_json_stream; - - /* FIXME: Temporary. */ - int failcode; - const char *errmsg; }; struct json_connection { @@ -97,6 +94,9 @@ void PRINTF_FMT(3, 4) command_fail(struct command *cmd, int code, /* Mainly for documentation, that we plan to close this later. */ void command_still_pending(struct command *cmd); +/* Low level jcon routines. */ +void jcon_append(struct json_connection *jcon, const char *str); +void jcon_append_vfmt(struct json_connection *jcon, const char *fmt, va_list ap); /* For initialization */ void setup_jsonrpc(struct lightningd *ld, const char *rpc_filename); diff --git a/lightningd/test/Makefile b/lightningd/test/Makefile index 60cc9e551..9b555b5d7 100644 --- a/lightningd/test/Makefile +++ b/lightningd/test/Makefile @@ -11,6 +11,7 @@ LIGHTNINGD_TEST_COMMON_OBJS := \ common/bech32.o \ common/daemon_conn.o \ common/htlc_state.o \ + common/io_lock.o \ common/json.o \ common/pseudorand.o \ common/memleak.o \ diff --git a/lightningd/test/run-jsonrpc.c b/lightningd/test/run-jsonrpc.c index f76e8fe4c..45a121257 100644 --- a/lightningd/test/run-jsonrpc.c +++ b/lightningd/test/run-jsonrpc.c @@ -27,18 +27,6 @@ u32 get_block_height(const struct chain_topology *topo UNNEEDED) /* Generated stub for get_chainparams */ const struct chainparams *get_chainparams(const struct lightningd *ld UNNEEDED) { fprintf(stderr, "get_chainparams called!\n"); abort(); } -/* Generated stub for io_lock_acquire_out_ */ -struct io_plan *io_lock_acquire_out_(struct io_conn *conn UNNEEDED, struct io_lock *lock UNNEEDED, - struct io_plan *(*next)(struct io_conn * UNNEEDED, - void *) UNNEEDED, - void *arg UNNEEDED) -{ fprintf(stderr, "io_lock_acquire_out_ called!\n"); abort(); } -/* Generated stub for io_lock_new */ -struct io_lock *io_lock_new(const tal_t *ctx UNNEEDED) -{ fprintf(stderr, "io_lock_new called!\n"); abort(); } -/* Generated stub for io_lock_release */ -void io_lock_release(struct io_lock *lock UNNEEDED) -{ fprintf(stderr, "io_lock_release called!\n"); abort(); } /* Generated stub for json_feerate_estimate */ bool json_feerate_estimate(struct command *cmd UNNEEDED, u32 **feerate_per_kw UNNEEDED, enum feerate feerate UNNEEDED) @@ -71,6 +59,7 @@ bool deprecated_apis; static int test_json_filter(void) { struct command *cmd = talz(NULL, struct command); + struct json_connection *jcon = talz(cmd, struct json_connection); struct json_result *result = json_stream_success(cmd); jsmntok_t *toks; const jsmntok_t *x; @@ -79,6 +68,12 @@ static int test_json_filter(void) char *badstr = tal_arr(result, char, 256); const char *str; + /* We need to initialize membuf so we can gather results. */ + cmd->jcon = jcon; + jcon->lock = io_lock_new(jcon); + membuf_init(&jcon->outbuf, + tal_arr(cmd, char, 64), 64, membuf_tal_realloc); + /* Fill with junk, and nul-terminate (256 -> 0) */ for (i = 1; i < 257; i++) badstr[i-1] = i; @@ -88,7 +83,8 @@ static int test_json_filter(void) json_object_end(result); /* Parse back in, make sure nothing crazy. */ - str = json_result_string(result); + str = tal_strndup(cmd, membuf_elems(&jcon->outbuf), + membuf_num_elems(&jcon->outbuf)); toks = json_parse_input(str, strlen(str), &valid); assert(valid); @@ -117,18 +113,26 @@ static void test_json_escape(void) for (i = 1; i < 256; i++) { char badstr[2]; struct command *cmd = talz(NULL, struct command); + struct json_connection *jcon = talz(cmd, struct json_connection); struct json_result *result = json_stream_success(cmd); struct json_escaped *esc; badstr[0] = i; badstr[1] = 0; + /* We need to initialize membuf so we can gather results. */ + cmd->jcon = jcon; + jcon->lock = io_lock_new(jcon); + membuf_init(&jcon->outbuf, + tal_arr(cmd, char, 64), 64, membuf_tal_realloc); + json_object_start(result, NULL); esc = json_escape(NULL, badstr); json_add_escaped_string(result, "x", take(esc)); json_object_end(result); - const char *str = json_result_string(result); + const char *str = tal_strndup(cmd, membuf_elems(&jcon->outbuf), + membuf_num_elems(&jcon->outbuf)); if (i == '\\' || i == '"' || i == '\n' || i == '\r' || i == '\b' || i == '\t' || i == '\f') diff --git a/lightningd/test/run-param.c b/lightningd/test/run-param.c index f9675597d..770872482 100644 --- a/lightningd/test/run-param.c +++ b/lightningd/test/run-param.c @@ -40,6 +40,12 @@ const char *feerate_name(enum feerate feerate UNNEEDED) /* Generated stub for fmt_wireaddr_without_port */ char *fmt_wireaddr_without_port(const tal_t *ctx UNNEEDED, const struct wireaddr *a UNNEEDED) { fprintf(stderr, "fmt_wireaddr_without_port called!\n"); abort(); } +/* Generated stub for jcon_append */ +void jcon_append(struct json_connection *jcon UNNEEDED, const char *str UNNEEDED) +{ fprintf(stderr, "jcon_append called!\n"); abort(); } +/* Generated stub for jcon_append_vfmt */ +void jcon_append_vfmt(struct json_connection *jcon UNNEEDED, const char *fmt UNNEEDED, va_list ap UNNEEDED) +{ fprintf(stderr, "jcon_append_vfmt called!\n"); abort(); } /* Generated stub for json_feerate_estimate */ bool json_feerate_estimate(struct command *cmd UNNEEDED, u32 **feerate_per_kw UNNEEDED, enum feerate feerate UNNEEDED) diff --git a/tests/test_closing.py b/tests/test_closing.py index 5c2c67cb7..1744839cf 100644 --- a/tests/test_closing.py +++ b/tests/test_closing.py @@ -602,11 +602,9 @@ def test_onchain_dust_out(node_factory, bitcoind, executor): bitcoind.generate_block(3) # It should fail. - with pytest.raises(RpcError): + with pytest.raises(RpcError, match=r'WIRE_PERMANENT_CHANNEL_FAILURE: missing in commitment tx'): payfuture.result(5) - l1.daemon.wait_for_log('WIRE_PERMANENT_CHANNEL_FAILURE: missing in commitment tx') - # Retry payment, this should fail (and, as a side-effect, tickle a # bug). with pytest.raises(RpcError, match=r'WIRE_UNKNOWN_NEXT_PEER'): @@ -682,11 +680,9 @@ def test_onchain_timeout(node_factory, bitcoind, executor): bitcoind.generate_block(3) # It should fail. - with pytest.raises(RpcError): + with pytest.raises(RpcError, match=r'WIRE_PERMANENT_CHANNEL_FAILURE: timed out'): payfuture.result(5) - l1.daemon.wait_for_log('WIRE_PERMANENT_CHANNEL_FAILURE: timed out') - # 2 later, l1 spends HTLC (5 blocks total). bitcoind.generate_block(2) l1.wait_for_onchaind_broadcast('OUR_DELAYED_RETURN_TO_WALLET', diff --git a/tests/test_pay.py b/tests/test_pay.py index b8cde34a8..4eea3398e 100644 --- a/tests/test_pay.py +++ b/tests/test_pay.py @@ -100,9 +100,8 @@ def test_pay_disconnect(node_factory, bitcoind): wait_for(lambda: [c['active'] for c in l1.rpc.listchannels()['channels']] == [False, False]) # Can't pay while its offline. - with pytest.raises(RpcError): + with pytest.raises(RpcError, match=r'failed: WIRE_TEMPORARY_CHANNEL_FAILURE \(First peer not ready\)'): l1.rpc.sendpay(route, rhash) - l1.daemon.wait_for_log('failed: WIRE_TEMPORARY_CHANNEL_FAILURE \\(First peer not ready\\)') l2.start() l1.daemon.wait_for_log('peer_out WIRE_CHANNEL_REESTABLISH') @@ -114,10 +113,9 @@ def test_pay_disconnect(node_factory, bitcoind): l1.daemon.wait_for_log(r'Peer permanent failure in CHANNELD_NORMAL: lightning_channeld: received ERROR channel .*: update_fee \d+ outside range 1875-75000') # Should fail due to permenant channel fail - with pytest.raises(RpcError): + with pytest.raises(RpcError, match=r'failed: WIRE_UNKNOWN_NEXT_PEER \(First peer not ready\)'): l1.rpc.sendpay(route, rhash) - l1.daemon.wait_for_log('failed: WIRE_UNKNOWN_NEXT_PEER \\(First peer not ready\\)') assert not l1.daemon.is_in_log('Payment is still in progress') # After it sees block, someone should close channel.