Browse Source

gossip: Add timer and normal queue for messages

The peer is woken up every 30 seconds to deliver the backlog of
messages. Additionally I added the normal message queue to be able to
send non-gossip message to the peer.
ppa-0.6.1
Christian Decker 8 years ago
parent
commit
fccab6411d
  1. 28
      lightningd/gossip/gossip.c

28
lightningd/gossip/gossip.c

@ -59,6 +59,7 @@ struct peer {
/* High water mark for the staggered broadcast */ /* High water mark for the staggered broadcast */
u64 broadcast_index; u64 broadcast_index;
u8 **msg_out;
}; };
static void destroy_peer(struct peer *peer) static void destroy_peer(struct peer *peer)
@ -79,6 +80,7 @@ static struct peer *setup_new_peer(struct daemon *daemon, const u8 *msg)
peer->cs->peer = peer; peer->cs->peer = peer;
peer->daemon = daemon; peer->daemon = daemon;
peer->error = NULL; peer->error = NULL;
peer->msg_out = tal_arr(peer, u8*, 0);
list_add_tail(&daemon->peers, &peer->list); list_add_tail(&daemon->peers, &peer->list);
tal_add_destructor(peer, destroy_peer); tal_add_destructor(peer, destroy_peer);
return peer; return peer;
@ -154,6 +156,16 @@ static struct io_plan *peer_msgin(struct io_conn *conn,
* queued gossip messages and processes them. */ * queued gossip messages and processes them. */
static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer); static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer);
/* Wake up the outgoing direction of the connection and write any
* queued messages. Needed since the `io_wake` method signature does
* not allow us to specify it as the callback for `new_reltimer`.
*/
static void wake_pkt_out(struct peer *peer)
{
/* */
io_wake(peer);
}
/* Loop through the backlog of channel_{announcements,updates} and /* Loop through the backlog of channel_{announcements,updates} and
* node_announcements, writing out one on each iteration. Once we are * node_announcements, writing out one on each iteration. Once we are
* through wait for the broadcast interval and start again. */ * through wait for the broadcast interval and start again. */
@ -164,7 +176,8 @@ static struct io_plan *peer_dump_gossip(struct io_conn *conn, struct peer *peer)
peer->daemon->rstate->broadcasts, &peer->broadcast_index); peer->daemon->rstate->broadcasts, &peer->broadcast_index);
if (!next) { if (!next) {
//FIXME(cdecker) Add wakeup timer once timers are refactored. new_reltimer(&peer->daemon->timers, peer, time_from_sec(30), wake_pkt_out, peer);
/* Going to wake up in pkt_out since we mix time based and message based wakeups */
return io_out_wait(conn, peer, pkt_out, peer); return io_out_wait(conn, peer, pkt_out, peer);
} else { } else {
return peer_write_message(conn, peer->cs, next->payload, peer_dump_gossip); return peer_write_message(conn, peer->cs, next->payload, peer_dump_gossip);
@ -173,8 +186,17 @@ static struct io_plan *peer_dump_gossip(struct io_conn *conn, struct peer *peer)
static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer) static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer)
{ {
//FIXME(cdecker) Add logic to enable sending of non-broadcast messages /* First we process queued packets, if any */
/* Send any queued up messages */ u8 *out;
size_t n = tal_count(peer->msg_out);
if (n > 0) {
out = peer->msg_out[0];
memmove(peer->msg_out, peer->msg_out + 1, (sizeof(*peer->msg_out)*(n-1)));
tal_resize(&peer->msg_out, n-1);
return peer_write_message(conn, peer->cs, out, pkt_out);
}
/* Send any queued up broadcast messages */
return peer_dump_gossip(conn, peer); return peer_dump_gossip(conn, peer);
} }

Loading…
Cancel
Save