Browse Source

jsonrpc: allow multiple commands at once.

We now keep multiple commands for a json_connection, and an array of
json_streams.

When a command wants to write something, we allocate a new json_stream
at the end of the array.

We always output from the first available json_stream; once that
command has finished, we free that and move to the next.  Once all are
done, we wake the reader.

This means we won't read a new command if output is still pending, but
as most commands don't start writing until they're ready to write
everything, we still get command parallelism.

In particular, you can now 'waitinvoice' and 'delinvoice' and it will
work even though the 'waitinvoice' blocks.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
trytravis
Rusty Russell 6 years ago
committed by Christian Decker
parent
commit
e0d14bddb9
  1. 2
      CHANGELOG.md
  2. 7
      lightningd/json_stream.c
  3. 4
      lightningd/json_stream.h
  4. 139
      lightningd/jsonrpc.c
  5. 4
      lightningd/jsonrpc.h
  6. 2
      tests/test_misc.py

2
CHANGELOG.md

@ -22,9 +22,9 @@ changes.
### Fixed
- JSON API: uppercase invoices now parsed correctly (broken in 0.6.2).
- JSON API: commands are once again read even if one hasn't responded yet (broken in 0.6.2).
- pylightning: handle multiple simultanous RPC replies reliably.
### Security

7
lightningd/json_stream.c

