Browse Source

gossip: Moving to intmap-based broadcast for the legacy daemon

Moved the broadcast functionality to broadcast.[ch]. So far this
includes only the enqueuing side of broadcasts, the dequeuing and
actual push to the peer is daemon dependent. This also adds the
broadcast_state to the routing_state and the last broadcast index to
the peer for the legacy daemon.
ppa-0.6.1
Christian Decker 8 years ago
committed by Rusty Russell
parent
commit
76e2c980e1
  1. 2
      daemon/Makefile
  2. 39
      daemon/broadcast.c
  3. 43
      daemon/broadcast.h
  4. 97
      daemon/p2p_announce.c
  5. 1
      daemon/p2p_announce.h
  6. 1
      daemon/peer.c
  7. 3
      daemon/peer.h
  8. 1
      daemon/routing.c
  9. 3
      daemon/routing.h
  10. 1
      lightningd/Makefile

2
daemon/Makefile

@ -19,6 +19,7 @@ DAEMON_LIB_OBJS := $(DAEMON_LIB_SRC:.c=.o)
DAEMON_SRC := \
daemon/bitcoind.c \
daemon/broadcast.c \
daemon/chaintopology.c \
daemon/channel.c \
daemon/commit_tx.c \
@ -66,6 +67,7 @@ DAEMON_GEN_HEADERS := \
DAEMON_HEADERS := \
daemon/bitcoind.h \
daemon/broadcast.h \
daemon/chaintopology.h \
daemon/channel.h \
daemon/commit_tx.h \

39
daemon/broadcast.c

@ -0,0 +1,39 @@
#include "daemon/broadcast.h"
struct broadcast_state *new_broadcast_state(tal_t *ctx)
{
struct broadcast_state *bstate = tal(ctx, struct broadcast_state);
uintmap_init(&bstate->broadcasts);
/* Skip 0 because we initialize peers with 0 */
bstate->next_index = 1;
return bstate;
}
static struct queued_message *new_queued_message(tal_t *ctx,
const int type,
const u8 *tag,
const u8 *payload)
{
struct queued_message *msg = tal(ctx, struct queued_message);
msg->type = type;
msg->tag = tal_dup_arr(msg, u8, tag, tal_count(tag), 0);
msg->payload = tal_dup_arr(msg, u8, payload, tal_count(payload), 0);
return msg;
}
void queue_broadcast(struct broadcast_state *bstate,
const int type,
const u8 *tag,
const u8 *payload)
{
struct queued_message *msg = new_queued_message(bstate, type, tag, payload);
/*FIXME(cdecker) Walk through old messages and purge collisions */
uintmap_add(&bstate->broadcasts, bstate->next_index, msg);
bstate->next_index += 1;
}
struct queued_message *next_broadcast_message(struct broadcast_state *bstate, u64 *last_index)
{
return uintmap_after(&bstate->broadcasts, last_index);
}

43
daemon/broadcast.h

@ -0,0 +1,43 @@
#ifndef LIGHTNING_DAEMON_BROADCAST_H
#define LIGHTNING_DAEMON_BROADCAST_H
#include "config.h"
#include <ccan/intmap/intmap.h>
#include <ccan/list/list.h>
#include <ccan/short_types/short_types.h>
#include <ccan/tal/tal.h>
/* Common functionality to implement staggered broadcasts with replacement. */
struct queued_message {
int type;
/* Unique tag specifying the msg origin */
void *tag;
/* Timestamp for `channel_update`s and `node_announcement`s, 0
* for `channel_announcement`s */
/*u32 timestamp;*/
/* Serialized payload */
u8 *payload;
//FIXME(cdecker) Remove after migrating to intmap
struct list_node list;
};
struct broadcast_state {
u32 next_index;
UINTMAP(struct queued_message *) broadcasts;
};
struct broadcast_state *new_broadcast_state(tal_t *ctx);
void queue_broadcast(struct broadcast_state *bstate,
const int type,
const u8 *tag,
const u8 *payload);
struct queued_message *next_broadcast_message(struct broadcast_state *bstate, u64 *last_index);
#endif /* LIGHTNING_DAEMON_BROADCAST_H */

97
daemon/p2p_announce.c

