From 9de382719930a607707c30c895f227bce40911fc Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Wed, 29 Nov 2017 09:20:14 +1030 Subject: [PATCH] channeld: don't use ccan/io, go sync. We revert to a simple select() loop. This makes things simpler, and fixes the problem where we want to exit but we've partially read a peer packet. We still queue up outgoing peer packets for non-blocking send: if we went full sync there, we'd risk deadlock if both sides wrote a huge number of packets and neither was reading. This also greatly simplifies the next patches, where we want to make our first get/response from gossipd. Signed-off-by: Rusty Russell --- channeld/channel.c | 556 +++++++++++++++++++++-------------------- wire/gen_peer_wire_csv | 2 + 2 files changed, 285 insertions(+), 273 deletions(-) diff --git a/channeld/channel.c b/channeld/channel.c index 656fbd4e4..5ef1c149b 100644 --- a/channeld/channel.c +++ b/channeld/channel.c @@ -1,3 +1,15 @@ +/* Main channel operation daemon: runs from funding_locked to shutdown_complete. + * + * We're fairly synchronous: our main loop looks for gossip, master or + * peer requests and services them synchronously. + * + * The exceptions are: + * 1. When we've asked the master something: in that case, we queue + * non-response packets for later processing while we await the reply. + * 2. We queue and send non-blocking responses to peers: if both peers were + * reading and writing synchronously we could deadlock if we hit buffer + * limits, unlikely as that is. + */ #include #include #include @@ -6,7 +18,6 @@ #include #include #include -#include #include #include #include @@ -16,8 +27,6 @@ #include #include #include -#include -#include #include #include #include @@ -59,7 +68,7 @@ struct commit_sigs { }; struct peer { - struct peer_crypto_state pcs; + struct crypto_state cs; struct channel_config conf[NUM_SIDES]; bool funding_locked[NUM_SIDES]; u64 next_index[NUM_SIDES]; @@ -94,10 +103,17 @@ struct peer { struct channel_id channel_id; struct channel *channel; + /* Pending msgs to send (not encrypted) */ struct msg_queue peer_out; - struct io_conn *peer_conn; - struct daemon_conn gossip_client; + /* Current msg to send, and offset (encrypted) */ + const u8 *peer_outmsg; + size_t peer_outoff; + +#if DEVELOPER + /* Sabotage fd after sending next msg. */ + bool post_sabotage; +#endif /* Messages from master: we queue them since we might be waiting for * a specific reply. */ @@ -157,11 +173,8 @@ static void *tal_arr_append_(void **p, size_t size) } #define tal_arr_append(p) tal_arr_append_((void **)(p), sizeof(**(p))) -static struct io_plan *gossip_client_recv(struct io_conn *conn, - struct daemon_conn *dc) +static void gossip_in(struct peer *peer, const u8 *msg) { - u8 *msg = dc->msg_in; - struct peer *peer = container_of(dc, struct peer, gossip_client); u16 type = fromwire_peektype(msg); if (type == WIRE_CHANNEL_ANNOUNCEMENT || type == WIRE_CHANNEL_UPDATE || @@ -171,8 +184,6 @@ static struct io_plan *gossip_client_recv(struct io_conn *conn, status_failed(STATUS_FAIL_GOSSIP_IO, "Got bad message from gossipd: %s", tal_hex(msg, msg)); - - return daemon_conn_read_next(conn, dc); } static void send_announcement_signatures(struct peer *peer) @@ -304,21 +315,7 @@ static u8 *create_channel_announcement(const tal_t *ctx, struct peer *peer) return cannounce; } -static struct io_plan *peer_out(struct io_conn *conn, struct peer *peer) -{ - const u8 *out = msg_dequeue(&peer->peer_out); - if (!out) - return msg_queue_wait(conn, &peer->peer_out, peer_out, peer); - - status_trace("peer_out %s", wire_type_name(fromwire_peektype(out))); - return peer_write_message(conn, &peer->pcs, out, peer_out); -} - -static struct io_plan *peer_in(struct io_conn *conn, struct peer *peer, u8 *msg); - -static struct io_plan *handle_peer_funding_locked(struct io_conn *conn, - struct peer *peer, - const u8 *msg) +static void handle_peer_funding_locked(struct peer *peer, const u8 *msg) { struct channel_id chanid; @@ -328,16 +325,16 @@ static struct io_plan *handle_peer_funding_locked(struct io_conn *conn, * it receives one. */ if (peer->funding_locked[REMOTE]) - return peer_read_message(conn, &peer->pcs, peer_in); + return; peer->old_remote_per_commit = peer->remote_per_commit; if (!fromwire_funding_locked(msg, NULL, &chanid, &peer->remote_per_commit)) - peer_failed(PEER_FD, &peer->pcs.cs, &peer->channel_id, + peer_failed(PEER_FD, &peer->cs, &peer->channel_id, "Bad funding_locked %s", tal_hex(msg, msg)); if (!structeq(&chanid, &peer->channel_id)) - peer_failed(PEER_FD, &peer->pcs.cs, &peer->channel_id, + peer_failed(PEER_FD, &peer->cs, &peer->channel_id, "Wrong channel id in %s (expected %s)", tal_hex(trc, msg), type_to_string(msg, struct channel_id, @@ -354,8 +351,6 @@ static struct io_plan *handle_peer_funding_locked(struct io_conn *conn, } send_announcement_signatures(peer); - - return peer_read_message(conn, &peer->pcs, peer_in); } static void announce_channel(struct peer *peer) @@ -372,9 +367,7 @@ static void announce_channel(struct peer *peer) tal_free(cannounce); } -static struct io_plan *handle_peer_announcement_signatures(struct io_conn *conn, - struct peer *peer, - const u8 *msg) +static void handle_peer_announcement_signatures(struct peer *peer, const u8 *msg) { struct channel_id chanid; @@ -383,14 +376,14 @@ static struct io_plan *handle_peer_announcement_signatures(struct io_conn *conn, &peer->short_channel_ids[REMOTE], &peer->announcement_node_sigs[REMOTE], &peer->announcement_bitcoin_sigs[REMOTE])) - peer_failed(PEER_FD, &peer->pcs.cs, &peer->channel_id, + peer_failed(PEER_FD, &peer->cs, &peer->channel_id, "Bad announcement_signatures %s", tal_hex(msg, msg)); /* Make sure we agree on the channel ids */ /* FIXME: Check short_channel_id */ if (!structeq(&chanid, &peer->channel_id)) { - peer_failed(PEER_FD, &peer->pcs.cs, &peer->channel_id, + peer_failed(PEER_FD, &peer->cs, &peer->channel_id, "Wrong channel_id or short_channel_id in %s or %s", tal_hexstr(trc, &chanid, sizeof(struct channel_id)), tal_hexstr(trc, &peer->short_channel_ids[REMOTE], @@ -402,8 +395,6 @@ static struct io_plan *handle_peer_announcement_signatures(struct io_conn *conn, /* We have the remote sigs, do we have the local ones as well? */ if (peer->funding_locked[LOCAL] && peer->have_sigs[LOCAL]) announce_channel(peer); - - return peer_read_message(conn, &peer->pcs, peer_in); } static void get_shared_secret(const struct htlc *htlc, @@ -435,8 +426,7 @@ static void get_shared_secret(const struct htlc *htlc, tal_free(tmpctx); } -static struct io_plan *handle_peer_add_htlc(struct io_conn *conn, - struct peer *peer, const u8 *msg) +static void handle_peer_add_htlc(struct peer *peer, const u8 *msg) { struct channel_id channel_id; u64 id; @@ -451,7 +441,7 @@ static struct io_plan *handle_peer_add_htlc(struct io_conn *conn, &payment_hash, &cltv_expiry, onion_routing_packet)) peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad peer_add_htlc %s", tal_hex(msg, msg)); @@ -460,7 +450,7 @@ static struct io_plan *handle_peer_add_htlc(struct io_conn *conn, onion_routing_packet, &htlc); if (add_err != CHANNEL_ERR_ADD_OK) peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad peer_add_htlc: %u", add_err); @@ -468,19 +458,16 @@ static struct io_plan *handle_peer_add_htlc(struct io_conn *conn, * send it to the master which handles all HTLC failures. */ htlc->shared_secret = tal(htlc, struct secret); get_shared_secret(htlc, htlc->shared_secret); - - return peer_read_message(conn, &peer->pcs, peer_in); } -static struct io_plan *handle_peer_feechange(struct io_conn *conn, - struct peer *peer, const u8 *msg) +static void handle_peer_feechange(struct peer *peer, const u8 *msg) { struct channel_id channel_id; u32 feerate; if (!fromwire_update_fee(msg, NULL, &channel_id, &feerate)) { peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad update_fee %s", tal_hex(msg, msg)); } @@ -492,7 +479,7 @@ static struct io_plan *handle_peer_feechange(struct io_conn *conn, */ if (peer->channel->funder != REMOTE) peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "update_fee from non-funder?"); @@ -503,7 +490,7 @@ static struct io_plan *handle_peer_feechange(struct io_conn *conn, */ if (feerate < peer->feerate_min || feerate > peer->feerate_max) peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "update_fee %u outside range %u-%u", feerate, peer->feerate_min, peer->feerate_max); @@ -517,13 +504,12 @@ static struct io_plan *handle_peer_feechange(struct io_conn *conn, */ if (!channel_update_feerate(peer->channel, feerate)) peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "update_fee %u unaffordable", feerate); status_trace("peer updated fee to %u", feerate); - return peer_read_message(conn, &peer->pcs, peer_in); } static struct changed_htlc *changed_htlc_arr(const tal_t *ctx, @@ -774,9 +760,6 @@ static void send_commit(struct peer *peer) /* Covers the case where we've just been told to shutdown. */ maybe_send_shutdown(peer); - if (shutdown_complete(peer)) - io_break(peer); - peer->commit_timer = NULL; tal_free(tmpctx); return; @@ -812,10 +795,6 @@ static void send_commit(struct peer *peer) /* Timer now considered expired, you can add a new one. */ peer->commit_timer = NULL; start_commit_timer(peer); - - if (shutdown_complete(peer)) - io_break(peer); - tal_free(tmpctx); } @@ -866,8 +845,7 @@ static u8 *make_revocation_msg(const struct peer *peer, u64 revoke_index) &point); } -/* We come back here once master has acked the commit_sig we received */ -static struct io_plan *send_revocation(struct io_conn *conn, struct peer *peer) +static void send_revocation(struct peer *peer) { /* Revoke previous commit. */ u8 *msg = make_revocation_msg(peer, peer->next_index[LOCAL]-1); @@ -882,12 +860,6 @@ static struct io_plan *send_revocation(struct io_conn *conn, struct peer *peer) } msg_enqueue(&peer->peer_out, take(msg)); - - /* This might have been the final revoke_and_ack... */ - if (shutdown_complete(peer)) - io_break(peer); - - return peer_read_message(conn, &peer->pcs, peer_in); } static u8 *got_commitsig_msg(const tal_t *ctx, @@ -964,8 +936,7 @@ static u8 *got_commitsig_msg(const tal_t *ctx, return msg; } -static struct io_plan *handle_peer_commit_sig(struct io_conn *conn, - struct peer *peer, const u8 *msg) +static void handle_peer_commit_sig(struct peer *peer, const u8 *msg) { const tal_t *tmpctx = tal_tmpctx(peer); struct channel_id channel_id; @@ -984,7 +955,7 @@ static struct io_plan *handle_peer_commit_sig(struct io_conn *conn, * does not include any updates. */ peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "commit_sig with no changes"); } @@ -998,7 +969,7 @@ static struct io_plan *handle_peer_commit_sig(struct io_conn *conn, if (!fromwire_commitment_signed(tmpctx, msg, NULL, &channel_id, &commit_sig, &htlc_sigs)) peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad commit_sig %s", tal_hex(msg, msg)); @@ -1030,7 +1001,7 @@ static struct io_plan *handle_peer_commit_sig(struct io_conn *conn, &peer->channel->funding_pubkey[REMOTE], &commit_sig)) { dump_htlcs(peer->channel, "receiving commit_sig"); peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad commit_sig signature %"PRIu64" %s for tx %s wscript %s key %s", peer->next_index[LOCAL], @@ -1051,7 +1022,7 @@ static struct io_plan *handle_peer_commit_sig(struct io_conn *conn, */ if (tal_count(htlc_sigs) != tal_count(txs) - 1) peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Expected %zu htlc sigs, not %zu", tal_count(txs) - 1, tal_count(htlc_sigs)); @@ -1066,7 +1037,7 @@ static struct io_plan *handle_peer_commit_sig(struct io_conn *conn, if (!check_tx_sig(txs[1+i], 0, NULL, wscripts[1+i], &remote_htlckey, &htlc_sigs[i])) peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad commit_sig signature %s for htlc %s wscript %s key %s", type_to_string(msg, secp256k1_ecdsa_signature, &htlc_sigs[i]), @@ -1087,7 +1058,7 @@ static struct io_plan *handle_peer_commit_sig(struct io_conn *conn, master_wait_sync_reply(tmpctx, peer, take(msg), WIRE_CHANNEL_GOT_COMMITSIG_REPLY); tal_free(tmpctx); - return send_revocation(conn, peer); + return send_revocation(peer); } static u8 *got_revoke_msg(const tal_t *ctx, u64 revoke_num, @@ -1117,22 +1088,7 @@ static u8 *got_revoke_msg(const tal_t *ctx, u64 revoke_num, return msg; } -/* We come back here once master has acked the revoke_and_ack we received */ -static struct io_plan *accepted_revocation(struct io_conn *conn, - struct peer *peer) -{ - start_commit_timer(peer); - - /* We might now have an empty HTLC. */ - if (shutdown_complete(peer)) - io_break(peer); - - return peer_read_message(conn, &peer->pcs, peer_in); -} - -static struct io_plan *handle_peer_revoke_and_ack(struct io_conn *conn, - struct peer *peer, - const u8 *msg) +static void handle_peer_revoke_and_ack(struct peer *peer, const u8 *msg) { struct sha256 old_commit_secret; struct privkey privkey; @@ -1144,7 +1100,7 @@ static struct io_plan *handle_peer_revoke_and_ack(struct io_conn *conn, if (!fromwire_revoke_and_ack(msg, NULL, &channel_id, &old_commit_secret, &next_per_commit)) { peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad revoke_and_ack %s", tal_hex(msg, msg)); } @@ -1165,14 +1121,14 @@ static struct io_plan *handle_peer_revoke_and_ack(struct io_conn *conn, memcpy(&privkey, &old_commit_secret, sizeof(privkey)); if (!pubkey_from_privkey(&privkey, &per_commit_point)) { peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad privkey %s", type_to_string(msg, struct privkey, &privkey)); } if (!pubkey_eq(&per_commit_point, &peer->old_remote_per_commit)) { peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Wrong privkey %s for %"PRIu64" %s", type_to_string(msg, struct privkey, &privkey), @@ -1204,12 +1160,12 @@ static struct io_plan *handle_peer_revoke_and_ack(struct io_conn *conn, type_to_string(trc, struct pubkey, &peer->old_remote_per_commit)); + start_commit_timer(peer); + tal_free(tmpctx); - return accepted_revocation(conn, peer); } -static struct io_plan *handle_peer_fulfill_htlc(struct io_conn *conn, - struct peer *peer, const u8 *msg) +static void handle_peer_fulfill_htlc(struct peer *peer, const u8 *msg) { struct channel_id channel_id; u64 id; @@ -1219,7 +1175,7 @@ static struct io_plan *handle_peer_fulfill_htlc(struct io_conn *conn, if (!fromwire_update_fulfill_htlc(msg, NULL, &channel_id, &id, &preimage)) { peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad update_fulfill_htlc %s", tal_hex(msg, msg)); } @@ -1229,7 +1185,7 @@ static struct io_plan *handle_peer_fulfill_htlc(struct io_conn *conn, case CHANNEL_ERR_REMOVE_OK: /* FIXME: We could send preimages to master immediately. */ start_commit_timer(peer); - return peer_read_message(conn, &peer->pcs, peer_in); + return; /* These shouldn't happen, because any offered HTLC (which would give * us the preimage) should have timed out long before. If we * were to get preimages from other sources, this could happen. */ @@ -1239,7 +1195,7 @@ static struct io_plan *handle_peer_fulfill_htlc(struct io_conn *conn, case CHANNEL_ERR_HTLC_NOT_IRREVOCABLE: case CHANNEL_ERR_BAD_PREIMAGE: peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad update_fulfill_htlc: failed to fulfill %" PRIu64 " error %u", id, e); @@ -1247,8 +1203,7 @@ static struct io_plan *handle_peer_fulfill_htlc(struct io_conn *conn, abort(); } -static struct io_plan *handle_peer_fail_htlc(struct io_conn *conn, - struct peer *peer, const u8 *msg) +static void handle_peer_fail_htlc(struct peer *peer, const u8 *msg) { struct channel_id channel_id; u64 id; @@ -1259,7 +1214,7 @@ static struct io_plan *handle_peer_fail_htlc(struct io_conn *conn, if (!fromwire_update_fail_htlc(msg, msg, NULL, &channel_id, &id, &reason)) { peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad update_fulfill_htlc %s", tal_hex(msg, msg)); } @@ -1271,14 +1226,14 @@ static struct io_plan *handle_peer_fail_htlc(struct io_conn *conn, htlc = channel_get_htlc(peer->channel, LOCAL, id); htlc->fail = tal_steal(htlc, reason); start_commit_timer(peer); - return peer_read_message(conn, &peer->pcs, peer_in); + return; case CHANNEL_ERR_NO_SUCH_ID: case CHANNEL_ERR_ALREADY_FULFILLED: case CHANNEL_ERR_HTLC_UNCOMMITTED: case CHANNEL_ERR_HTLC_NOT_IRREVOCABLE: case CHANNEL_ERR_BAD_PREIMAGE: peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad update_fail_htlc: failed to remove %" PRIu64 " error %u", id, e); @@ -1286,9 +1241,7 @@ static struct io_plan *handle_peer_fail_htlc(struct io_conn *conn, abort(); } -static struct io_plan *handle_peer_fail_malformed_htlc(struct io_conn *conn, - struct peer *peer, - const u8 *msg) +static void handle_peer_fail_malformed_htlc(struct peer *peer, const u8 *msg) { struct channel_id channel_id; u64 id; @@ -1302,7 +1255,7 @@ static struct io_plan *handle_peer_fail_malformed_htlc(struct io_conn *conn, &sha256_of_onion, &failure_code)) { peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad update_fail_malformed_htlc %s", tal_hex(msg, msg)); @@ -1315,7 +1268,7 @@ static struct io_plan *handle_peer_fail_malformed_htlc(struct io_conn *conn, */ if (!(failure_code & BADONION)) { peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad update_fail_malformed_htlc failure code %u", failure_code); @@ -1347,14 +1300,14 @@ static struct io_plan *handle_peer_fail_malformed_htlc(struct io_conn *conn, /* FIXME: Make htlc->fail a u8 *! */ htlc->fail = fail; start_commit_timer(peer); - return peer_read_message(conn, &peer->pcs, peer_in); + return; case CHANNEL_ERR_NO_SUCH_ID: case CHANNEL_ERR_ALREADY_FULFILLED: case CHANNEL_ERR_HTLC_UNCOMMITTED: case CHANNEL_ERR_HTLC_NOT_IRREVOCABLE: case CHANNEL_ERR_BAD_PREIMAGE: peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad update_fail_malformed_htlc: failed to remove %" PRIu64 " error %u", id, e); @@ -1362,14 +1315,13 @@ static struct io_plan *handle_peer_fail_malformed_htlc(struct io_conn *conn, abort(); } -static struct io_plan *handle_ping(struct io_conn *conn, - struct peer *peer, const u8 *msg) +static void handle_ping(struct peer *peer, const u8 *msg) { u8 *pong; if (!check_ping_make_pong(peer, msg, &pong)) peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad ping"); @@ -1379,43 +1331,38 @@ static struct io_plan *handle_ping(struct io_conn *conn, if (pong) msg_enqueue(&peer->peer_out, take(pong)); - return peer_read_message(conn, &peer->pcs, peer_in); } -static struct io_plan *handle_pong(struct io_conn *conn, - struct peer *peer, const u8 *pong) +static void handle_pong(struct peer *peer, const u8 *pong) { u8 *ignored; status_trace("Got pong!"); if (!fromwire_pong(pong, pong, NULL, &ignored)) peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad pong %s", tal_hex(pong, pong)); if (!peer->num_pings_outstanding) peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Unexpected pong"); peer->num_pings_outstanding--; wire_sync_write(MASTER_FD, take(towire_channel_ping_reply(pong, tal_len(pong)))); - return peer_read_message(conn, &peer->pcs, peer_in); } -static struct io_plan *handle_peer_shutdown(struct io_conn *conn, - struct peer *peer, - const u8 *shutdown) +static void handle_peer_shutdown(struct peer *peer, const u8 *shutdown) { struct channel_id channel_id; u8 *scriptpubkey; if (!fromwire_shutdown(peer, shutdown, NULL, &channel_id, &scriptpubkey)) peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Bad shutdown %s", tal_hex(peer, shutdown)); @@ -1424,13 +1371,9 @@ static struct io_plan *handle_peer_shutdown(struct io_conn *conn, take(towire_channel_got_shutdown(peer, scriptpubkey))); peer->shutdown_sent[REMOTE] = true; - if (shutdown_complete(peer)) - io_break(peer); - - return peer_read_message(conn, &peer->pcs, peer_in); } -static struct io_plan *peer_in(struct io_conn *conn, struct peer *peer, u8 *msg) +static void peer_in(struct peer *peer, const u8 *msg) { enum wire_type type = fromwire_peektype(msg); status_trace("peer_in %s", wire_type_name(type)); @@ -1444,7 +1387,7 @@ static struct io_plan *peer_in(struct io_conn *conn, struct peer *peer, u8 *msg) && type != WIRE_NODE_ANNOUNCEMENT && type != WIRE_PING) { peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "%s (%u) before funding locked", wire_type_name(type), type); @@ -1453,35 +1396,50 @@ static struct io_plan *peer_in(struct io_conn *conn, struct peer *peer, u8 *msg) switch (type) { case WIRE_FUNDING_LOCKED: - return handle_peer_funding_locked(conn, peer, msg); + handle_peer_funding_locked(peer, msg); + return; case WIRE_ANNOUNCEMENT_SIGNATURES: - return handle_peer_announcement_signatures(conn, peer, msg); + handle_peer_announcement_signatures(peer, msg); + return; case WIRE_CHANNEL_ANNOUNCEMENT: case WIRE_CHANNEL_UPDATE: case WIRE_NODE_ANNOUNCEMENT: /* Forward to gossip daemon */ - daemon_conn_send(&peer->gossip_client, msg); - return peer_read_message(conn, &peer->pcs, peer_in); + if (!wire_sync_write(GOSSIP_FD, msg)) + status_failed(STATUS_FAIL_GOSSIP_IO, + "Forwarding to gossipd: %s", + strerror(errno)); + return; case WIRE_UPDATE_ADD_HTLC: - return handle_peer_add_htlc(conn, peer, msg); + handle_peer_add_htlc(peer, msg); + return; case WIRE_COMMITMENT_SIGNED: - return handle_peer_commit_sig(conn, peer, msg); + handle_peer_commit_sig(peer, msg); + return; case WIRE_UPDATE_FEE: - return handle_peer_feechange(conn, peer, msg); + handle_peer_feechange(peer, msg); + return; case WIRE_REVOKE_AND_ACK: - return handle_peer_revoke_and_ack(conn, peer, msg); + handle_peer_revoke_and_ack(peer, msg); + return; case WIRE_UPDATE_FULFILL_HTLC: - return handle_peer_fulfill_htlc(conn, peer, msg); + handle_peer_fulfill_htlc(peer, msg); + return; case WIRE_UPDATE_FAIL_HTLC: - return handle_peer_fail_htlc(conn, peer, msg); + handle_peer_fail_htlc(peer, msg); + return; case WIRE_UPDATE_FAIL_MALFORMED_HTLC: - return handle_peer_fail_malformed_htlc(conn, peer, msg); + handle_peer_fail_malformed_htlc(peer, msg); + return; case WIRE_PING: - return handle_ping(conn, peer, msg); + handle_ping(peer, msg); + return; case WIRE_PONG: - return handle_pong(conn, peer, msg); + handle_pong(peer, msg); + return; case WIRE_SHUTDOWN: - return handle_peer_shutdown(conn, peer, msg); + handle_peer_shutdown(peer, msg); + return; case WIRE_INIT: case WIRE_ERROR: @@ -1491,37 +1449,27 @@ static struct io_plan *peer_in(struct io_conn *conn, struct peer *peer, u8 *msg) case WIRE_FUNDING_SIGNED: case WIRE_CHANNEL_REESTABLISH: case WIRE_CLOSING_SIGNED: - goto badmessage; + break; } -badmessage: peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Peer sent unknown message %u (%s)", type, wire_type_name(type)); } -static struct io_plan *setup_peer_conn(struct io_conn *conn, struct peer *peer) +static void peer_conn_broken(struct peer *peer) { - return io_duplex(conn, peer_read_message(conn, &peer->pcs, peer_in), - peer_out(conn, peer)); -} + const char *e = strerror(errno); -static void peer_conn_broken(struct io_conn *conn, struct peer *peer) -{ /* If we have signatures, send an update to say we're disabled. */ if (peer->have_sigs[LOCAL] && peer->have_sigs[REMOTE]) { - u8 *cupdate = create_channel_update(conn, peer, true); + u8 *cupdate = create_channel_update(peer, peer, true); - daemon_conn_send(&peer->gossip_client, cupdate); - msg_enqueue(&peer->peer_out, take(cupdate)); - - /* Make sure gossipd actually gets this message before dying */ - daemon_conn_sync_flush(&peer->gossip_client); + wire_sync_write(GOSSIP_FD, take(cupdate)); } - status_failed(STATUS_FAIL_PEER_IO, - "peer connection broken: %s", strerror(errno)); + status_failed(STATUS_FAIL_PEER_IO, "peer read failed: %s", e); } static void resend_revoke(struct peer *peer) @@ -1549,7 +1497,7 @@ static void send_fail_or_fulfill(struct peer *peer, const struct htlc *h) h->r); } else peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "HTLC %"PRIu64" state %s not failed/fulfilled", h->id, htlc_state_name(h->state)); @@ -1582,7 +1530,7 @@ static void resend_commitment(struct peer *peer, const struct changed_htlc *last * then they asked for a retransmit */ if (!h) peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "Can't find HTLC %"PRIu64" to resend", last[i].id); @@ -1644,19 +1592,19 @@ static void peer_reconnect(struct peer *peer) msg = towire_channel_reestablish(peer, &peer->channel_id, peer->next_index[LOCAL], peer->revocations_received); - if (!sync_crypto_write(&peer->pcs.cs, PEER_FD, take(msg))) + if (!sync_crypto_write(&peer->cs, PEER_FD, take(msg))) status_failed(STATUS_FAIL_PEER_IO, "Failed writing reestablish: %s", strerror(errno)); again: - msg = sync_crypto_read(peer, &peer->pcs.cs, PEER_FD); + msg = sync_crypto_read(peer, &peer->cs, PEER_FD); if (!msg) status_failed(STATUS_FAIL_PEER_IO, "Failed reading reestablish: %s", strerror(errno)); if (is_gossip_msg(msg)) { /* Forward to gossip daemon */ - daemon_conn_send(&peer->gossip_client, msg); + wire_sync_write(GOSSIP_FD, take(msg)); goto again; } @@ -1755,7 +1703,7 @@ again: */ } else if (next_local_commitment_number != peer->next_index[REMOTE]) peer_failed(PEER_FD, - &peer->pcs.cs, + &peer->cs, &peer->channel_id, "bad reestablish commitment_number: %"PRIu64 " vs %"PRIu64, @@ -1793,7 +1741,7 @@ again: /* Reenable channel by sending a channel_update without the * disable flag */ cupdate = create_channel_update(peer, peer, false); - daemon_conn_send(&peer->gossip_client, cupdate); + wire_sync_write(GOSSIP_FD, cupdate); msg_enqueue(&peer->peer_out, take(cupdate)); /* Corner case: we will get upset with them if they send @@ -2199,32 +2147,32 @@ static void req_in(struct peer *peer, const u8 *msg) switch (t) { case WIRE_CHANNEL_FUNDING_LOCKED: handle_funding_locked(peer, msg); - goto out; + return; case WIRE_CHANNEL_FUNDING_ANNOUNCE_DEPTH: handle_funding_announce_depth(peer, msg); - goto out; + return; case WIRE_CHANNEL_OFFER_HTLC: handle_offer_htlc(peer, msg); - goto out; + return; case WIRE_CHANNEL_FEERATES: handle_feerates(peer, msg); - goto out; + return; case WIRE_CHANNEL_FULFILL_HTLC: handle_preimage(peer, msg); - goto out; + return; case WIRE_CHANNEL_FAIL_HTLC: handle_fail(peer, msg); - goto out; + return; case WIRE_CHANNEL_PING: handle_ping_cmd(peer, msg); - goto out; + return; case WIRE_CHANNEL_SEND_SHUTDOWN: handle_shutdown_cmd(peer, msg); - goto out; + return; case WIRE_CHANNEL_DEV_REENABLE_COMMIT: #if DEVELOPER handle_dev_reenable_commit(peer); - goto out; + return; #endif /* DEVELOPER */ case WIRE_CHANNEL_NORMAL_OPERATION: case WIRE_CHANNEL_INIT: @@ -2244,9 +2192,6 @@ static void req_in(struct peer *peer, const u8 *msg) break; } master_badmsg(-1, msg); - -out: - tal_free(msg); } /* We do this synchronously. */ @@ -2284,7 +2229,7 @@ static void init_channel(struct peer *peer) feerate_per_kw, &peer->feerate_min, &peer->feerate_max, &peer->their_commit_sig, - &peer->pcs.cs, + &peer->cs, &funding_pubkey[REMOTE], &points[REMOTE].revocation, &points[REMOTE].payment, @@ -2377,9 +2322,6 @@ static void init_channel(struct peer *peer) if (reconnected) peer_reconnect(peer); - peer->peer_conn = io_new_conn(peer, PEER_FD, setup_peer_conn, peer); - io_set_finish(peer->peer_conn, peer_conn_broken, peer); - /* If we have a funding_signed message, send that immediately */ if (funding_signed) msg_enqueue(&peer->peer_out, take(funding_signed)); @@ -2388,101 +2330,89 @@ static void init_channel(struct peer *peer) } #ifndef TESTING -static void gossip_gone(struct io_conn *unused, struct daemon_conn *dc) +static void do_peer_write(struct peer *peer) { - status_failed(STATUS_FAIL_GOSSIP_IO, - "Gossip connection closed"); + int r; + size_t len = tal_len(peer->peer_outmsg); + + r = write(PEER_FD, peer->peer_outmsg + peer->peer_outoff, + len - peer->peer_outoff); + if (r < 0) + status_failed(STATUS_FAIL_PEER_IO, + "Peer write failed: %s", strerror(errno)); + + peer->peer_outoff += r; + if (peer->peer_outoff == len) { + peer->peer_outmsg = tal_free(peer->peer_outmsg); +#if DEVELOPER + if (peer->post_sabotage) + dev_sabotage_fd(PEER_FD); +#endif + } } -/* FIXME: This doesn't cover partly read packets! We could be halfway - * through receiving a gossip msg, for example. We'll simply reconnect - * in this case, but the real fix is to wean off ccan/io here, as it doesn't - * buy us anything: a poll for read on gossipfd, masterfd and peerfd then acting - * synchronous would be a simpler model. */ -static void send_shutdown_complete(struct peer *peer) +static bool peer_write_pending(struct peer *peer) { const u8 *msg; - /* Push out any outstanding messages to peer. */ - if (!io_flush_sync(peer->peer_conn)) - status_failed(STATUS_FAIL_PEER_IO, "Syncing conn"); + if (peer->peer_outmsg) + return true; - /* Set FD blocking to flush it */ - io_fd_block(PEER_FD, true); + msg = msg_dequeue(&peer->peer_out); + if (!msg) + return false; - while ((msg = msg_dequeue(&peer->peer_out)) != NULL) { - if (!sync_crypto_write(&peer->pcs.cs, PEER_FD, take(msg))) - status_failed(STATUS_FAIL_PEER_IO, - "Flushing msgs"); +#if DEVELOPER + peer->post_sabotage = false; + + switch (dev_disconnect(fromwire_peektype(msg))) { + case DEV_DISCONNECT_BEFORE: + dev_sabotage_fd(PEER_FD); + break; + case DEV_DISCONNECT_DROPPKT: + msg = tal_free(msg); + peer->post_sabotage = true; + peer->peer_outmsg = NULL; + peer->peer_outoff = 0; + return true; + case DEV_DISCONNECT_AFTER: + peer->post_sabotage = true; + break; + case DEV_DISCONNECT_BLACKHOLE: + dev_blackhole_fd(PEER_FD); + break; + case DEV_DISCONNECT_NORMAL: + break; } +#endif + + status_trace("peer_out %s", wire_type_name(fromwire_peektype(msg))); + peer->peer_outmsg = cryptomsg_encrypt_msg(peer, &peer->cs, take(msg)); + peer->peer_outoff = 0; + return true; +} + +static void send_shutdown_complete(struct peer *peer) +{ + /* Push out any incomplete messages to peer. */ + while (peer_write_pending(peer)) + do_peer_write(peer); /* Now we can tell master shutdown is complete. */ wire_sync_write(MASTER_FD, take(towire_channel_shutdown_complete(peer, - &peer->pcs.cs))); + &peer->cs))); fdpass_send(MASTER_FD, PEER_FD); fdpass_send(MASTER_FD, GOSSIP_FD); close(MASTER_FD); } -static bool process_reqs(struct peer *peer) -{ - const u8 *msg; - bool changed = false; - - /* In case we've deferred, process packet backlog. */ - while ((msg = msg_dequeue(&peer->from_master)) != NULL) { - status_trace("Now dealing with deferred %s", - channel_wire_type_name(fromwire_peektype(msg))); - req_in(peer, msg); - changed = true; - } - - return changed; -} - -static struct peer *peer; - -/* If this becomes a common pattern, we could make it a helper in common/ */ -static int poll_with_masterfd(struct pollfd *fds, nfds_t nfds, int timeout) -{ - struct pollfd *fds_plus; - int r; - - /* This can change things, so return as if poll found nothing. */ - if (process_reqs(peer)) - return 0; - - /* Add master fd to fds. */ - fds_plus = tal_dup_arr(peer, struct pollfd, fds, nfds, 1); - fds_plus[nfds].fd = MASTER_FD; - fds_plus[nfds].events = POLLIN; - fds_plus[nfds].revents = 0; - - r = debug_poll(fds_plus, nfds+1, timeout); - if (r > 0) { - if (fds_plus[nfds].revents & POLLIN) { - u8 *msg = wire_sync_read(peer, MASTER_FD); - - if (!msg) - status_failed(STATUS_FAIL_MASTER_IO, - "Can't read command: %s", - strerror(errno)); - msg_enqueue(&peer->from_master, take(msg)); - r--; - } else if (fds_plus[nfds].revents & (POLLHUP|POLLNVAL|POLLERR)) - /* Can't report error, master gone. */ - errx(2, "Error polling master fd"); - } - /* Copy back revents values */ - memcpy(fds, fds_plus, nfds * sizeof(*fds)); - tal_free(fds_plus); - return r; -} - int main(int argc, char *argv[]) { - int i; + int i, nfds; + fd_set fds_in, fds_out; + struct peer *peer; + if (argc == 2 && streq(argv[1], "--version")) { printf("%s\n", version()); exit(0); @@ -2503,6 +2433,11 @@ int main(int argc, char *argv[]) peer->announce_depth_reached = false; msg_queue_init(&peer->from_master, peer); msg_queue_init(&peer->peer_out, peer); + peer->peer_outmsg = NULL; + peer->peer_outoff = 0; +#if DEVELOPER + peer->post_sabotage = false; +#endif peer->next_commit_sigs = NULL; peer->shutdown_sent[LOCAL] = false; @@ -2515,24 +2450,99 @@ int main(int argc, char *argv[]) sizeof(peer->announcement_bitcoin_sigs[i])); } - daemon_conn_init(peer, &peer->gossip_client, GOSSIP_FD, - gossip_client_recv, gossip_gone); - - init_peer_crypto_state(peer, &peer->pcs); - /* Read init_channel message sync. */ init_channel(peer); - /* Make sure we process and listen for master msgs. */ - io_poll_override(poll_with_masterfd); + FD_ZERO(&fds_in); + FD_SET(MASTER_FD, &fds_in); + FD_SET(PEER_FD, &fds_in); + FD_SET(GOSSIP_FD, &fds_in); + + FD_ZERO(&fds_out); + FD_SET(PEER_FD, &fds_out); + nfds = GOSSIP_FD+1; + + while (!shutdown_complete(peer)) { + struct timemono first; + fd_set rfds = fds_in, wfds, *wptr; + struct timeval timeout, *tptr; + struct timer *expired; + const u8 *msg; + struct timemono now = time_mono(); + + /* For simplicity, we process one event at a time. */ + msg = msg_dequeue(&peer->from_master); + if (msg) { + status_trace("Now dealing with deferred %s", + channel_wire_type_name( + fromwire_peektype(msg))); + req_in(peer, msg); + continue; + } - for (;;) { - struct timer *expired = NULL; - io_loop(&peer->timers, &expired); + expired = timers_expire(&peer->timers, now); + if (expired) { + timer_expired(peer, expired); + continue; + } - if (!expired) - break; - timer_expired(peer, expired); + if (timer_earliest(&peer->timers, &first)) { + timeout = timespec_to_timeval( + timemono_between(first, now).ts); + tptr = &timeout; + } else + tptr = NULL; + + if (peer_write_pending(peer)) { + wfds = fds_out; + wptr = &wfds; + } else + wptr = NULL; + + if (select(nfds, &rfds, wptr, NULL, tptr) < 0) + status_failed(STATUS_FAIL_INTERNAL_ERROR, + "select failed: %s", strerror(errno)); + + /* Try writing out encrypted packet if any (don't block!) */ + if (wptr && FD_ISSET(PEER_FD, wptr)) { + if (!io_fd_block(PEER_FD, false)) + status_failed(STATUS_FAIL_INTERNAL_ERROR, + "NONBLOCK failed: %s", + strerror(errno)); + do_peer_write(peer); + if (!io_fd_block(PEER_FD, true)) + status_failed(STATUS_FAIL_INTERNAL_ERROR, + "NONBLOCK unset failed: %s", + strerror(errno)); + continue; + } + + if (FD_ISSET(MASTER_FD, &rfds)) { + msg = wire_sync_read(peer, MASTER_FD); + + if (!msg) + status_failed(STATUS_FAIL_MASTER_IO, + "Can't read command: %s", + strerror(errno)); + req_in(peer, msg); + } else if (FD_ISSET(GOSSIP_FD, &rfds)) { + msg = wire_sync_read(peer, GOSSIP_FD); + + if (!msg) + status_failed(STATUS_FAIL_GOSSIP_IO, + "Can't read command: %s", + strerror(errno)); + gossip_in(peer, msg); + } else if (FD_ISSET(PEER_FD, &rfds)) { + /* This could take forever, but who cares? */ + msg = sync_crypto_read(peer, &peer->cs, PEER_FD); + + if (!msg) + peer_conn_broken(peer); + peer_in(peer, msg); + } else + msg = NULL; + tal_free(msg); } /* We only exit when shutdown is complete. */ diff --git a/wire/gen_peer_wire_csv b/wire/gen_peer_wire_csv index 7b9398361..4ac2c04ae 100644 --- a/wire/gen_peer_wire_csv +++ b/wire/gen_peer_wire_csv @@ -33,6 +33,7 @@ open_channel,219,delayed_payment_basepoint,33 open_channel,252,htlc_basepoint,33 open_channel,285,first_per_commitment_point,33 open_channel,318,channel_flags,1 +open_channel,319,shutdown_len,2,option_upfront_shutdown_script accept_channel,33 accept_channel,0,temporary_channel_id,32 accept_channel,32,dust_limit_satoshis,8 @@ -48,6 +49,7 @@ accept_channel,138,payment_basepoint,33 accept_channel,171,delayed_payment_basepoint,33 accept_channel,204,htlc_basepoint,33 accept_channel,237,first_per_commitment_point,33 +accept_channel,270,shutdown_len,2,option_upfront_shutdown_script funding_created,34 funding_created,0,temporary_channel_id,32 funding_created,32,funding_txid,32