Browse Source

channeld: fix dev_disconnect.

I noted a spurious failure on test_reconnect_sender_add1: we
actually sent an update_commit, which should have been suppressed.

This was because we call dev_disconnect() when we *dequeue* the packet,
which might be too late to suppress the timer.  So instead, call it
when the packet in enqueued, and flush synchronously to make sure
we get the right packet.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ppa-0.6.1
Rusty Russell 7 years ago
committed by Christian Decker
parent
commit
425143646c
  1. 193
      channeld/channel.c

193
channeld/channel.c

@ -112,11 +112,6 @@ struct peer {
const u8 *peer_outmsg; const u8 *peer_outmsg;
size_t peer_outoff; size_t peer_outoff;
#if DEVELOPER
/* Sabotage fd after sending next msg. */
bool post_sabotage;
#endif
/* Messages from master / gossipd: we queue them since we /* Messages from master / gossipd: we queue them since we
* might be waiting for a specific reply. */ * might be waiting for a specific reply. */
struct msg_queue from_master, from_gossipd; struct msg_queue from_master, from_gossipd;
@ -178,6 +173,84 @@ static void *tal_arr_append_(void **p, size_t size)
} }
#define tal_arr_append(p) tal_arr_append_((void **)(p), sizeof(**(p))) #define tal_arr_append(p) tal_arr_append_((void **)(p), sizeof(**(p)))
static void do_peer_write(struct peer *peer)
{
int r;
size_t len = tal_len(peer->peer_outmsg);
r = write(PEER_FD, peer->peer_outmsg + peer->peer_outoff,
len - peer->peer_outoff);
if (r < 0)
status_failed(STATUS_FAIL_PEER_IO,
"Peer write failed: %s", strerror(errno));
peer->peer_outoff += r;
if (peer->peer_outoff == len)
peer->peer_outmsg = tal_free(peer->peer_outmsg);
}
static bool peer_write_pending(struct peer *peer)
{
const u8 *msg;
if (peer->peer_outmsg)
return true;
msg = msg_dequeue(&peer->peer_out);
if (!msg)
return false;
status_trace("peer_out %s", wire_type_name(fromwire_peektype(msg)));
peer->peer_outmsg = cryptomsg_encrypt_msg(peer, &peer->cs, take(msg));
peer->peer_outoff = 0;
return true;
}
/* Synchronous flush of all pending packets. */
static void flush_peer_out(struct peer *peer)
{
while (peer_write_pending(peer))
do_peer_write(peer);
}
static void enqueue_peer_msg(struct peer *peer, const u8 *msg TAKES)
{
#if DEVELOPER
enum dev_disconnect d = dev_disconnect(fromwire_peektype(msg));
/* We want to effect this exact packet, so flush any pending. */
if (d != DEV_DISCONNECT_NORMAL)
flush_peer_out(peer);
switch (d) {
case DEV_DISCONNECT_BEFORE:
/* Fail immediately. */
dev_sabotage_fd(PEER_FD);
msg_enqueue(&peer->peer_out, msg);
flush_peer_out(peer);
/* Should not return */
abort();
case DEV_DISCONNECT_DROPPKT:
tal_free(msg);
/* Fail next time we try to do something. */
dev_sabotage_fd(PEER_FD);
return;
case DEV_DISCONNECT_AFTER:
msg_enqueue(&peer->peer_out, msg);
flush_peer_out(peer);
dev_sabotage_fd(PEER_FD);
return;
case DEV_DISCONNECT_BLACKHOLE:
msg_enqueue(&peer->peer_out, msg);
dev_blackhole_fd(PEER_FD);
return;
case DEV_DISCONNECT_NORMAL:
break;
}
#endif
msg_enqueue(&peer->peer_out, msg);
}
static void gossip_in(struct peer *peer, const u8 *msg) static void gossip_in(struct peer *peer, const u8 *msg)
{ {
u8 *gossip; u8 *gossip;
@ -192,7 +265,7 @@ static void gossip_in(struct peer *peer, const u8 *msg)
if (type == WIRE_CHANNEL_ANNOUNCEMENT || type == WIRE_CHANNEL_UPDATE || if (type == WIRE_CHANNEL_ANNOUNCEMENT || type == WIRE_CHANNEL_UPDATE ||
type == WIRE_NODE_ANNOUNCEMENT) type == WIRE_NODE_ANNOUNCEMENT)
msg_enqueue(&peer->peer_out, gossip); enqueue_peer_msg(peer, gossip);
else else
status_failed(STATUS_FAIL_GOSSIP_IO, status_failed(STATUS_FAIL_GOSSIP_IO,
"Got bad message type %s from gossipd: %s", "Got bad message type %s from gossipd: %s",
@ -287,7 +360,7 @@ static void send_announcement_signatures(struct peer *peer)
tmpctx, &peer->channel_id, &peer->short_channel_ids[LOCAL], tmpctx, &peer->channel_id, &peer->short_channel_ids[LOCAL],
&peer->announcement_node_sigs[LOCAL], &peer->announcement_node_sigs[LOCAL],
&peer->announcement_bitcoin_sigs[LOCAL]); &peer->announcement_bitcoin_sigs[LOCAL]);
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
tal_free(tmpctx); tal_free(tmpctx);
} }
@ -612,7 +685,7 @@ static void maybe_send_shutdown(struct peer *peer)
msg = towire_shutdown(peer, &peer->channel_id, msg = towire_shutdown(peer, &peer->channel_id,
peer->unsent_shutdown_scriptpubkey); peer->unsent_shutdown_scriptpubkey);
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
peer->unsent_shutdown_scriptpubkey peer->unsent_shutdown_scriptpubkey
= tal_free(peer->unsent_shutdown_scriptpubkey); = tal_free(peer->unsent_shutdown_scriptpubkey);
peer->shutdown_sent[LOCAL] = true; peer->shutdown_sent[LOCAL] = true;
@ -804,7 +877,7 @@ static void send_commit(struct peer *peer)
feerate, max); feerate, max);
msg = towire_update_fee(peer, &peer->channel_id, feerate); msg = towire_update_fee(peer, &peer->channel_id, feerate);
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
} }
/* BOLT #2: /* BOLT #2:
@ -846,7 +919,7 @@ static void send_commit(struct peer *peer)
msg = towire_commitment_signed(peer, &peer->channel_id, msg = towire_commitment_signed(peer, &peer->channel_id,
&peer->next_commit_sigs->commit_sig, &peer->next_commit_sigs->commit_sig,
peer->next_commit_sigs->htlc_sigs); peer->next_commit_sigs->htlc_sigs);
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
peer->next_commit_sigs = tal_free(peer->next_commit_sigs); peer->next_commit_sigs = tal_free(peer->next_commit_sigs);
maybe_send_shutdown(peer); maybe_send_shutdown(peer);
@ -918,7 +991,7 @@ static void send_revocation(struct peer *peer)
start_commit_timer(peer); start_commit_timer(peer);
} }
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
} }
static u8 *got_commitsig_msg(const tal_t *ctx, static u8 *got_commitsig_msg(const tal_t *ctx,
@ -1389,7 +1462,7 @@ static void handle_ping(struct peer *peer, const u8 *msg)
: "nothing"); : "nothing");
if (pong) if (pong)
msg_enqueue(&peer->peer_out, take(pong)); enqueue_peer_msg(peer, take(pong));
} }
static void handle_pong(struct peer *peer, const u8 *pong) static void handle_pong(struct peer *peer, const u8 *pong)
@ -1541,7 +1614,7 @@ static void resend_revoke(struct peer *peer)
{ {
/* Current commit is peer->next_index[LOCAL]-1, revoke prior */ /* Current commit is peer->next_index[LOCAL]-1, revoke prior */
u8 *msg = make_revocation_msg(peer, peer->next_index[LOCAL]-2); u8 *msg = make_revocation_msg(peer, peer->next_index[LOCAL]-2);
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
} }
static void send_fail_or_fulfill(struct peer *peer, const struct htlc *h) static void send_fail_or_fulfill(struct peer *peer, const struct htlc *h)
@ -1566,7 +1639,7 @@ static void send_fail_or_fulfill(struct peer *peer, const struct htlc *h)
&peer->channel_id, &peer->channel_id,
"HTLC %"PRIu64" state %s not failed/fulfilled", "HTLC %"PRIu64" state %s not failed/fulfilled",
h->id, htlc_state_name(h->state)); h->id, htlc_state_name(h->state));
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
} }
static void resend_commitment(struct peer *peer, const struct changed_htlc *last) static void resend_commitment(struct peer *peer, const struct changed_htlc *last)
@ -1607,7 +1680,7 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last
abs_locktime_to_blocks( abs_locktime_to_blocks(
&h->expiry), &h->expiry),
h->routing); h->routing);
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
} else if (h->state == SENT_REMOVE_COMMIT) { } else if (h->state == SENT_REMOVE_COMMIT) {
send_fail_or_fulfill(peer, h); send_fail_or_fulfill(peer, h);
} }
@ -1617,7 +1690,7 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last
if (peer->channel->funder == LOCAL) { if (peer->channel->funder == LOCAL) {
msg = towire_update_fee(peer, &peer->channel_id, msg = towire_update_fee(peer, &peer->channel_id,
channel_feerate(peer->channel, REMOTE)); channel_feerate(peer->channel, REMOTE));
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
} }
/* Re-send the commitment_signed itself. */ /* Re-send the commitment_signed itself. */
@ -1625,7 +1698,7 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last
msg = towire_commitment_signed(peer, &peer->channel_id, msg = towire_commitment_signed(peer, &peer->channel_id,
&commit_sigs->commit_sig, &commit_sigs->commit_sig,
commit_sigs->htlc_sigs); commit_sigs->htlc_sigs);
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
tal_free(commit_sigs); tal_free(commit_sigs);
assert(peer->revocations_received == peer->next_index[REMOTE] - 2); assert(peer->revocations_received == peer->next_index[REMOTE] - 2);
@ -1703,7 +1776,7 @@ again:
msg = towire_funding_locked(peer, msg = towire_funding_locked(peer,
&peer->channel_id, &peer->channel_id,
&next_per_commit_point); &next_per_commit_point);
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
} }
/* Note: next_index is the index of the current commit we're working /* Note: next_index is the index of the current commit we're working
@ -1807,7 +1880,7 @@ again:
* disable flag */ * disable flag */
cupdate = create_channel_update(peer, peer, false); cupdate = create_channel_update(peer, peer, false);
wire_sync_write(GOSSIP_FD, cupdate); wire_sync_write(GOSSIP_FD, cupdate);
msg_enqueue(&peer->peer_out, take(cupdate)); enqueue_peer_msg(peer, take(cupdate));
/* Corner case: we will get upset with them if they send /* Corner case: we will get upset with them if they send
* commitment_signed with no changes. But it could be that we sent a * commitment_signed with no changes. But it could be that we sent a
@ -1835,7 +1908,7 @@ static void handle_funding_locked(struct peer *peer, const u8 *msg)
type_to_string(trc, struct pubkey, &next_per_commit_point)); type_to_string(trc, struct pubkey, &next_per_commit_point));
msg = towire_funding_locked(peer, msg = towire_funding_locked(peer,
&peer->channel_id, &next_per_commit_point); &peer->channel_id, &next_per_commit_point);
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
peer->funding_locked[LOCAL] = true; peer->funding_locked[LOCAL] = true;
if (peer->funding_locked[REMOTE]) { if (peer->funding_locked[REMOTE]) {
@ -1893,7 +1966,7 @@ static void handle_offer_htlc(struct peer *peer, const u8 *inmsg)
peer->htlc_id, amount_msat, peer->htlc_id, amount_msat,
&payment_hash, cltv_expiry, &payment_hash, cltv_expiry,
onion_routing_packet); onion_routing_packet);
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
peer->funding_locked[LOCAL] = true; peer->funding_locked[LOCAL] = true;
start_commit_timer(peer); start_commit_timer(peer);
/* Tell the master. */ /* Tell the master. */
@ -1983,7 +2056,7 @@ static void handle_preimage(struct peer *peer, const u8 *inmsg)
case CHANNEL_ERR_REMOVE_OK: case CHANNEL_ERR_REMOVE_OK:
msg = towire_update_fulfill_htlc(peer, &peer->channel_id, msg = towire_update_fulfill_htlc(peer, &peer->channel_id,
id, &preimage); id, &preimage);
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
start_commit_timer(peer); start_commit_timer(peer);
return; return;
/* These shouldn't happen, because any offered HTLC (which would give /* These shouldn't happen, because any offered HTLC (which would give
@ -2149,7 +2222,7 @@ static void handle_fail(struct peer *peer, const u8 *inmsg)
msg = towire_update_fail_htlc(peer, &peer->channel_id, msg = towire_update_fail_htlc(peer, &peer->channel_id,
id, reply); id, reply);
} }
msg_enqueue(&peer->peer_out, take(msg)); enqueue_peer_msg(peer, take(msg));
start_commit_timer(peer); start_commit_timer(peer);
return; return;
case CHANNEL_ERR_NO_SUCH_ID: case CHANNEL_ERR_NO_SUCH_ID:
@ -2175,7 +2248,7 @@ static void handle_ping_cmd(struct peer *peer, const u8 *inmsg)
if (tal_len(ping) > 65535) if (tal_len(ping) > 65535)
status_failed(STATUS_FAIL_MASTER_IO, "Oversize channel_ping"); status_failed(STATUS_FAIL_MASTER_IO, "Oversize channel_ping");
msg_enqueue(&peer->peer_out, take(ping)); enqueue_peer_msg(peer, take(ping));
status_trace("sending ping expecting %sresponse", status_trace("sending ping expecting %sresponse",
num_pong_bytes >= 65532 ? "no " : ""); num_pong_bytes >= 65532 ? "no " : "");
@ -2400,79 +2473,15 @@ static void init_channel(struct peer *peer)
/* If we have a funding_signed message, send that immediately */ /* If we have a funding_signed message, send that immediately */
if (funding_signed) if (funding_signed)
msg_enqueue(&peer->peer_out, take(funding_signed)); enqueue_peer_msg(peer, take(funding_signed));
tal_free(msg); tal_free(msg);
} }
#ifndef TESTING
static void do_peer_write(struct peer *peer)
{
int r;
size_t len = tal_len(peer->peer_outmsg);
r = write(PEER_FD, peer->peer_outmsg + peer->peer_outoff,
len - peer->peer_outoff);
if (r < 0)
status_failed(STATUS_FAIL_PEER_IO,
"Peer write failed: %s", strerror(errno));
peer->peer_outoff += r;
if (peer->peer_outoff == len) {
peer->peer_outmsg = tal_free(peer->peer_outmsg);
#if DEVELOPER
if (peer->post_sabotage)
dev_sabotage_fd(PEER_FD);
#endif
}
}
static bool peer_write_pending(struct peer *peer)
{
const u8 *msg;
if (peer->peer_outmsg)
return true;
msg = msg_dequeue(&peer->peer_out);
if (!msg)
return false;
#if DEVELOPER
peer->post_sabotage = false;
switch (dev_disconnect(fromwire_peektype(msg))) {
case DEV_DISCONNECT_BEFORE:
dev_sabotage_fd(PEER_FD);
break;
case DEV_DISCONNECT_DROPPKT:
tal_free(msg);
peer->post_sabotage = true;
peer->peer_outmsg = NULL;
peer->peer_outoff = 0;
return true;
case DEV_DISCONNECT_AFTER:
peer->post_sabotage = true;
break;
case DEV_DISCONNECT_BLACKHOLE:
dev_blackhole_fd(PEER_FD);
break;
case DEV_DISCONNECT_NORMAL:
break;
}
#endif
status_trace("peer_out %s", wire_type_name(fromwire_peektype(msg)));
peer->peer_outmsg = cryptomsg_encrypt_msg(peer, &peer->cs, take(msg));
peer->peer_outoff = 0;
return true;
}
static void send_shutdown_complete(struct peer *peer) static void send_shutdown_complete(struct peer *peer)
{ {
/* Push out any incomplete messages to peer. */ /* Push out any incomplete messages to peer. */
while (peer_write_pending(peer)) flush_peer_out(peer);
do_peer_write(peer);
/* Now we can tell master shutdown is complete. */ /* Now we can tell master shutdown is complete. */
wire_sync_write(MASTER_FD, wire_sync_write(MASTER_FD,
@ -2513,9 +2522,6 @@ int main(int argc, char *argv[])
msg_queue_init(&peer->peer_out, peer); msg_queue_init(&peer->peer_out, peer);
peer->peer_outmsg = NULL; peer->peer_outmsg = NULL;
peer->peer_outoff = 0; peer->peer_outoff = 0;
#if DEVELOPER
peer->post_sabotage = false;
#endif
peer->next_commit_sigs = NULL; peer->next_commit_sigs = NULL;
peer->shutdown_sent[LOCAL] = false; peer->shutdown_sent[LOCAL] = false;
@ -2637,4 +2643,3 @@ int main(int argc, char *argv[])
return 0; return 0;
} }
#endif /* TESTING */

Loading…
Cancel
Save