Browse Source

Sock

etomic
jl777 8 years ago
parent
commit
9ed0c33015
  1. 24
      basilisk/basilisk_MSG.c
  2. 3
      iguana/dPoW.h
  3. 147
      iguana/dpow/dpow_network.c
  4. 4
      iguana/iguana777.h
  5. 10
      iguana/iguana_notary.c
  6. 3
      iguana/main.c
  7. 2
      includes/iguana_apideclares.h
  8. 1
      includes/iguana_funcs.h

24
basilisk/basilisk_MSG.c

@ -295,6 +295,30 @@ cJSON *dpow_getmessage(struct supernet_info *myinfo,char *jsonstr)
return(retjson);
}
cJSON *dpow_addmessage(struct supernet_info *myinfo,char *jsonstr)
{
cJSON *vals,*retjson=0; char *retstr=0,*datastr; int32_t datalen,keylen; uint8_t *data=0,key[BASILISK_KEYSIZE];
if ( (vals= cJSON_Parse(jsonstr)) != 0 )
{
keylen = basilisk_messagekey(key,juint(vals,"channel"),juint(vals,"msgid"),jbits256(vals,"srchash"),jbits256(vals,"desthash"));
if ( (datastr= jstr(vals,"data")) != 0 )
{
datalen = (int32_t)strlen(datastr) >> 1;
data = malloc(datalen);
decode_hex(data,datalen,datastr);
if ( (retstr= basilisk_respond_addmessage(myinfo,key,keylen,data,datalen,0,juint(vals,"duration"))) != 0 )
retjson = cJSON_Parse(retstr);
}
if ( retstr != 0 )
free(retstr);
if ( data != 0 )
free(data);
}
if ( retjson == 0 )
retjson = cJSON_Parse("{\"error\":\"couldnt add message\"}");
return(retjson);
}
#include "../includes/iguana_apidefs.h"
#include "../includes/iguana_apideclares.h"

3
iguana/dPoW.h

