Browse Source

Merge pull request #436 from jl777/spvdex

optimize CPU and bandwidth usage
etomic
jl777 7 years ago
committed by GitHub
parent
commit
94d0aa4970
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      iguana/exchanges/LP_bitcoin.c
  2. 8
      iguana/exchanges/LP_commands.c
  3. 13
      iguana/exchanges/LP_nativeDEX.c
  4. 79
      iguana/exchanges/LP_network.c
  5. 12
      iguana/exchanges/stats.c

12
iguana/exchanges/LP_bitcoin.c

@ -1959,6 +1959,18 @@ int32_t bitcoin_timelockspend(uint8_t *script,int32_t n,uint8_t rmd160[20],uint3
return(n);
}
int32_t bitcoin_performancebond(uint8_t p2sh_rmd160[20],uint8_t *script,int32_t n,uint32_t unlocktimestamp,uint8_t cltv_rmd160[20],uint8_t anytime_rmd160[20])
{
script[n++] = SCRIPT_OP_IF;
n = bitcoin_checklocktimeverify(script,n,unlocktimestamp);
n = bitcoin_standardspend(script,n,cltv_rmd160);
script[n++] = SCRIPT_OP_ELSE;
n = bitcoin_standardspend(script,n,anytime_rmd160);
script[n++] = SCRIPT_OP_ENDIF;
calc_rmd160_sha256(p2sh_rmd160,script,n);
return(n);
}
int32_t bitcoin_MofNspendscript(uint8_t p2sh_rmd160[20],uint8_t *script,int32_t n,const struct vin_info *vp)
{
int32_t i,plen;

8
iguana/exchanges/LP_commands.c

@ -111,8 +111,8 @@ getrawtransaction(coin, txid)\n\
inventory(coin)\n\
bestfit(rel, relvolume)\n\
lastnonce()\n\
buy(base, rel, price, relvolume, timeout=10, duration=3600, nonce, pubkey="")\n\
sell(base, rel, price, basevolume, timeout=10, duration=3600, nonce, pubkey="")\n\
buy(base, rel, price, relvolume, timeout=10, duration=3600, nonce, destpubkey="")\n\
sell(base, rel, price, basevolume, timeout=10, duration=3600, nonce, destpubkey="")\n\
withdraw(coin, outputs[])\n\
sendrawtransaction(coin, signedtx)\n\
swapstatus()\n\
@ -306,7 +306,7 @@ bot_resume(botid)\n\
//*
if ( price > SMALLVAL )
{
return(LP_autobuy(ctx,myipaddr,pubsock,base,rel,price,jdouble(argjson,"relvolume"),jint(argjson,"timeout"),jint(argjson,"duration"),jstr(argjson,"gui"),juint(argjson,"nonce"),jbits256(argjson,"pubkey"),0));
return(LP_autobuy(ctx,myipaddr,pubsock,base,rel,price,jdouble(argjson,"relvolume"),jint(argjson,"timeout"),jint(argjson,"duration"),jstr(argjson,"gui"),juint(argjson,"nonce"),jbits256(argjson,"destpubkey"),0));
} else return(clonestr("{\"error\":\"no price set\"}"));
}
else if ( strcmp(method,"sell") == 0 )
@ -314,7 +314,7 @@ bot_resume(botid)\n\
//*
if ( price > SMALLVAL )
{
return(LP_autobuy(ctx,myipaddr,pubsock,rel,base,1./price,jdouble(argjson,"basevolume"),jint(argjson,"timeout"),jint(argjson,"duration"),jstr(argjson,"gui"),juint(argjson,"nonce"),jbits256(argjson,"pubkey"),0));
return(LP_autobuy(ctx,myipaddr,pubsock,rel,base,1./price,jdouble(argjson,"basevolume"),jint(argjson,"timeout"),jint(argjson,"duration"),jstr(argjson,"gui"),juint(argjson,"nonce"),jbits256(argjson,"destpubkey"),0));
} else return(clonestr("{\"error\":\"no price set\"}"));
}
}

13
iguana/exchanges/LP_nativeDEX.c

