Browse Source

plugin: Add request muxing to the plugin subsystem

plugin-1
Christian Decker 6 years ago
parent
commit
07934e77bd
No known key found for this signature in database GPG Key ID: 1416D83DC4F0E86D
  1. 111
      lightningd/plugin.c

111
lightningd/plugin.c

@ -1,9 +1,11 @@
#include "lightningd/plugin.h" #include "lightningd/plugin.h"
#include <ccan/intmap/intmap.h>
#include <ccan/io/io.h> #include <ccan/io/io.h>
#include <ccan/list/list.h> #include <ccan/list/list.h>
#include <ccan/pipecmd/pipecmd.h> #include <ccan/pipecmd/pipecmd.h>
#include <ccan/tal/str/str.h> #include <ccan/tal/str/str.h>
#include <ccan/typesafe_cb/typesafe_cb.h>
#include <lightningd/json.h> #include <lightningd/json.h>
#include <unistd.h> #include <unistd.h>
@ -13,6 +15,7 @@ struct plugin {
char *cmd; char *cmd;
struct io_conn *stdin_conn, *stdout_conn; struct io_conn *stdin_conn, *stdout_conn;
bool stop; bool stop;
struct plugins *plugins;
/* Stuff we read */ /* Stuff we read */
char *buffer; char *buffer;
@ -23,8 +26,27 @@ struct plugin {
const char *outbuf; const char *outbuf;
}; };
struct plugin_request {
u64 id;
/* Method to be called */
const char *method;
/* JSON encoded params, either a dict or an array */
const char *json_params;
const char *response;
const jsmntok_t *resulttok, *errortok;
/* The response handler to be called on success or error */
void (*cb)(const struct plugin_request *, void *);
void *arg;
};
struct plugins { struct plugins {
struct plugin *plugins; struct plugin *plugins;
int pending_init;
/* Currently pending requests by their request ID */
UINTMAP(struct plugin_request *) pending_requests;
}; };
struct json_output { struct json_output {
@ -45,6 +67,7 @@ void plugin_register(struct plugins *plugins, const char* path TAKES)
size_t n = tal_count(plugins->plugins); size_t n = tal_count(plugins->plugins);
tal_resize(&plugins->plugins, n+1); tal_resize(&plugins->plugins, n+1);
p = &plugins->plugins[n]; p = &plugins->plugins[n];
p->plugins = plugins;
p->cmd = tal_strdup(p, path); p->cmd = tal_strdup(p, path);
} }
@ -58,6 +81,11 @@ static bool plugin_read_json_one(struct plugin *plugin)
{ {
jsmntok_t *toks; jsmntok_t *toks;
bool valid; bool valid;
u64 id;
const jsmntok_t *idtok, *resulttok, *errortok;
struct plugin_request *request;
toks = json_parse_input(plugin->buffer, plugin->used, &valid); toks = json_parse_input(plugin->buffer, plugin->used, &valid);
if (!toks) { if (!toks) {
if (!valid) { if (!valid) {
@ -74,7 +102,35 @@ static bool plugin_read_json_one(struct plugin *plugin)
return false; return false;
} }
/* FIXME(cdecker) Call dispatch to handle this message. */ resulttok = json_get_member(plugin->buffer, toks, "result");
errortok = json_get_member(plugin->buffer, toks, "error");
idtok = json_get_member(plugin->buffer, toks, "id");
/* FIXME(cdecker) Kill the plugin if either of these fails */
if (!idtok) {
return false;
} else if (!resulttok && !errortok) {
return false;
}
/* We only send u64 ids, so if this fails it's a critical error */
if (!json_to_u64(plugin->buffer, idtok, &id)) {
/* FIXME (cdecker) Log an error message and kill the plugin */
return false;
}
request = uintmap_get(&plugin->plugins->pending_requests, id);
if (!request) {
/* FIXME(cdecker) Log an error and kill the plugin */
return false;
}
/* We expect the request->cb to copy if needed */
request->response = plugin->buffer;
request->errortok = errortok;
request->resulttok = resulttok;
request->cb(request, request->arg);
/* Move this object out of the buffer */ /* Move this object out of the buffer */
memmove(plugin->buffer, plugin->buffer + toks[0].end, memmove(plugin->buffer, plugin->buffer + toks[0].end,
@ -125,6 +181,44 @@ static struct io_plan *plugin_write_json(struct io_conn *conn UNUSED,
plugin_write_json, plugin); plugin_write_json, plugin);
} }
static void plugin_request_send_(
struct plugin *plugin, const char *method TAKES, const char *params TAKES,
void (*cb)(const struct plugin_request *, void *), void *arg)
{
static u64 next_request_id = 0;
struct plugin_request *req = tal(plugin, struct plugin_request);
struct json_output *out = tal(plugin, struct json_output);
u64 request_id = next_request_id++;
req->id = request_id;
req->method = tal_strdup(req, method);
req->json_params = tal_strdup(req, params);
req->cb = cb;
req->arg = arg;
/* Add to map so we can find it later when routing the response */
uintmap_add(&plugin->plugins->pending_requests, req->id, req);
/* Wrap the request in the JSON-RPC request object */
out->json = tal_fmt(out, "{"
"\"jsonrpc\": \"2.0\", "
"\"method\": \"%s\", "
"\"params\" : %s, "
"\"id\" : %" PRIu64 " }\n",
method, params, request_id);
/* Queue and notify the writer */
list_add_tail(&plugin->output, &out->list);
io_wake(plugin);
}
#define plugin_request_send(plugin, method, params, cb, arg) \
plugin_request_send_( \
(plugin), (method), (params), \
typesafe_cb_preargs(void, void *, (cb), (arg), \
const struct plugin_request *), \
(arg))
static struct io_plan *plugin_conn_init(struct io_conn *conn, static struct io_plan *plugin_conn_init(struct io_conn *conn,
struct plugin *plugin) struct plugin *plugin)
{ {
@ -145,10 +239,23 @@ static struct io_plan *plugin_conn_init(struct io_conn *conn,
} }
} }
/**
* Callback for the plugin_init request.
*/
static void plugin_init_cb(const struct plugin_request *req, struct plugin *plugin)
{
/* Check if all plugins are initialized, and break if they are */
plugin->plugins->pending_init--;
if (plugin->plugins->pending_init == 0)
io_break(plugin->plugins);
}
void plugins_init(struct plugins *plugins) void plugins_init(struct plugins *plugins)
{ {
struct plugin *p; struct plugin *p;
char **cmd; char **cmd;
plugins->pending_init = tal_count(plugins->plugins);
/* Spawn the plugin processes before entering the io_loop */ /* Spawn the plugin processes before entering the io_loop */
for (size_t i=0; i<tal_count(plugins->plugins); i++) { for (size_t i=0; i<tal_count(plugins->plugins); i++) {
p = &plugins->plugins[i]; p = &plugins->plugins[i];
@ -165,7 +272,9 @@ void plugins_init(struct plugins *plugins)
* write-only on p->stdout */ * write-only on p->stdout */
io_new_conn(p, p->stdout, plugin_conn_init, p); io_new_conn(p, p->stdout, plugin_conn_init, p);
io_new_conn(p, p->stdin, plugin_conn_init, p); io_new_conn(p, p->stdin, plugin_conn_init, p);
plugin_request_send(p, "init", "[]", plugin_init_cb, p);
} }
io_loop(NULL, NULL);
} }
void json_add_opt_plugins(struct json_stream *response, void json_add_opt_plugins(struct json_stream *response,

Loading…
Cancel
Save