Browse Source

channeld: don't queue gossip msgs while waiting for foreign_channel_update.

We ask gossipd for the channel_update for the outgoing channel; any other
messages it sends us get queued for later processing.

But this is overzealous: we can shunt those msgs to the peer while
we're waiting.  This fixes a nasty case where we have to handle
WIRE_GOSSIPD_NEW_STORE_FD messages by queuing the fd for later.

This then means that WIRE_GOSSIPD_NEW_STORE_FD can be handled
internally inside handle_gossip_msg(), since it's always dealt with
the same, simplifying all callers.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
htlc_accepted_hook
Rusty Russell 6 years ago
parent
commit
f1b4b14be5
  1. 99
      channeld/channeld.c
  2. 8
      closingd/closingd.c
  3. 28
      common/read_peer_msg.c
  4. 8
      common/read_peer_msg.h
  5. 15
      openingd/openingd.c

99
channeld/channeld.c

@ -103,9 +103,9 @@ struct peer {
struct channel_id channel_id;
struct channel *channel;
/* Messages from master / gossipd: we queue them since we
* might be waiting for a specific reply. */
struct msg_queue *from_master, *from_gossipd;
/* Messages from master: we queue them since we might be
* waiting for a specific reply. */
struct msg_queue *from_master;
struct timers timers;
struct oneshot *commit_timer;
@ -786,32 +786,30 @@ static void maybe_send_shutdown(struct peer *peer)
}
/* This queues other traffic from the fd until we get reply. */
static u8 *wait_sync_reply(const tal_t *ctx,
const u8 *msg,
int replytype,
int fd,
struct msg_queue *queue,
const char *who)
static u8 *master_wait_sync_reply(const tal_t *ctx,
struct peer *peer,
const u8 *msg,
int replytype)
{
u8 *reply;
status_trace("Sending %s %u", who, fromwire_peektype(msg));
status_trace("Sending master %u", fromwire_peektype(msg));
if (!wire_sync_write(fd, msg))
if (!wire_sync_write(MASTER_FD, msg))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Could not set sync write to %s: %s",
who, strerror(errno));
"Could not set sync write to master: %s",
strerror(errno));
status_trace("... , awaiting %u", replytype);
for (;;) {
int type;
reply = wire_sync_read(ctx, fd);
reply = wire_sync_read(ctx, MASTER_FD);
if (!reply)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Could not set sync read from %s: %s",
who, strerror(errno));
"Could not set sync read from master: %s",
strerror(errno));
type = fromwire_peektype(reply);
if (type == replytype) {
status_trace("Got it!");
@ -819,29 +817,44 @@ static u8 *wait_sync_reply(const tal_t *ctx,
}
status_trace("Nope, got %u instead", type);
msg_enqueue(queue, take(reply));
/* This one has an fd appended */
if (type == WIRE_GOSSIPD_NEW_STORE_FD)
msg_enqueue_fd(queue, fdpass_recv(fd));
msg_enqueue(peer->from_master, take(reply));
}
return reply;
}
static u8 *master_wait_sync_reply(const tal_t *ctx,
struct peer *peer, const u8 *msg,
enum channel_wire_type replytype)
{
return wait_sync_reply(ctx, msg, replytype,
MASTER_FD, peer->from_master, "master");
}
static u8 *gossipd_wait_sync_reply(const tal_t *ctx,
struct peer *peer, const u8 *msg,
enum gossip_peerd_wire_type replytype)
{
return wait_sync_reply(ctx, msg, replytype,
GOSSIP_FD, peer->from_gossipd, "gossipd");
/* We can forward gossip packets while waiting for our reply. */
u8 *reply;
status_trace("Sending gossipd %u", fromwire_peektype(msg));
wire_sync_write(GOSSIP_FD, msg);
status_trace("... , awaiting %u", replytype);
for (;;) {
int type;
reply = wire_sync_read(tmpctx, GOSSIP_FD);
/* Gossipd hangs up on us to kill us when a new
* connection comes in. */
if (!reply)
peer_failed_connection_lost();
type = fromwire_peektype(reply);
if (type == replytype) {
status_trace("Got it!");
break;
}
handle_gossip_msg(PEER_FD, GOSSIP_FD, GOSSIP_STORE_FD,
&peer->cs, take(reply));
}
return reply;
}
static u8 *foreign_channel_update(const tal_t *ctx,
@ -3031,7 +3044,6 @@ int main(int argc, char *argv[])
peer->announce_depth_reached = false;
peer->channel_local_active = false;
peer->from_master = msg_queue_new(peer);
peer->from_gossipd = msg_queue_new(peer);
peer->shutdown_sent[LOCAL] = false;
peer->last_update_timestamp = 0;
/* We actually received it in the previous daemon, but near enough */
@ -3087,23 +3099,6 @@ int main(int argc, char *argv[])
continue;
}
msg = msg_dequeue(peer->from_gossipd);
if (msg) {
if (fromwire_gossipd_new_store_fd(msg)) {
tal_free(msg);
msg = msg_dequeue(peer->from_gossipd);
new_gossip_store(GOSSIP_STORE_FD,
msg_extract_fd(msg));
tal_free(msg);
continue;
}
status_trace("Now dealing with deferred gossip %u",
fromwire_peektype(msg));
handle_gossip_msg(PEER_FD, GOSSIP_STORE_FD,
&peer->cs, take(msg));
continue;
}
if (timer_earliest(&peer->timers, &first)) {
timeout = timespec_to_timeval(
timemono_between(first, now).ts);
@ -3137,13 +3132,7 @@ int main(int argc, char *argv[])
* connection comes in. */
if (!msg)
peer_failed_connection_lost();
if (fromwire_gossipd_new_store_fd(msg)) {
tal_free(msg);
new_gossip_store(GOSSIP_STORE_FD,
fdpass_recv(GOSSIP_FD));
continue;
}
handle_gossip_msg(PEER_FD, GOSSIP_STORE_FD,
handle_gossip_msg(PEER_FD, GOSSIP_FD, GOSSIP_STORE_FD,
&peer->cs, take(msg));
}
}

