You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

154 lines
3.7 KiB

#include <ccan/fdpass/fdpass.h>
#include <ccan/io/fdpass/fdpass.h>
#include <ccan/take/take.h>
#include <common/daemon_conn.h>
#include <unistd.h>
#include <wire/wire_io.h>
#include <wire/wire_sync.h>
/* Per-connection state: an io_conn plus an incoming message slot, an
 * outgoing queue and the user's callbacks. */
struct daemon_conn {
	/* Most recently read message; handed to recv. */
	u8 *msg_in;

	/* Messages (and fds) still waiting to be written. */
	struct msg_queue *out;

	/* The connection we run on: if it closes we are freed, and freeing
	 * us frees it. */
	struct io_conn *conn;

	/* Invoked with each incoming message. */
	struct io_plan *(*recv)(struct io_conn *conn, const u8 *, void *);

	/* Invoked when the writer finds the outgoing queue empty; may be
	 * NULL. */
	void (*outq_empty)(void *);

	/* Opaque argument passed to both recv and outq_empty. */
	void *arg;
};
static struct io_plan *handle_read(struct io_conn *conn,
struct daemon_conn *dc)
{
return dc->recv(conn, dc->msg_in, dc->arg);
}
struct io_plan *daemon_conn_read_next(struct io_conn *conn,
struct daemon_conn *dc)
{
/* FIXME: We could use disposable parent instead, and recv() could
* tal_steal() it? If they did that now, we'd free it here. */
tal_free(dc->msg_in);
return io_read_wire(conn, dc, &dc->msg_in, handle_read, dc);
}
/* Writer step: pick the next queued item and plan writing it, re-entering
 * this function once that write completes.  If nothing is queued (even
 * after giving outq_empty a chance), wait on the queue instead. */
static struct io_plan *daemon_conn_write_next(struct io_conn *conn,
					      struct daemon_conn *dc)
{
	const u8 *next = msg_dequeue(dc->out);
	int fd;

	/* Queue is empty: give the empty callback a chance to queue
	 * something, then look again. */
	if (next == NULL && dc->outq_empty != NULL) {
		dc->outq_empty(dc->arg);
		next = msg_dequeue(dc->out);
	}

	/* Still nothing to send: wait on the queue. */
	if (next == NULL)
		return msg_queue_wait(conn, dc->out,
				      daemon_conn_write_next, dc);

	/* A non-negative result means this entry stands for an fd to pass
	 * rather than a wire message. */
	fd = msg_extract_fd(next);
	if (fd < 0)
		return io_write_wire(conn, take(next),
				     daemon_conn_write_next, dc);

	/* The entry itself is no longer needed once we hold the fd. */
	tal_free(next);
	return io_send_fd(conn, fd, true, daemon_conn_write_next, dc);
}
/* Synchronously write out everything still queued on dc.
 *
 * Flushes any partially written packet, switches the connection's fd to
 * blocking mode, writes each queued message (or passes each queued fd) in
 * order, then switches the fd back to non-blocking.
 *
 * Returns true if and only if the whole queue was written.  Returns false
 * if the initial flush fails, the fd cannot be made blocking, or any
 * individual send fails (items behind the failing one stay queued).
 */
bool daemon_conn_sync_flush(struct daemon_conn *dc)
{
	const u8 *msg;
	int daemon_fd;
	/* Track success in a flag rather than inspecting msg afterwards:
	 * by then msg may already have been freed or taken. */
	bool ok = true;

	/* Flush any current packet. */
	if (!io_flush_sync(dc->conn))
		return false;

	/* Make fd blocking for the duration */
	daemon_fd = io_conn_fd(dc->conn);
	if (!io_fd_block(daemon_fd, true))
		return false;

	/* Flush existing messages, stopping at the first failure. */
	while (ok && (msg = msg_dequeue(dc->out)) != NULL) {
		int fd = msg_extract_fd(msg);

		if (fd < 0) {
			/* Ordinary wire message. */
			ok = wire_sync_write(daemon_fd, take(msg));
			continue;
		}

		/* Queue entry stands for an fd to pass. */
		tal_free(msg);
		ok = fdpass_send(daemon_fd, fd);

		/* The async writer sends fds with io_send_fd(..., true, ...),
		 * i.e. the fd is closed once successfully sent.  Do the same
		 * here so the descriptor is not leaked. */
		if (ok)
			close(fd);
	}

	io_fd_block(daemon_fd, false);

	return ok;
}
/* Initial plan for a new connection: read and write concurrently. */
static struct io_plan *daemon_conn_start(struct io_conn *conn,
					 struct daemon_conn *dc)
{
	struct io_plan *reader, *writer;

