|
@ -18,7 +18,7 @@ |
|
|
|
|
|
|
|
|
struct dex_nanomsghdr |
|
|
struct dex_nanomsghdr |
|
|
{ |
|
|
{ |
|
|
uint32_t size,datalen,crc32; |
|
|
uint32_t size,datalen,crc32,timestamp; |
|
|
uint8_t version0,version1,packet[]; |
|
|
uint8_t version0,version1,packet[]; |
|
|
} PACKED; |
|
|
} PACKED; |
|
|
|
|
|
|
|
@ -56,7 +56,7 @@ static int _increasing_ipbits(const void *a,const void *b) |
|
|
|
|
|
|
|
|
void dex_packet(struct supernet_info *myinfo,struct dex_nanomsghdr *dexp,int32_t size) |
|
|
void dex_packet(struct supernet_info *myinfo,struct dex_nanomsghdr *dexp,int32_t size) |
|
|
{ |
|
|
{ |
|
|
printf("DEX_PACKET.[%d]\n",size); |
|
|
printf("DEX_PACKET.[%d] crc.%x lag.%d\n",size,calc_crc32(0,(void *)dexp,size),(int32_t)(time(NULL)-dexp->timestamp)); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
int32_t dex_reqsend(struct supernet_info *myinfo,uint8_t *data,int32_t datalen) |
|
|
int32_t dex_reqsend(struct supernet_info *myinfo,uint8_t *data,int32_t datalen) |
|
@ -71,7 +71,10 @@ int32_t dex_reqsend(struct supernet_info *myinfo,uint8_t *data,int32_t datalen) |
|
|
} |
|
|
} |
|
|
else |
|
|
else |
|
|
{ |
|
|
{ |
|
|
if ( myinfo->subsock < 0 && (myinfo->subsock= nn_socket(AF_SP,NN_SUB)) >= 0 ) |
|
|
timeout = 500; |
|
|
|
|
|
nn_setsockopt(myinfo->reqsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
|
|
|
nn_setsockopt(myinfo->reqsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
|
|
|
if ( myinfo->IAMNOTARY == 0 && myinfo->subsock < 0 && (myinfo->subsock= nn_socket(AF_SP,NN_SUB)) >= 0 ) |
|
|
{ |
|
|
{ |
|
|
if ( nn_connect(myinfo->subsock,nanomsg_tcpname(0,str,myinfo->dexseed_ipaddr,PUB_SOCK)) < 0 ) |
|
|
if ( nn_connect(myinfo->subsock,nanomsg_tcpname(0,str,myinfo->dexseed_ipaddr,PUB_SOCK)) < 0 ) |
|
|
{ |
|
|
{ |
|
@ -82,13 +85,10 @@ int32_t dex_reqsend(struct supernet_info *myinfo,uint8_t *data,int32_t datalen) |
|
|
} |
|
|
} |
|
|
else |
|
|
else |
|
|
{ |
|
|
{ |
|
|
timeout = 100; |
|
|
|
|
|
nn_setsockopt(myinfo->reqsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
|
|
|
nn_setsockopt(myinfo->subsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(myinfo->subsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(myinfo->reqsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
|
|
|
nn_setsockopt(myinfo->subsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(myinfo->subsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(myinfo->subsock,NN_SUB,NN_SUB_SUBSCRIBE,"",0); |
|
|
nn_setsockopt(myinfo->subsock,NN_SUB,NN_SUB_SUBSCRIBE,"",0); |
|
|
printf("DEXINIT req.%d sub.%d\n",myinfo->reqsock,myinfo->subsock); |
|
|
printf("CLIENT sockets req.%d sub.%d\n",myinfo->reqsock,myinfo->subsock); |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -101,36 +101,44 @@ int32_t dex_reqsend(struct supernet_info *myinfo,uint8_t *data,int32_t datalen) |
|
|
dexp->size = size; |
|
|
dexp->size = size; |
|
|
dexp->datalen = datalen; |
|
|
dexp->datalen = datalen; |
|
|
dexp->crc32 = crc32; |
|
|
dexp->crc32 = crc32; |
|
|
|
|
|
dexp->timestamp = (uint32_t)time(NULL); |
|
|
dexp->version0 = DEX_VERSION & 0xff; |
|
|
dexp->version0 = DEX_VERSION & 0xff; |
|
|
dexp->version1 = (DEX_VERSION >> 8) & 0xff; |
|
|
dexp->version1 = (DEX_VERSION >> 8) & 0xff; |
|
|
memcpy(dexp->packet,data,datalen); |
|
|
memcpy(dexp->packet,data,datalen); |
|
|
sentbytes = nn_send(myinfo->reqsock,dexp,size,0); |
|
|
sentbytes = nn_send(myinfo->reqsock,dexp,size,0); |
|
|
if ( (recvbytes= nn_recv(myinfo->reqsock,&retptr,NN_MSG,0)) >= 0 ) |
|
|
if ( (recvbytes= nn_recv(myinfo->reqsock,&retptr,NN_MSG,0)) >= 0 ) |
|
|
{ |
|
|
{ |
|
|
portable_mutex_lock(&myinfo->dexmutex); |
|
|
|
|
|
ipbits = *retptr; |
|
|
ipbits = *retptr; |
|
|
expand_ipbits(ipaddr,ipbits); |
|
|
expand_ipbits(ipaddr,ipbits); |
|
|
printf("req returned.[%d] %08x %s\n",recvbytes,*retptr,ipaddr); |
|
|
printf("req returned.[%d] %08x %s\n",recvbytes,*retptr,ipaddr); |
|
|
n = myinfo->numdexipbits; |
|
|
if ( myinfo->IAMNOTARY == 0 ) |
|
|
for (i=0; i<n; i++) |
|
|
|
|
|
if ( ipbits == myinfo->dexipbits[i] ) |
|
|
|
|
|
break; |
|
|
|
|
|
if ( i == n && n < 64 ) |
|
|
|
|
|
{ |
|
|
{ |
|
|
myinfo->dexipbits[n++] = ipbits; |
|
|
portable_mutex_lock(&myinfo->dexmutex); |
|
|
qsort(myinfo->dexipbits,n,sizeof(uint32_t),_increasing_ipbits); |
|
|
n = myinfo->numdexipbits; |
|
|
if ( (myinfo->numdexipbits= n) < 3 ) |
|
|
for (i=0; i<n; i++) |
|
|
|
|
|
if ( ipbits == myinfo->dexipbits[i] ) |
|
|
|
|
|
break; |
|
|
|
|
|
if ( i == n && n < 64 ) |
|
|
{ |
|
|
{ |
|
|
if ( myinfo->subsock >= 0 ) |
|
|
myinfo->dexipbits[n++] = ipbits; |
|
|
nn_connect(myinfo->subsock,nanomsg_tcpname(0,str,ipaddr,PUB_SOCK)); |
|
|
qsort(myinfo->dexipbits,n,sizeof(uint32_t),_increasing_ipbits); |
|
|
|
|
|
if ( (myinfo->numdexipbits= n) < 3 ) |
|
|
|
|
|
{ |
|
|
|
|
|
if ( myinfo->subsock >= 0 ) |
|
|
|
|
|
{ |
|
|
|
|
|
nn_connect(myinfo->subsock,nanomsg_tcpname(0,str,ipaddr,PUB_SOCK)); |
|
|
|
|
|
printf("%d: subscribe connect (%s)\n",myinfo->numdexipbits,str); |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
nn_connect(myinfo->reqsock,nanomsg_tcpname(0,str,ipaddr,REP_SOCK)); |
|
|
|
|
|
printf("%d: req connect (%s)\n",myinfo->numdexipbits,str); |
|
|
} |
|
|
} |
|
|
nn_connect(myinfo->reqsock,nanomsg_tcpname(0,str,ipaddr,REP_SOCK)); |
|
|
portable_mutex_unlock(&myinfo->dexmutex); |
|
|
} |
|
|
} |
|
|
portable_mutex_unlock(&myinfo->dexmutex); |
|
|
|
|
|
nn_freemsg(retptr); |
|
|
nn_freemsg(retptr); |
|
|
} else retval = -2; |
|
|
} else retval = -2; |
|
|
free(dexp); |
|
|
free(dexp); |
|
|
printf("DEXREQ.[%d] crc32.%08x datalen.%d sent.%d\n",size,dexp->crc32,datalen,sentbytes); |
|
|
printf("DEXREQ.[%d] crc32.%08x datalen.%d sent.%d timestamp.%u\n",size,dexp->crc32,datalen,sentbytes,dexp->timestamp); |
|
|
} else retval = -1; |
|
|
} else retval = -1; |
|
|
return(retval); |
|
|
return(retval); |
|
|
} |
|
|
} |
|
@ -169,12 +177,11 @@ int32_t dex_packetcheck(struct supernet_info *myinfo,struct dex_nanomsghdr *dexp |
|
|
return(-1); |
|
|
return(-1); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
void dex_subsock_poll(struct supernet_info *myinfo) |
|
|
int32_t dex_subsock_poll(struct supernet_info *myinfo) |
|
|
{ |
|
|
{ |
|
|
int32_t size,n=0; struct dex_nanomsghdr *dexp; |
|
|
int32_t size; struct dex_nanomsghdr *dexp; |
|
|
while ( (size= nn_recv(myinfo->subsock,&dexp,NN_MSG,0)) >= 0 ) |
|
|
if ( (size= nn_recv(myinfo->subsock,&dexp,NN_MSG,0)) >= 0 ) |
|
|
{ |
|
|
{ |
|
|
n++; |
|
|
|
|
|
if ( dex_packetcheck(myinfo,dexp,size) == 0 ) |
|
|
if ( dex_packetcheck(myinfo,dexp,size) == 0 ) |
|
|
{ |
|
|
{ |
|
|
printf("SUBSOCK.%08x",dexp->crc32); |
|
|
printf("SUBSOCK.%08x",dexp->crc32); |
|
@ -182,13 +189,19 @@ void dex_subsock_poll(struct supernet_info *myinfo) |
|
|
} |
|
|
} |
|
|
if ( dexp != 0 ) |
|
|
if ( dexp != 0 ) |
|
|
nn_freemsg(dexp), dexp = 0; |
|
|
nn_freemsg(dexp), dexp = 0; |
|
|
if ( size == 0 || n++ > 100 ) |
|
|
|
|
|
break; |
|
|
|
|
|
} |
|
|
} |
|
|
|
|
|
return(size); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
#if ISNOTARYNODE |
|
|
void dex_update(struct supernet_info *myinfo) |
|
|
|
|
|
{ |
|
|
|
|
|
int32_t i; |
|
|
|
|
|
for (i=0; i<100; i++) |
|
|
|
|
|
if ( dex_subsock_poll(myinfo) <= 0 ) |
|
|
|
|
|
break; |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#if ISNOTARYNODE |
|
|
struct dpow_nanoutxo |
|
|
struct dpow_nanoutxo |
|
|
{ |
|
|
{ |
|
|
bits256 srcutxo,destutxo; |
|
|
bits256 srcutxo,destutxo; |
|
@ -216,6 +229,8 @@ void dpow_sigscheck(struct supernet_info *myinfo,struct dpow_info *dp,struct dpo |
|
|
int32_t dpow_addnotary(struct supernet_info *myinfo,struct dpow_info *dp,char *ipaddr) |
|
|
int32_t dpow_addnotary(struct supernet_info *myinfo,struct dpow_info *dp,char *ipaddr) |
|
|
{ |
|
|
{ |
|
|
char str[512]; uint32_t ipbits,*ptr; int32_t i,iter,n,retval = -1; |
|
|
char str[512]; uint32_t ipbits,*ptr; int32_t i,iter,n,retval = -1; |
|
|
|
|
|
if ( myinfo->IAMNOTARY == 0 ) |
|
|
|
|
|
return(-1); |
|
|
portable_mutex_lock(&myinfo->notarymutex); |
|
|
portable_mutex_lock(&myinfo->notarymutex); |
|
|
if ( myinfo->dpowsock >= 0 && myinfo->dexsock >= 0 ) |
|
|
if ( myinfo->dpowsock >= 0 && myinfo->dexsock >= 0 ) |
|
|
{ |
|
|
{ |
|
@ -269,6 +284,8 @@ void dpow_nanomsginit(struct supernet_info *myinfo,char *ipaddr) |
|
|
printf("need to set ipaddr before nanomsg\n"); |
|
|
printf("need to set ipaddr before nanomsg\n"); |
|
|
return; |
|
|
return; |
|
|
} |
|
|
} |
|
|
|
|
|
if ( myinfo->IAMNOTARY == 0 ) |
|
|
|
|
|
return; |
|
|
portable_mutex_lock(&myinfo->notarymutex); |
|
|
portable_mutex_lock(&myinfo->notarymutex); |
|
|
if ( myinfo->dpowsock < 0 && (myinfo->dpowsock= nn_socket(AF_SP,NN_BUS)) >= 0 ) |
|
|
if ( myinfo->dpowsock < 0 && (myinfo->dpowsock= nn_socket(AF_SP,NN_BUS)) >= 0 ) |
|
|
{ |
|
|
{ |
|
@ -327,6 +344,7 @@ void dpow_nanomsginit(struct supernet_info *myinfo,char *ipaddr) |
|
|
nn_setsockopt(myinfo->dexsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(myinfo->dexsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(myinfo->repsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(myinfo->repsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(myinfo->dexsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(myinfo->dexsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
|
|
|
timeout = 500; |
|
|
nn_setsockopt(myinfo->repsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
nn_setsockopt(myinfo->repsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); |
|
|
maxsize = 1024 * 1024; |
|
|
maxsize = 1024 * 1024; |
|
|
printf("RCVBUF.%d\n",nn_setsockopt(myinfo->dexsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize))); |
|
|
printf("RCVBUF.%d\n",nn_setsockopt(myinfo->dexsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize))); |
|
|