Browse Source

daemon: queue commands rather than executing them immediately.

When the only commands are via JSON, you might argue that we should
simply insist the user not operate on the same peer in parallel.  That
would suck, and also we need to handle the case of a command from
a timer (eg. HTLC expiry!) or a bitcoin event.

So, we need a queue for commands, but also we need to do some of the
command checking just before the command runs: the HTLC we're dealing
with might have vanished for example.

The current command is wrapped in an anonymous "curr_cmd" struct
for extra clarity.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
ppa-0.6.1
Rusty Russell 9 years ago
parent
commit
8b666ea449
  1. 396
      daemon/peer.c
  2. 13
      daemon/peer.h

396
daemon/peer.c

@ -39,6 +39,12 @@ struct json_connecting {
u64 satoshis;
};
struct pending_cmd {
struct list_node list;
void (*dequeue)(struct peer *, void *arg);
void *arg;
};
static struct peer *find_peer(struct lightningd_state *dstate,
const struct pubkey *id)
{
@ -71,23 +77,37 @@ static struct json_result *null_response(const tal_t *ctx)
static void peer_cmd_complete(struct peer *peer, enum command_status status)
{
assert(peer->cmd != INPUT_NONE);
assert(peer->curr_cmd.cmd != INPUT_NONE);
if (peer->jsoncmd) {
/* If it's a json command, complete that now. */
if (peer->curr_cmd.jsoncmd) {
if (status == CMD_FAIL)
/* FIXME: y'know, details. */
command_fail(peer->jsoncmd, "Failed");
command_fail(peer->curr_cmd.jsoncmd, "Failed");
else {
assert(status == CMD_SUCCESS);
command_success(peer->jsoncmd,
null_response(peer->jsoncmd));
command_success(peer->curr_cmd.jsoncmd,
null_response(peer->curr_cmd.jsoncmd));
}
peer->jsoncmd = NULL;
}
peer->cmd = INPUT_NONE;
peer->curr_cmd.cmd = INPUT_NONE;
}
static void set_current_command(struct peer *peer,
const enum state_input input,
void *idata,
struct command *jsoncmd)
{
assert(peer->curr_cmd.cmd == INPUT_NONE);
assert(input != INPUT_NONE);
peer->curr_cmd.cmd = input;
/* This is a union, so assign to any member. */
peer->curr_cmd.cmddata.pkt = idata;
peer->curr_cmd.jsoncmd = jsoncmd;
}
static void update_state(struct peer *peer,
static void state_single(struct peer *peer,
const enum state_input input,
const union input *idata)
{
@ -128,6 +148,61 @@ static void update_state(struct peer *peer,
txid.sha.u.u8[2], txid.sha.u.u8[3]);
bitcoind_send_tx(peer->dstate, broadcast);
}
if (peer->cond == PEER_CLOSED)
io_close(peer->conn);
}
static void try_command(struct peer *peer)
{
/* If we can accept a command, and we have one queued, run it. */
while (peer->cond == PEER_CMD_OK
&& !list_empty(&peer->pending_cmd)) {
struct pending_cmd *pend = list_pop(&peer->pending_cmd,
struct pending_cmd, list);
assert(peer->curr_cmd.cmd == INPUT_NONE);
/* This can fail to enqueue a command! */
pend->dequeue(peer, pend->arg);
tal_free(pend);
if (peer->curr_cmd.cmd != INPUT_NONE) {
state_single(peer, peer->curr_cmd.cmd,
&peer->curr_cmd.cmddata);
if (peer->cond == PEER_CLOSED)
io_close(peer->conn);
}
}
}
#define queue_cmd(peer, cb, arg) \
queue_cmd_((peer), \
typesafe_cb_preargs(void, void *, \
(cb), (arg), \
struct peer *), \
(arg))
static void queue_cmd_(struct peer *peer,
void (*dequeue)(struct peer *peer, void *arg),
void *arg)
{
struct pending_cmd *pend = tal(peer, struct pending_cmd);
pend->dequeue = dequeue;
pend->arg = arg;
list_add_tail(&peer->pending_cmd, &pend->list);
try_command(peer);
};
static void state_event(struct peer *peer,
const enum state_input input,
const union input *idata)
{
state_single(peer, input, idata);
try_command(peer);
}
static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer)
@ -141,36 +216,30 @@ static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer)
return peer_write_packet(conn, peer, out, pkt_out);
}
static void try_command(struct peer *peer)
{
while (peer->cond == PEER_CMD_OK && peer->cmd != INPUT_NONE)
update_state(peer, peer->cmd, &peer->cmddata);
if (peer->cond == PEER_CLOSED)
io_close(peer->conn);
}
static struct io_plan *pkt_in(struct io_conn *conn, struct peer *peer)
{
union input idata;
const tal_t *ctx = tal(peer, char);
idata.pkt = tal_steal(ctx, peer->inpkt);
update_state(peer, peer->inpkt->pkt_case, &idata);
state_event(peer, peer->inpkt->pkt_case, &idata);
/* Free peer->inpkt unless stolen above. */
tal_free(ctx);
/* If we've closed (above), don't try to read (we can call
* io_close multiple times with no harm). */
if (peer->cond == PEER_CLOSED)
return io_close(conn);
/* Ready for command? */
if (peer->cond == PEER_CMD_OK)
try_command(peer);
return peer_read_packet(conn, peer, pkt_in);
}
static void do_anchor_offer(struct peer *peer, void *unused)
{
set_current_command(peer, peer->us.offer_anchor, NULL, NULL);
}
/* Crypto is on, we are live. */
static struct io_plan *peer_crypto_on(struct io_conn *conn, struct peer *peer)
{
@ -178,9 +247,16 @@ static struct io_plan *peer_crypto_on(struct io_conn *conn, struct peer *peer)
peer_get_revocation_hash(peer, 0, &peer->us.revocation_hash);
assert(peer->state == STATE_INIT);
peer->cmd = peer->us.offer_anchor;
/* Using queue_cmd is overkill here, but it works. */
queue_cmd(peer, do_anchor_offer, NULL);
try_command(peer);
/* If we've closed (above), don't continue (we can call
* io_close multiple times with no harm). */
if (peer->cond == PEER_CLOSED)
return io_close(conn);
return io_duplex(conn,
peer_read_packet(conn, peer, pkt_in),
pkt_out(conn, peer));
@ -250,7 +326,8 @@ static struct peer *new_peer(struct lightningd_state *dstate,
peer->secrets = NULL;
list_head_init(&peer->watches);
peer->num_outpkt = 0;
peer->cmd = INPUT_NONE;
peer->curr_cmd.cmd = INPUT_NONE;
list_head_init(&peer->pending_cmd);
peer->current_htlc = NULL;
peer->num_htlcs = 0;
peer->close_tx = NULL;
@ -306,7 +383,6 @@ static struct io_plan *peer_connected_out(struct io_conn *conn,
connect->name, connect->port);
peer->anchor.satoshis = connect->satoshis;
peer->jsoncmd = NULL;
command_success(connect->cmd, null_response(connect));
return peer_crypto_setup(conn, peer, peer_crypto_on);
}
@ -320,7 +396,6 @@ static struct io_plan *peer_connected_in(struct io_conn *conn,
return io_close(conn);
log_info(peer->log, "Peer connected in");
peer->jsoncmd = NULL;
return peer_crypto_setup(conn, peer, peer_crypto_on);
}
@ -475,13 +550,13 @@ static void anchor_depthchange(struct peer *peer, int depth,
if (depth >= (int)peer->us.mindepth) {
enum state_input in = w->depthok;
w->depthok = INPUT_NONE;
update_state(peer, in, NULL);
state_event(peer, in, NULL);
}
} else {
if (depth < 0 && w->unspent != INPUT_NONE) {
enum state_input in = w->unspent;
w->unspent = INPUT_NONE;
update_state(peer, in, NULL);
state_event(peer, in, NULL);
}
}
}
@ -526,9 +601,9 @@ static void anchor_spent(struct peer *peer,
/* FIXME: change type in idata? */
idata.btc = (struct bitcoin_event *)tx;
if (txmatch(tx, peer->them.commit))
update_state(peer, w->theyspent, &idata);
state_event(peer, w->theyspent, &idata);
else
update_state(peer, w->otherspent, &idata);
state_event(peer, w->otherspent, &idata);
}
void peer_watch_anchor(struct peer *peer,
@ -576,7 +651,7 @@ void peer_watch_tx(struct peer *peer,
static void send_close_timeout(struct peer *peer)
{
update_state(peer, INPUT_CLOSE_COMPLETE_TIMEOUT, NULL);
state_event(peer, INPUT_CLOSE_COMPLETE_TIMEOUT, NULL);
}
void peer_watch_close(struct peer *peer,
@ -826,7 +901,7 @@ static void created_anchor(struct lightningd_state *dstate,
&peer->us.commit,
&peer->them.commit);
update_state(peer, BITCOIN_ANCHOR_CREATED, NULL);
state_event(peer, BITCOIN_ANCHOR_CREATED, NULL);
}
/* Start creation of the bitcoin anchor tx. */
@ -948,7 +1023,7 @@ static void json_getpeers(struct command *cmd,
json_object_start(response, NULL);
json_add_string(response, "name", log_prefix(p->log));
json_add_string(response, "state", state_name(p->state));
json_add_string(response, "cmd", input_name(p->cmd));
json_add_string(response, "cmd", input_name(p->curr_cmd.cmd));
/* This is only valid after crypto setup. */
if (p->state != STATE_INIT)
@ -975,6 +1050,61 @@ const struct json_command getpeers_command = {
"Returns a 'peers' array"
};
static void set_htlc_command(struct peer *peer,
struct channel_state *cstate,
struct command *jsoncmd,
struct channel_htlc *htlc,
enum state_input cmd,
const struct sha256 *r_fulfill)
{
assert(!peer->current_htlc);
peer->current_htlc = tal(peer, struct htlc_progress);
peer->current_htlc->cstate = tal_steal(peer->current_htlc, cstate);
peer->current_htlc->htlc = htlc;
if (r_fulfill)
peer->current_htlc->r = *r_fulfill;
peer_get_revocation_hash(peer, peer->num_htlcs+1,
&peer->current_htlc->our_revocation_hash);
/* FIXME: Do we need current_htlc as idata arg? */
set_current_command(peer, cmd, peer->current_htlc, jsoncmd);
}
struct newhtlc {
struct channel_htlc *htlc;
struct command *jsoncmd;
};
/* We do final checks just before we start command, as things may have
* changed. */
static void do_newhtlc(struct peer *peer, struct newhtlc *newhtlc)
{
struct channel_state *cstate;
/* Can we even offer this much? We check now, just before we
* execute. */
cstate = copy_funding(newhtlc, peer->cstate);
if (!funding_delta(peer->us.offer_anchor == CMD_OPEN_WITH_ANCHOR,
peer->anchor.satoshis,
0, newhtlc->htlc->msatoshis,
&cstate->a, &cstate->b)) {
command_fail(newhtlc->jsoncmd,
"Cannot afford %"PRIu64" milli-satoshis",
newhtlc->htlc->msatoshis);
return;
}
/* Add the htlc to our side of channel. */
funding_add_htlc(&cstate->a, newhtlc->htlc->msatoshis,
&newhtlc->htlc->expiry, &newhtlc->htlc->rhash);
set_htlc_command(peer, cstate, newhtlc->jsoncmd,
&cstate->a.htlcs[tal_count(cstate->a.htlcs)-1],
CMD_SEND_HTLC_UPDATE, NULL);
}
static void json_newhtlc(struct command *cmd,
const char *buffer, const jsmntok_t *params)
{
@ -982,7 +1112,7 @@ static void json_newhtlc(struct command *cmd,
jsmntok_t *idtok, *msatoshistok, *expirytok, *rhashtok;
struct pubkey id;
unsigned int expiry;
struct htlc_progress *cur;
struct newhtlc *newhtlc;
json_get_params(buffer, params,
"id", &idtok,
@ -1009,9 +1139,11 @@ static void json_newhtlc(struct command *cmd,
}
/* Attach to cmd until it's complete. */
cur = tal(cmd, struct htlc_progress);
cur->htlc = tal(cur, struct channel_htlc);
if (!json_tok_u64(buffer, msatoshistok, &cur->htlc->msatoshis)) {
newhtlc = tal(cmd, struct newhtlc);
newhtlc->htlc = tal(newhtlc, struct channel_htlc);
newhtlc->jsoncmd = cmd;
if (!json_tok_u64(buffer, msatoshistok, &newhtlc->htlc->msatoshis)) {
command_fail(cmd, "'%.*s' is not a valid number",
(int)(msatoshistok->end - msatoshistok->start),
buffer + msatoshistok->start);
@ -1024,7 +1156,7 @@ static void json_newhtlc(struct command *cmd,
return;
}
if (!seconds_to_abs_locktime(expiry, &cur->htlc->expiry)) {
if (!seconds_to_abs_locktime(expiry, &newhtlc->htlc->expiry)) {
command_fail(cmd, "'%.*s' is not a valid number",
(int)(expirytok->end - expirytok->start),
buffer + expirytok->start);
@ -1032,35 +1164,15 @@ static void json_newhtlc(struct command *cmd,
}
if (!hex_decode(buffer + rhashtok->start,
rhashtok->end - rhashtok->start,
&cur->htlc->rhash, sizeof(cur->htlc->rhash))) {
&newhtlc->htlc->rhash,
sizeof(newhtlc->htlc->rhash))) {
command_fail(cmd, "'%.*s' is not a valid sha256 hash",
(int)(rhashtok->end - rhashtok->start),
buffer + rhashtok->start);
return;
}
peer_get_revocation_hash(peer, peer->num_htlcs+1,
&cur->our_revocation_hash);
/* Can we even offer this much? */
cur->cstate = copy_funding(cur, peer->cstate);
if (!funding_delta(peer->us.offer_anchor == CMD_OPEN_WITH_ANCHOR,
peer->anchor.satoshis,
0, cur->htlc->msatoshis,
&cur->cstate->a, &cur->cstate->b)) {
command_fail(cmd, "Cannot afford %"PRIu64" milli-satoshis",
cur->htlc->msatoshis);
return;
}
/* Add the htlc to our side of channel. */
funding_add_htlc(&cur->cstate->a, cur->htlc->msatoshis,
&cur->htlc->expiry, &cur->htlc->rhash);
peer->current_htlc = tal_steal(peer, cur);
peer->jsoncmd = cmd;
/* FIXME: do we need this? */
peer->cmddata.htlc_prog = peer->current_htlc;
peer->cmd = CMD_SEND_HTLC_UPDATE;
queue_cmd(peer, do_newhtlc, newhtlc);
try_command(peer);
}
@ -1071,15 +1183,54 @@ const struct json_command newhtlc_command = {
"Returns an empty result on success"
};
struct fulfillhtlc {
struct command *jsoncmd;
struct sha256 r;
};
static void do_fullfill(struct peer *peer,
struct fulfillhtlc *fulfillhtlc)
{
struct channel_state *cstate;
struct sha256 rhash;
size_t i;
struct channel_htlc *htlc;
sha256(&rhash, &fulfillhtlc->r, sizeof(fulfillhtlc->r));
i = funding_find_htlc(&peer->cstate->b, &rhash);
if (i == tal_count(peer->cstate->b.htlcs)) {
command_fail(fulfillhtlc->jsoncmd,
"preimage htlc not found");
return;
}
/* Point at current one, since we remove from new cstate. */
htlc = &peer->cstate->b.htlcs[i];
cstate = copy_funding(fulfillhtlc, peer->cstate);
/* This should never fail! */
if (!funding_delta(peer->them.offer_anchor == CMD_OPEN_WITH_ANCHOR,
peer->anchor.satoshis,
-htlc->msatoshis,
-htlc->msatoshis,
&cstate->b, &cstate->a)) {
fatal("Unexpected failure fulfilling HTLC of %"PRIu64
" milli-satoshis", htlc->msatoshis);
return;
}
funding_remove_htlc(&cstate->b, i);
set_htlc_command(peer, cstate, fulfillhtlc->jsoncmd, htlc,
CMD_SEND_HTLC_FULFILL, &fulfillhtlc->r);
}
static void json_fulfillhtlc(struct command *cmd,
const char *buffer, const jsmntok_t *params)
{
struct peer *peer;
jsmntok_t *idtok, *rtok;
struct pubkey id;
struct sha256 rhash;
struct htlc_progress *cur;
size_t i;
struct fulfillhtlc *fulfillhtlc;
json_get_params(buffer, params,
"id", &idtok,
@ -1103,61 +1254,66 @@ static void json_fulfillhtlc(struct command *cmd,
return;
}
/* Attach to cmd until it's complete. */
cur = tal(cmd, struct htlc_progress);
fulfillhtlc = tal(cmd, struct fulfillhtlc);
fulfillhtlc->jsoncmd = cmd;
if (!hex_decode(buffer + rtok->start,
rtok->end - rtok->start,
&cur->r, sizeof(cur->r))) {
&fulfillhtlc->r, sizeof(fulfillhtlc->r))) {
command_fail(cmd, "'%.*s' is not a valid sha256 preimage",
(int)(rtok->end - rtok->start),
buffer + rtok->start);
return;
}
sha256(&rhash, &cur->r, sizeof(cur->r));
queue_cmd(peer, do_fullfill, fulfillhtlc);
try_command(peer);
}
const struct json_command fulfillhtlc_command = {
"fulfillhtlc",
json_fulfillhtlc,
"Redeem htlc proposed by {id} using {r}",
"Returns an empty result on success"
};
i = funding_find_htlc(&peer->cstate->b, &rhash);
struct failhtlc {
struct command *jsoncmd;
struct sha256 rhash;
};
static void do_failhtlc(struct peer *peer,
struct failhtlc *failhtlc)
{
struct channel_state *cstate;
size_t i;
struct channel_htlc *htlc;
i = funding_find_htlc(&peer->cstate->b, &failhtlc->rhash);
if (i == tal_count(peer->cstate->b.htlcs)) {
command_fail(cmd, "'%.*s' preimage htlc not found",
(int)(rtok->end - rtok->start),
buffer + rtok->start);
command_fail(failhtlc->jsoncmd, "htlc not found");
return;
}
cur->htlc = &peer->cstate->b.htlcs[i];
/* Point to current one, since we remove from new cstate. */
htlc = &peer->cstate->b.htlcs[i];
cstate = copy_funding(failhtlc, peer->cstate);
/* Removing it should not fail: we gain HTLC amount */
cur->cstate = copy_funding(cur, peer->cstate);
/* This should never fail! */
if (!funding_delta(peer->them.offer_anchor == CMD_OPEN_WITH_ANCHOR,
peer->anchor.satoshis,
-cur->htlc->msatoshis,
-cur->htlc->msatoshis,
&cur->cstate->b, &cur->cstate->a)) {
fatal("Unexpected failure fulfilling HTLC of %"PRIu64
" milli-satoshis", cur->htlc->msatoshis);
0,
-htlc->msatoshis,
&cstate->b, &cstate->a)) {
fatal("Unexpected failure routefailing HTLC of %"PRIu64
" milli-satoshis", htlc->msatoshis);
return;
}
funding_remove_htlc(&cur->cstate->b, i);
funding_remove_htlc(&cstate->b, i);
peer_get_revocation_hash(peer, peer->num_htlcs+1,
&cur->our_revocation_hash);
peer->current_htlc = tal_steal(peer, cur);
peer->jsoncmd = cmd;
/* FIXME: do we need this? */
peer->cmddata.htlc_prog = peer->current_htlc;
peer->cmd = CMD_SEND_HTLC_FULFILL;
try_command(peer);
set_htlc_command(peer, cstate, failhtlc->jsoncmd, htlc,
CMD_SEND_HTLC_ROUTEFAIL, NULL);
}
const struct json_command fulfillhtlc_command = {
"fulfillhtlc",
json_fulfillhtlc,
"Redeem htlc proposed by {id} using {r}",
"Returns an empty result on success"
};
static void json_failhtlc(struct command *cmd,
const char *buffer, const jsmntok_t *params)
@ -1165,9 +1321,7 @@ static void json_failhtlc(struct command *cmd,
struct peer *peer;
jsmntok_t *idtok, *rhashtok;
struct pubkey id;
struct sha256 rhash;
struct htlc_progress *cur;
size_t i;
struct failhtlc *failhtlc;
json_get_params(buffer, params,
"id", &idtok,
@ -1191,49 +1345,19 @@ static void json_failhtlc(struct command *cmd,
return;
}
/* Attach to cmd until it's complete. */
cur = tal(cmd, struct htlc_progress);
failhtlc = tal(cmd, struct failhtlc);
failhtlc->jsoncmd = cmd;
if (!hex_decode(buffer + rhashtok->start,
rhashtok->end - rhashtok->start,
&rhash, sizeof(rhash))) {
&failhtlc->rhash, sizeof(failhtlc->rhash))) {
command_fail(cmd, "'%.*s' is not a valid sha256 preimage",
(int)(rhashtok->end - rhashtok->start),
buffer + rhashtok->start);
return;
}
i = funding_find_htlc(&peer->cstate->b, &rhash);
if (i == tal_count(peer->cstate->b.htlcs)) {
command_fail(cmd, "'%.*s' htlc not found",
(int)(rhashtok->end - rhashtok->start),
buffer + rhashtok->start);
return;
}
cur->htlc = &peer->cstate->b.htlcs[i];
/* Removing it should not fail: they gain HTLC amount */
cur->cstate = copy_funding(cur, peer->cstate);
if (!funding_delta(peer->them.offer_anchor == CMD_OPEN_WITH_ANCHOR,
peer->anchor.satoshis,
0,
-cur->htlc->msatoshis,
&cur->cstate->b, &cur->cstate->a)) {
fatal("Unexpected failure routefailing HTLC of %"PRIu64
" milli-satoshis", cur->htlc->msatoshis);
return;
}
funding_remove_htlc(&cur->cstate->b, i);
peer_get_revocation_hash(peer, peer->num_htlcs+1,
&cur->our_revocation_hash);
peer->current_htlc = tal_steal(peer, cur);
peer->jsoncmd = cmd;
/* FIXME: do we need this? */
peer->cmddata.htlc_prog = peer->current_htlc;
peer->cmd = CMD_SEND_HTLC_ROUTEFAIL;
queue_cmd(peer, do_failhtlc, failhtlc);
try_command(peer);
}

13
daemon/peer.h

@ -57,11 +57,16 @@ struct peer {
/* Network connection. */
struct io_conn *conn;
/* Current command (if any) */
enum state_input cmd;
union input cmddata;
struct command *jsoncmd;
/* Current command (or INPUT_NONE) */
struct {
enum state_input cmd;
union input cmddata;
struct command *jsoncmd;
} curr_cmd;
/* Pending commands. */
struct list_head pending_cmd;
/* Global state. */
struct lightningd_state *dstate;

Loading…
Cancel
Save