508 lines
20 KiB

/******************************************************************************
* Copyright © 2014-2015 The SuperNET Developers. *
* *
* See the AUTHORS, DEVELOPER-AGREEMENT and LICENSE files at *
* the top-level directory of this distribution for the individual copyright *
* holder information and the developer policies on copyright and licensing. *
* *
* Unless otherwise agreed in a custom licensing agreement, no part of the *
* SuperNET software, including this file may be copied, modified, propagated *
* or distributed except according to the terms contained in the LICENSE file *
* *
* Removal or modification of this copyright notice is prohibited. *
* *
******************************************************************************/
#include "../crypto777/OS_portable.h"
#include "SuperNET.h"
/*
 * SuperNET_rpcloop: serve the local RPC endpoint, one connection at a time.
 *
 * Obtains a socket for 127.0.0.1:SUPERNET_PORT from iguana_socket() and then
 * loops on accept(). For each connection a single recv() fills a stack
 * buffer, the request is handed to SuperNET_rpcparse() and the returned
 * string is sent back before the connection is closed.
 *
 * args: the struct supernet_info * that is forwarded to SuperNET_rpcparse().
 *
 * The function returns only if the socket could not be created or the
 * scratch buffer could not be allocated; otherwise it loops forever.
 */
void SuperNET_rpcloop(void *args)
{
    struct supernet_info *myinfo = args;
    int32_t recvlen,bindsock,postflag,sock,remains,jsonflag,numsent,len; socklen_t clilen;
    char remoteaddr[64],jsonbuf[8192],*buf,*retstr,*space;
    struct sockaddr_in cli_addr; uint32_t ipbits,i; uint16_t port;
    int32_t size = 1024 * 1024 * 2;
    port = SUPERNET_PORT;
    bindsock = iguana_socket(1,"127.0.0.1",port);
    printf("SuperNET_rpcloop 127.0.0.1:%d bind sock.%d\n",port,bindsock);
    if ( bindsock < 0 )
        return; // nothing to serve; bail out before allocating the scratch buffer
    if ( (space= calloc(1,size)) == 0 )
    {
        printf("SuperNET_rpcloop: cannot allocate %d byte response buffer\n",size);
        closesocket(bindsock);
        return;
    }
    while ( bindsock >= 0 )
    {
        clilen = sizeof(cli_addr);
        sock = accept(bindsock,(struct sockaddr *)&cli_addr,&clilen);
        if ( sock < 0 )
            continue;
        memcpy(&ipbits,&cli_addr.sin_addr.s_addr,sizeof(ipbits));
        expand_ipbits(remoteaddr,ipbits);
        // jsonbuf is zeroed and one byte is held back so the request is always NUL terminated
        memset(jsonbuf,0,sizeof(jsonbuf));
        remains = (int32_t)(sizeof(jsonbuf) - 1);
        buf = jsonbuf;
        recvlen = 0;
        retstr = 0;
        postflag = jsonflag = 0;
        while ( remains > 0 )
        {
            if ( (len= (int32_t)recv(sock,buf,remains,0)) < 0 )
            {
                if ( errno == EAGAIN )
                {
                    printf("EAGAIN for len %d, remains.%d\n",len,remains);
                    usleep(10000);
                }
                break; // any recv error, EAGAIN included, abandons this request
            }
            else if ( len > 0 )
            {
                remains -= len;
                recvlen += len;
                buf = &buf[len];
                // the request is parsed after the first successful recv; nothing more is read
                retstr = SuperNET_rpcparse(myinfo,space,size,&jsonflag,&postflag,jsonbuf,remoteaddr);
                break;
            }
            else break; // recv() == 0: peer closed without sending; retrying would spin forever
        }
        if ( retstr != 0 )
        {
            i = 0;
            if ( postflag == 0 && jsonflag == 0 )
                retstr = SuperNET_htmlresponse(space,size,&remains,1,retstr,1);
            else remains = (int32_t)strlen(retstr);
            printf("RETBUF.(%s)\n",retstr);
            while ( remains > 0 )
            {
                if ( (numsent= (int32_t)send(sock,&retstr[i],remains,MSG_NOSIGNAL)) < 0 )
                {
                    // EAGAIN/EWOULDBLOCK are retried immediately; anything else drops the reply
                    if ( errno != EAGAIN && errno != EWOULDBLOCK )
                        break;
                }
                else if ( remains > 0 )
                {
                    remains -= numsent;
                    i += numsent;
                    if ( remains > 0 )
                        printf("iguana sent.%d remains.%d of len.%d\n",numsent,remains,recvlen);
                }
            }
            // replies built inside the shared scratch buffer must not be freed
            if ( retstr != space )
                free(retstr);
        }
        closesocket(sock);
    }
}
/*
struct endpoint find_epbits(struct relay_info *list,uint32_t ipbits,uint16_t port,int32_t type)
{
int32_t i; struct endpoint epbits;
memset(&epbits,0,sizeof(epbits));
if ( list != 0 && list->num > 0 )
{
if ( type >= 0 )
type = nn_portoffset(type);
for (i=0; i<list->num&&i<(int32_t)(sizeof(list->connections)/sizeof(*list->connections)); i++)
if ( list->connections[i].ipbits == ipbits && (port == 0 || port == list->connections[i].port) && (type < 0 || type == list->connections[i].nn) )
return(list->connections[i]);
}
return(epbits);
}
int32_t add_relay(struct relay_info *list,struct endpoint epbits)
{
list->connections[list->num % (sizeof(list->connections)/sizeof(*list->connections))] = epbits, list->num++;
if ( list->num > (sizeof(list->connections)/sizeof(*list->connections)) )
printf("add_relay warning num.%d > %ld\n",list->num,(long)(sizeof(list->connections)/sizeof(*list->connections)));
return(list->num);
}
int32_t nn_add_lbservers(struct supernet_info *myinfo,uint16_t port,uint16_t globalport,uint16_t relaysport,int32_t priority,int32_t sock,char servers[][MAX_SERVERNAME],int32_t num)
{
int32_t i; char endpoint[512],pubendpoint[512]; struct endpoint epbits; uint32_t ipbits;
if ( num > 0 && servers != 0 && nn_setsockopt(sock,NN_SOL_SOCKET,NN_SNDPRIO,&priority,sizeof(priority)) >= 0 )
{
for (i=0; i<num; i++)
{
if ( (ipbits= (uint32_t)calc_ipbits(servers[i])) == 0 )
{
printf("null ipbits.(%s)\n",servers[i]);
continue;
}
//printf("epbits.%llx ipbits.%x %s\n",*(long long *)&epbits,(uint32_t)ipbits,endpoint);
if ( ismyaddress(servers[i],myinfo) == 0 )
{
epbits = calc_epbits("tcp",ipbits,port,NN_REP);
expand_epbits(endpoint,epbits);
if ( nn_connect(sock,endpoint) >= 0 )
{
printf("+R%s ",endpoint);
add_relay(&myinfo->active,epbits);
}
if ( myinfo->subclient >= 0 )
{
if ( myinfo->iamrelay != 0 )
{
epbits = calc_epbits("tcp",ipbits,relaysport,NN_PUB);
expand_epbits(pubendpoint,epbits);
if ( nn_connect(myinfo->subclient,pubendpoint) >= 0 )
printf("+P%s ",pubendpoint);
}
epbits = calc_epbits("tcp",ipbits,globalport,NN_PUB);
expand_epbits(pubendpoint,epbits);
if ( nn_connect(myinfo->subclient,pubendpoint) >= 0 )
printf("+P%s ",pubendpoint);
}
}
}
printf("added priority.%d\n",priority);
priority++;
} else printf("error setting priority.%d (%s)\n",priority,nn_errstr());
return(priority);
}
int32_t _lb_socket(struct supernet_info *myinfo,uint16_t port,uint16_t globalport,uint16_t relaysport,int32_t maxmillis,char servers[][MAX_SERVERNAME],int32_t num,char backups[][MAX_SERVERNAME],int32_t numbacks,char failsafes[][MAX_SERVERNAME],int32_t numfailsafes)
{
int32_t lbsock,timeout,retrymillis,priority = 1;
if ( (lbsock= nn_socket(AF_SP,NN_REQ)) >= 0 )
{
retrymillis = (maxmillis / 30) + 1;
printf("!!!!!!!!!!!! lbsock.%d !!!!!!!!!!!\n",lbsock);
if ( nn_setsockopt(lbsock,NN_SOL_SOCKET,NN_RECONNECT_IVL,&retrymillis,sizeof(retrymillis)) < 0 )
printf("error setting NN_REQ NN_RECONNECT_IVL_MAX socket %s\n",nn_errstr());
else if ( nn_setsockopt(lbsock,NN_SOL_SOCKET,NN_RECONNECT_IVL_MAX,&maxmillis,sizeof(maxmillis)) < 0 )
fprintf(stderr,"error setting NN_REQ NN_RECONNECT_IVL_MAX socket %s\n",nn_errstr());
timeout = SUPERNET_NETWORKTIMEOUT;
if ( 1 && nn_setsockopt(lbsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)) < 0 )
printf("error setting NN_SOL_SOCKET NN_RCVTIMEO socket %s\n",nn_errstr());
timeout = 100;
if ( 1 && nn_setsockopt(lbsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)) < 0 )
printf("error setting NN_SOL_SOCKET NN_SNDTIMEO socket %s\n",nn_errstr());
if ( num > 0 )
priority = nn_add_lbservers(myinfo,port,globalport,relaysport,priority,lbsock,servers,num);
if ( numbacks > 0 )
priority = nn_add_lbservers(myinfo,port,globalport,relaysport,priority,lbsock,backups,numbacks);
if ( numfailsafes > 0 )
priority = nn_add_lbservers(myinfo,port,globalport,relaysport,priority,lbsock,failsafes,numfailsafes);
} else printf("error getting req socket %s\n",nn_errstr());
//printf("myinfo->lb.num %d\n",myinfo->lb.num);
return(lbsock);
}
int32_t nn_lbsocket(struct supernet_info *myinfo,int32_t maxmillis,int32_t port,uint16_t globalport,uint16_t relaysport)
{
char Cservers[32][MAX_SERVERNAME],Bservers[32][MAX_SERVERNAME],failsafes[4][MAX_SERVERNAME];
int32_t n,m,lbsock,numfailsafes = 0;
printf("redo lbsocket()\n"), exit(-1);
//strcpy(failsafes[numfailsafes++],"5.9.56.103");
//strcpy(failsafes[numfailsafes++],"5.9.102.210");
// n = crackfoo_servers(Cservers,sizeof(Cservers)/sizeof(*Cservers),port);
// m = badass_servers(Bservers,sizeof(Bservers)/sizeof(*Bservers),port);
lbsock = _lb_socket(myinfo,port,globalport,relaysport,maxmillis,Bservers,m,Cservers,n*0,failsafes,numfailsafes);
return(lbsock);
}
void add_standard_fields(char *request)
{
cJSON *json; uint64_t tag;
if ( (json= cJSON_Parse(request)) != 0 )
{
if ( get_API_nxt64bits(cJSON_GetObjectItem(json,"NXT")) == 0 )
{
randombytes((void *)&tag,sizeof(tag));
sprintf(request + strlen(request) - 1,",\"NXT\":\"%s\",\"tag\":\"%llu\"}",myinfo->NXTADDR,(long long)tag);
if ( myinfo->iamrelay != 0 && (myinfo->hostname[0] != 0 || myinfo->ipaddr[0] != 0) )
sprintf(request + strlen(request) - 1,",\"iamrelay\":\"%s\"}",myinfo->hostname[0]!=0?myinfo->hostname:myinfo->myipaddr);
}
free_json(json);
}
}
char *nn_loadbalanced(struct supernet_info *myinfo,uint8_t *data,int32_t len)
{
char *msg,*jsonstr = 0;
int32_t sendlen,i,lbsock,recvlen = 0;
if ( (lbsock= myinfo->lbclient) < 0 )
return(clonestr("{\"error\":\"invalid load balanced socket\"}"));
for (i=0; i<10; i++)
if ( (nn_socket_status(lbsock,1) & NN_POLLOUT) != 0 )
break;
if ( myinfo->Debuglevel > 2 )
printf("sock.%d NN_LBSEND.(%s)\n",lbsock,data);
//fprintf(stderr,"send to network\n");
if ( (sendlen= nn_send(lbsock,data,len,0)) == len )
{
for (i=0; i<10; i++)
if ( (nn_socket_status(lbsock,1) & NN_POLLIN) != 0 )
break;
if ( (recvlen= nn_recv(lbsock,&msg,NN_MSG,0)) > 0 )
{
if ( myinfo->Debuglevel > 2 )
printf("LBRECV.(%s)\n",msg);
jsonstr = clonestr((char *)msg);
nn_freemsg(msg);
}
else
{
printf("nn_loadbalanced got recvlen.%d %s\n",recvlen,nn_errstr());
jsonstr = clonestr("{\"error\":\"lb recv error, probably timeout\"}");
}
} else printf("got sendlen.%d instead of %d %s\n",sendlen,len,nn_errstr()), jsonstr = clonestr("{\"error\":\"lb send error\"}");
return(jsonstr);
}
cJSON *relay_json(struct relay_info *list)
{
cJSON *json,*array; char endpoint[512]; int32_t i;
if ( list == 0 || list->num == 0 )
return(0);
array = cJSON_CreateArray();
for (i=0; i<list->num&&i<(int32_t)(sizeof(list->connections)/sizeof(*list->connections)); i++)
{
expand_epbits(endpoint,list->connections[i]);
jaddistr(array,endpoint);
}
json = cJSON_CreateObject();
jadd(json,"endpoints",array);
//cJSON_AddItemToObject(json,"type",cJSON_CreateString(nn_typestr(list->mytype)));
//cJSON_AddItemToObject(json,"dest",cJSON_CreateString(nn_typestr(list->desttype)));
jaddnum(json,"total",list->num);
return(json);
}
char *relays_jsonstr(struct supernet_info *myinfo,char *jsonstr,cJSON *argjson)
{
cJSON *json;
if ( myinfo->iamrelay != 0 && myinfo->ipaddr[0] != 0 )
{
json = cJSON_CreateObject();
jaddstr(json,"relay",myinfo->ipaddr);
if ( myinfo->active.num > 0 )
jadd(json,"relays",relay_json(&myinfo->active));
return(jprint(json,1));
}
else return(clonestr("{\"error\":\"get relay list from relay\"}"));
}
int32_t init_SUPERNET_pullsock(struct supernet_info *myinfo,int32_t sendtimeout,int32_t recvtimeout)
{
char bindaddr[64],*transportstr; int32_t iter;
myinfo->pullsock = -1;
if ( (myinfo->pullsock= nn_socket(AF_SP,NN_PULL)) < 0 )
{
printf("error creating pullsock %s\n",nn_strerror(nn_errno()));
return(-1);
}
printf("got pullsock.%d\n",myinfo->pullsock);
if ( nn_settimeouts(myinfo->pullsock,sendtimeout,recvtimeout) < 0 )
{
printf("error settime pullsock timeouts %s\n",nn_strerror(nn_errno()));
return(-1);
}
printf("PULLsock.%d\n",myinfo->pullsock);
for (iter=0; iter<2; iter++)
{
transportstr = (iter == 0) ? "ipc" : "inproc";
sprintf(bindaddr,"%s://SuperNET.agents",transportstr);
if ( nn_bind(myinfo->pullsock,bindaddr) < 0 )
{
printf("error binding pullsock to (%s) %s\n",bindaddr,nn_strerror(nn_errno()));
return(-1);
}
}
return(0);
}
void busdata_init(struct supernet_info *myinfo,int32_t sendtimeout,int32_t recvtimeout,int32_t firstiter)
{
char endpoint[512]; int32_t i;
myinfo->servicesock = myinfo->pubglobal = myinfo->pubrelays = myinfo->lbserver = -1;
endpoint[0] = 0;
if ( (myinfo->subclient= nn_createsocket(myinfo,endpoint,0,"NN_SUB",NN_SUB,0,sendtimeout,recvtimeout)) >= 0 )
{
myinfo->pfd[myinfo->numservers++].fd = myinfo->subclient, printf("numservers.%d\n",myinfo->numservers);
nn_setsockopt(myinfo->subclient,NN_SUB,NN_SUB_SUBSCRIBE,"",0);
} else printf("error creating subclient\n");
myinfo->lbclient = nn_lbsocket(myinfo,SUPERNET_NETWORKTIMEOUT,SUPERNET_PORT + LB_OFFSET,myinfo->port + PUBGLOBALS_OFFSET,myinfo->port + PUBRELAYS_OFFSET);
printf("LBclient.%d port.%d\n",myinfo->lbclient,SUPERNET_PORT + LB_OFFSET);
sprintf(endpoint,"%s://%s:%u",myinfo->transport,myinfo->ipaddr,myinfo->serviceport);
if ( (myinfo->servicesock= nn_createsocket(myinfo,endpoint,1,"NN_REP",NN_REP,myinfo->serviceport,sendtimeout,recvtimeout)) >= 0 )
myinfo->pfd[myinfo->numservers++].fd = myinfo->servicesock, printf("numservers.%d\n",myinfo->numservers);
else printf("error creating servicesock\n");
for (i=0; i<myinfo->numservers; i++)
myinfo->pfd[i].events = NN_POLLIN | NN_POLLOUT;
printf("myinfo->iamrelay %d, numservers.%d ipaddr.(%s://%s) port.%d serviceport.%d\n",myinfo->iamrelay,myinfo->numservers,myinfo->transport,myinfo->ipaddr,myinfo->port,myinfo->serviceport);
}
void SuperNET_init(struct supernet_info *myinfo,char *jsonstr)
{
char *str;
if ( jsonstr != 0 && (str= SuperNET_JSON(myinfo,jsonstr)) != 0 )
free(str);
busdata_init(myinfo,10,1,0);
init_SUPERNET_pullsock(myinfo,10,10);
}*/
/* Returns the numeric value of a hexadecimal digit, or -1 if c is not one. */
static int32_t lineparse_hexval(int32_t c)
{
    if ( c >= '0' && c <= '9' )
        return(c - '0');
    if ( c >= 'a' && c <= 'f' )
        return(c - 'a' + 10);
    if ( c >= 'A' && c <= 'F' )
        return(c - 'A' + 10);
    return(-1);
}

