You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
431 lines
12 KiB
431 lines
12 KiB
#include "lightningd/plugin.h"
|
|
|
|
#include <ccan/intmap/intmap.h>
|
|
#include <ccan/io/io.h>
|
|
#include <ccan/list/list.h>
|
|
#include <ccan/opt/opt.h>
|
|
#include <ccan/pipecmd/pipecmd.h>
|
|
#include <ccan/tal/str/str.h>
|
|
#include <lightningd/json.h>
|
|
#include <unistd.h>
|
|
|
|
struct plugin {
|
|
int stdin, stdout;
|
|
pid_t pid;
|
|
char *cmd;
|
|
struct io_conn *stdin_conn, *stdout_conn;
|
|
bool stop;
|
|
struct plugins *plugins;
|
|
|
|
/* Stuff we read */
|
|
char *buffer;
|
|
size_t used, len_read;
|
|
|
|
/* Stuff we write */
|
|
struct list_head output;
|
|
const char *outbuf;
|
|
|
|
struct log *log;
|
|
|
|
/* List of options that this plugin registered */
|
|
struct list_head plugin_opts;
|
|
};
|
|
|
|
struct plugin_request {
|
|
u64 id;
|
|
struct plugin *plugin;
|
|
|
|
/* Method to be called */
|
|
const char *method;
|
|
|
|
/* JSON encoded params, either a dict or an array */
|
|
const char *json_params;
|
|
const char *response;
|
|
const jsmntok_t *resulttok, *errortok, *toks;
|
|
|
|
/* The response handler to be called on success or error */
|
|
void (*cb)(const struct plugin_request *, void *);
|
|
void *arg;
|
|
};
|
|
|
|
struct plugins {
|
|
struct plugin **plugins;
|
|
size_t pending_init;
|
|
|
|
/* Currently pending requests by their request ID */
|
|
UINTMAP(struct plugin_request *) pending_requests;
|
|
struct log *log;
|
|
};
|
|
|
|
struct json_output {
|
|
struct list_node list;
|
|
const char *json;
|
|
};
|
|
|
|
/* Simple storage for plugin options inbetween registering them on the
|
|
* command line and passing them off to the plugin */
|
|
struct plugin_opt {
|
|
struct list_node list;
|
|
const char *name;
|
|
const char *description;
|
|
char *value;
|
|
};
|
|
|
|
struct plugins *plugins_new(const tal_t *ctx, struct log *log){
|
|
struct plugins *p;
|
|
p = tal(ctx, struct plugins);
|
|
p->plugins = tal_arr(p, struct plugin *, 0);
|
|
p->log = log;
|
|
return p;
|
|
}
|
|
|
|
void plugin_register(struct plugins *plugins, const char* path TAKES)
|
|
{
|
|
struct plugin *p;
|
|
size_t n = tal_count(plugins->plugins);
|
|
tal_resize(&plugins->plugins, n+1);
|
|
p = tal(plugins, struct plugin);
|
|
plugins->plugins[n] = p;
|
|
p->plugins = plugins;
|
|
p->cmd = tal_strdup(p, path);
|
|
p->log = plugins->log;
|
|
list_head_init(&p->plugin_opts);
|
|
}
|
|
|
|
/**
|
|
* Kill a plugin process, with an error message.
|
|
*/
|
|
static void plugin_kill(struct plugin *plugin, char *msg)
|
|
{
|
|
log_broken(plugin->log, "Killing plugin: %s", msg);
|
|
plugin->stop = true;
|
|
io_wake(plugin);
|
|
kill(plugin->pid, SIGKILL);
|
|
}
|
|
|
|
/**
|
|
* Try to parse a complete message from the plugin's buffer.
|
|
*
|
|
* Internally calls the handler if it was able to fully parse a JSON message,
|
|
* and returns true in that case.
|
|
*/
|
|
static bool plugin_read_json_one(struct plugin *plugin)
|
|
{
|
|
jsmntok_t *toks;
|
|
bool valid;
|
|
u64 id;
|
|
const jsmntok_t *idtok, *resulttok, *errortok;
|
|
struct plugin_request *request;
|
|
|
|
/* FIXME: This could be done more efficiently by storing the
|
|
* toks and doing an incremental parse, like lightning-cli
|
|
* does. */
|
|
toks = json_parse_input(plugin->buffer, plugin->used, &valid);
|
|
if (!toks) {
|
|
if (!valid) {
|
|
plugin_kill(plugin, "Failed to parse JSON response");
|
|
return false;
|
|
}
|
|
/* We need more. */
|
|
return false;
|
|
}
|
|
|
|
/* Empty buffer? (eg. just whitespace). */
|
|
if (tal_count(toks) == 1) {
|
|
plugin->used = 0;
|
|
return false;
|
|
}
|
|
|
|
resulttok = json_get_member(plugin->buffer, toks, "result");
|
|
errortok = json_get_member(plugin->buffer, toks, "error");
|
|
idtok = json_get_member(plugin->buffer, toks, "id");
|
|
|
|
if (!idtok) {
|
|
plugin_kill(plugin, "JSON-RPC response does not contain an \"id\"-field");
|
|
return false;
|
|
} else if (!resulttok && !errortok) {
|
|
plugin_kill(plugin, "JSON-RPC response does not contain a \"result\" or \"error\" field");
|
|
return false;
|
|
}
|
|
|
|
/* We only send u64 ids, so if this fails it's a critical error */
|
|
if (!json_to_u64(plugin->buffer, idtok, &id)) {
|
|
plugin_kill(plugin, "JSON-RPC response \"id\"-field is not a u64");
|
|
return false;
|
|
}
|
|
|
|
request = uintmap_get(&plugin->plugins->pending_requests, id);
|
|
|
|
if (!request) {
|
|
plugin_kill(plugin, "Received a JSON-RPC response for non-existent request");
|
|
return false;
|
|
}
|
|
|
|
/* We expect the request->cb to copy if needed */
|
|
request->response = plugin->buffer;
|
|
request->errortok = errortok;
|
|
request->resulttok = resulttok;
|
|
request->toks = toks;
|
|
request->cb(request, request->arg);
|
|
|
|
/* Move this object out of the buffer */
|
|
memmove(plugin->buffer, plugin->buffer + toks[0].end,
|
|
tal_count(plugin->buffer) - toks[0].end);
|
|
plugin->used -= toks[0].end;
|
|
tal_free(toks);
|
|
return true;
|
|
}
|
|
|
|
static struct io_plan *plugin_read_json(struct io_conn *conn UNUSED,
|
|
struct plugin *plugin)
|
|
{
|
|
bool success;
|
|
plugin->used += plugin->len_read;
|
|
if (plugin->used == tal_count(plugin->buffer))
|
|
tal_resize(&plugin->buffer, plugin->used * 2);
|
|
|
|
/* Read and process all messages from the connection */
|
|
do {
|
|
success = plugin_read_json_one(plugin);
|
|
} while (success);
|
|
|
|
if (plugin->stop)
|
|
return io_close(plugin->stdout_conn);
|
|
|
|
/* Now read more from the connection */
|
|
return io_read_partial(plugin->stdout_conn,
|
|
plugin->buffer + plugin->used,
|
|
tal_count(plugin->buffer) - plugin->used,
|
|
&plugin->len_read, plugin_read_json, plugin);
|
|
}
|
|
|
|
static struct io_plan *plugin_write_json(struct io_conn *conn UNUSED,
|
|
struct plugin *plugin)
|
|
{
|
|
struct json_output *out;
|
|
out = list_pop(&plugin->output, struct json_output, list);
|
|
if (!out) {
|
|
if (plugin->stop) {
|
|
return io_close(conn);
|
|
} else {
|
|
return io_out_wait(plugin->stdin_conn, plugin,
|
|
plugin_write_json, plugin);
|
|
}
|
|
}
|
|
|
|
/* We have a message we'd like to send */
|
|
plugin->outbuf = tal_steal(plugin, out->json);
|
|
tal_free(out);
|
|
return io_write(conn, plugin->outbuf, strlen(plugin->outbuf),
|
|
plugin_write_json, plugin);
|
|
}
|
|
|
|
static void plugin_request_send_(
|
|
struct plugin *plugin, const char *method TAKES, const char *params TAKES,
|
|
void (*cb)(const struct plugin_request *, void *), void *arg)
|
|
{
|
|
static u64 next_request_id = 0;
|
|
struct plugin_request *req = tal(plugin, struct plugin_request);
|
|
struct json_output *out = tal(plugin, struct json_output);
|
|
u64 request_id = next_request_id++;
|
|
|
|
req->id = request_id;
|
|
req->method = tal_strdup(req, method);
|
|
req->json_params = tal_strdup(req, params);
|
|
req->cb = cb;
|
|
req->arg = arg;
|
|
req->plugin = plugin;
|
|
|
|
/* Add to map so we can find it later when routing the response */
|
|
uintmap_add(&plugin->plugins->pending_requests, req->id, req);
|
|
|
|
/* Wrap the request in the JSON-RPC request object. Terminate
|
|
* with an empty line that serves as a hint that the JSON
|
|
* object is done. */
|
|
out->json = tal_fmt(out, "{"
|
|
"\"jsonrpc\": \"2.0\", "
|
|
"\"method\": \"%s\", "
|
|
"\"params\" : %s, "
|
|
"\"id\" : %" PRIu64 " }\n\n",
|
|
method, params, request_id);
|
|
|
|
/* Queue and notify the writer */
|
|
list_add_tail(&plugin->output, &out->list);
|
|
io_wake(plugin);
|
|
}
|
|
|
|
#define plugin_request_send(plugin, method, params, cb, arg) \
|
|
plugin_request_send_( \
|
|
(plugin), (method), (params), \
|
|
typesafe_cb_preargs(void, void *, (cb), (arg), \
|
|
const struct plugin_request *), \
|
|
(arg))
|
|
|
|
static struct io_plan *plugin_conn_init(struct io_conn *conn,
|
|
struct plugin *plugin)
|
|
{
|
|
plugin->stop = false;
|
|
if (plugin->stdout == io_conn_fd(conn)) {
|
|
/* We read from their stdout */
|
|
plugin->stdout_conn = conn;
|
|
return io_read_partial(plugin->stdout_conn, plugin->buffer,
|
|
tal_bytelen(plugin->buffer),
|
|
&plugin->len_read, plugin_read_json,
|
|
plugin);
|
|
} else {
|
|
/* We write to their stdin */
|
|
plugin->stdin_conn = conn;
|
|
/* We don't have anything queued yet, wait for notification */
|
|
return io_wait(plugin->stdin_conn, plugin, plugin_write_json,
|
|
plugin);
|
|
}
|
|
}
|
|
|
|
/* Callback called when parsing options. It just stores the value in
|
|
* the plugin_opt */
|
|
static char *plugin_opt_set(const char *arg, struct plugin_opt *popt)
|
|
{
|
|
popt->value = tal_strdup(popt, arg);
|
|
return NULL;
|
|
}
|
|
|
|
/* Add a single plugin option to the plugin as well as registering it with the
|
|
* command line options. */
|
|
static bool plugin_opt_add(struct plugin *plugin, const char *buffer,
|
|
const jsmntok_t *opt)
|
|
{
|
|
const jsmntok_t *nametok, *typetok, *defaulttok, *desctok;
|
|
struct plugin_opt *popt;
|
|
nametok = json_get_member(buffer, opt, "name");
|
|
typetok = json_get_member(buffer, opt, "type");
|
|
desctok = json_get_member(buffer, opt, "description");
|
|
defaulttok = json_get_member(buffer, opt, "default");
|
|
|
|
if (!typetok || !nametok || !desctok) {
|
|
plugin_kill(plugin,
|
|
"An option is missing either \"name\", \"description\" or \"type\"");
|
|
return false;
|
|
}
|
|
|
|
/* FIXME(cdecker) Support numeric and boolean options as well */
|
|
if (!json_tok_streq(buffer, typetok, "string")) {
|
|
plugin_kill(plugin,
|
|
"Only \"string\" options currently supported");
|
|
return false;
|
|
}
|
|
|
|
popt = tal(plugin, struct plugin_opt);
|
|
|
|
popt->name = tal_fmt(plugin, "--%.*s", nametok->end - nametok->start,
|
|
buffer + nametok->start);
|
|
popt->value = NULL;
|
|
if (defaulttok) {
|
|
popt->value = tal_strndup(plugin, buffer + defaulttok->start,
|
|
defaulttok->end - defaulttok->start);
|
|
popt->description = tal_fmt(
|
|
plugin, "%.*s (default: %s)", desctok->end - desctok->start,
|
|
buffer + desctok->start, popt->value);
|
|
} else {
|
|
popt->description = tal_strndup(plugin, buffer + desctok->start,
|
|
desctok->end - desctok->start);
|
|
}
|
|
|
|
list_add_tail(&plugin->plugin_opts, &popt->list);
|
|
|
|
opt_register_arg(popt->name, plugin_opt_set, NULL, popt,
|
|
popt->description);
|
|
return true;
|
|
}
|
|
|
|
/* Iterate through the options in the init response, and add them to
|
|
* the plugin and the command line options */
|
|
static bool plugin_opts_add(const struct plugin_request *req)
|
|
{
|
|
const char *buffer = req->plugin->buffer;
|
|
const jsmntok_t *cur, *options;
|
|
|
|
/* This is the parent for all elements in the "options" array */
|
|
int optpos;
|
|
options =
|
|
json_get_member(req->plugin->buffer, req->resulttok, "options");
|
|
if (!options)
|
|
return false;
|
|
|
|
optpos = options - req->toks;
|
|
|
|
if (options->type != JSMN_ARRAY) {
|
|
plugin_kill(req->plugin, "\"result.options\" is not an array");
|
|
return false;
|
|
}
|
|
|
|
for (cur = options + 1; cur->parent == optpos; cur = json_next(cur))
|
|
if (!plugin_opt_add(req->plugin, buffer, cur))
|
|
return false;
|
|
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Callback for the plugin_init request.
|
|
*/
|
|
static void plugin_init_cb(const struct plugin_request *req, struct plugin *plugin)
|
|
{
|
|
/* Check if all plugins are initialized, and break if they are */
|
|
plugin->plugins->pending_init--;
|
|
if (plugin->plugins->pending_init == 0)
|
|
io_break(plugin->plugins);
|
|
|
|
if (req->resulttok->type != JSMN_OBJECT) {
|
|
plugin_kill(plugin, "\"init\" response is not an object");
|
|
return;
|
|
}
|
|
|
|
if (!plugin_opts_add(req))
|
|
return;
|
|
}
|
|
|
|
void plugins_init(struct plugins *plugins)
|
|
{
|
|
struct plugin *p;
|
|
char **cmd;
|
|
plugins->pending_init = tal_count(plugins->plugins);
|
|
uintmap_init(&plugins->pending_requests);
|
|
|
|
/* Spawn the plugin processes before entering the io_loop */
|
|
for (size_t i=0; i<tal_count(plugins->plugins); i++) {
|
|
p = plugins->plugins[i];
|
|
cmd = tal_arr(p, char *, 2);
|
|
cmd[0] = p->cmd;
|
|
cmd[1] = NULL;
|
|
p->pid = pipecmdarr(&p->stdout, &p->stdin, NULL, cmd);
|
|
|
|
list_head_init(&p->output);
|
|
p->buffer = tal_arr(p, char, 64);
|
|
p->used = 0;
|
|
|
|
/* Create two connections, one read-only on top of p->stdin, and one
|
|
* write-only on p->stdout */
|
|
io_new_conn(p, p->stdout, plugin_conn_init, p);
|
|
io_new_conn(p, p->stdin, plugin_conn_init, p);
|
|
plugin_request_send(p, "init", "[]", plugin_init_cb, p);
|
|
}
|
|
if (plugins->pending_init > 0)
|
|
io_loop(NULL, NULL);
|
|
}
|
|
|
|
void plugins_config(struct plugins *plugins)
|
|
{
|
|
}
|
|
|
|
void json_add_opt_plugins(struct json_stream *response,
|
|
const struct plugins *plugins)
|
|
{
|
|
struct plugin *p;
|
|
json_object_start(response, "plugin");
|
|
for (size_t i=0; i<tal_count(plugins->plugins); i++) {
|
|
p = plugins->plugins[i];
|
|
json_object_start(response, p->cmd);
|
|
json_object_end(response);
|
|
}
|
|
json_object_end(response);
|
|
}
|
|
|