Browse Source

libplugin: use ccan/io for async Rpc requests

travis-debug
darosior 5 years ago
committed by Rusty Russell
parent
commit
c765a223f1
  1. 201
      plugins/libplugin.c
  2. 14
      plugins/libplugin.h

201
plugins/libplugin.c

@ -1,6 +1,5 @@
#include <bitcoin/chainparams.h>
#include <ccan/err/err.h>
#include <ccan/intmap/intmap.h>
#include <ccan/io/io.h>
#include <ccan/json_out/json_out.h>
#include <ccan/read_write_all/read_write_all.h>
@ -20,10 +19,6 @@
#define READ_CHUNKSIZE 4096
/* Tracking requests */
static UINTMAP(struct out_req *) out_reqs;
static u64 next_outreq_id;
const struct chainparams *chainparams;
struct plugin_timer {
@ -70,6 +65,13 @@ static void ld_send(struct plugin *plugin, struct json_stream *stream)
io_wake(plugin);
}
static void ld_rpc_send(struct plugin *plugin, struct json_stream *stream)
{
tal_steal(plugin->rpc_js_arr, stream);
tal_arr_expand(&plugin->rpc_js_arr, stream);
io_wake(plugin->io_rpc_conn);
}
/* FIXME: Move lightningd/jsonrpc to common/ ? */
static void jsonrpc_finish_and_send(struct plugin *p, struct json_stream *js)
@ -381,42 +383,46 @@ const char *rpc_delve(const tal_t *ctx,
return ret;
}
static void handle_rpc_reply(struct plugin *plugin)
static void handle_rpc_reply(struct plugin *plugin, const jsmntok_t *toks)
{
int reqlen;
const jsmntok_t *toks, *contents, *t;
const jsmntok_t *idtok, *contenttok;
struct out_req *out;
struct command_result *res;
u64 id;
bool error;
toks = read_rpc_reply(tmpctx, plugin, &contents, &error, &reqlen);
t = json_get_member(membuf_elems(&plugin->rpc_conn.mb), toks, "id");
if (!t)
idtok = json_get_member(plugin->rpc_buffer, toks, "id");
if (!idtok)
plugin_err(plugin, "JSON reply without id '%.*s'",
reqlen, membuf_elems(&plugin->rpc_conn.mb));
if (!json_to_u64(membuf_elems(&plugin->rpc_conn.mb), t, &id))
json_tok_full_len(toks),
json_tok_full(plugin->rpc_buffer, toks));
if (!json_to_u64(plugin->rpc_buffer, idtok, &id))
plugin_err(plugin, "JSON reply without numeric id '%.*s'",
reqlen, membuf_elems(&plugin->rpc_conn.mb));
out = uintmap_get(&out_reqs, id);
json_tok_full_len(toks),
json_tok_full(plugin->rpc_buffer, toks));
out = uintmap_get(&plugin->out_reqs, id);
if (!out)
plugin_err(plugin, "JSON reply with unknown id '%.*s' (%"PRIu64")",
reqlen, membuf_elems(&plugin->rpc_conn.mb), id);
json_tok_full_len(toks),
json_tok_full(plugin->rpc_buffer, toks), id);
/* We want to free this if callback doesn't. */
tal_steal(tmpctx, out);
uintmap_del(&out_reqs, out->id);
uintmap_del(&plugin->out_reqs, out->id);
if (error)
res = out->errcb(out->cmd, membuf_elems(&plugin->rpc_conn.mb),
contents, out->arg);
else
res = out->cb(out->cmd, membuf_elems(&plugin->rpc_conn.mb),
contents, out->arg);
contenttok = json_get_member(plugin->rpc_buffer, toks, "error");
if (contenttok)
res = out->errcb(out->cmd, plugin->rpc_buffer,
contenttok, out->arg);
else {
contenttok = json_get_member(plugin->rpc_buffer, toks, "result");
if (!contenttok)
plugin_err(plugin, "Bad JSONRPC, no 'error' nor 'result': '%.*s'",
json_tok_full_len(toks),
json_tok_full(plugin->rpc_buffer, toks));
res = out->cb(out->cmd, plugin->rpc_buffer, contenttok, out->arg);
}
assert(res == &pending || res == &complete);
membuf_consume(&plugin->rpc_conn.mb, reqlen);
}
struct command_result *
@ -433,19 +439,29 @@ send_outreq_(struct command *cmd,
void *arg,
const struct json_out *params TAKES)
{
struct json_out *jout;
struct json_stream *js;
struct out_req *out;
out = tal(cmd, struct out_req);
out->id = next_outreq_id++;
out->id = cmd->plugin->next_outreq_id++;
out->cmd = cmd;
out->cb = cb;
out->errcb = errcb;
out->arg = arg;
uintmap_add(&out_reqs, out->id, out);
uintmap_add(&cmd->plugin->out_reqs, out->id, out);
jout = start_json_request(tmpctx, out->id, method, params);
finish_and_send_json(cmd->plugin->rpc_conn.fd, jout);
js = new_json_stream(NULL, cmd, NULL);
json_object_start(js, NULL);
json_add_string(js, "jsonrpc", "2.0");
json_add_u64(js, "id", out->id);
json_add_string(js, "method", method);
json_out_add_splice(js->jout, "params", params);
json_object_compat_end(js);
json_stream_close(js, cmd);
ld_rpc_send(cmd->plugin, js);
if (taken(params))
tal_free(params);
return &pending;
}
@ -499,6 +515,112 @@ handle_getmanifest(struct command *getmanifest_cmd)
return command_success(getmanifest_cmd, params);
}
static void rpc_conn_finished(struct io_conn *conn,
struct plugin *plugin)
{
plugin_err(plugin, "Lost connection to the RPC socket.");
}
static bool rpc_read_response_one(struct plugin *plugin)
{
bool valid;
const jsmntok_t *toks, *jrtok;
/* FIXME: This could be done more efficiently by storing the
* toks and doing an incremental parse, like lightning-cli
* does. */
toks = json_parse_input(NULL, plugin->rpc_buffer, plugin->rpc_used,
&valid);
if (!toks) {
if (!valid) {
plugin_err(plugin, "Failed to parse RPC JSON response '%.*s'",
(int)plugin->rpc_used, plugin->rpc_buffer);
return false;
}
/* We need more. */
return false;
}
/* Empty buffer? (eg. just whitespace). */
if (tal_count(toks) == 1) {
plugin->rpc_used = 0;
return false;
}
jrtok = json_get_member(plugin->rpc_buffer, toks, "jsonrpc");
if (!jrtok) {
plugin_err(plugin, "JSON-RPC message does not contain \"jsonrpc\" field");
return false;
}
handle_rpc_reply(plugin, toks);
/* Move this object out of the buffer */
memmove(plugin->rpc_buffer, plugin->rpc_buffer + toks[0].end,
tal_count(plugin->rpc_buffer) - toks[0].end);
plugin->rpc_used -= toks[0].end;
tal_free(toks);
return true;
}
static struct io_plan *rpc_conn_read_response(struct io_conn *conn,
struct plugin *plugin)
{
plugin->rpc_used += plugin->rpc_len_read;
if (plugin->rpc_used == tal_count(plugin->rpc_buffer))
tal_resize(&plugin->rpc_buffer, plugin->rpc_used * 2);
/* Read and process all messages from the connection */
while (rpc_read_response_one(plugin))
;
/* Read more, if there is. */
return io_read_partial(plugin->io_rpc_conn,
plugin->rpc_buffer + plugin->rpc_used,
tal_bytelen(plugin->rpc_buffer) - plugin->rpc_used,
&plugin->rpc_len_read,
rpc_conn_read_response, plugin);
}
static struct io_plan *rpc_conn_write_request(struct io_conn *conn,
struct plugin *plugin);
static struct io_plan *
rpc_stream_complete(struct io_conn *conn, struct json_stream *js,
struct plugin *plugin)
{
assert(tal_count(plugin->rpc_js_arr) > 0);
/* Remove js and shift all remaining over */
tal_arr_remove(&plugin->rpc_js_arr, 0);
/* It got dropped off the queue, free it. */
tal_free(js);
return rpc_conn_write_request(conn, plugin);
}
static struct io_plan *rpc_conn_write_request(struct io_conn *conn,
struct plugin *plugin)
{
if (tal_count(plugin->rpc_js_arr) > 0)
return json_stream_output(plugin->rpc_js_arr[0], conn,
rpc_stream_complete, plugin);
return io_out_wait(conn, plugin->io_rpc_conn,
rpc_conn_write_request, plugin);
}
static struct io_plan *rpc_conn_init(struct io_conn *conn,
struct plugin *plugin)
{
plugin->io_rpc_conn = conn;
io_set_finish(conn, rpc_conn_finished, plugin);
return io_duplex(conn,
rpc_conn_read_response(conn, plugin),
rpc_conn_write_request(conn, plugin));
}
static struct command_result *handle_init(struct command *cmd,
const char *buf,
const jsmntok_t *params)
@ -562,6 +684,8 @@ static struct command_result *handle_init(struct command *cmd,
if (p->init)
p->init(p, buf, configtok);
io_new_conn(p, p->rpc_conn.fd, rpc_conn_init, p);
return command_success_str(cmd, NULL);
}
@ -895,7 +1019,14 @@ static struct plugin *new_plugin(const tal_t *ctx,
p->js_arr = tal_arr(p, struct json_stream *, 0);
p->used = 0;
p->len_read = 0;
/* rpc. TODO: use ccan/io also for RPC */
/* Async RPC */
p->rpc_buffer = tal_arr(p, char, 64);
p->rpc_js_arr = tal_arr(p, struct json_stream *, 0);
p->rpc_used = 0;
p->rpc_len_read = 0;
p->next_outreq_id = 0;
uintmap_init(&p->out_reqs);
/* Sync RPC FIXME: maybe go full async ? */
membuf_init(&p->rpc_conn.mb,
tal_arr(p, char, READ_CHUNKSIZE), READ_CHUNKSIZE,
membuf_tal_realloc);
@ -954,7 +1085,6 @@ void plugin_main(char *argv[],
notif_subs, num_notif_subs, hook_subs,
num_hook_subs, ap);
va_end(ap);
uintmap_init(&out_reqs);
setup_command_usage(plugin);
timers_init(&plugin->timers, time_mono());
@ -967,11 +1097,6 @@ void plugin_main(char *argv[],
clean_tmpctx();
if (membuf_num_elems(&plugin->rpc_conn.mb) != 0) {
handle_rpc_reply(plugin);
continue;
}
/* Will only exit if a timer has expired. */
io_loop(&plugin->timers, &expired);
call_plugin_timer(plugin, expired);

14
plugins/libplugin.h

@ -3,7 +3,8 @@
#define LIGHTNING_PLUGINS_LIBPLUGIN_H
#include "config.h"
#include <ccan/ccan/membuf/membuf.h>
#include <ccan/intmap/intmap.h>
#include <ccan/membuf/membuf.h>
#include <ccan/strmap/strmap.h>
#include <ccan/time/time.h>
#include <ccan/timer/timer.h>
@ -39,8 +40,19 @@ struct plugin {
/* To write to lightningd */
struct json_stream **js_arr;
/* Asynchronous RPC interaction */
struct io_conn *io_rpc_conn;
struct json_stream **rpc_js_arr;
char *rpc_buffer;
size_t rpc_used, rpc_len_read;
/* Tracking async RPC requests */
UINTMAP(struct out_req *) out_reqs;
u64 next_outreq_id;
/* Synchronous RPC interaction */
struct rpc_conn rpc_conn;
/* Plugin informations */
enum plugin_restartability restartability;
const struct plugin_command *commands;
size_t num_commands;

Loading…
Cancel
Save