Browse Source

Merge pull request #363 from jl777/spvdex

Spvdex
etomic
jl777 7 years ago
committed by GitHub
parent
commit
baa4340461
  1. 4
      iguana/exchanges/LP_prices.c
  2. 32
      iguana/exchanges/LP_rpc.c
  3. 93
      iguana/exchanges/LP_socket.c

4
iguana/exchanges/LP_prices.c

@ -109,9 +109,9 @@ void LP_address_monitor(struct LP_pubkeyinfo *pubp)
portable_mutex_unlock(&coin->txmutex); portable_mutex_unlock(&coin->txmutex);
if ( coin->electrum != 0 ) if ( coin->electrum != 0 )
{ {
if ( (retjson= electrum_address_subscribe(coin->symbol,coin->electrum,0,coinaddr)) != 0 ) if ( (retjson= electrum_address_subscribe(coin->symbol,coin->electrum,&retjson,coinaddr)) != 0 )
{ {
printf("%s MONITOR.(%s)\n",coin->symbol,coinaddr); printf("%s MONITOR.(%s) -> %p\n",coin->symbol,coinaddr,retjson);
free_json(retjson); free_json(retjson);
} }
} }

32
iguana/exchanges/LP_rpc.c

@ -105,24 +105,21 @@ char *issue_LP_getprices(char *destip,uint16_t destport)
char *LP_apicall(struct iguana_info *coin,char *method,char *params) char *LP_apicall(struct iguana_info *coin,char *method,char *params)
{ {
cJSON *retjson,*resultjson; cJSON *retjson; char *retstr;
if ( coin->electrum != 0 ) if ( coin->electrum != 0 )
{ {
if ( (retjson= electrum_submit(coin->symbol,coin->electrum,0,method,params,LP_HTTP_TIMEOUT)) != 0 ) if ( (retjson= electrum_submit(coin->symbol,coin->electrum,&retjson,method,params,LP_HTTP_TIMEOUT)) != 0 )
{ {
if ( (resultjson= jobj(retjson,"result")) != 0 ) retstr = jprint(retjson,0);
{ //printf("got.%p (%s)\n",retjson,retstr);
resultjson = jduplicate(resultjson); return(retstr);
free_json(retjson);
return(jprint(resultjson,1));
} else return(jprint(retjson,1));
} return(clonestr("{\"error\":\"electrum no response\"}")); } return(clonestr("{\"error\":\"electrum no response\"}"));
} else return(bitcoind_passthru(coin->symbol,coin->serverport,coin->userpass,method,params)); } else return(bitcoind_passthru(coin->symbol,coin->serverport,coin->userpass,method,params));
} }
cJSON *bitcoin_json(struct iguana_info *coin,char *method,char *params) cJSON *bitcoin_json(struct iguana_info *coin,char *method,char *params)
{ {
cJSON *resultjson,*retjson = 0; char *retstr; cJSON *retjson = 0; char *retstr;
// "getinfo", "getrawmempool", "paxprice", "gettxout", "getrawtransaction", "getblock", "listunspent", "listtransactions", "validateaddress", "importprivkey" // "getinfo", "getrawmempool", "paxprice", "gettxout", "getrawtransaction", "getblock", "listunspent", "listtransactions", "validateaddress", "importprivkey"
// bitcoind_passthru callers: "importaddress", "estimatefee", "getblockhash", "sendrawtransaction", "signrawtransaction" // bitcoind_passthru callers: "importaddress", "estimatefee", "getblockhash", "sendrawtransaction", "signrawtransaction"
if ( coin != 0 ) if ( coin != 0 )
@ -142,13 +139,15 @@ cJSON *bitcoin_json(struct iguana_info *coin,char *method,char *params)
} }
else else
{ {
retjson = electrum_submit(coin->symbol,coin->electrum,0,method,params,LP_HTTP_TIMEOUT); if ( (retjson= electrum_submit(coin->symbol,coin->electrum,&retjson,method,params,LP_HTTP_TIMEOUT)) != 0 )
{
//printf("electrum %s.%s -> (%s)\n",method,params,jprint(retjson,0)); //printf("electrum %s.%s -> (%s)\n",method,params,jprint(retjson,0));
if ( (resultjson= jobj(retjson,"result")) != 0 ) /*if ( (resultjson= jobj(retjson,"result")) != 0 )
{ {
resultjson = jduplicate(resultjson); resultjson = jduplicate(resultjson);
free_json(retjson); free_json(retjson);
retjson = resultjson; retjson = resultjson;
}*/
} }
} }
} else retjson = cJSON_Parse("{\"result\":\"disabled\"}"); } else retjson = cJSON_Parse("{\"result\":\"disabled\"}");
@ -271,7 +270,7 @@ cJSON *LP_gettx(char *symbol,bits256 txid)
cJSON *LP_gettxout(char *symbol,bits256 txid,int32_t vout) cJSON *LP_gettxout(char *symbol,bits256 txid,int32_t vout)
{ {
char buf[128],str[65],coinaddr[64],*hexstr; uint64_t value; uint8_t *serialized; cJSON *sobj,*addresses,*item,*array,*hexobj,*listjson,*retjson=0; int32_t i,n,v,len; bits256 t; struct iguana_info *coin; char buf[128],str[65],coinaddr[64],*hexstr; uint64_t value; uint8_t *serialized; cJSON *sobj,*addresses,*item,*array,*hexobj,*retjson=0; int32_t i,n,v,len; bits256 t; struct iguana_info *coin;
coin = LP_coinfind(symbol); coin = LP_coinfind(symbol);
if ( coin == 0 ) if ( coin == 0 )
return(cJSON_Parse("{\"error\":\"no coin\"}")); return(cJSON_Parse("{\"error\":\"no coin\"}"));
@ -295,9 +294,10 @@ cJSON *LP_gettxout(char *symbol,bits256 txid,int32_t vout)
decode_hex(serialized,len,hexstr+1); decode_hex(serialized,len,hexstr+1);
LP_swap_coinaddr(coin,coinaddr,&value,serialized,len,vout); LP_swap_coinaddr(coin,coinaddr,&value,serialized,len,vout);
//printf("HEX.(%s) len.%d %s %.8f\n",hexstr+1,len,coinaddr,dstr(value)); //printf("HEX.(%s) len.%d %s %.8f\n",hexstr+1,len,coinaddr,dstr(value));
if ( (listjson= electrum_address_listunspent(coin->symbol,0,0,coinaddr)) != 0 ) if ( (array= electrum_address_listunspent(coin->symbol,0,&array,coinaddr)) != 0 )
{ {
if ( (array= jarray(&n,listjson,"result")) != 0 ) //printf("array.(%s)\n",jprint(array,0));
if ( array != 0 && (n= cJSON_GetArraySize(array)) > 0 )
{ {
for (i=0; i<n; i++) for (i=0; i<n; i++)
{ {
@ -340,7 +340,7 @@ cJSON *LP_gettxout(char *symbol,bits256 txid,int32_t vout)
} }
} }
} }
free_json(listjson); free_json(array);
} }
} }
return(retjson); return(retjson);
@ -493,7 +493,7 @@ int32_t LP_importaddress(char *symbol,char *address)
return(-2); return(-2);
if ( coin->electrum != 0 ) if ( coin->electrum != 0 )
{ {
if ( (retjson= electrum_address_subscribe(symbol,0,0,address)) != 0 ) if ( (retjson= electrum_address_subscribe(symbol,0,&retjson,address)) != 0 )
{ {
printf("importaddress.(%s) -> %s\n",address,jprint(retjson,0)); printf("importaddress.(%s) -> %s\n",address,jprint(retjson,0));
free_json(retjson); free_json(retjson);

93
iguana/exchanges/LP_socket.c

@ -26,7 +26,7 @@
#include <WinSock2.h> #include <WinSock2.h>
#endif #endif
#define ELECTRUM_TIMEOUT 5 #define ELECTRUM_TIMEOUT 3
int32_t LP_socket(int32_t bindflag,char *hostname,uint16_t port) int32_t LP_socket(int32_t bindflag,char *hostname,uint16_t port)
{ {
@ -253,7 +253,7 @@ int32_t Num_electrums;
// purge timedout // purge timedout
/* /*
if ( (retjson= electrum_address_listunspent(symbol,ep,0,addr)) != 0 ) if ( (retjson= electrum_address_listunspent(symbol,ep,&retjson,addr)) != 0 )
you can call it like the above, where symbol is the coin, ep is the electrum server info pointer, the 0 is a callback ptr where 0 means to block till it is done you can call it like the above, where symbol is the coin, ep is the electrum server info pointer, the 0 is a callback ptr where 0 means to block till it is done
all the API calls have the same three args all the API calls have the same three args
if the callback ptr is &retjson, then on completion it will put the cJSON *ptr into it, so to spawn a bunch of calls you need to call with symbol,ep,&retjsons[i],... if the callback ptr is &retjson, then on completion it will put the cJSON *ptr into it, so to spawn a bunch of calls you need to call with symbol,ep,&retjsons[i],...
@ -306,52 +306,52 @@ struct electrum_info *electrum_server(char *symbol,struct electrum_info *ep)
cJSON *electrum_submit(char *symbol,struct electrum_info *ep,cJSON **retjsonp,char *method,char *params,int32_t timeout) cJSON *electrum_submit(char *symbol,struct electrum_info *ep,cJSON **retjsonp,char *method,char *params,int32_t timeout)
{ {
// queue id and string and callback // queue id and string and callback
char stratumreq[16384]; uint32_t expiration; struct stritem *sitem; cJSON *retjson = 0; char stratumreq[16384]; uint32_t expiration; struct stritem *sitem;
if ( strcmp(method,"getrawmempool") == 0 )
{
retjson = cJSON_Parse("{\"error\":\"unsupported method\"}");
if ( retjsonp != 0 )
*retjsonp = retjson;
return(retjson);
}
if ( ep == 0 ) if ( ep == 0 )
ep = electrum_server(symbol,0); ep = electrum_server(symbol,0);
if ( ep != 0 ) if ( ep != 0 && retjsonp != 0 )
{
*retjsonp = 0;
if ( strcmp(method,"getrawmempool") == 0 )
{ {
*retjsonp = cJSON_Parse("{\"error\":\"unsupported method\"}");
return(*retjsonp);
}
sprintf(stratumreq,"{ \"jsonrpc\":\"2.0\", \"id\": %u, \"method\":\"%s\", \"params\": %s }\n",ep->stratumid,method,params); sprintf(stratumreq,"{ \"jsonrpc\":\"2.0\", \"id\": %u, \"method\":\"%s\", \"params\": %s }\n",ep->stratumid,method,params);
ep->buf[0] = 0; ep->buf[0] = 0;
sitem = (struct stritem *)queueitem(stratumreq); sitem = (struct stritem *)queueitem(stratumreq);
sitem->expiration = timeout; sitem->expiration = timeout;
sitem->DL.type = ep->stratumid++; sitem->DL.type = ep->stratumid++;
if ( retjsonp != 0 )
sitem->retptrp = (void **)retjsonp; sitem->retptrp = (void **)retjsonp;
else sitem->retptrp = (void **)&retjson;
queue_enqueue("sendQ",&ep->sendQ,&sitem->DL); queue_enqueue("sendQ",&ep->sendQ,&sitem->DL);
if ( sitem->retptrp == (void **)&retjson )
{
expiration = (uint32_t)time(NULL) + timeout + 1; expiration = (uint32_t)time(NULL) + timeout + 1;
while ( retjson == 0 && time(NULL) <= expiration ) while ( *retjsonp == 0 && time(NULL) <= expiration )
usleep(10000); usleep(10000);
if ( retjson == 0 ) if ( *retjsonp == 0 )
{ {
printf("unexpected timeout with null retjson: %s %s\n",method,params); printf("unexpected timeout with null retjson: %s %s\n",method,params);
retjson = cJSON_Parse("{\"error\":\"timeout\"}"); *retjsonp = cJSON_Parse("{\"error\":\"timeout\"}");
}
} }
} else printf("couldnt find electrum server for (%s %s)\n",method,params); return(*retjsonp);
return(retjson); } else printf("couldnt find electrum server for (%s %s) or no retjsonp.%p\n",method,params,retjsonp);
return(0);
} }
cJSON *electrum_noargs(char *symbol,struct electrum_info *ep,cJSON **retjsonp,char *method,int32_t timeout) cJSON *electrum_noargs(char *symbol,struct electrum_info *ep,cJSON **retjsonp,char *method,int32_t timeout)
{ {
cJSON *retjson;
if ( retjsonp == 0 )
retjsonp = &retjson;
return(electrum_submit(symbol,ep,retjsonp,method,"[]",timeout)); return(electrum_submit(symbol,ep,retjsonp,method,"[]",timeout));
} }
cJSON *electrum_strarg(char *symbol,struct electrum_info *ep,cJSON **retjsonp,char *method,char *arg,int32_t timeout) cJSON *electrum_strarg(char *symbol,struct electrum_info *ep,cJSON **retjsonp,char *method,char *arg,int32_t timeout)
{ {
char params[16384]; char params[16384]; cJSON *retjson;
if ( strlen(arg) < sizeof(params) ) if ( strlen(arg) < sizeof(params) )
{ {
if ( retjsonp == 0 )
retjsonp = &retjson;
sprintf(params,"[\"%s\"]",arg); sprintf(params,"[\"%s\"]",arg);
return(electrum_submit(symbol,ep,retjsonp,method,params,timeout)); return(electrum_submit(symbol,ep,retjsonp,method,params,timeout));
} else return(0); } else return(0);
@ -359,14 +359,18 @@ cJSON *electrum_strarg(char *symbol,struct electrum_info *ep,cJSON **retjsonp,ch
cJSON *electrum_intarg(char *symbol,struct electrum_info *ep,cJSON **retjsonp,char *method,int32_t arg,int32_t timeout) cJSON *electrum_intarg(char *symbol,struct electrum_info *ep,cJSON **retjsonp,char *method,int32_t arg,int32_t timeout)
{ {
char params[64]; char params[64]; cJSON *retjson;
if ( retjsonp == 0 )
retjsonp = &retjson;
sprintf(params,"[\"%d\"]",arg); sprintf(params,"[\"%d\"]",arg);
return(electrum_submit(symbol,ep,retjsonp,method,params,timeout)); return(electrum_submit(symbol,ep,retjsonp,method,params,timeout));
} }
cJSON *electrum_hasharg(char *symbol,struct electrum_info *ep,cJSON **retjsonp,char *method,bits256 arg,int32_t timeout) cJSON *electrum_hasharg(char *symbol,struct electrum_info *ep,cJSON **retjsonp,char *method,bits256 arg,int32_t timeout)
{ {
char params[128],str[65]; char params[128],str[65]; cJSON *retjson;
if ( retjsonp == 0 )
retjsonp = &retjson;
sprintf(params,"[\"%s\"]",bits256_str(str,arg)); sprintf(params,"[\"%s\"]",bits256_str(str,arg));
return(electrum_submit(symbol,ep,retjsonp,method,params,timeout)); return(electrum_submit(symbol,ep,retjsonp,method,params,timeout));
} }
@ -519,7 +523,7 @@ int32_t LP_recvfunc(struct electrum_info *ep,char *str,int32_t len)
if ( (strjson= cJSON_Parse(str)) != 0 ) if ( (strjson= cJSON_Parse(str)) != 0 )
{ {
resultjson = jobj(strjson,"result"); resultjson = jobj(strjson,"result");
//printf("result.(%s)\n",jprint(resultjson,0)); //printf("strjson.(%s)\n",jprint(strjson,0));
if ( (method= jstr(strjson,"method")) != 0 ) if ( (method= jstr(strjson,"method")) != 0 )
{ {
if ( strcmp(method,"blockchain.headers.subscribe") == 0 ) if ( strcmp(method,"blockchain.headers.subscribe") == 0 )
@ -542,45 +546,34 @@ int32_t LP_recvfunc(struct electrum_info *ep,char *str,int32_t len)
} }
} }
idnum = juint(strjson,"id"); idnum = juint(strjson,"id");
//if ( 0 ) // crashes cipi's node likely due to mutex across threads
{
portable_mutex_lock(&ep->pendingQ.mutex); portable_mutex_lock(&ep->pendingQ.mutex);
if ( ep->pendingQ.list != 0 ) if ( ep->pendingQ.list != 0 )
{ {
DL_FOREACH(ep->pendingQ.list,item) DL_FOREACH(ep->pendingQ.list,item)
{ {
if ( item->type == 0xffffffff )
continue;
stritem = (struct stritem *)item; stritem = (struct stritem *)item;
if ( item->type == idnum ) if ( item->type == idnum )
{ {
//printf("matched idnum.%d\n",idnum); //printf("matched idnum.%d result.%p\n",idnum,resultjson);
item->type = 0xffffffff; DL_DELETE(ep->pendingQ.list,item);
if ( stritem->retptrp != 0 ) *((cJSON **)stritem->retptrp) = (resultjson != 0 ? jduplicate(resultjson) : strjson);
{ resultjson = strjson = 0;
*((cJSON **)stritem->retptrp) = strjson; free(item);
strjson = 0;
}
//DL_DELETE(ep->pendingQ.list,item);
break; break;
} }
if ( stritem->expiration < ep->lasttime ) if ( stritem->expiration < ep->lasttime )
{ {
DL_DELETE(ep->pendingQ.list,item);
printf("expired (%s)\n",stritem->str); printf("expired (%s)\n",stritem->str);
item->type = 0xffffffff;
//DL_DELETE(ep->pendingQ.list,item);
if ( stritem->retptrp != 0 )
{
errjson = cJSON_CreateObject(); errjson = cJSON_CreateObject();
jaddnum(errjson,"id",item->type); jaddnum(errjson,"id",item->type);
jaddstr(errjson,"error","timeout"); jaddstr(errjson,"error","timeout");
*((cJSON **)stritem->retptrp) = errjson; *((cJSON **)stritem->retptrp) = errjson;
}; free(item);
} }
} }
} }
portable_mutex_unlock(&ep->pendingQ.mutex); portable_mutex_unlock(&ep->pendingQ.mutex);
}
if ( strjson != 0 ) if ( strjson != 0 )
free_json(strjson); free_json(strjson);
} }
@ -589,7 +582,7 @@ int32_t LP_recvfunc(struct electrum_info *ep,char *str,int32_t len)
void LP_dedicatedloop(void *arg) void LP_dedicatedloop(void *arg)
{ {
struct pollfd fds; int32_t i,len,flag,timeout = 10; struct iguana_info *coin; cJSON *retjson; struct stritem *sitem; struct queueitem *item = 0; struct electrum_info *ep = arg; struct pollfd fds; int32_t i,len,flag,timeout = 10; struct iguana_info *coin; cJSON *retjson; struct stritem *sitem; struct electrum_info *ep = arg;
if ( (coin= LP_coinfind(ep->symbol)) != 0 ) if ( (coin= LP_coinfind(ep->symbol)) != 0 )
ep->heightp = &coin->height, ep->heighttimep = &coin->heighttime; ep->heightp = &coin->height, ep->heighttimep = &coin->heighttime;
if ( (retjson= electrum_headers_subscribe(ep->symbol,ep,0)) != 0 ) if ( (retjson= electrum_headers_subscribe(ep->symbol,ep,0)) != 0 )
@ -614,23 +607,25 @@ void LP_dedicatedloop(void *arg)
if ( sitem->expiration != 0 ) if ( sitem->expiration != 0 )
sitem->expiration += (uint32_t)time(NULL); sitem->expiration += (uint32_t)time(NULL);
else sitem->expiration = (uint32_t)time(NULL) + ELECTRUM_TIMEOUT; else sitem->expiration = (uint32_t)time(NULL) + ELECTRUM_TIMEOUT;
//printf("SEND.(%s) to %s:%u\n",sitem->str,ep->ipaddr,ep->port); /*portable_mutex_lock(&ep->pendingQ.mutex);
//queue_enqueue("pendingQ",&ep->pendingQ,(struct queueitem *)sitem);
portable_mutex_lock(&ep->pendingQ.mutex);
if ( ep->pendingQ.list != 0 ) if ( ep->pendingQ.list != 0 )
{ {
printf("list %p\n",ep->pendingQ.list);
DL_FOREACH(ep->pendingQ.list,item) DL_FOREACH(ep->pendingQ.list,item)
{ {
printf("item.%p\n",item);
if ( item->type == 0xffffffff ) if ( item->type == 0xffffffff )
{ {
printf("purge %s",((struct stritem *)item)->str); printf("%p purge %s",item,((struct stritem *)item)->str);
DL_DELETE(ep->pendingQ.list,item); DL_DELETE(ep->pendingQ.list,item);
free(item); free(item);
} }
} }
} }
DL_APPEND(ep->pendingQ.list,&sitem->DL); DL_APPEND(ep->pendingQ.list,&sitem->DL);
portable_mutex_unlock(&ep->pendingQ.mutex); portable_mutex_unlock(&ep->pendingQ.mutex);*/
//printf("%p SEND.(%s) to %s:%u\n",sitem,sitem->str,ep->ipaddr,ep->port);
queue_enqueue("pendingQ",&ep->pendingQ,&sitem->DL);
flag++; flag++;
} }
if ( flag == 0 ) if ( flag == 0 )

Loading…
Cancel
Save