diff --git a/contrib/helloworld-plugin/main.py b/contrib/helloworld-plugin/main.py index d8b53aae9..fd5eaa042 100755 --- a/contrib/helloworld-plugin/main.py +++ b/contrib/helloworld-plugin/main.py @@ -46,7 +46,6 @@ methods = { 'ping': json_ping, } - partial = "" for l in sys.stdin: partial += l diff --git a/lightningd/plugin.c b/lightningd/plugin.c index 439b02131..d0e85d489 100644 --- a/lightningd/plugin.c +++ b/lightningd/plugin.c @@ -1,5 +1,6 @@ #include "lightningd/plugin.h" +#include #include #include #include @@ -10,12 +11,27 @@ struct plugin { int stdin, stdout; pid_t pid; char *cmd; + struct io_conn *stdin_conn, *stdout_conn; + bool stop; + + /* Stuff we read */ + char *buffer; + size_t used, len_read; + + /* Stuff we write */ + struct list_head output; + const char *outbuf; }; struct plugins { struct plugin *plugins; }; +struct json_output { + struct list_node list; + const char *json; +}; + struct plugins *plugins_new(const tal_t *ctx){ struct plugins *p; p = tal(ctx, struct plugins); @@ -32,13 +48,123 @@ void plugin_register(struct plugins *plugins, const char* path TAKES) p->cmd = tal_strdup(p, path); } +/** + * Try to parse a complete message from the plugin's buffer. + * + * Internally calls the handler if it was able to fully parse a JSON message, + * and returns true in that case. + */ +static bool plugin_read_json_one(struct plugin *plugin) +{ + jsmntok_t *toks; + bool valid; + toks = json_parse_input(plugin->buffer, plugin->used, &valid); + if (!toks) { + if (!valid) { + /* FIXME (cdecker) Print error and kill the plugin */ + return io_close(plugin->stdout_conn); + } + /* We need more. */ + return false; + } + + /* Empty buffer? (eg. just whitespace). */ + if (tal_count(toks) == 1) { + plugin->used = 0; + return false; + } + + /* FIXME(cdecker) Call dispatch to handle this message. */ + + /* Move this object out of the buffer */ + memmove(plugin->buffer, plugin->buffer + toks[0].end, + tal_count(plugin->buffer) - toks[0].end); + plugin->used -= toks[0].end; + tal_free(toks); + return true; +} + +static struct io_plan *plugin_read_json(struct io_conn *conn UNUSED, + struct plugin *plugin) +{ + bool success; + plugin->used += plugin->len_read; + if (plugin->used == tal_count(plugin->buffer)) + tal_resize(&plugin->buffer, plugin->used * 2); + + /* Read and process all messages from the connection */ + do { + success = plugin_read_json_one(plugin); + } while (success); + + /* Now read more from the connection */ + return io_read_partial(plugin->stdout_conn, + plugin->buffer + plugin->used, + tal_count(plugin->buffer) - plugin->used, + &plugin->len_read, plugin_read_json, plugin); +} + +static struct io_plan *plugin_write_json(struct io_conn *conn UNUSED, + struct plugin *plugin) +{ + struct json_output *out; + out = list_pop(&plugin->output, struct json_output, list); + if (!out) { + if (plugin->stop) { + return io_close(conn); + } else { + return io_out_wait(plugin->stdin_conn, plugin, + plugin_write_json, plugin); + } + } + + /* We have a message we'd like to send */ + plugin->outbuf = tal_steal(plugin, out->json); + tal_free(out); + return io_write(conn, plugin->outbuf, strlen(plugin->outbuf), + plugin_write_json, plugin); +} + +static struct io_plan *plugin_conn_init(struct io_conn *conn, + struct plugin *plugin) +{ + plugin->stop = false; + if (plugin->stdout == io_conn_fd(conn)) { + /* We read from their stdout */ + plugin->stdout_conn = conn; + return io_read_partial(plugin->stdout_conn, plugin->buffer, + tal_bytelen(plugin->buffer), + &plugin->len_read, plugin_read_json, + plugin); + } else { + /* We write to their stdin */ + plugin->stdin_conn = conn; + /* We don't have anything queued yet, wait for notification */ + return io_wait(plugin->stdin_conn, plugin, plugin_write_json, + plugin); + } +} + void plugins_init(struct plugins *plugins) { struct plugin *p; + char **cmd; /* Spawn the plugin processes before entering the io_loop */ for (size_t i=0; iplugins); i++) { p = &plugins->plugins[i]; - p->pid = pipecmd(&p->stdout, &p->stdin, NULL, p->cmd); + cmd = tal_arr(p, char *, 2); + cmd[0] = p->cmd; + cmd[1] = NULL; + p->pid = pipecmdarr(&p->stdout, &p->stdin, NULL, cmd); + + list_head_init(&p->output); + p->buffer = tal_arr(p, char, 64); + p->used = 0; + + /* Create two connections, one read-only on top of p->stdin, and one + * write-only on p->stdout */ + io_new_conn(p, p->stdout, plugin_conn_init, p); + io_new_conn(p, p->stdin, plugin_conn_init, p); } }