Browse Source

lightningd: forward notifications from plugins if enabled.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
travis-experimental
Rusty Russell 4 years ago
parent
commit
9f687d60d9
  1. 15
      lightningd/bitcoind.c
  2. 11
      lightningd/jsonrpc.c
  3. 19
      lightningd/jsonrpc.h
  4. 62
      lightningd/plugin.c
  5. 4
      lightningd/plugin_hook.c

15
lightningd/bitcoind.c

@ -57,7 +57,7 @@ static void config_plugin(struct plugin *plugin)
struct jsonrpc_request *req; struct jsonrpc_request *req;
req = jsonrpc_request_start(plugin, "init", plugin->log, req = jsonrpc_request_start(plugin, "init", plugin->log,
plugin_config_cb, plugin); NULL, plugin_config_cb, plugin);
plugin_populate_init_request(plugin, req); plugin_populate_init_request(plugin, req);
jsonrpc_request_end(req); jsonrpc_request_end(req);
plugin_request_send(plugin, req); plugin_request_send(plugin, req);
@ -232,7 +232,7 @@ void bitcoind_estimate_fees_(struct bitcoind *bitcoind,
call->arg = arg; call->arg = arg;
req = jsonrpc_request_start(bitcoind, "estimatefees", bitcoind->log, req = jsonrpc_request_start(bitcoind, "estimatefees", bitcoind->log,
estimatefees_callback, call); NULL, estimatefees_callback, call);
jsonrpc_request_end(req); jsonrpc_request_end(req);
plugin_request_send(strmap_get(&bitcoind->pluginsmap, plugin_request_send(strmap_get(&bitcoind->pluginsmap,
"estimatefees"), req); "estimatefees"), req);
@ -348,7 +348,7 @@ static void sendrawtx_compatv090_callback(const char *buf,
/* Retry with a single argument, hextx. */ /* Retry with a single argument, hextx. */
req = jsonrpc_request_start(call->bitcoind, "sendrawtransaction", req = jsonrpc_request_start(call->bitcoind, "sendrawtransaction",
call->bitcoind->log, call->bitcoind->log,
&sendrawtx_callback, call); NULL, sendrawtx_callback, call);
json_add_string(req->stream, "tx", call->hextx); json_add_string(req->stream, "tx", call->hextx);
jsonrpc_request_end(req); jsonrpc_request_end(req);
bitcoin_plugin_send(call->bitcoind, req); bitcoin_plugin_send(call->bitcoind, req);
@ -385,7 +385,7 @@ void bitcoind_sendrawtx_ahf_(struct bitcoind *bitcoind,
req = jsonrpc_request_start(bitcoind, "sendrawtransaction", req = jsonrpc_request_start(bitcoind, "sendrawtransaction",
bitcoind->log, bitcoind->log,
sendrawtx_compatv090_callback, NULL, sendrawtx_compatv090_callback,
call); call);
json_add_string(req->stream, "tx", hextx); json_add_string(req->stream, "tx", hextx);
json_add_bool(req->stream, "allowhighfees", allowhighfees); json_add_bool(req->stream, "allowhighfees", allowhighfees);
@ -494,7 +494,8 @@ void bitcoind_getrawblockbyheight_(struct bitcoind *bitcoind,
call->cb_arg = cb_arg; call->cb_arg = cb_arg;
req = jsonrpc_request_start(bitcoind, "getrawblockbyheight", req = jsonrpc_request_start(bitcoind, "getrawblockbyheight",
bitcoind->log, getrawblockbyheight_callback, bitcoind->log,
NULL, getrawblockbyheight_callback,
/* Freed in cb. */ /* Freed in cb. */
notleak(call)); notleak(call));
json_add_num(req->stream, "height", height); json_add_num(req->stream, "height", height);
@ -591,7 +592,7 @@ void bitcoind_getchaininfo_(struct bitcoind *bitcoind,
call->first_call = first_call; call->first_call = first_call;
req = jsonrpc_request_start(bitcoind, "getchaininfo", bitcoind->log, req = jsonrpc_request_start(bitcoind, "getchaininfo", bitcoind->log,
getchaininfo_callback, call); NULL, getchaininfo_callback, call);
jsonrpc_request_end(req); jsonrpc_request_end(req);
bitcoin_plugin_send(bitcoind, req); bitcoin_plugin_send(bitcoind, req);
} }
@ -672,7 +673,7 @@ void bitcoind_getutxout_(struct bitcoind *bitcoind,
call->cb_arg = cb_arg; call->cb_arg = cb_arg;
req = jsonrpc_request_start(bitcoind, "getutxout", bitcoind->log, req = jsonrpc_request_start(bitcoind, "getutxout", bitcoind->log,
getutxout_callback, call); NULL, getutxout_callback, call);
json_add_txid(req->stream, "txid", txid); json_add_txid(req->stream, "txid", txid);
json_add_num(req->stream, "vout", outnum); json_add_num(req->stream, "vout", outnum);
jsonrpc_request_end(req); jsonrpc_request_end(req);

11
lightningd/jsonrpc.c

@ -531,6 +531,10 @@ struct json_stream *json_stream_raw_for_cmd(struct command *cmd)
{ {
struct json_stream *js; struct json_stream *js;
/* Might have already opened it for a notification */
if (cmd->json_stream)
return cmd->json_stream;
/* If they still care about the result, attach it to them. */ /* If they still care about the result, attach it to them. */
if (cmd->jcon) if (cmd->jcon)
js = jcon_new_json_stream(cmd, cmd->jcon, cmd); js = jcon_new_json_stream(cmd, cmd->jcon, cmd);
@ -806,6 +810,7 @@ parse_request(struct json_connection *jcon, const jsmntok_t tok[])
* the connection since the command may outlive `conn`. */ * the connection since the command may outlive `conn`. */
c = tal(jcon->ld->jsonrpc, struct command); c = tal(jcon->ld->jsonrpc, struct command);
c->jcon = jcon; c->jcon = jcon;
c->send_notifications = jcon->notifications_enabled;
c->ld = jcon->ld; c->ld = jcon->ld;
c->pending = false; c->pending = false;
c->json_stream = NULL; c->json_stream = NULL;
@ -1200,6 +1205,11 @@ void jsonrpc_notification_end(struct jsonrpc_notification *n)
struct jsonrpc_request *jsonrpc_request_start_( struct jsonrpc_request *jsonrpc_request_start_(
const tal_t *ctx, const char *method, struct log *log, const tal_t *ctx, const char *method, struct log *log,
void (*notify_cb)(const char *buffer,
const jsmntok_t *methodtok,
const jsmntok_t *paramtoks,
const jsmntok_t *idtok,
void *),
void (*response_cb)(const char *buffer, const jsmntok_t *toks, void (*response_cb)(const char *buffer, const jsmntok_t *toks,
const jsmntok_t *idtok, void *), const jsmntok_t *idtok, void *),
void *response_cb_arg) void *response_cb_arg)
@ -1207,6 +1217,7 @@ struct jsonrpc_request *jsonrpc_request_start_(
struct jsonrpc_request *r = tal(ctx, struct jsonrpc_request); struct jsonrpc_request *r = tal(ctx, struct jsonrpc_request);
static u64 next_request_id = 0; static u64 next_request_id = 0;
r->id = next_request_id++; r->id = next_request_id++;
r->notify_cb = notify_cb;
r->response_cb = response_cb; r->response_cb = response_cb;
r->response_cb_arg = response_cb_arg; r->response_cb_arg = response_cb_arg;
r->method = NULL; r->method = NULL;

19
lightningd/jsonrpc.h

@ -34,6 +34,8 @@ struct command {
const struct json_command *json_cmd; const struct json_command *json_cmd;
/* The connection, or NULL if it closed. */ /* The connection, or NULL if it closed. */
struct json_connection *jcon; struct json_connection *jcon;
/* Does this want notifications? */
bool send_notifications;
/* Have we been marked by command_still_pending? For debugging... */ /* Have we been marked by command_still_pending? For debugging... */
bool pending; bool pending;
/* Tell param() how to process the command */ /* Tell param() how to process the command */
@ -72,6 +74,11 @@ struct jsonrpc_request {
u64 id; u64 id;
const char *method; const char *method;
struct json_stream *stream; struct json_stream *stream;
void (*notify_cb)(const char *buffer,
const jsmntok_t *idtok,
const jsmntok_t *methodtok,
const jsmntok_t *paramtoks,
void *);
void (*response_cb)(const char *buffer, const jsmntok_t *toks, void (*response_cb)(const char *buffer, const jsmntok_t *toks,
const jsmntok_t *idtok, void *); const jsmntok_t *idtok, void *);
void *response_cb_arg; void *response_cb_arg;
@ -193,9 +200,14 @@ struct jsonrpc_notification *jsonrpc_notification_start(const tal_t *ctx, const
*/ */
void jsonrpc_notification_end(struct jsonrpc_notification *n); void jsonrpc_notification_end(struct jsonrpc_notification *n);
#define jsonrpc_request_start(ctx, method, log, response_cb, response_cb_arg) \ #define jsonrpc_request_start(ctx, method, log, notify_cb, response_cb, response_cb_arg) \
jsonrpc_request_start_( \ jsonrpc_request_start_( \
(ctx), (method), (log), \ (ctx), (method), (log), \
typesafe_cb_preargs(void, void *, (notify_cb), (response_cb_arg), \
const char *buffer, \
const jsmntok_t *idtok, \
const jsmntok_t *methodtok, \
const jsmntok_t *paramtoks), \
typesafe_cb_preargs(void, void *, (response_cb), (response_cb_arg), \ typesafe_cb_preargs(void, void *, (response_cb), (response_cb_arg), \
const char *buffer, \ const char *buffer, \
const jsmntok_t *toks, \ const jsmntok_t *toks, \
@ -204,6 +216,11 @@ void jsonrpc_notification_end(struct jsonrpc_notification *n);
struct jsonrpc_request *jsonrpc_request_start_( struct jsonrpc_request *jsonrpc_request_start_(
const tal_t *ctx, const char *method, struct log *log, const tal_t *ctx, const char *method, struct log *log,
void (*notify_cb)(const char *buffer,
const jsmntok_t *idtok,
const jsmntok_t *methodtok,
const jsmntok_t *paramtoks,
void *),
void (*response_cb)(const char *buffer, const jsmntok_t *toks, void (*response_cb)(const char *buffer, const jsmntok_t *toks,
const jsmntok_t *idtok, void *), const jsmntok_t *idtok, void *),
void *response_cb_arg); void *response_cb_arg);

62
lightningd/plugin.c

@ -299,6 +299,35 @@ static const char *plugin_log_handle(struct plugin *plugin,
return NULL; return NULL;
} }
static const char *plugin_notify_handle(struct plugin *plugin,
const jsmntok_t *methodtok,
const jsmntok_t *paramstok)
{
const jsmntok_t *idtok;
u64 id;
struct jsonrpc_request *request;
/* id inside params tells us which id to redirect to. */
idtok = json_get_member(plugin->buffer, paramstok, "id");
if (!idtok || !json_to_u64(plugin->buffer, idtok, &id)) {
return tal_fmt(plugin,
"JSON-RPC notify \"id\"-field is not a u64");
}
request = uintmap_get(&plugin->plugins->pending_requests, id);
if (!request) {
return tal_fmt(
plugin,
"Received a JSON-RPC notify for non-existent request");
}
/* Ignore if they don't have a callback */
if (request->notify_cb)
request->notify_cb(plugin->buffer, methodtok, paramstok, idtok,
request->response_cb_arg);
return NULL;
}
/* Returns the error string, or NULL */ /* Returns the error string, or NULL */
static const char *plugin_notification_handle(struct plugin *plugin, static const char *plugin_notification_handle(struct plugin *plugin,
const jsmntok_t *toks) const jsmntok_t *toks)
@ -326,6 +355,9 @@ static const char *plugin_notification_handle(struct plugin *plugin,
* register notification handlers in a variety of places. */ * register notification handlers in a variety of places. */
if (json_tok_streq(plugin->buffer, methtok, "log")) { if (json_tok_streq(plugin->buffer, methtok, "log")) {
return plugin_log_handle(plugin, paramstok); return plugin_log_handle(plugin, paramstok);
} else if (json_tok_streq(plugin->buffer, methtok, "message")
|| json_tok_streq(plugin->buffer, methtok, "progress")) {
return plugin_notify_handle(plugin, methtok, paramstok);
} else { } else {
return tal_fmt(plugin, "Unknown notification method %.*s", return tal_fmt(plugin, "Unknown notification method %.*s",
json_tok_full_len(methtok), json_tok_full_len(methtok),
@ -793,6 +825,31 @@ static void plugin_rpcmethod_cb(const char *buffer,
tal_free(call); tal_free(call);
} }
static void plugin_notify_cb(const char *buffer,
const jsmntok_t *methodtok,
const jsmntok_t *paramtoks,
const jsmntok_t *idtok,
struct plugin_rpccall *call)
{
struct command *cmd = call->cmd;
struct json_stream *response;
if (!cmd->jcon || !cmd->send_notifications)
return;
response = json_stream_raw_for_cmd(cmd);
json_object_start(response, NULL);
json_add_string(response, "jsonrpc", "2.0");
json_add_tok(response, "method", methodtok, buffer);
json_stream_append(response, ",\"params\":", strlen(",\"params\":"));
json_stream_forward_change_id(response, buffer,
paramtoks, idtok, cmd->id);
json_object_end(response);
json_stream_double_cr(response);
json_stream_flush(response);
}
struct plugin *find_plugin_for_command(struct lightningd *ld, struct plugin *find_plugin_for_command(struct lightningd *ld,
const char *cmd_name) const char *cmd_name)
{ {
@ -836,6 +893,7 @@ static struct command_result *plugin_rpcmethod_dispatch(struct command *cmd,
call->cmd = cmd; call->cmd = cmd;
req = jsonrpc_request_start(plugin, NULL, plugin->log, req = jsonrpc_request_start(plugin, NULL, plugin->log,
plugin_notify_cb,
plugin_rpcmethod_cb, call); plugin_rpcmethod_cb, call);
call->request = req; call->request = req;
call->plugin = plugin; call->plugin = plugin;
@ -1301,7 +1359,7 @@ const char *plugin_send_getmanifest(struct plugin *p)
p->stdout_conn = io_new_conn(p, stdoutfd, plugin_stdout_conn_init, p); p->stdout_conn = io_new_conn(p, stdoutfd, plugin_stdout_conn_init, p);
p->stdin_conn = io_new_conn(p, stdinfd, plugin_stdin_conn_init, p); p->stdin_conn = io_new_conn(p, stdinfd, plugin_stdin_conn_init, p);
req = jsonrpc_request_start(p, "getmanifest", p->log, req = jsonrpc_request_start(p, "getmanifest", p->log,
plugin_manifest_cb, p); NULL, plugin_manifest_cb, p);
/* Adding allow-deprecated-apis is part of the deprecation cycle! */ /* Adding allow-deprecated-apis is part of the deprecation cycle! */
if (!deprecated_apis) if (!deprecated_apis)
json_add_bool(req->stream, "allow-deprecated-apis", deprecated_apis); json_add_bool(req->stream, "allow-deprecated-apis", deprecated_apis);
@ -1455,7 +1513,7 @@ plugin_config(struct plugin *plugin)
struct jsonrpc_request *req; struct jsonrpc_request *req;
req = jsonrpc_request_start(plugin, "init", plugin->log, req = jsonrpc_request_start(plugin, "init", plugin->log,
plugin_config_cb, plugin); NULL, plugin_config_cb, plugin);
plugin_populate_init_request(plugin, req); plugin_populate_init_request(plugin, req);
jsonrpc_request_end(req); jsonrpc_request_end(req);
plugin_request_send(plugin, req); plugin_request_send(plugin, req);

4
lightningd/plugin_hook.c

@ -231,6 +231,7 @@ static void plugin_hook_call_next(struct plugin_hook_request *ph_req)
req = jsonrpc_request_start(NULL, hook->name, req = jsonrpc_request_start(NULL, hook->name,
plugin_get_log(ph_req->plugin), plugin_get_log(ph_req->plugin),
NULL,
plugin_hook_callback, ph_req); plugin_hook_callback, ph_req);
hook->serialize_payload(ph_req->cb_arg, req->stream); hook->serialize_payload(ph_req->cb_arg, req->stream);
@ -328,7 +329,8 @@ void plugin_hook_db_sync(struct db *db)
ph_req = notleak(tal(hook->plugins, struct plugin_hook_request)); ph_req = notleak(tal(hook->plugins, struct plugin_hook_request));
/* FIXME: do IO logging for this! */ /* FIXME: do IO logging for this! */
req = jsonrpc_request_start(NULL, hook->name, NULL, db_hook_response, req = jsonrpc_request_start(NULL, hook->name, NULL, NULL,
db_hook_response,
ph_req); ph_req);
ph_req->hook = hook; ph_req->hook = hook;

Loading…
Cancel
Save