Browse Source

Migrating daemon_conn to msg_queue and msg_queue takes over messages

We have some duplication in handling queues, so this is an attempt at
deduplicating some of that work. `daemon_conn` now uses the
`msg_queue` and `channeld` was also migrated to `msg_queue`. At the
same time I made `msg_queue` create a copy of the messages or takes
over messages marked with `take()`. This should make cleaning up
messages easier.
ppa-0.6.1
Christian Decker 8 years ago
parent
commit
8ae698d1dc
  1. 26
      lightningd/channel/channel.c
  2. 26
      lightningd/connection.c
  3. 5
      lightningd/connection.h
  4. 3
      lightningd/msg_queue.c
  5. 1
      lightningd/msg_queue.h
  6. 4
      lightningd/subd.c

26
lightningd/channel/channel.c

@ -17,6 +17,7 @@
#include <lightningd/debug.h> #include <lightningd/debug.h>
#include <lightningd/derive_basepoints.h> #include <lightningd/derive_basepoints.h>
#include <lightningd/key_derive.h> #include <lightningd/key_derive.h>
#include <lightningd/msg_queue.h>
#include <lightningd/peer_failed.h> #include <lightningd/peer_failed.h>
#include <secp256k1.h> #include <secp256k1.h>
#include <signal.h> #include <signal.h>
@ -52,32 +53,13 @@ struct peer {
struct channel *channel; struct channel *channel;
u8 *req_in; u8 *req_in;
const u8 **peer_out;
struct msg_queue peer_out;
int gossip_client_fd; int gossip_client_fd;
struct daemon_conn gossip_client; struct daemon_conn gossip_client;
}; };
static void msg_enqueue(const u8 ***q, const u8 *add)
{
size_t n = tal_count(*q);
tal_resize(q, n+1);
(*q)[n] = add;
}
static const u8 *msg_dequeue(const u8 ***q)
{
size_t n = tal_count(*q);
const u8 *msg;
if (!n)
return NULL;
msg = (*q)[0];
memmove(*q, *q + 1, sizeof(**q) * (n-1));
tal_resize(q, n-1);
return msg;
}
static void queue_pkt(struct peer *peer, const u8 *msg) static void queue_pkt(struct peer *peer, const u8 *msg)
{ {
msg_enqueue(&peer->peer_out, msg); msg_enqueue(&peer->peer_out, msg);
@ -199,7 +181,7 @@ int main(int argc, char *argv[])
secp256k1_ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY secp256k1_ctx = secp256k1_context_create(SECP256K1_CONTEXT_VERIFY
| SECP256K1_CONTEXT_SIGN); | SECP256K1_CONTEXT_SIGN);
status_setup(REQ_FD); status_setup(REQ_FD);
peer->peer_out = tal_arr(peer, const u8 *, 0); msg_queue_init(&peer->peer_out, peer);
init_peer_crypto_state(peer, &peer->pcs); init_peer_crypto_state(peer, &peer->pcs);
peer->funding_locked[LOCAL] = peer->funding_locked[REMOTE] = false; peer->funding_locked[LOCAL] = peer->funding_locked[REMOTE] = false;

26
lightningd/connection.c

@ -2,26 +2,6 @@
#include <ccan/take/take.h> #include <ccan/take/take.h>
#include <wire/wire_io.h> #include <wire/wire_io.h>
static void daemon_conn_enqueue(struct daemon_conn *dc, u8 *msg)
{
size_t n = tal_count(dc->msg_out);
tal_resize(&dc->msg_out, n + 1);
dc->msg_out[n] = tal_dup_arr(dc->ctx, u8, msg, tal_count(msg), 0);
}
static const u8 *daemon_conn_dequeue(struct daemon_conn *dc)
{
const u8 *msg;
size_t n = tal_count(dc->msg_out);
if (n == 0)
return NULL;
msg = dc->msg_out[0];
memmove(dc->msg_out, dc->msg_out + 1, sizeof(dc->msg_in[0]) * (n - 1));
tal_resize(&dc->msg_out, n - 1);
return msg;
}
struct io_plan *daemon_conn_read_next(struct io_conn *conn, struct io_plan *daemon_conn_read_next(struct io_conn *conn,
struct daemon_conn *dc) struct daemon_conn *dc)
{ {
@ -33,7 +13,7 @@ struct io_plan *daemon_conn_read_next(struct io_conn *conn,
struct io_plan *daemon_conn_write_next(struct io_conn *conn, struct io_plan *daemon_conn_write_next(struct io_conn *conn,
struct daemon_conn *dc) struct daemon_conn *dc)
{ {
const u8 *msg = daemon_conn_dequeue(dc); const u8 *msg = msg_dequeue(&dc->out);
if (msg) { if (msg) {
return io_write_wire(conn, take(msg), daemon_conn_write_next, return io_write_wire(conn, take(msg), daemon_conn_write_next,
dc); dc);
@ -60,7 +40,7 @@ void daemon_conn_init(tal_t *ctx, struct daemon_conn *dc, int fd,
dc->ctx = ctx; dc->ctx = ctx;
dc->msg_in = NULL; dc->msg_in = NULL;
dc->msg_out = tal_arr(ctx, u8 *, 0); msg_queue_init(&dc->out, dc->ctx);
dc->conn_fd = fd; dc->conn_fd = fd;
dc->msg_queue_cleared_cb = NULL; dc->msg_queue_cleared_cb = NULL;
io_new_conn(ctx, fd, daemon_conn_start, dc); io_new_conn(ctx, fd, daemon_conn_start, dc);
@ -68,6 +48,6 @@ void daemon_conn_init(tal_t *ctx, struct daemon_conn *dc, int fd,
void daemon_conn_send(struct daemon_conn *dc, u8 *msg) void daemon_conn_send(struct daemon_conn *dc, u8 *msg)
{ {
daemon_conn_enqueue(dc, msg); msg_enqueue(&dc->out, msg);
io_wake(dc); io_wake(dc);
} }

5
lightningd/connection.h

@ -4,6 +4,7 @@
#include "config.h" #include "config.h"
#include <ccan/io/io.h> #include <ccan/io/io.h>
#include <ccan/short_types/short_types.h> #include <ccan/short_types/short_types.h>
#include <lightningd/msg_queue.h>
struct daemon_conn { struct daemon_conn {
/* Context to tallocate all things from, possibly the /* Context to tallocate all things from, possibly the
@ -13,8 +14,8 @@ struct daemon_conn {
/* Last message we received */ /* Last message we received */
u8 *msg_in; u8 *msg_in;
/* Array of queued outgoing messages */ /* Queue of outgoing messages */
u8 **msg_out; struct msg_queue out;
int conn_fd; int conn_fd;
struct io_conn *conn; struct io_conn *conn;

3
lightningd/msg_queue.c

@ -3,13 +3,14 @@
void msg_queue_init(struct msg_queue *q, const tal_t *ctx) void msg_queue_init(struct msg_queue *q, const tal_t *ctx)
{ {
q->q = tal_arr(ctx, const u8 *, 0); q->q = tal_arr(ctx, const u8 *, 0);
q->ctx = ctx;
} }
void msg_enqueue(struct msg_queue *q, const u8 *add) void msg_enqueue(struct msg_queue *q, const u8 *add)
{ {
size_t n = tal_count(q->q); size_t n = tal_count(q->q);
tal_resize(&q->q, n+1); tal_resize(&q->q, n+1);
q->q[n] = add; q->q[n] = tal_dup_arr(q->ctx, u8, add, tal_len(add), 0);
/* In case someone is waiting */ /* In case someone is waiting */
io_wake(q); io_wake(q);

1
lightningd/msg_queue.h

@ -7,6 +7,7 @@
struct msg_queue { struct msg_queue {
const u8 **q; const u8 **q;
const tal_t *ctx;
}; };
void msg_queue_init(struct msg_queue *q, const tal_t *ctx); void msg_queue_init(struct msg_queue *q, const tal_t *ctx);

4
lightningd/subd.c

@ -373,7 +373,7 @@ void subd_send_msg(struct subd *sd, const u8 *msg_out)
assert(fromwire_peektype(msg_out) != STATUS_TRACE); assert(fromwire_peektype(msg_out) != STATUS_TRACE);
if (!taken(msg_out)) if (!taken(msg_out))
msg_out = tal_dup_arr(sd, u8, msg_out, tal_len(msg_out), 0); msg_out = tal_dup_arr(sd, u8, msg_out, tal_len(msg_out), 0);
msg_enqueue(&sd->outq, msg_out); msg_enqueue(&sd->outq, take(msg_out));
} }
void subd_send_fd(struct subd *sd, int fd) void subd_send_fd(struct subd *sd, int fd)
@ -382,7 +382,7 @@ void subd_send_fd(struct subd *sd, int fd)
u8 *fdmsg = tal_arr(sd, u8, 0); u8 *fdmsg = tal_arr(sd, u8, 0);
towire_u16(&fdmsg, STATUS_TRACE); towire_u16(&fdmsg, STATUS_TRACE);
towire_u32(&fdmsg, fd); towire_u32(&fdmsg, fd);
msg_enqueue(&sd->outq, fdmsg); msg_enqueue(&sd->outq, take(fdmsg));
} }
void subd_req_(struct subd *sd, void subd_req_(struct subd *sd,

Loading…
Cancel
Save