Browse Source

subd: simplify and clean up lifetime handling.

There are now only two kinds of subdaemons: global ones (hsmd, gossipd) and
per-peer ones.  We can handle many callbacks internally now.

We can have a handler to set a new peer owner, and automatically do
the cleanup of the old one if necessary, since we now know which ones
are per-peer.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ppa-0.6.1
Rusty Russell 7 years ago
committed by Christian Decker
parent
commit
5a256c724a
  1. 15
      lightningd/gossip_control.c
  2. 6
      lightningd/lightningd.c
  3. 152
      lightningd/peer_control.c
  4. 2
      lightningd/peer_control.h
  5. 123
      lightningd/subd.c
  6. 57
      lightningd/subd.h
  7. 5
      tests/test_lightningd.py

15
lightningd/gossip_control.c

@ -19,14 +19,6 @@
#include <wire/gen_peer_wire.h> #include <wire/gen_peer_wire.h>
#include <wire/wire_sync.h> #include <wire/wire_sync.h>
static void gossip_finished(struct subd *gossip, int status)
{
if (WIFEXITED(status))
errx(1, "Gossip failed (exit status %i), exiting.",
WEXITSTATUS(status));
errx(1, "Gossip failed (signal %u), exiting.", WTERMSIG(status));
}
static void peer_nongossip(struct subd *gossip, const u8 *msg, static void peer_nongossip(struct subd *gossip, const u8 *msg,
int peer_fd, int gossip_fd) int peer_fd, int gossip_fd)
{ {
@ -117,10 +109,9 @@ void gossip_init(struct lightningd *ld)
if (hsmfd < 0) if (hsmfd < 0)
fatal("Could not read fd from HSM: %s", strerror(errno)); fatal("Could not read fd from HSM: %s", strerror(errno));
ld->gossip = new_subd(ld, "lightning_gossipd", NULL, ld->gossip = new_global_subd(ld, "lightning_gossipd",
gossip_wire_type_name, gossip_wire_type_name, gossip_msg,
gossip_msg, NULL, gossip_finished, take(&hsmfd), NULL);
take(&hsmfd), NULL);
if (!ld->gossip) if (!ld->gossip)
err(1, "Could not subdaemon gossip"); err(1, "Could not subdaemon gossip");

6
lightningd/lightningd.c

@ -196,10 +196,8 @@ static void shutdown_subdaemons(struct lightningd *ld)
close(ld->hsm_fd); close(ld->hsm_fd);
subd_shutdown(ld->gossip, 10); subd_shutdown(ld->gossip, 10);
/* Duplicates are OK: no need to check here. */ while ((p = list_top(&ld->peers, struct peer, list)) != NULL)
list_for_each(&ld->peers, p, list) tal_free(p);
if (p->owner)
subd_shutdown(p->owner, 0);
} }
struct chainparams *get_chainparams(const struct lightningd *ld) struct chainparams *get_chainparams(const struct lightningd *ld)

152
lightningd/peer_control.c

