|
@ -94,6 +94,21 @@ cJSON *basilisk_resultsjson(struct supernet_info *myinfo,char *symbol,char *remo |
|
|
return(retjson); |
|
|
return(retjson); |
|
|
}*/ |
|
|
}*/ |
|
|
|
|
|
|
|
|
|
|
|
struct basilisk_item *basilisk_itemcreate(struct supernet_info *myinfo,uint32_t basilisktag,int32_t minresults,cJSON *vals,int32_t timeoutmillis,void *metricfunc,char *symbol) |
|
|
|
|
|
{ |
|
|
|
|
|
struct basilisk_item *ptr; |
|
|
|
|
|
ptr = calloc(1,sizeof(*ptr)); |
|
|
|
|
|
ptr->basilisktag = basilisktag; |
|
|
|
|
|
if ( (ptr->numrequired= minresults) == 0 ) |
|
|
|
|
|
ptr->numrequired = 1; |
|
|
|
|
|
if ( (ptr->metricfunc= metricfunc) != 0 ) |
|
|
|
|
|
ptr->vals = jduplicate(vals); |
|
|
|
|
|
strcpy(ptr->symbol,symbol); |
|
|
|
|
|
ptr->expiration = OS_milliseconds() + timeoutmillis; |
|
|
|
|
|
queue_enqueue("submitQ",&myinfo->basilisks.submitQ,&ptr->DL,0); |
|
|
|
|
|
return(ptr); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
#include "basilisk_bitcoin.c" |
|
|
#include "basilisk_bitcoin.c" |
|
|
#include "basilisk_nxt.c" |
|
|
#include "basilisk_nxt.c" |
|
|
#include "basilisk_ether.c" |
|
|
#include "basilisk_ether.c" |
|
@ -135,40 +150,43 @@ int32_t basilisk_submit(struct supernet_info *myinfo,cJSON *reqjson,int32_t time |
|
|
return(n); |
|
|
return(n); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
struct basilisk_item *basilisk_issueremote(struct supernet_info *myinfo,char *methodstr,char *symbol,cJSON *vals,int32_t timeoutmillis,int32_t fanout,int32_t minresults,uint32_t basilisktag,void *metricfunc) |
|
|
struct basilisk_item *basilisk_issueremote(struct supernet_info *myinfo,char *methodstr,char *symbol,cJSON *vals,int32_t timeoutmillis,int32_t fanout,int32_t minresults,uint32_t basilisktag,void *_metricfunc,char *retstr) |
|
|
{ |
|
|
{ |
|
|
struct basilisk_item *ptr; cJSON *hexjson; |
|
|
struct basilisk_item *ptr; cJSON *hexjson; basilisk_metricfunc metricfunc = _metricfunc; |
|
|
if ( basilisktag == 0 ) |
|
|
if ( basilisktag == 0 ) |
|
|
basilisktag = rand(); |
|
|
basilisktag = rand(); |
|
|
hexjson = cJSON_CreateObject(); |
|
|
ptr = basilisk_itemcreate(myinfo,basilisktag,minresults,vals,timeoutmillis,metricfunc,symbol); |
|
|
jaddstr(hexjson,"agent","basilisk"); |
|
|
if ( retstr != 0 ) |
|
|
jaddnum(hexjson,"basilisktag",basilisktag); |
|
|
|
|
|
jaddstr(hexjson,"method",methodstr); |
|
|
|
|
|
jaddstr(hexjson,"activecoin",symbol); |
|
|
|
|
|
jaddnum(hexjson,"timeout",timeoutmillis); |
|
|
|
|
|
if ( vals != 0 ) |
|
|
|
|
|
jadd(hexjson,"vals",jduplicate(vals)); |
|
|
|
|
|
//printf("issue.(%s) timeout.%d\n",jprint(hexjson,0),timeoutmillis);
|
|
|
|
|
|
ptr = calloc(1,sizeof(*ptr)); |
|
|
|
|
|
ptr->basilisktag = basilisktag; |
|
|
|
|
|
if ( (ptr->numrequired= minresults) == 0 ) |
|
|
|
|
|
ptr->numrequired = 1; |
|
|
|
|
|
if ( (ptr->metricfunc= metricfunc) != 0 ) |
|
|
|
|
|
ptr->vals = jduplicate(vals); |
|
|
|
|
|
strcpy(ptr->symbol,symbol); |
|
|
|
|
|
ptr->expiration = OS_milliseconds() + timeoutmillis; |
|
|
|
|
|
queue_enqueue("submitQ",&myinfo->basilisks.submitQ,&ptr->DL,0); |
|
|
|
|
|
if ( basilisk_submit(myinfo,hexjson,timeoutmillis,fanout,ptr) > 0 ) |
|
|
|
|
|
{ |
|
|
{ |
|
|
/*if ( timeoutmillis > 0 )
|
|
|
ptr->retstr = retstr; |
|
|
|
|
|
ptr->results[0] = retstr; |
|
|
|
|
|
ptr->numresults = ptr->numrequired; |
|
|
|
|
|
ptr->metrics[0] = (*metricfunc)(myinfo,ptr,retstr); |
|
|
|
|
|
ptr->finished = (uint32_t)time(NULL); |
|
|
|
|
|
} |
|
|
|
|
|
else |
|
|
|
|
|
{ |
|
|
|
|
|
hexjson = cJSON_CreateObject(); |
|
|
|
|
|
jaddstr(hexjson,"agent","basilisk"); |
|
|
|
|
|
jaddnum(hexjson,"basilisktag",basilisktag); |
|
|
|
|
|
jaddstr(hexjson,"method",methodstr); |
|
|
|
|
|
jaddstr(hexjson,"activecoin",symbol); |
|
|
|
|
|
jaddnum(hexjson,"timeout",timeoutmillis); |
|
|
|
|
|
if ( vals != 0 ) |
|
|
|
|
|
jadd(hexjson,"vals",jduplicate(vals)); |
|
|
|
|
|
printf("issue.(%s) timeout.%d\n",jprint(hexjson,0),timeoutmillis); |
|
|
|
|
|
if ( basilisk_submit(myinfo,hexjson,timeoutmillis,fanout,ptr) > 0 ) |
|
|
{ |
|
|
{ |
|
|
printf("unexpected blocking\n"); |
|
|
/*if ( timeoutmillis > 0 )
|
|
|
expiration = OS_milliseconds() + ((timeoutmillis == 0) ? BASILISK_TIMEOUT : timeoutmillis); |
|
|
{ |
|
|
while ( OS_milliseconds() < expiration && ptr->finished == 0 && ptr->numresults < ptr->numrequired ) |
|
|
printf("unexpected blocking\n"); |
|
|
usleep(timeoutmillis/100 + 1); |
|
|
expiration = OS_milliseconds() + ((timeoutmillis == 0) ? BASILISK_TIMEOUT : timeoutmillis); |
|
|
}*/ |
|
|
while ( OS_milliseconds() < expiration && ptr->finished == 0 && ptr->numresults < ptr->numrequired ) |
|
|
|
|
|
usleep(timeoutmillis/100 + 1); |
|
|
|
|
|
}*/ |
|
|
|
|
|
} |
|
|
|
|
|
free_json(hexjson); |
|
|
} |
|
|
} |
|
|
free_json(hexjson); |
|
|
|
|
|
return(ptr); |
|
|
return(ptr); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -265,6 +283,7 @@ char *basilisk_block(struct supernet_info *myinfo,struct iguana_info *coin,char |
|
|
{ |
|
|
{ |
|
|
if ( (retstr= Lptr->retstr) == 0 ) |
|
|
if ( (retstr= Lptr->retstr) == 0 ) |
|
|
retstr = clonestr("{\"result\":\"null return from local basilisk_issuecmd\"}"); |
|
|
retstr = clonestr("{\"result\":\"null return from local basilisk_issuecmd\"}"); |
|
|
|
|
|
ptr = basilisk_itemcreate(myinfo,Lptr->basilisktag,Lptr->numrequired,Lptr->vals,OS_milliseconds() - Lptr->expiration,Lptr->metricfunc,Lptr->symbol); |
|
|
//printf("block got local.(%s)\n",retstr);
|
|
|
//printf("block got local.(%s)\n",retstr);
|
|
|
} |
|
|
} |
|
|
else |
|
|
else |
|
@ -520,6 +539,7 @@ void basilisks_loop(void *arg) |
|
|
if ( ptr->finished == 0 ) |
|
|
if ( ptr->finished == 0 ) |
|
|
HASH_ADD(hh,myinfo->basilisks.issued,basilisktag,sizeof(ptr->basilisktag),ptr); |
|
|
HASH_ADD(hh,myinfo->basilisks.issued,basilisktag,sizeof(ptr->basilisktag),ptr); |
|
|
else free(ptr); |
|
|
else free(ptr); |
|
|
|
|
|
continue; |
|
|
} |
|
|
} |
|
|
if ( (ptr= queue_dequeue(&myinfo->basilisks.resultsQ,0)) != 0 ) |
|
|
if ( (ptr= queue_dequeue(&myinfo->basilisks.resultsQ,0)) != 0 ) |
|
|
{ |
|
|
{ |
|
@ -531,12 +551,14 @@ void basilisks_loop(void *arg) |
|
|
pending->numresults++; |
|
|
pending->numresults++; |
|
|
if ( (metricfunc= pending->metricfunc) == 0 ) |
|
|
if ( (metricfunc= pending->metricfunc) == 0 ) |
|
|
pending->metrics[n] = n + 1; |
|
|
pending->metrics[n] = n + 1; |
|
|
else pending->metrics[n] = (*metricfunc)(myinfo,pending,ptr->retstr); |
|
|
else if ( (pending->metrics[n]= (*metricfunc)(myinfo,pending,ptr->retstr)) != 0. ) |
|
|
|
|
|
pending->childrendone++; |
|
|
printf("%u Add results[%d] <- (%s) metric %f\n",pending->basilisktag,n,ptr->retstr,pending->metrics[n]); |
|
|
printf("%u Add results[%d] <- (%s) metric %f\n",pending->basilisktag,n,ptr->retstr,pending->metrics[n]); |
|
|
pending->results[n] = ptr->retstr; |
|
|
pending->results[n] = ptr->retstr; |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
free(ptr); |
|
|
free(ptr); |
|
|
|
|
|
continue; |
|
|
} |
|
|
} |
|
|
flag = 0; |
|
|
flag = 0; |
|
|
HASH_ITER(hh,myinfo->basilisks.issued,pending,tmp) |
|
|
HASH_ITER(hh,myinfo->basilisks.issued,pending,tmp) |
|
@ -547,7 +569,8 @@ void basilisks_loop(void *arg) |
|
|
for (i=0; i<pending->numresults; i++) |
|
|
for (i=0; i<pending->numresults; i++) |
|
|
if ( pending->metrics[i] == 0. && pending->results[i] != 0 ) |
|
|
if ( pending->metrics[i] == 0. && pending->results[i] != 0 ) |
|
|
{ |
|
|
{ |
|
|
pending->metrics[i] = (*metricfunc)(myinfo,pending,pending->results[i]); |
|
|
if ( (pending->metrics[i]= (*metricfunc)(myinfo,pending,pending->results[i])) != 0 ) |
|
|
|
|
|
pending->childrendone++; |
|
|
// printf("iter.%d %p.[%d] poll metrics.%u metric %f\n",iter,pending,i,pending->basilisktag,pending->metrics[i]);
|
|
|
// printf("iter.%d %p.[%d] poll metrics.%u metric %f\n",iter,pending,i,pending->basilisktag,pending->metrics[i]);
|
|
|
flag++; |
|
|
flag++; |
|
|
} |
|
|
} |
|
@ -590,8 +613,8 @@ void basilisks_loop(void *arg) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
if ( flag == 0 ) |
|
|
if ( flag == 0 ) |
|
|
usleep(100000); |
|
|
usleep(50000); |
|
|
else usleep(25000); |
|
|
else usleep(10000); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|