@ -168,6 +168,8 @@ cJSON *dpow_kvupdate(struct supernet_info *myinfo,struct iguana_info *coin,char
cJSON *dpow_kvsearch(struct supernet_info *myinfo,struct iguana_info *coin,char *key);
void init_alladdresses(struct supernet_info *myinfo,struct iguana_info *coin);
cJSON *dpow_getmessage(struct supernet_info *myinfo,char *jsonstr);
cJSON *dpow_addmessage(struct supernet_info *myinfo,char *jsonstr);
cJSON *dpow_psock(struct supernet_info *myinfo,char *jsonstr);
char *_dex_getinfo(struct supernet_info *myinfo,char *symbol);
char *_dex_getrawtransaction(struct supernet_info *myinfo,char *symbol,bits256 txid);
@ -191,6 +193,7 @@ int32_t _dex_getheight(struct supernet_info *myinfo,char *symbol);
char *_dex_getnotaries(struct supernet_info *myinfo,char *symbol);
char *_dex_kvupdate(struct supernet_info *myinfo,char *symbol,char *key,char *value,int32_t flags);
char *_dex_kvsearch(struct supernet_info *myinfo,char *symbol,char *key);
char *_dex_psock(struct supernet_info *myinfo,char *jsonstr);
int32_t komodo_notaries(char *symbol,uint8_t pubkeys[64][33],int32_t height);
cJSON *dpow_checkaddress(struct supernet_info *myinfo,struct iguana_info *coin,char *address);

147
iguana/dpow/dpow_network.c

@ -239,6 +239,106 @@ char *nanomsg_tcpname(struct supernet_info *myinfo,char *str,char *ipaddr,uint16
return(str);
}
void dpow_psockloop(void *_ptr)
{
int32_t i,nonz,size,sentbytes; uint32_t now; struct psock *ptr; void *buf; struct supernet_info *myinfo = _ptr;
while ( 1 )
{
now = (uint32_t)time(NULL);
for (i=nonz=0; i<myinfo->numpsocks; i++) // change to nn_poll!
{
portable_mutex_lock(&myinfo->psockmutex);
if ( i < myinfo->numpsocks )
{
ptr = &myinfo->PSOCKS[i];
if ( (size= nn_recv(ptr->pullsock,&buf,NN_MSG,0)) > 0 )
{
ptr->lasttime = now;
if ( (sentbytes= nn_send(ptr->pubsock,buf,size,0)) > 0 )
{
printf("PSOCKS[%d] of %d (%u %u) -> %d/%d bytes\n",i,myinfo->numpsocks,ptr->pushport,ptr->subport,size,sentbytes);
nonz++;
}
}
else if ( now > ptr->lasttime+PSOCK_IDLETIMEOUT )
{
printf("PSOCKS[%d] of %d (%u %u) lag.%d IDLETIMEOUT\n",i,myinfo->numpsocks,ptr->pushport,ptr->subport,now - ptr->lasttime);
nn_close(ptr->pullsock);
nn_close(ptr->pubsock);
if ( myinfo->numpsocks > 1 )
{
myinfo->PSOCKS[i] = myinfo->PSOCKS[--myinfo->numpsocks];
memset(&myinfo->PSOCKS[myinfo->numpsocks],0,sizeof(*ptr));
} else myinfo->numpsocks = 0;
}
if ( buf != 0 )
nn_freemsg(buf), buf = 0;
}
portable_mutex_unlock(&myinfo->psockmutex);
}
if ( nonz == 0 )
usleep(100000);
}
}
void dpow_psockadd(struct supernet_info *myinfo,int32_t pullsock,uint16_t pushport,int32_t pubsock,uint16_t subport)
{
struct psock *ptr;
portable_mutex_lock(&myinfo->psockmutex);
myinfo->PSOCKS = realloc(myinfo->PSOCKS,sizeof(*myinfo->PSOCKS) * (myinfo->numpsocks + 1));
ptr = &myinfo->PSOCKS[myinfo->numpsocks++];
ptr->pullsock = pullsock;
ptr->pushport = pushport;
ptr->pubsock = pubsock;
ptr->subport = subport;
ptr->lasttime = (uint32_t)time(NULL);
portable_mutex_unlock(&myinfo->psockmutex);
}
cJSON *dpow_psock(struct supernet_info *myinfo,char *jsonstr)
{
char pushaddr[128],subaddr[128]; uint16_t i,pushport,subport; int32_t timeout,maxsize,pullsock=-1,pubsock=-1; cJSON *retjson=0;
retjson = cJSON_CreateObject();
pushport = myinfo->psockport++;
subport = myinfo->psockport++;
for (i=0; i<100; i++)
{
nanomsg_tcpname(myinfo,pushaddr,myinfo->ipaddr,pushport), pushport += 2;
nanomsg_tcpname(myinfo,subaddr,myinfo->ipaddr,subport), subport += 2;
if ( (pullsock= nn_socket(AF_SP,NN_PULL)) >= 0 && (pubsock= nn_socket(AF_SP,NN_PUB)) >= 0 )
{
if ( nn_bind(pullsock,pushaddr) >= 0 && nn_bind(pubsock,subaddr) >= 0 )
{
timeout = 10;
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
timeout = 1;
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
timeout = 1;
maxsize = 1024 * 1024;
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize));
dpow_psockadd(myinfo,pullsock,pushport,pubsock,subport);
jaddstr(retjson,"result","success");
jaddstr(retjson,"pushaddr",pushaddr);
jaddstr(retjson,"subaddr",subaddr);
break;
}
}
if ( pushport < 1000 )
pushport = 1001;
if ( subport < 1000 )
subport = 1001;
}
if ( i == 100 )
{
jaddstr(retjson,"error","cant find psock ports");
if ( pullsock >= 0 )
nn_close(pullsock);
if ( pubsock >= 0 )
nn_close(pubsock);
}
return(retjson);
}
static int _increasing_ipbits(const void *a,const void *b)
{
#define uint32_a (*(uint32_t *)a)
@ -254,7 +354,7 @@ static int _increasing_ipbits(const void *a,const void *b)
void dex_packet(struct supernet_info *myinfo,struct dex_nanomsghdr *dexp,int32_t size)
{
char *retstr; int32_t datalen; struct iguana_info *coin; struct dex_request dexreq;
char *retstr; int32_t datalen; struct dex_request dexreq; //struct iguana_info *coin;
//for (i=0; i<size; i++)
// printf("%02x",((uint8_t *)dexp)[i]);
if ( strcmp(dexp->handler,"DEX") == 0 && dexp->datalen > BASILISK_KEYSIZE )
@ -266,12 +366,12 @@ void dex_packet(struct supernet_info *myinfo,struct dex_nanomsghdr *dexp,int32_t
else if ( strcmp(dexp->handler,"request") == 0 )
{
datalen = dex_rwrequest(0,dexp->packet,&dexreq);
if ( myinfo->IAMNOTARY != 0 && dexreq.func == 'A' && (coin= iguana_coinfind(dexreq.name)) != 0 )
/*if ( myinfo->IAMNOTARY != 0 && dexreq.func == 'A' && (coin= iguana_coinfind(dexreq.name)) != 0 )
{
if ( (retstr= dpow_importaddress(myinfo,coin,(char *)&dexp->packet[datalen])) != 0 )
free(retstr);
printf("process broadcast importaddress.(%s) [%s]\n",(char *)&dexp->packet[datalen],dexreq.name);
}
}*/
}
}
@ -652,6 +752,23 @@ char *dex_response(int32_t *broadcastflagp,struct supernet_info *myinfo,struct d
retstr = jprint(retjson,1);
}
}
else if ( dexreq.func == 'Z' )
{
if ( (retjson= dpow_psock(myinfo,(char *)&dexp->packet[datalen])) != 0 )
{
dpow_randipbits(myinfo,coin,retjson);
retstr = jprint(retjson,1);
}
}
else if ( 0 && dexreq.func == 'a' )
{
if ( (retjson= dpow_addmessage(myinfo,(char *)&dexp->packet[datalen])) != 0 )
{
*broadcastflagp = 1;
dpow_randipbits(myinfo,coin,retjson);
retstr = jprint(retjson,1);
}
}
else if ( dexreq.func == 'N' )
{
uint8_t pubkeys[64][33]; char str[128]; int32_t numnotaries; cJSON *array,*item;
@ -1010,6 +1127,24 @@ char *_dex_getmessage(struct supernet_info *myinfo,char *jsonstr)
return(_dex_sendrequeststr(myinfo,&dexreq,jsonstr,0,1,""));
}
char *_dex_sendmessage(struct supernet_info *myinfo,char *jsonstr)
{
struct dex_request dexreq;
memset(&dexreq,0,sizeof(dexreq));
safecopy(dexreq.name,"KMD",sizeof(dexreq.name));
dexreq.func = 'a';
return(_dex_sendrequeststr(myinfo,&dexreq,jsonstr,0,1,""));
}
char *_dex_psock(struct supernet_info *myinfo,char *jsonstr)
{
struct dex_request dexreq;
memset(&dexreq,0,sizeof(dexreq));
safecopy(dexreq.name,"KMD",sizeof(dexreq.name));
dexreq.func = 'Z';
return(_dex_sendrequeststr(myinfo,&dexreq,jsonstr,0,1,""));
}
char *_dex_listunspentarg(struct supernet_info *myinfo,char *symbol,char *address,uint8_t arg)
{
struct dex_request dexreq; char *retstr;
@ -1102,7 +1237,7 @@ int32_t dex_crc32find(struct supernet_info *myinfo,uint32_t crc32)
int32_t dex_packetcheck(struct supernet_info *myinfo,struct dex_nanomsghdr *dexp,int32_t size)
{
int32_t firstz=-1; uint32_t crc32;
uint32_t crc32; //int32_t firstz=-1;
if ( dexp->version0 == (DEX_VERSION & 0xff) && dexp->version1 == ((DEX_VERSION >> 8) & 0xff) )
{
if ( dexp->datalen == (size - sizeof(*dexp)) )
@ -1988,7 +2123,7 @@ int32_t dpow_nanomsg_update(struct supernet_info *myinfo)
if ( dex_packetcheck(myinfo,dexp,size) == 0 )
{
//printf("FROM BUS.%08x -> pub\n",dexp->crc32);
signed_nn_send(myinfo,myinfo->ctx,myinfo->persistent_priv,myinfo->pubsock,dexp,size);
//signed_nn_send(myinfo,myinfo->ctx,myinfo->persistent_priv,myinfo->pubsock,dexp,size);
dex_packet(myinfo,dexp,size);
}
//printf("GOT DEX bus PACKET.%d\n",size);
@ -2012,7 +2147,7 @@ int32_t dpow_nanomsg_update(struct supernet_info *myinfo)
{
printf("BROADCAST dexp request.[%d]\n",size);
signed_nn_send(myinfo,myinfo->ctx,myinfo->persistent_priv,myinfo->dexsock,dexp,size);
signed_nn_send(myinfo,myinfo->ctx,myinfo->persistent_priv,myinfo->pubsock,dexp,size);
//signed_nn_send(myinfo,myinfo->ctx,myinfo->persistent_priv,myinfo->pubsock,dexp,size);
}
}
else

4
iguana/iguana777.h

@ -90,6 +90,9 @@ struct supernet_address
struct pending_trade { UT_hash_handle hh; double basevolume,relvolume,dir; char base[32],rel[32]; };
#define PSOCK_IDLETIMEOUT 600
struct psock { uint32_t lasttime; int32_t pullsock,pubsock; uint16_t pushport,subport; };
#define JUMBLR_DEPOSITPREFIX "deposit "
struct jumblr_item
{
@ -149,6 +152,7 @@ struct supernet_info
uint8_t logs[256],exps[510];
struct message_info msgids[8192];
double *svmfeatures;
uint16_t psockport,numpsocks; struct psock *PSOCKS; portable_mutex_t psockmutex;
uint8_t notaries[64][33]; int32_t numnotaries,DEXEXPLORER;
};

10
iguana/iguana_notary.c

@ -747,6 +747,16 @@ TWO_STRINGS(dex,validateaddress,symbol,address)
return(_dex_validateaddress(myinfo,symbol,address));
}
STRING_ARG(dex,getmessage,argstr)
{
return(_dex_getmessage(myinfo,argstr));
}
STRING_ARG(dex,psock,argstr)
{
return(_dex_psock(myinfo,argstr));
}
STRING_ARG(dex,getnotaries,symbol)
{
return(_dex_getnotaries(myinfo,symbol));

3
iguana/main.c

@ -785,6 +785,7 @@ void iguana_launchdaemons(struct supernet_info *myinfo)
printf("launch mainloop\n");
OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)DEX_explorerloop,(void *)myinfo);
OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)jumblr_loop,(void *)myinfo);
OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)dpow_psockloop,(void *)myinfo);
mainloop(myinfo);
}
@ -1749,6 +1750,7 @@ void iguana_main(void *arg)
myinfo->rpcport = IGUANA_RPCPORT;
myinfo->dpowsock = myinfo->dexsock = myinfo->pubsock = myinfo->subsock = myinfo->reqsock = myinfo->repsock = -1;
dex_init(myinfo);
myinfo->psockport = 30000;
if ( arg != 0 )
{
if ( strcmp((char *)arg,"OStests") == 0 )
@ -1775,6 +1777,7 @@ void iguana_main(void *arg)
portable_mutex_init(&myinfo->pending_mutex);
portable_mutex_init(&myinfo->dpowmutex);
portable_mutex_init(&myinfo->notarymutex);
portable_mutex_init(&myinfo->psockmutex);
#if LIQUIDITY_PROVIDER
myinfo->tradingexchanges[myinfo->numexchanges++] = exchange_create(clonestr("nxtae"),0);
myinfo->tradingexchanges[myinfo->numexchanges++] = exchange_create(clonestr("bitcoin"),0);

2
includes/iguana_apideclares.h

@ -49,6 +49,8 @@ HASH_AND_STRING_AND_INT(dex,gettxin,txid,symbol,vout);
TWO_STRINGS(dex,listspent,symbol,address);
TWO_STRINGS(dex,getbalance,symbol,address);
STRING_ARG(dex,explorer,symbol);
STRING_ARG(dex,getmessage,argstr);
STRING_ARG(dex,psock,argstr);
HASH_ARRAY_STRING(basilisk,genesis_opreturn,hash,vals,hexstr);
HASH_ARRAY_STRING(basilisk,history,hash,vals,hexstr);

1
includes/iguana_funcs.h

@ -596,6 +596,7 @@ void basilisk_ensurerelay(struct supernet_info *myinfo,struct iguana_info *notar
void dpow_nanomsginit(struct supernet_info *myinfo,char *ipaddr);
int32_t iguana_datachain_scan(struct supernet_info *myinfo,struct iguana_info *coin,uint8_t rmd160[20]);
void basilisk_requests_poll(struct supernet_info *myinfo,bits256 privkey);
void dpow_psockloop(void *_ptr);
void iguana_RTreset(struct iguana_info *coin);
void iguana_RTpurge(struct iguana_info *coin,int32_t lastheight);

Loading…
Cancel
Save