@ -126,6 +126,9 @@ struct peer {
/* If we die, should we reach again? */
bool reach_again ;
/* Waiting to send_peer_with_fds to master? */
const u8 * send_to_master ;
} ;
struct addrhint {
@ -140,7 +143,7 @@ struct addrhint {
/* FIXME: Reorder */
static struct io_plan * peer_start_gossip ( struct io_conn * conn ,
struct peer * peer ) ;
static void send_peer_with_fds ( struct peer * peer , const u8 * msg ) ;
static bool send_peer_with_fds ( struct peer * peer , const u8 * msg ) ;
static void wake_pkt_out ( struct peer * peer ) ;
static void try_reach_peer ( struct daemon * daemon , const struct pubkey * id ) ;
@ -191,6 +194,7 @@ static struct peer *new_peer(const tal_t *ctx,
peer - > daemon = daemon ;
peer - > local = true ;
peer - > reach_again = false ;
peer - > send_to_master = NULL ;
peer - > num_pings_outstanding = 0 ;
peer - > broadcast_index = 0 ;
msg_queue_init ( & peer - > peer_out , peer ) ;
@ -295,9 +299,11 @@ static struct io_plan *peer_init_received(struct io_conn *conn,
* gossipfd closed ( forget_peer ) or reconnect . */
peer_finalized ( peer ) ;
/* We will not have anything queued, since we're not duplex. */
msg = towire_gossip_peer_connected ( peer , & peer - > id , & peer - > pcs . cs ,
peer - > gfeatures , peer - > lfeatures ) ;
send_peer_with_fds ( peer , msg ) ;
if ( ! send_peer_with_fds ( peer , msg ) )
return io_close ( conn ) ;
/* Start the gossip flowing. */
/* FIXME: This is a bit wasteful in the common case where master
@ -398,10 +404,49 @@ static void handle_pong(struct peer *peer, const u8 *pong)
tal_len ( pong ) ) ) ) ;
}
/* If master asks us to release peer, we attach this destructor in case it
* dies while we ' re waiting for it to finish IO */
static void fail_release ( struct peer * peer )
{
u8 * msg = towire_gossipctl_release_peer_replyfail ( peer ) ;
daemon_conn_send ( & peer - > daemon - > master , take ( msg ) ) ;
}
static struct io_plan * wait_until_ready_for_master ( struct io_conn * conn ,
struct peer * peer )
{
/* One of these is always true, since we've just finished read/write */
if ( ! peer_in_started ( conn , & peer - > pcs )
& & ! peer_out_started ( conn , & peer - > pcs ) ) {
if ( send_peer_with_fds ( peer , take ( peer - > send_to_master ) ) ) {
/* In case we set this earlier. */
tal_del_destructor ( peer , fail_release ) ;
peer - > send_to_master = NULL ;
return io_close_taken_fd ( conn ) ;
} else
return io_close ( conn ) ;
}
/* Don't do any more I/O. */
return io_wait ( conn , peer , wait_until_ready_for_master , peer ) ;
}
static struct io_plan * peer_msgin ( struct io_conn * conn ,
struct peer * peer , u8 * msg ) ;
/* Wrapper around peer_read_message: don't read another if we want to
* pass up to master */
static struct io_plan * peer_next_in ( struct io_conn * conn , struct peer * peer )
{
if ( peer - > send_to_master )
return wait_until_ready_for_master ( conn , peer ) ;
return peer_read_message ( conn , & peer - > pcs , peer_msgin ) ;
}
static struct io_plan * peer_msgin ( struct io_conn * conn ,
struct peer * peer , u8 * msg )
{
u8 * s ;
enum wire_type t = fromwire_peektype ( msg ) ;
switch ( t ) {
@ -415,15 +460,15 @@ static struct io_plan *peer_msgin(struct io_conn *conn,
case WIRE_NODE_ANNOUNCEMENT :
case WIRE_CHANNEL_UPDATE :
handle_gossip_msg ( peer - > daemon - > rstate , msg ) ;
return peer_read_message ( conn , & peer - > pcs , peer_msgin ) ;
return peer_next_in ( conn , peer ) ;
case WIRE_PING :
handle_ping ( peer , msg ) ;
return peer_read_message ( conn , & peer - > pcs , peer_msgin ) ;
return peer_next_in ( conn , peer ) ;
case WIRE_PONG :
handle_pong ( peer , msg ) ;
return peer_read_message ( conn , & peer - > pcs , peer_msgin ) ;
return peer_next_in ( conn , peer ) ;
case WIRE_OPEN_CHANNEL :
case WIRE_CHANNEL_REESTABLISH :
@ -443,13 +488,15 @@ static struct io_plan *peer_msgin(struct io_conn *conn,
case WIRE_REVOKE_AND_ACK :
case WIRE_INIT :
/* Not our place to handle this, so we punt */
s = towire_gossip_peer_nongossip ( msg , & peer - > id ,
& peer - > pcs . cs ,
peer - > gfeatures ,
peer - > lfeatures ,
msg ) ;
send_peer_with_fds ( peer , take ( s ) ) ;
return io_close_taken_fd ( conn ) ;
peer - > send_to_master
= towire_gossip_peer_nongossip ( peer , & peer - > id ,
& peer - > pcs . cs ,
peer - > gfeatures ,
peer - > lfeatures ,
msg ) ;
/* This will wait. */
return peer_next_in ( conn , peer ) ;
}
/* BOLT #1:
@ -463,7 +510,7 @@ static struct io_plan *peer_msgin(struct io_conn *conn,
} else
peer_error ( peer , " Unknown packet %u " , t ) ;
return peer_read_message ( conn , & peer - > pcs , peer_msgin ) ;
return peer_next_in ( conn , peer ) ;
}
/* Wake up the outgoing direction of the connection and write any
@ -495,6 +542,10 @@ static struct io_plan *peer_pkt_out(struct io_conn *conn, struct peer *peer)
peer_pkt_out ) ;
}
/* Do we want to send this peer to the master daemon? */
if ( peer - > send_to_master )
return wait_until_ready_for_master ( conn , peer ) ;
/* If we're supposed to be sending gossip, do so now. */
if ( peer - > gossip_sync ) {
struct queued_message * next ;
@ -518,7 +569,7 @@ static struct io_plan *peer_start_gossip(struct io_conn *conn, struct peer *peer
{
wake_pkt_out ( peer ) ;
return io_duplex ( conn ,
peer_read_message ( conn , & peer - > pcs , peer_msgin ) ,
peer_next_in ( conn , peer ) ,
peer_pkt_out ( conn , peer ) ) ;
}
@ -554,7 +605,7 @@ static void forget_peer(struct io_conn *conn, struct daemon_conn *dc)
/* When a peer is to be owned by another daemon, we create a socket
* pair to send / receive gossip from it */
static void send_peer_with_fds ( struct peer * peer , const u8 * msg )
static bool send_peer_with_fds ( struct peer * peer , const u8 * msg )
{
int fds [ 2 ] ;
@ -564,7 +615,7 @@ static void send_peer_with_fds(struct peer *peer, const u8 *msg)
/* FIXME: Send error to peer? */
/* Peer will be freed when caller closes conn. */
return ;
return false ;
}
/* Now we talk to socket to get to peer's owner daemon. */
@ -583,6 +634,7 @@ static void send_peer_with_fds(struct peer *peer, const u8 *msg)
/* Don't get confused: we can't use this any more. */
peer - > fd = - 1 ;
return true ;
}
/**
@ -678,7 +730,7 @@ static struct io_plan *handle_peer(struct io_conn *conn, struct daemon *daemon,
static struct io_plan * release_peer ( struct io_conn * conn , struct daemon * daemon ,
const u8 * msg )
{
{
struct pubkey id ;
struct peer * peer ;
@ -686,20 +738,24 @@ static struct io_plan *release_peer(struct io_conn *conn, struct daemon *daemon,
master_badmsg ( WIRE_GOSSIPCTL_RELEASE_PEER , msg ) ;
peer = find_peer ( daemon , & id ) ;
if ( ! peer | | ! peer - > local ) {
status_trace ( " release_peer: peer %s not %s " ,
type_to_string ( trc , struct pubkey , & id ) ,
peer ? " local " : " found " ) ;
if ( ! peer | | ! peer - > local | | peer - > send_to_master ) {
/* This can happen with dying peers, or reconnect */
status_trace ( " release_peer: peer %s %s " ,
type_to_string ( trc , struct pubkey , & id ) ,
! peer ? " not found "
: ! peer - > send_to_master ? " already releasing "
: " not local " ) ;
msg = towire_gossipctl_release_peer_replyfail ( msg ) ;
daemon_conn_send ( & daemon - > master , take ( msg ) ) ;
} else {
msg = towire_gossipctl_release_peer_reply ( msg ,
msg = towire_gossipctl_release_peer_reply ( peer ,
& peer - > pcs . cs ,
peer - > gfeatures ,
peer - > lfeatures ) ;
send_peer_with_fds ( peer , take ( msg ) ) ;
io_close_taken_fd ( peer - > conn ) ;
peer - > send_to_master = msg ;
/* Wake output, in case it's idle. */
msg_wake ( & peer - > peer_out ) ;
}
return daemon_conn_read_next ( conn , & daemon - > master ) ;
}