@ -19,17 +19,17 @@
# define GOSSIP_STORE_TEMP_FILENAME "gossip_store.tmp"
struct gossip_store {
/* This is -1 when we're loading */
int fd ;
u8 version ;
/* Offset of current EOF */
u64 len ;
/* Counters for entries in the gossip_store entries. This is used to
* decide whether we should rewrite the on - disk store or not */
size_t count ;
/* The broadcast struct we source messages from when rewriting the
* gossip_store */
struct broadcast_state * broadcast ;
/* Handle to the routing_state to retrieve additional information,
* should it be needed */
struct routing_state * rstate ;
@ -44,15 +44,14 @@ static void gossip_store_destroy(struct gossip_store *gs)
close ( gs - > fd ) ;
}
struct gossip_store * gossip_store_new ( struct routing_state * rstate ,
struct broadcast_state * bstate )
struct gossip_store * gossip_store_new ( struct routing_state * rstate )
{
struct gossip_store * gs = tal ( rstate , struct gossip_store ) ;
gs - > count = 0 ;
gs - > fd = open ( GOSSIP_STORE_FILENAME , O_RDWR | O_APPEND | O_CREAT , 0600 ) ;
gs - > broadcast = bstate ;
gs - > rstate = rstate ;
gs - > disable_compaction = false ;
gs - > len = sizeof ( gs - > version ) ;
tal_add_destructor ( gs , gossip_store_destroy ) ;
@ -114,10 +113,15 @@ static u8 *gossip_store_wrap_channel_announcement(const tal_t *ctx,
* Wrap the raw gossip message and write it to fd
*
* @ param fd File descriptor to write the wrapped message into
* @ param rstate Routing state if we need to look up channel capacity
* @ param gossip_msg The message to write
* @ param len The length to increase by amount written .
* @ return true if the message was wrapped and written
*/
static bool gossip_store_append ( int fd , struct routing_state * rstate , const u8 * gossip_msg )
static bool gossip_store_append ( int fd ,
struct routing_state * rstate ,
const u8 * gossip_msg ,
u64 * len )
{
int t = fromwire_peektype ( gossip_msg ) ;
u32 msglen ;
@ -144,6 +148,8 @@ static bool gossip_store_append(int fd, struct routing_state *rstate, const u8 *
belen = cpu_to_be32 ( msglen ) ;
checksum = cpu_to_be32 ( crc32c ( 0 , msg , msglen ) ) ;
* len + = sizeof ( belen ) + sizeof ( checksum ) + msglen ;
return ( write ( fd , & belen , sizeof ( belen ) ) = = sizeof ( belen ) & &
write ( fd , & checksum , sizeof ( checksum ) ) = = sizeof ( checksum ) & &
write ( fd , msg , msglen ) = = msglen ) ;
@ -157,7 +163,8 @@ static bool gossip_store_append(int fd, struct routing_state *rstate, const u8 *
*/
static bool add_local_unnannounced ( int fd ,
struct routing_state * rstate ,
struct node * self )
struct node * self ,
u64 * len )
{
struct chan_map_iter i ;
struct chan * c ;
@ -172,15 +179,18 @@ static bool add_local_unnannounced(int fd,
msg = towire_gossipd_local_add_channel ( tmpctx , & c - > scid ,
& peer - > id , c - > sat ) ;
if ( ! gossip_store_append ( fd , rstate , msg ) )
if ( ! gossip_store_append ( fd , rstate , msg , len ) )
return false ;
for ( size_t i = 0 ; i < 2 ; i + + ) {
u32 idx ;
msg = c - > half [ i ] . channel_update ;
if ( ! msg )
continue ;
if ( ! gossip_store_append ( fd , rstate , msg ) )
idx = * len ;
if ( ! gossip_store_append ( fd , rstate , msg , len ) )
return false ;
c - > half [ i ] . bcast . index = idx ;
}
}
@ -193,19 +203,27 @@ static bool add_local_unnannounced(int fd,
* Creates a new file , writes all the updates from the ` broadcast_state ` , and
* then atomically swaps the files .
*/
bool gossip_store_compact ( struct gossip_store * gs )
bool gossip_store_compact ( struct gossip_store * gs ,
struct broadcast_state * * bs )
{
size_t count = 0 ;
u32 index = 0 ;
int fd ;
const u8 * msg ;
struct node * self ;
u64 len = sizeof ( gs - > version ) ;
struct broadcastable * bcast ;
struct broadcast_state * oldb = * bs ;
struct broadcast_state * newb ;
if ( gs - > disable_compaction )
return false ;
assert ( gs - > broadcast ) ;
assert ( oldb ) ;
status_trace (
" Compacting gossip_store with %zu entries, %zu of which are stale " ,
gs - > count , gs - > count - gs - > broadcast - > count ) ;
gs - > count , gs - > count - oldb - > count ) ;
newb = new_broadcast_state ( gs - > rstate ) ;
fd = open ( GOSSIP_STORE_TEMP_FILENAME , O_RDWR | O_APPEND | O_CREAT , 0600 ) ;
if ( fd < 0 ) {
@ -220,8 +238,10 @@ bool gossip_store_compact(struct gossip_store *gs)
goto unlink_disable ;
}
while ( ( msg = next_broadcast ( gs - > broadcast , 0 , UINT32_MAX , & index ) ) ! = NULL ) {
if ( ! gossip_store_append ( fd , gs - > rstate , msg ) ) {
while ( ( msg = pop_first_broadcast ( oldb , & bcast ) ) ! = NULL ) {
bcast - > index = len ;
insert_broadcast_nostore ( newb , msg , bcast ) ;
if ( ! gossip_store_append ( fd , gs - > rstate , msg , & len ) ) {
status_broken ( " Failed writing to gossip store: %s " ,
strerror ( errno ) ) ;
goto unlink_disable ;
@ -231,7 +251,8 @@ bool gossip_store_compact(struct gossip_store *gs)
/* Local unannounced channels are not in the store! */
self = get_node ( gs - > rstate , & gs - > rstate - > local_id ) ;
if ( self & & ! add_local_unnannounced ( fd , gs - > rstate , self ) ) {
if ( self & & ! add_local_unnannounced ( fd , gs - > rstate , self ,
& len ) ) {
status_broken ( " Failed writing unannounced to gossip store: %s " ,
strerror ( errno ) ) ;
goto unlink_disable ;
@ -248,8 +269,12 @@ bool gossip_store_compact(struct gossip_store *gs)
" Compaction completed: dropped %zu messages, new count %zu " ,
gs - > count - count , count ) ;
gs - > count = count ;
gs - > len = len ;
close ( gs - > fd ) ;
gs - > fd = fd ;
tal_free ( oldb ) ;
* bs = newb ;
return true ;
unlink_disable :
@ -258,32 +283,49 @@ disable:
status_trace ( " Encountered an error while compacting, disabling "
" future compactions. " ) ;
gs - > disable_compaction = true ;
tal_free ( newb ) ;
return false ;
}
void gossip_store_add ( struct gossip_store * gs , const u8 * gossip_msg )
void gossip_store_maybe_compact ( struct gossip_store * gs ,
struct broadcast_state * * bs )
{
/* Only give error message once. */
/* Don't compact while loading! */
if ( gs - > fd = = - 1 )
return ;
if ( gs - > count < 1000 )
return ;
if ( gs - > count < ( * bs ) - > count * 1.25 )
return ;
if ( ! gossip_store_append ( gs - > fd , gs - > rstate , gossip_msg ) ) {
gossip_store_compact ( gs , bs ) ;
}
u64 gossip_store_add ( struct gossip_store * gs , const u8 * gossip_msg )
{
u64 off = gs - > len ;
/* Should never get here during loading! */
assert ( gs - > fd ! = - 1 ) ;
if ( ! gossip_store_append ( gs - > fd , gs - > rstate , gossip_msg , & gs - > len ) ) {
status_broken ( " Failed writing to gossip store: %s " ,
strerror ( errno ) ) ;
gs - > fd = - 1 ;
return 0 ;
}
gs - > count + + ;
if ( gs - > count > = 1000 & & gs - > count > gs - > broadcast - > count * 1.25 & &
! gs - > disable_compaction )
gossip_store_compact ( gs ) ;
return off ;
}
void gossip_store_add_channel_delete ( struct gossip_store * gs ,
const struct short_channel_id * scid )
{
u8 * msg = towire_gossip_store_channel_delete ( NULL , scid ) ;
gossip_store_append ( gs - > fd , gs - > rstate , msg ) ;
if ( ! gossip_store_append ( gs - > fd , gs - > rstate , msg , & gs - > len ) )
status_failed ( STATUS_FAIL_INTERNAL_ERROR ,
" Failed writing channel_delete to gossip store: %s " ,
strerror ( errno ) ) ;
tal_free ( msg ) ;
}
@ -294,15 +336,13 @@ void gossip_store_load(struct routing_state *rstate, struct gossip_store *gs)
u8 * msg , * gossip_msg ;
struct amount_sat satoshis ;
struct short_channel_id scid ;
/* We set/check version byte on creation */
off_t known_good = 1 ;
const char * bad ;
size_t stats [ ] = { 0 , 0 , 0 , 0 } ;
int fd = gs - > fd ;
gs - > fd = - 1 ;
struct timeabs start = time_now ( ) ;
if ( lseek ( fd , known_good , SEEK_SET ) < 0 ) {
if ( lseek ( fd , gs - > len , SEEK_SET ) < 0 ) {
status_unusual ( " gossip_store: lseek failure " ) ;
goto truncate_nomsg ;
}
@ -327,7 +367,8 @@ void gossip_store_load(struct routing_state *rstate, struct gossip_store *gs)
& satoshis ) ) {
if ( ! routing_add_channel_announcement ( rstate ,
take ( gossip_msg ) ,
satoshis ) ) {
satoshis ,
gs - > len ) ) {
bad = " Bad channel_announcement " ;
goto truncate ;
}
@ -335,7 +376,8 @@ void gossip_store_load(struct routing_state *rstate, struct gossip_store *gs)
} else if ( fromwire_gossip_store_channel_update ( msg , msg ,
& gossip_msg ) ) {
if ( ! routing_add_channel_update ( rstate ,
take ( gossip_msg ) ) ) {
take ( gossip_msg ) ,
gs - > len ) ) {
bad = " Bad channel_update " ;
goto truncate ;
}
@ -343,7 +385,8 @@ void gossip_store_load(struct routing_state *rstate, struct gossip_store *gs)
} else if ( fromwire_gossip_store_node_announcement ( msg , msg ,
& gossip_msg ) ) {
if ( ! routing_add_node_announcement ( rstate ,
take ( gossip_msg ) ) ) {
take ( gossip_msg ) ,
gs - > len ) ) {
bad = " Bad node_announcement " ;
goto truncate ;
}
@ -363,7 +406,7 @@ void gossip_store_load(struct routing_state *rstate, struct gossip_store *gs)
bad = " Unknown message " ;
goto truncate ;
}
known_good + = sizeof ( belen ) + sizeof ( becsum ) + msglen ;
gs - > len + = sizeof ( belen ) + sizeof ( becsum ) + msglen ;
gs - > count + + ;
tal_free ( msg ) ;
}
@ -385,13 +428,13 @@ out:
status_info ( " total store load time: % " PRIu64 " msec (%zu entries, %zu bytes) " ,
time_to_msec ( time_between ( time_now ( ) , start ) ) ,
stats [ 0 ] + stats [ 1 ] + stats [ 2 ] + stats [ 3 ] ,
( size_t ) known_good ) ;
( size_t ) gs - > len ) ;
# else
status_trace ( " total store load time: % " PRIu64 " msec " ,
time_to_msec ( time_between ( time_now ( ) , start ) ) ) ;
# endif
status_trace ( " gossip_store: Read %zu/%zu/%zu/%zu cannounce/cupdate/nannounce/cdelete from store in % " PRIu64 " bytes " ,
stats [ 0 ] , stats [ 1 ] , stats [ 2 ] , stats [ 3 ] ,
( u64 ) known_good ) ;
gs - > len ) ;
gs - > fd = fd ;
}