@ -29,7 +29,7 @@ struct LP_millistats
double lastmilli,millisum,threshold;
uint32_t count;
char name[64];
} LP_psockloop_stats,LP_reserved_msgs_stats,utxosQ_loop_stats,command_rpcloop_stats,queue_loop_stats,prices_loop_stats,LP_coinsloop_stats,LP_coinsloopBTC_stats,LP_coinsloopKMD_stats,LP_pubkeysloop_stats,LP_privkeysloop_stats,LP_swapsloop_stats;
} LP_psockloop_stats,LP_reserved_msgs_stats,utxosQ_loop_stats,command_rpcloop_stats,queue_loop_stats,prices_loop_stats,LP_coinsloop_stats,LP_coinsloopBTC_stats,LP_coinsloopKMD_stats,LP_pubkeysloop_stats,LP_privkeysloop_stats,LP_swapsloop_stats,LP_gcloop_stats;
extern int32_t IAMLP;
void LP_millistats_update(struct LP_millistats *mp)
@ -52,6 +52,7 @@ void LP_millistats_update(struct LP_millistats *mp)
mp = &LP_pubkeysloop_stats, printf("%32s lag %10.2f millis, threshold %10.2f, ave %10.2f millis, count.%u\n",mp->name,OS_milliseconds() - mp->lastmilli,mp->threshold,mp->millisum/(mp->count > 0 ? mp->count: 1),mp->count);
mp = &LP_privkeysloop_stats, printf("%32s lag %10.2f millis, threshold %10.2f, ave %10.2f millis, count.%u\n",mp->name,OS_milliseconds() - mp->lastmilli,mp->threshold,mp->millisum/(mp->count > 0 ? mp->count: 1),mp->count);
mp = &LP_swapsloop_stats, printf("%32s lag %10.2f millis, threshold %10.2f, ave %10.2f millis, count.%u\n",mp->name,OS_milliseconds() - mp->lastmilli,mp->threshold,mp->millisum/(mp->count > 0 ? mp->count: 1),mp->count);
mp = &LP_gcloop_stats, printf("%32s lag %10.2f millis, threshold %10.2f, ave %10.2f millis, count.%u\n",mp->name,OS_milliseconds() - mp->lastmilli,mp->threshold,mp->millisum/(mp->count > 0 ? mp->count: 1),mp->count);
}
else
{
@ -65,7 +66,7 @@ void LP_millistats_update(struct LP_millistats *mp)
mp->millisum += elapsed;
if ( mp->threshold != 0. && elapsed > mp->threshold )
{
if ( IAMLP == 0 )
//if ( IAMLP == 0 )
printf("%32s elapsed %10.2f millis > threshold %10.2f, ave %10.2f millis, count.%u\n",mp->name,elapsed,mp->threshold,mp->millisum/mp->count,mp->count);
}
mp->lastmilli = millis;
@ -74,7 +75,7 @@ void LP_millistats_update(struct LP_millistats *mp)
}
#include "LP_include.h"
portable_mutex_t LP_peermutex,LP_UTXOmutex,LP_utxomutex,LP_commandmutex,LP_cachemutex,LP_swaplistmutex,LP_forwardmutex,LP_pubkeymutex,LP_networkmutex,LP_psockmutex,LP_coinmutex,LP_messagemutex,LP_portfoliomutex,LP_electrummutex,LP_butxomutex,LP_reservedmutex,LP_nanorecvsmutex,LP_tradebotsmutex;
portable_mutex_t LP_peermutex,LP_UTXOmutex,LP_utxomutex,LP_commandmutex,LP_cachemutex,LP_swaplistmutex,LP_forwardmutex,LP_pubkeymutex,LP_networkmutex,LP_psockmutex,LP_coinmutex,LP_messagemutex,LP_portfoliomutex,LP_electrummutex,LP_butxomutex,LP_reservedmutex,LP_nanorecvsmutex,LP_tradebotsmutex,LP_gcmutex;
int32_t LP_canbind;
char *Broadcaststr,*Reserved_msgs[2][1000];
int32_t num_Reserved_msgs[2],max_Reserved_msgs[2];
@ -994,6 +995,7 @@ void LPinit(uint16_t myport,uint16_t mypullport,uint16_t mypubport,uint16_t mybu
portable_mutex_init(&LP_swaplistmutex);
portable_mutex_init(&LP_cachemutex);
portable_mutex_init(&LP_networkmutex);
portable_mutex_init(&LP_gcmutex);
portable_mutex_init(&LP_forwardmutex);
portable_mutex_init(&LP_psockmutex);
portable_mutex_init(&LP_coinmutex);
@ -1112,6 +1114,11 @@ void LPinit(uint16_t myport,uint16_t mypullport,uint16_t mypubport,uint16_t mybu
printf("error launching queue_loop for port.%u\n",myport);
exit(-1);
}
if ( OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)gc_loop,(void *)myipaddr) != 0 )
{
printf("error launching gc_loop for port.%u\n",myport);
exit(-1);
}
if ( OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)prices_loop,ctx) != 0 )
{
printf("error launching prices_loop for port.%u\n",myport);

79
iguana/exchanges/LP_network.c

@ -140,7 +140,7 @@ struct LP_queue
{
struct LP_queue *next,*prev;
int32_t sock,peerind,msglen;
uint32_t starttime,crc32;
uint32_t starttime,crc32,notready;
uint8_t msg[];
} *LP_Q;
int32_t LP_Qenqueued,LP_Qerrors,LP_Qfound;
@ -265,44 +265,66 @@ int32_t LP_peerindsock(int32_t *peerindp)
return(-1);
}
void queue_loop(void *arg)
void gc_loop(void *arg)
{
struct rpcrequest_info *req,*rtmp; struct LP_queue *ptr,*tmp; int32_t sentbytes,nonz,flag,duplicate,n=0;
strcpy(queue_loop_stats.name,"queue_loop");
queue_loop_stats.threshold = 500.;
struct rpcrequest_info *req,*rtmp; int32_t flag = 0;
strcpy(LP_gcloop_stats.name,"gc_loop");
LP_gcloop_stats.threshold = 1100.;
while ( 1 )
{
LP_millistats_update(&queue_loop_stats);
portable_mutex_lock(&LP_networkmutex);
flag = 0;
LP_millistats_update(&LP_gcloop_stats);
portable_mutex_lock(&LP_gcmutex);
DL_FOREACH_SAFE(LP_garbage_collector,req,rtmp)
{
DL_DELETE(LP_garbage_collector,req);
//printf("garbage collect ipbits.%x\n",req->ipbits);
free(req);
flag++;
}
portable_mutex_unlock(&LP_networkmutex);
portable_mutex_unlock(&LP_gcmutex);
if ( 0 && flag != 0 )
printf("gc_loop.%d\n",flag);
sleep(1);
}
}
nonz = 0;
void queue_loop(void *arg)
{
struct LP_queue *ptr,*tmp; int32_t sentbytes,nonz,flag,duplicate,n=0;
strcpy(queue_loop_stats.name,"queue_loop");
queue_loop_stats.threshold = 100.;
while ( 1 )
{
LP_millistats_update(&queue_loop_stats);
//printf("LP_Q.%p next.%p prev.%p\n",LP_Q,LP_Q!=0?LP_Q->next:0,LP_Q!=0?LP_Q->prev:0);
n = 0;
n = nonz = flag = 0;
DL_FOREACH_SAFE(LP_Q,ptr,tmp)
{
n++;
flag = 0;
if ( ptr->sock >= 0 )
{
if ( LP_sockcheck(ptr->sock) > 0 )
if ( ptr->notready == 0 || (rand() % ptr->notready) == 0 )
{
bits256 magic;
magic = LP_calc_magic(ptr->msg,(int32_t)(ptr->msglen - sizeof(bits256)));
memcpy(&ptr->msg[ptr->msglen - sizeof(bits256)],&magic,sizeof(magic));
if ( (sentbytes= nn_send(ptr->sock,ptr->msg,ptr->msglen,0)) != ptr->msglen )
printf("%d LP_send sent %d instead of %d\n",n,sentbytes,ptr->msglen);
ptr->sock = -1;
if ( ptr->peerind > 0 )
ptr->starttime = (uint32_t)time(NULL);
else flag = 1;
} //else printf("sock not ready to send.%d\n",ptr->msglen);
if ( LP_sockcheck(ptr->sock) > 0 )
{
bits256 magic;
magic = LP_calc_magic(ptr->msg,(int32_t)(ptr->msglen - sizeof(bits256)));
memcpy(&ptr->msg[ptr->msglen - sizeof(bits256)],&magic,sizeof(magic));
if ( (sentbytes= nn_send(ptr->sock,ptr->msg,ptr->msglen,0)) != ptr->msglen )
printf("%d LP_send sent %d instead of %d\n",n,sentbytes,ptr->msglen);
else flag++;
ptr->sock = -1;
if ( ptr->peerind > 0 )
ptr->starttime = (uint32_t)time(NULL);
}
else
{
if ( ptr->notready++ > 1000 )
flag = 1;
}
}
}
else if ( 0 && time(NULL) > ptr->starttime+13 )
{
@ -312,7 +334,7 @@ void queue_loop(void *arg)
LP_Qfound++;
if ( (LP_Qfound % 100) == 0 )
printf("found.%u Q.%d err.%d match.%d\n",ptr->crc32,LP_Qenqueued,LP_Qerrors,LP_Qfound);
flag = 1;
flag++;
}
else if ( 0 ) // too much beyond duplicate filter when network is busy
{
@ -321,7 +343,7 @@ void queue_loop(void *arg)
if ( (ptr->sock= LP_peerindsock(&ptr->peerind)) < 0 )
{
printf("%d no more peers to try at peerind.%d %p Q_LP.%p\n",n,ptr->peerind,ptr,LP_Q);
flag = 1;
flag++;
LP_Qerrors++;
}
}
@ -338,13 +360,12 @@ void queue_loop(void *arg)
}
if ( arg == 0 )
break;
//if ( n != 0 )
// printf("LP_Q.[%d]\n",n);
if ( nonz == 0 )
usleep(25000);
else if ( IAMLP == 0 )
usleep(10000);
else usleep(1000);
{
if ( IAMLP == 0 )
usleep(50000);
else usleep(10000);
}
}
}

