Browse Source

Split out roc request packet processing

etomic
jl777 7 years ago
parent
commit
aa666ca036
  1. 2
      iguana/exchanges/LP_swap.c
  2. 264
      iguana/exchanges/stats.c

2
iguana/exchanges/LP_swap.c

@ -671,7 +671,7 @@ uint32_t LP_swapdata_rawtxsend(int32_t pairsock,struct basilisk_swap *swap,uint3
int32_t LP_swapwait(uint32_t requestid,uint32_t quoteid,int32_t duration,int32_t sleeptime) int32_t LP_swapwait(uint32_t requestid,uint32_t quoteid,int32_t duration,int32_t sleeptime)
{ {
char *retstr; cJSON *retjson=0; uint32_t divisor=8,expiration = (uint32_t)(time(NULL) + duration); char *retstr; cJSON *retjson=0; uint32_t expiration = (uint32_t)(time(NULL) + duration);
printf("wait %d:%d for SWAP.(r%u/q%u) to complete\n",duration,sleeptime,requestid,quoteid); printf("wait %d:%d for SWAP.(r%u/q%u) to complete\n",duration,sleeptime,requestid,quoteid);
sleep(10); sleep(10);
//if ( sleeptime < divisor*60 ) //if ( sleeptime < divisor*60 )

264
iguana/exchanges/stats.c

@ -553,159 +553,171 @@ int32_t iguana_getheadersize(char *buf,int32_t recvlen)
return(recvlen); return(recvlen);
} }
void stats_rpcloop(void *args) uint16_t RPC_port;
void LP_rpc_processreq(void *_ptr)
{ {
static char *jsonbuf; uint64_t arg64 = *(uint64_t *)_ptr;
uint16_t port; char filetype[128],content_type[128]; char filetype[128],content_type[128];
int32_t recvlen,flag,bindsock,postflag=0,contentlen,sock,remains,numsent,jsonflag=0,hdrsize,len; int32_t recvlen,flag,postflag=0,contentlen,remains,sock,numsent,jsonflag=0,hdrsize,len;
socklen_t clilen; char helpname[512],remoteaddr[64],*buf,*retstr,*space; char helpname[512],remoteaddr[64],*buf,*retstr,*space,*jsonbuf;
struct sockaddr_in cli_addr; uint32_t ipbits,i,size = 32*IGUANA_MAXPACKETSIZE + 512; uint32_t ipbits,i,size = 32*IGUANA_MAXPACKETSIZE + 512;
if ( (port= *(uint16_t *)args) == 0 ) ipbits = (arg64 >> 32);
port = 7779; expand_ipbits(remoteaddr,ipbits);
if ( jsonbuf == 0 ) sock = (arg64 & 0xffffffff);
jsonbuf = calloc(1,IGUANA_MAXPACKETSIZE); recvlen = flag = 0;
while ( (bindsock= iguana_socket(1,"0.0.0.0",port)) < 0 ) retstr = 0;
{
//if ( coin->MAXPEERS == 1 )
// break;
//exit(-1);
sleep(3);
}
printf(">>>>>>>>>> DEX stats 127.0.0.1:%d bind sock.%d DEX stats API enabled <<<<<<<<<\n",port,bindsock);
space = calloc(1,size); space = calloc(1,size);
while ( bindsock >= 0 ) jsonbuf = calloc(1,size);
remains = size-1;
buf = jsonbuf;
while ( remains > 0 )
{ {
clilen = sizeof(cli_addr); //printf("flag.%d remains.%d recvlen.%d\n",flag,remains,recvlen);
sock = accept(bindsock,(struct sockaddr *)&cli_addr,&clilen); if ( (len= (int32_t)recv(sock,buf,remains,0)) < 0 )
if ( sock < 0 )
{
//printf("iguana_rpcloop ERROR on accept usock.%d errno %d %s\n",sock,errno,strerror(errno));
continue;
}
memcpy(&ipbits,&cli_addr.sin_addr.s_addr,sizeof(ipbits));
expand_ipbits(remoteaddr,ipbits);
//printf("remote RPC request from (%s) %x\n",remoteaddr,ipbits);
memset(jsonbuf,0,IGUANA_MAXPACKETSIZE);
remains = (int32_t)(IGUANA_MAXPACKETSIZE - 1);
buf = jsonbuf;
recvlen = flag = 0;
retstr = 0;
while ( remains > 0 )
{ {
//printf("flag.%d remains.%d recvlen.%d\n",flag,remains,recvlen); if ( errno == EAGAIN )
if ( (len= (int32_t)recv(sock,buf,remains,0)) < 0 )
{ {
if ( errno == EAGAIN ) printf("EAGAIN for len %d, remains.%d\n",len,remains);
{ usleep(10000);
printf("EAGAIN for len %d, remains.%d\n",len,remains);
usleep(10000);
}
break;
} }
else break;
}
else
{
if ( len > 0 )
{ {
if ( len > 0 ) buf[len] = 0;
if ( recvlen == 0 )
{ {
buf[len] = 0; if ( (contentlen= iguana_getcontentlen(buf,recvlen)) > 0 )
if ( recvlen == 0 )
{ {
if ( (contentlen= iguana_getcontentlen(buf,recvlen)) > 0 ) hdrsize = iguana_getheadersize(buf,recvlen);
if ( hdrsize > 0 )
{ {
hdrsize = iguana_getheadersize(buf,recvlen); if ( len < (hdrsize + contentlen) )
if ( hdrsize > 0 )
{ {
if ( len < (hdrsize + contentlen) ) remains = (hdrsize + contentlen) - len;
{ buf = &buf[len];
remains = (hdrsize + contentlen) - len; flag = 1;
buf = &buf[len]; //printf("got.(%s) %d remains.%d of len.%d contentlen.%d hdrsize.%d remains.%d\n",buf,recvlen,remains,len,contentlen,hdrsize,(hdrsize+contentlen)-len);
flag = 1; continue;
//printf("got.(%s) %d remains.%d of len.%d contentlen.%d hdrsize.%d remains.%d\n",buf,recvlen,remains,len,contentlen,hdrsize,(hdrsize+contentlen)-len);
continue;
}
} }
} }
} }
recvlen += len;
remains -= len;
buf = &buf[len];
if ( flag == 0 || remains <= 0 )
break;
}
else
{
usleep(10000);
//printf("got.(%s) %d remains.%d of total.%d\n",jsonbuf,recvlen,remains,len);
//retstr = iguana_rpcparse(space,size,&postflag,jsonbuf);
if ( flag == 0 )
break;
} }
recvlen += len;
remains -= len;
buf = &buf[len];
if ( flag == 0 || remains <= 0 )
break;
} }
} else
content_type[0] = 0;
if ( recvlen > 0 )
{
retstr = stats_rpcparse(space,size,&jsonflag,&postflag,jsonbuf,remoteaddr,filetype,port);
if ( filetype[0] != 0 )
{ {
static cJSON *mimejson; char *tmp,*typestr=0; long tmpsize; usleep(10000);
sprintf(helpname,"%s/mime.json",GLOBAL_HELPDIR); //printf("got.(%s) %d remains.%d of total.%d\n",jsonbuf,recvlen,remains,len);
if ( (tmp= OS_filestr(&tmpsize,helpname)) != 0 ) //retstr = iguana_rpcparse(space,size,&postflag,jsonbuf);
{ if ( flag == 0 )
mimejson = cJSON_Parse(tmp); break;
free(tmp);
}
if ( mimejson != 0 )
{
if ( (typestr= jstr(mimejson,filetype)) != 0 )
sprintf(content_type,"Content-Type: %s\r\n",typestr);
} else printf("parse error.(%s)\n",tmp);
//printf("filetype.(%s) json.%p type.%p tmp.%p [%s]\n",filetype,mimejson,typestr,tmp,content_type);
} }
} }
if ( retstr != 0 ) }
content_type[0] = 0;
if ( recvlen > 0 )
{
jsonflag = postflag = 0;
retstr = stats_rpcparse(space,size,&jsonflag,&postflag,jsonbuf,remoteaddr,filetype,RPC_port);
if ( filetype[0] != 0 )
{ {
char *response,hdrs[1024]; static cJSON *mimejson; char *tmp,*typestr=0; long tmpsize;
//printf("RETURN.(%s) jsonflag.%d postflag.%d\n",retstr,jsonflag,postflag); sprintf(helpname,"%s/mime.json",GLOBAL_HELPDIR);
if ( jsonflag != 0 || postflag != 0 ) if ( (tmp= OS_filestr(&tmpsize,helpname)) != 0 )
{ {
if ( retstr == 0 ) mimejson = cJSON_Parse(tmp);
retstr = clonestr("{}"); free(tmp);
response = malloc(strlen(retstr)+1024+1+1);
sprintf(hdrs,"HTTP/1.1 200 OK\r\nAccess-Control-Allow-Origin: *\r\nAccess-Control-Allow-Credentials: true\r\nAccess-Control-Allow-Methods: GET, POST\r\nCache-Control : no-cache, no-store, must-revalidate\r\n%sContent-Length : %8d\r\n\r\n",content_type,(int32_t)strlen(retstr));
response[0] = '\0';
strcat(response,hdrs);
strcat(response,retstr);
strcat(response,"\n");
if ( retstr != space )
free(retstr);
retstr = response;
//printf("RET.(%s)\n",retstr);
} }
remains = (int32_t)strlen(retstr); if ( mimejson != 0 )
i = 0;
while ( remains > 0 )
{ {
if ( (numsent= (int32_t)send(sock,&retstr[i],remains,MSG_NOSIGNAL)) < 0 ) if ( (typestr= jstr(mimejson,filetype)) != 0 )
{ sprintf(content_type,"Content-Type: %s\r\n",typestr);
if ( errno != EAGAIN && errno != EWOULDBLOCK ) } else printf("parse error.(%s)\n",tmp);
{ //printf("filetype.(%s) json.%p type.%p tmp.%p [%s]\n",filetype,mimejson,typestr,tmp,content_type);
//printf("%s: %s numsent.%d vs remains.%d len.%d errno.%d (%s) usock.%d\n",retstr,ipaddr,numsent,remains,recvlen,errno,strerror(errno),sock); }
break; }
} if ( retstr != 0 )
} {
else if ( remains > 0 ) char *response,hdrs[1024];
//printf("RETURN.(%s) jsonflag.%d postflag.%d\n",retstr,jsonflag,postflag);
if ( jsonflag != 0 || postflag != 0 )
{
if ( retstr == 0 )
retstr = clonestr("{}");
response = malloc(strlen(retstr)+1024+1+1);
sprintf(hdrs,"HTTP/1.1 200 OK\r\nAccess-Control-Allow-Origin: *\r\nAccess-Control-Allow-Credentials: true\r\nAccess-Control-Allow-Methods: GET, POST\r\nCache-Control : no-cache, no-store, must-revalidate\r\n%sContent-Length : %8d\r\n\r\n",content_type,(int32_t)strlen(retstr));
response[0] = '\0';
strcat(response,hdrs);
strcat(response,retstr);
strcat(response,"\n");
if ( retstr != space )
free(retstr);
retstr = response;
//printf("RET.(%s)\n",retstr);
}
remains = (int32_t)strlen(retstr);
i = 0;
while ( remains > 0 )
{
if ( (numsent= (int32_t)send(sock,&retstr[i],remains,MSG_NOSIGNAL)) < 0 )
{
if ( errno != EAGAIN && errno != EWOULDBLOCK )
{ {
remains -= numsent; //printf("%s: %s numsent.%d vs remains.%d len.%d errno.%d (%s) usock.%d\n",retstr,ipaddr,numsent,remains,recvlen,errno,strerror(errno),sock);
i += numsent; break;
if ( remains > 0 )
printf("iguana sent.%d remains.%d of len.%d\n",numsent,remains,recvlen);
} }
} }
if ( retstr != space) else if ( remains > 0 )
free(retstr); {
remains -= numsent;
i += numsent;
if ( remains > 0 )
printf("iguana sent.%d remains.%d of len.%d\n",numsent,remains,recvlen);
}
} }
closesocket(sock); if ( retstr != space)
free(retstr);
}
free(space);
free(jsonbuf);
closesocket(sock);
}
void stats_rpcloop(void *args)
{
uint16_t port; int32_t sock,bindsock; socklen_t clilen; struct sockaddr_in cli_addr; uint32_t ipbits; uint64_t arg64;
if ( (port= *(uint16_t *)args) == 0 )
port = 7779;
RPC_port = port;
while ( (bindsock= iguana_socket(1,"0.0.0.0",port)) < 0 )
{
//if ( coin->MAXPEERS == 1 )
// break;
//exit(-1);
sleep(3);
}
printf(">>>>>>>>>> DEX stats 127.0.0.1:%d bind sock.%d DEX stats API enabled <<<<<<<<<\n",port,bindsock);
while ( bindsock >= 0 )
{
clilen = sizeof(cli_addr);
sock = accept(bindsock,(struct sockaddr *)&cli_addr,&clilen);
if ( sock < 0 )
{
//printf("iguana_rpcloop ERROR on accept usock.%d errno %d %s\n",sock,errno,strerror(errno));
continue;
}
memcpy(&ipbits,&cli_addr.sin_addr.s_addr,sizeof(ipbits));
//printf("remote RPC request from (%s) %x\n",remoteaddr,ipbits);
arg64 = ((uint64_t)ipbits << 32) | (sock & 0xffffffff);
LP_rpc_processreq((void *)&arg64);
} }
} }

Loading…
Cancel
Save