@ -226,11 +226,6 @@ static struct io_plan *peer_msgin(struct io_conn *conn,
return io_close ( conn ) ;
return io_close ( conn ) ;
}
}
/* Gets called by the outgoing IO loop when woken up. Sends messages
* to the peer if there are any queued . Also checks if we have any
* queued gossip messages and processes them . */
static struct io_plan * pkt_out ( struct io_conn * conn , struct peer * peer ) ;
/* Wake up the outgoing direction of the connection and write any
/* Wake up the outgoing direction of the connection and write any
* queued messages . Needed since the ` io_wake ` method signature does
* queued messages . Needed since the ` io_wake ` method signature does
* not allow us to specify it as the callback for ` new_reltimer ` , but
* not allow us to specify it as the callback for ` new_reltimer ` , but
@ -247,42 +242,30 @@ static void wake_pkt_out(struct peer *peer)
io_wake ( & peer - > owner_conn . out ) ;
io_wake ( & peer - > owner_conn . out ) ;
}
}
/* Loop through the backlog of channel_{announcements,updates} and
static struct io_plan * peer_pkt_out ( struct io_conn * conn , struct peer * peer )
* node_announcements , writing out one on each iteration . Once we are
* through wait for the broadcast interval and start again . */
static struct io_plan * peer_dump_gossip ( struct io_conn * conn , struct peer * peer )
{
struct queued_message * next ;
next = next_broadcast_message ( peer - > daemon - > rstate - > broadcasts ,
& peer - > broadcast_index ) ;
if ( ! next ) {
/* Going to wake up in pkt_out since we mix time based and
* message based wakeups */
return msg_queue_wait ( conn , & peer - > peer_out , pkt_out , peer ) ;
} else {
/* Do not free the message after send, queue_broadcast takes
* care of that */
return peer_write_message ( conn , & peer - > pcs , next - > payload ,
peer_dump_gossip ) ;
}
}
static struct io_plan * pkt_out ( struct io_conn * conn , struct peer * peer )
{
{
assert ( peer - > local ) ;
/* First priority is queued packets, if any */
/* First we process queued packets, if any */
const u8 * out = msg_dequeue ( & peer - > peer_out ) ;
const u8 * out = msg_dequeue ( & peer - > peer_out ) ;
if ( out )
if ( out )
return peer_write_message ( conn , & peer - > pcs , take ( out ) , pkt_out ) ;
return peer_write_message ( conn , & peer - > pcs , take ( out ) ,
peer_pkt_out ) ;
/* If we're supposed to be sending gossip, do so now. */
if ( peer - > gossip_sync ) {
struct queued_message * next ;
if ( peer - > gossip_sync & & peer - > local ) {
next = next_broadcast_message ( peer - > daemon - > rstate - > broadcasts ,
/* Send any queued up broadcast messages */
& peer - > broadcast_index ) ;
if ( next )
return peer_write_message ( conn , & peer - > pcs ,
next - > payload , peer_pkt_out ) ;
/* Gossip is drained. Wait for next timer. */
peer - > gossip_sync = false ;
peer - > gossip_sync = false ;
return peer_dump_gossip ( conn , peer ) ;
} else {
return msg_queue_wait ( conn , & peer - > peer_out , pkt_out , peer ) ;
}
}
return msg_queue_wait ( conn , & peer - > peer_out , peer_pkt_out , peer ) ;
}
}
static bool has_even_bit ( const u8 * bitmap )
static bool has_even_bit ( const u8 * bitmap )
@ -382,7 +365,7 @@ static struct io_plan *peer_parse_init(struct io_conn *conn,
* we both wait indefinitely */
* we both wait indefinitely */
return io_duplex ( conn ,
return io_duplex ( conn ,
peer_read_message ( conn , & peer - > pcs , peer_msgin ) ,
peer_read_message ( conn , & peer - > pcs , peer_msgin ) ,
peer_dump_gossip ( conn , peer ) ) ;
peer_pkt_out ( conn , peer ) ) ;
}
}
static struct io_plan * peer_init_sent ( struct io_conn * conn , struct peer * peer )
static struct io_plan * peer_init_sent ( struct io_conn * conn , struct peer * peer )