|
|
@ -472,6 +472,25 @@ void LP_queuecommand(char **retstrp,char *buf,int32_t responsesock,int32_t stats |
|
|
|
portable_mutex_unlock(&LP_commandQmutex); |
|
|
|
} |
|
|
|
|
|
|
|
void mynn_close(int32_t sock) |
|
|
|
{ |
|
|
|
struct nn_pollfd pfd; int32_t n; void *buf; |
|
|
|
if ( sock >= 0 ) |
|
|
|
{ |
|
|
|
while ( (n= nn_recv(sock,&buf,NN_MSG,0)) > 0 ) |
|
|
|
printf("got n.%d bytes from nn_close(%d)\n",n,sock); |
|
|
|
pfd.fd = sock; |
|
|
|
pfd.events = POLLOUT; |
|
|
|
while ( nn_poll(&pfd,1,100) > 0 ) |
|
|
|
{ |
|
|
|
printf("cant send to nn_close(%d)\n",sock); |
|
|
|
sleep(1); |
|
|
|
} |
|
|
|
if ( IAMLP != 0 ) |
|
|
|
nn_close(sock); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
void LP_psockloop(void *_ptr) |
|
|
|
{ |
|
|
|
static struct nn_pollfd *pfds; |
|
|
@ -505,7 +524,7 @@ void LP_psockloop(void *_ptr) |
|
|
|
pfds = calloc(MAX_PSOCK_PORT,sizeof(*pfds)); |
|
|
|
portable_mutex_lock(&LP_psockmutex); |
|
|
|
memset(pfds,0,sizeof(*pfds) * ((Numpsocks*2 <= MAX_PSOCK_PORT) ? Numpsocks*2 : MAX_PSOCK_PORT)); |
|
|
|
for (iter=0; iter<2; iter++) |
|
|
|
for (iter=j=0; iter<2; iter++) |
|
|
|
{ |
|
|
|
for (j=n=0; j<Numpsocks; j++) |
|
|
|
{ |
|
|
@ -596,48 +615,41 @@ void LP_psockloop(void *_ptr) |
|
|
|
} // else printf("num pfds.%d retval.%d\n",n,retval);
|
|
|
|
} |
|
|
|
} |
|
|
|
if ( 0 && sendsock >= 0 ) |
|
|
|
printf("sendsock.%d Numpsocks.%d\n",sendsock,Numpsocks); |
|
|
|
if ( sendsock < 0 ) |
|
|
|
{ |
|
|
|
usleep(30000); |
|
|
|
for (i=nonz=0; i<Numpsocks; i++) |
|
|
|
{ |
|
|
|
if ( i < Numpsocks ) |
|
|
|
{ |
|
|
|
ptr = &PSOCKS[i]; |
|
|
|
if ( now > ptr->lasttime+PSOCK_KEEPALIVE ) |
|
|
|
if ( ptr->cmdchannel == 0 && now > ptr->lasttime+PSOCK_KEEPALIVE ) |
|
|
|
{ |
|
|
|
//printf("PSOCKS[%d] of %d (%u %u) lag.%d IDLETIMEOUT\n",i,Numpsocks,ptr->publicport,ptr->sendport,now - ptr->lasttime);
|
|
|
|
printf("PSOCKS[%d] of %d (%u %u) lag.%d IDLETIMEOUT\n",i,Numpsocks,ptr->publicport,ptr->sendport,now - ptr->lasttime); |
|
|
|
if ( ptr->sendsock != ptr->publicsock && ptr->sendsock >= 0 ) |
|
|
|
nn_close(ptr->sendsock); |
|
|
|
mynn_close(ptr->sendsock), ptr->sendsock = -1; |
|
|
|
if ( ptr->publicsock >= 0 ) |
|
|
|
nn_close(ptr->publicsock); |
|
|
|
//portable_mutex_lock(&LP_psockmutex);
|
|
|
|
if ( Numpsocks > 1 ) |
|
|
|
{ |
|
|
|
PSOCKS[i] = PSOCKS[--Numpsocks]; |
|
|
|
memset(&PSOCKS[Numpsocks],0,sizeof(*ptr)); |
|
|
|
} else Numpsocks = 0; |
|
|
|
//portable_mutex_unlock(&LP_psockmutex);
|
|
|
|
break; |
|
|
|
mynn_close(ptr->publicsock), ptr->publicsock = -1; |
|
|
|
nonz++; |
|
|
|
} |
|
|
|
else if ( now > ptr->lastping+PSOCK_KEEPALIVE/2 && ptr->errors < 3 ) |
|
|
|
} |
|
|
|
} |
|
|
|
if ( nonz > 0 ) |
|
|
|
{ |
|
|
|
n = Numpsocks; |
|
|
|
for (i=0; i<n; i++) |
|
|
|
{ |
|
|
|
ptr = &PSOCKS[i]; |
|
|
|
if ( ptr->sendsock < 0 && ptr->publicsock < 0 ) |
|
|
|
{ |
|
|
|
ptr->lastping = now; |
|
|
|
if ( 0 ) |
|
|
|
{ |
|
|
|
sendsock = ptr->sendsock; |
|
|
|
sprintf(keepalive,"{\"method\":\"keepalive\",\"endpoint\":\"%s\"}",ptr->sendaddr); |
|
|
|
size = (int32_t)strlen(keepalive) + 1; |
|
|
|
buf = keepalive; |
|
|
|
printf("send keepalive.(%s)\n",keepalive); |
|
|
|
} |
|
|
|
break; |
|
|
|
for (j=i; j<n-1; j++) |
|
|
|
PSOCKS[j] = PSOCKS[j+1]; |
|
|
|
n--; |
|
|
|
} |
|
|
|
} |
|
|
|
printf("PSOCKS purge nonz.%d n.%d vs Numpsocks.%d\n",nonz,n,Numpsocks); |
|
|
|
Numpsocks = n; |
|
|
|
} |
|
|
|
if ( nonz == 0 && i == Numpsocks ) |
|
|
|
usleep(100000); |
|
|
|
} |
|
|
|
portable_mutex_unlock(&LP_psockmutex); |
|
|
|
} else usleep(100000); |
|
|
@ -694,6 +706,7 @@ char *_LP_psock_create(int32_t *pullsockp,int32_t *pubsockp,char *ipaddr,uint16_ |
|
|
|
if ( pubp->pairsock >= 0 ) |
|
|
|
{ |
|
|
|
printf("%s already has pairsock.%d\n",bits256_str(str,pubkey),pubp->pairsock); |
|
|
|
portable_mutex_lock(&LP_psockmutex); |
|
|
|
for (i=0; i<Numpsocks; i++) |
|
|
|
if ( PSOCKS[i].publicsock == pubp->pairsock ) |
|
|
|
{ |
|
|
@ -710,9 +723,10 @@ char *_LP_psock_create(int32_t *pullsockp,int32_t *pubsockp,char *ipaddr,uint16_ |
|
|
|
//printf("cmd.%d publicaddr.(%s) for subaddr.(%s), pullsock.%d pubsock.%d\n",cmdchannel,pushaddr,subaddr,pullsock,pubsock);
|
|
|
|
*pullsockp = pullsock; |
|
|
|
*pubsockp = pubsock; |
|
|
|
portable_mutex_unlock(&LP_psockmutex); |
|
|
|
return(jprint(retjson,1)); |
|
|
|
} |
|
|
|
LP_psockadd(ispaired,pullsock,publicport,pubsock,subport,subaddr,pushaddr,cmdchannel); |
|
|
|
portable_mutex_unlock(&LP_psockmutex); |
|
|
|
} |
|
|
|
//printf("pairsock for %s <- %d\n",bits256_str(str,pubkey),pullsock);
|
|
|
|
//pubp->pairsock = pullsock;
|
|
|
|