Browse Source

connectd/peer_exchange_initmsg: handle peer comms ourselves.

connectd is the only user of the cryptomsg async APIs; better to
open-code it here.  We need to expose a little from cryptomsg(),
but we remove the 'struct peer' entirely from connectd.

One trick is that we still need to defer telling lightningd when a
peer reconnects (until it tells us the old one is disconnected).  So
now we generate the message for lightningd and send it once we're woken.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
fee-tracking2
Rusty Russell 6 years ago
parent
commit
a1bdaa8f99
  1. 2
      common/cryptomsg.c
  2. 16
      common/cryptomsg.h
  3. 1
      connectd/Makefile
  4. 208
      connectd/connectd.c
  5. 12
      connectd/connectd.h
  6. 177
      connectd/peer_exchange_initmsg.c
  7. 18
      connectd/peer_exchange_initmsg.h

2
common/cryptomsg.c

@ -315,7 +315,7 @@ u8 *cryptomsg_encrypt_msg(const tal_t *ctx,
tal_hexstr(trc, msg, mlen),
tal_hexstr(trc, npub, sizeof(npub)),
tal_hexstr(trc, &cs->sk, sizeof(cs->sk)),
tal_hexstr(trc, out + 18, clen));
tal_hexstr(trc, out + CRYPTOMSG_HDR_SIZE, clen));
#endif
maybe_rotate_key(&cs->sn, &cs->sk, &cs->s_ck);

16
common/cryptomsg.h

