Browse Source

daemon_conn: make it a tal object, typesafe callbacks.

It means an extra allocation at startup, but it means we can hide the definition,
and use standard patterns (new_daemon_conn and typesafe callbacks).

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
plugin-1
Rusty Russell 6 years ago
parent
commit
3c97f3954e
  1. 76
      common/daemon_conn.c
  2. 59
      common/daemon_conn.h
  3. 63
      connectd/connectd.c
  4. 237
      gossipd/gossipd.c
  5. 4
      hsmd/hsmd.c

76
common/daemon_conn.c

@ -5,12 +5,40 @@
#include <wire/wire_io.h>
#include <wire/wire_sync.h>
struct daemon_conn {
/* Last message we received */
u8 *msg_in;
/* Queue of outgoing messages */
struct msg_queue out;
/* Underlying connection: we're freed if it closes, and vice versa */
struct io_conn *conn;
/* Callback for incoming messages */
struct io_plan *(*recv)(struct io_conn *conn, const u8 *, void *);
/* Called whenever we've cleared the msg_out queue. If it returns
* true, it has added packets to msg_out queue. */
bool (*outq_empty)(void *);
/* Arg for both callbacks. */
void *arg;
};
static struct io_plan *handle_read(struct io_conn *conn,
struct daemon_conn *dc)
{
return dc->recv(conn, dc->msg_in, dc->arg);
}
struct io_plan *daemon_conn_read_next(struct io_conn *conn,
struct daemon_conn *dc)
{
dc->msg_in = tal_free(dc->msg_in);
return io_read_wire(conn, dc->ctx, &dc->msg_in, dc->daemon_conn_recv,
dc);
/* FIXME: We could use disposable parent instead, and recv() could
* tal_steal() it? If they did that now, we'd free it here. */
tal_free(dc->msg_in);
return io_read_wire(conn, dc, &dc->msg_in, handle_read, dc);
}
static struct io_plan *daemon_conn_write_next(struct io_conn *conn,
@ -29,8 +57,8 @@ again:
}
return io_write_wire(conn, take(msg), daemon_conn_write_next,
dc);
} else if (dc->msg_queue_cleared_cb) {
if (dc->msg_queue_cleared_cb(conn, dc))
} else if (dc->outq_empty) {
if (dc->outq_empty(dc->arg))
goto again;
}
return msg_queue_wait(conn, &dc->out, daemon_conn_write_next, dc);
@ -70,26 +98,36 @@ static struct io_plan *daemon_conn_start(struct io_conn *conn,
struct daemon_conn *dc)
{
return io_duplex(conn, daemon_conn_read_next(conn, dc),
daemon_conn_write_next(conn, dc));
/* Could call daemon_conn_write_next, but we don't
* want it to call empty_cb just yet! */
msg_queue_wait(conn, dc->out,
daemon_conn_write_next, dc));
}
void daemon_conn_init(tal_t *ctx, struct daemon_conn *dc, int fd,
struct io_plan *(*daemon_conn_recv)(struct io_conn *,
struct daemon_conn *))
static void destroy_dc_from_conn(struct io_conn *conn, struct daemon_conn *dc)
{
dc->daemon_conn_recv = daemon_conn_recv;
dc->ctx = ctx;
dc->msg_in = NULL;
msg_queue_init(&dc->out, dc->ctx);
dc->msg_queue_cleared_cb = NULL;
dc->conn = io_new_conn(ctx, fd, daemon_conn_start, dc);
/* Harmless free loop if conn is being destroyed because dc freed */
tal_free(dc);
}
void daemon_conn_clear(struct daemon_conn *dc)
struct daemon_conn *daemon_conn_new_(const tal_t *ctx, int fd,
struct io_plan *(*recv)(struct io_conn *,
const u8 *,
void *),
bool (*outq_empty)(void *),
void *arg)
{
io_set_finish(dc->conn, NULL, NULL);
io_close(dc->conn);
struct daemon_conn *dc = tal(NULL, struct daemon_conn);
dc->recv = recv;
dc->outq_empty = outq_empty;
dc->arg = arg;
dc->msg_in = NULL;
msg_queue_init(&dc->out, dc);
dc->conn = io_new_conn(dc, fd, daemon_conn_start, dc);
tal_add_destructor2(dc->conn, destroy_dc_from_conn, dc);
return dc;
}
void daemon_conn_send(struct daemon_conn *dc, const u8 *msg)

59
common/daemon_conn.h

