jl777 7 years ago
parent
commit
366cd80e67
  1. 24
      iguana/exchanges/LP_network.c

24
iguana/exchanges/LP_network.c

@ -522,6 +522,7 @@ void LP_psockloop(void *_ptr)
{
if ( pfds == 0 )
pfds = calloc(MAX_PSOCK_PORT,sizeof(*pfds));
nexti = (rand() % Numpsocks);
portable_mutex_lock(&LP_psockmutex);
memset(pfds,0,sizeof(*pfds) * ((Numpsocks*2 <= MAX_PSOCK_PORT) ? Numpsocks*2 : MAX_PSOCK_PORT));
for (iter=j=0; iter<2; iter++)
@ -541,7 +542,6 @@ void LP_psockloop(void *_ptr)
if ( pfds[n].fd != ptr->publicsock )
{
printf("unexpected fd.%d mismatched publicsock.%d\n",pfds[n].fd,ptr->publicsock);
nexti = i+1;
break;
}
else if ( (pfds[n].revents & POLLIN) != 0 )
@ -552,12 +552,12 @@ void LP_psockloop(void *_ptr)
{
ptr->lasttime = now;
if ( ptr->cmdchannel == 0 )
{
sendsock = ptr->sendsock;
else LP_queuecommand(0,(char *)buf,ptr->publicsock,0);
nexti = i+1;
break;
} else LP_queuecommand(0,(char *)buf,ptr->publicsock,0);
}
else if ( buf != 0 )
if ( buf != 0 )
{
nn_freemsg(buf);
buf = 0;
@ -578,7 +578,6 @@ void LP_psockloop(void *_ptr)
if ( pfds[n].fd != ptr->sendsock )
{
printf("unexpected fd.%d mismatched sendsock.%d\n",pfds[n].fd,ptr->sendsock);
nexti = i+1;
break;
}
else if ( (pfds[n].revents & POLLIN) != 0 )
@ -590,7 +589,6 @@ void LP_psockloop(void *_ptr)
if ( ptr->ispaired != 0 )
{
sendsock = ptr->publicsock;
nexti = i+1;
break;
}
}
@ -615,25 +613,22 @@ void LP_psockloop(void *_ptr)
} // else printf("num pfds.%d retval.%d\n",n,retval);
}
}
if ( sendsock < 0 )
if ( IAMLP != 0 && sendsock < 0 )
{
usleep(30000);
for (i=nonz=0; i<Numpsocks; i++)
{
if ( i < Numpsocks )
{
ptr = &PSOCKS[i];
if ( ptr->cmdchannel == 0 && now > ptr->lasttime+PSOCK_KEEPALIVE )
if ( ptr->cmdchannel == 0 && now > ptr->lasttime+PSOCK_KEEPALIVE/10 )
{
printf("PSOCKS[%d] of %d (%u %u) lag.%d IDLETIMEOUT\n",i,Numpsocks,ptr->publicport,ptr->sendport,now - ptr->lasttime);
if ( ptr->sendsock != ptr->publicsock && ptr->sendsock >= 0 )
mynn_close(ptr->sendsock), ptr->sendsock = -1;
nn_close(ptr->sendsock), ptr->sendsock = -1;
if ( ptr->publicsock >= 0 )
mynn_close(ptr->publicsock), ptr->publicsock = -1;
nn_close(ptr->publicsock), ptr->publicsock = -1;
nonz++;
}
}
}
if ( nonz > 0 )
{
n = Numpsocks;
@ -662,6 +657,7 @@ void LP_psockadd(int32_t ispaired,int32_t publicsock,uint16_t recvport,int32_t s
portable_mutex_lock(&LP_psockmutex);
PSOCKS = realloc(PSOCKS,sizeof(*PSOCKS) * (Numpsocks + 1));
ptr = &PSOCKS[Numpsocks++];
memset(ptr,0,sizeof(*ptr));
ptr->ispaired = ispaired;
ptr->cmdchannel = cmdchannel;
ptr->publicsock = publicsock;
@ -762,7 +758,7 @@ char *_LP_psock_create(int32_t *pullsockp,int32_t *pubsockp,char *ipaddr,uint16_
jaddnum(retjson,"cmdchannel",cmdchannel);
jaddstr(retjson,"publicaddr",pushaddr);
jaddnum(retjson,"publicport",publicport);
//printf("cmd.%d publicaddr.(%s) for subaddr.(%s), pullsock.%d pubsock.%d\n",cmdchannel,pushaddr,subaddr,pullsock,pubsock);
printf("PSOCK cmd.%d publicaddr.(%s) for subaddr.(%s), pullsock.%d pubsock.%d\n",cmdchannel,pushaddr,subaddr,pullsock,pubsock);
*pullsockp = pullsock;
*pubsockp = pubsock;
return(jprint(retjson,1));

Loading…
Cancel
Save