/*
 * Supernet_lineparse: split one "key: value" line out of src.
 *
 * Leading spaces, tabs, CRs and LFs are skipped. Everything up to the first
 * ':' is copied to key. If there is no ':' the value stays empty and the
 * offset of the terminating NUL is returned. Otherwise blanks after the ':'
 * are skipped and the text up to the next CR, LF or NUL is copied to value,
 * with %XX sequences replaced by the byte they encode. A '%' that is not
 * followed by two hexadecimal digits is copied unchanged.
 *
 * key/keymax, value/valuemax: output buffers and their sizes. The limits are
 * checked against the offset into src (not the number of bytes stored), so
 * they are conservative; both sizes must be at least 2.
 *
 * Returns the number of characters of src consumed, including any CR/LF that
 * directly follow the value, or -1 if either limit was reached.
 */
int32_t Supernet_lineparse(char *key,int32_t keymax,char *value,int32_t valuemax,char *src)
{
    int32_t hi,lo,c,n = 0;
    key[0] = value[0] = 0;
    while ( (c= src[n]) == ' ' || c == '\t' || c == '\n' || c == '\r' )
        n++;
    while ( (c= src[n]) != ':' && c != 0 )
    {
        *key++ = (char)c;
        if ( ++n >= keymax-1 )
        {
            *key = 0;
            printf("lineparse overflow key.(%s)\n",src);
            return(-1);
        }
    }
    *key = 0;
    if ( src[n] != ':' )
        return(n);
    n++;
    while ( (c= src[n]) == ' ' || c == '\t' )
        n++;
    while ( (c= src[n]) != 0 && c != '\r' && c != '\n' )
    {
        // src[n+2] is only examined when src[n+1] is a hex digit, so the scan never passes the NUL
        if ( c == '%' && (hi= lineparse_hexval(src[n+1])) >= 0 && (lo= lineparse_hexval(src[n+2])) >= 0 )
            c = ((hi << 4) | lo), n += 2;
        *value++ = (char)c;
        n++;
        if ( n >= valuemax-1 )
        {
            *value = 0;
            printf("lineparse overflow.(%s)\n",src);
            return(-1);
        }
    }
    *value = 0;
    if ( src[n] != 0 )
    {
        n++;
        while ( (c= src[n]) == '\r' || c == '\n' )
            n++;
    }
    return(n);
}
/*
 * SuperNET_urlconv: turn the header lines of a request into a cJSON object.
 *
 * urlstr is scanned line by line (CR and/or LF separated) and is modified in
 * place: each line end is overwritten with a NUL before the line is passed
 * to Supernet_lineparse(). Lines that yield a value are added to the object
 * with jaddstr(json,key,value); lines without a value are added to an array
 * that is attached under "lines". When a "Content-Length" line is seen, the
 * last <length> bytes of urlstr are added under "POST".
 *
 * value/bufsize: caller supplied scratch buffer used for each line's value.
 *
 * Returns the object created with cJSON_CreateObject().
 */