12
iguana/exchanges/stats.c

@ -589,7 +589,7 @@ int32_t iguana_getheadersize(char *buf,int32_t recvlen)
}
uint16_t RPC_port;
extern portable_mutex_t LP_commandmutex,LP_networkmutex;
extern portable_mutex_t LP_commandmutex,LP_gcmutex;
extern struct rpcrequest_info *LP_garbage_collector;
void LP_rpc_processreq(void *_ptr)
@ -608,9 +608,7 @@ void LP_rpc_processreq(void *_ptr)
jsonbuf = calloc(1,size);
remains = size-1;
buf = jsonbuf;
portable_mutex_lock(&LP_networkmutex);
spawned++;
portable_mutex_unlock(&LP_networkmutex);
if ( spawned > maxspawned )
{
printf("max rpc threads spawned and alive %d <- %d\n",maxspawned,spawned);
@ -735,10 +733,10 @@ void LP_rpc_processreq(void *_ptr)
free(space);
free(jsonbuf);
closesocket(sock);
portable_mutex_lock(&LP_networkmutex);
portable_mutex_lock(&LP_gcmutex);
DL_APPEND(LP_garbage_collector,req);
spawned--;
portable_mutex_unlock(&LP_networkmutex);
portable_mutex_unlock(&LP_gcmutex);
}
extern int32_t IAMLP;
@ -779,13 +777,13 @@ continue;
printf("error launching rpc handler on port %d, retval.%d\n",port,retval);
close(bindsock);
bindsock = -1;
portable_mutex_lock(&LP_networkmutex);
portable_mutex_lock(&LP_gcmutex);
DL_FOREACH_SAFE(LP_garbage_collector,req2,rtmp)
{
DL_DELETE(LP_garbage_collector,req2);
free(req2);
}
portable_mutex_unlock(&LP_networkmutex);
portable_mutex_unlock(&LP_gcmutex);
if ( (retval= OS_thread_create(&req->T,NULL,(void *)LP_rpc_processreq,req)) != 0 )
{
printf("error2 launching rpc handler on port %d, retval.%d\n",port,retval);

Loading…
Cancel
Save