@ -4,6 +4,7 @@
# include <ccan/container_of/container_of.h>
# include <ccan/crypto/hkdf_sha256/hkdf_sha256.h>
# include <ccan/crypto/shachain/shachain.h>
# include <ccan/err/err.h>
# include <ccan/fdpass/fdpass.h>
# include <ccan/io/io.h>
# include <ccan/mem/mem.h>
@ -44,7 +45,7 @@
# include <wire/wire_sync.h>
/* stdin == requests, 3 == peer, 4 = gossip, 5 = HSM */
# define REQ _FD STDIN_FILENO
# define MASTER _FD STDIN_FILENO
# define PEER_FD 3
# define GOSSIP_FD 4
# define HSM_FD 5
@ -92,12 +93,10 @@ struct peer {
struct io_conn * peer_conn ;
struct daemon_conn gossip_client ;
struct daemon_conn master ;
/* If we're waiting for a specific reply, defer other messages. */
enum channel_wire_type master_reply_type ;
void ( * handle_master_reply ) ( struct peer * peer , const u8 * msg ) ;
struct msg_queue master_deferred ;
/* Messages from master: we queue them since we might be waiting for
* a specific reply . */
struct msg_queue from_master ;
struct timers timers ;
struct oneshot * commit_timer ;
@ -337,13 +336,13 @@ static struct io_plan *handle_peer_funding_locked(struct io_conn *conn,
" Wrong channel id in %s " , tal_hex ( trc , msg ) ) ;
peer - > funding_locked [ REMOTE ] = true ;
daemon_conn_send ( & peer - > master ,
take ( towire_channel_got_funding_locked ( peer ,
wire_sync_write ( MASTER_FD ,
take ( towire_channel_got_funding_locked ( peer ,
& peer - > remote_per_commit ) ) ) ;
if ( peer - > funding_locked [ LOCAL ] ) {
daemon_conn_send ( & peer - > master ,
take ( towire_channel_normal_operation ( peer ) ) ) ;
wire_sync_write ( MASTER_FD ,
take ( towire_channel_normal_operation ( peer ) ) ) ;
}
send_announcement_signatures ( peer ) ;
@ -357,7 +356,7 @@ static void announce_channel(struct peer *peer)
send_channel_update ( peer , false ) ;
/* Tell the master that we just announced the channel,
* so it may announce the node */
daemon_conn_send ( & peer - > master , take ( towire_channel_announced ( peer ) ) ) ;
wire_sync_write ( MASTER_FD , take ( towire_channel_announced ( peer ) ) ) ;
}
static struct io_plan * handle_peer_announcement_signatures ( struct io_conn * conn ,
@ -487,41 +486,41 @@ static void maybe_send_shutdown(struct peer *peer)
peer - > shutdown_sent [ LOCAL ] = true ;
}
/* Master has acknowledged that we're sending commitment, so send it. */
static void handle_sending_commitsig_reply ( struct peer * peer , const u8 * msg )
/* This queues other traffic from the master until we get reply. */
static u8 * master_wait_sync_reply ( const tal_t * ctx ,
struct peer * peer , const u8 * msg ,
enum channel_wire_type replytype )
{
status_trace ( " Sending commit_sig with %zu htlc sigs " ,
tal_count ( peer - > next_commit_sigs - > htlc_sigs ) ) ;
u8 * reply ;
peer - > next_index [ REMOTE ] + + ;
status_trace ( " Sending master %s " ,
channel_wire_type_name ( fromwire_peektype ( msg ) ) ) ;
msg = towire_commitment_signed ( peer , & peer - > channel_id ,
& peer - > next_commit_sigs - > commit_sig ,
peer - > next_commit_sigs - > htlc_sigs ) ;
msg_enqueue ( & peer - > peer_out , take ( msg ) ) ;
peer - > next_commit_sigs = tal_free ( peer - > next_commit_sigs ) ;
if ( ! wire_sync_write ( MASTER_FD , msg ) )
status_failed ( WIRE_CHANNEL_INTERNAL_ERROR ,
" Could not set sync write to master: %s " ,
strerror ( errno ) ) ;
maybe_send_shutdown ( peer ) ;
status_trace ( " ... , awaiting %s " ,
channel_wire_type_name ( replytype ) ) ;
/* Timer now considered expired, you can add a new one. */
peer - > commit_timer = NULL ;
start_commit_timer ( peer ) ;
if ( shutdown_complete ( peer ) )
io_break ( peer ) ;
}
/* This blocks other traffic from the master until we get reply. */
static void master_sync_reply ( struct peer * peer , const u8 * msg ,
enum channel_wire_type replytype ,
void ( * handle ) ( struct peer * peer , const u8 * msg ) )
{
assert ( ! peer - > handle_master_reply ) ;
for ( ; ; ) {
reply = wire_sync_read ( ctx , MASTER_FD ) ;
if ( ! reply )
status_failed ( WIRE_CHANNEL_INTERNAL_ERROR ,
" Could not set sync read from master: %s " ,
strerror ( errno ) ) ;
if ( fromwire_peektype ( reply ) = = replytype ) {
status_trace ( " Got it! " ) ;
break ;
}
peer - > handle_master_reply = handle ;
peer - > master_reply_type = replytype ;
status_trace ( " Nope, got %s instead " ,
channel_wire_type_name ( fromwire_peektype ( reply ) ) ) ;
msg_enqueue ( & peer - > from_master , take ( reply ) ) ;
}
daemon_conn_send ( & peer - > master , msg ) ;
return reply ;
}
static struct commit_sigs * calc_commitsigs ( const tal_t * ctx ,
@ -615,10 +614,8 @@ static void send_commit(struct peer *peer)
/* FIXME: Document this requirement in BOLT 2! */
/* We can't send two commits in a row. */
if ( channel_awaiting_revoke_and_ack ( peer - > channel )
| | peer - > handle_master_reply ) {
status_trace ( " Can't send commit: waiting for revoke_and_ack %s " ,
peer - > handle_master_reply ? " processing " : " reply " ) ;
if ( channel_awaiting_revoke_and_ack ( peer - > channel ) ) {
status_trace ( " Can't send commit: waiting for revoke_and_ack " ) ;
/* Mark this as done and try again. */
peer - > commit_timer = NULL ;
start_commit_timer ( peer ) ;
@ -655,9 +652,30 @@ static void send_commit(struct peer *peer)
changed_htlcs ,
& peer - > next_commit_sigs - > commit_sig ,
peer - > next_commit_sigs - > htlc_sigs ) ;
master_sync_reply ( peer , take ( msg ) ,
WIRE_CHANNEL_SENDING_COMMITSIG_REPLY ,
handle_sending_commitsig_reply ) ;
/* Message is empty; receiving it is the point. */
master_wait_sync_reply ( tmpctx , peer , take ( msg ) ,
WIRE_CHANNEL_SENDING_COMMITSIG_REPLY ) ;
status_trace ( " Sending commit_sig with %zu htlc sigs " ,
tal_count ( peer - > next_commit_sigs - > htlc_sigs ) ) ;
peer - > next_index [ REMOTE ] + + ;
msg = towire_commitment_signed ( peer , & peer - > channel_id ,
& peer - > next_commit_sigs - > commit_sig ,
peer - > next_commit_sigs - > htlc_sigs ) ;
msg_enqueue ( & peer - > peer_out , take ( msg ) ) ;
peer - > next_commit_sigs = tal_free ( peer - > next_commit_sigs ) ;
maybe_send_shutdown ( peer ) ;
/* Timer now considered expired, you can add a new one. */
peer - > commit_timer = NULL ;
start_commit_timer ( peer ) ;
if ( shutdown_complete ( peer ) )
io_break ( peer ) ;
tal_free ( tmpctx ) ;
}
@ -834,12 +852,6 @@ static u8 *got_commitsig_msg(const tal_t *ctx,
return msg ;
}
/* Tell peer to continue now master has replied. */
static void handle_reply_wake_peer ( struct peer * peer , const u8 * msg )
{
io_wake ( peer ) ;
}
static struct io_plan * handle_peer_commit_sig ( struct io_conn * conn ,
struct peer * peer , const u8 * msg )
{
@ -957,12 +969,10 @@ static struct io_plan *handle_peer_commit_sig(struct io_conn *conn,
msg = got_commitsig_msg ( tmpctx , peer - > next_index [ LOCAL ] , & commit_sig ,
htlc_sigs , changed_htlcs , txs [ 0 ] ) ;
master_sync_reply ( peer , take ( msg ) ,
WIRE_CHANNEL_GOT_COMMITSIG_REPLY ,
handle_reply_wake_peer ) ;
/* And peer waits for reply. */
return io_wait ( conn , peer , send_revocation , peer ) ;
master_wait_sync_reply ( tmpctx , peer , take ( msg ) ,
WIRE_CHANNEL_GOT_COMMITSIG_REPLY ) ;
tal_free ( tmpctx ) ;
return send_revocation ( conn , peer ) ;
}
static u8 * got_revoke_msg ( const tal_t * ctx , u64 revoke_num ,
@ -1013,6 +1023,7 @@ static struct io_plan *handle_peer_revoke_and_ack(struct io_conn *conn,
struct privkey privkey ;
struct channel_id channel_id ;
struct pubkey per_commit_point , next_per_commit ;
tal_t * tmpctx = tal_tmpctx ( msg ) ;
const struct htlc * * changed_htlcs = tal_arr ( msg , const struct htlc * , 0 ) ;
if ( ! fromwire_revoke_and_ack ( msg , NULL , & channel_id , & old_commit_secret ,
@ -1059,12 +1070,11 @@ static struct io_plan *handle_peer_revoke_and_ack(struct io_conn *conn,
status_trace ( " No commits outstanding after recv revoke_and_ack " ) ;
/* Tell master about things this locks in, wait for response */
msg = got_revoke_msg ( msg , peer - > next_index [ REMOTE ] - 2 ,
msg = got_revoke_msg ( tmpctx , peer - > next_index [ REMOTE ] - 2 ,
& old_commit_secret , & next_per_commit ,
changed_htlcs ) ;
master_sync_reply ( peer , take ( msg ) ,
WIRE_CHANNEL_GOT_REVOKE_REPLY ,
handle_reply_wake_peer ) ;
master_wait_sync_reply ( tmpctx , peer , take ( msg ) ,
WIRE_CHANNEL_GOT_REVOKE_REPLY ) ;
peer - > old_remote_per_commit = peer - > remote_per_commit ;
peer - > remote_per_commit = next_per_commit ;
@ -1075,8 +1085,8 @@ static struct io_plan *handle_peer_revoke_and_ack(struct io_conn *conn,
type_to_string ( trc , struct pubkey ,
& peer - > old_remote_per_commit ) ) ;
/* And peer waits for reply. */
return io_wait ( conn , peer , accepted_revocatio n, peer ) ;
tal_free ( tmpctx ) ;
return accepted_revocation ( con n, peer ) ;
}
static struct io_plan * handle_peer_fulfill_htlc ( struct io_conn * conn ,
@ -1275,8 +1285,8 @@ static struct io_plan *handle_pong(struct io_conn *conn,
status_failed ( WIRE_CHANNEL_PEER_READ_FAILED , " Unexpected pong " ) ;
peer - > num_pings_outstanding - - ;
daemon_conn_send ( & peer - > master ,
take ( towire_channel_ping_reply ( pong , tal_len ( pong ) ) ) ) ;
wire_sync_write ( MASTER_FD ,
take ( towire_channel_ping_reply ( pong , tal_len ( pong ) ) ) ) ;
return peer_read_message ( conn , & peer - > pcs , peer_in ) ;
}
@ -1291,8 +1301,8 @@ static struct io_plan *handle_peer_shutdown(struct io_conn *conn,
status_failed ( WIRE_CHANNEL_PEER_READ_FAILED , " Bad shutdown " ) ;
/* Tell master, it will tell us what to send (if any). */
daemon_conn_send ( & peer - > master ,
take ( towire_channel_got_shutdown ( peer , scriptpubkey ) ) ) ;
wire_sync_write ( MASTER_FD ,
take ( towire_channel_got_shutdown ( peer , scriptpubkey ) ) ) ;
peer - > shutdown_sent [ REMOTE ] = true ;
if ( shutdown_complete ( peer ) )
@ -1681,8 +1691,8 @@ static void handle_funding_locked(struct peer *peer, const u8 *msg)
peer - > funding_locked [ LOCAL ] = true ;
if ( peer - > funding_locked [ REMOTE ] ) {
daemon_conn_send ( & peer - > master ,
take ( towire_channel_normal_operation ( peer ) ) ) ;
wire_sync_write ( MASTER_FD ,
take ( towire_channel_normal_operation ( peer ) ) ) ;
}
}
@ -1737,7 +1747,7 @@ static void handle_offer_htlc(struct peer *peer, const u8 *inmsg)
/* Tell the master. */
msg = towire_channel_offer_htlc_reply ( inmsg , peer - > htlc_id ,
0 , NULL ) ;
daemon_conn_send ( & peer - > master , take ( msg ) ) ;
wire_sync_write ( MASTER_FD , take ( msg ) ) ;
peer - > htlc_id + + ;
return ;
case CHANNEL_ERR_INVALID_EXPIRY :
@ -1776,7 +1786,7 @@ failed:
/* Note: tal_fmt doesn't set tal_len() to exact length, so fix here. */
tal_resize ( & failmsg , strlen ( failmsg ) + 1 ) ;
msg = towire_channel_offer_htlc_reply ( inmsg , 0 , failcode , ( u8 * ) failmsg ) ;
daemon_conn_send ( & peer - > master , take ( msg ) ) ;
wire_sync_write ( MASTER_FD , take ( msg ) ) ;
}
static void handle_preimage ( struct peer * peer , const u8 * inmsg )
@ -1885,8 +1895,8 @@ static void handle_ping_cmd(struct peer *peer, const u8 *inmsg)
* it MUST ignore the ` ping ` .
*/
if ( num_pong_bytes > = 65532 )
daemon_conn_send ( & peer - > master ,
take ( towire_channel_ping_reply ( peer , 0 ) ) ) ;
wire_sync_write ( MASTER_FD ,
take ( towire_channel_ping_reply ( peer , 0 ) ) ) ;
else
peer - > num_pings_outstanding + + ;
}
@ -1903,51 +1913,31 @@ static void handle_shutdown_cmd(struct peer *peer, const u8 *inmsg)
start_commit_timer ( peer ) ;
}
static struct io_plan * req_in ( struct io_conn * conn , struct daemon_conn * master )
static void req_in ( struct peer * peer , const u8 * msg )
{
struct peer * peer = container_of ( master , struct peer , master ) ;
enum channel_wire_type t = fromwire_peektype ( master - > msg_in ) ;
/* Waiting for something specific? Defer others. */
if ( peer - > handle_master_reply ) {
void ( * handle ) ( struct peer * peer , const u8 * msg ) ;
if ( t ! = peer - > master_reply_type ) {
msg_enqueue ( & peer - > master_deferred ,
take ( master - > msg_in ) ) ;
master - > msg_in = NULL ;
goto out_next ;
}
/* Just in case it resets this. */
handle = peer - > handle_master_reply ;
peer - > handle_master_reply = NULL ;
handle ( peer , master - > msg_in ) ;
goto out ;
}
enum channel_wire_type t = fromwire_peektype ( msg ) ;
switch ( t ) {
case WIRE_CHANNEL_FUNDING_LOCKED :
handle_funding_locked ( peer , master - > m sg_in ) ;
handle_funding_locked ( peer , msg ) ;
goto out ;
case WIRE_CHANNEL_FUNDING_ANNOUNCE_DEPTH :
handle_funding_announce_depth ( peer , master - > m sg_in ) ;
handle_funding_announce_depth ( peer , msg ) ;
goto out ;
case WIRE_CHANNEL_OFFER_HTLC :
handle_offer_htlc ( peer , master - > m sg_in ) ;
handle_offer_htlc ( peer , msg ) ;
goto out ;
case WIRE_CHANNEL_FULFILL_HTLC :
handle_preimage ( peer , master - > m sg_in ) ;
handle_preimage ( peer , msg ) ;
goto out ;
case WIRE_CHANNEL_FAIL_HTLC :
handle_fail ( peer , master - > m sg_in ) ;
handle_fail ( peer , msg ) ;
goto out ;
case WIRE_CHANNEL_PING :
handle_ping_cmd ( peer , master - > m sg_in ) ;
handle_ping_cmd ( peer , msg ) ;
goto out ;
case WIRE_CHANNEL_SEND_SHUTDOWN :
handle_shutdown_cmd ( peer , master - > m sg_in ) ;
handle_shutdown_cmd ( peer , msg ) ;
goto out ;
case WIRE_CHANNEL_BAD_COMMAND :
@ -1978,25 +1968,7 @@ static struct io_plan *req_in(struct io_conn *conn, struct daemon_conn *master)
channel_wire_type_name ( t ) ) ;
out :
/* In case we've now processed reply, process packet backlog. */
if ( ! peer - > handle_master_reply ) {
const u8 * msg = msg_dequeue ( & peer - > master_deferred ) ;
if ( msg ) {
/* Free old packet exactly like daemon_conn_read_next */
master - > msg_in = tal_free ( master - > msg_in ) ;
master - > msg_in = cast_const ( u8 * , tal_steal ( peer , msg ) ) ;
return req_in ( conn , master ) ;
}
}
out_next :
return daemon_conn_read_next ( conn , master ) ;
}
static void master_gone ( struct io_conn * unused , struct daemon_conn * dc )
{
/* Can't tell master, it's gone. */
exit ( 2 ) ;
tal_free ( msg ) ;
}
/* We do this synchronously. */
@ -2021,9 +1993,9 @@ static void init_channel(struct peer *peer)
u8 * msg ;
u32 feerate_per_kw ;
assert ( ! ( fcntl ( REQ _FD, F_GETFL ) & O_NONBLOCK ) ) ;
assert ( ! ( fcntl ( MASTER _FD, F_GETFL ) & O_NONBLOCK ) ) ;
msg = wire_sync_read ( peer , REQ _FD) ;
msg = wire_sync_read ( peer , MASTER _FD) ;
if ( ! fromwire_channel_init ( peer , msg , NULL ,
& peer - > chain_hash ,
& funding_txid , & funding_txout ,
@ -2070,9 +2042,7 @@ static void init_channel(struct peer *peer)
status_failed ( WIRE_CHANNEL_BAD_COMMAND , " Init: %s " ,
tal_hex ( msg , msg ) ) ;
/* After this we'll be async, so set up now. */
daemon_conn_init ( peer , & peer - > master , REQ_FD , req_in , master_gone ) ;
status_setup_async ( & peer - > master ) ;
status_setup_sync ( MASTER_FD ) ;
status_trace ( " init %s: remote_per_commit = %s, old_remote_per_commit = %s "
" next_idx_local = % " PRIu64
@ -2156,21 +2126,72 @@ static void send_shutdown_complete(struct peer *peer)
}
/* Now we can tell master shutdown is complete. */
daemon_conn_send ( & peer - > master ,
take ( towire_channel_shutdown_complete ( peer ,
& peer - > pcs . cs ) ) ) ;
daemon_conn_send_fd ( & peer - > master , PEER_FD ) ;
daemon_conn_send_fd ( & peer - > master , GOSSIP_FD ) ;
if ( ! daemon_conn_sync_flush ( & peer - > master ) )
status_failed ( WIRE_CHANNEL_INTERNAL_ERROR , " Flushing master " ) ;
wire_sync_write ( MASTER_FD ,
take ( towire_channel_shutdown_complete ( peer ,
& peer - > pcs . cs ) ) ) ;
fdpass_send ( MASTER_FD , PEER_FD ) ;
fdpass_send ( MASTER_FD , GOSSIP_FD ) ;
close ( MASTER_FD ) ;
}
static bool process_reqs ( struct peer * peer )
{
const u8 * msg ;
bool changed = false ;
/* In case we've deferred, process packet backlog. */
while ( ( msg = msg_dequeue ( & peer - > from_master ) ) ! = NULL ) {
status_trace ( " Now dealing with deferred %s " ,
channel_wire_type_name ( fromwire_peektype ( msg ) ) ) ;
req_in ( peer , msg ) ;
changed = true ;
}
return changed ;
}
static struct peer * peer ;
/* If this becomes a common pattern, we could make it a helper in common/ */
static int poll_with_masterfd ( struct pollfd * fds , nfds_t nfds , int timeout )
{
struct pollfd * fds_plus ;
int r ;
/* This can change things, so return as if poll found nothing. */
if ( process_reqs ( peer ) )
return 0 ;
/* Add master fd to fds. */
fds_plus = tal_dup_arr ( peer , struct pollfd , fds , nfds , 1 ) ;
fds_plus [ nfds ] . fd = MASTER_FD ;
fds_plus [ nfds ] . events = POLLIN ;
fds_plus [ nfds ] . revents = 0 ;
r = poll ( fds_plus , nfds + 1 , timeout ) ;
if ( r > 0 ) {
if ( fds_plus [ nfds ] . revents & POLLIN ) {
u8 * msg = wire_sync_read ( peer , MASTER_FD ) ;
if ( ! msg )
status_failed ( WIRE_CHANNEL_BAD_COMMAND ,
" Can't read command: %s " ,
strerror ( errno ) ) ;
msg_enqueue ( & peer - > from_master , take ( msg ) ) ;
r - - ;
} else if ( fds_plus [ nfds ] . revents & ( POLLHUP | POLLNVAL | POLLERR ) )
/* Can't report error, master gone. */
errx ( 2 , " Error polling master fd " ) ;
}
/* Copy back revents values */
memcpy ( fds , fds_plus , nfds * sizeof ( * fds ) ) ;
tal_free ( fds_plus ) ;
return r ;
}
int main ( int argc , char * argv [ ] )
{
struct peer * peer = tal ( NULL , struct peer ) ;
int i ;
if ( argc = = 2 & & streq ( argv [ 1 ] , " --version " ) ) {
printf ( " %s \n " , version ( ) ) ;
exit ( 0 ) ;
@ -2183,14 +2204,13 @@ int main(int argc, char *argv[])
secp256k1_ctx = secp256k1_context_create ( SECP256K1_CONTEXT_VERIFY
| SECP256K1_CONTEXT_SIGN ) ;
peer = tal ( NULL , struct peer ) ;
peer - > num_pings_outstanding = 0 ;
timers_init ( & peer - > timers , time_mono ( ) ) ;
peer - > commit_timer = NULL ;
peer - > have_sigs [ LOCAL ] = peer - > have_sigs [ REMOTE ] = false ;
peer - > announce_depth_reached = false ;
peer - > handle_master_reply = NULL ;
peer - > master_reply_type = 0 ;
msg_queue_init ( & peer - > master_deferred , peer ) ;
msg_queue_init ( & peer - > from_master , peer ) ;
msg_queue_init ( & peer - > peer_out , peer ) ;
peer - > next_commit_sigs = NULL ;
peer - > shutdown_sent [ LOCAL ] = false ;
@ -2212,6 +2232,9 @@ int main(int argc, char *argv[])
/* Read init_channel message sync. */
init_channel ( peer ) ;
/* Make sure we process and listen for master msgs. */
io_poll_override ( poll_with_masterfd ) ;
for ( ; ; ) {
struct timer * expired = NULL ;
io_loop ( & peer - > timers , & expired ) ;