You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

306 lines
12 KiB

8 years ago
/******************************************************************************
* Copyright © 2014-2017 The SuperNET Developers. *
* *
* See the AUTHORS, DEVELOPER-AGREEMENT and LICENSE files at *
* the top-level directory of this distribution for the individual copyright *
* holder information and the developer policies on copyright and licensing. *
* *
* Unless otherwise agreed in a custom licensing agreement, no part of the *
* SuperNET software, including this file may be copied, modified, propagated *
* or distributed except according to the terms contained in the LICENSE file *
* *
* Removal or modification of this copyright notice is prohibited. *
* *
******************************************************************************/
//
// LP_peers.c
// marketmaker
//
struct LP_peerinfo *LP_peerfind(uint32_t ipbits,uint16_t port)
{
struct LP_peerinfo *peer=0; uint64_t ip_port;
ip_port = ((uint64_t)port << 32) | ipbits;
portable_mutex_lock(&LP_peermutex);
HASH_FIND(hh,LP_peerinfos,&ip_port,sizeof(ip_port),peer);
portable_mutex_unlock(&LP_peermutex);
return(peer);
}
cJSON *LP_peerjson(struct LP_peerinfo *peer)
{
cJSON *item = cJSON_CreateObject();
7 years ago
jaddstr(item,"isLP",peer->ipaddr);
7 years ago
jaddnum(item,"remoteport",peer->port-1);
7 years ago
jaddnum(item,"netid",peer->netid);
8 years ago
if ( strcmp(peer->ipaddr,LP_myipaddr) == 0 )
8 years ago
{
7 years ago
jaddnum(item,"session",G.LP_sessionid);
//if ( LP_mypeer != 0 )
// jaddnum(item,"numutxos",LP_mypeer->numutxos);
8 years ago
} else jaddnum(item,"session",peer->sessionid);
8 years ago
//jaddnum(item,"profit",peer->profitmargin);
8 years ago
return(item);
}
char *LP_peers()
{
struct LP_peerinfo *peer,*tmp; cJSON *peersjson = cJSON_CreateArray();
HASH_ITER(hh,LP_peerinfos,peer,tmp)
{
8 years ago
//if ( peer->errors < LP_MAXPEER_ERRORS )
7 years ago
if ( peer->isLP != 0 )
8 years ago
jaddi(peersjson,LP_peerjson(peer));
8 years ago
}
return(jprint(peersjson,1));
}
7 years ago
struct LP_peerinfo *LP_addpeer(struct LP_peerinfo *mypeer,int32_t mypubsock,char *ipaddr,uint16_t port,uint16_t pushport,uint16_t subport,int32_t isLP,uint32_t sessionid,uint16_t netid)
8 years ago
{
7 years ago
uint32_t ipbits; int32_t valid,pushsock,subsock,timeout; char checkip[64],pushaddr[64],subaddr[64]; struct LP_peerinfo *peer = 0;
8 years ago
#ifdef LP_STRICTPEERS
if ( strncmp("5.9.253",ipaddr,strlen("5.9.253")) != 0 )
return(0);
#endif
8 years ago
ipbits = (uint32_t)calc_ipbits(ipaddr);
expand_ipbits(checkip,ipbits);
if ( strcmp(checkip,ipaddr) == 0 )
{
if ( (peer= LP_peerfind(ipbits,port)) != 0 )
{
7 years ago
if ( peer->netid != netid )
{
printf("netid mismatch for %s? %d vs %d\n",peer->ipaddr,peer->netid,G.netid);
return(0);
}
if ( isLP != 0 && peer->isLP == 0 )
{
if ( (peer->isLP= isLP) != 0 )
LP_numactive_LP++;
}
8 years ago
/*if ( numpeers > peer->numpeers )
8 years ago
peer->numpeers = numpeers;
if ( numutxos > peer->numutxos )
peer->numutxos = numutxos;
8 years ago
if ( peer->sessionid == 0 )
8 years ago
peer->sessionid = sessionid;*/
8 years ago
}
7 years ago
else if ( IAMLP != 0 || LP_numactive_LP < 10 )
8 years ago
{
7 years ago
//printf("addpeer (%s:%u) pushport.%u subport.%u\n",ipaddr,port,pushport,subport);
8 years ago
peer = calloc(1,sizeof(*peer));
8 years ago
if ( strcmp(peer->ipaddr,LP_myipaddr) == 0 )
7 years ago
peer->sessionid = G.LP_sessionid;
8 years ago
else peer->sessionid = sessionid;
8 years ago
peer->pushsock = peer->subsock = pushsock = subsock = -1;
strcpy(peer->ipaddr,ipaddr);
7 years ago
peer->netid = netid;
7 years ago
//peer->profitmargin = profitmargin;
peer->ipbits = ipbits;
if ( (peer->isLP= isLP) != 0 )
LP_numactive_LP++;
7 years ago
peer->port = port;
peer->ip_port = ((uint64_t)port << 32) | ipbits;
8 years ago
if ( pushport != 0 && subport != 0 && (pushsock= nn_socket(AF_SP,NN_PUSH)) >= 0 )
{
8 years ago
nanomsg_transportname(0,pushaddr,peer->ipaddr,pushport);
7 years ago
//nanomsg_transportname2(0,pushaddr2,peer->ipaddr,pushport);
7 years ago
valid = 0;
8 years ago
if ( nn_connect(pushsock,pushaddr) >= 0 )
7 years ago
valid++;
7 years ago
//if ( nn_connect(pushsock,pushaddr2) >= 0 )
// valid++;
7 years ago
if ( valid > 0 )
8 years ago
{
7 years ago
timeout = 10;
7 years ago
nn_setsockopt(pushsock,NN_SOL_SOCKET,NN_MAXTTL,&timeout,sizeof(timeout));
timeout = 100;
8 years ago
nn_setsockopt(pushsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
8 years ago
//maxsize = 2 * 1024 * 1024;
//nn_setsockopt(pushsock,NN_SOL_SOCKET,NN_SNDBUF,&maxsize,sizeof(maxsize));
7 years ago
printf("connected to push.(%s) pushsock.%d valid.%d | ",pushaddr,pushsock,valid);
8 years ago
peer->connected = (uint32_t)time(NULL);
peer->pushsock = pushsock;
8 years ago
if ( (subsock= nn_socket(AF_SP,NN_SUB)) >= 0 )
8 years ago
{
7 years ago
timeout = 100;
8 years ago
nn_setsockopt(subsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(subsock,NN_SUB,NN_SUB_SUBSCRIBE,"",0);
8 years ago
nanomsg_transportname(0,subaddr,peer->ipaddr,subport);
7 years ago
//nanomsg_transportname2(0,subaddr2,peer->ipaddr,subport);
7 years ago
valid = 0;
8 years ago
if ( nn_connect(subsock,subaddr) >= 0 )
7 years ago
valid++;
7 years ago
//if ( nn_connect(subsock,subaddr2) >= 0 )
// valid++;
7 years ago
if ( valid > 0 )
8 years ago
{
peer->subsock = subsock;
printf("connected to sub.(%s) subsock.%d valid.%d numactive.%d\n",subaddr,peer->subsock,valid,LP_numactive_LP);
7 years ago
}
else
{
7 years ago
printf("error connecting to subsock.%d (%s)\n",subsock,subaddr);
7 years ago
nn_close(subsock);
subsock = -1;
}
8 years ago
}
}
else
{
nn_close(pushsock);
7 years ago
pushsock = -1;
7 years ago
printf("error connecting to push.(%s)\n",pushaddr);
8 years ago
}
7 years ago
} else printf("%s pushport.%u subport.%u pushsock.%d isLP.%d\n",ipaddr,pushport,subport,pushsock,isLP);
7 years ago
if ( peer->pushsock >= 0 && peer->subsock >= 0 )
8 years ago
{
7 years ago
//printf("add peer %s isLP.%d\n",peer->ipaddr,peer->isLP);
7 years ago
portable_mutex_lock(&LP_peermutex);
HASH_ADD(hh,LP_peerinfos,ip_port,sizeof(peer->ip_port),peer);
if ( mypeer != 0 )
{
mypeer->numpeers++;
7 years ago
printf("_LPaddpeer %s -> numpeers.%d mypubsock.%d other.(%d)\n",ipaddr,mypeer->numpeers,mypubsock,isLP);
7 years ago
} else peer->numpeers = 1; // will become mypeer
portable_mutex_unlock(&LP_peermutex);
7 years ago
if ( IAMLP == 0 )
8 years ago
{
7 years ago
char connectaddr[128],publicaddr[128],*retstr; int32_t pullsock,pubsock; uint16_t cmdport;
7 years ago
if ( (cmdport= LP_psock_get(connectaddr,publicaddr,1,1,peer->ipaddr)) != 0 )
8 years ago
{
7 years ago
printf("call _LP_psock_create\n");
7 years ago
if ( (retstr= _LP_psock_create(&pullsock,&pubsock,peer->ipaddr,cmdport,cmdport,1,1)) != 0 )
8 years ago
{
7 years ago
printf("cmdchannel! %s\n",retstr);
free(retstr);
8 years ago
}
7 years ago
} else printf("error getting cmdchannel with %s\n",peer->ipaddr);
}
7 years ago
} else printf("%s invalid pushsock.%d or subsock.%d\n",peer->ipaddr,peer->pushsock,peer->subsock);
8 years ago
}
} else printf("LP_addpeer: checkip.(%s) vs (%s)\n",checkip,ipaddr);
return(peer);
}
7 years ago
void LP_closepeers()
{
struct LP_peerinfo *peer,*tmp;
HASH_ITER(hh,LP_peerinfos,peer,tmp)
{
portable_mutex_lock(&LP_peermutex);
HASH_DELETE(hh,LP_peerinfos,peer);
portable_mutex_unlock(&LP_peermutex);
if ( peer->pushsock >= 0 )
nn_close(peer->pushsock), peer->pushsock = -1;
if ( peer->subsock >= 0 )
nn_close(peer->subsock), peer->subsock = -1;
// free(peer); a small memleak to avoid freein inflight requests
}
}
7 years ago
/*int32_t LP_coinbus(uint16_t coin_busport)
8 years ago
{
struct LP_peerinfo *peer,*tmp; char busaddr[64]; int32_t timeout,bussock = -1;
8 years ago
return(-1);
8 years ago
if ( IAMLP != 0 && LP_mypeer != 0 && (bussock= nn_socket(AF_SP,NN_BUS)) >= 0 )
{
timeout = 1;
nn_setsockopt(bussock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(bussock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
nanomsg_transportname(0,busaddr,LP_mypeer->ipaddr,coin_busport);
if ( nn_bind(bussock,busaddr) < 0 )
{
printf("error binding to coin_busport.%s\n",busaddr);
nn_close(bussock);
}
else
{
HASH_ITER(hh,LP_peerinfos,peer,tmp)
{
if ( LP_mypeer->port != peer->port || strcmp(LP_mypeer->ipaddr,peer->ipaddr) != 0 )
{
nanomsg_transportname(0,busaddr,peer->ipaddr,coin_busport);
nn_connect(bussock,busaddr);
}
}
}
}
return(bussock);
7 years ago
}*/
8 years ago
7 years ago
void LP_peer_recv(char *ipaddr,int32_t ismine)
7 years ago
{
struct LP_peerinfo *peer;
if ( (peer= LP_peerfind((uint32_t)calc_ipbits(ipaddr),RPC_port)) != 0 )
7 years ago
{
7 years ago
peer->numrecv++;
7 years ago
//if ( ismine != 0 )
peer->recvtime = (uint32_t)time(NULL);
7 years ago
}
7 years ago
}
8 years ago
8 years ago
int32_t LP_numpeers()
{
struct LP_peerinfo *peer,*tmp; int32_t numpeers = 0;
HASH_ITER(hh,LP_peerinfos,peer,tmp)
{
7 years ago
if ( peer->isLP != 0 )
numpeers++;
8 years ago
}
return(numpeers);
}
7 years ago
uint16_t LP_randpeer(char *destip)
{
struct LP_peerinfo *peer,*tmp; uint16_t port = 0; int32_t n,r,numpeers = 0;
7 years ago
destip[0] = 0;
numpeers = LP_numpeers();
7 years ago
if ( numpeers > 0 )
{
7 years ago
r = LP_rand() % numpeers;
7 years ago
n = 0;
HASH_ITER(hh,LP_peerinfos,peer,tmp)
{
7 years ago
if ( peer->isLP != 0 )
7 years ago
{
7 years ago
if ( n++ == r )
{
strcpy(destip,peer->ipaddr);
port = peer->port;
break;
}
7 years ago
}
}
}
return(port);
}
7 years ago
uint16_t LP_rarestpeer(char *destip)
{
7 years ago
struct LP_peerinfo *peer,*tmp,*rarest = 0; int32_t iter; uint32_t now;
now = (uint32_t)time(NULL);
7 years ago
destip[0] = 0;
7 years ago
for (iter=0; iter<2; iter++)
7 years ago
{
7 years ago
HASH_ITER(hh,LP_peerinfos,peer,tmp)
7 years ago
{
7 years ago
if ( strcmp(peer->ipaddr,LP_myipaddr) != 0 && iter == 0 && peer->recvtime < now-3600 )
7 years ago
continue;
if ( peer->isLP != 0 )
{
if ( rarest == 0 || peer->numrecv < rarest->numrecv )
rarest = peer;
}
7 years ago
}
7 years ago
if ( rarest != 0 )
break;
7 years ago
}
if ( rarest == 0 )
LP_randpeer(destip);
else strcpy(destip,rarest->ipaddr);
return(rarest != 0 ? rarest->port : RPC_port);
}