Browse Source

channeld: it's OK to block on writing to peer.

In fact, it's good.  We don't want to queue up infinite gossip.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ppa-0.6.1
Rusty Russell 6 years ago
parent
commit
74d428109a
  1. 183
      channeld/channel.c

183
channeld/channel.c

@ -103,13 +103,6 @@ struct peer {
struct channel_id channel_id; struct channel_id channel_id;
struct channel *channel; struct channel *channel;
/* Pending msgs to send (not encrypted) */
struct msg_queue peer_out;
/* Current msg to send, and offset (encrypted) */
const u8 *peer_outmsg;
size_t peer_outoff;
/* Messages from master / gossipd: we queue them since we /* Messages from master / gossipd: we queue them since we
* might be waiting for a specific reply. */ * might be waiting for a specific reply. */
struct msg_queue from_master, from_gossipd; struct msg_queue from_master, from_gossipd;
@ -220,84 +213,6 @@ static void *tal_arr_append_(void **p, size_t size)
} }
#define tal_arr_append(p) tal_arr_append_((void **)(p), sizeof(**(p))) #define tal_arr_append(p) tal_arr_append_((void **)(p), sizeof(**(p)))
static void do_peer_write(struct peer *peer)
{
int r;
size_t len = tal_count(peer->peer_outmsg);
r = write(PEER_FD, peer->peer_outmsg + peer->peer_outoff,
len - peer->peer_outoff);
if (r < 0)
peer_failed_connection_lost();
peer->peer_outoff += r;
if (peer->peer_outoff == len)
peer->peer_outmsg = tal_free(peer->peer_outmsg);
}
static bool peer_write_pending(struct peer *peer)
{
const u8 *msg;
if (peer->peer_outmsg)
return true;
msg = msg_dequeue(&peer->peer_out);
if (!msg)
return false;
status_peer_io(LOG_IO_OUT, msg);
peer->peer_outmsg = cryptomsg_encrypt_msg(peer, &peer->cs, take(msg));
peer->peer_outoff = 0;
return true;
}
/* Synchronous flush of all pending packets. */
static void flush_peer_out(struct peer *peer)
{
while (peer_write_pending(peer))
do_peer_write(peer);
}
static void enqueue_peer_msg(struct peer *peer, const u8 *msg TAKES)
{
#if DEVELOPER
enum dev_disconnect d = dev_disconnect(fromwire_peektype(msg));
/* We want to effect this exact packet, so flush any pending. */
if (d != DEV_DISCONNECT_NORMAL)
flush_peer_out(peer);
switch (d) {
case DEV_DISCONNECT_BEFORE:
/* Fail immediately. */
dev_sabotage_fd(PEER_FD);
msg_enqueue(&peer->peer_out, msg);
flush_peer_out(peer);
/* Should not return */
abort();
case DEV_DISCONNECT_DROPPKT:
if (taken(msg))
tal_free(msg);
/* Fail next time we try to do something. */
dev_sabotage_fd(PEER_FD);
return;
case DEV_DISCONNECT_AFTER:
msg_enqueue(&peer->peer_out, msg);
flush_peer_out(peer);
dev_sabotage_fd(PEER_FD);
return;
case DEV_DISCONNECT_BLACKHOLE:
msg_enqueue(&peer->peer_out, msg);
dev_blackhole_fd(PEER_FD);
return;
case DEV_DISCONNECT_NORMAL:
break;
}
#endif
msg_enqueue(&peer->peer_out, msg);
}
static const u8 *hsm_req(const tal_t *ctx, const u8 *req TAKES) static const u8 *hsm_req(const tal_t *ctx, const u8 *req TAKES)
{ {
u8 *msg; u8 *msg;
@ -411,7 +326,8 @@ static void send_announcement_signatures(struct peer *peer)
NULL, &peer->channel_id, &peer->short_channel_ids[LOCAL], NULL, &peer->channel_id, &peer->short_channel_ids[LOCAL],
&peer->announcement_node_sigs[LOCAL], &peer->announcement_node_sigs[LOCAL],
&peer->announcement_bitcoin_sigs[LOCAL]); &peer->announcement_bitcoin_sigs[LOCAL]);
enqueue_peer_msg(peer, take(msg)); if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg)))
peer_failed_connection_lost();
} }
/* Tentatively create a channel_announcement, possibly with invalid /* Tentatively create a channel_announcement, possibly with invalid
@ -755,7 +671,8 @@ static void maybe_send_shutdown(struct peer *peer)
send_channel_update(peer, ROUTING_FLAGS_DISABLED); send_channel_update(peer, ROUTING_FLAGS_DISABLED);
msg = towire_shutdown(NULL, &peer->channel_id, peer->final_scriptpubkey); msg = towire_shutdown(NULL, &peer->channel_id, peer->final_scriptpubkey);
enqueue_peer_msg(peer, take(msg)); if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg)))
peer_failed_connection_lost();
peer->send_shutdown = false; peer->send_shutdown = false;
peer->shutdown_sent[LOCAL] = true; peer->shutdown_sent[LOCAL] = true;
billboard_update(peer); billboard_update(peer);
@ -1075,7 +992,8 @@ static void send_commit(struct peer *peer)
feerate, max); feerate, max);
msg = towire_update_fee(NULL, &peer->channel_id, feerate); msg = towire_update_fee(NULL, &peer->channel_id, feerate);
enqueue_peer_msg(peer, take(msg)); if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg)))
peer_failed_connection_lost();
} }
/* BOLT #2: /* BOLT #2:
@ -1117,7 +1035,8 @@ static void send_commit(struct peer *peer)
msg = towire_commitment_signed(NULL, &peer->channel_id, msg = towire_commitment_signed(NULL, &peer->channel_id,
&peer->next_commit_sigs->commit_sig, &peer->next_commit_sigs->commit_sig,
peer->next_commit_sigs->htlc_sigs); peer->next_commit_sigs->htlc_sigs);
enqueue_peer_msg(peer, take(msg)); if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg)))
peer_failed_connection_lost();
peer->next_commit_sigs = tal_free(peer->next_commit_sigs); peer->next_commit_sigs = tal_free(peer->next_commit_sigs);
maybe_send_shutdown(peer); maybe_send_shutdown(peer);
@ -1181,7 +1100,8 @@ static void send_revocation(struct peer *peer)
start_commit_timer(peer); start_commit_timer(peer);
} }
enqueue_peer_msg(peer, take(msg)); if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg)))
peer_failed_connection_lost();
} }
static u8 *got_commitsig_msg(const tal_t *ctx, static u8 *got_commitsig_msg(const tal_t *ctx,
@ -1738,7 +1658,8 @@ static void resend_revoke(struct peer *peer)
struct pubkey point; struct pubkey point;
/* Current commit is peer->next_index[LOCAL]-1, revoke prior */ /* Current commit is peer->next_index[LOCAL]-1, revoke prior */
u8 *msg = make_revocation_msg(peer, peer->next_index[LOCAL]-2, &point); u8 *msg = make_revocation_msg(peer, peer->next_index[LOCAL]-2, &point);
enqueue_peer_msg(peer, take(msg)); if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg)))
peer_failed_connection_lost();
} }
static void send_fail_or_fulfill(struct peer *peer, const struct htlc *h) static void send_fail_or_fulfill(struct peer *peer, const struct htlc *h)
@ -1777,7 +1698,8 @@ static void send_fail_or_fulfill(struct peer *peer, const struct htlc *h)
&peer->channel_id, &peer->channel_id,
"HTLC %"PRIu64" state %s not failed/fulfilled", "HTLC %"PRIu64" state %s not failed/fulfilled",
h->id, htlc_state_name(h->state)); h->id, htlc_state_name(h->state));
enqueue_peer_msg(peer, take(msg)); if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg)))
peer_failed_connection_lost();
} }
static void resend_commitment(struct peer *peer, const struct changed_htlc *last) static void resend_commitment(struct peer *peer, const struct changed_htlc *last)
@ -1812,13 +1734,14 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last
last[i].id); last[i].id);
if (h->state == SENT_ADD_COMMIT) { if (h->state == SENT_ADD_COMMIT) {
u8 *msg = towire_update_add_htlc(peer, &peer->channel_id, u8 *msg = towire_update_add_htlc(NULL, &peer->channel_id,
h->id, h->msatoshi, h->id, h->msatoshi,
&h->rhash, &h->rhash,
abs_locktime_to_blocks( abs_locktime_to_blocks(
&h->expiry), &h->expiry),
h->routing); h->routing);
enqueue_peer_msg(peer, take(msg)); if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg)))
peer_failed_connection_lost();
} else if (h->state == SENT_REMOVE_COMMIT) { } else if (h->state == SENT_REMOVE_COMMIT) {
send_fail_or_fulfill(peer, h); send_fail_or_fulfill(peer, h);
} }
@ -1828,7 +1751,8 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last
if (peer->channel->funder == LOCAL) { if (peer->channel->funder == LOCAL) {
msg = towire_update_fee(NULL, &peer->channel_id, msg = towire_update_fee(NULL, &peer->channel_id,
channel_feerate(peer->channel, REMOTE)); channel_feerate(peer->channel, REMOTE));
enqueue_peer_msg(peer, take(msg)); if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg)))
peer_failed_connection_lost();
} }
/* Re-send the commitment_signed itself. */ /* Re-send the commitment_signed itself. */
@ -1836,7 +1760,8 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last
msg = towire_commitment_signed(NULL, &peer->channel_id, msg = towire_commitment_signed(NULL, &peer->channel_id,
&commit_sigs->commit_sig, &commit_sigs->commit_sig,
commit_sigs->htlc_sigs); commit_sigs->htlc_sigs);
enqueue_peer_msg(peer, take(msg)); if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg)))
peer_failed_connection_lost();
tal_free(commit_sigs); tal_free(commit_sigs);
/* If we have already received the revocation for the previous, the /* If we have already received the revocation for the previous, the
@ -1848,21 +1773,12 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last
peer->revocations_received); peer->revocations_received);
} }
static bool channeld_send_reply(struct crypto_state *cs UNUSED,
int peer_fd UNUSED,
const u8 *msg,
struct peer *peer)
{
enqueue_peer_msg(peer, msg);
return true;
}
static u8 *channeld_read_peer_msg(struct peer *peer) static u8 *channeld_read_peer_msg(struct peer *peer)
{ {
return read_peer_msg(peer, &peer->cs, return read_peer_msg(peer, &peer->cs,
&peer->channel_id, &peer->channel_id,
channeld_send_reply, sync_crypto_write_arg,
peer); NULL);
} }
static void peer_reconnect(struct peer *peer) static void peer_reconnect(struct peer *peer)
@ -1905,7 +1821,7 @@ static void peer_reconnect(struct peer *peer)
* before we've reestablished channel). */ * before we've reestablished channel). */
while ((msg = read_peer_msg_nogossip(peer, &peer->cs, while ((msg = read_peer_msg_nogossip(peer, &peer->cs,
&peer->channel_id, &peer->channel_id,
channeld_send_reply, sync_crypto_write_arg,
peer)) == NULL) peer)) == NULL)
clean_tmpctx(); clean_tmpctx();
@ -1940,7 +1856,8 @@ static void peer_reconnect(struct peer *peer)
msg = towire_funding_locked(NULL, msg = towire_funding_locked(NULL,
&peer->channel_id, &peer->channel_id,
&peer->next_local_per_commit); &peer->next_local_per_commit);
enqueue_peer_msg(peer, take(msg)); if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg)))
peer_failed_connection_lost();
} }
/* Note: next_index is the index of the current commit we're working /* Note: next_index is the index of the current commit we're working
@ -2082,7 +1999,8 @@ static void handle_funding_locked(struct peer *peer, const u8 *msg)
msg = towire_funding_locked(NULL, msg = towire_funding_locked(NULL,
&peer->channel_id, &peer->channel_id,
&peer->next_local_per_commit); &peer->next_local_per_commit);
enqueue_peer_msg(peer, take(msg)); if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg)))
peer_failed_connection_lost();
peer->funding_locked[LOCAL] = true; peer->funding_locked[LOCAL] = true;
} }
@ -2129,7 +2047,8 @@ static void handle_offer_htlc(struct peer *peer, const u8 *inmsg)
peer->htlc_id, amount_msat, peer->htlc_id, amount_msat,
&payment_hash, cltv_expiry, &payment_hash, cltv_expiry,
onion_routing_packet); onion_routing_packet);
enqueue_peer_msg(peer, take(msg)); if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg)))
peer_failed_connection_lost();
start_commit_timer(peer); start_commit_timer(peer);
/* Tell the master. */ /* Tell the master. */
msg = towire_channel_offer_htlc_reply(NULL, peer->htlc_id, msg = towire_channel_offer_htlc_reply(NULL, peer->htlc_id,
@ -2493,8 +2412,10 @@ static void init_channel(struct peer *peer)
peer_reconnect(peer); peer_reconnect(peer);
/* If we have a funding_signed message, send that immediately */ /* If we have a funding_signed message, send that immediately */
if (funding_signed) if (funding_signed) {
enqueue_peer_msg(peer, take(funding_signed)); if (!sync_crypto_write(&peer->cs, PEER_FD, take(funding_signed)))
peer_failed_connection_lost();
}
/* Reenable channel */ /* Reenable channel */
channel_announcement_negotiate(peer); channel_announcement_negotiate(peer);
@ -2504,9 +2425,6 @@ static void init_channel(struct peer *peer)
static void send_shutdown_complete(struct peer *peer) static void send_shutdown_complete(struct peer *peer)
{ {
/* Push out any incomplete messages to peer. */
flush_peer_out(peer);
/* Now we can tell master shutdown is complete. */ /* Now we can tell master shutdown is complete. */
wire_sync_write(MASTER_FD, wire_sync_write(MASTER_FD,
take(towire_channel_shutdown_complete(NULL, &peer->cs))); take(towire_channel_shutdown_complete(NULL, &peer->cs)));
@ -2534,9 +2452,6 @@ int main(int argc, char *argv[])
peer->channel_local_active = false; peer->channel_local_active = false;
msg_queue_init(&peer->from_master, peer); msg_queue_init(&peer->from_master, peer);
msg_queue_init(&peer->from_gossipd, peer); msg_queue_init(&peer->from_gossipd, peer);
msg_queue_init(&peer->peer_out, peer);
peer->peer_outmsg = NULL;
peer->peer_outoff = 0;
peer->next_commit_sigs = NULL; peer->next_commit_sigs = NULL;
peer->shutdown_sent[LOCAL] = false; peer->shutdown_sent[LOCAL] = false;
peer->last_update_timestamp = 0; peer->last_update_timestamp = 0;
@ -2564,7 +2479,7 @@ int main(int argc, char *argv[])
while (!shutdown_complete(peer)) { while (!shutdown_complete(peer)) {
struct timemono first; struct timemono first;
fd_set rfds = fds_in, wfds, *wptr; fd_set rfds = fds_in;
struct timeval timeout, *tptr; struct timeval timeout, *tptr;
struct timer *expired; struct timer *expired;
const u8 *msg; const u8 *msg;
@ -2595,8 +2510,8 @@ int main(int argc, char *argv[])
status_trace("Now dealing with deferred gossip %u", status_trace("Now dealing with deferred gossip %u",
fromwire_peektype(msg)); fromwire_peektype(msg));
handle_gossip_msg(take(msg), &peer->cs, handle_gossip_msg(take(msg), &peer->cs,
channeld_send_reply, sync_crypto_write_arg,
peer); NULL);
continue; continue;
} }
@ -2607,13 +2522,7 @@ int main(int argc, char *argv[])
} else } else
tptr = NULL; tptr = NULL;
if (peer_write_pending(peer)) { if (select(nfds, &rfds, NULL, NULL, tptr) < 0) {
wfds = fds_out;
wptr = &wfds;
} else
wptr = NULL;
if (select(nfds, &rfds, wptr, NULL, tptr) < 0) {
/* Signals OK, eg. SIGUSR1 */ /* Signals OK, eg. SIGUSR1 */
if (errno == EINTR) if (errno == EINTR)
continue; continue;
@ -2621,20 +2530,6 @@ int main(int argc, char *argv[])
"select failed: %s", strerror(errno)); "select failed: %s", strerror(errno));
} }
/* Try writing out encrypted packet if any (don't block!) */
if (wptr && FD_ISSET(PEER_FD, wptr)) {
if (!io_fd_block(PEER_FD, false))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"NONBLOCK failed: %s",
strerror(errno));
do_peer_write(peer);
if (!io_fd_block(PEER_FD, true))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"NONBLOCK unset failed: %s",
strerror(errno));
continue;
}
if (FD_ISSET(MASTER_FD, &rfds)) { if (FD_ISSET(MASTER_FD, &rfds)) {
msg = wire_sync_read(tmpctx, MASTER_FD); msg = wire_sync_read(tmpctx, MASTER_FD);
@ -2650,7 +2545,7 @@ int main(int argc, char *argv[])
if (!msg) if (!msg)
peer_failed_connection_lost(); peer_failed_connection_lost();
handle_gossip_msg(msg, &peer->cs, handle_gossip_msg(msg, &peer->cs,
channeld_send_reply, sync_crypto_write_arg,
peer); peer);
} else if (FD_ISSET(PEER_FD, &rfds)) { } else if (FD_ISSET(PEER_FD, &rfds)) {
/* This could take forever, but who cares? */ /* This could take forever, but who cares? */

Loading…
Cancel
Save