	reader = daemon_conn_read_next(conn, dc);

	/* We could start with daemon_conn_write_next directly, but that
	 * would invoke the outq_empty callback, which we don't want just
	 * yet: wait on the queue instead. */
	writer = msg_queue_wait(conn, dc->out, daemon_conn_write_next, dc);

	return io_duplex(conn, reader, writer);
}
/* Destructor attached to the io_conn: when the connection is destroyed,
 * free the daemon_conn too. */
static void destroy_dc_from_conn(struct io_conn *conn, struct daemon_conn *dc)
{
	/* If conn is only dying because dc is itself being freed, this
	 * re-enters the free of dc; that loop is harmless. */
	tal_free(dc);
}
/* Create a daemon_conn running on fd.
 *
 * @ctx: tal context that owns the new daemon_conn (NULL for no parent).
 * @fd: file descriptor to run the connection on.
 * @recv: called with each incoming message; returns the next io_plan.
 * @outq_empty: called when the writer finds the outgoing queue empty,
 *              or NULL for no callback.
 * @arg: opaque pointer passed to both callbacks.
 *
 * The outgoing queue and the io_conn are allocated off the daemon_conn,
 * so freeing it frees them; a destructor on the io_conn frees the
 * daemon_conn when the connection is destroyed.
 */
struct daemon_conn *daemon_conn_new_(const tal_t *ctx, int fd,
				     struct io_plan *(*recv)(struct io_conn *,
							     const u8 *,
							     void *),
				     void (*outq_empty)(void *),
				     void *arg)
{
	/* Allocate off the caller's context so freeing ctx also tears this
	 * connection down; a NULL ctx still yields a parentless object. */
	struct daemon_conn *dc = tal(ctx, struct daemon_conn);

	dc->recv = recv;
	dc->outq_empty = outq_empty;
	dc->arg = arg;
	dc->msg_in = NULL;
	dc->out = msg_queue_new(dc);

	dc->conn = io_new_conn(dc, fd, daemon_conn_start, dc);
	/* Tie our lifetime to the connection's: if it is destroyed, so are
	 * we. */
	tal_add_destructor2(dc->conn, destroy_dc_from_conn, dc);

	return dc;
}
/* Append msg to dc's outgoing queue, from which daemon_conn_write_next()
 * and daemon_conn_sync_flush() dequeue. */
void daemon_conn_send(struct daemon_conn *dc, const u8 *msg)
{
	struct msg_queue *outq = dc->out;

	msg_enqueue(outq, msg);
}
/* Append file descriptor fd to dc's outgoing queue, so it is passed over
 * the connection in order with the queued messages. */
void daemon_conn_send_fd(struct daemon_conn *dc, int fd)
{
	struct msg_queue *outq = dc->out;

	msg_enqueue_fd(outq, fd);
}
/* Wake dc's outgoing queue via msg_wake() (presumably rousing a writer
 * parked in msg_queue_wait() -- confirm against msg_queue). */
void daemon_conn_wake(struct daemon_conn *dc)
{
	struct msg_queue *outq = dc->out;

	msg_wake(outq);
}
/* Report the length of dc's outgoing queue, as given by
 * msg_queue_length(). */
size_t daemon_conn_queue_length(const struct daemon_conn *dc)
{
	size_t pending = msg_queue_length(dc->out);

	return pending;
}