Browse Source

Test

etomic
jl777 7 years ago
parent
commit
071f269394
  1. 99
      iguana/exchanges/LP_socket.c

99
iguana/exchanges/LP_socket.c

@ -243,6 +243,7 @@ int32_t LP_socketrecv(int32_t sock,uint8_t *recvbuf,int32_t maxlen)
struct electrum_info struct electrum_info
{ {
queue_t sendQ,pendingQ; queue_t sendQ,pendingQ;
cJSON *retjson;
int32_t bufsize,sock,*heightp; int32_t bufsize,sock,*heightp;
uint32_t stratumid,lasttime,pending,*heighttimep; uint32_t stratumid,lasttime,pending,*heighttimep;
char ipaddr[64],symbol[16]; char ipaddr[64],symbol[16];
@ -303,21 +304,27 @@ struct electrum_info *electrum_server(char *symbol,struct electrum_info *ep)
return(ep); return(ep);
} }
// overlapped execution not debugged
cJSON *electrum_submit(char *symbol,struct electrum_info *ep,cJSON **retjsonp,char *method,char *params,int32_t timeout) cJSON *electrum_submit(char *symbol,struct electrum_info *ep,cJSON **retjsonp,char *method,char *params,int32_t timeout)
{ {
// queue id and string and callback // queue id and string and callback
char stratumreq[16384]; uint32_t expiration; struct stritem *sitem; cJSON *retjson = 0; char stratumreq[16384]; uint32_t expiration; struct stritem *sitem;
if ( strcmp(method,"getrawmempool") == 0 )
{
retjson = cJSON_Parse("{\"error\":\"unsupported method\"}");
if ( retjsonp != 0 )
*retjsonp = retjson;
return(retjson);
}
if ( ep == 0 ) if ( ep == 0 )
ep = electrum_server(symbol,0); ep = electrum_server(symbol,0);
if ( ep != 0 ) if ( ep != 0 )
{ {
if ( ep->retjson != 0 )
{
free_json(ep->retjson);
ep->retjson = 0;
}
if ( strcmp(method,"getrawmempool") == 0 )
{
ep->retjson = cJSON_Parse("{\"error\":\"unsupported method\"}");
if ( retjsonp != 0 )
*retjsonp = ep->retjson;
return(ep->retjson);
}
sprintf(stratumreq,"{ \"jsonrpc\":\"2.0\", \"id\": %u, \"method\":\"%s\", \"params\": %s }\n",ep->stratumid,method,params); sprintf(stratumreq,"{ \"jsonrpc\":\"2.0\", \"id\": %u, \"method\":\"%s\", \"params\": %s }\n",ep->stratumid,method,params);
ep->buf[0] = 0; ep->buf[0] = 0;
sitem = (struct stritem *)queueitem(stratumreq); sitem = (struct stritem *)queueitem(stratumreq);
@ -325,21 +332,21 @@ cJSON *electrum_submit(char *symbol,struct electrum_info *ep,cJSON **retjsonp,ch
sitem->DL.type = ep->stratumid++; sitem->DL.type = ep->stratumid++;
if ( retjsonp != 0 ) if ( retjsonp != 0 )
sitem->retptrp = (void **)retjsonp; sitem->retptrp = (void **)retjsonp;
else sitem->retptrp = (void **)&retjson; else sitem->retptrp = (void **)&ep->retjson;
queue_enqueue("sendQ",&ep->sendQ,&sitem->DL); queue_enqueue("sendQ",&ep->sendQ,&sitem->DL);
if ( sitem->retptrp == (void **)&retjson ) if ( sitem->retptrp == (void **)&ep->retjson )
{ {
expiration = (uint32_t)time(NULL) + timeout + 1; expiration = (uint32_t)time(NULL) + timeout + 1;
while ( retjson == 0 && time(NULL) <= expiration ) while ( ep->retjson == 0 && time(NULL) <= expiration )
usleep(10000); usleep(10000);
if ( retjson == 0 ) if ( ep->retjson == 0 )
{ {
printf("unexpected timeout with null retjson: %s %s\n",method,params); printf("unexpected timeout with null retjson: %s %s\n",method,params);
retjson = cJSON_Parse("{\"error\":\"timeout\"}"); ep->retjson = cJSON_Parse("{\"error\":\"timeout\"}");
} }
} }
} else printf("couldnt find electrum server for (%s %s)\n",method,params); } else printf("couldnt find electrum server for (%s %s)\n",method,params);
return(retjson); return(ep->retjson);
} }
cJSON *electrum_noargs(char *symbol,struct electrum_info *ep,cJSON **retjsonp,char *method,int32_t timeout) cJSON *electrum_noargs(char *symbol,struct electrum_info *ep,cJSON **retjsonp,char *method,int32_t timeout)
@ -542,45 +549,42 @@ int32_t LP_recvfunc(struct electrum_info *ep,char *str,int32_t len)
} }
} }
idnum = juint(strjson,"id"); idnum = juint(strjson,"id");
//if ( 0 ) // crashes cipi's node likely due to mutex across threads portable_mutex_lock(&ep->pendingQ.mutex);
if ( ep->pendingQ.list != 0 )
{ {
portable_mutex_lock(&ep->pendingQ.mutex); DL_FOREACH(ep->pendingQ.list,item)
if ( ep->pendingQ.list != 0 )
{ {
DL_FOREACH(ep->pendingQ.list,item) if ( item->type == 0xffffffff )
continue;
stritem = (struct stritem *)item;
if ( item->type == idnum )
{ {
if ( item->type == 0xffffffff ) //printf("matched idnum.%d\n",idnum);
continue; item->type = 0xffffffff;
stritem = (struct stritem *)item; if ( stritem->retptrp != 0 )
if ( item->type == idnum )
{ {
//printf("matched idnum.%d\n",idnum); *((cJSON **)stritem->retptrp) = strjson;
item->type = 0xffffffff; strjson = 0;
if ( stritem->retptrp != 0 )
{
*((cJSON **)stritem->retptrp) = strjson;
strjson = 0;
}
//DL_DELETE(ep->pendingQ.list,item);
break;
} }
if ( stritem->expiration < ep->lasttime ) //DL_DELETE(ep->pendingQ.list,item);
break;
}
if ( stritem->expiration < ep->lasttime )
{
printf("expired (%s)\n",stritem->str);
item->type = 0xffffffff;
//DL_DELETE(ep->pendingQ.list,item);
if ( stritem->retptrp != 0 )
{ {
printf("expired (%s)\n",stritem->str); errjson = cJSON_CreateObject();
item->type = 0xffffffff; jaddnum(errjson,"id",item->type);
//DL_DELETE(ep->pendingQ.list,item); jaddstr(errjson,"error","timeout");
if ( stritem->retptrp != 0 ) *((cJSON **)stritem->retptrp) = errjson;
{ };
errjson = cJSON_CreateObject();
jaddnum(errjson,"id",item->type);
jaddstr(errjson,"error","timeout");
*((cJSON **)stritem->retptrp) = errjson;
};
}
} }
} }
portable_mutex_unlock(&ep->pendingQ.mutex);
} }
portable_mutex_unlock(&ep->pendingQ.mutex);
if ( strjson != 0 ) if ( strjson != 0 )
free_json(strjson); free_json(strjson);
} }
@ -614,21 +618,22 @@ void LP_dedicatedloop(void *arg)
if ( sitem->expiration != 0 ) if ( sitem->expiration != 0 )
sitem->expiration += (uint32_t)time(NULL); sitem->expiration += (uint32_t)time(NULL);
else sitem->expiration = (uint32_t)time(NULL) + ELECTRUM_TIMEOUT; else sitem->expiration = (uint32_t)time(NULL) + ELECTRUM_TIMEOUT;
//printf("SEND.(%s) to %s:%u\n",sitem->str,ep->ipaddr,ep->port);
//queue_enqueue("pendingQ",&ep->pendingQ,(struct queueitem *)sitem);
portable_mutex_lock(&ep->pendingQ.mutex); portable_mutex_lock(&ep->pendingQ.mutex);
if ( ep->pendingQ.list != 0 ) if ( ep->pendingQ.list != 0 )
{ {
printf("list %p\n",ep->pendingQ.list);
DL_FOREACH(ep->pendingQ.list,item) DL_FOREACH(ep->pendingQ.list,item)
{ {
printf("item.%p\n",item);
if ( item->type == 0xffffffff ) if ( item->type == 0xffffffff )
{ {
printf("purge %s",((struct stritem *)item)->str); printf("%p purge %s",item,((struct stritem *)item)->str);
DL_DELETE(ep->pendingQ.list,item); DL_DELETE(ep->pendingQ.list,item);
free(item); free(item);
} }
} }
} }
printf("%p SEND.(%s) to %s:%u\n",sitem,sitem->str,ep->ipaddr,ep->port);
DL_APPEND(ep->pendingQ.list,&sitem->DL); DL_APPEND(ep->pendingQ.list,&sitem->DL);
portable_mutex_unlock(&ep->pendingQ.mutex); portable_mutex_unlock(&ep->pendingQ.mutex);
flag++; flag++;

Loading…
Cancel
Save