@ -54,7 +54,6 @@ struct connect {
/* FIXME: Reorder */ /* FIXME: Reorder */
struct funding_channel; struct funding_channel;
static void peer_owner_finished(struct subd *subd, int status);
static void peer_offer_channel(struct lightningd *ld, static void peer_offer_channel(struct lightningd *ld,
struct funding_channel *fc, struct funding_channel *fc,
const struct crypto_state *cs, const struct crypto_state *cs,
@ -76,11 +75,19 @@ static void peer_accept_channel(struct lightningd *ld,
int peer_fd, int gossip_fd, int peer_fd, int gossip_fd,
const u8 *open_msg); const u8 *open_msg);
static void peer_set_owner(struct peer *peer, struct subd *owner)
{
struct subd *old_owner = peer->owner;
peer->owner = owner;
if (old_owner)
subd_release_peer(old_owner, peer);
}
static void destroy_peer(struct peer *peer) static void destroy_peer(struct peer *peer)
{ {
/* Don't leave owner pointer dangling. */ /* Free any old owner still hanging around. */
if (peer->owner && peer->owner->peer == peer) peer_set_owner(peer, NULL);
peer->owner->peer = NULL;
list_del_from(&peer->ld->peers, &peer->list); list_del_from(&peer->ld->peers, &peer->list);
} }
@ -137,7 +144,7 @@ static void drop_to_chain(struct peer *peer)
broadcast_tx(peer->ld->topology, peer, peer->last_tx, NULL); broadcast_tx(peer->ld->topology, peer, peer->last_tx, NULL);
} }
void peer_fail_permanent(struct peer *peer, const u8 *msg) void peer_fail_permanent(struct peer *peer, const u8 *msg TAKES)
{ {
/* BOLT #1: /* BOLT #1:
* *
@ -150,7 +157,7 @@ void peer_fail_permanent(struct peer *peer, const u8 *msg)
peer_state_name(peer->state), peer_state_name(peer->state),
(int)tal_len(msg), (char *)msg); (int)tal_len(msg), (char *)msg);
peer->error = towire_error(peer, &all_channels, msg); peer->error = towire_error(peer, &all_channels, msg);
peer->owner = tal_free(peer->owner); peer_set_owner(peer, NULL);
if (taken(msg)) if (taken(msg))
tal_free(msg); tal_free(msg);
@ -198,7 +205,7 @@ void peer_fail_transient(struct peer *peer, const char *fmt, ...)
return; return;
} }
peer->owner = NULL; peer_set_owner(peer, NULL);
/* If we haven't reached awaiting locked, we don't need to reconnect */ /* If we haven't reached awaiting locked, we don't need to reconnect */
if (!peer_persists(peer)) { if (!peer_persists(peer)) {
@ -220,17 +227,6 @@ void peer_fail_transient(struct peer *peer, const char *fmt, ...)
} }
} }
/* When daemon reports a STATUS_FAIL_PEER_BAD, it goes here. */
static void bad_peer(struct subd *subd, const char *msg)
{
struct peer *peer = subd->peer;
/* Don't close peer->owner, subd will clean that up. */
peer->owner = NULL;
subd->peer = NULL;
peer_fail_permanent_str(peer, msg);
}
void peer_set_condition(struct peer *peer, enum peer_state old_state, void peer_set_condition(struct peer *peer, enum peer_state old_state,
enum peer_state state) enum peer_state state)
{ {
@ -530,8 +526,6 @@ void peer_connected(struct lightningd *ld, const u8 *msg,
/* Now, do we already know this peer? */ /* Now, do we already know this peer? */
peer = peer_by_id(ld, &id); peer = peer_by_id(ld, &id);
if (peer) { if (peer) {
struct subd *owner;
log_debug(peer->log, "Peer has reconnected, state %s", log_debug(peer->log, "Peer has reconnected, state %s",
peer_state_name(peer->state)); peer_state_name(peer->state));
@ -548,9 +542,7 @@ void peer_connected(struct lightningd *ld, const u8 *msg,
/* Reconnect: discard old one. */ /* Reconnect: discard old one. */
case OPENINGD: case OPENINGD:
/* This kills daemon (frees peer!) */ peer = tal_free(peer);
tal_free(peer->owner);
peer = NULL;
goto return_to_gossipd; goto return_to_gossipd;
case ONCHAIND_CHEATED: case ONCHAIND_CHEATED:
@ -567,9 +559,7 @@ void peer_connected(struct lightningd *ld, const u8 *msg,
case CHANNELD_SHUTTING_DOWN: case CHANNELD_SHUTTING_DOWN:
/* Stop any existing daemon, without triggering error /* Stop any existing daemon, without triggering error
* on this peer. */ * on this peer. */
owner = peer->owner; peer_set_owner(peer, NULL);
peer->owner = NULL;
tal_free(owner);
peer_start_channeld(peer, &cs, peer_fd, gossip_fd, NULL, peer_start_channeld(peer, &cs, peer_fd, gossip_fd, NULL,
true); true);
@ -579,9 +569,7 @@ void peer_connected(struct lightningd *ld, const u8 *msg,
case CLOSINGD_COMPLETE: case CLOSINGD_COMPLETE:
/* Stop any existing daemon, without triggering error /* Stop any existing daemon, without triggering error
* on this peer. */ * on this peer. */
owner = peer->owner; peer_set_owner(peer, NULL);
peer->owner = NULL;
tal_free(owner);
peer_start_closingd(peer, &cs, peer_fd, gossip_fd, peer_start_closingd(peer, &cs, peer_fd, gossip_fd,
true); true);
@ -705,24 +693,6 @@ struct peer *peer_by_id(struct lightningd *ld, const struct pubkey *id)
return NULL; return NULL;
} }
/* When a per-peer subdaemon exits, see if we need to do anything. */
static void peer_owner_finished(struct subd *subd, int status)
{
/* If peer has moved on, do nothing (can be NULL if it errored out) */
if (!subd->peer || subd->peer->owner != subd) {
log_debug(subd->ld->log, "Subdaemon %s died (%i), peer moved",
subd->name, status);
return;
}
subd->peer->owner = NULL;
/* Don't do a transient error if it's already perm failed. */
if (!subd->peer->error)
peer_fail_transient(subd->peer, "Owning subdaemon %s died (%i)",
subd->name, status);
}
static void json_connect(struct command *cmd, static void json_connect(struct command *cmd,
const char *buffer, const jsmntok_t *params) const char *buffer, const jsmntok_t *params)
{ {
@ -1085,10 +1055,8 @@ static enum watch_result onchain_tx_watched(struct peer *peer,
struct sha256_double txid; struct sha256_double txid;
if (depth == 0) { if (depth == 0) {
struct subd *old_onchaind = peer->owner;
log_unusual(peer->log, "Chain reorganization!"); log_unusual(peer->log, "Chain reorganization!");
peer->owner = NULL; peer_set_owner(peer, NULL);
tal_free(old_onchaind);
/* FIXME! /* FIXME!
topology_rescan(peer->ld->topology, peer->funding_txid); topology_rescan(peer->ld->topology, peer->funding_txid);
@ -1330,12 +1298,11 @@ static enum watch_result funding_spent(struct peer *peer,
/* We could come from almost any state. */ /* We could come from almost any state. */
peer_set_condition(peer, peer->state, FUNDING_SPEND_SEEN); peer_set_condition(peer, peer->state, FUNDING_SPEND_SEEN);
peer->owner = new_subd(peer->ld, peer_set_owner(peer, new_peer_subd(peer->ld,
"lightning_onchaind", peer, "lightning_onchaind", peer,
onchain_wire_type_name, onchain_wire_type_name,
onchain_msg, onchain_msg,
NULL, peer_owner_finished, NULL));
NULL, NULL);
if (!peer->owner) { if (!peer->owner) {
log_broken(peer->log, "Could not subdaemon onchain: %s", log_broken(peer->log, "Could not subdaemon onchain: %s",
@ -1890,14 +1857,11 @@ static void peer_start_closingd(struct peer *peer,
return; return;
} }
peer->owner = new_subd(peer->ld, peer_set_owner(peer, new_peer_subd(peer->ld,
"lightning_closingd", peer, "lightning_closingd", peer,
closing_wire_type_name, closing_wire_type_name, closing_msg,
closing_msg, take(&peer_fd), take(&gossip_fd),
bad_peer, NULL));
peer_owner_finished,
take(&peer_fd),
take(&gossip_fd), NULL);
if (!peer->owner) { if (!peer->owner) {
log_unusual(peer->log, "Could not subdaemon closing: %s", log_unusual(peer->log, "Could not subdaemon closing: %s",
strerror(errno)); strerror(errno));
@ -2066,15 +2030,14 @@ static bool peer_start_channeld(struct peer *peer,
if (hsmfd < 0) if (hsmfd < 0)
fatal("Could not read fd from HSM: %s", strerror(errno)); fatal("Could not read fd from HSM: %s", strerror(errno));
peer->owner = new_subd(peer->ld, peer_set_owner(peer, new_peer_subd(peer->ld,
"lightning_channeld", peer, "lightning_channeld", peer,
channel_wire_type_name, channel_wire_type_name,
channel_msg, channel_msg,
bad_peer, take(&peer_fd),
peer_owner_finished, take(&gossip_fd),
take(&peer_fd), take(&hsmfd), NULL));
take(&gossip_fd),
take(&hsmfd), NULL);
if (!peer->owner) { if (!peer->owner) {
log_unusual(peer->log, "Could not subdaemon channel: %s", log_unusual(peer->log, "Could not subdaemon channel: %s",
strerror(errno)); strerror(errno));
@ -2267,7 +2230,8 @@ static bool opening_funder_finished(struct subd *opening, const u8 *resp,
utxos); utxos);
tal_free(utxos); tal_free(utxos);
fc->peer->owner = NULL; /* Unowned (will free openingd). */
peer_set_owner(fc->peer, NULL);
if (!wire_sync_write(fc->peer->ld->hsm_fd, take(msg))) if (!wire_sync_write(fc->peer->ld->hsm_fd, take(msg)))
fatal("Could not write to HSM: %s", strerror(errno)); fatal("Could not write to HSM: %s", strerror(errno));
@ -2275,7 +2239,7 @@ static bool opening_funder_finished(struct subd *opening, const u8 *resp,
msg = hsm_sync_read(fc, fc->peer->ld); msg = hsm_sync_read(fc, fc->peer->ld);
opening_got_hsm_funding_sig(fc, fds[0], fds[1], msg, &cs); opening_got_hsm_funding_sig(fc, fds[0], fds[1], msg, &cs);
/* Tell opening daemon to exit. */ /* openingd already exited. */
return false; return false;
} }
@ -2341,14 +2305,14 @@ static bool opening_fundee_finished(struct subd *opening,
watch_txo(peer, peer->ld->topology, peer, peer->funding_txid, watch_txo(peer, peer->ld->topology, peer, peer->funding_txid,
peer->funding_outnum, funding_spent, NULL); peer->funding_outnum, funding_spent, NULL);
/* Unowned. */ /* Unowned (will free openingd). */
peer->owner = NULL; peer_set_owner(peer, NULL);
/* On to normal operation! */ /* On to normal operation! */
peer_start_channeld(peer, &cs, fds[0], fds[1], funding_signed, false); peer_start_channeld(peer, &cs, fds[0], fds[1], funding_signed, false);
peer_set_condition(peer, OPENINGD, CHANNELD_AWAITING_LOCKIN); peer_set_condition(peer, OPENINGD, CHANNELD_AWAITING_LOCKIN);
/* Tell opening daemon to exit. */ /* openingd already exited. */
return false; return false;
} }
@ -2365,7 +2329,6 @@ static void peer_accept_channel(struct lightningd *ld,
u8 *errmsg; u8 *errmsg;
u8 *msg; u8 *msg;
struct peer *peer; struct peer *peer;
struct subd *opening;
assert(fromwire_peektype(open_msg) == WIRE_OPEN_CHANNEL); assert(fromwire_peektype(open_msg) == WIRE_OPEN_CHANNEL);
@ -2380,17 +2343,15 @@ static void peer_accept_channel(struct lightningd *ld,
} }
peer_set_condition(peer, UNINITIALIZED, OPENINGD); peer_set_condition(peer, UNINITIALIZED, OPENINGD);
opening = new_subd(ld, peer_set_owner(peer,
"lightning_openingd", peer, new_peer_subd(ld, "lightning_openingd", peer,
opening_wire_type_name, opening_wire_type_name, NULL,
NULL, bad_peer, peer_owner_finished, take(&peer_fd), take(&gossip_fd), NULL));
take(&peer_fd), take(&gossip_fd), NULL); if (!peer->owner) {
if (!opening) {
peer_fail_transient(peer, "Failed to subdaemon opening: %s", peer_fail_transient(peer, "Failed to subdaemon opening: %s",
strerror(errno)); strerror(errno));
return; return;
} }
peer->owner = opening;
/* They will open channel. */ /* They will open channel. */
peer->funder = REMOTE; peer->funder = REMOTE;
@ -2449,7 +2410,6 @@ static void peer_offer_channel(struct lightningd *ld,
int peer_fd, int gossip_fd) int peer_fd, int gossip_fd)
{ {
u8 *msg; u8 *msg;
struct subd *opening;
u32 max_to_self_delay, max_minimum_depth; u32 max_to_self_delay, max_minimum_depth;
u64 min_effective_htlc_capacity_msat; u64 min_effective_htlc_capacity_msat;
struct utxo *utxos; struct utxo *utxos;
@ -2470,19 +2430,18 @@ static void peer_offer_channel(struct lightningd *ld,
fc->peer->push_msat = fc->push_msat; fc->peer->push_msat = fc->push_msat;
peer_set_condition(fc->peer, UNINITIALIZED, OPENINGD); peer_set_condition(fc->peer, UNINITIALIZED, OPENINGD);
opening = new_subd(ld, peer_set_owner(fc->peer,
"lightning_openingd", fc->peer, new_peer_subd(ld,
opening_wire_type_name, "lightning_openingd", fc->peer,
NULL, bad_peer, peer_owner_finished, opening_wire_type_name, NULL,
take(&peer_fd), take(&gossip_fd), NULL); take(&peer_fd), take(&gossip_fd), NULL));
if (!opening) { if (!fc->peer->owner) {
fc->peer = tal_free(fc->peer); fc->peer = tal_free(fc->peer);
command_fail(fc->cmd, command_fail(fc->cmd,
"Failed to launch openingd: %s", "Failed to launch openingd: %s",
strerror(errno)); strerror(errno));
return; return;
} }
fc->peer->owner = opening;
/* FIXME: This is wrong in several ways. /* FIXME: This is wrong in several ways.
* *
@ -2516,7 +2475,7 @@ static void peer_offer_channel(struct lightningd *ld,
max_to_self_delay, max_to_self_delay,
min_effective_htlc_capacity_msat, min_effective_htlc_capacity_msat,
cs, fc->peer->seed); cs, fc->peer->seed);
subd_send_msg(opening, take(msg)); subd_send_msg(fc->peer->owner, take(msg));
utxos = from_utxoptr_arr(fc, fc->utxomap); utxos = from_utxoptr_arr(fc, fc->utxomap);
@ -2532,7 +2491,8 @@ static void peer_offer_channel(struct lightningd *ld,
tal_steal(fc->peer, fc); tal_steal(fc->peer, fc);
tal_add_destructor(fc, fail_fundchannel_command); tal_add_destructor(fc, fail_fundchannel_command);
subd_req(fc, opening, take(msg), -1, 2, opening_funder_finished, fc); subd_req(fc, fc->peer->owner,
take(msg), -1, 2, opening_funder_finished, fc);
} }
/* Peer has been released from gossip. Start opening. */ /* Peer has been released from gossip. Start opening. */

2
lightningd/peer_control.h

@ -47,7 +47,7 @@ struct peer {
/* Inside ld->peers. */ /* Inside ld->peers. */
struct list_node list; struct list_node list;
/* What stage is this in? NULL during first creation. */ /* Is there a single subdaemon responsible for us? */
struct subd *owner; struct subd *owner;
/* History */ /* History */

123
lightningd/subd.c

@ -1,3 +1,4 @@
#include <ccan/err/err.h>
#include <ccan/io/fdpass/fdpass.h> #include <ccan/io/fdpass/fdpass.h>
#include <ccan/io/io.h> #include <ccan/io/io.h>
#include <ccan/mem/mem.h> #include <ccan/mem/mem.h>
@ -433,9 +434,15 @@ static struct io_plan *sd_msg_read(struct io_conn *conn, struct subd *sd)
} }
/* If they care, tell them about invalid peer behavior */ /* If they care, tell them about invalid peer behavior */
if (sd->peerbadcb && type == STATUS_FAIL_PEER_BAD) { if (sd->peer && type == STATUS_FAIL_PEER_BAD) {
const char *errmsg = tal_fmt(sd, "%.*s", str_len, str); /* Don't free ourselves; we're about to do that. */
sd->peerbadcb(sd, errmsg); struct peer *peer = sd->peer;
sd->peer = NULL;
peer_fail_permanent(peer,
take(tal_dup_arr(peer, u8,
(u8 *)str, str_len,
0)));
} }
return io_close(conn); return io_close(conn);
} }
@ -495,9 +502,23 @@ static void destroy_subd(struct subd *sd)
if (sd->conn) if (sd->conn)
sd->conn = tal_free(sd->conn); sd->conn = tal_free(sd->conn);
log_debug(sd->log, "finishing: %p", sd->finished); /* Peer still attached? */
if (sd->finished) if (sd->peer) {
sd->finished(sd, status); /* Don't loop back when we fail it. */
struct peer *peer = sd->peer;
sd->peer = NULL;
peer_fail_transient(peer,
"Owning subdaemon %s died (%i)",
sd->name, status);
}
if (sd->must_not_exit) {
if (WIFEXITED(status))
errx(1, "%s failed (exit status %i), exiting.",
sd->name, WEXITSTATUS(status));
errx(1, "%s failed (signal %u), exiting.",
sd->name, WTERMSIG(status));
}
} }
static struct io_plan *msg_send_next(struct io_conn *conn, struct subd *sd) static struct io_plan *msg_send_next(struct io_conn *conn, struct subd *sd)
@ -524,23 +545,18 @@ static struct io_plan *msg_setup(struct io_conn *conn, struct subd *sd)
msg_send_next(conn, sd)); msg_send_next(conn, sd));
} }
struct subd *new_subd(struct lightningd *ld, static struct subd *new_subd(struct lightningd *ld,
const char *name, const char *name,
struct peer *peer, struct peer *peer,
const char *(*msgname)(int msgtype), const char *(*msgname)(int msgtype),
int (*msgcb)(struct subd *, const u8 *, const int *fds), int (*msgcb)(struct subd *, const u8 *, const int *fds),
void (*peerbadcb)(struct subd *, const char *), va_list *ap)
void (*finished)(struct subd *, int),
...)
{ {
va_list ap;
struct subd *sd = tal(ld, struct subd); struct subd *sd = tal(ld, struct subd);
int msg_fd; int msg_fd;
va_start(ap, finished);
sd->pid = subd(ld->daemon_dir, name, ld->dev_debug_subdaemon, sd->pid = subd(ld->daemon_dir, name, ld->dev_debug_subdaemon,
&msg_fd, ld->dev_disconnect_fd, &ap); &msg_fd, ld->dev_disconnect_fd, ap);
va_end(ap);
if (sd->pid == (pid_t)-1) { if (sd->pid == (pid_t)-1) {
log_unusual(ld->log, "subd %s failed: %s", log_unusual(ld->log, "subd %s failed: %s",
name, strerror(errno)); name, strerror(errno));
@ -549,10 +565,9 @@ struct subd *new_subd(struct lightningd *ld,
sd->ld = ld; sd->ld = ld;
sd->log = new_log(sd, ld->log_book, "%s(%u):", name, sd->pid); sd->log = new_log(sd, ld->log_book, "%s(%u):", name, sd->pid);
sd->name = name; sd->name = name;
sd->finished = finished; sd->must_not_exit = false;
sd->msgname = msgname; sd->msgname = msgname;
sd->msgcb = msgcb; sd->msgcb = msgcb;
sd->peerbadcb = peerbadcb;
sd->fds_in = NULL; sd->fds_in = NULL;
msg_queue_init(&sd->outq, sd); msg_queue_init(&sd->outq, sd);
tal_add_destructor(sd, destroy_subd); tal_add_destructor(sd, destroy_subd);
@ -568,6 +583,40 @@ struct subd *new_subd(struct lightningd *ld,
return sd; return sd;
} }
struct subd *new_global_subd(struct lightningd *ld,
const char *name,
const char *(*msgname)(int msgtype),
int (*msgcb)(struct subd *, const u8 *,
const int *fds),
...)
{
va_list ap;
struct subd *sd;
va_start(ap, msgcb);
sd = new_subd(ld, name, NULL, msgname, msgcb, &ap);
va_end(ap);
sd->must_not_exit = true;
return sd;
}
struct subd *new_peer_subd(struct lightningd *ld,
const char *name,
struct peer *peer,
const char *(*msgname)(int msgtype),
int (*msgcb)(struct subd *, const u8 *,
const int *fds), ...)
{
va_list ap;
struct subd *sd;
va_start(ap, msgcb);
sd = new_subd(ld, name, peer, msgname, msgcb, &ap);
va_end(ap);
return sd;
}
void subd_send_msg(struct subd *sd, const u8 *msg_out) void subd_send_msg(struct subd *sd, const u8 *msg_out)
{ {
/* FIXME: We should use unique upper bits for each daemon, then /* FIXME: We should use unique upper bits for each daemon, then
@ -600,33 +649,37 @@ void subd_req_(const tal_t *ctx,
void subd_shutdown(struct subd *sd, unsigned int seconds) void subd_shutdown(struct subd *sd, unsigned int seconds)
{ {
/* Idempotent. */
if (!sd->conn)
return;
log_debug(sd->log, "Shutting down"); log_debug(sd->log, "Shutting down");
/* No finished callback any more. */ tal_del_destructor(sd, destroy_subd);
sd->finished = NULL;
/* Don't let destroy_peer dereference us */ /* This should make it exit; steal so it stays around. */
if (sd->peer) {
sd->peer->owner = NULL;
sd->peer = NULL;
}
/* Don't free sd when we close connection manually. */
tal_steal(sd->ld, sd); tal_steal(sd->ld, sd);
/* Close connection: should begin shutdown now. */
sd->conn = tal_free(sd->conn); sd->conn = tal_free(sd->conn);
/* Do we actually want to wait? */ /* Wait for a while. */
while (seconds) { while (seconds) {
if (waitpid(sd->pid, NULL, WNOHANG) > 0) { if (waitpid(sd->pid, NULL, WNOHANG) > 0) {
tal_del_destructor(sd, destroy_subd);
return; return;
} }
sleep(1); sleep(1);
seconds--; seconds--;
} }
/* Didn't die? This will kill it harder */
sd->must_not_exit = false;
destroy_subd(sd);
tal_free(sd);
}
void subd_release_peer(struct subd *owner, struct peer *peer)
{
/* If owner is a per-peer-daemon, and not already freeing itself... */
if (owner->peer) {
assert(owner->peer == peer);
owner->peer = NULL;
tal_free(owner);
}
} }
char *opt_subd_debug(const char *optarg, struct lightningd *ld) char *opt_subd_debug(const char *optarg, struct lightningd *ld)

57
lightningd/subd.h

@ -34,10 +34,6 @@ struct subd {
/* Callback when non-reply message comes in. */ /* Callback when non-reply message comes in. */
int (*msgcb)(struct subd *, const u8 *, const int *); int (*msgcb)(struct subd *, const u8 *, const int *);
const char *(*msgname)(int msgtype); const char *(*msgname)(int msgtype);
void (*finished)(struct subd *sd, int status);
/* Callback when the peer misbehaves. */
void (*peerbadcb)(struct subd *, const char *what);
/* Buffer for input. */ /* Buffer for input. */
u8 *msg_in; u8 *msg_in;
@ -46,6 +42,9 @@ struct subd {
size_t num_fds_in_read; size_t num_fds_in_read;
int *fds_in; int *fds_in;
/* For global daemons: we fail if they fail. */
bool must_not_exit;
/* Messages queue up here. */ /* Messages queue up here. */
struct msg_queue outq; struct msg_queue outq;
@ -54,14 +53,11 @@ struct subd {
}; };
/** /**
* new_subd - create a new subdaemon. * new_global_subd - create a new global subdaemon.
* @ld: global state * @ld: global state
* @name: basename of daemon * @name: basename of daemon
* @peer: peer to associate (if any).
* @msgname: function to get name from messages * @msgname: function to get name from messages
* @msgcb: function to call when non-fatal message received (or NULL) * @msgcb: function to call when non-fatal message received (or NULL)
* @peerbadcb: function to call for STATUS_FAIL_PEER_BAD (or NULL for none)
* @finished: function to call when it's finished (with exit status).
* @...: NULL-terminated list of pointers to fds to hand as fd 3, 4... * @...: NULL-terminated list of pointers to fds to hand as fd 3, 4...
* (can be take, if so, set to -1) * (can be take, if so, set to -1)
* *
@ -69,13 +65,34 @@ struct subd {
* that many @fds are received before calling again. If it returns -1, the * that many @fds are received before calling again. If it returns -1, the
* subdaemon is shutdown. * subdaemon is shutdown.
*/ */
struct subd *new_subd(struct lightningd *ld, struct subd *new_global_subd(struct lightningd *ld,
const char *name, const char *name,
struct peer *peer, const char *(*msgname)(int msgtype),
const char *(*msgname)(int msgtype), int (*msgcb)(struct subd *, const u8 *,
int (*msgcb)(struct subd *, const u8 *, const int *fds), const int *fds),
void (*peerbadcb)(struct subd *, const char *), ...);
void (*finished)(struct subd *, int), ...);
/**
* new_peer_subd - create a new subdaemon for a specific peer.
* @ld: global state
* @name: basename of daemon
* @peer: peer to associate.
* @msgname: function to get name from messages
* @msgcb: function to call when non-fatal message received (or NULL)
* @...: NULL-terminated list of pointers to fds to hand as fd 3, 4...
* (can be take, if so, set to -1)
*
* @msgcb gets called with @fds set to NULL: if it returns a positive number,
* that many @fds are received before calling again. If it returns -1, the
* subdaemon is shutdown.
*/
struct subd *new_peer_subd(struct lightningd *ld,
const char *name,
struct peer *peer,
const char *(*msgname)(int msgtype),
int (*msgcb)(struct subd *, const u8 *,
const int *fds),
...);
/** /**
* subd_raw - raw interface to get a subdaemon on an fd (for HSM) * subd_raw - raw interface to get a subdaemon on an fd (for HSM)
@ -126,6 +143,16 @@ void subd_req_(const tal_t *ctx,
bool (*replycb)(struct subd *, const u8 *, const int *, void *), bool (*replycb)(struct subd *, const u8 *, const int *, void *),
void *replycb_data); void *replycb_data);
/**
* subd_release_peer - try to politely shut down a subdaemon.
* @owner: subd which owned peer.
* @peer: peer to release.
*
* If the subdaemon is not already shutting down, and it is a per-peer
* subdaemon, this shuts it down.
*/
void subd_release_peer(struct subd *owner, struct peer *peer);
/** /**
* subd_shutdown - try to politely shut down a subdaemon. * subd_shutdown - try to politely shut down a subdaemon.
* @subd: subd to shutdown. * @subd: subd to shutdown.

5
tests/test_lightningd.py

@ -1252,16 +1252,15 @@ class LightningDTests(BaseLightningDTests):
# Reconnect. # Reconnect.
l1.rpc.connect(l2.info['id'], 'localhost:{}'.format(l2.info['port'])) l1.rpc.connect(l2.info['id'], 'localhost:{}'.format(l2.info['port']))
# We should get a message about old one exiting. # We should get a message about reconnecting.
l2.daemon.wait_for_log('Peer has reconnected, state OPENINGD') l2.daemon.wait_for_log('Peer has reconnected, state OPENINGD')
l2.daemon.wait_for_log('Owning subdaemon lightning_openingd died')
# Should work fine. # Should work fine.
l1.rpc.fundchannel(l2.info['id'], 20000) l1.rpc.fundchannel(l2.info['id'], 20000)
l1.daemon.wait_for_log('sendrawtx exit 0') l1.daemon.wait_for_log('sendrawtx exit 0')
# Just to be sure, second openingd hand over to channeld. # Just to be sure, second openingd hand over to channeld.
l2.daemon.wait_for_log('Subdaemon lightning_openingd died \(0\)') l2.daemon.wait_for_log('lightning_openingd.*REPLY WIRE_OPENING_FUNDEE_REPLY with 2 fds')
def test_reconnect_normal(self): def test_reconnect_normal(self):
# Should reconnect fine even if locked message gets lost. # Should reconnect fine even if locked message gets lost.

Loading…
Cancel
Save