@ -23,7 +23,9 @@ struct json_stream {
/* Who is io_writing from this buffer now: NULL if nobody is. */
struct io_conn *reader;
struct io_plan *(*reader_cb)(struct io_conn *conn, void *arg);
struct io_plan *(*reader_cb)(struct io_conn *conn,
struct json_stream *js,
void *arg);
void *reader_arg;
size_t len_read;
@ -263,7 +265,7 @@ static struct io_plan *json_stream_output_write(struct io_conn *conn,
/* We're not doing io_write now, unset. */
js->reader = NULL;
if (!json_stream_still_writing(js))
return js->reader_cb(conn, js->reader_arg);
return js->reader_cb(conn, js, js->reader_arg);
return io_out_wait(conn, js, json_stream_output_write, js);
}
@ -276,6 +278,7 @@ static struct io_plan *json_stream_output_write(struct io_conn *conn,
struct io_plan *json_stream_output_(struct json_stream *js,
struct io_conn *conn,
struct io_plan *(*cb)(struct io_conn *conn,
struct json_stream *js,
void *arg),
void *arg)
{

4
lightningd/json_stream.h

@ -85,12 +85,14 @@ json_add_member(struct json_stream *js, const char *fieldname,
typesafe_cb_preargs(struct io_plan *, \
void *, \
(cb), (arg), \
struct io_conn *), \
struct io_conn *, \
struct json_stream *), \
(arg))
struct io_plan *json_stream_output_(struct json_stream *js,
struct io_conn *conn,
struct io_plan *(*cb)(struct io_conn *conn,
struct json_stream *js,
void *arg),
void *arg);

139
lightningd/jsonrpc.c

@ -31,6 +31,8 @@
#include <sys/types.h>
#include <sys/un.h>
/* This represents a JSON RPC connection. It can invoke multiple commands, but
* a command can outlive the connection, which could close any time. */
struct json_connection {
/* The global state */
struct lightningd *ld;
@ -53,19 +55,53 @@ struct json_connection {
/* We've been told to stop. */
bool stop;
/* Current command. */
struct command *command;
/* Our commands */
struct list_head commands;
/* Our json_stream */
struct json_stream *js;
/* Our json_streams (owned by the commands themselves while running).
* Since multiple streams could start returning data at once, we
* always service these in order, freeing once empty. */
struct json_stream **js_arr;
};
/* The command itself usually owns the stream, because jcon may get closed.
* The command transfers ownership once it's done though. */
static struct json_stream *jcon_new_json_stream(const tal_t *ctx,
struct json_connection *jcon,
struct command *writer)
{
/* Wake writer to start streaming, in case it's not already. */
io_wake(jcon);
/* FIXME: Keep streams around for recycling. */
return *tal_arr_expand(&jcon->js_arr) = new_json_stream(ctx, writer);
}
static void jcon_remove_json_stream(struct json_connection *jcon,
struct json_stream *js)
{
for (size_t i = 0; i < tal_count(jcon->js_arr); i++) {
if (js != jcon->js_arr[i])
continue;
memmove(jcon->js_arr + i,
jcon->js_arr + i + 1,
(tal_count(jcon->js_arr) - i - 1)
* sizeof(jcon->js_arr[i]));
tal_resize(&jcon->js_arr, tal_count(jcon->js_arr)-1);
return;
}
abort();
}
/* jcon and cmd have separate lifetimes: we detach them on either destruction */
static void destroy_jcon(struct json_connection *jcon)
{
if (jcon->command) {
log_debug(jcon->log, "Abandoning command");
jcon->command->jcon = NULL;
struct command *c;
list_for_each(&jcon->commands, c, list) {
log_debug(jcon->log, "Abandoning command %s", c->json_cmd->name);
c->jcon = NULL;
}
/* Make sure this happens last! */
@ -313,9 +349,7 @@ static void destroy_command(struct command *cmd)
"Command returned result after jcon close");
return;
}
assert(cmd->jcon->command == cmd);
cmd->jcon->command = NULL;
list_del_from(&cmd->jcon->commands, &cmd->list);
}
void command_success(struct command *cmd, struct json_stream *result)
@ -375,32 +409,29 @@ static void json_command_malformed(struct json_connection *jcon,
const char *id,
const char *error)
{
/* FIXME: We only allow one command at a time */
assert(!jcon->js);
jcon->js = new_json_stream(jcon, NULL);
io_wake(jcon);
/* NULL writer is OK here, since we close it immediately. */
struct json_stream *js = jcon_new_json_stream(jcon, jcon, NULL);
json_stream_append_fmt(jcon->js,
json_stream_append_fmt(js,
"{ \"jsonrpc\": \"2.0\", \"id\" : %s,"
" \"error\" : "
"{ \"code\" : %d,"
" \"message\" : \"%s\" } }\n\n",
id, JSONRPC2_INVALID_REQUEST, error);
json_stream_close(jcon->js, NULL);
json_stream_close(js, NULL);
}
static struct json_stream *attach_json_stream(struct command *cmd)
{
struct json_stream *js = new_json_stream(cmd, cmd);
struct json_stream *js;
/* If they still care about the result, attach it to them. */
if (cmd->jcon)
js = jcon_new_json_stream(cmd, cmd->jcon, cmd);
else
js = new_json_stream(cmd, cmd);
/* If they still care about the result, wake them */
if (cmd->jcon) {
/* FIXME: We only allow one command at a time */
assert(!cmd->jcon->js);
cmd->jcon->js = js;
io_wake(cmd->jcon);
}
assert(!cmd->have_json_stream);
cmd->have_json_stream = true;
return js;
@ -447,8 +478,7 @@ struct json_stream *json_stream_fail(struct command *cmd,
return r;
}
/* Returns true if command already completed. */
static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[])
static void parse_request(struct json_connection *jcon, const jsmntok_t tok[])
{
const jsmntok_t *method, *id, *params;
struct command *c;
@ -456,7 +486,7 @@ static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[])
if (tok[0].type != JSMN_OBJECT) {
json_command_malformed(jcon, "null",
"Expected {} for json command");
return true;
return;
}
method = json_get_member(jcon->buffer, tok, "method");
@ -465,12 +495,12 @@ static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[])
if (!id) {
json_command_malformed(jcon, "null", "No id");
return true;
return;
}
if (id->type != JSMN_STRING && id->type != JSMN_PRIMITIVE) {
json_command_malformed(jcon, "null",
"Expected string/primitive for id");
return true;
return;
}
/* This is a convenient tal parent for duration of command
@ -485,19 +515,19 @@ static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[])
json_tok_len(id));
c->mode = CMD_NORMAL;
c->ok = NULL;
jcon->command = c;
list_add_tail(&jcon->commands, &c->list);
tal_add_destructor(c, destroy_command);
if (!method || !params) {
command_fail(c, JSONRPC2_INVALID_REQUEST,
method ? "No params" : "No method");
return true;
return;
}
if (method->type != JSMN_STRING) {
command_fail(c, JSONRPC2_INVALID_REQUEST,
"Expected string for method");
return true;
return;
}
c->json_cmd = find_cmd(jcon->buffer, method);
@ -506,50 +536,52 @@ static bool parse_request(struct json_connection *jcon, const jsmntok_t tok[])
"Unknown command '%.*s'",
method->end - method->start,
jcon->buffer + method->start);
return true;
return;
}
if (c->json_cmd->deprecated && !deprecated_apis) {
command_fail(c, JSONRPC2_METHOD_NOT_FOUND,
"Command '%.*s' is deprecated",
method->end - method->start,
jcon->buffer + method->start);
return true;
return;
}
db_begin_transaction(jcon->ld->wallet->db);
c->json_cmd->dispatch(c, jcon->buffer, params);
db_commit_transaction(jcon->ld->wallet->db);
/* If they didn't complete it, they must call command_still_pending */
if (jcon->command == c)
/* If they didn't complete it, they must call command_still_pending.
* If they completed it, it's freed already. */
list_for_each(&jcon->commands, c, list)
assert(c->pending);
return jcon->command == NULL;
}
/* Mutual recursion */
static struct io_plan *stream_out_complete(struct io_conn *conn,
struct json_stream *js,
struct json_connection *jcon);
static struct io_plan *start_json_stream(struct io_conn *conn,
struct json_connection *jcon)
{
/* If something has created an output buffer, start streaming. */
if (jcon->js)
return json_stream_output(jcon->js, conn,
if (tal_count(jcon->js_arr))
return json_stream_output(jcon->js_arr[0], conn,
stream_out_complete, jcon);
/* Tell reader it can run next command. */
io_wake(conn);
/* Wait for attach_json_stream */
return io_out_wait(conn, jcon, start_json_stream, jcon);
}
/* Command has completed writing, and we've written it all out to conn. */
static struct io_plan *stream_out_complete(struct io_conn *conn,
struct json_stream *js,
struct json_connection *jcon)
{
/* Free up the json_stream for next command. */
assert(jcon->js);
jcon->js = tal_free(jcon->js);
jcon_remove_json_stream(jcon, js);
if (jcon->stop) {
log_unusual(jcon->log, "JSON-RPC shutdown");
@ -558,9 +590,6 @@ static struct io_plan *stream_out_complete(struct io_conn *conn,
return io_close(conn);
}
/* Tell reader to run next command. */
io_wake(conn);
/* Wait for more output. */
return start_json_stream(conn, jcon);
}
@ -569,7 +598,7 @@ static struct io_plan *read_json(struct io_conn *conn,
struct json_connection *jcon)
{
jsmntok_t *toks;
bool valid, completed;
bool valid;
if (jcon->len_read)
log_io(jcon->log, LOG_IO_IN, "",
@ -581,7 +610,7 @@ static struct io_plan *read_json(struct io_conn *conn,
tal_resize(&jcon->buffer, jcon->used * 2);
/* We wait for pending output to be consumed, to avoid DoS */
if (jcon->js) {
if (tal_count(jcon->js_arr) != 0) {
jcon->len_read = 0;
return io_wait(conn, conn, read_json, jcon);
}
@ -607,26 +636,20 @@ static struct io_plan *read_json(struct io_conn *conn,
goto read_more;
}
completed = parse_request(jcon, toks);
parse_request(jcon, toks);
/* Remove first {}. */
memmove(jcon->buffer, jcon->buffer + toks[0].end,
tal_count(jcon->buffer) - toks[0].end);
jcon->used -= toks[0].end;
/* If we haven't completed, wait for cmd completion. */
jcon->len_read = 0;
if (!completed) {
tal_free(toks);
return io_wait(conn, conn, read_json, jcon);
}
/* If we have more to process, try again. FIXME: this still gets
* first priority in io_loop, so can starve others. Hack would be
* a (non-zero) timer, but better would be to have io_loop avoid
* such livelock */
if (jcon->used) {
tal_free(toks);
jcon->len_read = 0;
return io_always(conn, read_json, jcon);
}
@ -648,9 +671,9 @@ static struct io_plan *jcon_connected(struct io_conn *conn,
jcon->used = 0;
jcon->buffer = tal_arr(jcon, char, 64);
jcon->stop = false;
jcon->js = NULL;
jcon->js_arr = tal_arr(jcon, struct json_stream *, 0);
jcon->len_read = 0;
jcon->command = NULL;
list_head_init(&jcon->commands);
/* We want to log on destruction, so we free this in destructor. */
jcon->log = new_log(ld->log_book, ld->log_book, "%sjcon fd %i:",

4
lightningd/jsonrpc.h

@ -3,7 +3,7 @@
#include "config.h"
#include <bitcoin/chainparams.h>
#include <ccan/autodata/autodata.h>
#include <common/io_lock.h>
#include <ccan/list/list.h>
#include <common/json.h>
#include <lightningd/json_stream.h>
#include <stdarg.h>
@ -19,6 +19,8 @@ enum command_mode {
/* Context for a command (from JSON, but might outlive the connection!). */
/* FIXME: move definition into jsonrpc.c */
struct command {
/* Off json_cmd->commands */
struct list_node list;
/* The global state */
struct lightningd *ld;
/* The 'id' which we need to include in the response. */

2
tests/test_misc.py

@ -605,7 +605,7 @@ def test_multirpc(node_factory):
b'{"id":5,"jsonrpc":"2.0","method":"listpeers","params":[]}',
b'{"id":6,"jsonrpc":"2.0","method":"listpeers","params":[]}',
b'{"method": "invoice", "params": [100, "foo", "foo"], "jsonrpc": "2.0", "id": 7 }',
# FIXME: b'{"method": "waitinvoice", "params": ["foo"], "jsonrpc" : "2.0", "id": 8 }',
b'{"method": "waitinvoice", "params": ["foo"], "jsonrpc" : "2.0", "id": 8 }',
b'{"method": "delinvoice", "params": ["foo", "unpaid"], "jsonrpc" : "2.0", "id": 9 }',
]

Loading…
Cancel
Save