You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

751 lines
27 KiB

/******************************************************************************
* Copyright © 2014-2015 The SuperNET Developers. *
* *
* See the AUTHORS, DEVELOPER-AGREEMENT and LICENSE files at *
* the top-level directory of this distribution for the individual copyright *
* holder information and the developer policies on copyright and licensing. *
* *
* Unless otherwise agreed in a custom licensing agreement, no part of the *
* SuperNET software, including this file may be copied, modified, propagated *
* or distributed except according to the terms contained in the LICENSE file *
* *
* Removal or modification of this copyright notice is prohibited. *
* *
******************************************************************************/
#define CHROMEAPP_NAME SuperNET
#define CHROMEAPP_STR "SuperNET"
#define CHROMEAPP_CONF "SuperNET.conf"
#define CHROMEAPP_MAIN SuperNET_main
#define CHROMEAPP_JSON SuperNET_JSON
#define CHROMEAPP_HANDLER Handler_SuperNET
#include "../pnacl_main.h"
//#include "SuperNET.h"
#ifndef MSG_NOSIGNAL
#define MSG_NOSIGNAL 0x4000 // Do not generate SIGPIPE
#endif
// ALL globals must be here!
int32_t nn_typelist[] = { NN_REP, NN_REQ, NN_RESPONDENT, NN_SURVEYOR, NN_PUB, NN_SUB, NN_PULL, NN_PUSH, NN_BUS, NN_PAIR };
char *nn_transports[] = { "tcp", "ws", "ipc", "inproc", "tcpmux", "", "", "" };
void expand_epbits(char *endpoint,struct endpoint epbits)
{
char ipaddr[64];
if ( epbits.ipbits != 0 )
expand_ipbits(ipaddr,epbits.ipbits);
else strcpy(ipaddr,"*");
sprintf(endpoint,"%s://%s:%d",nn_transports[epbits.transport],ipaddr,epbits.port);
}
struct endpoint calc_epbits(char *transport,uint32_t ipbits,uint16_t port,int32_t type)
{
int32_t i; struct endpoint epbits;
memset(&epbits,0,sizeof(epbits));
for (i=0; i<(int32_t)(sizeof(nn_transports)/sizeof(*nn_transports)); i++)
if ( strcmp(transport,nn_transports[i]) == 0 )
{
epbits.ipbits = ipbits;
epbits.port = port;
epbits.transport = i;
epbits.nn = type;
break;
}
return(epbits);
}
int32_t ismyaddress(struct supernet_info *myinfo,char *server)
{
uint32_t ipbits; int32_t i,tlen; char str[64];
for (i=0; i<sizeof(nn_transports)/sizeof(*nn_transports); i++)
{
if ( nn_transports[i] == 0 )
break;
sprintf(str,"%s://",nn_transports[i]);
tlen = (int32_t)strlen(str);
if ( strncmp(server,str,tlen) == 0 )
{
server += tlen;
break;
}
}
if ( (ipbits= is_ipaddr(server)) != 0 )
{
if ( strcmp(server,myinfo->ipaddr) == 0 || myinfo->ipbits == ipbits )
{
printf("(%s) MATCHES me (%s)\n",server,myinfo->ipaddr);
return(1);
}
}
else if ( myinfo->my64bits == ipbits )
return(1);
//printf("(%s) is not me (%s)\n",server,myipaddr);
return(0);
}
char *nn_typestr(int32_t type)
{
switch ( type )
{
// Messages that need a response from the set of peers: SURVEY
case NN_SURVEYOR: return("NN_SURVEYOR"); break;
case NN_RESPONDENT: return("NN_RESPONDENT"); break;
// Messages that need a response, but only from one peer: REQ/REP
case NN_REQ: return("NN_REQ"); break;
case NN_REP: return("NN_REP"); break;
// One-way messages to one peer: PUSH/PULL
case NN_PUSH: return("NN_PUSH"); break;
case NN_PULL: return("NN_PULL"); break;
// One-way messages to all: PUB/SUB
case NN_PUB: return("NN_PUB"); break;
case NN_SUB: return("NN_SUB"); break;
case NN_BUS: return("NN_BUS"); break;
case NN_PAIR: return("NN_PAIR"); break;
}
return("NN_ERROR");
}
int32_t nn_oppotype(int32_t type)
{
switch ( type )
{
// Messages that need a response from the set of peers: SURVEY
case NN_SURVEYOR: return(NN_RESPONDENT); break;
case NN_RESPONDENT: return(NN_SURVEYOR); break;
// Messages that need a response, but only from one peer: REQ/REP
case NN_REQ: return(NN_REP); break;
case NN_REP: return(NN_REQ); break;
// One-way messages to one peer: PUSH/PULL
case NN_PUSH: return(NN_PULL); break;
case NN_PULL: return(NN_PUSH); break;
// One-way messages to all: PUB/SUB
case NN_PUB: return(NN_SUB); break;
case NN_SUB: return(NN_PUB); break;
case NN_BUS: return(NN_BUS); break;
case NN_PAIR: return(NN_PAIR); break;
}
return(-1);
}
int32_t nn_portoffset(int32_t type)
{
int32_t i;
for (i=0; i<(int32_t)(sizeof(nn_typelist)/sizeof(*nn_typelist)); i++)
if ( nn_typelist[i] == type )
return(i + 2);
return(-1);
}
int32_t nn_socket_status(int32_t nnsock,int32_t timeoutmillis)
{
struct nn_pollfd pfd;
int32_t rc;
pfd.fd = nnsock;
pfd.events = NN_POLLIN | NN_POLLOUT;
if ( (rc= nn_poll(&pfd,1,timeoutmillis)) == 0 )
return(pfd.revents);
else return(-1);
}
int32_t SuperNET_msglen(struct supernet_msghdr *msg)
{
return(msg->serlen[0] + ((int32_t)msg->serlen[1] << 8) + ((int32_t)msg->serlen[2] << 16));
}
int32_t SuperNET_msgvalidate(struct supernet_msghdr *msg)
{
int32_t msglen = 0;
msglen = SuperNET_msglen(msg);
return(msglen);
}
int32_t nn_settimeouts(int32_t sock,int32_t sendtimeout,int32_t recvtimeout)
{
int32_t retrymillis,maxmillis;
if ( (maxmillis= SUPERNET_NETWORKTIMEOUT) == 0 )
maxmillis = 3000;
retrymillis = maxmillis/40;
if ( nn_setsockopt(sock,NN_SOL_SOCKET,NN_RECONNECT_IVL,&retrymillis,sizeof(retrymillis)) < 0 )
fprintf(stderr,"error setting NN_REQ NN_RECONNECT_IVL_MAX socket %s\n",nn_errstr());
else if ( nn_setsockopt(sock,NN_SOL_SOCKET,NN_RECONNECT_IVL_MAX,&maxmillis,sizeof(maxmillis)) < 0 )
fprintf(stderr,"error setting NN_REQ NN_RECONNECT_IVL_MAX socket %s\n",nn_errstr());
else if ( sendtimeout > 0 && nn_setsockopt(sock,NN_SOL_SOCKET,NN_SNDTIMEO,&sendtimeout,sizeof(sendtimeout)) < 0 )
fprintf(stderr,"error setting sendtimeout %s\n",nn_errstr());
else if ( recvtimeout > 0 && nn_setsockopt(sock,NN_SOL_SOCKET,NN_RCVTIMEO,&recvtimeout,sizeof(recvtimeout)) < 0 )
fprintf(stderr,"error setting sendtimeout %s\n",nn_errstr());
else return(0);
return(-1);
}
int32_t nn_createsocket(struct supernet_info *myinfo,char *endpoint,int32_t bindflag,char *name,int32_t type,uint16_t port,int32_t sendtimeout,int32_t recvtimeout)
{
int32_t sock;
if ( (sock= nn_socket(AF_SP,type)) < 0 )
fprintf(stderr,"error getting socket %s\n",nn_errstr());
if ( bindflag != 0 )
{
if ( endpoint[0] == 0 )
expand_epbits(endpoint,calc_epbits(myinfo->transport,0,port,type));
if ( nn_bind(sock,endpoint) < 0 )
fprintf(stderr,"error binding to relaypoint sock.%d type.%d to (%s) (%s) %s\n",sock,type,name,endpoint,nn_errstr());
else fprintf(stderr,"BIND.(%s) <- %s\n",endpoint,name);
}
else if ( bindflag == 0 && endpoint != 0 && endpoint[0] != 0 )
{
if ( nn_connect(sock,endpoint) < 0 )
fprintf(stderr,"error connecting to relaypoint sock.%d type.%d to (%s) (%s) %s\n",sock,type,name,endpoint,nn_errstr());
else fprintf(stderr,"%s -> CONNECT.(%s)\n",name,endpoint);
}
if ( nn_settimeouts(sock,sendtimeout,recvtimeout) < 0 )
{
fprintf(stderr,"nn_createsocket.(%s) %d\n",name,sock);
return(-1);
}
return(sock);
}
bits256 SuperNET_OPRETURN(struct supernet_info *myinfo,char *symbol,double fee,uint8_t *buf,int32_t len)
{
bits256 txid;
printf("send OPRETURN\n");
return(txid);
}
bits256 SuperNET_agentannounce(struct supernet_info *myinfo,struct supernet_agent *agent,cJSON *network)
{
static const bits256 zero;
uint8_t buf[80 + sizeof(struct iguana_msghdr)],*data;
bits256 pubkey,sig; int32_t i,len=0; uint8_t netmagic[4]; char *sigstr,*announce,*pubkeystr;
memset(buf,0,sizeof(buf));
data = &buf[sizeof(struct iguana_msghdr)];
if ( (announce= jstr(network,"announce")) != 0 )
{
data[len++] = SCRIPT_OPRETURN;
data[len++] = 75;
iguana_rwnum(1,&data[len],sizeof(myinfo->ipbits),&myinfo->ipbits);
for (i=0; i<7; i++)
if ( (data[len+i]= announce[i]) == 0 )
break;
len = 13;
if ( (pubkeystr= jstr(network,"pubkey")) == 0 || strlen(pubkeystr) != sizeof(bits256)*2 )
pubkeystr = GENESIS_PUBKEYSTR;
decode_hex(pubkey.bytes,sizeof(pubkey),pubkeystr);
len += iguana_rwbignum(1,&data[len],sizeof(pubkey),pubkey.bytes); // 45 bytes
if ( (sigstr= jstr(network,"sig")) != 0 && strlen(sigstr) == sizeof(bits256)*2 )
{
sigstr = GENESIS_PUBKEYSTR;
len += iguana_rwbignum(1,&data[len],sizeof(sig),sig.bytes); // 77 bytes
}
decode_hex(netmagic,4,"e4c2d8e6");
iguana_sethdr((struct iguana_msghdr *)buf,netmagic,"SuperNET",data,len);
return(SuperNET_OPRETURN(myinfo,"BTCD",.001,buf,len));
}
printf("invalid SuperNET OPRETURN protocol.(%s)\n",announce!=0?announce:"");
return(zero);
}
void Supernet_networkadd(struct supernet_info *myinfo,struct supernet_agent *agent,cJSON *network)
{
int32_t sendtimeout=0,recvtimeout=0;
agent->pubpoint[0] = agent->reppoint[0] = 0;
if ( (agent->pubport= juint(network,"pubport")) > 1000 )
{
agent->pubsock = nn_createsocket(myinfo,agent->pubpoint,1,"NN_PUB",NN_PUB,agent->pubport,sendtimeout,recvtimeout);
SuperNET_agentannounce(myinfo,agent,network);
}
else agent->pubport = -1;
if ( (agent->repport= juint(network,"repport")) > 1000 )
agent->repsock = nn_createsocket(myinfo,agent->reppoint,1,"NN_REP",NN_REP,agent->repport,sendtimeout,recvtimeout);
else agent->repport = -1;
agent->subsock = nn_createsocket(myinfo,0,0,"NN_SUB",NN_SUB,0,sendtimeout,recvtimeout);
nn_setsockopt(agent->subsock,NN_SUB,NN_SUB_SUBSCRIBE,"",0);
agent->reqsock = nn_createsocket(myinfo,0,0,"NN_REQ",NN_REQ,0,sendtimeout,recvtimeout);
}
int32_t SuperNET_agentcommand(struct supernet_info *myinfo,struct supernet_agent *agent,struct supernet_msghdr *H,uint8_t *buf,int32_t buflen)
{
char *name; cJSON *json; int32_t i;
if ( strcmp(H->command,"register") == 0 )
{
if ( (json= cJSON_Parse((char *)buf)) != 0 )
{
if ( (name= jstr(json,"name")) != 0 )
{
memset(agent->name,0,sizeof(agent->name));
strncpy(agent->name,name,sizeof(agent->name)-1);
if ( (agent->networks= jarray(&agent->num,json,"networks")) != 0 )
{
for (i=0; i<agent->num; i++)
Supernet_networkadd(myinfo,agent,jitem(agent->networks,i));
}
} else free_json(json);
}
}
return(0);
}
int32_t SuperNET_socket(int32_t bindflag,char *hostname,uint16_t port)
{
int32_t opt,sock,result; uint32_t ipbits; char ipaddr[64]; struct timeval timeout;
struct sockaddr_in saddr; socklen_t addrlen;
addrlen = sizeof(saddr);
struct hostent *hostent = gethostbyname(hostname);
if ( hostent == NULL )
{
printf("gethostbyname() returned error: %d",errno);
return(-1);
}
saddr.sin_family = AF_INET;
saddr.sin_port = htons(port);
memcpy(&saddr.sin_addr.s_addr,hostent->h_addr_list[0],hostent->h_length);
ipbits = (uint32_t)calc_ipbits(hostname);
//printf("ipbits.%08x vs %08x\n",ipbits,saddr.sin_addr.s_addr);
expand_ipbits(ipaddr,saddr.sin_addr.s_addr);
//if ( bindflag != 0 )
// printf("iguana_socket.(%s:%d) bind.%d\n",ipaddr,port,bindflag), getchar();
if ( strcmp(ipaddr,hostname) != 0 )
printf("iguana_socket mismatch (%s) -> (%s)?\n",hostname,ipaddr);
if ( (sock= socket(AF_INET,SOCK_STREAM,0)) < 0 )
{
if ( errno != ETIMEDOUT )
printf("socket() failed: %s errno.%d", strerror(errno),errno);
return(-1);
}
if ( 0 && bindflag != 0 )
{
timeout.tv_sec = 0;
timeout.tv_usec = 100000;
setsockopt(sock,SOL_SOCKET,SO_RCVTIMEO,(char *)&timeout,sizeof(timeout));
}
opt = 1;
setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,(void*)&opt,sizeof(opt));
#ifdef __APPLE__
setsockopt(sock,SOL_SOCKET,SO_NOSIGPIPE,&opt,sizeof(opt));
#endif
result = (bindflag != 0) ? bind(sock,(struct sockaddr*)&saddr,addrlen) : connect(sock,(struct sockaddr *)&saddr,addrlen);
if ( result != 0 )
{
if ( errno != ECONNRESET && errno != ENOTCONN && errno != ECONNREFUSED && errno != ETIMEDOUT && errno != EHOSTUNREACH )
printf("connect(%s) port.%d failed: %s sock.%d. errno.%d\n",hostname,port,strerror(errno),sock,errno);
if ( sock >= 0 )
close(sock);
return(-1);
}
if ( bindflag != 0 && listen(sock,3) != 0 )
{
printf("listen(%s) port.%d failed: %s sock.%d. errno.%d\n",hostname,port,strerror(errno),sock,errno);
if ( sock >= 0 )
close(sock);
return(-1);
}
return(sock);
}
int32_t SuperNET_recv(int32_t sock,uint8_t *recvbuf,int32_t len)
{
int32_t recvlen,remains = len;
while ( remains > 0 )
{
if ( (recvlen= (int32_t)recv(sock,recvbuf,remains,0)) < 0 )
{
if ( errno == EAGAIN )
{
//printf("EAGAIN for len %d, remains.%d\n",len,remains);
usleep(10000);
}
else return(-errno);
}
else
{
if ( recvlen > 0 )
{
remains -= recvlen;
recvbuf = &recvbuf[recvlen];
} else usleep(10000);
}
}
return(len);
}
int32_t SuperNET_send(struct supernet_info *myinfo,struct supernet_agent *agent,uint8_t *serialized,int32_t len)
{
int32_t numsent,remains,sock;
if ( agent == 0 )
return(-1);
if ( (sock= agent->sock) < 0 || agent->dead != 0 )
{
return(-1);
}
remains = len;
while ( remains > 0 )
{
if ( (numsent= (int32_t)send(sock,serialized,remains,MSG_NOSIGNAL)) < 0 )
{
printf("send errno.%d %s\n",errno,strerror(errno));
if ( errno != EAGAIN && errno != EWOULDBLOCK )
{
printf("bad errno.%d %s zombify.%p\n",errno,strerror(errno),agent->name);
agent->dead = (uint32_t)time(NULL);
return(-errno);
} //else usleep(*sleeptimep), *sleeptimep *= 1.1;
}
else if ( remains > 0 )
{
remains -= numsent;
serialized += numsent;
if ( remains > 0 )
printf("SuperNET sent.%d remains.%d of len.%d\n",numsent,remains,len);
}
}
agent->totalsent += len;
//printf(" sent.%d bytes to %s\n",len,addr->ipaddr);// getchar();
return(len);
}
char *SuperNET_JSON(struct supernet_info *myinfo,cJSON *argjson,char *remoteaddr)
{
char *agent,*method;
if ( (agent= jstr(argjson,"agent")) == 0 || (method= jstr(argjson,"method")) == 0 )
return(clonestr("{\"error\":\"need both agent and method\"}"));
}
void SuperNET_rpcloop(void *args)
{
struct supernet_info *myinfo = args;
int32_t recvlen,bindsock,postflag,sock,remains,numsent,len; socklen_t clilen;
char remoteaddr[64],jsonbuf[8192],*buf,*retstr,*space;//,*retbuf; ,n,i,m
struct sockaddr_in cli_addr; uint32_t ipbits,i; uint16_t port;
int32_t size = 1024 * 1024 * 2;
port = SUPERNET_PORT;
bindsock = SuperNET_socket(1,"127.0.0.1",port);
printf("SuperNET_rpcloop 127.0.0.1:%d bind sock.%d\n",port,bindsock);
space = calloc(1,size);
while ( bindsock >= 0 )
{
clilen = sizeof(cli_addr);
//printf("ACCEPT (%s:%d) on sock.%d\n","127.0.0.1",port,bindsock);
sock = accept(bindsock,(struct sockaddr *)&cli_addr,&clilen);
if ( sock < 0 )
{
//printf("iguana_rpcloop ERROR on accept usock.%d\n",sock);
continue;
}
memcpy(&ipbits,&cli_addr.sin_addr.s_addr,sizeof(ipbits));
expand_ipbits(remoteaddr,ipbits);
memset(jsonbuf,0,sizeof(jsonbuf));
remains = (int32_t)(sizeof(jsonbuf) - 1);
buf = jsonbuf;
recvlen = 0;
retstr = 0;
while ( remains > 0 )
{
if ( (len= (int32_t)recv(sock,buf,remains,0)) < 0 )
{
if ( errno == EAGAIN )
{
printf("EAGAIN for len %d, remains.%d\n",len,remains);
usleep(10000);
}
break;
}
else
{
if ( len > 0 )
{
remains -= len;
recvlen += len;
buf = &buf[len];
retstr = SuperNET_rpcparse(myinfo,space,size,&postflag,jsonbuf,remoteaddr);
break;
} else usleep(10000);
}
}
if ( retstr != 0 )
{
i = 0;
if ( postflag == 0 )
{
//retstr = SuperNET_htmlresponse(space,size,&remains,1,retstr,1);
}
else remains = (int32_t)strlen(retstr);
printf("RETBUF.(%s)\n",retstr);
while ( remains > 0 )
{
if ( (numsent= (int32_t)send(sock,&retstr[i],remains,MSG_NOSIGNAL)) < 0 )
{
if ( errno != EAGAIN && errno != EWOULDBLOCK )
{
//printf("%s: %s numsent.%d vs remains.%d len.%d errno.%d (%s) usock.%d\n",retstr,ipaddr,numsent,remains,recvlen,errno,strerror(errno),sock);
break;
}
}
else if ( remains > 0 )
{
remains -= numsent;
i += numsent;
if ( remains > 0 )
printf("iguana sent.%d remains.%d of len.%d\n",numsent,remains,recvlen);
}
}
if ( retstr != space )
free(retstr);
}
//printf("done response sock.%d\n",sock);
closesocket(sock);
}
}
int32_t SuperNET_msgrecv(struct supernet_info *myinfo,struct supernet_agent *agent,uint8_t *_buf,int32_t maxlen)
{
int32_t len,recvlen; void *buf = _buf; struct supernet_msghdr H;
printf("got.(%s) from %s | sock.%d\n",H.command,agent->ipaddr,agent->sock);
memset(&H,0,sizeof(H));
if ( (recvlen= (int32_t)SuperNET_recv(agent->sock,(uint8_t *)&H,sizeof(H))) == sizeof(H) )
{
agent->totalrecv += recvlen;
if ( (len= SuperNET_msgvalidate(&H)) >= 0 )
{
recvlen = 0;
if ( len > 0 )
{
if ( len > maxlen )
buf = calloc(1,len);
if ( (recvlen= SuperNET_recv(agent->sock,buf,len)) < 0 )
{
printf("recv error on (%s) len.%d errno.%d (%s)\n",H.command,len,-recvlen,strerror(-recvlen));
if ( buf != _buf )
free(buf);
agent->dead = (uint32_t)time(NULL);
return(recvlen);
} else agent->totalrecv += recvlen;
}
printf("PROCESS.%c NNRECV(%s) recvlen.%d\n",H.type,H.command,recvlen);
if ( H.type == 'C' )
SuperNET_agentcommand(myinfo,agent,&H,buf,recvlen);
else if ( agent->recvfunc != 0 )
(*agent->recvfunc)(myinfo,agent,&H,buf,recvlen);
if ( buf != _buf )
free(buf);
return(recvlen);
}
printf("invalid header received from (%s)\n",agent->ipaddr);
}
printf("%s recv error on hdr errno.%d (%s)\n",agent->ipaddr,-recvlen,strerror(-recvlen));
return(-1);
}
int32_t SuperNET_msgsend(struct supernet_info *myinfo,struct supernet_agent *agent,struct supernet_msghdr *msg)
{
return(SuperNET_send(myinfo,agent,(uint8_t *)msg,SuperNET_msglen(msg) + sizeof(*msg)));
}
int32_t SuperNET_nnsend(struct supernet_info *myinfo,struct supernet_endpoint *ptr,int32_t ind,struct supernet_msghdr *msg)
{
return(nn_send(ptr->eps[ind].nnsock,(uint8_t *)msg,SuperNET_msglen(msg) + sizeof(*msg),0));
}
struct supernet_msghdr *SuperNET_msgpending(struct supernet_info *myinfo,struct supernet_agent *agent)
{
return(queue_dequeue(&agent->recvQ,0));
}
struct supernet_msghdr *SuperNET_nnpending(struct supernet_info *myinfo,struct supernet_endpoint *ptr,int32_t ind)
{
return(queue_dequeue(&ptr->eps[ind].nnrecvQ,0));
}
int32_t SuperNET_nnrecv(struct supernet_info *myinfo,struct supernet_endpoint *ptr,int32_t ind)
{
void *msg; int32_t nnlen;
if ( (nnlen= nn_recv(ptr->eps[ind].nnsock,&msg,NN_MSG,0)) > 0 )
{
printf("PROCESS NNRECV(%s)\n",msg);
if ( ptr->nnrecvfunc != 0 )
(*ptr->nnrecvfunc)(myinfo,ptr,ind,msg,nnlen);
nn_freemsg(msg);
}
return(nnlen);
}
int32_t Supernet_poll(struct supernet_info *myinfo,uint8_t *buf,int32_t bufsize,struct supernet_agent *agents,int32_t num,int32_t timeout)
{
struct pollfd fds[SUPERNET_MAXAGENTS]; int32_t i,nonz,flag; struct supernet_msghdr *msg; struct supernet_agent *agent;
if ( num == 0 )
return(0);;
memset(fds,0,sizeof(fds));
flag = 0;
for (i=nonz=0; i<num; i++)
{
agent = &agents[i];
fds[i].fd = -1;
if ( agent->sock >= 0 )
{
fds[i].fd = agent->sock;
fds[i].events = (POLLIN | POLLOUT);
nonz++;
}
}
if ( nonz != 0 && poll(fds,num,timeout) > 0 )
{
for (i=0; i<num; i++)
{
agent = &agents[i];
if ( agent->sock < 0 )
continue;
if ( (fds[i].revents & POLLIN) != 0 && SuperNET_msgrecv(myinfo,agent,buf,bufsize) >= 0 )
flag++;
if ( (fds[i].revents & POLLOUT) != 0 )
{
if ( (msg= SuperNET_msgpending(myinfo,agent)) != 0 && SuperNET_msgsend(myinfo,agent,msg) > 0 )
flag++;
}
}
}
return(flag);
}
int32_t Supernet_nnpoll(struct supernet_info *myinfo,uint8_t *buf,int32_t bufsize,struct supernet_endpoint **eps,int32_t num,int32_t timeout)
{
struct nn_pollfd fds[1024]; int32_t i,j,n,k,r,starti,nonz,flag; struct supernet_msghdr *msg; struct supernet_endpoint *ptr;
if ( num == 0 )
return(0);
memset(fds,0,sizeof(fds));
flag = 0;
r = rand();
for (j=k=nonz=n=0; j<num; j++)
{
i = (j + r) % num;
ptr = eps[i];
starti = n;
for (k=0; k<ptr->num; k++,n++)
{
fds[n].fd = -1;
if ( ptr->eps[k].nnsock >= 0 )
{
fds[n].fd = ptr->eps[k].nnsock;
fds[n].events = (POLLIN | POLLOUT);
nonz++;
}
}
}
if ( nonz != 0 && nn_poll(fds,num,timeout) > 0 )
{
for (j=k=0; j<num; j++)
{
i = (j + r) % num;
ptr = eps[i];
starti = n;
for (k=0; k<ptr->num; k++,n++)
{
if ( (fds[i].revents & POLLIN) != 0 && SuperNET_nnrecv(myinfo,ptr,n - starti) >= 0 )
flag++;
if ( (fds[i].revents & POLLOUT) != 0 )
{
if ( (msg= SuperNET_nnpending(myinfo,ptr,n - starti)) != 0 && SuperNET_nnsend(myinfo,ptr,n - starti,msg) > 0 )
flag++;
}
}
}
}
return(flag);
}
void SuperNET_acceptloop(void *args)
{
int32_t bindsock,sock; struct supernet_agent *agent; struct supernet_info *myinfo = args;
socklen_t clilen; uint16_t agentport; struct sockaddr_in cli_addr; char ipaddr[64]; uint32_t ipbits;
bindsock = SuperNET_socket(1,"127.0.0.1",myinfo->acceptport);
printf("SuperNET_acceptloop 127.0.0.1:%d bind sock.%d\n",myinfo->acceptport,bindsock);
while ( bindsock >= 0 )
{
clilen = sizeof(cli_addr);
printf("ACCEPT (%s:%d) on sock.%d\n","127.0.0.1",myinfo->acceptport,bindsock);
sock = accept(bindsock,(struct sockaddr *)&cli_addr,&clilen);
if ( sock < 0 )
{
printf("ERROR on accept bindsock.%d errno.%d (%s)\n",bindsock,errno,strerror(errno));
continue;
}
memcpy(&ipbits,&cli_addr.sin_addr.s_addr,sizeof(ipbits));
agentport = cli_addr.sin_port;
expand_ipbits(ipaddr,ipbits);
printf("NEWSOCK.%d for %x (%s:%u)\n",sock,ipbits,ipaddr,agentport);
agent = calloc(1,sizeof(*agent));
strcpy(agent->ipaddr,ipaddr);
sprintf(agent->name,"%s:%d",ipaddr,agentport);
agent->ipbits = ipbits;
agent->sock = sock;
agent->port = myinfo->acceptport;
queue_enqueue("acceptQ",&myinfo->acceptQ,&agent->DL,0);
}
}
int32_t SuperNET_acceptport(struct supernet_info *myinfo,uint16_t port)
{
struct supernet_info *ptr;
ptr = calloc(1,sizeof(*myinfo));
*ptr = *myinfo;
ptr->acceptport = port;
if ( OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)SuperNET_acceptloop,(void *)ptr) != 0 )
{
printf("error launching accept thread for port.%u\n",port);
return(-1);
}
return(0);
}
void SuperNET_loop(struct supernet_info *myinfo)
{
struct supernet_agent *ptr; char *buf; int32_t bufsize = 65536 * 32;
buf = calloc(1,bufsize);
while ( myinfo->dead == 0 )
{
if ( (ptr= queue_dequeue(&myinfo->acceptQ,0)) != 0 )
{
if ( myinfo->numagents < sizeof(myinfo->agents)/sizeof(*myinfo->agents)-1 )
{
myinfo->agents[myinfo->numagents++] = *ptr;
free(ptr);
}
printf("SuperNET.[%d] got new socket %d for %s:%d\n",myinfo->numagents,ptr->sock,ptr->ipaddr,ptr->port);
}
else if ( Supernet_poll(myinfo,(uint8_t *)buf,bufsize,myinfo->agents,myinfo->numagents,myinfo->POLLTIMEOUT) <= 0 )
usleep(10000);
}
free(buf);
}
void SuperNET_main(void *arg)
{
struct supernet_info MYINFO; int32_t i;//cJSON *json,*array; uint16_t port;,n = 0;
memset(&MYINFO,0,sizeof(MYINFO));
if ( 1 )
{
strcpy(MYINFO.transport,"tcp");
strcpy(MYINFO.ipaddr,"127.0.0.1");
MYINFO.acceptport = SUPERNET_PORT; MYINFO.serviceport = SUPERNET_PORT - 2;
// SuperNET_init(&MYINFO,arg); parse supernet.conf
if ( MYINFO.POLLTIMEOUT == 0 )
MYINFO.POLLTIMEOUT = SUPERNET_POLLTIMEOUT;
}
/*if ( arg == 0 || (json= cJSON_Parse(arg)) == 0 )
SuperNET_acceptport(&MYINFO,MYINFO.acceptport);
else
{
if ( (array= jarray(&n,json,"accept")) != 0 )
{
for (i=0; i<n; i++)
if ( (port= juint(jitem(array,i),0)) != 0 )
SuperNET_acceptport(&MYINFO,port);
}
free_json(json);
}*/
printf("start SuperNET_loop on port.%u\n",SUPERNET_PORT);
OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)SuperNET_rpcloop,&MYINFO);
for (i=0; i<sizeof(MYINFO.agents)/sizeof(*MYINFO.agents); i++)
MYINFO.agents[i].sock = -1;
SuperNET_loop(&MYINFO);
}