Browse Source

daemon: remove normal operation loop from state.c

It's now in its own little state machine, which is more typesafe.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ppa-0.6.1
Rusty Russell 9 years ago
parent
commit
1444d407f3
  1. 161
      daemon/peer.c
  2. 102
      state.c
  3. 25
      state.h

161
daemon/peer.c

@ -327,6 +327,10 @@ static struct io_plan *clearing_pkt_in(struct io_conn *conn, struct peer *peer)
{ {
Pkt *err = NULL, *pkt = peer->inpkt; Pkt *err = NULL, *pkt = peer->inpkt;
/* FIXME: always demux via pkt_in */
if (peer->state == STATE_MUTUAL_CLOSING)
return closing_pkt_in(conn, peer);
assert(peer->state == STATE_CLEARING assert(peer->state == STATE_CLEARING
|| peer->state == STATE_CLEARING_COMMITTING); || peer->state == STATE_CLEARING_COMMITTING);
@ -434,15 +438,89 @@ static void peer_start_clearing(struct peer *peer)
} }
} }
/* This is the io loop while we're in normal mode. */
static struct io_plan *normal_pkt_in(struct io_conn *conn, struct peer *peer)
{
Pkt *err = NULL, *pkt = peer->inpkt;
/* FIXME: This happens when we close via json; better to always
* demux via pkt_in. */
if (state_is_clearing(peer->state))
return clearing_pkt_in(conn, peer);
if (state_is_error(peer->state) || state_is_onchain(peer->state))
return peer_close(conn, peer);
assert(peer->state == STATE_NORMAL
|| peer->state == STATE_NORMAL_COMMITTING);
switch (pkt->pkt_case) {
case PKT_UPDATE_ADD_HTLC:
err = accept_pkt_htlc_add(peer, pkt);
break;
case PKT_UPDATE_FULFILL_HTLC:
err = accept_pkt_htlc_fulfill(peer, pkt);
break;
case PKT_UPDATE_FAIL_HTLC:
err = accept_pkt_htlc_fail(peer, pkt);
break;
case PKT_UPDATE_COMMIT:
err = accept_pkt_commit(peer, pkt);
if (!err)
queue_pkt_revocation(peer);
break;
case PKT_CLOSE_CLEARING:
err = accept_pkt_close_clearing(peer, pkt);
if (err)
break;
if (peer->state == STATE_NORMAL)
set_peer_state(peer, STATE_CLEARING, __func__);
else {
assert(peer->state == STATE_NORMAL_COMMITTING);
set_peer_state(peer, STATE_CLEARING_COMMITTING,
__func__);
}
peer_start_clearing(peer);
return peer_read_packet(conn, peer, clearing_pkt_in);
case PKT_UPDATE_REVOCATION:
if (peer->state == STATE_NORMAL_COMMITTING) {
err = accept_pkt_revocation(peer, pkt);
if (!err) {
peer_update_complete(peer, NULL);
set_peer_state(peer, STATE_NORMAL, __func__);
}
break;
}
/* Fall thru. */
default:
if (peer->state == STATE_NORMAL_COMMITTING)
peer_update_complete(peer, err->error->problem);
return peer_received_unexpected_pkt(conn, peer, pkt);
}
if (err) {
if (peer->state == STATE_NORMAL_COMMITTING)
peer_update_complete(peer, err->error->problem);
return peer_comms_err(conn, peer, err);
}
return peer_read_packet(conn, peer, normal_pkt_in);
}
static void state_single(struct peer *peer, static void state_single(struct peer *peer,
const enum state_input input, const enum state_input input,
const union input *idata) const Pkt *pkt)
{ {
const struct bitcoin_tx *broadcast; const struct bitcoin_tx *broadcast;
enum state newstate; enum state newstate;
size_t old_outpkts = tal_count(peer->outpkt); size_t old_outpkts = tal_count(peer->outpkt);
newstate = state(peer, input, idata, &broadcast); newstate = state(peer, input, pkt, &broadcast);
set_peer_state(peer, newstate, input_name(input)); set_peer_state(peer, newstate, input_name(input));
/* If we added uncommitted changes, we should have set them to send. */ /* If we added uncommitted changes, we should have set them to send. */
@ -456,10 +534,7 @@ static void state_single(struct peer *peer,
if (broadcast) if (broadcast)
broadcast_tx(peer, broadcast); broadcast_tx(peer, broadcast);
if (peer->state == STATE_CLEARING if (state_is_error(peer->state)) {
|| peer->state == STATE_CLEARING_COMMITTING) {
peer_start_clearing(peer);
} else if (state_is_error(peer->state)) {
/* Breakdown is common, others less so. */ /* Breakdown is common, others less so. */
if (peer->state != STATE_ERR_BREAKDOWN) if (peer->state != STATE_ERR_BREAKDOWN)
log_broken(peer->log, "Entered error state %s", log_broken(peer->log, "Entered error state %s",
@ -477,14 +552,14 @@ static void state_single(struct peer *peer,
static void state_event(struct peer *peer, static void state_event(struct peer *peer,
const enum state_input input, const enum state_input input,
const union input *idata) const Pkt *pkt)
{ {
if (!state_is_opening(peer->state) && !state_is_normal(peer->state)) { if (!state_is_opening(peer->state)) {
log_unusual(peer->log, log_unusual(peer->log,
"Unexpected input %s while state %s", "Unexpected input %s while state %s",
input_name(input), state_name(peer->state)); input_name(input), state_name(peer->state));
} else { } else {
state_single(peer, input, idata); state_single(peer, input, pkt);
} }
} }
@ -493,17 +568,7 @@ static bool command_htlc_fail(struct peer *peer, u64 id)
if (!state_can_remove_htlc(peer->state)) if (!state_can_remove_htlc(peer->state))
return false; return false;
if (peer->state == STATE_CLEARING) {
queue_pkt_htlc_fail(peer, id); queue_pkt_htlc_fail(peer, id);
} else {
union input idata;
union htlc_staging stage;
stage.fail.fail = HTLC_FAIL;
stage.fail.id = id;
idata.stage = &stage;
state_event(peer, CMD_SEND_HTLC_FAIL, &idata);
}
return true; return true;
} }
@ -514,21 +579,7 @@ static bool command_htlc_fulfill(struct peer *peer,
if (!state_can_remove_htlc(peer->state)) if (!state_can_remove_htlc(peer->state))
return false; return false;
/* Commands should still be blocked during this! */
assert(peer->state != STATE_CLEARING_COMMITTING);
if (peer->state == STATE_CLEARING) {
queue_pkt_htlc_fulfill(peer, id, r); queue_pkt_htlc_fulfill(peer, id, r);
} else {
union input idata;
union htlc_staging stage;
stage.fulfill.fulfill = HTLC_FULFILL;
stage.fulfill.r = *r;
stage.fulfill.id = id;
idata.stage = &stage;
state_event(peer, CMD_SEND_HTLC_FULFILL, &idata);
}
return true; return true;
} }
@ -537,9 +588,6 @@ static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer)
Pkt *out; Pkt *out;
size_t n = tal_count(peer->outpkt); size_t n = tal_count(peer->outpkt);
if (peer->fake_close || !peer->output_enabled)
return io_out_wait(conn, peer, pkt_out, peer);
if (n == 0) { if (n == 0) {
/* We close the connection once we've sent everything. */ /* We close the connection once we've sent everything. */
if (!state_can_io(peer->state)) if (!state_can_io(peer->state))
@ -547,6 +595,9 @@ static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer)
return io_out_wait(conn, peer, pkt_out, peer); return io_out_wait(conn, peer, pkt_out, peer);
} }
if (peer->fake_close || !peer->output_enabled)
return io_out_wait(conn, peer, pkt_out, peer);
out = peer->outpkt[0]; out = peer->outpkt[0];
memmove(peer->outpkt, peer->outpkt + 1, (sizeof(*peer->outpkt)*(n-1))); memmove(peer->outpkt, peer->outpkt + 1, (sizeof(*peer->outpkt)*(n-1)));
tal_resize(&peer->outpkt, n-1); tal_resize(&peer->outpkt, n-1);
@ -555,27 +606,18 @@ static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer)
static struct io_plan *pkt_in(struct io_conn *conn, struct peer *peer) static struct io_plan *pkt_in(struct io_conn *conn, struct peer *peer)
{ {
union input idata; if (state_is_normal(peer->state))
const tal_t *ctx; return normal_pkt_in(conn, peer);
else if (state_is_clearing(peer->state))
/* Did something move us into STATE_CLEARING? */
if (peer->state == STATE_CLEARING
|| peer->state == STATE_CLEARING_COMMITTING)
return clearing_pkt_in(conn, peer); return clearing_pkt_in(conn, peer);
else if (peer->state == STATE_MUTUAL_CLOSING) else if (peer->state == STATE_MUTUAL_CLOSING)
return closing_pkt_in(conn, peer); return closing_pkt_in(conn, peer);
ctx = tal(peer, char);
idata.pkt = tal_steal(ctx, peer->inpkt);
/* We ignore packets if they tell us to. */ /* We ignore packets if they tell us to. */
if (!peer->fake_close && state_can_io(peer->state)) { if (!peer->fake_close && state_can_io(peer->state)) {
state_event(peer, peer->inpkt->pkt_case, &idata); state_event(peer, peer->inpkt->pkt_case, peer->inpkt);
} }
/* Free peer->inpkt unless stolen above. */
tal_free(ctx);
return peer_read_packet(conn, peer, pkt_in); return peer_read_packet(conn, peer, pkt_in);
} }
@ -645,12 +687,12 @@ static void do_commit(struct peer *peer, struct command *jsoncmd)
assert(!peer->commit_jsoncmd); assert(!peer->commit_jsoncmd);
peer->commit_jsoncmd = jsoncmd; peer->commit_jsoncmd = jsoncmd;
if (peer->state == STATE_CLEARING) {
queue_pkt_commit(peer); queue_pkt_commit(peer);
if (peer->state == STATE_CLEARING) {
set_peer_state(peer, STATE_CLEARING_COMMITTING, __func__); set_peer_state(peer, STATE_CLEARING_COMMITTING, __func__);
} else { } else {
assert(peer->state == STATE_NORMAL); assert(peer->state == STATE_NORMAL);
state_event(peer, CMD_SEND_COMMIT, NULL); set_peer_state(peer, STATE_NORMAL_COMMITTING, __func__);
} }
} }
@ -1697,6 +1739,8 @@ static void check_for_resolution(struct peer *peer,
/* It's theoretically possible that peer is still writing output */ /* It's theoretically possible that peer is still writing output */
if (!peer->conn) if (!peer->conn)
io_break(peer); io_break(peer);
else
io_wake(peer);
} }
/* We assume the tx is valid! Don't do a blockchain.info and feed this /* We assume the tx is valid! Don't do a blockchain.info and feed this
@ -2314,20 +2358,14 @@ void peer_add_htlc_expiry(struct peer *peer,
new_abstimer(peer->dstate, peer, absexpiry, check_htlc_expiry, peer); new_abstimer(peer->dstate, peer, absexpiry, check_htlc_expiry, peer);
} }
/* We do final checks just before we start command, as things may have
* changed. */
static void do_newhtlc(struct peer *peer, static void do_newhtlc(struct peer *peer,
const struct channel_htlc *htlc, struct channel_htlc *htlc,
struct command *jsoncmd) struct command *jsoncmd)
{ {
struct channel_state *cstate; struct channel_state *cstate;
union input idata;
union htlc_staging stage;
/* Now we can assign counter and guarantee uniqueness. */ /* Assign unique ID. */
stage.add.add = HTLC_ADD; htlc->id = peer->htlc_id_counter;
stage.add.htlc = *htlc;
stage.add.htlc.id = peer->htlc_id_counter;
/* BOLT #2: /* BOLT #2:
* *
@ -2377,8 +2415,7 @@ static void do_newhtlc(struct peer *peer,
peer->htlc_id_counter++; peer->htlc_id_counter++;
/* FIXME: Never propose duplicate rvalues? */ /* FIXME: Never propose duplicate rvalues? */
idata.stage = &stage; queue_pkt_htlc_add(peer, htlc);
state_event(peer, CMD_SEND_HTLC_ADD, &idata);
command_success(jsoncmd, null_response(jsoncmd)); command_success(jsoncmd, null_response(jsoncmd));
} }

