|
|
@ -21,12 +21,12 @@ |
|
|
|
struct psock |
|
|
|
{ |
|
|
|
uint32_t lasttime,lastping,errors; |
|
|
|
int32_t publicsock,sendsock,ispaired; |
|
|
|
int32_t publicsock,sendsock,ispaired,cmdchannel; |
|
|
|
uint16_t publicport,sendport; |
|
|
|
char sendaddr[128],publicaddr[128]; |
|
|
|
} *PSOCKS; |
|
|
|
|
|
|
|
uint16_t Numpsocks,Psockport = MIN_PSOCK_PORT; |
|
|
|
uint16_t Numpsocks,Psockport = MIN_PSOCK_PORT,Pcmdport = MAX_PSOCK_PORT; |
|
|
|
|
|
|
|
#ifdef FROM_JS |
|
|
|
|
|
|
@ -299,16 +299,18 @@ void LP_queuesend(uint32_t crc32,int32_t pubsock,char *base,char *rel,uint8_t *m |
|
|
|
|
|
|
|
void LP_broadcast_finish(int32_t pubsock,char *base,char *rel,uint8_t *msg,cJSON *argjson,uint32_t crc32) |
|
|
|
{ |
|
|
|
int32_t msglen; |
|
|
|
int32_t msglen; char *method; |
|
|
|
if ( (method= jstr(argjson,"method")) == 0 ) |
|
|
|
return; |
|
|
|
msg = (void *)jprint(argjson,0); |
|
|
|
msglen = (int32_t)strlen((char *)msg) + 1; |
|
|
|
if ( crc32 == 0 ) |
|
|
|
crc32 = calc_crc32(0,&msg[2],msglen - 2); |
|
|
|
//printf("crc32.%x IAMLP.%d pubsock.%d\n",crc32,G.LP_IAMLP,pubsock);
|
|
|
|
#ifdef FROM_MARKETMAKER |
|
|
|
if ( G.LP_IAMLP == 0 || pubsock < 0 ) |
|
|
|
if ( (G.LP_IAMLP == 0 || pubsock < 0) && strcmp(method,"psock") != 0 ) |
|
|
|
#else |
|
|
|
if ( IAMLP == 0 || pubsock < 0 ) |
|
|
|
if ( (IAMLP == 0 || pubsock < 0 && strcmp(method,"psock") != 0 ) |
|
|
|
#endif |
|
|
|
{ |
|
|
|
free(msg); |
|
|
@ -412,10 +414,10 @@ uint32_t LP_swapsend(int32_t pairsock,struct basilisk_swap *swap,uint32_t msgbit |
|
|
|
return(nextbits); |
|
|
|
} |
|
|
|
|
|
|
|
void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to work
|
|
|
|
void LP_psockloop(void *_ptr) |
|
|
|
{ |
|
|
|
static struct nn_pollfd *pfds; |
|
|
|
int32_t i,n,nonz,iter,retval,sentbytes,size=0,sendsock = -1; uint32_t now; struct psock *ptr=0; void *buf=0; char keepalive[512]; |
|
|
|
int32_t nexti=0,j,i,n,nonz,iter,retval,sentbytes,size=0,sendsock = -1; uint32_t now; struct psock *ptr=0; void *buf=0; char keepalive[512]; |
|
|
|
strcpy(LP_psockloop_stats.name,"LP_psockloop"); |
|
|
|
LP_psockloop_stats.threshold = 1000.; |
|
|
|
while ( LP_STOP_RECEIVED == 0 ) |
|
|
@ -426,7 +428,11 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w |
|
|
|
{ |
|
|
|
if ( size > 0 ) |
|
|
|
{ |
|
|
|
if ( (sentbytes= nn_send(sendsock,buf,size,0)) != size ) // need tight loop
|
|
|
|
if ( ptr->cmdchannel != 0 ) |
|
|
|
{ |
|
|
|
printf("got cmdchannel.(%s)\n",buf); |
|
|
|
} |
|
|
|
else if ( (sentbytes= nn_send(sendsock,buf,size,0)) != size ) // need tight loop
|
|
|
|
printf("LP_psockloop sent %d instead of %d\n",sentbytes,size); |
|
|
|
if ( buf != 0 ) |
|
|
|
{ |
|
|
@ -447,8 +453,9 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w |
|
|
|
memset(pfds,0,sizeof(*pfds) * ((Numpsocks*2 <= MAX_PSOCK_PORT) ? Numpsocks*2 : MAX_PSOCK_PORT)); |
|
|
|
for (iter=0; iter<2; iter++) |
|
|
|
{ |
|
|
|
for (i=n=0; i<Numpsocks; i++) |
|
|
|
for (j=n=0; j<Numpsocks; j++) |
|
|
|
{ |
|
|
|
i = (j + nexti) % Numpsocks; |
|
|
|
ptr = &PSOCKS[i]; |
|
|
|
if ( iter == 0 ) |
|
|
|
{ |
|
|
@ -460,6 +467,7 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w |
|
|
|
if ( pfds[n].fd != ptr->publicsock ) |
|
|
|
{ |
|
|
|
printf("unexpected fd.%d mismatched publicsock.%d\n",pfds[n].fd,ptr->publicsock); |
|
|
|
nexti = i+1; |
|
|
|
break; |
|
|
|
} |
|
|
|
else if ( (pfds[n].revents & POLLIN) != 0 ) |
|
|
@ -470,6 +478,7 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w |
|
|
|
{ |
|
|
|
ptr->lasttime = now; |
|
|
|
sendsock = ptr->sendsock; |
|
|
|
nexti = i+1; |
|
|
|
break; |
|
|
|
} |
|
|
|
else if ( buf != 0 ) |
|
|
@ -491,6 +500,7 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w |
|
|
|
if ( pfds[n].fd != ptr->sendsock ) |
|
|
|
{ |
|
|
|
printf("unexpected fd.%d mismatched sendsock.%d\n",pfds[n].fd,ptr->sendsock); |
|
|
|
nexti = i+1; |
|
|
|
break; |
|
|
|
} |
|
|
|
else if ( (pfds[n].revents & POLLIN) != 0 ) |
|
|
@ -502,6 +512,7 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w |
|
|
|
if ( ptr->ispaired != 0 ) |
|
|
|
{ |
|
|
|
sendsock = ptr->publicsock; |
|
|
|
nexti = i+1; |
|
|
|
break; |
|
|
|
} |
|
|
|
} |
|
|
@ -537,10 +548,10 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w |
|
|
|
if ( now > ptr->lasttime+PSOCK_KEEPALIVE ) |
|
|
|
{ |
|
|
|
printf("PSOCKS[%d] of %d (%u %u) lag.%d IDLETIMEOUT\n",i,Numpsocks,ptr->publicport,ptr->sendport,now - ptr->lasttime); |
|
|
|
if ( ptr->sendsock != ptr->publicsock && ptr->sendsock >= 0 ) |
|
|
|
nn_close(ptr->sendsock); |
|
|
|
if ( ptr->publicsock >= 0 ) |
|
|
|
nn_close(ptr->publicsock); |
|
|
|
if ( ptr->sendsock >= 0 ) |
|
|
|
nn_close(ptr->sendsock); |
|
|
|
//portable_mutex_lock(&LP_psockmutex);
|
|
|
|
if ( Numpsocks > 1 ) |
|
|
|
{ |
|
|
@ -573,13 +584,14 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
void LP_psockadd(int32_t ispaired,int32_t publicsock,uint16_t recvport,int32_t sendsock,uint16_t sendport,char *subaddr,char *publicaddr) |
|
|
|
void LP_psockadd(int32_t ispaired,int32_t publicsock,uint16_t recvport,int32_t sendsock,uint16_t sendport,char *subaddr,char *publicaddr,int32_t cmdchannel) |
|
|
|
{ |
|
|
|
struct psock *ptr; |
|
|
|
portable_mutex_lock(&LP_psockmutex); |
|
|
|
PSOCKS = realloc(PSOCKS,sizeof(*PSOCKS) * (Numpsocks + 1)); |
|
|
|
ptr = &PSOCKS[Numpsocks++]; |
|
|
|
ptr->ispaired = ispaired; |
|
|
|
ptr->cmdchannel = cmdchannel; |
|
|
|
ptr->publicsock = publicsock; |
|
|
|
ptr->publicport = recvport; |
|
|
|
ptr->sendsock = sendsock; |
|
|
@ -609,65 +621,87 @@ int32_t LP_psockmark(char *publicaddr) |
|
|
|
return(retval); |
|
|
|
} |
|
|
|
|
|
|
|
char *LP_psock(char *myipaddr,int32_t ispaired) |
|
|
|
char *_LP_psock_create(int32_t *pullsockp,int32_t *pubsockp,char *ipaddr,uint16_t publicport,uint16_t subport,int32_t ispaired,int32_t cmdchannel) |
|
|
|
{ |
|
|
|
int32_t pullsock,pubsock,arg; char pushaddr[64],subaddr[64]; cJSON *retjson = 0; |
|
|
|
pullsock = pubsock = -1; |
|
|
|
*pullsockp = *pubsockp = -1; |
|
|
|
nanomsg_transportname(1,pushaddr,ipaddr,publicport); |
|
|
|
nanomsg_transportname(1,subaddr,ipaddr,subport); |
|
|
|
if ( (pullsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PULL)) >= 0 && (cmdchannel != 0 ||(pubsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PAIR)) >= 0) ) |
|
|
|
{ |
|
|
|
if ( nn_bind(pullsock,pushaddr) >= 0 && (cmdchannel != 0 || nn_bind(pubsock,subaddr) >= 0) ) |
|
|
|
{ |
|
|
|
arg = 100; |
|
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&arg,sizeof(arg)); |
|
|
|
if ( pubsock >= 0 ) |
|
|
|
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_SNDTIMEO,&arg,sizeof(arg)); |
|
|
|
arg = 1; |
|
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&arg,sizeof(arg)); |
|
|
|
if ( pubsock >= 0 ) |
|
|
|
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVTIMEO,&arg,sizeof(arg)); |
|
|
|
arg = 2; |
|
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_MAXTTL,&arg,sizeof(arg)); |
|
|
|
if ( pubsock >= 0 ) |
|
|
|
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_MAXTTL,&arg,sizeof(arg)); |
|
|
|
nanomsg_transportname(0,pushaddr,ipaddr,publicport); |
|
|
|
nanomsg_transportname(0,subaddr,ipaddr,subport); |
|
|
|
LP_psockadd(ispaired,pullsock,publicport,pubsock,subport,subaddr,pushaddr,cmdchannel); |
|
|
|
jaddstr(retjson,"result","success"); |
|
|
|
jaddstr(retjson,"LPipaddr",ipaddr); |
|
|
|
jaddstr(retjson,"connectaddr",subaddr); |
|
|
|
jaddnum(retjson,"connectport",subport); |
|
|
|
jaddnum(retjson,"ispaired",ispaired); |
|
|
|
jaddnum(retjson,"cmdchannel",cmdchannel); |
|
|
|
jaddstr(retjson,"publicaddr",pushaddr); |
|
|
|
jaddnum(retjson,"publicport",publicport); |
|
|
|
printf("cmd.%d publicaddr.(%s) for subaddr.(%s), pullsock.%d pubsock.%d\n",cmdchannel,pushaddr,subaddr,pullsock,pubsock); |
|
|
|
*pullsockp = pullsock; |
|
|
|
if ( cmdchannel == 0 ) |
|
|
|
*pubsockp = pubsock; |
|
|
|
else *pubsockp = pullsock; |
|
|
|
return(jprint(retjson,1)); |
|
|
|
} else printf("bind error on %s or %s\n",pushaddr,subaddr); |
|
|
|
if ( pullsock >= 0 ) |
|
|
|
nn_close(pullsock); |
|
|
|
if ( pubsock >= 0 ) |
|
|
|
nn_close(pubsock); |
|
|
|
} |
|
|
|
return(0); |
|
|
|
} |
|
|
|
|
|
|
|
char *LP_psock(int32_t *pubsockp,char *ipaddr,int32_t ispaired,int32_t cmdchannel) |
|
|
|
{ |
|
|
|
char pushaddr[128],subaddr[128]; uint16_t i,publicport,subport,maxiters=100; int32_t timeout,pullsock=-1,pubsock=-1; cJSON *retjson=0; |
|
|
|
retjson = cJSON_CreateObject(); |
|
|
|
publicport = Psockport++; |
|
|
|
subport = Psockport++; |
|
|
|
for (i=0; i<maxiters; i++,publicport+=2,subport+=2) |
|
|
|
char *retstr=0; uint16_t i,publicport,subport,maxport; int32_t pullsock=-1; |
|
|
|
*pubsockp = -1; |
|
|
|
if ( cmdchannel == 0 ) |
|
|
|
{ |
|
|
|
maxport = MAX_PSOCK_PORT; |
|
|
|
publicport = Psockport++; |
|
|
|
subport = Psockport++; |
|
|
|
} |
|
|
|
else |
|
|
|
{ |
|
|
|
maxport = 65534; |
|
|
|
publicport = subport = Pcmdport++; |
|
|
|
} |
|
|
|
for (i=0; i<maxport; i++) |
|
|
|
{ |
|
|
|
if ( publicport < MIN_PSOCK_PORT ) |
|
|
|
publicport = MIN_PSOCK_PORT+1; |
|
|
|
if ( subport <= publicport ) |
|
|
|
subport = publicport + 1; |
|
|
|
pullsock = pubsock = -1; |
|
|
|
nanomsg_transportname(1,pushaddr,myipaddr,publicport); |
|
|
|
nanomsg_transportname(1,subaddr,myipaddr,subport); |
|
|
|
if ( (pullsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PULL)) >= 0 && (pubsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PAIR)) >= 0 ) |
|
|
|
{ |
|
|
|
if ( nn_bind(pullsock,pushaddr) >= 0 && nn_bind(pubsock,subaddr) >= 0 ) |
|
|
|
{ |
|
|
|
timeout = 100; |
|
|
|
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
|
if ( ispaired != 0 ) |
|
|
|
{ |
|
|
|
//maxsize = 1024 * 1024;
|
|
|
|
//nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize));
|
|
|
|
} |
|
|
|
//if ( ispaired != 0 )
|
|
|
|
{ |
|
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
|
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
|
} |
|
|
|
timeout = 10; |
|
|
|
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_MAXTTL,&timeout,sizeof(timeout)); |
|
|
|
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_MAXTTL,&timeout,sizeof(timeout)); |
|
|
|
nanomsg_transportname(0,pushaddr,myipaddr,publicport); |
|
|
|
nanomsg_transportname(0,subaddr,myipaddr,subport); |
|
|
|
LP_psockadd(ispaired,pullsock,publicport,pubsock,subport,subaddr,pushaddr); |
|
|
|
jaddstr(retjson,"result","success"); |
|
|
|
jaddstr(retjson,"LPipaddr",myipaddr); |
|
|
|
jaddstr(retjson,"connectaddr",subaddr); |
|
|
|
jaddnum(retjson,"connectport",subport); |
|
|
|
jaddnum(retjson,"ispaired",ispaired); |
|
|
|
jaddstr(retjson,"publicaddr",pushaddr); |
|
|
|
jaddnum(retjson,"publicport",publicport); |
|
|
|
printf("i.%d publicaddr.(%s) for subaddr.(%s), pullsock.%d pubsock.%d\n",i,pushaddr,subaddr,pullsock,pubsock); |
|
|
|
break; |
|
|
|
} else printf("bind error on %s or %s\n",pushaddr,subaddr); |
|
|
|
if ( pullsock >= 0 ) |
|
|
|
nn_close(pullsock); |
|
|
|
if ( pubsock >= 0 ) |
|
|
|
nn_close(pubsock); |
|
|
|
} |
|
|
|
if ( (retstr= _LP_psock_create(&pullsock,pubsockp,ipaddr,publicport,subport,ispaired,cmdchannel)) != 0 ) |
|
|
|
return(retstr); |
|
|
|
if ( cmdchannel == 0 ) |
|
|
|
publicport+=2, subport+=2; |
|
|
|
else publicport++, subport++; |
|
|
|
} |
|
|
|
if ( Psockport > MAX_PSOCK_PORT ) |
|
|
|
if ( Psockport >= MAX_PSOCK_PORT ) |
|
|
|
Psockport = MIN_PSOCK_PORT; |
|
|
|
if ( i == maxiters ) |
|
|
|
jaddstr(retjson,"error","cant find psock ports"); |
|
|
|
return(jprint(retjson,1)); |
|
|
|
if ( Pcmdport >= 65534 ) |
|
|
|
Pcmdport = MAX_PSOCK_PORT; |
|
|
|
return(clonestr("{\"error\",\"cant find psock ports\"}")); |
|
|
|
} |
|
|
|
|
|
|
|
/*
|
|
|
@ -681,23 +715,26 @@ char *LP_psock(char *myipaddr,int32_t ispaired) |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
char *issue_LP_psock(char *destip,uint16_t destport,int32_t ispaired) |
|
|
|
char *issue_LP_psock(char *destip,uint16_t destport,int32_t ispaired,int32_t cmdchannel) |
|
|
|
{ |
|
|
|
char url[512],*retstr; |
|
|
|
sprintf(url,"http://%s:%u/api/stats/psock?ispaired=%d",destip,destport-1,ispaired); |
|
|
|
sprintf(url,"http://%s:%u/api/stats/psock?ispaired=%d&cmdchannel=%d",destip,destport-1,ispaired,cmdchannel); |
|
|
|
//return(LP_issue_curl("psock",destip,destport,url));
|
|
|
|
retstr = issue_curlt(url,LP_HTTP_TIMEOUT*3); |
|
|
|
printf("issue_LP_psock got (%s) from %s\n",retstr,destip); |
|
|
|
return(retstr); |
|
|
|
} |
|
|
|
|
|
|
|
uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired) |
|
|
|
uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired,int32_t cmdchannel,char *ipaddr) |
|
|
|
{ |
|
|
|
uint16_t publicport = 0; char *retstr,*addr; cJSON *retjson; struct LP_peerinfo *peer,*tmp; |
|
|
|
connectaddr[0] = publicaddr[0] = 0; |
|
|
|
HASH_ITER(hh,LP_peerinfos,peer,tmp) |
|
|
|
{ |
|
|
|
if ( ipaddr != 0 && strcmp(ipaddr,peer->ipaddr) != 0 ) |
|
|
|
continue; |
|
|
|
connectaddr[0] = publicaddr[0] = 0; |
|
|
|
if ( peer->errors < LP_MAXPEER_ERRORS && (retstr= issue_LP_psock(peer->ipaddr,peer->port,ispaired)) != 0 ) |
|
|
|
if ( peer->errors < LP_MAXPEER_ERRORS && (retstr= issue_LP_psock(peer->ipaddr,peer->port,ispaired,cmdchannel)) != 0 ) |
|
|
|
{ |
|
|
|
if ( (retjson= cJSON_Parse(retstr)) != 0 ) |
|
|
|
{ |
|
|
@ -706,8 +743,6 @@ uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired) |
|
|
|
safecopy(publicaddr,addr,128); |
|
|
|
if ( (addr= jstr(retjson,"connectaddr")) != 0 ) |
|
|
|
safecopy(connectaddr,addr,128); |
|
|
|
//if ( (addr= jstr(retjson,"connectaddr2")) != 0 )
|
|
|
|
// safecopy(connectaddr2,addr,128);
|
|
|
|
if ( publicaddr[0] != 0 && connectaddr[0] != 0 ) |
|
|
|
publicport = juint(retjson,"publicport"); |
|
|
|
free_json(retjson); |
|
|
@ -748,7 +783,7 @@ int32_t LP_initpublicaddr(void *ctx,uint16_t *mypullportp,char *publicaddr,char |
|
|
|
} |
|
|
|
while ( *mypullportp == 0 ) |
|
|
|
{ |
|
|
|
if ( (*mypullportp= LP_psock_get(connectaddr,publicaddr,ispaired)) != 0 ) |
|
|
|
if ( (*mypullportp= LP_psock_get(connectaddr,publicaddr,ispaired,0,0)) != 0 ) |
|
|
|
break; |
|
|
|
sleep(10); |
|
|
|
printf("try to get publicaddr again\n"); |
|
|
|