From 74d428109af4bc9e43afc4a3872595424b4e75f1 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Thu, 2 Aug 2018 16:19:55 +0930 Subject: [PATCH] channeld: it's OK to block on writing to peer. In fact, it's good. We don't want to queue up infinite gossip. Signed-off-by: Rusty Russell --- channeld/channel.c | 183 ++++++++++----------------------------------- 1 file changed, 39 insertions(+), 144 deletions(-) diff --git a/channeld/channel.c b/channeld/channel.c index a8d2599ea..1b5b6ce64 100644 --- a/channeld/channel.c +++ b/channeld/channel.c @@ -103,13 +103,6 @@ struct peer { struct channel_id channel_id; struct channel *channel; - /* Pending msgs to send (not encrypted) */ - struct msg_queue peer_out; - - /* Current msg to send, and offset (encrypted) */ - const u8 *peer_outmsg; - size_t peer_outoff; - /* Messages from master / gossipd: we queue them since we * might be waiting for a specific reply. */ struct msg_queue from_master, from_gossipd; @@ -220,84 +213,6 @@ static void *tal_arr_append_(void **p, size_t size) } #define tal_arr_append(p) tal_arr_append_((void **)(p), sizeof(**(p))) -static void do_peer_write(struct peer *peer) -{ - int r; - size_t len = tal_count(peer->peer_outmsg); - - r = write(PEER_FD, peer->peer_outmsg + peer->peer_outoff, - len - peer->peer_outoff); - if (r < 0) - peer_failed_connection_lost(); - - peer->peer_outoff += r; - if (peer->peer_outoff == len) - peer->peer_outmsg = tal_free(peer->peer_outmsg); -} - -static bool peer_write_pending(struct peer *peer) -{ - const u8 *msg; - - if (peer->peer_outmsg) - return true; - - msg = msg_dequeue(&peer->peer_out); - if (!msg) - return false; - - status_peer_io(LOG_IO_OUT, msg); - peer->peer_outmsg = cryptomsg_encrypt_msg(peer, &peer->cs, take(msg)); - peer->peer_outoff = 0; - return true; -} - -/* Synchronous flush of all pending packets. */ -static void flush_peer_out(struct peer *peer) -{ - while (peer_write_pending(peer)) - do_peer_write(peer); -} - -static void enqueue_peer_msg(struct peer *peer, const u8 *msg TAKES) -{ -#if DEVELOPER - enum dev_disconnect d = dev_disconnect(fromwire_peektype(msg)); - - /* We want to effect this exact packet, so flush any pending. */ - if (d != DEV_DISCONNECT_NORMAL) - flush_peer_out(peer); - - switch (d) { - case DEV_DISCONNECT_BEFORE: - /* Fail immediately. */ - dev_sabotage_fd(PEER_FD); - msg_enqueue(&peer->peer_out, msg); - flush_peer_out(peer); - /* Should not return */ - abort(); - case DEV_DISCONNECT_DROPPKT: - if (taken(msg)) - tal_free(msg); - /* Fail next time we try to do something. */ - dev_sabotage_fd(PEER_FD); - return; - case DEV_DISCONNECT_AFTER: - msg_enqueue(&peer->peer_out, msg); - flush_peer_out(peer); - dev_sabotage_fd(PEER_FD); - return; - case DEV_DISCONNECT_BLACKHOLE: - msg_enqueue(&peer->peer_out, msg); - dev_blackhole_fd(PEER_FD); - return; - case DEV_DISCONNECT_NORMAL: - break; - } -#endif - msg_enqueue(&peer->peer_out, msg); -} - static const u8 *hsm_req(const tal_t *ctx, const u8 *req TAKES) { u8 *msg; @@ -411,7 +326,8 @@ static void send_announcement_signatures(struct peer *peer) NULL, &peer->channel_id, &peer->short_channel_ids[LOCAL], &peer->announcement_node_sigs[LOCAL], &peer->announcement_bitcoin_sigs[LOCAL]); - enqueue_peer_msg(peer, take(msg)); + if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg))) + peer_failed_connection_lost(); } /* Tentatively create a channel_announcement, possibly with invalid @@ -755,7 +671,8 @@ static void maybe_send_shutdown(struct peer *peer) send_channel_update(peer, ROUTING_FLAGS_DISABLED); msg = towire_shutdown(NULL, &peer->channel_id, peer->final_scriptpubkey); - enqueue_peer_msg(peer, take(msg)); + if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg))) + peer_failed_connection_lost(); peer->send_shutdown = false; peer->shutdown_sent[LOCAL] = true; billboard_update(peer); @@ -1075,7 +992,8 @@ static void send_commit(struct peer *peer) feerate, max); msg = towire_update_fee(NULL, &peer->channel_id, feerate); - enqueue_peer_msg(peer, take(msg)); + if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg))) + peer_failed_connection_lost(); } /* BOLT #2: @@ -1117,7 +1035,8 @@ static void send_commit(struct peer *peer) msg = towire_commitment_signed(NULL, &peer->channel_id, &peer->next_commit_sigs->commit_sig, peer->next_commit_sigs->htlc_sigs); - enqueue_peer_msg(peer, take(msg)); + if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg))) + peer_failed_connection_lost(); peer->next_commit_sigs = tal_free(peer->next_commit_sigs); maybe_send_shutdown(peer); @@ -1181,7 +1100,8 @@ static void send_revocation(struct peer *peer) start_commit_timer(peer); } - enqueue_peer_msg(peer, take(msg)); + if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg))) + peer_failed_connection_lost(); } static u8 *got_commitsig_msg(const tal_t *ctx, @@ -1738,7 +1658,8 @@ static void resend_revoke(struct peer *peer) struct pubkey point; /* Current commit is peer->next_index[LOCAL]-1, revoke prior */ u8 *msg = make_revocation_msg(peer, peer->next_index[LOCAL]-2, &point); - enqueue_peer_msg(peer, take(msg)); + if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg))) + peer_failed_connection_lost(); } static void send_fail_or_fulfill(struct peer *peer, const struct htlc *h) @@ -1777,7 +1698,8 @@ static void send_fail_or_fulfill(struct peer *peer, const struct htlc *h) &peer->channel_id, "HTLC %"PRIu64" state %s not failed/fulfilled", h->id, htlc_state_name(h->state)); - enqueue_peer_msg(peer, take(msg)); + if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg))) + peer_failed_connection_lost(); } static void resend_commitment(struct peer *peer, const struct changed_htlc *last) @@ -1812,13 +1734,14 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last last[i].id); if (h->state == SENT_ADD_COMMIT) { - u8 *msg = towire_update_add_htlc(peer, &peer->channel_id, + u8 *msg = towire_update_add_htlc(NULL, &peer->channel_id, h->id, h->msatoshi, &h->rhash, abs_locktime_to_blocks( &h->expiry), h->routing); - enqueue_peer_msg(peer, take(msg)); + if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg))) + peer_failed_connection_lost(); } else if (h->state == SENT_REMOVE_COMMIT) { send_fail_or_fulfill(peer, h); } @@ -1828,7 +1751,8 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last if (peer->channel->funder == LOCAL) { msg = towire_update_fee(NULL, &peer->channel_id, channel_feerate(peer->channel, REMOTE)); - enqueue_peer_msg(peer, take(msg)); + if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg))) + peer_failed_connection_lost(); } /* Re-send the commitment_signed itself. */ @@ -1836,7 +1760,8 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last msg = towire_commitment_signed(NULL, &peer->channel_id, &commit_sigs->commit_sig, commit_sigs->htlc_sigs); - enqueue_peer_msg(peer, take(msg)); + if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg))) + peer_failed_connection_lost(); tal_free(commit_sigs); /* If we have already received the revocation for the previous, the @@ -1848,21 +1773,12 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last peer->revocations_received); } -static bool channeld_send_reply(struct crypto_state *cs UNUSED, - int peer_fd UNUSED, - const u8 *msg, - struct peer *peer) -{ - enqueue_peer_msg(peer, msg); - return true; -} - static u8 *channeld_read_peer_msg(struct peer *peer) { return read_peer_msg(peer, &peer->cs, &peer->channel_id, - channeld_send_reply, - peer); + sync_crypto_write_arg, + NULL); } static void peer_reconnect(struct peer *peer) @@ -1905,7 +1821,7 @@ static void peer_reconnect(struct peer *peer) * before we've reestablished channel). */ while ((msg = read_peer_msg_nogossip(peer, &peer->cs, &peer->channel_id, - channeld_send_reply, + sync_crypto_write_arg, peer)) == NULL) clean_tmpctx(); @@ -1940,7 +1856,8 @@ static void peer_reconnect(struct peer *peer) msg = towire_funding_locked(NULL, &peer->channel_id, &peer->next_local_per_commit); - enqueue_peer_msg(peer, take(msg)); + if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg))) + peer_failed_connection_lost(); } /* Note: next_index is the index of the current commit we're working @@ -2082,7 +1999,8 @@ static void handle_funding_locked(struct peer *peer, const u8 *msg) msg = towire_funding_locked(NULL, &peer->channel_id, &peer->next_local_per_commit); - enqueue_peer_msg(peer, take(msg)); + if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg))) + peer_failed_connection_lost(); peer->funding_locked[LOCAL] = true; } @@ -2129,7 +2047,8 @@ static void handle_offer_htlc(struct peer *peer, const u8 *inmsg) peer->htlc_id, amount_msat, &payment_hash, cltv_expiry, onion_routing_packet); - enqueue_peer_msg(peer, take(msg)); + if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg))) + peer_failed_connection_lost(); start_commit_timer(peer); /* Tell the master. */ msg = towire_channel_offer_htlc_reply(NULL, peer->htlc_id, @@ -2493,8 +2412,10 @@ static void init_channel(struct peer *peer) peer_reconnect(peer); /* If we have a funding_signed message, send that immediately */ - if (funding_signed) - enqueue_peer_msg(peer, take(funding_signed)); + if (funding_signed) { + if (!sync_crypto_write(&peer->cs, PEER_FD, take(funding_signed))) + peer_failed_connection_lost(); + } /* Reenable channel */ channel_announcement_negotiate(peer); @@ -2504,9 +2425,6 @@ static void init_channel(struct peer *peer) static void send_shutdown_complete(struct peer *peer) { - /* Push out any incomplete messages to peer. */ - flush_peer_out(peer); - /* Now we can tell master shutdown is complete. */ wire_sync_write(MASTER_FD, take(towire_channel_shutdown_complete(NULL, &peer->cs))); @@ -2534,9 +2452,6 @@ int main(int argc, char *argv[]) peer->channel_local_active = false; msg_queue_init(&peer->from_master, peer); msg_queue_init(&peer->from_gossipd, peer); - msg_queue_init(&peer->peer_out, peer); - peer->peer_outmsg = NULL; - peer->peer_outoff = 0; peer->next_commit_sigs = NULL; peer->shutdown_sent[LOCAL] = false; peer->last_update_timestamp = 0; @@ -2564,7 +2479,7 @@ int main(int argc, char *argv[]) while (!shutdown_complete(peer)) { struct timemono first; - fd_set rfds = fds_in, wfds, *wptr; + fd_set rfds = fds_in; struct timeval timeout, *tptr; struct timer *expired; const u8 *msg; @@ -2595,8 +2510,8 @@ int main(int argc, char *argv[]) status_trace("Now dealing with deferred gossip %u", fromwire_peektype(msg)); handle_gossip_msg(take(msg), &peer->cs, - channeld_send_reply, - peer); + sync_crypto_write_arg, + NULL); continue; } @@ -2607,13 +2522,7 @@ int main(int argc, char *argv[]) } else tptr = NULL; - if (peer_write_pending(peer)) { - wfds = fds_out; - wptr = &wfds; - } else - wptr = NULL; - - if (select(nfds, &rfds, wptr, NULL, tptr) < 0) { + if (select(nfds, &rfds, NULL, NULL, tptr) < 0) { /* Signals OK, eg. SIGUSR1 */ if (errno == EINTR) continue; @@ -2621,20 +2530,6 @@ int main(int argc, char *argv[]) "select failed: %s", strerror(errno)); } - /* Try writing out encrypted packet if any (don't block!) */ - if (wptr && FD_ISSET(PEER_FD, wptr)) { - if (!io_fd_block(PEER_FD, false)) - status_failed(STATUS_FAIL_INTERNAL_ERROR, - "NONBLOCK failed: %s", - strerror(errno)); - do_peer_write(peer); - if (!io_fd_block(PEER_FD, true)) - status_failed(STATUS_FAIL_INTERNAL_ERROR, - "NONBLOCK unset failed: %s", - strerror(errno)); - continue; - } - if (FD_ISSET(MASTER_FD, &rfds)) { msg = wire_sync_read(tmpctx, MASTER_FD); @@ -2650,7 +2545,7 @@ int main(int argc, char *argv[]) if (!msg) peer_failed_connection_lost(); handle_gossip_msg(msg, &peer->cs, - channeld_send_reply, + sync_crypto_write_arg, peer); } else if (FD_ISSET(PEER_FD, &rfds)) { /* This could take forever, but who cares? */