|
@ -64,7 +64,10 @@ struct peer { |
|
|
|
|
|
|
|
|
/* High water mark for the staggered broadcast */ |
|
|
/* High water mark for the staggered broadcast */ |
|
|
u64 broadcast_index; |
|
|
u64 broadcast_index; |
|
|
u8 **msg_out; |
|
|
|
|
|
|
|
|
/* Message queue for outgoing. */ |
|
|
|
|
|
struct msg_queue peer_out; |
|
|
|
|
|
|
|
|
/* Is it time to continue the staggered broadcast? */ |
|
|
/* Is it time to continue the staggered broadcast? */ |
|
|
bool gossip_sync; |
|
|
bool gossip_sync; |
|
|
|
|
|
|
|
@ -99,7 +102,7 @@ static struct peer *setup_new_peer(struct daemon *daemon, const u8 *msg) |
|
|
peer->daemon = daemon; |
|
|
peer->daemon = daemon; |
|
|
peer->error = NULL; |
|
|
peer->error = NULL; |
|
|
peer->local = true; |
|
|
peer->local = true; |
|
|
peer->msg_out = tal_arr(peer, u8*, 0); |
|
|
msg_queue_init(&peer->peer_out, peer); |
|
|
list_add_tail(&daemon->peers, &peer->list); |
|
|
list_add_tail(&daemon->peers, &peer->list); |
|
|
tal_add_destructor(peer, destroy_peer); |
|
|
tal_add_destructor(peer, destroy_peer); |
|
|
wake_pkt_out(peer); |
|
|
wake_pkt_out(peer); |
|
@ -239,7 +242,7 @@ static void wake_pkt_out(struct peer *peer) |
|
|
new_reltimer(&peer->daemon->timers, peer, time_from_sec(30), |
|
|
new_reltimer(&peer->daemon->timers, peer, time_from_sec(30), |
|
|
wake_pkt_out, peer); |
|
|
wake_pkt_out, peer); |
|
|
/* Notify the peer-write loop */ |
|
|
/* Notify the peer-write loop */ |
|
|
io_wake(peer); |
|
|
io_wake(&peer->peer_out); |
|
|
/* Notify the daemon_conn-write loop */ |
|
|
/* Notify the daemon_conn-write loop */ |
|
|
io_wake(&peer->owner_conn.out); |
|
|
io_wake(&peer->owner_conn.out); |
|
|
} |
|
|
} |
|
@ -256,7 +259,7 @@ static struct io_plan *peer_dump_gossip(struct io_conn *conn, struct peer *peer) |
|
|
if (!next) { |
|
|
if (!next) { |
|
|
/* Going to wake up in pkt_out since we mix time based and
|
|
|
/* Going to wake up in pkt_out since we mix time based and
|
|
|
* message based wakeups */ |
|
|
* message based wakeups */ |
|
|
return io_out_wait(conn, peer, pkt_out, peer); |
|
|
return msg_queue_wait(conn, &peer->peer_out, pkt_out, peer); |
|
|
} else { |
|
|
} else { |
|
|
/* Do not free the message after send, queue_broadcast takes
|
|
|
/* Do not free the message after send, queue_broadcast takes
|
|
|
* care of that */ |
|
|
* care of that */ |
|
@ -269,21 +272,16 @@ static struct io_plan *pkt_out(struct io_conn *conn, struct peer *peer) |
|
|
{ |
|
|
{ |
|
|
assert(peer->local); |
|
|
assert(peer->local); |
|
|
/* First we process queued packets, if any */ |
|
|
/* First we process queued packets, if any */ |
|
|
u8 *out; |
|
|
const u8 *out = msg_dequeue(&peer->peer_out); |
|
|
size_t n = tal_count(peer->msg_out); |
|
|
if (out) |
|
|
if (n > 0) { |
|
|
|
|
|
out = peer->msg_out[0]; |
|
|
|
|
|
memmove(peer->msg_out, peer->msg_out + 1, (sizeof(*peer->msg_out)*(n-1))); |
|
|
|
|
|
tal_resize(&peer->msg_out, n-1); |
|
|
|
|
|
return peer_write_message(conn, &peer->pcs, take(out), pkt_out); |
|
|
return peer_write_message(conn, &peer->pcs, take(out), pkt_out); |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if (peer->gossip_sync && peer->local){ |
|
|
if (peer->gossip_sync && peer->local){ |
|
|
/* Send any queued up broadcast messages */ |
|
|
/* Send any queued up broadcast messages */ |
|
|
peer->gossip_sync = false; |
|
|
peer->gossip_sync = false; |
|
|
return peer_dump_gossip(conn, peer); |
|
|
return peer_dump_gossip(conn, peer); |
|
|
} else { |
|
|
} else { |
|
|
return io_out_wait(conn, peer, pkt_out, peer); |
|
|
return msg_queue_wait(conn, &peer->peer_out, pkt_out, peer); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|