Browse Source

Test

etomic
jl777 8 years ago
parent
commit
a571558c2c
  1. 56
      iguana/exchanges/LP_network.c

56
iguana/exchanges/LP_network.c

@ -23,8 +23,8 @@
struct psock struct psock
{ {
uint32_t lasttime,lastping; uint32_t lasttime,lastping;
int32_t recvsock,sendsock,ispaired; int32_t publicsock,sendsock,ispaired;
uint16_t recvport,sendport; uint16_t publicport,sendport;
char sendaddr[128],publicaddr[128]; char sendaddr[128],publicaddr[128];
} *PSOCKS; } *PSOCKS;
@ -109,7 +109,7 @@ void LP_psockloop(void *_ptr)
if ( size > 0 ) if ( size > 0 )
{ {
if ( (sentbytes= LP_send(sendsock,buf,0)) > 0 ) if ( (sentbytes= LP_send(sendsock,buf,0)) > 0 )
printf("PSOCKS (%d %d %d) (%s) -> %d/%d bytes %s\n",ptr->recvsock,ptr->sendsock,sendsock,(char *)buf,size,sentbytes,ptr->sendaddr); printf("PSOCKS (%d %d %d) (%s) -> %d/%d bytes %s\n",ptr->publicsock,ptr->sendsock,sendsock,(char *)buf,size,sentbytes,ptr->sendaddr);
else printf("send error to %s\n",ptr->sendaddr); else printf("send error to %s\n",ptr->sendaddr);
if ( buf != 0 ) if ( buf != 0 )
{ {
@ -133,23 +133,23 @@ void LP_psockloop(void *_ptr)
ptr = &PSOCKS[i]; ptr = &PSOCKS[i];
if ( iter == 0 ) if ( iter == 0 )
{ {
pfds[n].fd = ptr->recvsock; pfds[n].fd = ptr->publicsock;
pfds[n].events = POLLIN; pfds[n].events = POLLIN;
} }
else else
{ {
if ( pfds[n].fd != ptr->recvsock ) if ( pfds[n].fd != ptr->publicsock )
{ {
printf("unexpected fd.%d mismatched recvsock.%d\n",pfds[n].fd,ptr->recvsock); printf("unexpected fd.%d mismatched publicsock.%d\n",pfds[n].fd,ptr->publicsock);
break; break;
} }
else if ( (pfds[n].revents & POLLIN) != 0 ) else if ( (pfds[n].revents & POLLIN) != 0 )
{ {
printf("%s has pollin\n",ptr->sendaddr); printf("publicsock.%d %s has pollin\n",ptr->publicsock,ptr->publicaddr);
if ( (size= nn_recv(ptr->recvsock,&buf,NN_MSG,0)) > 0 ) if ( (size= nn_recv(ptr->publicsock,&buf,NN_MSG,0)) > 0 )
{ {
sendsock = ptr->sendsock; sendsock = ptr->sendsock;
//printf("[%s]\n",(char *)buf); printf("[%s] -> sendsock.%d\n",(char *)buf,sendsock);
break; break;
} }
} }
@ -175,7 +175,7 @@ void LP_psockloop(void *_ptr)
ptr->lasttime = now; ptr->lasttime = now;
if ( ptr->ispaired != 0 ) if ( ptr->ispaired != 0 )
{ {
sendsock = ptr->recvsock; sendsock = ptr->publicsock;
break; break;
} }
else else
@ -196,7 +196,7 @@ void LP_psockloop(void *_ptr)
if ( retval != 0 ) if ( retval != 0 )
printf("nn_poll retval.%d\n",retval); printf("nn_poll retval.%d\n",retval);
break; break;
} else printf("num pfds.%d retval.%d\n",n,retval); } // else printf("num pfds.%d retval.%d\n",n,retval);
} }
} }
portable_mutex_unlock(&LP_psockmutex); portable_mutex_unlock(&LP_psockmutex);
@ -210,9 +210,9 @@ void LP_psockloop(void *_ptr)
ptr = &PSOCKS[i]; ptr = &PSOCKS[i];
if ( now > ptr->lasttime+PSOCK_KEEPALIVE*2 ) if ( now > ptr->lasttime+PSOCK_KEEPALIVE*2 )
{ {
printf("PSOCKS[%d] of %d (%u %u) lag.%d IDLETIMEOUT\n",i,Numpsocks,ptr->recvport,ptr->sendport,now - ptr->lasttime); printf("PSOCKS[%d] of %d (%u %u) lag.%d IDLETIMEOUT\n",i,Numpsocks,ptr->publicport,ptr->sendport,now - ptr->lasttime);
if ( ptr->recvsock >= 0 ) if ( ptr->publicsock >= 0 )
nn_close(ptr->recvsock); nn_close(ptr->publicsock);
if ( ptr->sendsock >= 0 ) if ( ptr->sendsock >= 0 )
nn_close(ptr->sendsock); nn_close(ptr->sendsock);
portable_mutex_lock(&LP_psockmutex); portable_mutex_lock(&LP_psockmutex);
@ -242,15 +242,15 @@ void LP_psockloop(void *_ptr)
} }
} }
void LP_psockadd(int32_t ispaired,int32_t recvsock,uint16_t recvport,int32_t sendsock,uint16_t sendport,char *subaddr,char *publicaddr) void LP_psockadd(int32_t ispaired,int32_t publicsock,uint16_t recvport,int32_t sendsock,uint16_t sendport,char *subaddr,char *publicaddr)
{ {
struct psock *ptr; struct psock *ptr;
portable_mutex_lock(&LP_psockmutex); portable_mutex_lock(&LP_psockmutex);
PSOCKS = realloc(PSOCKS,sizeof(*PSOCKS) * (Numpsocks + 1)); PSOCKS = realloc(PSOCKS,sizeof(*PSOCKS) * (Numpsocks + 1));
ptr = &PSOCKS[Numpsocks++]; ptr = &PSOCKS[Numpsocks++];
ptr->ispaired = ispaired; ptr->ispaired = ispaired;
ptr->recvsock = recvsock; ptr->publicsock = publicsock;
ptr->recvport = recvport; ptr->publicport = recvport;
ptr->sendsock = sendsock; ptr->sendsock = sendsock;
ptr->sendport = sendport; ptr->sendport = sendport;
safecopy(ptr->sendaddr,subaddr,sizeof(ptr->sendaddr)); safecopy(ptr->sendaddr,subaddr,sizeof(ptr->sendaddr));
@ -280,18 +280,18 @@ int32_t LP_psockmark(char *publicaddr)
char *LP_psock(char *myipaddr,int32_t ispaired) char *LP_psock(char *myipaddr,int32_t ispaired)
{ {
char pushaddr[128],subaddr[128]; uint16_t i,pushport,subport; int32_t timeout,maxsize,pullsock=-1,pubsock=-1; cJSON *retjson=0; char pushaddr[128],subaddr[128]; uint16_t i,publicport,subport; int32_t timeout,maxsize,pullsock=-1,pubsock=-1; cJSON *retjson=0;
retjson = cJSON_CreateObject(); retjson = cJSON_CreateObject();
pushport = Psockport++; publicport = Psockport++;
subport = Psockport++; subport = Psockport++;
for (i=0; i<100; i++,pushport+=2,subport+=2) for (i=0; i<100; i++,publicport+=2,subport+=2)
{ {
if ( pushport < 10000 ) if ( publicport < 10000 )
pushport = 10001; publicport = 10001;
if ( subport <= pushport ) if ( subport <= publicport )
subport = pushport + 1; subport = publicport + 1;
pullsock = pubsock = -1; pullsock = pubsock = -1;
nanomsg_transportname(1,pushaddr,myipaddr,pushport); nanomsg_transportname(1,pushaddr,myipaddr,publicport);
nanomsg_transportname(1,subaddr,myipaddr,subport); nanomsg_transportname(1,subaddr,myipaddr,subport);
if ( (pullsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PULL)) >= 0 && (pubsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PAIR)) >= 0 ) if ( (pullsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PULL)) >= 0 && (pubsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PAIR)) >= 0 )
{ {
@ -309,16 +309,16 @@ char *LP_psock(char *myipaddr,int32_t ispaired)
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize)); nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize));
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
} }
nanomsg_transportname(0,pushaddr,myipaddr,pushport); nanomsg_transportname(0,pushaddr,myipaddr,publicport);
nanomsg_transportname(0,subaddr,myipaddr,subport); nanomsg_transportname(0,subaddr,myipaddr,subport);
LP_psockadd(ispaired,pullsock,pushport,pubsock,subport,subaddr,pushaddr); LP_psockadd(ispaired,pullsock,publicport,pubsock,subport,subaddr,pushaddr);
jaddstr(retjson,"result","success"); jaddstr(retjson,"result","success");
jaddstr(retjson,"LPipaddr",myipaddr); jaddstr(retjson,"LPipaddr",myipaddr);
jaddstr(retjson,"connectaddr",subaddr); jaddstr(retjson,"connectaddr",subaddr);
jaddnum(retjson,"connectport",subport); jaddnum(retjson,"connectport",subport);
jaddnum(retjson,"ispaired",ispaired); jaddnum(retjson,"ispaired",ispaired);
jaddstr(retjson,"publicaddr",pushaddr); jaddstr(retjson,"publicaddr",pushaddr);
jaddnum(retjson,"publicport",pushport); jaddnum(retjson,"publicport",publicport);
printf("i.%d publicaddr.(%s) for subaddr.(%s), pullsock.%d pubsock.%d\n",i,pushaddr,subaddr,pullsock,pubsock); printf("i.%d publicaddr.(%s) for subaddr.(%s), pullsock.%d pubsock.%d\n",i,pushaddr,subaddr,pullsock,pubsock);
break; break;
} else printf("bind error on %s or %s\n",pushaddr,subaddr); } else printf("bind error on %s or %s\n",pushaddr,subaddr);

Loading…
Cancel
Save