Browse Source

daemon_conn: don't rely on outq_empty callback telling us to retry queue.

We had at least one bug caused by it not returning true when it had
queued something.  Instead, just re-check thq queue after it's called.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
trytravis
Rusty Russell 6 years ago
parent
commit
07b16e37d0
  1. 18
      common/daemon_conn.c
  2. 4
      common/daemon_conn.h
  3. 23
      gossipd/gossipd.c

18
common/daemon_conn.c

@ -18,9 +18,8 @@ struct daemon_conn {
/* Callback for incoming messages */ /* Callback for incoming messages */
struct io_plan *(*recv)(struct io_conn *conn, const u8 *, void *); struct io_plan *(*recv)(struct io_conn *conn, const u8 *, void *);
/* Called whenever we've cleared the msg_out queue. If it returns /* Called whenever we've cleared the msg_out queue. */
* true, it has added packets to msg_out queue. */ void (*outq_empty)(void *);
bool (*outq_empty)(void *);
/* Arg for both callbacks. */ /* Arg for both callbacks. */
void *arg; void *arg;
@ -46,8 +45,14 @@ static struct io_plan *daemon_conn_write_next(struct io_conn *conn,
{ {
const u8 *msg; const u8 *msg;
again:
msg = msg_dequeue(dc->out); msg = msg_dequeue(dc->out);
/* If nothing in queue, give empty callback a chance to queue somthing */
if (!msg && dc->outq_empty) {
dc->outq_empty(dc->arg);
msg = msg_dequeue(dc->out);
}
if (msg) { if (msg) {
int fd = msg_extract_fd(msg); int fd = msg_extract_fd(msg);
if (fd >= 0) { if (fd >= 0) {
@ -57,9 +62,6 @@ again:
} }
return io_write_wire(conn, take(msg), daemon_conn_write_next, return io_write_wire(conn, take(msg), daemon_conn_write_next,
dc); dc);
} else if (dc->outq_empty) {
if (dc->outq_empty(dc->arg))
goto again;
} }
return msg_queue_wait(conn, dc->out, daemon_conn_write_next, dc); return msg_queue_wait(conn, dc->out, daemon_conn_write_next, dc);
} }
@ -114,7 +116,7 @@ struct daemon_conn *daemon_conn_new_(const tal_t *ctx, int fd,
struct io_plan *(*recv)(struct io_conn *, struct io_plan *(*recv)(struct io_conn *,
const u8 *, const u8 *,
void *), void *),
bool (*outq_empty)(void *), void (*outq_empty)(void *),
void *arg) void *arg)
{ {
struct daemon_conn *dc = tal(NULL, struct daemon_conn); struct daemon_conn *dc = tal(NULL, struct daemon_conn);

4
common/daemon_conn.h

@ -21,14 +21,14 @@
(recv), (arg), \ (recv), (arg), \
struct io_conn *, \ struct io_conn *, \
const u8 *), \ const u8 *), \
typesafe_cb(bool, void *, (outq_empty), (arg)), \ typesafe_cb(void, void *, (outq_empty), (arg)), \
arg) arg)
struct daemon_conn *daemon_conn_new_(const tal_t *ctx, int fd, struct daemon_conn *daemon_conn_new_(const tal_t *ctx, int fd,
struct io_plan *(*recv)(struct io_conn *, struct io_plan *(*recv)(struct io_conn *,
const u8 *, const u8 *,
void *), void *),
bool (*outq_empty)(void *), void (*outq_empty)(void *),
void *arg); void *arg);
/** /**

23
gossipd/gossipd.c

@ -808,7 +808,7 @@ static void uniquify_node_ids(struct pubkey **ids)
tal_resize(ids, dst); tal_resize(ids, dst);
} }
static bool create_next_scid_reply(struct peer *peer) static void maybe_create_next_scid_reply(struct peer *peer)
{ {
struct routing_state *rstate = peer->daemon->rstate; struct routing_state *rstate = peer->daemon->rstate;
size_t i, num; size_t i, num;
@ -883,27 +883,24 @@ static bool create_next_scid_reply(struct peer *peer)
&rstate->chain_hash, &rstate->chain_hash,
true); true);
queue_peer_msg(peer, take(end)); queue_peer_msg(peer, take(end));
sent = true;
peer->scid_queries = tal_free(peer->scid_queries); peer->scid_queries = tal_free(peer->scid_queries);
peer->scid_query_idx = 0; peer->scid_query_idx = 0;
peer->scid_query_nodes = tal_free(peer->scid_query_nodes); peer->scid_query_nodes = tal_free(peer->scid_query_nodes);
peer->scid_query_nodes_idx = 0; peer->scid_query_nodes_idx = 0;
} }
return sent;
} }
/* If we're supposed to be sending gossip, do so now. */ /* If we're supposed to be sending gossip, do so now. */
static bool maybe_queue_gossip(struct peer *peer) static void maybe_queue_gossip(struct peer *peer)
{ {
const u8 *next; const u8 *next;
if (peer->gossip_timer) if (peer->gossip_timer)
return false; return;
#if DEVELOPER #if DEVELOPER
if (suppress_gossip) if (suppress_gossip)
return false; return;
#endif #endif
next = next_broadcast(peer->daemon->rstate->broadcasts, next = next_broadcast(peer->daemon->rstate->broadcasts,
@ -913,7 +910,7 @@ static bool maybe_queue_gossip(struct peer *peer)
if (next) { if (next) {
queue_peer_msg(peer, next); queue_peer_msg(peer, next);
return true; return;
} }
/* Gossip is drained. Wait for next timer. */ /* Gossip is drained. Wait for next timer. */
@ -921,20 +918,18 @@ static bool maybe_queue_gossip(struct peer *peer)
= new_reltimer(&peer->daemon->timers, peer, = new_reltimer(&peer->daemon->timers, peer,
time_from_msec(peer->daemon->broadcast_interval_msec), time_from_msec(peer->daemon->broadcast_interval_msec),
wake_gossip_out, peer); wake_gossip_out, peer);
return false;
} }
/** /**
* dump_gossip - catch the peer up with the latest gossip. * dump_gossip - catch the peer up with the latest gossip.
*/ */
static bool dump_gossip(struct peer *peer) static void dump_gossip(struct peer *peer)
{ {
/* Do we have scid query replies to send? */ /* Do we have scid query replies to send? */
if (create_next_scid_reply(peer)) maybe_create_next_scid_reply(peer);
return true;
/* Otherwise queue any gossip we want to send */ /* Queue any gossip we want to send */
return maybe_queue_gossip(peer); maybe_queue_gossip(peer);
} }
static void update_local_channel(struct daemon *daemon, static void update_local_channel(struct daemon *daemon,

Loading…
Cancel
Save