From 230730eca4079b75d6112f0e3d34ea5410eff80b Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Thu, 29 Nov 2018 15:41:09 +0100 Subject: [PATCH] plugin: Migrate request creation to json_stream We can use the internal buffering of the json_stream instead of manually building JSON-RPC calls. This makes it a lot easier to handle these requests. Notice that we do not flush concurrently and still buffer all the things, but it avoids double-buffering things. Signed-off-by: Christian Decker --- lightningd/plugin.c | 184 +++++++++++++++++++++++++------------------- 1 file changed, 104 insertions(+), 80 deletions(-) diff --git a/lightningd/plugin.c b/lightningd/plugin.c index 6601eae79..33dab208b 100644 --- a/lightningd/plugin.c +++ b/lightningd/plugin.c @@ -42,9 +42,10 @@ struct plugin { char *buffer; size_t used, len_read; - /* Stuff we write */ - struct list_head output; - const char *outbuf; + /* Our json_streams. Since multiple streams could start + * returning data at once, we always service these in order, + * freeing once empty. */ + struct json_stream **js_arr; struct log *log; @@ -69,6 +70,7 @@ struct plugin_request { const char *json_params; const char *response; const jsmntok_t *resulttok, *errortok, *toks; + struct json_stream *stream; /* The response handler to be called on success or error */ void (*cb)(const struct plugin_request *, void *); @@ -90,11 +92,6 @@ struct plugins { struct timers timers; }; -struct json_output { - struct list_node list; - const char *json; -}; - /* Represents a pending JSON-RPC request that was forwarded to a * plugin and is currently waiting for it to return the result. */ struct plugin_rpc_request { @@ -138,7 +135,8 @@ void plugin_register(struct plugins *plugins, const char* path TAKES) list_add_tail(&plugins->plugins, &p->list); p->plugins = plugins; p->cmd = tal_strdup(p, path); - p->outbuf = NULL; + p->js_arr = tal_arr(p, struct json_stream *, 0); + p->used = 0; p->log = new_log(p, plugins->log_book, "plugin-%s", path_basename(tmpctx, p->cmd)); @@ -201,6 +199,62 @@ static void PRINTF_FMT(2,3) plugin_kill(struct plugin *plugin, char *fmt, ...) tal_free(plugin); } +/** + * Create the header of a JSON-RPC request and return open stream. + * + * This is a partial request, missing the params element, which the + * caller needs to add. We can't open it yet since we don't know + * whether it is supposed to be an object (name-value pairs) or an + * array. + */ +static struct plugin_request * +plugin_request_new_(struct plugin *plugin, const char *method, + void (*cb)(const struct plugin_request *, void *), + void *arg) +{ + static u64 next_request_id = 0; + struct plugin_request *req = tal(plugin, struct plugin_request); + u64 request_id = next_request_id++; + + req->id = request_id; + req->method = tal_strdup(req, method); + req->cb = cb; + req->arg = arg; + req->plugin = plugin; + + /* We will not concurrently drain, if we do we must set the + * writer to non-NULL */ + req->stream = new_json_stream(req, NULL); + + /* Add to map so we can find it later when routing the response */ + uintmap_add(&plugin->plugins->pending_requests, req->id, req); + + json_object_start(req->stream, NULL); + json_add_string(req->stream, "jsonrpc", "2.0"); + json_add_string(req->stream, "method", method); + json_add_u64(req->stream, "id", request_id); + return req; +} + +#define plugin_request_new(plugin, method, cb, arg) \ + plugin_request_new_( \ + (plugin), (method), \ + typesafe_cb_preargs(void, void *, (cb), (arg), \ + const struct plugin_request *), \ + (arg)) + +/** + * Given a request, send it to the plugin. + */ +static void plugin_request_queue(struct plugin_request *req) +{ + /* Finish the `params` object and submit the request */ + json_object_end(req->stream); /* root element */ + json_stream_append(req->stream, "\n\n"); + *tal_arr_expand(&req->plugin->js_arr) = req->stream; + io_wake(req->plugin); +} + /** * Try to parse a complete message from the plugin's buffer. * @@ -300,71 +354,31 @@ static struct io_plan *plugin_read_json(struct io_conn *conn UNUSED, &plugin->len_read, plugin_read_json, plugin); } -static struct io_plan *plugin_write_json(struct io_conn *conn UNUSED, - struct plugin *plugin) +/* Mutual recursion */ +static struct io_plan *plugin_write_json(struct io_conn *conn, + struct plugin *plugin); + +static struct io_plan *plugin_stream_complete(struct io_conn *conn, struct json_stream *js, struct plugin *plugin) { - struct json_output *out; - if (plugin->outbuf) - plugin->outbuf = tal_free(plugin->outbuf); - - out = list_pop(&plugin->output, struct json_output, list); - if (!out) { - if (plugin->stop) { - return io_close(conn); - } else { - return io_out_wait(plugin->stdin_conn, plugin, - plugin_write_json, plugin); - } - } + size_t pending = tal_count(plugin->js_arr); + /* Remove js and shift all remainig over */ + tal_free(plugin->js_arr[0]); + memmove(plugin->js_arr, plugin->js_arr + 1, (pending - 1) * sizeof(plugin->js_arr[0])); + tal_resize(&plugin->js_arr, pending-1); - /* We have a message we'd like to send */ - plugin->outbuf = tal_steal(plugin, out->json); - tal_free(out); - return io_write(conn, plugin->outbuf, strlen(plugin->outbuf), - plugin_write_json, plugin); + return plugin_write_json(conn, plugin); } -static void plugin_request_send_( - struct plugin *plugin, const char *method TAKES, const char *params TAKES, - void (*cb)(const struct plugin_request *, void *), void *arg) +static struct io_plan *plugin_write_json(struct io_conn *conn, + struct plugin *plugin) { - static u64 next_request_id = 0; - struct plugin_request *req = tal(plugin, struct plugin_request); - struct json_output *out = tal(plugin, struct json_output); - u64 request_id = next_request_id++; - - req->id = request_id; - req->method = tal_strdup(req, method); - req->json_params = tal_strdup(req, params); - req->cb = cb; - req->arg = arg; - req->plugin = plugin; - - /* Add to map so we can find it later when routing the response */ - uintmap_add(&plugin->plugins->pending_requests, req->id, req); + if (tal_count(plugin->js_arr)) { + return json_stream_output(plugin->js_arr[0], plugin->stdin_conn, plugin_stream_complete, plugin); + } - /* Wrap the request in the JSON-RPC request object. Terminate - * with an empty line that serves as a hint that the JSON - * object is done. */ - out->json = tal_fmt(out, "{" - "\"jsonrpc\": \"2.0\", " - "\"method\": \"%s\", " - "\"params\" : %s, " - "\"id\" : %" PRIu64 " }\n\n", - method, params, request_id); - - /* Queue and notify the writer */ - list_add_tail(&plugin->output, &out->list); - io_wake(plugin); + return io_out_wait(conn, plugin, plugin_write_json, plugin); } -#define plugin_request_send(plugin, method, params, cb, arg) \ - plugin_request_send_( \ - (plugin), (method), (params), \ - typesafe_cb_preargs(void, void *, (cb), (arg), \ - const struct plugin_request *), \ - (arg)) - static struct io_plan *plugin_stdin_conn_init(struct io_conn *conn, struct plugin *plugin) { @@ -505,6 +519,7 @@ static void plugin_rpcmethod_dispatch(struct command *cmd, const char *buffer, struct plugin_rpc_request *request; struct plugins *plugins = cmd->ld->plugins; struct plugin *plugin; + struct plugin_request *req; if (cmd->mode == CMD_USAGE) { cmd->usage = "[params]"; @@ -549,7 +564,9 @@ found: assert(request->plugin); tal_steal(request->plugin, request); - plugin_request_send(request->plugin, request->method, request->params, plugin_rpcmethod_cb, request); + req = plugin_request_new(request->plugin, request->method, plugin_rpcmethod_cb, request); + json_stream_append_fmt(req->stream, ", \"params\": %s", request->params); + plugin_request_queue(req); command_still_pending(cmd); } @@ -735,6 +752,7 @@ void plugins_init(struct plugins *plugins) char **cmd; int stdin, stdout; struct timer *expired; + struct plugin_request *req; plugins->pending_manifests = 0; uintmap_init(&plugins->pending_requests); @@ -748,17 +766,17 @@ void plugins_init(struct plugins *plugins) if (p->pid == -1) fatal("error starting plugin '%s': %s", p->cmd, strerror(errno)); - - list_head_init(&p->output); p->buffer = tal_arr(p, char, 64); - p->used = 0; p->stop = false; /* Create two connections, one read-only on top of p->stdin, and one * write-only on p->stdout */ io_new_conn(p, stdout, plugin_stdout_conn_init, p); io_new_conn(p, stdin, plugin_stdin_conn_init, p); - plugin_request_send(p, "getmanifest", "[]", plugin_manifest_cb, p); + req = plugin_request_new(p, "getmanifest", plugin_manifest_cb, p); + json_array_start(req->stream, "params"); + json_array_end(req->stream); + plugin_request_queue(req); plugins->pending_manifests++; p->timeout_timer = new_reltimer( &plugins->timers, p, time_from_sec(PLUGIN_MANIFEST_TIMEOUT), @@ -787,19 +805,25 @@ static void plugin_config_cb(const struct plugin_request *req, static void plugin_config(struct plugin *plugin) { struct plugin_opt *opt; - bool first = true; - const char *name, *sep; - char *conf = tal_fmt(tmpctx, "{\n \"options\": {"); + const char *name; + struct plugin_request *req; + + /* No writer since we don't flush concurrently. */ + req = plugin_request_new(plugin, "init", plugin_config_cb, plugin); + json_object_start(req->stream, "params"); /* start of .params */ + + /* Add .params.options */ + json_object_start(req->stream, "options"); list_for_each(&plugin->plugin_opts, opt, list) { /* Trim the `--` that we added before */ name = opt->name + 2; - /* Separator between elements in the same object */ - sep = first?"":","; - first = false; - tal_append_fmt(&conf, "%s\n \"%s\": \"%s\"", sep, name, opt->value); + json_add_string(req->stream, name, opt->value); } - tal_append_fmt(&conf, "\n }\n}"); - plugin_request_send(plugin, "init", conf, plugin_config_cb, plugin); + json_object_end(req->stream); /* end of .params.options */ + + json_object_end(req->stream); /* end of .params */ + + plugin_request_queue(req); } void plugins_config(struct plugins *plugins)