diff --git a/basilisk/basilisk.c b/basilisk/basilisk.c index 9ee041300..4c56f977b 100755 --- a/basilisk/basilisk.c +++ b/basilisk/basilisk.c @@ -894,7 +894,8 @@ void basilisks_loop(void *arg) } else { - dex_update(myinfo); + if ( myinfo->IAMNOTARY == 0 ) + dex_updateclient(myinfo); if ( myinfo->IAMLP != 0 ) endmilli = startmilli + 1000; else endmilli = startmilli + 2000; diff --git a/iguana/dPoW.h b/iguana/dPoW.h index 2f69039ad..43d85e8b1 100755 --- a/iguana/dPoW.h +++ b/iguana/dPoW.h @@ -140,6 +140,6 @@ struct dpow_info }; uint64_t dpow_notarybestk(uint64_t refmask,struct dpow_block *bp,int8_t *lastkp); int32_t dpow_paxpending(uint8_t *hex,uint32_t *paxwdcrcp); -void dex_update(struct supernet_info *myinfo); +void dex_updateclient(struct supernet_info *myinfo); #endif diff --git a/iguana/dpow/dpow_network.c b/iguana/dpow/dpow_network.c index baa36b731..f6b751383 100755 --- a/iguana/dpow/dpow_network.c +++ b/iguana/dpow/dpow_network.c @@ -61,38 +61,46 @@ void dex_packet(struct supernet_info *myinfo,struct dex_nanomsghdr *dexp,int32_t int32_t dex_reqsend(struct supernet_info *myinfo,uint8_t *data,int32_t datalen) { - struct dex_nanomsghdr *dexp; char ipaddr[64],str[128]; int32_t retval=0,timeout,i,n,size,recvbytes,sentbytes = 0; uint32_t *retptr,ipbits; - if ( myinfo->reqsock < 0 && (myinfo->reqsock= nn_socket(AF_SP,NN_REQ)) >= 0 ) + struct dex_nanomsghdr *dexp; char ipaddr[64],str[128]; int32_t retval=0,timeout,i,n,size,recvbytes,sentbytes = 0,reqsock,subsock; uint32_t *retptr,ipbits; + portable_mutex_lock(&myinfo->dexmutex); + subsock = myinfo->subsock; + reqsock = myinfo->reqsock; + if ( reqsock < 0 && (reqsock= nn_socket(AF_SP,NN_REQ)) >= 0 ) { - if ( nn_connect(myinfo->reqsock,nanomsg_tcpname(0,str,myinfo->dexseed_ipaddr,REP_SOCK)) < 0 ) + if ( nn_connect(reqsock,nanomsg_tcpname(0,str,myinfo->dexseed_ipaddr,REP_SOCK)) < 0 ) { - nn_close(myinfo->reqsock); - myinfo->reqsock = -1; + nn_close(reqsock); + reqsock = -1; } else { timeout = 500; - nn_setsockopt(myinfo->reqsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); - nn_setsockopt(myinfo->reqsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); - if ( myinfo->IAMNOTARY == 0 && myinfo->subsock < 0 && (myinfo->subsock= nn_socket(AF_SP,NN_SUB)) >= 0 ) + nn_setsockopt(reqsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); + nn_setsockopt(reqsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); + if ( myinfo->IAMNOTARY == 0 && subsock < 0 && (subsock= nn_socket(AF_SP,NN_SUB)) >= 0 ) { - if ( nn_connect(myinfo->subsock,nanomsg_tcpname(0,str,myinfo->dexseed_ipaddr,PUB_SOCK)) < 0 ) + if ( nn_connect(subsock,nanomsg_tcpname(0,str,myinfo->dexseed_ipaddr,PUB_SOCK)) < 0 ) { - nn_close(myinfo->reqsock); - myinfo->reqsock = -1; - nn_close(myinfo->subsock); - myinfo->subsock = -1; + nn_close(reqsock); + reqsock = -1; + nn_close(subsock); + subsock = -1; } else { - nn_setsockopt(myinfo->subsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); - nn_setsockopt(myinfo->subsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); - nn_setsockopt(myinfo->subsock,NN_SUB,NN_SUB_SUBSCRIBE,"",0); - printf("CLIENT sockets req.%d sub.%d\n",myinfo->reqsock,myinfo->subsock); + nn_setsockopt(subsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); + nn_setsockopt(subsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); + nn_setsockopt(subsock,NN_SUB,NN_SUB_SUBSCRIBE,"",0); + printf("CLIENT sockets req.%d sub.%d\n",reqsock,subsock); } } } } + if ( myinfo->subsock != subsock ) + myinfo->subsock = subsock; + if ( myinfo->reqsock != reqsock ) + myinfo->reqsock = reqsock; + portable_mutex_unlock(&myinfo->dexmutex); if ( myinfo->reqsock >= 0 ) { size = (int32_t)(sizeof(*dexp) + datalen); @@ -175,8 +183,8 @@ int32_t dex_packetcheck(struct supernet_info *myinfo,struct dex_nanomsghdr *dexp int32_t dex_subsock_poll(struct supernet_info *myinfo) { - int32_t size; struct dex_nanomsghdr *dexp; - if ( (size= nn_recv(myinfo->subsock,&dexp,NN_MSG,0)) >= 0 ) + int32_t size= -1; struct dex_nanomsghdr *dexp; + if ( myinfo->subsock >= 0 && (size= nn_recv(myinfo->subsock,&dexp,NN_MSG,0)) >= 0 ) { if ( dex_packetcheck(myinfo,dexp,size) == 0 ) { @@ -189,12 +197,15 @@ int32_t dex_subsock_poll(struct supernet_info *myinfo) return(size); } -void dex_update(struct supernet_info *myinfo) +void dex_updateclient(struct supernet_info *myinfo) { int32_t i; - for (i=0; i<100; i++) - if ( dex_subsock_poll(myinfo) <= 0 ) - break; + if ( myinfo->IAMNOTARY == 0 ) + { + for (i=0; i<100; i++) + if ( dex_subsock_poll(myinfo) <= 0 ) + break; + } } #if ISNOTARYNODE @@ -274,7 +285,7 @@ int32_t dpow_addnotary(struct supernet_info *myinfo,struct dpow_info *dp,char *i void dpow_nanomsginit(struct supernet_info *myinfo,char *ipaddr) { - char str[512]; int32_t timeout,retval,maxsize; + char str[512]; int32_t timeout,retval,maxsize,dpowsock,dexsock,repsock,pubsock; if ( myinfo->ipaddr[0] == 0 ) { printf("need to set ipaddr before nanomsg\n"); @@ -283,69 +294,73 @@ void dpow_nanomsginit(struct supernet_info *myinfo,char *ipaddr) if ( myinfo->IAMNOTARY == 0 ) return; portable_mutex_lock(&myinfo->notarymutex); - if ( myinfo->dpowsock < 0 && (myinfo->dpowsock= nn_socket(AF_SP,NN_BUS)) >= 0 ) + dpowsock = myinfo->dpowsock; + dexsock = myinfo->dexsock; + repsock = myinfo->repsock; + pubsock = myinfo->pubsock; + if ( dpowsock < 0 && (dpowsock= nn_socket(AF_SP,NN_BUS)) >= 0 ) { - if ( nn_bind(myinfo->dpowsock,nanomsg_tcpname(myinfo,str,myinfo->ipaddr,DPOW_SOCK)) < 0 ) + if ( nn_bind(dpowsock,nanomsg_tcpname(myinfo,str,myinfo->ipaddr,DPOW_SOCK)) < 0 ) { printf("error binding to dpowsock (%s)\n",nanomsg_tcpname(myinfo,str,myinfo->ipaddr,DPOW_SOCK)); - nn_close(myinfo->dpowsock); - myinfo->dpowsock = -1; + nn_close(dpowsock); + dpowsock = -1; } else { printf("NN_BIND to %s\n",str); - if ( myinfo->dexsock < 0 && (myinfo->dexsock= nn_socket(AF_SP,NN_BUS)) >= 0 ) + if ( dexsock < 0 && (dexsock= nn_socket(AF_SP,NN_BUS)) >= 0 ) { - if ( nn_bind(myinfo->dexsock,nanomsg_tcpname(myinfo,str,myinfo->ipaddr,DEX_SOCK)) < 0 ) + if ( nn_bind(dexsock,nanomsg_tcpname(myinfo,str,myinfo->ipaddr,DEX_SOCK)) < 0 ) { printf("error binding to dexsock (%s)\n",nanomsg_tcpname(myinfo,str,myinfo->ipaddr,DEX_SOCK)); - nn_close(myinfo->dexsock); - myinfo->dexsock = -1; - nn_close(myinfo->dpowsock); - myinfo->dpowsock = -1; + nn_close(dexsock); + dexsock = -1; + nn_close(dpowsock); + dpowsock = -1; } else { - if ( myinfo->pubsock < 0 && (myinfo->pubsock= nn_socket(AF_SP,NN_PUB)) >= 0 ) + if ( pubsock < 0 && (pubsock= nn_socket(AF_SP,NN_PUB)) >= 0 ) { - if ( nn_bind(myinfo->pubsock,nanomsg_tcpname(myinfo,str,myinfo->ipaddr,PUB_SOCK)) < 0 ) + if ( nn_bind(pubsock,nanomsg_tcpname(myinfo,str,myinfo->ipaddr,PUB_SOCK)) < 0 ) { printf("error binding to pubsock (%s)\n",nanomsg_tcpname(myinfo,str,myinfo->ipaddr,PUB_SOCK)); - nn_close(myinfo->pubsock); - myinfo->pubsock = -1; - nn_close(myinfo->dexsock); - myinfo->dexsock = -1; - nn_close(myinfo->dpowsock); - myinfo->dpowsock = -1; + nn_close(pubsock); + pubsock = -1; + nn_close(dexsock); + dexsock = -1; + nn_close(dpowsock); + dpowsock = -1; } else { - if ( myinfo->repsock < 0 && (myinfo->repsock= nn_socket(AF_SP,NN_REP)) >= 0 ) + if ( repsock < 0 && (repsock= nn_socket(AF_SP,NN_REP)) >= 0 ) { - if ( nn_bind(myinfo->repsock,nanomsg_tcpname(myinfo,str,myinfo->ipaddr,REP_SOCK)) < 0 ) + if ( nn_bind(repsock,nanomsg_tcpname(myinfo,str,myinfo->ipaddr,REP_SOCK)) < 0 ) { printf("error binding to repsock (%s)\n",nanomsg_tcpname(myinfo,str,myinfo->ipaddr,REP_SOCK)); - nn_close(myinfo->repsock); - myinfo->repsock = -1; - nn_close(myinfo->pubsock); - myinfo->pubsock = -1; - nn_close(myinfo->dexsock); - myinfo->dexsock = -1; - nn_close(myinfo->dpowsock); - myinfo->dpowsock = -1; + nn_close(repsock); + repsock = -1; + nn_close(pubsock); + pubsock = -1; + nn_close(dexsock); + dexsock = -1; + nn_close(dpowsock); + dpowsock = -1; } else { timeout = 100; - nn_setsockopt(myinfo->dexsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); - nn_setsockopt(myinfo->repsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); - nn_setsockopt(myinfo->dexsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); + nn_setsockopt(dexsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); + nn_setsockopt(repsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); + nn_setsockopt(dexsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); timeout = 500; - nn_setsockopt(myinfo->repsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); + nn_setsockopt(repsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); maxsize = 1024 * 1024; - printf("RCVBUF.%d\n",nn_setsockopt(myinfo->dexsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize))); - printf("RCVBUF.%d\n",nn_setsockopt(myinfo->repsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize))); - printf("DEXINIT dpow.%d dex.%d rep.%d\n",myinfo->dpowsock,myinfo->dexsock,myinfo->repsock); + printf("RCVBUF.%d\n",nn_setsockopt(dexsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize))); + printf("RCVBUF.%d\n",nn_setsockopt(repsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize))); + printf("DEXINIT dpow.%d dex.%d rep.%d\n",dpowsock,myinfo->dexsock,myinfo->repsock); } } } @@ -355,12 +370,21 @@ void dpow_nanomsginit(struct supernet_info *myinfo,char *ipaddr) myinfo->dpowipbits[0] = (uint32_t)calc_ipbits(myinfo->ipaddr); myinfo->numdpowipbits = 1; timeout = 1000; - nn_setsockopt(myinfo->dpowsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); + nn_setsockopt(dpowsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); maxsize = 1024 * 1024; - printf("RCVBUF.%d\n",nn_setsockopt(myinfo->dpowsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize))); + printf("RCVBUF.%d\n",nn_setsockopt(dpowsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize))); + myinfo->nanoinit = (uint32_t)time(NULL); } } //else printf("error creating nanosocket\n"); + if ( myinfo->dpowsock != dpowsock ) + myinfo->dpowsock = dpowsock; + if ( myinfo->dexsock != dexsock ) + myinfo->dexsock = dexsock; + if ( myinfo->repsock != repsock ) + myinfo->repsock = repsock; + if ( myinfo->pubsock != pubsock ) + myinfo->pubsock = pubsock; portable_mutex_unlock(&myinfo->notarymutex); dpow_addnotary(myinfo,0,ipaddr); }