@ -6,49 +6,30 @@
#include <ccan/short_types/short_types.h>
#include <common/msg_queue.h>
struct daemon_conn {
/* Context to tallocate all things from, possibly the
* container of this connection. */
tal_t *ctx;
/* Last message we received */
u8 *msg_in;
/* Queue of outgoing messages */
struct msg_queue out;
/* Underlying connection */
struct io_conn *conn;
/* Callback for incoming messages */
struct io_plan *(*daemon_conn_recv)(struct io_conn *conn,
struct daemon_conn *);
/* Called whenever we've cleared the msg_out queue. If it returns
* true, it has added packets to msg_out queue. */
bool (*msg_queue_cleared_cb)(struct io_conn *, struct daemon_conn *);
};
/**
* daemon_conn_init - Initialize a new daemon connection
* daemon_conn_new - Allocate a new daemon connection
*
* @ctx: context to allocate from
* @dc: daemon_conn to initialize
* @ctx: context to allocate the daemon_conn's conn from
* @fd: socket file descriptor to wrap
* @daemon_conn_recv: callback function to be called upon receiving a message
*/
void daemon_conn_init(tal_t *ctx, struct daemon_conn *dc, int fd,
struct io_plan *(*daemon_conn_recv)(
struct io_conn *, struct daemon_conn *));
/**
* daemon_conn_clear - discard a daemon conn without triggering finish.
* @dc: the daemon_conn to clean up.
*
* This is used by gossipd when a peer is handed back, and we no longer
* want to deal with it via daemon_conn. @dc must not be used after this!
* @recv: callback function to be called upon receiving a message
* @outq_empty: callback function to be called when queue is empty: returns
* true if it added something to the queue. Can be NULL.
*/
void daemon_conn_clear(struct daemon_conn *dc);
#define daemon_conn_new(ctx, fd, recv, outq_empty, arg) \
daemon_conn_new_((ctx), (fd), \
typesafe_cb_preargs(struct io_plan *, void *, \
(recv), (arg), \
struct io_conn *, \
const u8 *), \
typesafe_cb(bool, void *, (outq_empty), (arg)), \
arg)
struct daemon_conn *daemon_conn_new_(const tal_t *ctx, int fd,
struct io_plan *(*recv)(struct io_conn *,
const u8 *,
void *),
bool (*outq_empty)(void *),
void *arg);
/**
* daemon_conn_send - Enqueue an outgoing message to be sent

63
connectd/connectd.c

@ -121,7 +121,7 @@ struct daemon {
struct list_head connecting;
/* Connection to main daemon. */
struct daemon_conn master;
struct daemon_conn *master;
/* Local and global features to offer to peers. */
u8 *localfeatures, *globalfeatures;
@ -372,7 +372,7 @@ static struct io_plan *peer_reconnected(struct io_conn *conn,
/* Tell master to kill it: will send peer_disconnect */
msg = towire_connect_reconnected(NULL, id);
daemon_conn_send(&daemon->master, take(msg));
daemon_conn_send(daemon->master, take(msg));
/* Save arguments for next time. */
r = tal(daemon, struct peer_reconnected);
@ -425,10 +425,10 @@ struct io_plan *peer_connected(struct io_conn *conn,
/*~ daemon_conn is a message queue for inter-daemon communication: we
* queue up the `connect_peer_connected` message to tell lightningd
* we have connected, and give the the peer and gossip fds. */
daemon_conn_send(&daemon->master, peer_connected_msg);
daemon_conn_send(daemon->master, peer_connected_msg);
/* io_conn_fd() extracts the fd from ccan/io's io_conn */
daemon_conn_send_fd(&daemon->master, io_conn_fd(conn));
daemon_conn_send_fd(&daemon->master, gossip_fd);
daemon_conn_send_fd(daemon->master, io_conn_fd(conn));
daemon_conn_send_fd(daemon->master, gossip_fd);
/*~ Finally, we add it to the set of pubkeys: tal_dup will handle
* take() args for us, by simply tal_steal()ing it. */
@ -550,7 +550,7 @@ static void PRINTF_FMT(5,6)
* asking. */
msg = towire_connectctl_connect_failed(NULL, id, err, wait_seconds,
addrhint);
daemon_conn_send(&daemon->master, take(msg));
daemon_conn_send(daemon->master, take(msg));
status_trace("Failed connected out for %s: %s",
type_to_string(tmpctx, struct pubkey, id),
@ -1080,9 +1080,9 @@ static struct wireaddr_internal *setup_listeners(const tal_t *ctx,
/*~ Parse the incoming connect init message from lightningd ("master") and
* assign config variables to the daemon; it should be the first message we
* get. */
static struct io_plan *connect_init(struct daemon_conn *master,
struct daemon *daemon,
const u8 *msg)
static struct io_plan *connect_init(struct io_conn *conn,
struct daemon *daemon,
const u8 *msg)
{
struct wireaddr *proxyaddr;
struct wireaddr_internal *binding;
@ -1127,19 +1127,19 @@ static struct io_plan *connect_init(struct daemon_conn *master,
&announcable);
/* Tell it we're ready, handing it the addresses we have. */
daemon_conn_send(&daemon->master,
daemon_conn_send(daemon->master,
take(towire_connectctl_init_reply(NULL,
binding,
announcable)));
/* Read the next message. */
return daemon_conn_read_next(master->conn, master);
return daemon_conn_read_next(conn, daemon->master);
}
/*~ lightningd tells us to go! */
static struct io_plan *connect_activate(struct daemon_conn *master,
struct daemon *daemon,
const u8 *msg)
static struct io_plan *connect_activate(struct io_conn *conn,
struct daemon *daemon,
const u8 *msg)
{
bool do_listen;
@ -1166,9 +1166,9 @@ static struct io_plan *connect_activate(struct daemon_conn *master,
daemon->listen_fds = tal_free(daemon->listen_fds);
/* OK, we're ready! */
daemon_conn_send(&daemon->master,
daemon_conn_send(daemon->master,
take(towire_connectctl_activate_reply(NULL)));
return daemon_conn_read_next(master->conn, master);
return daemon_conn_read_next(conn, daemon->master);
}
/*~ This is where we'd put a BOLT #10 reference, but it doesn't exist :( */
@ -1334,7 +1334,7 @@ static struct io_plan *connect_to_peer(struct io_conn *conn,
master_badmsg(WIRE_CONNECTCTL_CONNECT_TO_PEER, msg);
try_connect_peer(daemon, &id, seconds_waited, addrhint);
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
}
/* lightningd tells us a peer has disconnected. */
@ -1362,28 +1362,29 @@ static struct io_plan *peer_disconnected(struct io_conn *conn,
tal_free(key);
/* Read the next message from lightningd. */
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
}
static struct io_plan *recv_req(struct io_conn *conn, struct daemon_conn *master)
static struct io_plan *recv_req(struct io_conn *conn,
const u8 *msg,
struct daemon *daemon)
{
struct daemon *daemon = container_of(master, struct daemon, master);
enum connect_wire_type t = fromwire_peektype(master->msg_in);
enum connect_wire_type t = fromwire_peektype(msg);
/* Demux requests from lightningd: we expect INIT then ACTIVATE, then
* connect requests and disconnected messages. */
switch (t) {
case WIRE_CONNECTCTL_INIT:
return connect_init(master, daemon, master->msg_in);
return connect_init(conn, daemon, msg);
case WIRE_CONNECTCTL_ACTIVATE:
return connect_activate(master, daemon, master->msg_in);
return connect_activate(conn, daemon, msg);
case WIRE_CONNECTCTL_CONNECT_TO_PEER:
return connect_to_peer(conn, daemon, master->msg_in);
return connect_to_peer(conn, daemon, msg);
case WIRE_CONNECTCTL_PEER_DISCONNECTED:
return peer_disconnected(conn, daemon, master->msg_in);
return peer_disconnected(conn, daemon, msg);
/* We send these, we don't receive them */
case WIRE_CONNECTCTL_INIT_REPLY:
@ -1396,7 +1397,7 @@ static struct io_plan *recv_req(struct io_conn *conn, struct daemon_conn *master
/* Master shouldn't give bad requests. */
status_failed(STATUS_FAIL_MASTER_IO, "%i: %s",
t, tal_hex(tmpctx, master->msg_in));
t, tal_hex(tmpctx, msg));
}
/*~ Helper for handshake.c: we ask `hsmd` to do the ECDH to get the shared
@ -1424,8 +1425,7 @@ bool hsm_do_ecdh(struct secret *ss, const struct pubkey *point)
*
* The C++ method of omitting unused parameter names is *much* neater, and I
* hope we'll eventually see it in a C standard. */
static void master_gone(struct io_conn *unused UNUSED,
struct daemon *daemon UNUSED)
static void master_gone(struct daemon_conn *master UNUSED)
{
/* Can't tell master, it's gone. */
exit(2);
@ -1446,12 +1446,13 @@ int main(int argc, char *argv[])
list_head_init(&daemon->connecting);
daemon->listen_fds = tal_arr(daemon, struct listen_fd, 0);
/* stdin == control */
daemon_conn_init(daemon, &daemon->master, STDIN_FILENO, recv_req);
io_set_finish(daemon->master.conn, master_gone, daemon);
daemon->master = daemon_conn_new(daemon, STDIN_FILENO, recv_req, NULL,
daemon);
tal_add_destructor(daemon->master, master_gone);
/* This tells the status_* subsystem to use this connection to send
* our status_ and failed messages. */
status_setup_async(&daemon->master);
status_setup_async(daemon->master);
/* Should never exit. */
io_loop(NULL, NULL);

237
gossipd/gossipd.c

@ -70,10 +70,10 @@ struct daemon {
struct list_head peers;
/* Connection to main daemon. */
struct daemon_conn master;
struct daemon_conn *master;
/* Connection to connect daemon. */
struct daemon_conn connectd;
struct daemon_conn *connectd;
/* Routing information */
struct routing_state *rstate;
@ -131,8 +131,7 @@ struct peer {
u32 first_channel_range;
struct short_channel_id *query_channel_scids;
/* FIXME: Doesn't need to be a pointer. */
struct daemon_conn *remote;
struct daemon_conn *dc;
};
static void peer_disable_channels(struct daemon *daemon, struct node *node)
@ -157,7 +156,7 @@ static void destroy_peer(struct peer *peer)
/* In case we've been manually freed, close conn (our parent: if
* it is freed, this will be a noop). */
io_close(peer->remote->conn);
tal_free(peer->dc);
}
static struct peer *find_peer(struct daemon *daemon, const struct pubkey *id)
@ -237,7 +236,7 @@ static void queue_peer_msg(struct peer *peer, const u8 *msg TAKES)
const u8 *send = towire_gossip_send_gossip(NULL, msg);
if (taken(msg))
tal_free(msg);
daemon_conn_send(peer->remote, take(send));
daemon_conn_send(peer->dc, take(send));
}
static void wake_gossip_out(struct peer *peer)
@ -246,7 +245,7 @@ static void wake_gossip_out(struct peer *peer)
peer->gossip_timer = tal_free(peer->gossip_timer);
/* Notify the daemon_conn-write loop */
daemon_conn_wake(peer->remote);
daemon_conn_wake(peer->dc);
}
static void peer_error(struct peer *peer, const char *fmt, ...)
@ -279,7 +278,7 @@ static void setup_gossip_range(struct peer *peer)
queue_peer_msg(peer, take(msg));
}
static bool nonlocal_dump_gossip(struct io_conn *conn, struct daemon_conn *dc);
static bool dump_gossip(struct peer *peer);
/* Create a node_announcement with the given signature. It may be NULL
* in the case we need to create a provisional announcement for the
@ -415,7 +414,7 @@ static u8 *handle_gossip_msg(struct daemon *daemon, const u8 *msg,
if (err)
return err;
else if (scid)
daemon_conn_send(&daemon->master,
daemon_conn_send(daemon->master,
take(towire_gossip_get_txout(NULL,
scid)));
break;
@ -440,7 +439,7 @@ static u8 *handle_gossip_msg(struct daemon *daemon, const u8 *msg,
return NULL;
}
static void handle_query_short_channel_ids(struct peer *peer, u8 *msg)
static void handle_query_short_channel_ids(struct peer *peer, const u8 *msg)
{
struct routing_state *rstate = peer->daemon->rstate;
struct bitcoin_blkid chain;
@ -491,10 +490,10 @@ static void handle_query_short_channel_ids(struct peer *peer, u8 *msg)
peer->scid_query_nodes = tal_arr(peer, struct pubkey, 0);
/* Notify the daemon_conn-write loop */
daemon_conn_wake(peer->remote);
daemon_conn_wake(peer->dc);
}
static void handle_gossip_timestamp_filter(struct peer *peer, u8 *msg)
static void handle_gossip_timestamp_filter(struct peer *peer, const u8 *msg)
{
struct bitcoin_blkid chain_hash;
u32 first_timestamp, timestamp_range;
@ -615,7 +614,7 @@ static void queue_channel_ranges(struct peer *peer,
number_of_blocks - number_of_blocks / 2);
}
static void handle_query_channel_range(struct peer *peer, u8 *msg)
static void handle_query_channel_range(struct peer *peer, const u8 *msg)
{
struct bitcoin_blkid chain_hash;
u32 first_blocknum, number_of_blocks;
@ -643,7 +642,7 @@ static void handle_query_channel_range(struct peer *peer, u8 *msg)
queue_channel_ranges(peer, first_blocknum, number_of_blocks);
}
static void handle_ping(struct peer *peer, u8 *ping)
static void handle_ping(struct peer *peer, const u8 *ping)
{
u8 *pong;
@ -665,12 +664,12 @@ static void handle_pong(struct peer *peer, const u8 *pong)
return;
}
daemon_conn_send(&peer->daemon->master,
daemon_conn_send(peer->daemon->master,
take(towire_gossip_ping_reply(NULL, &peer->id, true,
tal_count(pong))));
}
static void handle_reply_short_channel_ids_end(struct peer *peer, u8 *msg)
static void handle_reply_short_channel_ids_end(struct peer *peer, const u8 *msg)
{
struct bitcoin_blkid chain;
u8 complete;
@ -695,10 +694,10 @@ static void handle_reply_short_channel_ids_end(struct peer *peer, u8 *msg)
peer->num_scid_queries_outstanding--;
msg = towire_gossip_scids_reply(msg, true, complete);
daemon_conn_send(&peer->daemon->master, take(msg));
daemon_conn_send(peer->daemon->master, take(msg));
}
static void handle_reply_channel_range(struct peer *peer, u8 *msg)
static void handle_reply_channel_range(struct peer *peer, const u8 *msg)
{
struct bitcoin_blkid chain;
u8 complete;
@ -784,7 +783,7 @@ static void handle_reply_channel_range(struct peer *peer, u8 *msg)
number_of_blocks,
complete,
peer->query_channel_scids);
daemon_conn_send(&peer->daemon->master, take(msg));
daemon_conn_send(peer->daemon->master, take(msg));
peer->query_channel_scids = tal_free(peer->query_channel_scids);
peer->query_channel_blocks = tal_free(peer->query_channel_blocks);
}
@ -1096,7 +1095,7 @@ out:
update ? "got" : "no");
msg = towire_gossip_get_update_reply(NULL, update);
daemon_conn_send(peer->remote, take(msg));
daemon_conn_send(peer->dc, take(msg));
}
/* Return true if the information has changed. */
@ -1189,38 +1188,38 @@ static void handle_local_channel_update(struct peer *peer, const u8 *msg)
* message
*/
static struct io_plan *owner_msg_in(struct io_conn *conn,
struct daemon_conn *dc)
const u8 *msg,
struct peer *peer)
{
struct peer *peer = dc->ctx;
u8 *msg = dc->msg_in, *err;
u8 *err;
int type = fromwire_peektype(msg);
if (type == WIRE_CHANNEL_ANNOUNCEMENT || type == WIRE_CHANNEL_UPDATE ||
type == WIRE_NODE_ANNOUNCEMENT) {
err = handle_gossip_msg(peer->daemon, dc->msg_in, "subdaemon");
err = handle_gossip_msg(peer->daemon, msg, "subdaemon");
if (err)
queue_peer_msg(peer, take(err));
} else if (type == WIRE_QUERY_SHORT_CHANNEL_IDS) {
handle_query_short_channel_ids(peer, dc->msg_in);
handle_query_short_channel_ids(peer, msg);
} else if (type == WIRE_REPLY_SHORT_CHANNEL_IDS_END) {
handle_reply_short_channel_ids_end(peer, dc->msg_in);
handle_reply_short_channel_ids_end(peer, msg);
} else if (type == WIRE_GOSSIP_TIMESTAMP_FILTER) {
handle_gossip_timestamp_filter(peer, dc->msg_in);
handle_gossip_timestamp_filter(peer, msg);
} else if (type == WIRE_GOSSIP_GET_UPDATE) {
handle_get_update(peer, dc->msg_in);
handle_get_update(peer, msg);
} else if (type == WIRE_GOSSIP_LOCAL_ADD_CHANNEL) {
gossip_store_add(peer->daemon->rstate->store, dc->msg_in);
handle_local_add_channel(peer->daemon->rstate, dc->msg_in);
gossip_store_add(peer->daemon->rstate->store, msg);
handle_local_add_channel(peer->daemon->rstate, msg);
} else if (type == WIRE_GOSSIP_LOCAL_CHANNEL_UPDATE) {
handle_local_channel_update(peer, dc->msg_in);
handle_local_channel_update(peer, msg);
} else if (type == WIRE_QUERY_CHANNEL_RANGE) {
handle_query_channel_range(peer, dc->msg_in);
handle_query_channel_range(peer, msg);
} else if (type == WIRE_REPLY_CHANNEL_RANGE) {
handle_reply_channel_range(peer, dc->msg_in);
handle_reply_channel_range(peer, msg);
} else if (type == WIRE_PING) {
handle_ping(peer, dc->msg_in);
handle_ping(peer, msg);
} else if (type == WIRE_PONG) {
handle_pong(peer, dc->msg_in);
handle_pong(peer, msg);
} else {
status_broken("peer %s: send us unknown msg of type %s",
type_to_string(tmpctx, struct pubkey, &peer->id),
@ -1228,7 +1227,7 @@ static struct io_plan *owner_msg_in(struct io_conn *conn,
return io_close(conn);
}
return daemon_conn_read_next(conn, dc);
return daemon_conn_read_next(conn, peer->dc);
}
static struct io_plan *connectd_new_peer(struct io_conn *conn,
@ -1249,7 +1248,7 @@ static struct io_plan *connectd_new_peer(struct io_conn *conn,
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) {
status_broken("Failed to create socketpair: %s",
strerror(errno));
daemon_conn_send(&daemon->connectd,
daemon_conn_send(daemon->connectd,
take(towire_gossip_new_peer_reply(NULL, false)));
goto done;
}
@ -1258,12 +1257,6 @@ static struct io_plan *connectd_new_peer(struct io_conn *conn,
tal_free(find_peer(daemon, &peer->id));
peer->daemon = daemon;
peer->remote = tal(peer, struct daemon_conn);
daemon_conn_init(peer, peer->remote, fds[0], owner_msg_in);
/* Free peer if conn closed. */
tal_steal(peer->remote->conn, peer);
peer->remote->msg_queue_cleared_cb = nonlocal_dump_gossip;
peer->scid_queries = NULL;
peer->scid_query_idx = 0;
peer->scid_query_nodes = NULL;
@ -1307,29 +1300,30 @@ static struct io_plan *connectd_new_peer(struct io_conn *conn,
= peer->daemon->rstate->broadcasts->next_index;
}
/* Start the gossip flowing. */
wake_gossip_out(peer);
peer->dc = daemon_conn_new(daemon, fds[0],
owner_msg_in, dump_gossip, peer);
/* Free peer if conn closed (destroy_peer closes conn if peer freed) */
tal_steal(peer->dc, peer);
setup_gossip_range(peer);
/* Start the gossip flowing. */
wake_gossip_out(peer);
/* Reply with success, and the new fd */
daemon_conn_send(&daemon->connectd,
daemon_conn_send(daemon->connectd,
take(towire_gossip_new_peer_reply(NULL, true)));
daemon_conn_send_fd(&daemon->connectd, fds[1]);
daemon_conn_send_fd(daemon->connectd, fds[1]);
done:
return daemon_conn_read_next(conn, &daemon->connectd);
return daemon_conn_read_next(conn, daemon->connectd);
}
/**
* nonlocal_dump_gossip - catch the nonlocal peer up with the latest gossip.
*
* Registered as `msg_queue_cleared_cb` by the `peer->remote`.
* dump_gossip - catch the peer up with the latest gossip.
*/
static bool nonlocal_dump_gossip(struct io_conn *conn, struct daemon_conn *dc)
static bool dump_gossip(struct peer *peer)
{
struct peer *peer = dc->ctx;
/* Do we have scid query replies to send? */
if (create_next_scid_reply(peer))
return true;
@ -1339,7 +1333,7 @@ static bool nonlocal_dump_gossip(struct io_conn *conn, struct daemon_conn *dc)
}
static struct io_plan *getroute_req(struct io_conn *conn, struct daemon *daemon,
u8 *msg)
const u8 *msg)
{
struct pubkey source, destination;
u64 msatoshi;
@ -1365,8 +1359,8 @@ static struct io_plan *getroute_req(struct io_conn *conn, struct daemon *daemon,
fuzz, &seed);
out = towire_gossip_getroute_reply(msg, hops);
daemon_conn_send(&daemon->master, out);
return daemon_conn_read_next(conn, &daemon->master);
daemon_conn_send(daemon->master, out);
return daemon_conn_read_next(conn, daemon->master);
}
#define raw_pubkey(arr, id) \
@ -1407,8 +1401,9 @@ static void append_channel(struct gossip_getchannels_entry **entries,
append_half_channel(entries, chan, 1);
}
static struct io_plan *getchannels_req(struct io_conn *conn, struct daemon *daemon,
u8 *msg)
static struct io_plan *getchannels_req(struct io_conn *conn,
struct daemon *daemon,
const u8 *msg)
{
u8 *out;
struct gossip_getchannels_entry *entries;
@ -1434,8 +1429,8 @@ static struct io_plan *getchannels_req(struct io_conn *conn, struct daemon *daem
}
out = towire_gossip_getchannels_reply(NULL, entries);
daemon_conn_send(&daemon->master, take(out));
return daemon_conn_read_next(conn, &daemon->master);
daemon_conn_send(daemon->master, take(out));
return daemon_conn_read_next(conn, daemon->master);
}
/* We keep pointers into n, assuming it won't change! */
@ -1484,8 +1479,8 @@ static struct io_plan *getnodes(struct io_conn *conn, struct daemon *daemon,
}
}
out = towire_gossip_getnodes_reply(NULL, nodes);
daemon_conn_send(&daemon->master, take(out));
return daemon_conn_read_next(conn, &daemon->master);
daemon_conn_send(daemon->master, take(out));
return daemon_conn_read_next(conn, daemon->master);
}
static struct io_plan *ping_req(struct io_conn *conn, struct daemon *daemon,
@ -1501,7 +1496,7 @@ static struct io_plan *ping_req(struct io_conn *conn, struct daemon *daemon,
peer = find_peer(daemon, &id);
if (!peer) {
daemon_conn_send(&daemon->master,
daemon_conn_send(daemon->master,
take(towire_gossip_ping_reply(NULL, &id,
false, 0)));
goto out;
@ -1526,14 +1521,14 @@ static struct io_plan *ping_req(struct io_conn *conn, struct daemon *daemon,
* - MUST ignore the `ping`.
*/
if (num_pong_bytes >= 65532)
daemon_conn_send(&daemon->master,
daemon_conn_send(daemon->master,
take(towire_gossip_ping_reply(NULL, &id,
true, 0)));
else
peer->num_pings_outstanding++;
out:
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
}
static struct io_plan *get_incoming_channels(struct io_conn *conn,
@ -1572,9 +1567,9 @@ static struct io_plan *get_incoming_channels(struct io_conn *conn,
}
msg = towire_gossip_get_incoming_channels_reply(NULL, r);
daemon_conn_send(&daemon->master, take(msg));
daemon_conn_send(daemon->master, take(msg));
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
}
#if DEVELOPER
@ -1630,10 +1625,10 @@ static struct io_plan *query_scids_req(struct io_conn *conn,
status_trace("sending query for %zu scids", tal_count(scids));
out:
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
fail:
daemon_conn_send(&daemon->master,
daemon_conn_send(daemon->master,
take(towire_gossip_scids_reply(NULL, false, false)));
goto out;
}
@ -1666,7 +1661,7 @@ static struct io_plan *send_timestamp_filter(struct io_conn *conn,
first, range);
queue_peer_msg(peer, take(msg));
out:
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
}
static struct io_plan *query_channel_range(struct io_conn *conn,
@ -1710,10 +1705,10 @@ static struct io_plan *query_channel_range(struct io_conn *conn,
peer->query_channel_scids = tal_arr(peer, struct short_channel_id, 0);
out:
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
fail:
daemon_conn_send(&daemon->master,
daemon_conn_send(daemon->master,
take(towire_gossip_query_channel_range_reply(NULL,
0, 0,
false,
@ -1730,7 +1725,7 @@ static struct io_plan *dev_set_max_scids_encode_size(struct io_conn *conn,
master_badmsg(WIRE_GOSSIP_DEV_SET_MAX_SCIDS_ENCODE_SIZE, msg);
status_trace("Set max_scids_encode_bytes to %u", max_scids_encode_bytes);
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
}
static struct io_plan *dev_gossip_suppress(struct io_conn *conn,
@ -1742,7 +1737,7 @@ static struct io_plan *dev_gossip_suppress(struct io_conn *conn,
status_unusual("Suppressing all gossip");
suppress_gossip = true;
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
}
#endif /* DEVELOPER */
@ -1826,7 +1821,7 @@ static void gossip_disable_local_channels(struct daemon *daemon)
/* Parse an incoming gossip init message and assign config variables
* to the daemon.
*/
static struct io_plan *gossip_init(struct daemon_conn *master,
static struct io_plan *gossip_init(struct io_conn *conn,
struct daemon *daemon,
const u8 *msg)
{
@ -1859,7 +1854,7 @@ static struct io_plan *gossip_init(struct daemon_conn *master,
time_from_sec(daemon->rstate->prune_timeout/4),
gossip_refresh_network, daemon);
return daemon_conn_read_next(master->conn, master);
return daemon_conn_read_next(conn, daemon->master);
}
static struct io_plan *get_channel_peer(struct io_conn *conn,
@ -1886,9 +1881,9 @@ static struct io_plan *get_channel_peer(struct io_conn *conn,
&scid));
key = NULL;
}
daemon_conn_send(&daemon->master,
daemon_conn_send(daemon->master,
take(towire_gossip_get_channel_peer_reply(NULL, key)));
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
}
static struct io_plan *handle_txout_reply(struct io_conn *conn,
@ -1904,7 +1899,7 @@ static struct io_plan *handle_txout_reply(struct io_conn *conn,
handle_pending_cannouncement(daemon->rstate, &scid, satoshis, outscript);
maybe_send_own_node_announce(daemon);
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
}
static struct io_plan *handle_routing_failure(struct io_conn *conn,
@ -1930,7 +1925,7 @@ static struct io_plan *handle_routing_failure(struct io_conn *conn,
(enum onion_type) failcode,
channel_update);
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
}
static struct io_plan *
handle_mark_channel_unroutable(struct io_conn *conn,
@ -1944,7 +1939,7 @@ handle_mark_channel_unroutable(struct io_conn *conn,
mark_channel_unroutable(daemon->rstate, &channel);
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
}
static struct io_plan *handle_outpoint_spent(struct io_conn *conn,
@ -1970,7 +1965,7 @@ static struct io_plan *handle_outpoint_spent(struct io_conn *conn,
gossip_store_add_channel_delete(rstate->store, &scid);
}
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
}
/**
@ -1995,68 +1990,66 @@ static struct io_plan *handle_local_channel_close(struct io_conn *conn,
chan = get_channel(rstate, &scid);
if (chan)
chan->local_disabled = true;
return daemon_conn_read_next(conn, &daemon->master);
return daemon_conn_read_next(conn, daemon->master);
}
static struct io_plan *recv_req(struct io_conn *conn, struct daemon_conn *master)
static struct io_plan *recv_req(struct io_conn *conn,
const u8 *msg,
struct daemon *daemon)
{
struct daemon *daemon = container_of(master, struct daemon, master);
enum gossip_wire_type t = fromwire_peektype(master->msg_in);
enum gossip_wire_type t = fromwire_peektype(msg);
switch (t) {
case WIRE_GOSSIPCTL_INIT:
return gossip_init(master, daemon, master->msg_in);
return gossip_init(conn, daemon, msg);
case WIRE_GOSSIP_GETNODES_REQUEST:
return getnodes(conn, daemon, daemon->master.msg_in);
return getnodes(conn, daemon, msg);
case WIRE_GOSSIP_GETROUTE_REQUEST:
return getroute_req(conn, daemon, daemon->master.msg_in);
return getroute_req(conn, daemon, msg);
case WIRE_GOSSIP_GETCHANNELS_REQUEST:
return getchannels_req(conn, daemon, daemon->master.msg_in);
return getchannels_req(conn, daemon, msg);
case WIRE_GOSSIP_GET_CHANNEL_PEER:
return get_channel_peer(conn, daemon, daemon->master.msg_in);
return get_channel_peer(conn, daemon, msg);
case WIRE_GOSSIP_GET_TXOUT_REPLY:
return handle_txout_reply(conn, daemon, master->msg_in);
return handle_txout_reply(conn, daemon, msg);
case WIRE_GOSSIP_ROUTING_FAILURE:
return handle_routing_failure(conn, daemon, master->msg_in);
return handle_routing_failure(conn, daemon, msg);
case WIRE_GOSSIP_MARK_CHANNEL_UNROUTABLE:
return handle_mark_channel_unroutable(conn, daemon, master->msg_in);
return handle_mark_channel_unroutable(conn, daemon, msg);
case WIRE_GOSSIP_OUTPOINT_SPENT:
return handle_outpoint_spent(conn, daemon, master->msg_in);
return handle_outpoint_spent(conn, daemon, msg);
case WIRE_GOSSIP_LOCAL_CHANNEL_CLOSE:
return handle_local_channel_close(conn, daemon, master->msg_in);
return handle_local_channel_close(conn, daemon, msg);
case WIRE_GOSSIP_PING:
return ping_req(conn, daemon, daemon->master.msg_in);
return ping_req(conn, daemon, msg);
case WIRE_GOSSIP_GET_INCOMING_CHANNELS:
return get_incoming_channels(conn, daemon,
daemon->master.msg_in);
return get_incoming_channels(conn, daemon, msg);
#if DEVELOPER
case WIRE_GOSSIP_QUERY_SCIDS:
return query_scids_req(conn, daemon, daemon->master.msg_in);
return query_scids_req(conn, daemon, msg);
case WIRE_GOSSIP_SEND_TIMESTAMP_FILTER:
return send_timestamp_filter(conn, daemon, daemon->master.msg_in);
return send_timestamp_filter(conn, daemon, msg);
case WIRE_GOSSIP_QUERY_CHANNEL_RANGE:
return query_channel_range(conn, daemon, daemon->master.msg_in);
return query_channel_range(conn, daemon, msg);
case WIRE_GOSSIP_DEV_SET_MAX_SCIDS_ENCODE_SIZE:
return dev_set_max_scids_encode_size(conn, daemon,
daemon->master.msg_in);
return dev_set_max_scids_encode_size(conn, daemon, msg);
case WIRE_GOSSIP_DEV_SUPPRESS:
return dev_gossip_suppress(conn, daemon,
daemon->master.msg_in);
return dev_gossip_suppress(conn, daemon, msg);
#else
case WIRE_GOSSIP_QUERY_SCIDS:
case WIRE_GOSSIP_SEND_TIMESTAMP_FILTER:
@ -2086,7 +2079,7 @@ static struct io_plan *recv_req(struct io_conn *conn, struct daemon_conn *master
/* Master shouldn't give bad requests. */
status_failed(STATUS_FAIL_MASTER_IO, "%i: %s",
t, tal_hex(tmpctx, master->msg_in));
t, tal_hex(tmpctx, msg));
}
static struct io_plan *connectd_get_address(struct io_conn *conn,
@ -2109,23 +2102,23 @@ static struct io_plan *connectd_get_address(struct io_conn *conn,
else
addrs = NULL;
daemon_conn_send(&daemon->connectd,
daemon_conn_send(daemon->connectd,
take(towire_gossip_get_addrs_reply(NULL, addrs)));
return daemon_conn_read_next(conn, &daemon->connectd);
return daemon_conn_read_next(conn, daemon->connectd);
}
static struct io_plan *connectd_req(struct io_conn *conn,
struct daemon_conn *connectd)
const u8 *msg,
struct daemon *daemon)
{
struct daemon *daemon = container_of(connectd, struct daemon, connectd);
enum connect_gossip_wire_type t = fromwire_peektype(connectd->msg_in);
enum connect_gossip_wire_type t = fromwire_peektype(msg);
switch (t) {
case WIRE_GOSSIP_NEW_PEER:
return connectd_new_peer(conn, daemon, connectd->msg_in);
return connectd_new_peer(conn, daemon, msg);
case WIRE_GOSSIP_GET_ADDRS:
return connectd_get_address(conn, daemon, connectd->msg_in);
return connectd_get_address(conn, daemon, msg);
/* We send these, don't receive them. */
case WIRE_GOSSIP_NEW_PEER_REPLY:
@ -2134,12 +2127,12 @@ static struct io_plan *connectd_req(struct io_conn *conn,
}
status_broken("Bad msg from connectd: %s",
tal_hex(tmpctx, connectd->msg_in));
tal_hex(tmpctx, msg));
return io_close(conn);
}
#ifndef TESTING
static void master_gone(struct io_conn *unused UNUSED, struct daemon *daemon UNUSED)
static void master_gone(struct daemon_conn *master UNUSED)
{
/* Can't tell master, it's gone. */
exit(2);
@ -2158,11 +2151,13 @@ int main(int argc, char *argv[])
timers_init(&daemon->timers, time_mono());
/* stdin == control */
daemon_conn_init(daemon, &daemon->master, STDIN_FILENO, recv_req);
io_set_finish(daemon->master.conn, master_gone, daemon);
daemon->master = daemon_conn_new(daemon, STDIN_FILENO,
recv_req, NULL, daemon);
tal_add_destructor(daemon->master, master_gone);
status_setup_async(&daemon->master);
daemon_conn_init(daemon, &daemon->connectd, CONNECTD_FD, connectd_req);
status_setup_async(daemon->master);
daemon->connectd = daemon_conn_new(daemon, CONNECTD_FD,
connectd_req, NULL, daemon);
for (;;) {
struct timer *expired = NULL;

4
hsmd/hsmd.c

@ -1726,9 +1726,7 @@ int main(int argc, char *argv[])
subdaemon_setup(argc, argv);
/* A trivial daemon_conn just for writing. */
status_conn = tal(NULL, struct daemon_conn);
daemon_conn_init(status_conn, status_conn, STDIN_FILENO,
(void *)io_never);
status_conn = daemon_conn_new(NULL, STDIN_FILENO, NULL, NULL, NULL);
status_setup_async(status_conn);
uintmap_init(&clients);

Loading…
Cancel
Save