8
closingd/closingd.c

@ -101,13 +101,7 @@ static u8 *closing_read_peer_msg(const tal_t *ctx,
msg = peer_or_gossip_sync_read(ctx, PEER_FD, GOSSIP_FD,
cs, &from_gossipd);
if (from_gossipd) {
if (fromwire_gossipd_new_store_fd(msg)) {
tal_free(msg);
new_gossip_store(GOSSIP_STORE_FD,
fdpass_recv(GOSSIP_FD));
continue;
}
handle_gossip_msg(PEER_FD, GOSSIP_STORE_FD,
handle_gossip_msg(PEER_FD, GOSSIP_FD, GOSSIP_STORE_FD,
cs, take(msg));
continue;
}

28
common/read_peer_msg.c

@ -1,3 +1,4 @@
#include <ccan/fdpass/fdpass.h>
#include <common/crypto_sync.h>
#include <common/gossip_store.h>
#include <common/peer_failed.h>
@ -82,13 +83,25 @@ bool is_wrong_channel(const u8 *msg, const struct channel_id *expected,
return !channel_id_eq(expected, actual);
}
void handle_gossip_msg(int peer_fd, int gossip_store_fd,
static void new_gossip_store(int gossip_store_fd, int new_gossip_store_fd)
{
if (dup2(new_gossip_store_fd, gossip_store_fd) == -1)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Could not dup2 new fd %i onto %i: %s",
new_gossip_store_fd, gossip_store_fd,
strerror(errno));
}
void handle_gossip_msg(int peer_fd, int gossip_fd, int gossip_store_fd,
struct crypto_state *cs, const u8 *msg TAKES)
{
u8 *gossip;
u64 offset;
if (fromwire_gossipd_send_gossip_from_store(msg, &offset))
if (fromwire_gossipd_new_store_fd(msg)) {
new_gossip_store(gossip_store_fd, fdpass_recv(gossip_fd));
goto out;
} else if (fromwire_gossipd_send_gossip_from_store(msg, &offset))
gossip = gossip_store_read(tmpctx, gossip_store_fd, offset);
else if (!fromwire_gossipd_send_gossip(tmpctx, msg, &gossip)) {
status_broken("Got bad message from gossipd: %s",
@ -108,6 +121,8 @@ void handle_gossip_msg(int peer_fd, int gossip_store_fd,
tal_hex(msg, msg));
peer_failed_connection_lost();
}
out:
if (taken(msg))
tal_free(msg);
}
@ -158,12 +173,3 @@ handled:
tal_free(msg);
return true;
}
void new_gossip_store(int gossip_store_fd, int new_gossip_store_fd)
{
if (dup2(new_gossip_store_fd, gossip_store_fd) == -1)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Could not dup2 new fd %i onto %i: %s",
new_gossip_store_fd, gossip_store_fd,
strerror(errno));
}

8
common/read_peer_msg.h

@ -73,14 +73,8 @@ bool handle_peer_gossip_or_error(int peer_fd, int gossip_fd, int gossip_store_fd
/* We got this message from gossipd: forward/quit as it asks. */
void handle_gossip_msg(int peer_fd,
int gossip_fd,
int gossip_store_fd,
struct crypto_state *cs,
const u8 *msg TAKES);
/**
* new_gossip_store - handle replacement gossip_store_fd.
* @gossip_store_fd: our fixed fd we expect to use to read gossip_store.
* @new_gossip_store_fd: fd received from gossipd.
*/
void new_gossip_store(int gossip_store_fd, int new_gossip_store_fd);
#endif /* LIGHTNING_COMMON_READ_PEER_MSG_H */

15
openingd/openingd.c

@ -371,13 +371,7 @@ static u8 *opening_negotiate_msg(const tal_t *ctx, struct state *state,
/* Use standard helper for gossip msgs (forwards, if it's an
* error, exits). */
if (from_gossipd) {
if (fromwire_gossipd_new_store_fd(msg)) {
tal_free(msg);
new_gossip_store(GOSSIP_STORE_FD,
fdpass_recv(GOSSIP_FD));
continue;
}
handle_gossip_msg(PEER_FD, GOSSIP_STORE_FD,
handle_gossip_msg(PEER_FD, GOSSIP_FD, GOSSIP_STORE_FD,
&state->cs, take(msg));
continue;
}
@ -1360,12 +1354,7 @@ static void handle_gossip_in(struct state *state)
status_failed(STATUS_FAIL_GOSSIP_IO,
"Reading gossip: %s", strerror(errno));
if (fromwire_gossipd_new_store_fd(msg)) {
tal_free(msg);
new_gossip_store(GOSSIP_STORE_FD, fdpass_recv(GOSSIP_FD));
return;
}
handle_gossip_msg(PEER_FD, GOSSIP_STORE_FD, &state->cs, take(msg));
handle_gossip_msg(PEER_FD, GOSSIP_FD, GOSSIP_STORE_FD, &state->cs, take(msg));
}
/*~ Is this message of type `error` with the special zero-id

Loading…
Cancel
Save