cJSON *SuperNET_urlconv(char *value,int32_t bufsize,char *urlstr)
{
    int32_t i,n,totallen,len = 0; long datalen; cJSON *json,*array; char key[8192],*data;
    json = cJSON_CreateObject();
    array = cJSON_CreateArray();
    totallen = (int32_t)strlen(urlstr);
    // len can reach totallen+1 after a final line without a newline; never scan from there
    while ( len <= totallen )
    {
        for (i=len; urlstr[i]!=0; i++)
            if ( urlstr[i] == '\r' || urlstr[i] == '\n' )
                break;
        if ( i == len && (urlstr[len] == '\r' || urlstr[len] == '\n') )
        {
            len++;
            continue;
        }
        urlstr[i] = 0;
        if ( (n= Supernet_lineparse(key,sizeof(key),value,bufsize,&urlstr[len])) > 0 )
        {
            if ( value[0] != 0 )
                jaddstr(json,key,value);
            else jaddistr(array,key);
            len += (n + 1);
            // Content-Length comes from the peer: only accept a body that lies entirely in the
            // unparsed tail, otherwise data (and data[-1]) would point outside urlstr
            if ( strcmp(key,"Content-Length") == 0 && (datalen= strtol(value,0,10)) > 0 && datalen <= (long)totallen - len )
            {
                data = &urlstr[totallen - datalen];
                data[-1] = 0; // cut the headers off so the body is not parsed as a header line
                jaddstr(json,"POST",data);
            }
        } else break;
    }
    jadd(json,"lines",array);
    return(json);
}
/*
 * SuperNET_rpcparse: parse one raw request ("METHOD URL rest...") and run it.
 *
 * The first two space separated words of urlstr are copied to local method
 * and url buffers (truncated to 15 and 1023 characters). The remainder is
 * converted by SuperNET_urlconv(), which modifies urlstr in place.
 *
 * *postflagp is set to 1 when the method is "POST", else 0.
 * *jsonflagp is set to 1 when the url starts with "/api", 2 for a "/bitmap"
 * request and 0 otherwise.
 *
 * For "/bitmap" the reply is produced by iguana_bitmap() into retbuf and
 * retbuf itself is returned. Otherwise the url path is split on '/' into
 * tokens; if there is no parseable "POST" body the arguments are built from
 * the tokens as agent, method and then key/value pairs (a trailing unpaired
 * token becomes "data"). The result of SuperNET_JSON() is returned.
 *
 * Returns retbuf, or a string the caller is expected to free.
 */
