From 8ae698d1dc82f060846bf06b722b24a4b7cf4591 Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Mon, 13 Mar 2017 17:24:05 +0100 Subject: [PATCH] Migrating daemon_conn to msg_queue and msg_queue takes over messages We have some duplication in handling queues, so this is an attempt at deduplicating some of that work. `daemon_conn` now uses the `msg_queue` and `channeld` was also migrated to `msg_queue`. At the same time I made `msg_queue` create a copy of the messages or takes over messages marked with `take()`. This should make cleaning up messages easier. --- lightningd/channel/channel.c | 26 ++++---------------------- lightningd/connection.c | 26 +++----------------------- lightningd/connection.h | 5 +++-- lightningd/msg_queue.c | 3 ++- lightningd/msg_queue.h | 1 + lightningd/subd.c | 4 ++-- 6 files changed, 15 insertions(+), 50 deletions(-) diff --git a/lightningd/channel/channel.c b/lightningd/channel/channel.c index f0f9236a7..c31ac8dd9 100644 --- a/lightningd/channel/channel.c +++ b/lightningd/channel/channel.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -52,32 +53,13 @@ struct peer { struct channel *channel; u8 *req_in; - const u8 **peer_out; + + struct msg_queue peer_out; int gossip_client_fd; struct daemon_conn gossip_client; }; -static void msg_enqueue(const u8 ***q, const u8 *add) -{ - size_t n = tal_count(*q); - tal_resize(q, n+1); - (*q)[n] = add; -} - -static const u8 *msg_dequeue(const u8 ***q) -{ - size_t n = tal_count(*q); - const u8 *msg; - - if (!n) - return NULL; - msg = (*q)[0]; - memmove(*q, *q + 1, sizeof(**q) * (n-1)); - tal_resize(q, n-1); - return msg; -} - static void queue_pkt(struct peer *peer, const u8 *msg) { msg_enqueue(&peer->peer_out, msg); @@ -199,7 +181,7 @@ int main(int argc, char *argv[]) secp256k1_ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY | SECP256K1_CONTEXT_SIGN); status_setup(REQ_FD); - peer->peer_out = tal_arr(peer, const u8 *, 0); + msg_queue_init(&peer->peer_out, peer); init_peer_crypto_state(peer, &peer->pcs); peer->funding_locked[LOCAL] = peer->funding_locked[REMOTE] = false; diff --git a/lightningd/connection.c b/lightningd/connection.c index 1e69e6bf9..5a92ebaed 100644 --- a/lightningd/connection.c +++ b/lightningd/connection.c @@ -2,26 +2,6 @@ #include #include -static void daemon_conn_enqueue(struct daemon_conn *dc, u8 *msg) -{ - size_t n = tal_count(dc->msg_out); - tal_resize(&dc->msg_out, n + 1); - dc->msg_out[n] = tal_dup_arr(dc->ctx, u8, msg, tal_count(msg), 0); -} - -static const u8 *daemon_conn_dequeue(struct daemon_conn *dc) -{ - const u8 *msg; - size_t n = tal_count(dc->msg_out); - - if (n == 0) - return NULL; - msg = dc->msg_out[0]; - memmove(dc->msg_out, dc->msg_out + 1, sizeof(dc->msg_in[0]) * (n - 1)); - tal_resize(&dc->msg_out, n - 1); - return msg; -} - struct io_plan *daemon_conn_read_next(struct io_conn *conn, struct daemon_conn *dc) { @@ -33,7 +13,7 @@ struct io_plan *daemon_conn_read_next(struct io_conn *conn, struct io_plan *daemon_conn_write_next(struct io_conn *conn, struct daemon_conn *dc) { - const u8 *msg = daemon_conn_dequeue(dc); + const u8 *msg = msg_dequeue(&dc->out); if (msg) { return io_write_wire(conn, take(msg), daemon_conn_write_next, dc); @@ -60,7 +40,7 @@ void daemon_conn_init(tal_t *ctx, struct daemon_conn *dc, int fd, dc->ctx = ctx; dc->msg_in = NULL; - dc->msg_out = tal_arr(ctx, u8 *, 0); + msg_queue_init(&dc->out, dc->ctx); dc->conn_fd = fd; dc->msg_queue_cleared_cb = NULL; io_new_conn(ctx, fd, daemon_conn_start, dc); @@ -68,6 +48,6 @@ void daemon_conn_init(tal_t *ctx, struct daemon_conn *dc, int fd, void daemon_conn_send(struct daemon_conn *dc, u8 *msg) { - daemon_conn_enqueue(dc, msg); + msg_enqueue(&dc->out, msg); io_wake(dc); } diff --git a/lightningd/connection.h b/lightningd/connection.h index dd69bcf1e..d30df1be0 100644 --- a/lightningd/connection.h +++ b/lightningd/connection.h @@ -4,6 +4,7 @@ #include "config.h" #include #include +#include struct daemon_conn { /* Context to tallocate all things from, possibly the @@ -13,8 +14,8 @@ struct daemon_conn { /* Last message we received */ u8 *msg_in; - /* Array of queued outgoing messages */ - u8 **msg_out; + /* Queue of outgoing messages */ + struct msg_queue out; int conn_fd; struct io_conn *conn; diff --git a/lightningd/msg_queue.c b/lightningd/msg_queue.c index 5c0678c43..8b3b48312 100644 --- a/lightningd/msg_queue.c +++ b/lightningd/msg_queue.c @@ -3,13 +3,14 @@ void msg_queue_init(struct msg_queue *q, const tal_t *ctx) { q->q = tal_arr(ctx, const u8 *, 0); + q->ctx = ctx; } void msg_enqueue(struct msg_queue *q, const u8 *add) { size_t n = tal_count(q->q); tal_resize(&q->q, n+1); - q->q[n] = add; + q->q[n] = tal_dup_arr(q->ctx, u8, add, tal_len(add), 0); /* In case someone is waiting */ io_wake(q); diff --git a/lightningd/msg_queue.h b/lightningd/msg_queue.h index fc69d9b6e..3bee179e6 100644 --- a/lightningd/msg_queue.h +++ b/lightningd/msg_queue.h @@ -7,6 +7,7 @@ struct msg_queue { const u8 **q; + const tal_t *ctx; }; void msg_queue_init(struct msg_queue *q, const tal_t *ctx); diff --git a/lightningd/subd.c b/lightningd/subd.c index 69958dc96..44668de4f 100644 --- a/lightningd/subd.c +++ b/lightningd/subd.c @@ -373,7 +373,7 @@ void subd_send_msg(struct subd *sd, const u8 *msg_out) assert(fromwire_peektype(msg_out) != STATUS_TRACE); if (!taken(msg_out)) msg_out = tal_dup_arr(sd, u8, msg_out, tal_len(msg_out), 0); - msg_enqueue(&sd->outq, msg_out); + msg_enqueue(&sd->outq, take(msg_out)); } void subd_send_fd(struct subd *sd, int fd) @@ -382,7 +382,7 @@ void subd_send_fd(struct subd *sd, int fd) u8 *fdmsg = tal_arr(sd, u8, 0); towire_u16(&fdmsg, STATUS_TRACE); towire_u32(&fdmsg, fd); - msg_enqueue(&sd->outq, fdmsg); + msg_enqueue(&sd->outq, take(fdmsg)); } void subd_req_(struct subd *sd,