From 07934e77bd92ca4271245b9f0ca93f57487942f9 Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Fri, 28 Sep 2018 13:36:20 +0200 Subject: [PATCH] plugin: Add request muxing to the plugin subsystem --- lightningd/plugin.c | 111 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 110 insertions(+), 1 deletion(-) diff --git a/lightningd/plugin.c b/lightningd/plugin.c index d0e85d489..e3ea03329 100644 --- a/lightningd/plugin.c +++ b/lightningd/plugin.c @@ -1,9 +1,11 @@ #include "lightningd/plugin.h" +#include #include #include #include #include +#include #include #include @@ -13,6 +15,7 @@ struct plugin { char *cmd; struct io_conn *stdin_conn, *stdout_conn; bool stop; + struct plugins *plugins; /* Stuff we read */ char *buffer; @@ -23,8 +26,27 @@ struct plugin { const char *outbuf; }; +struct plugin_request { + u64 id; + /* Method to be called */ + const char *method; + + /* JSON encoded params, either a dict or an array */ + const char *json_params; + const char *response; + const jsmntok_t *resulttok, *errortok; + + /* The response handler to be called on success or error */ + void (*cb)(const struct plugin_request *, void *); + void *arg; +}; + struct plugins { struct plugin *plugins; + int pending_init; + + /* Currently pending requests by their request ID */ + UINTMAP(struct plugin_request *) pending_requests; }; struct json_output { @@ -45,6 +67,7 @@ void plugin_register(struct plugins *plugins, const char* path TAKES) size_t n = tal_count(plugins->plugins); tal_resize(&plugins->plugins, n+1); p = &plugins->plugins[n]; + p->plugins = plugins; p->cmd = tal_strdup(p, path); } @@ -58,6 +81,11 @@ static bool plugin_read_json_one(struct plugin *plugin) { jsmntok_t *toks; bool valid; + u64 id; + const jsmntok_t *idtok, *resulttok, *errortok; + struct plugin_request *request; + + toks = json_parse_input(plugin->buffer, plugin->used, &valid); if (!toks) { if (!valid) { @@ -74,7 +102,35 @@ static bool plugin_read_json_one(struct plugin *plugin) return false; } - /* FIXME(cdecker) Call dispatch to handle this message. */ + resulttok = json_get_member(plugin->buffer, toks, "result"); + errortok = json_get_member(plugin->buffer, toks, "error"); + idtok = json_get_member(plugin->buffer, toks, "id"); + + /* FIXME(cdecker) Kill the plugin if either of these fails */ + if (!idtok) { + return false; + } else if (!resulttok && !errortok) { + return false; + } + + /* We only send u64 ids, so if this fails it's a critical error */ + if (!json_to_u64(plugin->buffer, idtok, &id)) { + /* FIXME (cdecker) Log an error message and kill the plugin */ + return false; + } + + request = uintmap_get(&plugin->plugins->pending_requests, id); + + if (!request) { + /* FIXME(cdecker) Log an error and kill the plugin */ + return false; + } + + /* We expect the request->cb to copy if needed */ + request->response = plugin->buffer; + request->errortok = errortok; + request->resulttok = resulttok; + request->cb(request, request->arg); /* Move this object out of the buffer */ memmove(plugin->buffer, plugin->buffer + toks[0].end, @@ -125,6 +181,44 @@ static struct io_plan *plugin_write_json(struct io_conn *conn UNUSED, plugin_write_json, plugin); } +static void plugin_request_send_( + struct plugin *plugin, const char *method TAKES, const char *params TAKES, + void (*cb)(const struct plugin_request *, void *), void *arg) +{ + static u64 next_request_id = 0; + struct plugin_request *req = tal(plugin, struct plugin_request); + struct json_output *out = tal(plugin, struct json_output); + u64 request_id = next_request_id++; + + req->id = request_id; + req->method = tal_strdup(req, method); + req->json_params = tal_strdup(req, params); + req->cb = cb; + req->arg = arg; + + /* Add to map so we can find it later when routing the response */ + uintmap_add(&plugin->plugins->pending_requests, req->id, req); + + /* Wrap the request in the JSON-RPC request object */ + out->json = tal_fmt(out, "{" + "\"jsonrpc\": \"2.0\", " + "\"method\": \"%s\", " + "\"params\" : %s, " + "\"id\" : %" PRIu64 " }\n", + method, params, request_id); + + /* Queue and notify the writer */ + list_add_tail(&plugin->output, &out->list); + io_wake(plugin); +} + +#define plugin_request_send(plugin, method, params, cb, arg) \ + plugin_request_send_( \ + (plugin), (method), (params), \ + typesafe_cb_preargs(void, void *, (cb), (arg), \ + const struct plugin_request *), \ + (arg)) + static struct io_plan *plugin_conn_init(struct io_conn *conn, struct plugin *plugin) { @@ -145,10 +239,23 @@ static struct io_plan *plugin_conn_init(struct io_conn *conn, } } +/** + * Callback for the plugin_init request. + */ +static void plugin_init_cb(const struct plugin_request *req, struct plugin *plugin) +{ + /* Check if all plugins are initialized, and break if they are */ + plugin->plugins->pending_init--; + if (plugin->plugins->pending_init == 0) + io_break(plugin->plugins); +} + void plugins_init(struct plugins *plugins) { struct plugin *p; char **cmd; + plugins->pending_init = tal_count(plugins->plugins); + /* Spawn the plugin processes before entering the io_loop */ for (size_t i=0; iplugins); i++) { p = &plugins->plugins[i]; @@ -165,7 +272,9 @@ void plugins_init(struct plugins *plugins) * write-only on p->stdout */ io_new_conn(p, p->stdout, plugin_conn_init, p); io_new_conn(p, p->stdin, plugin_conn_init, p); + plugin_request_send(p, "init", "[]", plugin_init_cb, p); } + io_loop(NULL, NULL); } void json_add_opt_plugins(struct json_stream *response,