char *SuperNET_rpcparse(struct supernet_info *myinfo,char *retbuf,int32_t bufsize,int32_t *jsonflagp,int32_t *postflagp,char *urlstr,char *remoteaddr)
{
    cJSON *tokens,*argjson,*json = 0; char urlmethod[16],*data,url[1024],*retstr,*jsonstr,*argstr,*token = 0; int32_t i,n;
    for (i=0; i<(int32_t)sizeof(urlmethod)-1&&urlstr[i]!=0&&urlstr[i]!=' '; i++)
        urlmethod[i] = urlstr[i];
    urlmethod[i] = 0;
    n = i;
    if ( urlstr[n] != 0 ) // step over the separator, but never past the terminating NUL
        n++;
    *postflagp = (strcmp(urlmethod,"POST") == 0);
    for (i=0; i<(int32_t)sizeof(url)-1&&urlstr[n+i]!=0&&urlstr[n+i]!=' '; i++)
        url[i] = urlstr[n+i];
    url[i] = 0;
    n += i;
    if ( urlstr[n] != 0 )
        n++;
    i = 0;
    if ( strncmp(&url[i],"/api",strlen("/api")) == 0 )
    {
        *jsonflagp = 1;
        i += (int32_t)strlen("/api");
    } else *jsonflagp = 0;
    if ( strncmp(&url[i],"/bitmap",strlen("/bitmap")) == 0 )
    {
        i += (int32_t)strlen("/bitmap");
        *jsonflagp = 2;
        iguana_bitmap(retbuf,bufsize,&url[i]);
        return(retbuf);
    }
    // created only after the /bitmap early return so that path does not leak the array
    tokens = cJSON_CreateArray();
    // NOTE(review): when "/api" was skipped this still points at the start of url,
    // not at &url[i] -- confirm that is intended
    if ( url[i] != '/' )
        token = url;
    for (; url[i]!=0; i++)
    {
        if ( url[i] == '/' )
        {
            url[i] = 0;
            if ( token != 0 )
                jaddistr(tokens,token);
            token = &url[i+1];
        }
    }
    if ( token != 0 )
        jaddistr(tokens,token);
    if ( (json= SuperNET_urlconv(retbuf,bufsize,urlstr+n)) != 0 )
    {
        jadd(json,"tokens",tokens);
        jaddstr(json,"urlmethod",urlmethod);
        if ( (data= jstr(json,"POST")) == 0 || (argjson= cJSON_Parse(data)) == 0 )
        {
            argjson = cJSON_CreateObject();
            if ( (n= cJSON_GetArraySize(tokens)) > 0 )
            {
                jaddstr(argjson,"agent",jstri(tokens,0));
                if ( n > 1 )
                    jaddstr(argjson,"method",jstri(tokens,1));
                for (i=2; i<n; i++)
                {
                    if ( i == n-1 )
                        jaddstr(argjson,"data",jstri(tokens,i));
                    else
                    {
                        jaddstr(argjson,jstri(tokens,i),jstri(tokens,i+1));
                        i++;
                    }
                }
            }
        }
        retstr = SuperNET_JSON(myinfo,argjson,remoteaddr);
        // the printed forms are used only for this log line; release them afterwards
        // (assumes both are heap strings owned by the caller, like the other strings
        // this file passes to free() -- confirm)
        jsonstr = cJSON_Print(json);
        argstr = jprint(argjson,0);
        printf("(%s) -> (%s) postflag.%d (%s)\n",urlstr,jsonstr != 0 ? jsonstr : "",*postflagp,argstr != 0 ? argstr : "");
        free(jsonstr);
        free(argstr);
        // NOTE(review): json and argjson are not released here; whether SuperNET_JSON()
        // takes ownership of argjson is not visible from this file
        return(retstr);
    }
    return(clonestr("{\"error\":\"couldnt process packet\"}"));
}