102
state.c

@ -14,19 +14,6 @@ static enum state next_state(struct peer *peer,
return state; return state;
} }
/*
* Simple marker to note we don't update state.
*
* This happens in normal state except when committing or closing.
*/
static enum state unchanged_state(const struct peer *peer,
const enum state_input input)
{
log_debug(peer->log, "%s: %s unchanged",
input_name(input), state_name(peer->state));
return peer->state;
}
static void queue_tx_broadcast(const struct bitcoin_tx **broadcast, static void queue_tx_broadcast(const struct bitcoin_tx **broadcast,
const struct bitcoin_tx *tx) const struct bitcoin_tx *tx)
{ {
@ -37,7 +24,7 @@ static void queue_tx_broadcast(const struct bitcoin_tx **broadcast,
enum state state(struct peer *peer, enum state state(struct peer *peer,
const enum state_input input, const enum state_input input,
const union input *idata, const Pkt *pkt,
const struct bitcoin_tx **broadcast) const struct bitcoin_tx **broadcast)
{ {
Pkt *err; Pkt *err;
@ -63,7 +50,7 @@ enum state state(struct peer *peer,
break; break;
case STATE_OPEN_WAIT_FOR_OPEN_NOANCHOR: case STATE_OPEN_WAIT_FOR_OPEN_NOANCHOR:
if (input_is(input, PKT_OPEN)) { if (input_is(input, PKT_OPEN)) {
err = accept_pkt_open(peer, idata->pkt); err = accept_pkt_open(peer, pkt);
if (err) { if (err) {
peer_open_complete(peer, err->error->problem); peer_open_complete(peer, err->error->problem);
goto err_breakdown; goto err_breakdown;
@ -76,7 +63,7 @@ enum state state(struct peer *peer,
break; break;
case STATE_OPEN_WAIT_FOR_OPEN_WITHANCHOR: case STATE_OPEN_WAIT_FOR_OPEN_WITHANCHOR:
if (input_is(input, PKT_OPEN)) { if (input_is(input, PKT_OPEN)) {
err = accept_pkt_open(peer, idata->pkt); err = accept_pkt_open(peer, pkt);
if (err) { if (err) {
peer_open_complete(peer, err->error->problem); peer_open_complete(peer, err->error->problem);
goto err_breakdown; goto err_breakdown;
@ -102,7 +89,7 @@ enum state state(struct peer *peer,
break; break;
case STATE_OPEN_WAIT_FOR_ANCHOR: case STATE_OPEN_WAIT_FOR_ANCHOR:
if (input_is(input, PKT_OPEN_ANCHOR)) { if (input_is(input, PKT_OPEN_ANCHOR)) {
err = accept_pkt_anchor(peer, idata->pkt); err = accept_pkt_anchor(peer, pkt);
if (err) { if (err) {
peer_open_complete(peer, err->error->problem); peer_open_complete(peer, err->error->problem);
goto err_breakdown; goto err_breakdown;
@ -121,7 +108,7 @@ enum state state(struct peer *peer,
break; break;
case STATE_OPEN_WAIT_FOR_COMMIT_SIG: case STATE_OPEN_WAIT_FOR_COMMIT_SIG:
if (input_is(input, PKT_OPEN_COMMIT_SIG)) { if (input_is(input, PKT_OPEN_COMMIT_SIG)) {
err = accept_pkt_open_commit_sig(peer, idata->pkt); err = accept_pkt_open_commit_sig(peer, pkt);
if (err) { if (err) {
bitcoin_release_anchor(peer, INPUT_NONE); bitcoin_release_anchor(peer, INPUT_NONE);
peer_open_complete(peer, err->error->problem); peer_open_complete(peer, err->error->problem);
@ -141,7 +128,7 @@ enum state state(struct peer *peer,
break; break;
case STATE_OPEN_WAITING_OURANCHOR: case STATE_OPEN_WAITING_OURANCHOR:
if (input_is(input, PKT_OPEN_COMPLETE)) { if (input_is(input, PKT_OPEN_COMPLETE)) {
err = accept_pkt_open_complete(peer, idata->pkt); err = accept_pkt_open_complete(peer, pkt);
if (err) { if (err) {
peer_open_complete(peer, err->error->problem); peer_open_complete(peer, err->error->problem);
/* We no longer care about anchor depth. */ /* We no longer care about anchor depth. */
@ -181,7 +168,7 @@ enum state state(struct peer *peer,
break; break;
case STATE_OPEN_WAITING_THEIRANCHOR: case STATE_OPEN_WAITING_THEIRANCHOR:
if (input_is(input, PKT_OPEN_COMPLETE)) { if (input_is(input, PKT_OPEN_COMPLETE)) {
err = accept_pkt_open_complete(peer, idata->pkt); err = accept_pkt_open_complete(peer, pkt);
if (err) { if (err) {
peer_open_complete(peer, err->error->problem); peer_open_complete(peer, err->error->problem);
/* We no longer care about anchor depth. */ /* We no longer care about anchor depth. */
@ -238,76 +225,9 @@ enum state state(struct peer *peer,
} }
break; break;
/* /* Should never happen. */
* Channel normal operating states.
*/
case STATE_NORMAL: case STATE_NORMAL:
/* You can only issue this command one at a time. */
if (input_is(input, CMD_SEND_COMMIT)) {
queue_pkt_commit(peer);
return next_state(peer, input, STATE_NORMAL_COMMITTING);
}
/* Fall through... */
case STATE_NORMAL_COMMITTING: case STATE_NORMAL_COMMITTING:
if (input_is(input, CMD_SEND_HTLC_ADD)) {
/* We are to send an HTLC add. */
assert(idata->stage->type == HTLC_ADD);
queue_pkt_htlc_add(peer, &idata->stage->add.htlc);
return unchanged_state(peer, input);
} else if (input_is(input, CMD_SEND_HTLC_FULFILL)) {
assert(idata->stage->type == HTLC_FULFILL);
/* We are to send an HTLC fulfill. */
queue_pkt_htlc_fulfill(peer,
idata->stage->fulfill.id,
&idata->stage->fulfill.r);
return unchanged_state(peer, input);
} else if (input_is(input, CMD_SEND_HTLC_FAIL)) {
assert(idata->stage->type == HTLC_FAIL);
/* We are to send an HTLC fail. */
queue_pkt_htlc_fail(peer, idata->stage->fail.id);
return unchanged_state(peer, input);
}
/* Only expect revocation in STATE_NORMAL_COMMITTING */
else if (peer->state == STATE_NORMAL_COMMITTING
&& input_is(input, PKT_UPDATE_REVOCATION)) {
err = accept_pkt_revocation(peer, idata->pkt);
if (err)
goto err_breakdown_maybe_committing;
peer_update_complete(peer, NULL);
return next_state(peer, input, STATE_NORMAL);
}
if (input_is(input, PKT_UPDATE_ADD_HTLC)) {
err = accept_pkt_htlc_add(peer, idata->pkt);
if (err)
goto err_breakdown_maybe_committing;
return unchanged_state(peer, input);
} else if (input_is(input, PKT_UPDATE_FULFILL_HTLC)) {
err = accept_pkt_htlc_fulfill(peer, idata->pkt);
if (err)
goto err_breakdown_maybe_committing;
return unchanged_state(peer, input);
} else if (input_is(input, PKT_UPDATE_FAIL_HTLC)) {
err = accept_pkt_htlc_fail(peer, idata->pkt);
if (err)
goto err_breakdown_maybe_committing;
return unchanged_state(peer, input);
} else if (input_is(input, PKT_UPDATE_COMMIT)) {
err = accept_pkt_commit(peer, idata->pkt);
if (err)
goto err_breakdown_maybe_committing;
queue_pkt_revocation(peer);
return unchanged_state(peer, input);
} else if (input_is(input, PKT_CLOSE_CLEARING)) {
goto accept_clearing;
} else if (input_is_pkt(input)) {
if (peer->state == STATE_NORMAL_COMMITTING)
peer_update_complete(peer, "unexpected packet");
goto unexpected_pkt;
}
break;
/* Should never happen. */
case STATE_ERR_INTERNAL: case STATE_ERR_INTERNAL:
case STATE_ERR_ANCHOR_TIMEOUT: case STATE_ERR_ANCHOR_TIMEOUT:
case STATE_ERR_INFORMATION_LEAK: case STATE_ERR_INFORMATION_LEAK:
@ -328,13 +248,13 @@ enum state state(struct peer *peer,
return next_state(peer, input, STATE_ERR_INTERNAL); return next_state(peer, input, STATE_ERR_INTERNAL);
unexpected_pkt: unexpected_pkt:
peer_unexpected_pkt(peer, idata->pkt); peer_unexpected_pkt(peer, pkt);
/* Don't reply to an error with an error. */ /* Don't reply to an error with an error. */
if (!input_is(input, PKT_ERROR)) { if (!input_is(input, PKT_ERROR)) {
goto breakdown; goto breakdown;
} }
err = pkt_err_unexpected(peer, idata->pkt); err = pkt_err_unexpected(peer, pkt);
goto err_breakdown; goto err_breakdown;
err_breakdown_maybe_committing: err_breakdown_maybe_committing:
@ -346,7 +266,7 @@ breakdown:
return next_state(peer, input, STATE_ERR_BREAKDOWN); return next_state(peer, input, STATE_ERR_BREAKDOWN);
accept_clearing: accept_clearing:
err = accept_pkt_close_clearing(peer, idata->pkt); err = accept_pkt_close_clearing(peer, pkt);
if (err) if (err)
goto err_breakdown_maybe_committing; goto err_breakdown_maybe_committing;

25
state.h

@ -79,35 +79,14 @@ static inline bool input_is_pkt(enum state_input input)
return input <= PKT_ERROR; return input <= PKT_ERROR;
} }
union input {
/* For PKT_* */
Pkt *pkt;
/* For CMD_SEND_HTLC_ADD, CMD_SEND_HTLC_FULFILL, CMD_SEND_HTLC_FAIL */
union htlc_staging *stage;
};
enum state state(struct peer *peer, enum state state(struct peer *peer,
const enum state_input input, const enum state_input input,
const union input *idata, const Pkt *pkt,
const struct bitcoin_tx **broadcast); const struct bitcoin_tx **broadcast);
/* Any CMD_SEND_HTLC_* */ /* a == b? */
#define CMD_SEND_UPDATE_ANY INPUT_MAX
/* a == b? (or one of several for CMD_SEND_UPDATE_ANY) */
static inline bool input_is(enum state_input a, enum state_input b) static inline bool input_is(enum state_input a, enum state_input b)
{ {
if (b == CMD_SEND_UPDATE_ANY) {
/* Single | here, we want to record all. */
return input_is(a, CMD_SEND_HTLC_ADD)
| input_is(a, CMD_SEND_HTLC_FULFILL)
| input_is(a, CMD_SEND_HTLC_FAIL);
}
/* For test_state_coverate to make the states. */
#ifdef MAPPING_INPUTS
MAPPING_INPUTS(b);
#endif
return a == b; return a == b;
} }

Loading…
Cancel
Save