|
|
@ -133,9 +133,9 @@ int32_t basilisk_submit(struct supernet_info *myinfo,cJSON *reqjson,int32_t time |
|
|
|
return(n); |
|
|
|
} |
|
|
|
|
|
|
|
struct basilisk_item *basilisk_issueremote(struct supernet_info *myinfo,char *methodstr,char *symbol,cJSON *vals,int32_t timeoutmillis,int32_t fanout,int32_t minresults,uint32_t basilisktag) |
|
|
|
struct basilisk_item *basilisk_issueremote(struct supernet_info *myinfo,char *methodstr,char *symbol,cJSON *vals,int32_t timeoutmillis,int32_t fanout,int32_t minresults,uint32_t basilisktag,void *metricfunc) |
|
|
|
{ |
|
|
|
double expiration; struct basilisk_item *ptr; cJSON *hexjson; |
|
|
|
struct basilisk_item *ptr; cJSON *hexjson; |
|
|
|
if ( basilisktag == 0 ) |
|
|
|
basilisktag = rand(); |
|
|
|
hexjson = cJSON_CreateObject(); |
|
|
@ -143,22 +143,27 @@ struct basilisk_item *basilisk_issueremote(struct supernet_info *myinfo,char *me |
|
|
|
jaddnum(hexjson,"basilisktag",basilisktag); |
|
|
|
jaddstr(hexjson,"method",methodstr); |
|
|
|
jaddstr(hexjson,"activecoin",symbol); |
|
|
|
jaddnum(hexjson,"timeout",timeoutmillis); |
|
|
|
if ( vals != 0 ) |
|
|
|
jadd(hexjson,"vals",jduplicate(vals)); |
|
|
|
printf("issue.(%s)\n",jprint(hexjson,0)); |
|
|
|
printf("issue.(%s) timeout.%d\n",jprint(hexjson,0),timeoutmillis); |
|
|
|
ptr = calloc(1,sizeof(*ptr)); |
|
|
|
ptr->basilisktag = basilisktag; |
|
|
|
ptr->numrequired = minresults; |
|
|
|
if ( (ptr->metricfunc= metricfunc) != 0 ) |
|
|
|
ptr->vals = jduplicate(vals); |
|
|
|
strcpy(ptr->symbol,symbol); |
|
|
|
ptr->expiration = OS_milliseconds() + timeoutmillis; |
|
|
|
queue_enqueue("submitQ",&myinfo->basilisks.submitQ,&ptr->DL,0); |
|
|
|
if ( basilisk_submit(myinfo,hexjson,timeoutmillis,fanout,ptr) > 0 ) |
|
|
|
{ |
|
|
|
if ( timeoutmillis > 0 ) |
|
|
|
/*if ( timeoutmillis > 0 )
|
|
|
|
{ |
|
|
|
printf("unexpected blocking\n"); |
|
|
|
expiration = OS_milliseconds() + ((timeoutmillis == 0) ? BASILISK_TIMEOUT : timeoutmillis); |
|
|
|
while ( OS_milliseconds() < expiration && ptr->finished == 0 && ptr->numresults < ptr->numrequired ) |
|
|
|
usleep(timeoutmillis/100 + 1); |
|
|
|
} |
|
|
|
}*/ |
|
|
|
} |
|
|
|
free_json(hexjson); |
|
|
|
return(ptr); |
|
|
@ -208,9 +213,9 @@ int32_t basilisk_besti(struct basilisk_item *ptr) |
|
|
|
else bestmetric = 0.; |
|
|
|
for (i=0; i<ptr->numresults; i++) |
|
|
|
{ |
|
|
|
if ( (metric= ptr->metrics[i]) != 0. ) |
|
|
|
if ( (metric= ptr->metrics[i]) > 0. ) |
|
|
|
{ |
|
|
|
if ( (ptr->metricdir < 0 && (bestmetric > 0. || metric < bestmetric)) || (ptr->metricdir > 0 && (bestmetric < 0. || metric > bestmetric)) || (ptr->metricdir == 0 && bestmetric == 0.) ) |
|
|
|
if ( (ptr->metricdir < 0 && (bestmetric == 0. || metric < bestmetric)) || (ptr->metricdir > 0 && (bestmetric == 0. || metric > bestmetric)) || (ptr->metricdir == 0 && bestmetric == 0.) ) |
|
|
|
{ |
|
|
|
bestmetric = metric; |
|
|
|
besti = i; |
|
|
@ -219,16 +224,43 @@ int32_t basilisk_besti(struct basilisk_item *ptr) |
|
|
|
} |
|
|
|
if ( besti >= 0 ) |
|
|
|
{ |
|
|
|
for (i=0; i<ptr->numresults; i++) |
|
|
|
for (ptr->numexact=i=0; i<ptr->numresults; i++) |
|
|
|
if ( fabs(ptr->metrics[i] - bestmetric) < SMALLVAL ) |
|
|
|
ptr->numexact++; |
|
|
|
} |
|
|
|
return(besti); |
|
|
|
} |
|
|
|
|
|
|
|
char *basilisk_iscomplete(struct basilisk_item *ptr) |
|
|
|
{ |
|
|
|
int32_t i,numvalid,besti=-1; char *errstr = 0,*retstr = 0; |
|
|
|
if ( ptr->retstr != 0 ) |
|
|
|
return(ptr->retstr); |
|
|
|
if ( (numvalid= ptr->numresults) >= ptr->numrequired ) |
|
|
|
{ |
|
|
|
for (i=numvalid=0; i<ptr->numresults; i++) |
|
|
|
{ |
|
|
|
if ( ptr->metrics[i] != 0. ) |
|
|
|
numvalid++; |
|
|
|
} |
|
|
|
} |
|
|
|
if ( numvalid < ptr->numrequired ) |
|
|
|
{ |
|
|
|
printf("%u: numvalid.%d < required.%d m %f\n",ptr->basilisktag,numvalid,ptr->numrequired,ptr->metrics[0]); |
|
|
|
return(0); |
|
|
|
} |
|
|
|
if ( ptr->uniqueflag == 0 && ptr->numexact != ptr->numresults && ptr->numexact < (ptr->numresults >> 1) ) |
|
|
|
besti = -1, errstr = "{\"error\":\"basilisk non-consensus results\"}"; |
|
|
|
else besti = basilisk_besti(ptr), errstr = "{\"error\":\"basilisk no valid results\"}"; |
|
|
|
printf("%u complete besti.%d\n",ptr->basilisktag,besti); |
|
|
|
retstr = basilisk_finish(ptr,besti,errstr); |
|
|
|
printf("%u besti.%d numexact.%d numresults.%d -> (%s)\n",ptr->basilisktag,besti,ptr->numexact,ptr->numresults,retstr); |
|
|
|
return(retstr); |
|
|
|
} |
|
|
|
|
|
|
|
char *basilisk_block(struct supernet_info *myinfo,struct iguana_info *coin,char *remoteaddr,struct basilisk_item *Lptr,struct basilisk_item *ptr) |
|
|
|
{ |
|
|
|
int32_t besti,i,j,numvalid; char *retstr = 0,*errstr; struct iguana_peer *addr; cJSON *hexobj,*retjson,*valsobj; |
|
|
|
int32_t i,j; char *retstr = 0; struct iguana_peer *addr; cJSON *hexobj,*retjson,*valsobj; |
|
|
|
if ( ptr == Lptr ) |
|
|
|
{ |
|
|
|
if ( (retstr= Lptr->retstr) == 0 ) |
|
|
@ -241,26 +273,9 @@ char *basilisk_block(struct supernet_info *myinfo,struct iguana_info *coin,char |
|
|
|
ptr->numrequired = 1; |
|
|
|
while ( OS_milliseconds() < ptr->expiration ) |
|
|
|
{ |
|
|
|
if ( (numvalid= ptr->numresults) >= ptr->numrequired ) |
|
|
|
{ |
|
|
|
for (i=numvalid=0; i<ptr->numresults; i++) |
|
|
|
{ |
|
|
|
if ( ptr->metrics[i] != 0. ) |
|
|
|
numvalid++; |
|
|
|
} |
|
|
|
} |
|
|
|
if ( numvalid < ptr->numrequired ) |
|
|
|
{ |
|
|
|
usleep(1000000); |
|
|
|
printf("%u: numvalid.%d < required.%d\n",ptr->basilisktag,numvalid,ptr->numrequired); |
|
|
|
continue; |
|
|
|
} |
|
|
|
if ( ptr->uniqueflag == 0 && ptr->numexact <= (ptr->numresults >> 1) ) |
|
|
|
besti = -1, errstr = "{\"error\":\"basilisk non-consensus results\"}"; |
|
|
|
else besti = basilisk_besti(ptr), errstr = "{\"error\":\"basilisk no valid results\"}"; |
|
|
|
retstr = basilisk_finish(ptr,besti,errstr); |
|
|
|
printf("besti.%d numexact.%d numresults.%d -> (%s)\n",besti,ptr->numexact,ptr->numresults,retstr); |
|
|
|
break; |
|
|
|
if ( (retstr= basilisk_iscomplete(ptr)) != 0 ) |
|
|
|
break; |
|
|
|
usleep(1000000); |
|
|
|
} |
|
|
|
if ( retstr == 0 ) |
|
|
|
retstr = basilisk_finish(ptr,-1,"{\"error\":\"basilisk timeout\"}"); |
|
|
@ -486,12 +501,14 @@ char *basilisk_hexmsg(struct supernet_info *myinfo,struct category_info *cat,voi |
|
|
|
|
|
|
|
void basilisks_loop(void *arg) |
|
|
|
{ |
|
|
|
basilisk_metricfunc metricfunc; struct basilisk_item *ptr,*tmp,*pending,*parent; int32_t i,flag,n; struct supernet_info *myinfo = arg; |
|
|
|
basilisk_metricfunc metricfunc; struct basilisk_item *ptr,*tmp,*pending,*parent; int32_t i,iter,flag,n; char *retstr; struct supernet_info *myinfo = arg; |
|
|
|
//uint8_t *blockspace; struct OS_memspace RAWMEM;
|
|
|
|
//memset(&RAWMEM,0,sizeof(RAWMEM));
|
|
|
|
//blockspace = calloc(1,IGUANA_MAXPACKETSIZE);
|
|
|
|
iter = 0; |
|
|
|
while ( 1 ) |
|
|
|
{ |
|
|
|
iter++; |
|
|
|
//for (i=0; i<IGUANA_MAXCOINS; i++)
|
|
|
|
// if ( (coin= Coins[i]) != 0 && coin->RELAYNODE == 0 && coin->VALIDATENODE == 0 && coin->active != 0 && coin->chain->userpass[0] != 0 && coin->MAXPEERS == 1 )
|
|
|
|
// basilisk_bitcoinscan(coin,blockspace,&RAWMEM);
|
|
|
@ -500,9 +517,8 @@ void basilisks_loop(void *arg) |
|
|
|
if ( ptr->finished == 0 ) |
|
|
|
HASH_ADD(hh,myinfo->basilisks.issued,basilisktag,sizeof(ptr->basilisktag),ptr); |
|
|
|
else free(ptr); |
|
|
|
continue; |
|
|
|
} |
|
|
|
else if ( (ptr= queue_dequeue(&myinfo->basilisks.resultsQ,0)) != 0 ) |
|
|
|
if ( (ptr= queue_dequeue(&myinfo->basilisks.resultsQ,0)) != 0 ) |
|
|
|
{ |
|
|
|
HASH_FIND(hh,myinfo->basilisks.issued,&ptr->basilisktag,sizeof(ptr->basilisktag),pending); |
|
|
|
if ( pending != 0 ) |
|
|
@ -510,70 +526,70 @@ void basilisks_loop(void *arg) |
|
|
|
if ( (n= pending->numresults) < sizeof(pending->results)/sizeof(*pending->results) ) |
|
|
|
{ |
|
|
|
pending->results[n] = ptr->retstr; |
|
|
|
printf("%p Add results[%d] <- (%s)\n",&pending->results[n],n,ptr->retstr); |
|
|
|
pending->numresults++; |
|
|
|
if ( (metricfunc= pending->metricfunc) == 0 ) |
|
|
|
pending->metrics[n] = n + 1; |
|
|
|
else pending->metrics[n] = (*metricfunc)(myinfo,pending,pending->results[n]); |
|
|
|
printf("%u Add results[%d] <- (%s) metric %f\n",pending->basilisktag,n,ptr->retstr,pending->metrics[n]); |
|
|
|
} |
|
|
|
} |
|
|
|
free(ptr); |
|
|
|
continue; |
|
|
|
} |
|
|
|
else |
|
|
|
flag = 0; |
|
|
|
HASH_ITER(hh,myinfo->basilisks.issued,pending,tmp) |
|
|
|
{ |
|
|
|
flag = 0; |
|
|
|
HASH_ITER(hh,myinfo->basilisks.issued,pending,tmp) |
|
|
|
{ |
|
|
|
for (i=0; i<pending->numresults; i++) |
|
|
|
if ( pending->metrics[i] == 0. ) |
|
|
|
{ |
|
|
|
if ( (metricfunc= pending->metricfunc) != 0 ) |
|
|
|
{ |
|
|
|
pending->metrics[i] = (*metricfunc)(myinfo,pending,pending->results[i]); |
|
|
|
//printf("poll metrics\n");
|
|
|
|
} |
|
|
|
flag++; |
|
|
|
} |
|
|
|
if ( OS_milliseconds() > pending->expiration ) |
|
|
|
printf("pending.%u numresults.%d m %f func.%p\n",pending->basilisktag,pending->numresults,pending->metrics[0],pending->metricfunc); |
|
|
|
for (i=0; i<pending->numresults; i++) |
|
|
|
if ( pending->metrics[i] == 0. ) |
|
|
|
{ |
|
|
|
if ( pending->finished == 0 ) |
|
|
|
if ( (metricfunc= pending->metricfunc) != 0 && pending->metrics[i] == 0. ) |
|
|
|
{ |
|
|
|
if ( (parent= pending->parent) != 0 ) |
|
|
|
{ |
|
|
|
pending->parent = 0; |
|
|
|
parent->childrendone++; |
|
|
|
} |
|
|
|
pending->finished = (uint32_t)time(NULL); |
|
|
|
pending->metrics[i] = (*metricfunc)(myinfo,pending,pending->results[i]); |
|
|
|
//printf("iter.%d %p.[%d] poll metrics.%u metric %f\n",iter,pending,i,pending->basilisktag,pending->metrics[i]);
|
|
|
|
} |
|
|
|
if ( pending->retstr == 0 ) |
|
|
|
pending->retstr = clonestr("{\"error\":\"basilisk timeout\"}"); |
|
|
|
printf("timeout call metrics.%u\n",pending->basilisktag); |
|
|
|
for (i=0; i<pending->numresults; i++) |
|
|
|
if ( (metricfunc= pending->metricfunc) != 0 ) |
|
|
|
pending->metrics[i] = (*metricfunc)(myinfo,pending,pending->results[i]); |
|
|
|
flag++; |
|
|
|
} |
|
|
|
if ( pending->finished != 0 ) |
|
|
|
if ( (retstr= basilisk_iscomplete(pending)) != 0 ) |
|
|
|
printf("completed.(%s)\n",retstr); |
|
|
|
if ( OS_milliseconds() > pending->expiration ) |
|
|
|
{ |
|
|
|
if ( pending->finished == 0 ) |
|
|
|
{ |
|
|
|
if ( pending->dependents == 0 || pending->childrendone >= pending->numchildren ) |
|
|
|
if ( (parent= pending->parent) != 0 ) |
|
|
|
{ |
|
|
|
HASH_DELETE(hh,myinfo->basilisks.issued,pending); |
|
|
|
if ( pending->dependents != 0 ) |
|
|
|
free(pending->dependents); |
|
|
|
printf("free ptr.%u\n",pending->basilisktag); |
|
|
|
for (i=0; i<pending->numresults; i++) |
|
|
|
if ( pending->results[i] != 0 ) |
|
|
|
free(pending->results[i]); |
|
|
|
if ( pending->vals != 0 ) |
|
|
|
free_json(pending->vals); |
|
|
|
free(pending); |
|
|
|
flag++; |
|
|
|
pending->parent = 0; |
|
|
|
parent->childrendone++; |
|
|
|
} |
|
|
|
pending->finished = (uint32_t)time(NULL); |
|
|
|
} |
|
|
|
if ( pending->retstr == 0 ) |
|
|
|
pending->retstr = clonestr("{\"error\":\"basilisk timeout\"}"); |
|
|
|
printf("timeout call metrics.%u lag %f - %f\n",pending->basilisktag,OS_milliseconds(),pending->expiration); |
|
|
|
for (i=0; i<pending->numresults; i++) |
|
|
|
if ( (metricfunc= pending->metricfunc) != 0 ) |
|
|
|
pending->metrics[i] = (*metricfunc)(myinfo,pending,pending->results[i]); |
|
|
|
} |
|
|
|
if ( pending->finished != 0 && time(NULL) > pending->finished+60 ) |
|
|
|
{ |
|
|
|
if ( pending->dependents == 0 || pending->childrendone >= pending->numchildren ) |
|
|
|
{ |
|
|
|
HASH_DELETE(hh,myinfo->basilisks.issued,pending); |
|
|
|
if ( pending->dependents != 0 ) |
|
|
|
free(pending->dependents); |
|
|
|
printf("HASH_DELETE free ptr.%u\n",pending->basilisktag); |
|
|
|
for (i=0; i<pending->numresults; i++) |
|
|
|
if ( pending->results[i] != 0 ) |
|
|
|
free(pending->results[i]); |
|
|
|
if ( pending->vals != 0 ) |
|
|
|
free_json(pending->vals); |
|
|
|
free(pending); |
|
|
|
flag++; |
|
|
|
} |
|
|
|
} |
|
|
|
if ( flag == 0 ) |
|
|
|
usleep(100000); |
|
|
|
} |
|
|
|
if ( flag == 0 ) |
|
|
|
sleep(1); |
|
|
|
else sleep(3); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|