|
@ -19,6 +19,8 @@ |
|
|
//
|
|
|
//
|
|
|
|
|
|
|
|
|
#define PSOCK_KEEPALIVE 600 |
|
|
#define PSOCK_KEEPALIVE 600 |
|
|
|
|
|
#define MAX_PSOCK_PORT 60000 |
|
|
|
|
|
#define MIN_PSOCK_PORT 10000 |
|
|
|
|
|
|
|
|
struct psock |
|
|
struct psock |
|
|
{ |
|
|
{ |
|
@ -28,7 +30,7 @@ struct psock |
|
|
char sendaddr[128],publicaddr[128]; |
|
|
char sendaddr[128],publicaddr[128]; |
|
|
} *PSOCKS; |
|
|
} *PSOCKS; |
|
|
|
|
|
|
|
|
uint16_t Numpsocks,Psockport = 10000; |
|
|
uint16_t Numpsocks,Psockport = MIN_PSOCK_PORT; |
|
|
|
|
|
|
|
|
char *nanomsg_transportname(int32_t bindflag,char *str,char *ipaddr,uint16_t port) |
|
|
char *nanomsg_transportname(int32_t bindflag,char *str,char *ipaddr,uint16_t port) |
|
|
{ |
|
|
{ |
|
@ -47,7 +49,7 @@ int32_t LP_send(int32_t sock,char *msg,int32_t freeflag) |
|
|
return(-1); |
|
|
return(-1); |
|
|
} |
|
|
} |
|
|
len = (int32_t)strlen(msg) + 1; |
|
|
len = (int32_t)strlen(msg) + 1; |
|
|
for (i=0; i<1000; i++) |
|
|
for (i=0; i<1000; i++) // 1000 * (1 ms + 1000 us) = 2 seconds
|
|
|
{ |
|
|
{ |
|
|
pfd.fd = sock; |
|
|
pfd.fd = sock; |
|
|
pfd.events = NN_POLLOUT; |
|
|
pfd.events = NN_POLLOUT; |
|
@ -66,8 +68,6 @@ int32_t LP_send(int32_t sock,char *msg,int32_t freeflag) |
|
|
usleep(1000); |
|
|
usleep(1000); |
|
|
} |
|
|
} |
|
|
printf("error LP_send sock.%d, i.%d timeout.(%s) %s\n",sock,i,msg,nn_strerror(nn_errno())); |
|
|
printf("error LP_send sock.%d, i.%d timeout.(%s) %s\n",sock,i,msg,nn_strerror(nn_errno())); |
|
|
//if ( (sentbytes= nn_send(sock,msg,len,0)) != len )
|
|
|
|
|
|
// printf("LP_send sent %d instead of %d\n",sentbytes,len);
|
|
|
|
|
|
if ( freeflag != 0 ) |
|
|
if ( freeflag != 0 ) |
|
|
free(msg); |
|
|
free(msg); |
|
|
return(-1); |
|
|
return(-1); |
|
@ -132,9 +132,9 @@ void LP_psockloop(void *_ptr) |
|
|
else if ( Numpsocks > 0 ) |
|
|
else if ( Numpsocks > 0 ) |
|
|
{ |
|
|
{ |
|
|
if ( pfds == 0 ) |
|
|
if ( pfds == 0 ) |
|
|
pfds = calloc(60000,sizeof(*pfds)); |
|
|
pfds = calloc(MAX_PSOCK_PORT,sizeof(*pfds)); |
|
|
portable_mutex_lock(&LP_psockmutex); |
|
|
portable_mutex_lock(&LP_psockmutex); |
|
|
memset(pfds,0,sizeof(*pfds) * ((Numpsocks < 30000) ? Numpsocks*2 : 60000)); |
|
|
memset(pfds,0,sizeof(*pfds) * ((Numpsocks*2 <= MAX_PSOCK_PORT) ? Numpsocks*2 : MAX_PSOCK_PORT)); |
|
|
for (iter=0; iter<2; iter++) |
|
|
for (iter=0; iter<2; iter++) |
|
|
{ |
|
|
{ |
|
|
for (i=n=0; i<Numpsocks; i++) |
|
|
for (i=n=0; i<Numpsocks; i++) |
|
@ -296,14 +296,14 @@ int32_t LP_psockmark(char *publicaddr) |
|
|
|
|
|
|
|
|
char *LP_psock(char *myipaddr,int32_t ispaired) |
|
|
char *LP_psock(char *myipaddr,int32_t ispaired) |
|
|
{ |
|
|
{ |
|
|
char pushaddr[128],subaddr[128]; uint16_t i,publicport,subport; int32_t timeout,maxsize,pullsock=-1,pubsock=-1; cJSON *retjson=0; |
|
|
char pushaddr[128],subaddr[128]; uint16_t i,publicport,subport,maxiters=100; int32_t timeout,maxsize,pullsock=-1,pubsock=-1; cJSON *retjson=0; |
|
|
retjson = cJSON_CreateObject(); |
|
|
retjson = cJSON_CreateObject(); |
|
|
publicport = Psockport++; |
|
|
publicport = Psockport++; |
|
|
subport = Psockport++; |
|
|
subport = Psockport++; |
|
|
for (i=0; i<100; i++,publicport+=2,subport+=2) |
|
|
for (i=0; i<maxiters; i++,publicport+=2,subport+=2) |
|
|
{ |
|
|
{ |
|
|
if ( publicport < 10000 ) |
|
|
if ( publicport < MIN_PSOCK_PORT ) |
|
|
publicport = 10001; |
|
|
publicport = MIN_PSOCK_PORT+1; |
|
|
if ( subport <= publicport ) |
|
|
if ( subport <= publicport ) |
|
|
subport = publicport + 1; |
|
|
subport = publicport + 1; |
|
|
pullsock = pubsock = -1; |
|
|
pullsock = pubsock = -1; |
|
@ -317,12 +317,14 @@ char *LP_psock(char *myipaddr,int32_t ispaired) |
|
|
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
timeout = 1; |
|
|
timeout = 1; |
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
|
|
|
if ( ispaired != 0 ) |
|
|
|
|
|
{ |
|
|
maxsize = 1024 * 1024; |
|
|
maxsize = 1024 * 1024; |
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize)); |
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize)); |
|
|
|
|
|
} |
|
|
//if ( ispaired != 0 )
|
|
|
//if ( ispaired != 0 )
|
|
|
{ |
|
|
{ |
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize)); |
|
|
|
|
|
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
} |
|
|
} |
|
|
nanomsg_transportname(0,pushaddr,myipaddr,publicport); |
|
|
nanomsg_transportname(0,pushaddr,myipaddr,publicport); |
|
@ -344,9 +346,9 @@ char *LP_psock(char *myipaddr,int32_t ispaired) |
|
|
nn_close(pubsock); |
|
|
nn_close(pubsock); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
if ( Psockport > 60000 ) |
|
|
if ( Psockport > MAX_PSOCK_PORT ) |
|
|
Psockport = 10000; |
|
|
Psockport = MIN_PSOCK_PORT; |
|
|
if ( i == 100 ) |
|
|
if ( i == maxiters ) |
|
|
jaddstr(retjson,"error","cant find psock ports"); |
|
|
jaddstr(retjson,"error","cant find psock ports"); |
|
|
return(jprint(retjson,1)); |
|
|
return(jprint(retjson,1)); |
|
|
} |
|
|
} |
|
@ -419,6 +421,8 @@ int32_t LP_initpublicaddr(void *ctx,uint16_t *mypullportp,char *publicaddr,char |
|
|
printf("try to get publicaddr again\n"); |
|
|
printf("try to get publicaddr again\n"); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
while ( 1 ) |
|
|
|
|
|
{ |
|
|
if ( (pullsock= nn_socket(AF_SP,nntype)) >= 0 ) |
|
|
if ( (pullsock= nn_socket(AF_SP,nntype)) >= 0 ) |
|
|
{ |
|
|
{ |
|
|
if ( LP_canbind == 0 ) |
|
|
if ( LP_canbind == 0 ) |
|
@ -441,16 +445,17 @@ int32_t LP_initpublicaddr(void *ctx,uint16_t *mypullportp,char *publicaddr,char |
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
timeout = 1; |
|
|
timeout = 1; |
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
maxsize = 2 * 1024 * 1024; |
|
|
//maxsize = 2 * 1024 * 1024;
|
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize)); |
|
|
//nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize));
|
|
|
if ( nntype == NN_SUB ) |
|
|
if ( nntype == NN_SUB ) |
|
|
nn_setsockopt(pullsock,NN_SUB,NN_SUB_SUBSCRIBE,"",0); |
|
|
nn_setsockopt(pullsock,NN_SUB,NN_SUB_SUBSCRIBE,"",0); |
|
|
//LP_send(pullsock,"hello init",0);
|
|
|
|
|
|
} |
|
|
} |
|
|
if ( LP_canbind == 0 && ispaired == 0 && nn_tests(ctx,pullsock,publicaddr,NN_PUSH) < 0 ) |
|
|
if ( LP_canbind != 0 || ispaired != 0 || nn_tests(ctx,pullsock,publicaddr,NN_PUSH) >= 0 ) |
|
|
{ |
|
|
break; |
|
|
printf("command socket didnt work\n"); |
|
|
printf("nn_tests failed, try again\n"); |
|
|
exit(-1); |
|
|
sleep(3); |
|
|
|
|
|
if ( pullsock >= 0 ) |
|
|
|
|
|
nn_close(pullsock); |
|
|
} |
|
|
} |
|
|
return(pullsock); |
|
|
return(pullsock); |
|
|
} |
|
|
} |
|
|