From c765a223f125f44851aa965f88ee37afeee799cf Mon Sep 17 00:00:00 2001 From: darosior Date: Sun, 26 Jan 2020 23:54:33 +0100 Subject: [PATCH] libplugin: use ccan/io for async Rpc requests --- plugins/libplugin.c | 201 +++++++++++++++++++++++++++++++++++--------- plugins/libplugin.h | 14 ++- 2 files changed, 176 insertions(+), 39 deletions(-) diff --git a/plugins/libplugin.c b/plugins/libplugin.c index 03726f5b7..f00ab7603 100644 --- a/plugins/libplugin.c +++ b/plugins/libplugin.c @@ -1,6 +1,5 @@ #include #include -#include #include #include #include @@ -20,10 +19,6 @@ #define READ_CHUNKSIZE 4096 -/* Tracking requests */ -static UINTMAP(struct out_req *) out_reqs; -static u64 next_outreq_id; - const struct chainparams *chainparams; struct plugin_timer { @@ -70,6 +65,13 @@ static void ld_send(struct plugin *plugin, struct json_stream *stream) io_wake(plugin); } +static void ld_rpc_send(struct plugin *plugin, struct json_stream *stream) +{ + tal_steal(plugin->rpc_js_arr, stream); + tal_arr_expand(&plugin->rpc_js_arr, stream); + io_wake(plugin->io_rpc_conn); +} + /* FIXME: Move lightningd/jsonrpc to common/ ? */ static void jsonrpc_finish_and_send(struct plugin *p, struct json_stream *js) @@ -381,42 +383,46 @@ const char *rpc_delve(const tal_t *ctx, return ret; } -static void handle_rpc_reply(struct plugin *plugin) +static void handle_rpc_reply(struct plugin *plugin, const jsmntok_t *toks) { - int reqlen; - const jsmntok_t *toks, *contents, *t; + const jsmntok_t *idtok, *contenttok; struct out_req *out; struct command_result *res; u64 id; - bool error; - toks = read_rpc_reply(tmpctx, plugin, &contents, &error, &reqlen); - - t = json_get_member(membuf_elems(&plugin->rpc_conn.mb), toks, "id"); - if (!t) + idtok = json_get_member(plugin->rpc_buffer, toks, "id"); + if (!idtok) plugin_err(plugin, "JSON reply without id '%.*s'", - reqlen, membuf_elems(&plugin->rpc_conn.mb)); - if (!json_to_u64(membuf_elems(&plugin->rpc_conn.mb), t, &id)) + json_tok_full_len(toks), + json_tok_full(plugin->rpc_buffer, toks)); + if (!json_to_u64(plugin->rpc_buffer, idtok, &id)) plugin_err(plugin, "JSON reply without numeric id '%.*s'", - reqlen, membuf_elems(&plugin->rpc_conn.mb)); - out = uintmap_get(&out_reqs, id); + json_tok_full_len(toks), + json_tok_full(plugin->rpc_buffer, toks)); + out = uintmap_get(&plugin->out_reqs, id); if (!out) plugin_err(plugin, "JSON reply with unknown id '%.*s' (%"PRIu64")", - reqlen, membuf_elems(&plugin->rpc_conn.mb), id); + json_tok_full_len(toks), + json_tok_full(plugin->rpc_buffer, toks), id); /* We want to free this if callback doesn't. */ tal_steal(tmpctx, out); - uintmap_del(&out_reqs, out->id); + uintmap_del(&plugin->out_reqs, out->id); - if (error) - res = out->errcb(out->cmd, membuf_elems(&plugin->rpc_conn.mb), - contents, out->arg); - else - res = out->cb(out->cmd, membuf_elems(&plugin->rpc_conn.mb), - contents, out->arg); + contenttok = json_get_member(plugin->rpc_buffer, toks, "error"); + if (contenttok) + res = out->errcb(out->cmd, plugin->rpc_buffer, + contenttok, out->arg); + else { + contenttok = json_get_member(plugin->rpc_buffer, toks, "result"); + if (!contenttok) + plugin_err(plugin, "Bad JSONRPC, no 'error' nor 'result': '%.*s'", + json_tok_full_len(toks), + json_tok_full(plugin->rpc_buffer, toks)); + res = out->cb(out->cmd, plugin->rpc_buffer, contenttok, out->arg); + } assert(res == &pending || res == &complete); - membuf_consume(&plugin->rpc_conn.mb, reqlen); } struct command_result * @@ -433,19 +439,29 @@ send_outreq_(struct command *cmd, void *arg, const struct json_out *params TAKES) { - struct json_out *jout; + struct json_stream *js; struct out_req *out; out = tal(cmd, struct out_req); - out->id = next_outreq_id++; + out->id = cmd->plugin->next_outreq_id++; out->cmd = cmd; out->cb = cb; out->errcb = errcb; out->arg = arg; - uintmap_add(&out_reqs, out->id, out); + uintmap_add(&cmd->plugin->out_reqs, out->id, out); - jout = start_json_request(tmpctx, out->id, method, params); - finish_and_send_json(cmd->plugin->rpc_conn.fd, jout); + js = new_json_stream(NULL, cmd, NULL); + json_object_start(js, NULL); + json_add_string(js, "jsonrpc", "2.0"); + json_add_u64(js, "id", out->id); + json_add_string(js, "method", method); + json_out_add_splice(js->jout, "params", params); + json_object_compat_end(js); + json_stream_close(js, cmd); + ld_rpc_send(cmd->plugin, js); + + if (taken(params)) + tal_free(params); return &pending; } @@ -499,6 +515,112 @@ handle_getmanifest(struct command *getmanifest_cmd) return command_success(getmanifest_cmd, params); } +static void rpc_conn_finished(struct io_conn *conn, + struct plugin *plugin) +{ + plugin_err(plugin, "Lost connection to the RPC socket."); +} + +static bool rpc_read_response_one(struct plugin *plugin) +{ + bool valid; + const jsmntok_t *toks, *jrtok; + + /* FIXME: This could be done more efficiently by storing the + * toks and doing an incremental parse, like lightning-cli + * does. */ + toks = json_parse_input(NULL, plugin->rpc_buffer, plugin->rpc_used, + &valid); + if (!toks) { + if (!valid) { + plugin_err(plugin, "Failed to parse RPC JSON response '%.*s'", + (int)plugin->rpc_used, plugin->rpc_buffer); + return false; + } + /* We need more. */ + return false; + } + + /* Empty buffer? (eg. just whitespace). */ + if (tal_count(toks) == 1) { + plugin->rpc_used = 0; + return false; + } + + jrtok = json_get_member(plugin->rpc_buffer, toks, "jsonrpc"); + if (!jrtok) { + plugin_err(plugin, "JSON-RPC message does not contain \"jsonrpc\" field"); + return false; + } + + handle_rpc_reply(plugin, toks); + + /* Move this object out of the buffer */ + memmove(plugin->rpc_buffer, plugin->rpc_buffer + toks[0].end, + tal_count(plugin->rpc_buffer) - toks[0].end); + plugin->rpc_used -= toks[0].end; + tal_free(toks); + + return true; +} + +static struct io_plan *rpc_conn_read_response(struct io_conn *conn, + struct plugin *plugin) +{ + plugin->rpc_used += plugin->rpc_len_read; + if (plugin->rpc_used == tal_count(plugin->rpc_buffer)) + tal_resize(&plugin->rpc_buffer, plugin->rpc_used * 2); + + /* Read and process all messages from the connection */ + while (rpc_read_response_one(plugin)) + ; + + /* Read more, if there is. */ + return io_read_partial(plugin->io_rpc_conn, + plugin->rpc_buffer + plugin->rpc_used, + tal_bytelen(plugin->rpc_buffer) - plugin->rpc_used, + &plugin->rpc_len_read, + rpc_conn_read_response, plugin); +} + +static struct io_plan *rpc_conn_write_request(struct io_conn *conn, + struct plugin *plugin); + +static struct io_plan * +rpc_stream_complete(struct io_conn *conn, struct json_stream *js, + struct plugin *plugin) +{ + assert(tal_count(plugin->rpc_js_arr) > 0); + /* Remove js and shift all remaining over */ + tal_arr_remove(&plugin->rpc_js_arr, 0); + + /* It got dropped off the queue, free it. */ + tal_free(js); + + return rpc_conn_write_request(conn, plugin); +} + +static struct io_plan *rpc_conn_write_request(struct io_conn *conn, + struct plugin *plugin) +{ + if (tal_count(plugin->rpc_js_arr) > 0) + return json_stream_output(plugin->rpc_js_arr[0], conn, + rpc_stream_complete, plugin); + + return io_out_wait(conn, plugin->io_rpc_conn, + rpc_conn_write_request, plugin); +} + +static struct io_plan *rpc_conn_init(struct io_conn *conn, + struct plugin *plugin) +{ + plugin->io_rpc_conn = conn; + io_set_finish(conn, rpc_conn_finished, plugin); + return io_duplex(conn, + rpc_conn_read_response(conn, plugin), + rpc_conn_write_request(conn, plugin)); +} + static struct command_result *handle_init(struct command *cmd, const char *buf, const jsmntok_t *params) @@ -562,6 +684,8 @@ static struct command_result *handle_init(struct command *cmd, if (p->init) p->init(p, buf, configtok); + io_new_conn(p, p->rpc_conn.fd, rpc_conn_init, p); + return command_success_str(cmd, NULL); } @@ -895,7 +1019,14 @@ static struct plugin *new_plugin(const tal_t *ctx, p->js_arr = tal_arr(p, struct json_stream *, 0); p->used = 0; p->len_read = 0; - /* rpc. TODO: use ccan/io also for RPC */ + /* Async RPC */ + p->rpc_buffer = tal_arr(p, char, 64); + p->rpc_js_arr = tal_arr(p, struct json_stream *, 0); + p->rpc_used = 0; + p->rpc_len_read = 0; + p->next_outreq_id = 0; + uintmap_init(&p->out_reqs); + /* Sync RPC FIXME: maybe go full async ? */ membuf_init(&p->rpc_conn.mb, tal_arr(p, char, READ_CHUNKSIZE), READ_CHUNKSIZE, membuf_tal_realloc); @@ -954,7 +1085,6 @@ void plugin_main(char *argv[], notif_subs, num_notif_subs, hook_subs, num_hook_subs, ap); va_end(ap); - uintmap_init(&out_reqs); setup_command_usage(plugin); timers_init(&plugin->timers, time_mono()); @@ -967,11 +1097,6 @@ void plugin_main(char *argv[], clean_tmpctx(); - if (membuf_num_elems(&plugin->rpc_conn.mb) != 0) { - handle_rpc_reply(plugin); - continue; - } - /* Will only exit if a timer has expired. */ io_loop(&plugin->timers, &expired); call_plugin_timer(plugin, expired); diff --git a/plugins/libplugin.h b/plugins/libplugin.h index ed9eb4905..c8e74188f 100644 --- a/plugins/libplugin.h +++ b/plugins/libplugin.h @@ -3,7 +3,8 @@ #define LIGHTNING_PLUGINS_LIBPLUGIN_H #include "config.h" -#include +#include +#include #include #include #include @@ -39,8 +40,19 @@ struct plugin { /* To write to lightningd */ struct json_stream **js_arr; + /* Asynchronous RPC interaction */ + struct io_conn *io_rpc_conn; + struct json_stream **rpc_js_arr; + char *rpc_buffer; + size_t rpc_used, rpc_len_read; + /* Tracking async RPC requests */ + UINTMAP(struct out_req *) out_reqs; + u64 next_outreq_id; + + /* Synchronous RPC interaction */ struct rpc_conn rpc_conn; + /* Plugin informations */ enum plugin_restartability restartability; const struct plugin_command *commands; size_t num_commands;