Browse Source

Merge pull request #430 from jl777/spvdex

bug fixes: fixes connection reset, also should fix the non-responsive state LP nodes get into after many hours
etomic
jl777 7 years ago
committed by GitHub
parent
commit
0ef82574c3
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      crypto777/OS_portable.h
  2. 1
      iguana/exchanges/LP_nativeDEX.c
  3. 11
      iguana/exchanges/LP_network.c
  4. 8
      iguana/exchanges/LP_portfolio.c
  5. 52
      iguana/exchanges/stats.c

8
crypto777/OS_portable.h

@ -143,6 +143,14 @@ typedef struct queue
char name[64],initflag;
} queue_t;
struct rpcrequest_info
{
struct rpcrequest_info *next,*prev;
pthread_t T;
int32_t sock;
uint32_t ipbits;
};
struct OS_mappedptr
{
char fname[512];

1
iguana/exchanges/LP_nativeDEX.c

@ -81,6 +81,7 @@ struct LP_peerinfo *LP_peerinfos,*LP_mypeer;
struct LP_forwardinfo *LP_forwardinfos;
struct iguana_info *LP_coins;
struct LP_pubkeyinfo *LP_pubkeyinfos;
struct rpcrequest_info *LP_garbage_collector;
#include "LP_network.c"

11
iguana/exchanges/LP_network.c

@ -267,12 +267,21 @@ int32_t LP_peerindsock(int32_t *peerindp)
void queue_loop(void *arg)
{
struct LP_queue *ptr,*tmp; int32_t sentbytes,nonz,flag,duplicate,n=0;
struct rpcrequest_info *req,*rtmp; struct LP_queue *ptr,*tmp; int32_t sentbytes,nonz,flag,duplicate,n=0;
strcpy(queue_loop_stats.name,"queue_loop");
queue_loop_stats.threshold = 500.;
while ( 1 )
{
LP_millistats_update(&queue_loop_stats);
portable_mutex_lock(&LP_networkmutex);
DL_FOREACH_SAFE(LP_garbage_collector,req,rtmp)
{
DL_DELETE(LP_garbage_collector,req);
//printf("garbage collect ipbits.%x\n",req->ipbits);
free(req);
}
portable_mutex_unlock(&LP_networkmutex);
nonz = 0;
//printf("LP_Q.%p next.%p prev.%p\n",LP_Q,LP_Q!=0?LP_Q->next:0,LP_Q!=0?LP_Q->prev:0);
n = 0;

8
iguana/exchanges/LP_portfolio.c

@ -293,8 +293,6 @@ void LP_autopriceset(void *ctx,int32_t dir,struct LP_priceinfo *basepp,struct LP
if ( dir > 0 )
newprice = (1. / price) * (1. + margin);
else newprice = (price * (1. + margin));
//newprice = 1. / (price * (1. - margin));
if ( (minprice= basepp->minprices[relpp->ind]) == 0. || price >= minprice )
{
LP_mypriceset(&changed,relpp->symbol,basepp->symbol,newprice);
@ -468,7 +466,10 @@ void LP_autoprice_iter(void *ctx,struct LP_priceinfo *btcpp)
basepp = LP_priceinfofind(LP_autorefs[i].base);
relpp = LP_priceinfofind(LP_autorefs[i].rel);
if ( basepp != 0 && relpp != 0 )
LP_autopriceset(ctx,1,basepp,relpp,0,LP_autorefs[i].refbase,LP_autorefs[i].refrel);
{
//printf("check ref-autoprice %s/%s\n",LP_autorefs[i].refbase,LP_autorefs[i].refrel);
LP_autopriceset(ctx,1,basepp,relpp,0.,LP_autorefs[i].refbase,LP_autorefs[i].refrel);
}
}
}
@ -597,6 +598,7 @@ void prices_loop(void *ctx)
LP_tradebots_timeslice(ctx);
if ( (btcpp= LP_priceinfofind("BTC")) == 0 )
{
printf("prices_loop BTC not in LP_priceinfofind\n");
sleep(60);
continue;
}

52
iguana/exchanges/stats.c

@ -589,24 +589,33 @@ int32_t iguana_getheadersize(char *buf,int32_t recvlen)
}
uint16_t RPC_port;
extern portable_mutex_t LP_commandmutex;
extern portable_mutex_t LP_commandmutex,LP_networkmutex;
extern struct rpcrequest_info *LP_garbage_collector;
void LP_rpc_processreq(void *_ptr)
{
uint64_t arg64 = *(uint64_t *)_ptr;
static uint32_t spawned,maxspawned;
char filetype[128],content_type[128];
int32_t recvlen,flag,postflag=0,contentlen,remains,sock,numsent,jsonflag=0,hdrsize,len;
char helpname[512],remoteaddr[64],*buf,*retstr,*space,*jsonbuf;
char helpname[512],remoteaddr[64],*buf,*retstr,*space,*jsonbuf; struct rpcrequest_info *req = _ptr;
uint32_t ipbits,i,size = 32*IGUANA_MAXPACKETSIZE + 512;
ipbits = (arg64 >> 32);
ipbits = req->ipbits;;
expand_ipbits(remoteaddr,ipbits);
sock = (arg64 & 0xffffffff);
sock = req->sock;
recvlen = flag = 0;
retstr = 0;
space = calloc(1,size);
jsonbuf = calloc(1,size);
remains = size-1;
buf = jsonbuf;
portable_mutex_lock(&LP_networkmutex);
spawned++;
portable_mutex_unlock(&LP_networkmutex);
if ( spawned > maxspawned )
{
printf("max rpc threads spawned and alive %d <- %d\n",maxspawned,spawned);
maxspawned = spawned;
}
while ( remains > 0 )
{
//printf("flag.%d remains.%d recvlen.%d\n",flag,remains,recvlen);
@ -651,8 +660,7 @@ void LP_rpc_processreq(void *_ptr)
else
{
usleep(10000);
//printf("got.(%s) %d remains.%d of total.%d\n",jsonbuf,recvlen,remains,len);
//retstr = iguana_rpcparse(space,size,&postflag,jsonbuf);
printf("got.(%s) %d remains.%d of total.%d\n",jsonbuf,recvlen,remains,len);
if ( flag == 0 )
break;
}
@ -664,8 +672,6 @@ void LP_rpc_processreq(void *_ptr)
jsonflag = postflag = 0;
portable_mutex_lock(&LP_commandmutex);
retstr = stats_rpcparse(space,size,&jsonflag,&postflag,jsonbuf,remoteaddr,filetype,RPC_port);
//if ( strcmp("5.9.253.195",remoteaddr) == 0 )
// printf("RPC.(%s)%s\n",jsonbuf,retstr);
portable_mutex_unlock(&LP_commandmutex);
if ( filetype[0] != 0 )
{
@ -728,13 +734,18 @@ void LP_rpc_processreq(void *_ptr)
}
free(space);
free(jsonbuf);
closesocket(sock);
portable_mutex_lock(&LP_networkmutex);
DL_APPEND(LP_garbage_collector,req);
spawned--;
portable_mutex_unlock(&LP_networkmutex);
}
extern int32_t IAMLP;
void stats_rpcloop(void *args)
{
static uint32_t counter;
uint16_t port; int32_t sock,bindsock=-1; socklen_t clilen; struct sockaddr_in cli_addr; uint32_t ipbits,localhostbits; uint64_t arg64; void *arg64ptr;
uint16_t port; int32_t sock,bindsock=-1; socklen_t clilen; struct sockaddr_in cli_addr; uint32_t ipbits,localhostbits; struct rpcrequest_info *req;
if ( (port= *(uint16_t *)args) == 0 )
port = 7779;
RPC_port = port;
@ -766,25 +777,24 @@ void stats_rpcloop(void *args)
continue;
}
memcpy(&ipbits,&cli_addr.sin_addr.s_addr,sizeof(ipbits));
arg64 = ((uint64_t)ipbits << 32) | (sock & 0xffffffff);
arg64ptr = malloc(sizeof(arg64));
memcpy(arg64ptr,&arg64,sizeof(arg64));
if ( 1 )
req = calloc(1,sizeof(*req));
req->sock = sock;
req->ipbits = ipbits;
if ( 0 )
{
LP_rpc_processreq((void *)&arg64);
free(arg64ptr);
closesocket(sock);
//LP_rpc_processreq((void *)&arg64);
//free(arg64ptr);
//closesocket(sock);
}
else if ( OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)LP_rpc_processreq,arg64ptr) != 0 )
else if ( OS_thread_create(&req->T,NULL,(void *)LP_rpc_processreq,req) != 0 )
{
printf("error launching rpc handler on port %d\n",port);
// yes, small leak per command
}
if ( 0 && IAMLP != 0 && ipbits != localhostbits )
/*if ( 0 && IAMLP != 0 && ipbits != localhostbits )
{
close(bindsock);
bindsock = iguana_socket(1,"0.0.0.0",port);
} //else printf("skip close and rebind\n");
} //else printf("skip close and rebind\n");*/
}
}

Loading…
Cancel
Save