Browse Source

lightningd/plugin: simply patch requests through and don't interpret them (much).

We simply look for the id token, and substitute it on the way in/out.
We also need to make sure output is '\n\n' terminated.

I started this because we weren't forwarding complex errors properly
(we treated them as a string), but it's also a huge simplification.

`struct plugin_rpc_request` is eliminated entirely: the information we need
is actually inside `struct command` already.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
plugin-6
Rusty Russell 6 years ago
parent
commit
b3d30095cb
  1. 285
      lightningd/plugin.c

285
lightningd/plugin.c

@ -1,5 +1,6 @@
#include "lightningd/plugin.h"
#include <ccan/array_size/array_size.h>
#include <ccan/intmap/intmap.h>
#include <ccan/io/io.h>
#include <ccan/list/list.h>
@ -61,19 +62,14 @@ struct plugin {
struct plugin_request {
u64 id;
struct plugin *plugin;
/* Method to be called */
const char *method;
/* JSON encoded params, either a dict or an array */
const char *json_params;
const char *response;
const jsmntok_t *resulttok, *errortok, *toks;
struct json_stream *stream;
/* The response handler to be called on success or error */
void (*cb)(const struct plugin_request *, void *);
/* The response handler to be called when plugin gives us an object. */
void (*cb)(const struct plugin_request *,
const char *buffer,
const jsmntok_t *toks,
const jsmntok_t *idtok,
void *);
void *arg;
};
@ -93,20 +89,6 @@ struct plugins {
struct lightningd *ld;
};
/* Represents a pending JSON-RPC request that was forwarded to a
* plugin and is currently waiting for it to return the result. */
struct plugin_rpc_request {
/* The json-serialized ID as it was passed to us by the
* client, will be used to return the result */
const char *id;
const char *method;
const char *params;
struct plugin *plugin;
struct command *cmd;
};
/* Simple storage for plugin options inbetween registering them on the
* command line and passing them off to the plugin */
struct plugin_opt {
@ -203,25 +185,23 @@ static void PRINTF_FMT(2,3) plugin_kill(struct plugin *plugin, char *fmt, ...)
/**
* Create the header of a JSON-RPC request and return open stream.
*
* This is a partial request, missing the params element, which the
* caller needs to add. We can't open it yet since we don't know
* whether it is supposed to be an object (name-value pairs) or an
* array.
* The caller needs to add the request to req->stream.
*/
static struct plugin_request *
plugin_request_new_(struct plugin *plugin, const char *method,
void (*cb)(const struct plugin_request *, void *),
plugin_request_new_(struct plugin *plugin,
void (*cb)(const struct plugin_request *,
const char *buffer,
const jsmntok_t *toks,
const jsmntok_t *idtok,
void *),
void *arg)
{
static u64 next_request_id = 0;
struct plugin_request *req = tal(plugin, struct plugin_request);
u64 request_id = next_request_id++;
req->id = request_id;
req->method = tal_strdup(req, method);
req->id = next_request_id++;
req->cb = cb;
req->arg = arg;
req->plugin = plugin;
/* We will not concurrently drain, if we do we must set the
* writer to non-NULL */
@ -229,31 +209,27 @@ plugin_request_new_(struct plugin *plugin, const char *method,
/* Add to map so we can find it later when routing the response */
uintmap_add(&plugin->plugins->pending_requests, req->id, req);
json_object_start(req->stream, NULL);
json_add_string(req->stream, "jsonrpc", "2.0");
json_add_string(req->stream, "method", method);
json_add_u64(req->stream, "id", request_id);
return req;
}
#define plugin_request_new(plugin, method, cb, arg) \
plugin_request_new_( \
(plugin), (method), \
typesafe_cb_preargs(void, void *, (cb), (arg), \
const struct plugin_request *), \
#define plugin_request_new(plugin, cb, arg) \
plugin_request_new_( \
(plugin), \
typesafe_cb_preargs(void, void *, (cb), (arg), \
const struct plugin_request *, \
const char *buffer, \
const jsmntok_t *toks, \
const jsmntok_t *idtok), \
(arg))
/**
* Given a request, send it to the plugin.
*/
static void plugin_request_queue(struct plugin_request *req)
static void plugin_request_queue(struct plugin *plugin,
struct plugin_request *req)
{
/* Finish the `params` object and submit the request */
json_object_end(req->stream); /* root element */
json_stream_append(req->stream, "\n\n");
*tal_arr_expand(&req->plugin->js_arr) = req->stream;
io_wake(req->plugin);
*tal_arr_expand(&plugin->js_arr) = req->stream;
io_wake(plugin);
}
/**
@ -267,7 +243,7 @@ static bool plugin_read_json_one(struct plugin *plugin)
jsmntok_t *toks;
bool valid;
u64 id;
const jsmntok_t *idtok, *resulttok, *errortok;
const jsmntok_t *idtok;
struct plugin_request *request;
/* FIXME: This could be done more efficiently by storing the
@ -290,19 +266,15 @@ static bool plugin_read_json_one(struct plugin *plugin)
return false;
}
resulttok = json_get_member(plugin->buffer, toks, "result");
errortok = json_get_member(plugin->buffer, toks, "error");
idtok = json_get_member(plugin->buffer, toks, "id");
if (!idtok) {
plugin_kill(plugin, "JSON-RPC response does not contain an \"id\"-field");
return false;
} else if (!resulttok && !errortok) {
plugin_kill(plugin, "JSON-RPC response does not contain a \"result\" or \"error\" field");
return false;
}
/* We only send u64 ids, so if this fails it's a critical error */
/* We only send u64 ids, so if this fails it's a critical error (note
* that this also works if id is inside a JSON string!). */
if (!json_to_u64(plugin->buffer, idtok, &id)) {
plugin_kill(plugin, "JSON-RPC response \"id\"-field is not a u64");
return false;
@ -316,12 +288,7 @@ static bool plugin_read_json_one(struct plugin *plugin)
}
/* We expect the request->cb to copy if needed */
request->response = plugin->buffer;
request->errortok = errortok;
request->resulttok = resulttok;
request->toks = toks;
request->cb(request, request->arg);
request->cb(request, plugin->buffer, toks, idtok, request->arg);
tal_free(request);
uintmap_del(&plugin->plugins->pending_requests, id);
@ -478,25 +445,25 @@ static bool plugin_opt_add(struct plugin *plugin, const char *buffer,
/* Iterate through the options in the manifest response, and add them
* to the plugin and the command line options */
static bool plugin_opts_add(const struct plugin_request *req)
static bool plugin_opts_add(struct plugin *plugin,
const char *buffer,
const jsmntok_t *resulttok)
{
const char *buffer = req->plugin->buffer;
const jsmntok_t *options =
json_get_member(req->plugin->buffer, req->resulttok, "options");
const jsmntok_t *options = json_get_member(buffer, resulttok, "options");
if (!options) {
plugin_kill(req->plugin,
plugin_kill(plugin,
"\"result.options\" was not found in the manifest");
return false;
}
if (options->type != JSMN_ARRAY) {
plugin_kill(req->plugin, "\"result.options\" is not an array");
plugin_kill(plugin, "\"result.options\" is not an array");
return false;
}
for (size_t i = 0; i < options->size; i++)
if (!plugin_opt_add(req->plugin, buffer, json_get_arr(options, i)))
if (!plugin_opt_add(plugin, buffer, json_get_arr(options, i)))
return false;
return true;
@ -508,92 +475,91 @@ static void plugin_rpcmethod_destroy(struct json_command *cmd,
jsonrpc_command_remove(rpc, cmd->name);
}
static void json_stream_forward_change_id(struct json_stream *stream,
const char *buffer,
const jsmntok_t *toks,
const jsmntok_t *idtok,
const char *new_id)
{
/* We copy everything, but replace the id (maybe inside a string, we
* don't care) */
json_stream_append_part(stream, buffer + toks->start,
idtok->start - toks->start);
json_stream_append(stream, new_id);
json_stream_append_part(stream, buffer + idtok->end,
toks->end - idtok->end);
/* We promise it will end in '\n\n' */
/* It's an object (with an id!): definitely can't be less that "{}" */
assert(toks->end - toks->start >= 2);
if (buffer[toks->end-1] != '\n')
json_stream_append(stream, "\n\n");
else if (buffer[toks->end-2] != '\n')
json_stream_append(stream, "\n");
}
static void plugin_rpcmethod_cb(const struct plugin_request *req,
struct plugin_rpc_request *rpc_req)
const char *buffer,
const jsmntok_t *toks,
const jsmntok_t *idtok,
struct command *cmd)
{
struct json_stream *response;
const jsmntok_t *res;
assert(req->resulttok || req->errortok);
if (req->errortok) {
res = req->errortok;
command_fail(rpc_req->cmd, PLUGIN_ERROR, "%.*s",
res->end - res->start, req->response + res->start);
tal_free(rpc_req);
return;
}
res = req->resulttok;
response = json_stream_success(rpc_req->cmd);
response = json_stream_raw_for_cmd(cmd);
json_stream_forward_change_id(response, buffer, toks, idtok, cmd->id);
command_raw_complete(cmd, response);
}
json_add_member(response, NULL, "%.*s", json_tok_len(res),
json_tok_contents(req->response, res));
static struct plugin *find_plugin_for_command(struct command *cmd)
{
struct plugins *plugins = cmd->ld->plugins;
struct plugin *plugin;
command_success(rpc_req->cmd, response);
tal_free(rpc_req);
/* Find the plugin that registered this RPC call */
list_for_each(&plugins->plugins, plugin, list) {
for (size_t i=0; i<tal_count(plugin->methods); i++) {
if (streq(cmd->json_cmd->name, plugin->methods[i]))
return plugin;
}
}
/* This should never happen, it'd mean that a plugin didn't
* cleanup after dying */
abort();
}
static void plugin_rpcmethod_dispatch(struct command *cmd, const char *buffer,
const jsmntok_t *obj UNNEEDED,
const jsmntok_t *params)
const jsmntok_t *toks,
const jsmntok_t *params UNNEEDED)
{
const jsmntok_t *toks = params, *methtok, *idtok;
struct plugin_rpc_request *request;
struct plugins *plugins = cmd->ld->plugins;
const jsmntok_t *idtok;
struct plugin *plugin;
struct plugin_request *req;
char id[STR_MAX_CHARS(u64)];
if (cmd->mode == CMD_USAGE) {
/* FIXME! */
cmd->usage = "[params]";
return;
}
/* We're given the params, but we need to walk back to the
* root object, so just walk backwards until the current
* element has no parents, that's going to be the root
* element. */
while (toks->parent != -1)
toks--;
plugin = find_plugin_for_command(cmd);
methtok = json_get_member(buffer, toks, "method");
/* Find ID again (We've parsed them before, this should not fail!) */
idtok = json_get_member(buffer, toks, "id");
/* We've parsed them before, these should not fail! */
assert(idtok != NULL && methtok != NULL);
request = tal(NULL, struct plugin_rpc_request);
request->method = tal_strndup(request, buffer + methtok->start,
methtok->end - methtok->start);
request->id = tal_strndup(request, buffer + idtok->start,
idtok->end - idtok->start);
request->params = tal_strndup(request, buffer + params->start,
params->end - params->start);
request->plugin = NULL;
request->cmd = cmd;
/* Find the plugin that registered this RPC call */
list_for_each(&plugins->plugins, plugin, list) {
for (size_t i=0; i<tal_count(plugin->methods); i++) {
if (streq(request->method, plugin->methods[i])) {
request->plugin = plugin;
goto found;
}
}
}
assert(idtok != NULL);
found:
/* This should never happen, it'd mean that a plugin didn't
* cleanup after dying */
assert(request->plugin);
req = plugin_request_new(plugin, plugin_rpcmethod_cb, cmd);
snprintf(id, ARRAY_SIZE(id), "%"PRIu64, req->id);
tal_steal(request->plugin, request);
req = plugin_request_new(request->plugin, request->method, plugin_rpcmethod_cb, request);
json_stream_append_fmt(req->stream, ", \"params\": %s", request->params);
plugin_request_queue(req);
json_stream_forward_change_id(req->stream, buffer, toks, idtok, id);
plugin_request_queue(plugin, req);
command_still_pending(cmd);
}
static bool plugin_rpcmethod_add(struct plugin *plugin, const char *buffer,
static bool plugin_rpcmethod_add(struct plugin *plugin,
const char *buffer,
const jsmntok_t *meth)
{
const jsmntok_t *nametok, *desctok, *longdesctok;
@ -651,23 +617,24 @@ static bool plugin_rpcmethod_add(struct plugin *plugin, const char *buffer,
return true;
}
static bool plugin_rpcmethods_add(const struct plugin_request *req)
static bool plugin_rpcmethods_add(struct plugin *plugin,
const char *buffer,
const jsmntok_t *resulttok)
{
const char *buffer = req->plugin->buffer;
const jsmntok_t *methods =
json_get_member(req->plugin->buffer, req->resulttok, "rpcmethods");
json_get_member(buffer, resulttok, "rpcmethods");
if (!methods)
return false;
if (methods->type != JSMN_ARRAY) {
plugin_kill(req->plugin,
plugin_kill(plugin,
"\"result.rpcmethods\" is not an array");
return false;
}
for (size_t i = 0; i < methods->size; i++)
if (!plugin_rpcmethod_add(req->plugin, buffer,
if (!plugin_rpcmethod_add(plugin, buffer,
json_get_arr(methods, i)))
return false;
return true;
@ -682,21 +649,29 @@ static void plugin_manifest_timeout(struct plugin *plugin)
/**
* Callback for the plugin_manifest request.
*/
static void plugin_manifest_cb(const struct plugin_request *req, struct plugin *plugin)
static void plugin_manifest_cb(const struct plugin_request *req,
const char *buffer,
const jsmntok_t *toks,
const jsmntok_t *idtok,
struct plugin *plugin)
{
const jsmntok_t *resulttok;
/* Check if all plugins have replied to getmanifest, and break
* if they are */
plugin->plugins->pending_manifests--;
if (plugin->plugins->pending_manifests == 0)
io_break(plugin->plugins);
if (req->resulttok->type != JSMN_OBJECT) {
resulttok = json_get_member(buffer, toks, "result");
if (!resulttok || resulttok->type != JSMN_OBJECT) {
plugin_kill(plugin,
"\"getmanifest\" response is not an object");
"\"getmanifest\" result is not an object");
return;
}
if (!plugin_opts_add(req) || !plugin_rpcmethods_add(req))
if (!plugin_opts_add(plugin, buffer, resulttok)
|| !plugin_rpcmethods_add(plugin, buffer, resulttok))
plugin_kill(plugin, "Failed to register options or methods");
/* Reset timer, it'd kill us otherwise. */
tal_free(plugin->timeout_timer);
@ -768,6 +743,22 @@ void clear_plugins(struct plugins *plugins)
tal_free(p);
}
/* For our own "getmanifest" and "init" requests: starts params[] */
static void start_simple_request(struct plugin_request *req, const char *reqname)
{
json_object_start(req->stream, NULL);
json_add_string(req->stream, "jsonrpc", "2.0");
json_add_string(req->stream, "method", reqname);
json_add_u64(req->stream, "id", req->id);
}
static void end_simple_request(struct plugin *plugin, struct plugin_request *req)
{
json_object_end(req->stream);
json_stream_append(req->stream, "\n\n");
plugin_request_queue(plugin, req);
}
void plugins_init(struct plugins *plugins, const char *dev_plugin_debug)
{
struct plugin *p;
@ -799,10 +790,11 @@ void plugins_init(struct plugins *plugins, const char *dev_plugin_debug)
* write-only on p->stdout */
io_new_conn(p, stdout, plugin_stdout_conn_init, p);
io_new_conn(p, stdin, plugin_stdin_conn_init, p);
req = plugin_request_new(p, "getmanifest", plugin_manifest_cb, p);
req = plugin_request_new(p, plugin_manifest_cb, p);
start_simple_request(req, "getmanifest");
json_array_start(req->stream, "params");
json_array_end(req->stream);
plugin_request_queue(req);
end_simple_request(p, req);
plugins->pending_manifests++;
/* Don't timeout if they're running a debugger. */
if (debug)
@ -826,6 +818,9 @@ void plugins_init(struct plugins *plugins, const char *dev_plugin_debug)
}
static void plugin_config_cb(const struct plugin_request *req,
const char *buffer,
const jsmntok_t *toks,
const jsmntok_t *idtok,
struct plugin *plugin)
{
/* Nothing to be done here, this is just a report */
@ -842,7 +837,8 @@ static void plugin_config(struct plugin *plugin)
struct lightningd *ld = plugin->plugins->ld;
/* No writer since we don't flush concurrently. */
req = plugin_request_new(plugin, "init", plugin_config_cb, plugin);
req = plugin_request_new(plugin, plugin_config_cb, plugin);
start_simple_request(req, "init");
json_object_start(req->stream, "params"); /* start of .params */
/* Add .params.options */
@ -861,8 +857,7 @@ static void plugin_config(struct plugin *plugin)
json_object_end(req->stream);
json_object_end(req->stream); /* end of .params */
plugin_request_queue(req);
end_simple_request(plugin, req);
}
void plugins_config(struct plugins *plugins)

Loading…
Cancel
Save