@ -9,7 +9,8 @@
# define GOSSIP_STORE_FILENAME "gossip_store"
# define GOSSIP_STORE_FILENAME "gossip_store"
struct gossip_store {
struct gossip_store {
int read_fd , write_fd ;
int fd ;
off_t read_pos , write_pos ;
/* What was the size of the gossip_store when we started replaying
/* What was the size of the gossip_store when we started replaying
* it ? */
* it ? */
@ -18,17 +19,16 @@ struct gossip_store {
static void gossip_store_destroy ( struct gossip_store * gs )
static void gossip_store_destroy ( struct gossip_store * gs )
{
{
if ( gs - > read_fd ! = - 1 )
close ( gs - > fd ) ;
close ( gs - > read_fd ) ;
close ( gs - > write_fd ) ;
}
}
struct gossip_store * gossip_store_new ( const tal_t * ctx )
struct gossip_store * gossip_store_new ( const tal_t * ctx )
{
{
struct gossip_store * gs = tal ( ctx , struct gossip_store ) ;
struct gossip_store * gs = tal ( ctx , struct gossip_store ) ;
gs - > write_fd = open ( GOSSIP_STORE_FILENAME , O_RDWR | O_APPEND | O_CREAT , 0600 ) ;
gs - > fd = open ( GOSSIP_STORE_FILENAME , O_RDWR | O_APPEND | O_CREAT , 0600 ) ;
gs - > read_fd = open ( GOSSIP_STORE_FILENAME , O_RDONLY ) ;
gs - > read_pos = 0 ;
gs - > replaysize = lseek ( gs - > write_fd , 0 , SEEK_END ) ;
gs - > write_pos = lseek ( gs - > fd , 0 , SEEK_END ) ;
gs - > replaysize = gs - > write_pos ;
tal_add_destructor ( gs , gossip_store_destroy ) ;
tal_add_destructor ( gs , gossip_store_destroy ) ;
@ -40,17 +40,11 @@ void gossip_store_append(struct gossip_store *gs, const u8 *msg)
u16 msglen = tal_len ( msg ) ;
u16 msglen = tal_len ( msg ) ;
beint16_t belen = cpu_to_be16 ( msglen ) ;
beint16_t belen = cpu_to_be16 ( msglen ) ;
/* FIXME: this method of detecting replayed messages is best effort
if ( pwrite ( gs - > fd , & belen , sizeof ( belen ) , gs - > write_pos ) ! = 2 | |
* only . It should avoid doubling the store file size on every start ,
pwrite ( gs - > fd , msg , msglen , gs - > write_pos + 2 ) ! = msglen ) {
* but it ' ll allow the last few messages to be duplicated since replay
* and write are async , and we ' ll think we are done replaying a bit too
* early . */
/* Check if we are replaying the store */
if ( lseek ( gs - > read_fd , 0 , SEEK_CUR ) < gs - > replaysize )
return ;
return ;
} else
write_all ( gs - > write_fd , & belen , sizeof ( belen ) ) ;
gs - > write_pos + = 2 + msglen ;
write_all ( gs - > write_fd , msg , msglen ) ;
}
}
const u8 * gossip_store_read_next ( const tal_t * ctx , struct gossip_store * gs )
const u8 * gossip_store_read_next ( const tal_t * ctx , struct gossip_store * gs )
@ -60,22 +54,29 @@ const u8 *gossip_store_read_next(const tal_t *ctx, struct gossip_store *gs)
u8 * msg ;
u8 * msg ;
/* Did we already reach the end of the gossip_store? */
/* Did we already reach the end of the gossip_store? */
if ( gs - > read_fd = = - 1 )
if ( gs - > read_pos = = - 1 )
return NULL ;
return NULL ;
/* Can we read one message? */
/* Can we read one message? */
if ( ! read_all ( gs - > read_ fd, & belen , sizeof ( belen ) ) ) {
if ( pread ( gs - > fd , & belen , sizeof ( belen ) , gs - > read_pos ) ! = 2 ) {
gs - > read_fd = - 1 ;
gs - > read_pos = - 1 ;
return NULL ;
return NULL ;
}
}
msglen = be16_to_cpu ( belen ) ;
msglen = be16_to_cpu ( belen ) ;
msg = tal_arr ( ctx , u8 , msglen ) ;
msg = tal_arr ( ctx , u8 , msglen ) ;
if ( ! read_all ( gs - > read_fd , msg , msglen ) )
if ( ! pread ( gs - > fd , msg , msglen , gs - > read_pos + 2 ) ) {
status_failed (
status_trace ( " Short read from gossip-store, expected lenght %d " ,
STATUS_FAIL_INTERNAL_ERROR ,
msglen ) ;
" Short read from gossip-store, expected lenght %d " , msglen ) ;
/* Reset write_pos to truncate this message and disable future
* reads */
gs - > write_pos = gs - > read_pos ;
gs - > read_pos = - 1 ;
ftruncate ( gs - > fd , gs - > write_pos ) ;
} else
gs - > read_pos + = 2 + msglen ;
return msg ;
return msg ;
}
}