Browse Source

libplugin: use ccan/io for lightningd connections

Now we have streams and a global object, we can use them for io_plans.
This makes the logic closer from lightningd/plugin (it has been mostly
stolen from here), and simplifies the main.
This also allows plugins to use io_plans (thanks to the io_loop) for
asynchronous connections.

This commit only handle incoming commands.
travis-debug
darosior 5 years ago
committed by Rusty Russell
parent
commit
42cd23092c
  1. 412
      plugins/libplugin.c

412
plugins/libplugin.c

@ -1,6 +1,7 @@
#include <bitcoin/chainparams.h>
#include <ccan/err/err.h>
#include <ccan/intmap/intmap.h>
#include <ccan/io/io.h>
#include <ccan/json_out/json_out.h>
#include <ccan/membuf/membuf.h>
#include <ccan/read_write_all/read_write_all.h>
@ -8,6 +9,7 @@
#include <ccan/tal/str/str.h>
#include <ccan/timer/timer.h>
#include <common/daemon.h>
#include <common/json_stream.h>
#include <common/utils.h>
#include <errno.h>
#include <poll.h>
@ -38,6 +40,17 @@ bool deprecated_apis;
extern const struct chainparams *chainparams;
struct plugin {
/* lightningd interaction */
struct io_conn *stdin_conn;
struct io_conn *stdout_conn;
/* To read from lightningd */
char *buffer;
size_t used, len_read;
/* To write to lightningd */
struct json_stream **js_arr;
enum plugin_restartability restartability;
const struct plugin_command *commands;
size_t num_commands;
@ -46,6 +59,14 @@ struct plugin {
const struct plugin_hook *hook_subs;
size_t num_hook_subs;
struct plugin_option *opts;
/* Anything special to do at init ? */
void (*init)(struct plugin_conn *,
const char *buf, const jsmntok_t *);
/* Has the manifest been sent already ? */
bool manifested;
/* Has init been received ? */
bool initialized;
};
struct plugin_timer {
@ -65,6 +86,7 @@ struct command {
u64 *id;
const char *methodname;
bool usage_only;
struct plugin *plugin;
};
struct out_req {
@ -148,43 +170,6 @@ static int read_json(struct plugin_conn *conn)
return end + 2 - membuf_elems(&conn->mb);
}
static struct command *read_json_request(const tal_t *ctx,
struct plugin_conn *conn,
struct plugin_conn *rpc,
const jsmntok_t **params,
int *reqlen)
{
const jsmntok_t *toks, *id, *method;
bool valid;
struct command *cmd = tal(ctx, struct command);
*reqlen = read_json(conn);
toks = json_parse_input(cmd, membuf_elems(&conn->mb), *reqlen, &valid);
if (!valid)
plugin_err("Malformed JSON input '%.*s'",
*reqlen, membuf_elems(&conn->mb));
if (toks[0].type != JSMN_OBJECT)
plugin_err("Malformed JSON command '%*.s' is not an object",
*reqlen, membuf_elems(&conn->mb));
method = json_get_member(membuf_elems(&conn->mb), toks, "method");
*params = json_get_member(membuf_elems(&conn->mb), toks, "params");
id = json_get_member(membuf_elems(&conn->mb), toks, "id");
if (id) {
cmd->id = tal(cmd, u64);
if (!json_to_u64(membuf_elems(&conn->mb), id, cmd->id))
plugin_err("JSON id '%*.s' is not a number",
id->end - id->start,
membuf_elems(&conn->mb) + id->start);
} else
cmd->id = NULL;
cmd->usage_only = false;
cmd->methodname = json_strdup(cmd, membuf_elems(&conn->mb), method);
return cmd;
}
/* This starts a JSON RPC message with boilerplate */
static struct json_out *start_json_rpc(const tal_t *ctx, u64 id)
{
@ -494,10 +479,10 @@ send_outreq_(struct command *cmd,
}
static struct command_result *
handle_getmanifest(struct command *getmanifest_cmd,
struct plugin *p)
handle_getmanifest(struct command *getmanifest_cmd)
{
struct json_out *params = json_out_new(tmpctx);
struct plugin *p = getmanifest_cmd->plugin;
json_out_start(params, NULL, '{');
json_out_start(params, "options", '[');
@ -542,18 +527,16 @@ handle_getmanifest(struct command *getmanifest_cmd,
return command_success(getmanifest_cmd, params);
}
static struct command_result *handle_init(struct command *init_cmd,
static struct command_result *handle_init(struct command *cmd,
const char *buf,
const jsmntok_t *params,
const struct plugin_option *opts,
void (*init)(struct plugin_conn *,
const char *buf, const jsmntok_t *))
const jsmntok_t *params)
{
const jsmntok_t *configtok, *rpctok, *dirtok, *opttok, *nettok, *t;
struct sockaddr_un addr;
size_t i;
char *dir, *network;
struct json_out *param_obj;
struct plugin *p = cmd->plugin;
configtok = json_delve(buf, params, ".configuration");
@ -591,24 +574,24 @@ static struct command_result *handle_init(struct command *init_cmd,
opttok = json_get_member(buf, params, "options");
json_for_each_obj(i, t, opttok) {
char *opt = json_strdup(NULL, buf, t);
for (size_t i = 0; i < tal_count(opts); i++) {
for (size_t i = 0; i < tal_count(p->opts); i++) {
char *problem;
if (!streq(opts[i].name, opt))
if (!streq(p->opts[i].name, opt))
continue;
problem = opts[i].handle(json_strdup(opt, buf, t+1),
opts[i].arg);
problem = p->opts[i].handle(json_strdup(opt, buf, t+1),
p->opts[i].arg);
if (problem)
plugin_err("option '%s': %s",
opts[i].name, problem);
p->opts[i].name, problem);
break;
}
tal_free(opt);
}
if (init)
init(&rpc_conn, buf, configtok);
if (p->init)
p->init(&rpc_conn, buf, configtok);
return command_success_str(init_cmd, NULL);
return command_success_str(cmd, NULL);
}
char *u64_option(const char *arg, u64 *i)
@ -631,47 +614,6 @@ char *charp_option(const char *arg, char **p)
return NULL;
}
static void handle_new_command(const tal_t *ctx,
struct plugin_conn *request_conn,
struct plugin_conn *rpc_conn,
struct plugin *p)
{
struct command *cmd;
const jsmntok_t *params;
int reqlen;
cmd = read_json_request(ctx, request_conn, rpc_conn, &params, &reqlen);
/* If this is a notification. */
if (!cmd->id) {
for (size_t i = 0; i < p->num_notif_subs; i++) {
if (streq(cmd->methodname, p->notif_subs[i].name)) {
p->notif_subs[i].handle(cmd, membuf_elems(&request_conn->mb),
params);
membuf_consume(&request_conn->mb, reqlen);
}
}
return;
}
for (size_t i = 0; i < p->num_hook_subs; i++) {
if (streq(cmd->methodname, p->hook_subs[i].name)) {
p->hook_subs[i].handle(cmd, membuf_elems(&request_conn->mb),
params);
membuf_consume(&request_conn->mb, reqlen);
return;
}
}
for (size_t i = 0; i < p->num_commands; i++) {
if (streq(cmd->methodname, p->commands[i].name)) {
p->commands[i].handle(cmd, membuf_elems(&request_conn->mb),
params);
membuf_consume(&request_conn->mb, reqlen);
return;
}
}
plugin_err("Unknown command '%s'", cmd->methodname);
}
static void setup_command_usage(const struct plugin_command *commands,
size_t num_commands)
{
@ -757,7 +699,215 @@ void plugin_log(enum log_level l, const char *fmt, ...)
va_end(ap);
}
static void ld_command_handle(struct plugin *plugin,
struct command *cmd,
const jsmntok_t *toks)
{
const jsmntok_t *idtok, *methtok, *paramstok;
idtok = json_get_member(plugin->buffer, toks, "id");
methtok = json_get_member(plugin->buffer, toks, "method");
paramstok = json_get_member(plugin->buffer, toks, "params");
if (!methtok || !paramstok)
plugin_err("Malformed JSON-RPC notification missing "
"\"method\" or \"params\": %.*s",
json_tok_full_len(toks),
json_tok_full(plugin->buffer, toks));
cmd->plugin = plugin;
cmd->id = NULL;
cmd->usage_only = false;
cmd->methodname = json_strdup(cmd, plugin->buffer, methtok);
if (idtok) {
cmd->id = tal(cmd, u64);
if (!json_to_u64(plugin->buffer, idtok, cmd->id))
plugin_err("JSON id '%*.s' is not a number",
json_tok_full_len(idtok),
json_tok_full(plugin->buffer, idtok));
}
if (!plugin->manifested) {
if (streq(cmd->methodname, "getmanifest")) {
handle_getmanifest(cmd);
plugin->manifested = true;
return;
}
plugin_err("Did not receive 'getmanifest' yet, but got '%s'"
" instead", cmd->methodname);
}
if (!plugin->initialized) {
if (streq(cmd->methodname, "init")) {
handle_init(cmd, plugin->buffer, paramstok);
plugin->initialized = true;
return;
}
plugin_err("Did not receive 'init' yet, but got '%s'"
" instead", cmd->methodname);
}
/* If that's a notification. */
if (!cmd->id) {
for (size_t i = 0; i < plugin->num_notif_subs; i++) {
if (streq(cmd->methodname,
plugin->notif_subs[i].name)) {
plugin->notif_subs[i].handle(cmd,
plugin->buffer,
paramstok);
return;
}
}
plugin_err("Unregistered notification %.*s",
json_tok_full_len(methtok),
json_tok_full(plugin->buffer, methtok));
}
for (size_t i = 0; i < plugin->num_hook_subs; i++) {
if (streq(cmd->methodname, plugin->hook_subs[i].name)) {
plugin->hook_subs[i].handle(cmd,
plugin->buffer,
paramstok);
return;
}
}
for (size_t i = 0; i < plugin->num_commands; i++) {
if (streq(cmd->methodname, plugin->commands[i].name)) {
plugin->commands[i].handle(cmd,
plugin->buffer,
paramstok);
return;
}
}
plugin_err("Unknown command '%s'", cmd->methodname);
}
/**
* Try to parse a complete message from lightningd's buffer, and return true
* if we could handle it.
*/
static bool ld_read_json_one(struct plugin *plugin)
{
bool valid;
const jsmntok_t *toks, *jrtok;
struct command *cmd = tal(plugin, struct command);
/* FIXME: This could be done more efficiently by storing the
* toks and doing an incremental parse, like lightning-cli
* does. */
toks = json_parse_input(NULL, plugin->buffer, plugin->used,
&valid);
if (!toks) {
if (!valid) {
plugin_err("Failed to parse JSON response '%.*s'",
(int)plugin->used, plugin->buffer);
return false;
}
/* We need more. */
return false;
}
/* Empty buffer? (eg. just whitespace). */
if (tal_count(toks) == 1) {
plugin->used = 0;
return false;
}
jrtok = json_get_member(plugin->buffer, toks, "jsonrpc");
if (!jrtok) {
plugin_err("JSON-RPC message does not contain \"jsonrpc\" field");
return false;
}
ld_command_handle(plugin, cmd, toks);
/* Move this object out of the buffer */
memmove(plugin->buffer, plugin->buffer + toks[0].end,
tal_count(plugin->buffer) - toks[0].end);
plugin->used -= toks[0].end;
tal_free(toks);
return true;
}
static struct io_plan *ld_read_json(struct io_conn *conn,
struct plugin *plugin)
{
plugin->used += plugin->len_read;
if (plugin->used && plugin->used == tal_count(plugin->buffer))
tal_resize(&plugin->buffer, plugin->used * 2);
/* Read and process all messages from the connection */
while (ld_read_json_one(plugin))
;
/* Now read more from the connection */
return io_read_partial(plugin->stdin_conn,
plugin->buffer + plugin->used,
tal_count(plugin->buffer) - plugin->used,
&plugin->len_read, ld_read_json, plugin);
}
static struct io_plan *ld_write_json(struct io_conn *conn,
struct plugin *plugin);
static struct io_plan *
ld_stream_complete(struct io_conn *conn, struct json_stream *js,
struct plugin *plugin)
{
assert(tal_count(plugin->js_arr) > 0);
/* Remove js and shift all remainig over */
tal_arr_remove(&plugin->js_arr, 0);
/* It got dropped off the queue, free it. */
tal_free(js);
return ld_write_json(conn, plugin);
}
static struct io_plan *ld_write_json(struct io_conn *conn,
struct plugin *plugin)
{
if (tal_count(plugin->js_arr) > 0)
return json_stream_output(plugin->js_arr[0], plugin->stdout_conn,
ld_stream_complete, plugin);
return io_out_wait(conn, plugin, ld_write_json, plugin);
}
static void ld_conn_finish(struct io_conn *conn, struct plugin *plugin)
{
/* Without one of the conns there is no reason to stay alive. That
* certainly means lightningd died, since there is no cleaner way
* to stop, return 0. */
exit(0);
}
/* lightningd writes on our stdin */
static struct io_plan *stdin_conn_init(struct io_conn *conn,
struct plugin *plugin)
{
plugin->stdin_conn = conn;
io_set_finish(conn, ld_conn_finish, plugin);
return io_read_partial(plugin->stdin_conn, plugin->buffer,
tal_bytelen(plugin->buffer), &plugin->len_read,
ld_read_json, plugin);
}
/* lightningd reads from our stdout */
static struct io_plan *stdout_conn_init(struct io_conn *conn,
struct plugin *plugin)
{
plugin->stdout_conn = conn;
io_set_finish(conn, ld_conn_finish, plugin);
return io_wait(plugin->stdout_conn, plugin, ld_write_json, plugin);
}
static struct plugin *new_plugin(const tal_t *ctx,
void (*init)(struct plugin_conn *rpc,
const char *buf, const jsmntok_t *),
const enum plugin_restartability restartability,
const struct plugin_command *commands,
size_t num_commands,
@ -770,7 +920,15 @@ static struct plugin *new_plugin(const tal_t *ctx,
const char *optname;
struct plugin *p = tal(ctx, struct plugin);
p->buffer = tal_arr(p, char, 64);
p->js_arr = tal_arr(p, struct json_stream *, 0);
p->used = 0;
p->len_read = 0;
p->init = init;
p->manifested = p->initialized = false;
p->restartability = restartability;
p->commands = commands;
p->num_commands = num_commands;
p->notif_subs = notif_subs;
@ -805,11 +963,6 @@ void plugin_main(char *argv[],
...)
{
struct plugin *plugin;
struct plugin_conn request_conn;
struct command *cmd;
const jsmntok_t *params;
int reqlen;
struct pollfd fds[2];
va_list ap;
setup_locale();
@ -822,7 +975,7 @@ void plugin_main(char *argv[],
setup_command_usage(commands, num_commands);
va_start(ap, num_hook_subs);
plugin = new_plugin(NULL, restartability, commands, num_commands,
plugin = new_plugin(NULL, init, restartability, commands, num_commands,
notif_subs, num_notif_subs, hook_subs,
num_hook_subs, ap);
va_end(ap);
@ -831,73 +984,24 @@ void plugin_main(char *argv[],
membuf_init(&rpc_conn.mb,
tal_arr(plugin, char, READ_CHUNKSIZE), READ_CHUNKSIZE,
membuf_tal_realloc);
request_conn.fd = STDIN_FILENO;
membuf_init(&request_conn.mb,
tal_arr(plugin, char, READ_CHUNKSIZE), READ_CHUNKSIZE,
membuf_tal_realloc);
uintmap_init(&out_reqs);
cmd = read_json_request(tmpctx, &request_conn, NULL,
&params, &reqlen);
if (!streq(cmd->methodname, "getmanifest"))
plugin_err("Expected getmanifest not %s", cmd->methodname);
membuf_consume(&request_conn.mb, reqlen);
handle_getmanifest(cmd, plugin);
cmd = read_json_request(tmpctx, &request_conn, &rpc_conn,
&params, &reqlen);
if (!streq(cmd->methodname, "init"))
plugin_err("Expected init not %s", cmd->methodname);
handle_init(cmd, membuf_elems(&request_conn.mb),
params, plugin->opts, init);
membuf_consume(&request_conn.mb, reqlen);
/* Set up fds for poll. */
fds[0].fd = STDIN_FILENO;
fds[0].events = POLLIN;
fds[1].fd = rpc_conn.fd;
fds[1].events = POLLIN;
io_new_conn(plugin, STDIN_FILENO, stdin_conn_init, plugin);
io_new_conn(plugin, STDOUT_FILENO, stdout_conn_init, plugin);
for (;;) {
struct timer *expired;
struct timemono now, first;
int t;
struct timer *expired = NULL;
clean_tmpctx();
/* If we already have some input, process now. */
if (membuf_num_elems(&request_conn.mb) != 0) {
handle_new_command(plugin, &request_conn, &rpc_conn, plugin);
continue;
}
if (membuf_num_elems(&rpc_conn.mb) != 0) {
handle_rpc_reply(&rpc_conn);
continue;
}
/* Handle any timeouts */
now = time_mono();
expired = timers_expire(&timers, now);
if (expired) {
call_plugin_timer(&rpc_conn, expired);
continue;
}
/* If we have a pending timer, timeout then */
if (timer_earliest(&timers, &first))
t = time_to_msec(timemono_between(first, now));
else
t = -1;
/* Otherwise, we poll. */
poll(fds, 2, t);
if (fds[0].revents & POLLIN)
handle_new_command(plugin, &request_conn, &rpc_conn, plugin);
if (fds[1].revents & POLLIN)
handle_rpc_reply(&rpc_conn);
/* Will only exit if a timer has expired. */
io_loop(&timers, &expired);
call_plugin_timer(&rpc_conn, expired);
}
tal_free(plugin);

Loading…
Cancel
Save