@ -1,3 +1,4 @@
#include "daemon/broadcast.h"
#include "daemon/chaintopology.h"
#include "daemon/log.h"
#include "daemon/p2p_announce.h"
@ -12,62 +13,6 @@
#include <ccan/tal/tal.h>
#include <secp256k1.h>
struct queued_message {
int type;
/* Unique tag specifying the msg origin */
void *tag;
/* Timestamp for `channel_update`s and `node_announcement`s, 0
* for `channel_announcement`s */
u32 timestamp;
/* Serialized payload */
u8 *payload;
struct list_node list;
};
static void broadcast(struct lightningd_state *dstate,
int type, u8 *pkt,
struct peer *origin)
{
struct peer *p;
list_for_each(&dstate->peers, p, list) {
if (state_is_normal(p->state) && origin != p)
queue_pkt_nested(p, type, pkt);
}
}
static void queue_broadcast(struct lightningd_state *dstate,
const int type,
const u32 timestamp,
const u8 *tag,
const u8 *payload)
{
struct queued_message *el, *msg;
list_for_each(&dstate->broadcast_queue, el, list) {
if (el->type == type &&
tal_count(tag) == tal_count(el->tag) &&
memcmp(el->tag, tag, tal_count(tag)) == 0 &&
el->timestamp < timestamp){
/* Found a replacement */
el->payload = tal_free(el->payload);
el->payload = tal_dup_arr(el, u8, payload, tal_count(payload), 0);
el->timestamp = timestamp;
return;
}
}
/* No match found, add a new message to the queue */
msg = tal(dstate, struct queued_message);
msg->type = type;
msg->timestamp = timestamp;
msg->tag = tal_dup_arr(msg, u8, tag, tal_count(tag), 0);
msg->payload = tal_dup_arr(msg, u8, payload, tal_count(payload), 0);
list_add_tail(&dstate->broadcast_queue, &msg->list);
}
void handle_channel_announcement(
struct peer *peer,
const u8 *announce, size_t len)
@ -124,8 +69,7 @@ void handle_channel_announcement(
u8 *tag = tal_arr(tmpctx, u8, 0);
towire_channel_id(&tag, &channel_id);
queue_broadcast(peer->dstate, WIRE_CHANNEL_ANNOUNCEMENT,
0, /* `channel_announcement`s do not have a timestamp */
queue_broadcast(peer->dstate->rstate->broadcasts, WIRE_CHANNEL_ANNOUNCEMENT,
tag, serialized);
tal_free(tmpctx);
@ -194,9 +138,8 @@ void handle_channel_update(struct peer *peer, const u8 *update, size_t len)
u8 *tag = tal_arr(tmpctx, u8, 0);
towire_channel_id(&tag, &channel_id);
queue_broadcast(peer->dstate,
queue_broadcast(peer->dstate->rstate->broadcasts,
WIRE_CHANNEL_UPDATE,
timestamp,
tag,
serialized);
@ -265,9 +208,8 @@ void handle_node_announcement(
u8 *tag = tal_arr(tmpctx, u8, 0);
towire_pubkey(&tag, &node_id);
queue_broadcast(peer->dstate,
queue_broadcast(peer->dstate->rstate->broadcasts,
WIRE_NODE_ANNOUNCEMENT,
timestamp,
tag,
serialized);
tal_free(node->node_announcement);
@ -309,8 +251,9 @@ static void broadcast_channel_update(struct lightningd_state *dstate, struct pee
1,
dstate->config.fee_base,
dstate->config.fee_per_satoshi);
broadcast(dstate, WIRE_CHANNEL_UPDATE, serialized, NULL);
u8 *tag = tal_arr(tmpctx, u8, 0);
towire_channel_id(&tag, &channel_id);
queue_broadcast(dstate->rstate->broadcasts, WIRE_CHANNEL_UPDATE, tag, serialized);
tal_free(tmpctx);
}
@ -346,7 +289,10 @@ static void broadcast_node_announcement(struct lightningd_state *dstate)
&dstate->id, rgb_color, alias,
NULL,
address);
broadcast(dstate, WIRE_NODE_ANNOUNCEMENT, serialized, NULL);
u8 *tag = tal_arr(tmpctx, u8, 0);
towire_pubkey(&tag, &dstate->id);
queue_broadcast(dstate->rstate->broadcasts, WIRE_NODE_ANNOUNCEMENT, tag,
serialized);
tal_free(tmpctx);
}
@ -422,7 +368,10 @@ static void broadcast_channel_announcement(struct lightningd_state *dstate, stru
bitcoin_key[0],
bitcoin_key[1],
NULL);
broadcast(dstate, WIRE_CHANNEL_ANNOUNCEMENT, serialized, NULL);
u8 *tag = tal_arr(tmpctx, u8, 0);
towire_channel_id(&tag, &channel_id);
queue_broadcast(dstate->rstate->broadcasts, WIRE_CHANNEL_ANNOUNCEMENT,
tag, serialized);
tal_free(tmpctx);
}
@ -456,11 +405,19 @@ void announce_channel(struct lightningd_state *dstate, struct peer *peer)
static void process_broadcast_queue(struct lightningd_state *dstate)
{
struct peer *p;
struct queued_message *msg;
new_reltimer(dstate, dstate, time_from_sec(30), process_broadcast_queue, dstate);
struct queued_message *el;
while ((el = list_pop(&dstate->broadcast_queue, struct queued_message, list)) != NULL) {
broadcast(dstate, el->type, el->payload, NULL);
tal_free(el);
list_for_each(&dstate->peers, p, list) {
if (!state_is_normal(p->state))
continue;
msg = next_broadcast_message(dstate->rstate->broadcasts,
&p->broadcast_index);
while (msg != NULL) {
queue_pkt_nested(p, msg->type, msg->payload);
msg = next_broadcast_message(dstate->rstate->broadcasts,
&p->broadcast_index);
}
}
}

1
daemon/p2p_announce.h

@ -1,6 +1,7 @@
#ifndef LIGHTNING_DAEMON_P2P_ANNOUNCE_H
#define LIGHTNING_DAEMON_P2P_ANNOUNCE_H
#include "config.h"
#include "daemon/broadcast.h"
#include "daemon/lightningd.h"
#include "daemon/routing.h"
#include "lightningd.h"

1
daemon/peer.c

@ -2772,6 +2772,7 @@ struct peer *new_peer(struct lightningd_state *dstate,
peer->fake_close = false;
peer->output_enabled = true;
peer->local.offer_anchor = offer_anchor;
peer->broadcast_index = 0;
if (!blocks_to_rel_locktime(dstate->config.locktime_blocks,
&peer->local.locktime))
fatal("Could not convert locktime_blocks");

3
daemon/peer.h

@ -231,6 +231,9 @@ struct peer {
/* this is where we will store their revocation preimages*/
struct shachain their_preimages;
/* High water mark for the staggered broadcast */
u64 broadcast_index;
};
/* Mapping for id -> network address. */

1
daemon/routing.c

@ -21,6 +21,7 @@ struct routing_state *new_routing_state(const tal_t *ctx, struct log *base_log)
struct routing_state *rstate = tal(ctx, struct routing_state);
rstate->base_log = base_log;
rstate->nodes = empty_node_map(rstate);
rstate->broadcasts = new_broadcast_state(rstate);
return rstate;
}

3
daemon/routing.h

@ -2,6 +2,7 @@
#define LIGHTNING_DAEMON_ROUTING_H
#include "config.h"
#include "bitcoin/pubkey.h"
#include "daemon/broadcast.h"
#include "wire/wire.h"
#include <ccan/htable/htable_type.h>
@ -83,6 +84,8 @@ struct routing_state {
struct node_map *nodes;
struct log *base_log;
struct broadcast_state *broadcasts;
};
//FIXME(cdecker) The log will have to be replaced for the new subdaemon, keeping for now to keep changes small.

1
lightningd/Makefile

@ -9,6 +9,7 @@ lightningd-all: lightningd/lightningd lightningd/lightningd_hsm lightningd/light
default: lightningd-all
LIGHTNINGD_OLD_SRC := \
daemon/broadcast.c \
daemon/configdir.c \
daemon/dns.c \
daemon/netaddr.c \

Loading…
Cancel
Save