@ -36,6 +36,22 @@ struct io_plan *peer_write_message(struct io_conn *conn,
const u8 *msg,
struct io_plan *(*next)(struct io_conn *,
struct peer *));
/* BOLT #8:
*
* ### Receiving and Decrypting Messages
*
* In order to decrypt the _next_ message in the network stream, the
* following steps are completed:
*
* 1. Read _exactly_ 18 bytes from the network buffer.
*/
#define CRYPTOMSG_HDR_SIZE 18
/* BOLT #8:
*
* 4. Read _exactly_ `l+16` bytes from the network buffer
*/
#define CRYPTOMSG_BODY_OVERHEAD 16
/* Low-level functions for sync comms: doesn't discard unknowns! */
u8 *cryptomsg_encrypt_msg(const tal_t *ctx,

1
connectd/Makefile

@ -15,6 +15,7 @@ LIGHTNINGD_CONNECT_CONTROL_OBJS := $(LIGHTNINGD_CONNECT_CONTROL_SRC:.c=.o)
LIGHTNINGD_CONNECT_HEADERS := connectd/gen_connect_wire.h \
connectd/gen_connect_gossip_wire.h \
connectd/connectd.h \
connectd/peer_exchange_initmsg.h \
connectd/handshake.h \
connectd/netaddress.h \
connectd/tor_autoservice.h \

208
connectd/connectd.c

@ -33,6 +33,7 @@
#include <connectd/gen_connect_wire.h>
#include <connectd/handshake.h>
#include <connectd/netaddress.h>
#include <connectd/peer_exchange_initmsg.h>
#include <connectd/tor.h>
#include <connectd/tor_autoservice.h>
#include <errno.h>
@ -140,27 +141,6 @@ struct reaching {
u32 seconds_waited;
};
/* This is a transitory structure: we hand off to the master daemon as soon
* as we've completed INIT read/write. */
struct peer {
struct daemon *daemon;
/* The ID of the peer */
struct pubkey id;
/* Where it's connected to. */
struct wireaddr_internal addr;
/* Feature bitmaps. */
u8 *gfeatures, *lfeatures;
/* Cryptostate */
struct peer_crypto_state pcs;
/* Our connection (and owner) */
struct io_conn *conn;
};
/* Mutual recursion */
static void try_reach_one_addr(struct reaching *reach);
@ -194,24 +174,6 @@ static bool broken_resolver(struct daemon *daemon)
return daemon->broken_resolver_response != NULL;
}
static struct peer *new_peer(struct io_conn *conn,
struct daemon *daemon,
const struct pubkey *their_id,
const struct wireaddr_internal *addr,
const struct crypto_state *cs)
{
struct peer *peer = tal(conn, struct peer);
peer->conn = conn;
peer->id = *their_id;
peer->addr = *addr;
peer->daemon = daemon;
init_peer_crypto_state(peer, &peer->pcs);
peer->pcs.cs = *cs;
return peer;
}
static void destroy_reaching(struct reaching *reach)
{
list_del_from(&reach->daemon->reaching, &reach->list);
@ -228,10 +190,12 @@ static struct reaching *find_reaching(struct daemon *daemon,
return NULL;
}
static void reached_peer(struct peer *peer, struct io_conn *conn)
static void reached_peer(struct io_conn *conn,
struct daemon *daemon,
const struct pubkey *id)
{
/* OK, we've reached the peer successfully, tell everyone. */
struct reaching *r = find_reaching(peer->daemon, &peer->id);
struct reaching *r = find_reaching(daemon, id);
if (!r)
return;
@ -240,166 +204,122 @@ static void reached_peer(struct peer *peer, struct io_conn *conn)
io_set_finish(conn, NULL, NULL);
/* Don't free conn with reach */
tal_steal(peer->daemon, conn);
tal_steal(daemon, conn);
tal_free(r);
}
static int get_gossipfd(struct peer *peer)
static int get_gossipfd(struct daemon *daemon,
const struct pubkey *id,
const u8 *lfeatures)
{
bool gossip_queries_feature, initial_routing_sync, success;
u8 *msg;
gossip_queries_feature
= feature_offered(peer->lfeatures, LOCAL_GOSSIP_QUERIES)
&& feature_offered(peer->daemon->localfeatures,
= feature_offered(lfeatures, LOCAL_GOSSIP_QUERIES)
&& feature_offered(daemon->localfeatures,
LOCAL_GOSSIP_QUERIES);
initial_routing_sync
= feature_offered(peer->lfeatures, LOCAL_INITIAL_ROUTING_SYNC);
= feature_offered(lfeatures, LOCAL_INITIAL_ROUTING_SYNC);
/* We do this communication sync. */
msg = towire_gossip_new_peer(NULL, &peer->id, gossip_queries_feature,
msg = towire_gossip_new_peer(NULL, id, gossip_queries_feature,
initial_routing_sync);
if (!wire_sync_write(GOSSIPCTL_FD, take(msg)))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Failed writing to gossipctl: %s",
strerror(errno));
msg = wire_sync_read(peer, GOSSIPCTL_FD);
msg = wire_sync_read(tmpctx, GOSSIPCTL_FD);
if (!fromwire_gossip_new_peer_reply(msg, &success))
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Failed parsing msg gossipctl: %s",
tal_hex(tmpctx, msg));
if (!success) {
status_broken("Gossipd did not give us an fd: losing peer %s",
type_to_string(tmpctx, struct pubkey, &peer->id));
type_to_string(tmpctx, struct pubkey, id));
return -1;
}
return fdpass_recv(GOSSIPCTL_FD);
}
static struct io_plan *peer_close_after_error(struct io_conn *conn,
struct peer *peer)
{
status_trace("%s: we sent them a fatal error, closing",
type_to_string(tmpctx, struct pubkey, &peer->id));
return io_close(conn);
}
struct peer_reconnected {
struct daemon *daemon;
struct pubkey id;
const u8 *peer_connected_msg;
const u8 *lfeatures;
};
/* Mutual recursion */
static struct io_plan *peer_connected(struct io_conn *conn, struct peer *peer);
static struct io_plan *retry_peer_connected(struct io_conn *conn,
struct peer *peer)
struct peer_reconnected *pr)
{
struct io_plan *plan;
status_trace("peer %s: processing now old peer gone",
type_to_string(tmpctx, struct pubkey, &peer->id));
type_to_string(tmpctx, struct pubkey, &pr->id));
return peer_connected(conn, peer);
plan = peer_connected(conn, pr->daemon, &pr->id,
take(pr->peer_connected_msg),
take(pr->lfeatures));
tal_free(pr);
return plan;
}
static struct io_plan *peer_connected(struct io_conn *conn, struct peer *peer)
struct io_plan *peer_connected(struct io_conn *conn,
struct daemon *daemon,
const struct pubkey *id TAKES,
const u8 *peer_connected_msg TAKES,
const u8 *lfeatures TAKES)
{
struct daemon *daemon = peer->daemon;
u8 *msg;
int gossip_fd;
struct pubkey *key;
int gossip_fd;
/* FIXME: We could do this before exchanging init msgs. */
key = pubkey_set_get(&daemon->peers, &peer->id);
key = pubkey_set_get(&daemon->peers, id);
if (key) {
u8 *msg;
struct peer_reconnected *r;
status_trace("peer %s: reconnect",
type_to_string(tmpctx, struct pubkey, &peer->id));
type_to_string(tmpctx, struct pubkey, id));
/* Tell master to kill it: will send peer_disconnect */
msg = towire_connect_reconnected(NULL, &peer->id);
msg = towire_connect_reconnected(NULL, id);
daemon_conn_send(&daemon->master, take(msg));
return io_wait(conn, key, retry_peer_connected, peer);
/* Save arguments for next time. */
r = tal(daemon, struct peer_reconnected);
r->daemon = daemon;
r->id = *id;
r->peer_connected_msg
= tal_dup_arr(r, u8, peer_connected_msg,
tal_count(peer_connected_msg), 0);
r->lfeatures
= tal_dup_arr(r, u8, lfeatures, tal_count(lfeatures), 0);
return io_wait(conn, key, retry_peer_connected, r);
}
reached_peer(peer, conn);
reached_peer(conn, daemon, id);
gossip_fd = get_gossipfd(daemon, id, lfeatures);
/* We promised we'd take it. */
if (taken(lfeatures))
tal_free(lfeatures);
gossip_fd = get_gossipfd(peer);
if (gossip_fd < 0)
return io_close(conn);
msg = towire_connect_peer_connected(tmpctx, &peer->id, &peer->addr,
&peer->pcs.cs,
peer->gfeatures, peer->lfeatures);
daemon_conn_send(&daemon->master, msg);
daemon_conn_send(&daemon->master, peer_connected_msg);
daemon_conn_send_fd(&daemon->master, io_conn_fd(conn));
daemon_conn_send_fd(&daemon->master, gossip_fd);
pubkey_set_add(&daemon->peers,
tal_dup(daemon, struct pubkey, &peer->id));
pubkey_set_add(&daemon->peers, tal_dup(daemon, struct pubkey, id));
/* This frees the peer. */
return io_close_taken_fd(conn);
}
static struct io_plan *peer_init_received(struct io_conn *conn,
struct peer *peer,
u8 *msg)
{
if (!fromwire_init(peer, msg, &peer->gfeatures, &peer->lfeatures)) {
status_trace("peer %s bad fromwire_init '%s', closing",
type_to_string(tmpctx, struct pubkey, &peer->id),
tal_hex(tmpctx, msg));
return io_close(conn);
}
if (!features_supported(peer->gfeatures, peer->lfeatures)) {
const u8 *global_features = get_offered_global_features(msg);
const u8 *local_features = get_offered_local_features(msg);
msg = towire_errorfmt(NULL, NULL, "Unsupported features %s/%s:"
" we only offer globalfeatures %s"
" and localfeatures %s",
tal_hex(msg, peer->gfeatures),
tal_hex(msg, peer->lfeatures),
tal_hexstr(msg,
global_features,
tal_count(global_features)),
tal_hexstr(msg,
local_features,
tal_count(local_features)));
return peer_write_message(conn, &peer->pcs, take(msg),
peer_close_after_error);
}
return peer_connected(conn, peer);
}
static struct io_plan *read_init(struct io_conn *conn, struct peer *peer)
{
/* BOLT #1:
*
* The receiving node:
* - MUST wait to receive `init` before sending any other messages.
*/
return peer_read_message(conn, &peer->pcs, peer_init_received);
}
/* This creates a temporary peer which is not in the list and is owner
* by the connection; it's placed in the list and owned by daemon once
* we have the features. */
static struct io_plan *init_new_peer(struct io_conn *conn,
const struct pubkey *their_id,
const struct wireaddr_internal *addr,
const struct crypto_state *cs,
struct daemon *daemon)
{
struct peer *peer = new_peer(conn, daemon, their_id, addr, cs);
u8 *initmsg;
/* BOLT #1:
*
* The sending node:
* - MUST send `init` as the first Lightning message for any
* connection.
*/
initmsg = towire_init(NULL,
daemon->globalfeatures, daemon->localfeatures);
return peer_write_message(conn, &peer->pcs, take(initmsg), read_init);
}
struct listen_fd {
int fd;
/* If we bind() IPv6 then IPv4 to same port, we *may* fail to listen()
@ -456,7 +376,7 @@ static struct io_plan *handshake_in_success(struct io_conn *conn,
{
status_trace("Connect IN from %s",
type_to_string(tmpctx, struct pubkey, id));
return init_new_peer(conn, id, addr, cs, daemon);
return peer_exchange_initmsg(conn, daemon, cs, id, addr);
}
static struct io_plan *connection_in(struct io_conn *conn, struct daemon *daemon)
@ -830,7 +750,7 @@ static struct io_plan *handshake_out_success(struct io_conn *conn,
reach->connstate = "Exchanging init messages";
status_trace("Connect OUT to %s",
type_to_string(tmpctx, struct pubkey, id));
return init_new_peer(conn, id, addr, cs, reach->daemon);
return peer_exchange_initmsg(conn, reach->daemon, cs, id, addr);
}
struct io_plan *connection_out(struct io_conn *conn, struct reaching *reach)

12
connectd/connectd.h

@ -1,10 +1,22 @@
#ifndef LIGHTNING_CONNECTD_CONNECTD_H
#define LIGHTNING_CONNECTD_CONNECTD_H
#include "config.h"
#include <bitcoin/pubkey.h>
#include <common/crypto_state.h>
struct io_conn;
struct peer;
struct reaching;
struct daemon;
/* Called by io_tor_connect once it has a connection out. */
struct io_plan *connection_out(struct io_conn *conn, struct reaching *reach);
/* Called by peer_exchange_initmsg if successful. */
struct io_plan *peer_connected(struct io_conn *conn,
struct daemon *daemon,
const struct pubkey *id TAKES,
const u8 *peer_connected_msg TAKES,
const u8 *lfeatures TAKES);
#endif /* LIGHTNING_CONNECTD_CONNECTD_H */

177
connectd/peer_exchange_initmsg.c

@ -0,0 +1,177 @@
#include <ccan/io/io.h>
#include <common/cryptomsg.h>
#include <common/dev_disconnect.h>
#include <common/features.h>
#include <common/status.h>
#include <common/wire_error.h>
#include <connectd/connectd.h>
#include <connectd/gen_connect_wire.h>
#include <connectd/peer_exchange_initmsg.h>
#include <wire/peer_wire.h>
/* Temporary structure for us to read peer message in */
struct peer {
struct daemon *daemon;
/* The ID of the peer */
struct pubkey id;
/* Where it's connected to/from. */
struct wireaddr_internal addr;
/* Crypto state for writing/reading peer initmsg */
struct crypto_state cs;
/* Buffer for reading/writing message. */
u8 *msg;
};
/* Here in case we need to read another message. */
static struct io_plan *read_init(struct io_conn *conn, struct peer *peer);
static struct io_plan *peer_init_received(struct io_conn *conn,
struct peer *peer)
{
u8 *msg = cryptomsg_decrypt_body(peer, &peer->cs, peer->msg);
u8 *gfeatures, *lfeatures;
if (!msg)
return io_close(conn);
status_peer_io(LOG_IO_IN, msg);
/* BOLT #1:
*
* A receiving node:
* - upon receiving a message of _odd_, unknown type:
* - MUST ignore the received message.
*/
if (unlikely(is_unknown_msg_discardable(msg)))
return read_init(conn, peer);
if (!fromwire_init(peer, msg, &gfeatures, &lfeatures)) {
status_trace("peer %s bad fromwire_init '%s', closing",
type_to_string(tmpctx, struct pubkey, &peer->id),
tal_hex(tmpctx, msg));
return io_close(conn);
}
/* BOLT #1:
*
* The receiving node:
* ...
* - upon receiving unknown _odd_ feature bits that are non-zero:
* - MUST ignore the bit.
* - upon receiving unknown _even_ feature bits that are non-zero:
* - MUST fail the connection.
*/
if (!features_supported(gfeatures, lfeatures)) {
const u8 *global_features = get_offered_global_features(msg);
const u8 *local_features = get_offered_local_features(msg);
msg = towire_errorfmt(NULL, NULL, "Unsupported features %s/%s:"
" we only offer globalfeatures %s"
" and localfeatures %s",
tal_hex(msg, gfeatures),
tal_hex(msg, lfeatures),
tal_hex(msg, global_features),
tal_hex(msg, local_features));
msg = cryptomsg_encrypt_msg(NULL, &peer->cs, take(msg));
return io_write(conn, msg, tal_count(msg), io_close_cb, NULL);
}
/* Create message to tell master peer has connected. */
msg = towire_connect_peer_connected(NULL, &peer->id, &peer->addr,
&peer->cs, gfeatures, lfeatures);
/* Usually return io_close_taken_fd, but may wait for old peer to
* be disconnected if it's a reconnect. */
return peer_connected(conn, peer->daemon, &peer->id,
take(msg), take(lfeatures));
}
static struct io_plan *peer_init_hdr_received(struct io_conn *conn,
struct peer *peer)
{
u16 len;
if (!cryptomsg_decrypt_header(&peer->cs, peer->msg, &len))
return io_close(conn);
tal_free(peer->msg);
peer->msg = tal_arr(peer, u8, (u32)len + CRYPTOMSG_BODY_OVERHEAD);
return io_read(conn, peer->msg, tal_count(peer->msg),
peer_init_received, peer);
}
static struct io_plan *read_init(struct io_conn *conn, struct peer *peer)
{
/* Free our sent init msg. */
tal_free(peer->msg);
/* BOLT #1:
*
* The receiving node:
* - MUST wait to receive `init` before sending any other messages.
*/
peer->msg = tal_arr(peer, u8, CRYPTOMSG_HDR_SIZE);
return io_read(conn, peer->msg, tal_bytelen(peer->msg),
peer_init_hdr_received, peer);
}
#if DEVELOPER
static struct io_plan *peer_write_postclose(struct io_conn *conn,
struct peer *peer)
{
dev_sabotage_fd(io_conn_fd(conn));
return read_init(conn, peer);
}
#endif
struct io_plan *peer_exchange_initmsg(struct io_conn *conn,
struct daemon *daemon,
const struct crypto_state *cs,
const struct pubkey *id,
const struct wireaddr_internal *addr)
{
/* If conn is closed, forget peer */
struct peer *peer = tal(conn, struct peer);
struct io_plan *(*next)(struct io_conn *, struct peer *);
peer->daemon = daemon;
peer->id = *id;
peer->addr = *addr;
peer->cs = *cs;
/* BOLT #1:
*
* The sending node:
* - MUST send `init` as the first Lightning message for any
* connection.
*/
peer->msg = towire_init(NULL,
get_offered_global_features(tmpctx),
get_offered_local_features(tmpctx));
status_peer_io(LOG_IO_OUT, peer->msg);
peer->msg = cryptomsg_encrypt_msg(peer, &peer->cs, take(peer->msg));
next = read_init;
#if DEVELOPER
switch (dev_disconnect(WIRE_INIT)) {
case DEV_DISCONNECT_BEFORE:
dev_sabotage_fd(io_conn_fd(conn));
break;
case DEV_DISCONNECT_DROPPKT:
peer->msg = tal_free(peer->msg); /* FALL THRU */
case DEV_DISCONNECT_AFTER:
next = peer_write_postclose;
break;
case DEV_DISCONNECT_BLACKHOLE:
dev_blackhole_fd(io_conn_fd(conn));
break;
case DEV_DISCONNECT_NORMAL:
break;
}
#endif /* DEVELOPER */
return io_write(conn, peer->msg, tal_bytelen(peer->msg), next, peer);
}

18
connectd/peer_exchange_initmsg.h

@ -0,0 +1,18 @@
#ifndef LIGHTNING_CONNECTD_PEER_EXCHANGE_INITMSG_H
#define LIGHTNING_CONNECTD_PEER_EXCHANGE_INITMSG_H
#include "config.h"
struct crypto_state;
struct daemon;
struct io_conn;
struct pubkey;
struct wireaddr_internal;
/* If successful, calls peer_connected() */
struct io_plan *peer_exchange_initmsg(struct io_conn *conn,
struct daemon *daemon,
const struct crypto_state *cs,
const struct pubkey *id,
const struct wireaddr_internal *addr);
#endif /* LIGHTNING_CONNECTD_PEER_EXCHANGE_INITMSG_H */
Loading…
Cancel
Save