diff --git a/iguana/dPoW.h b/iguana/dPoW.h index b85235628..b8e316cdd 100755 --- a/iguana/dPoW.h +++ b/iguana/dPoW.h @@ -45,6 +45,12 @@ #define DPOW_MAXRELAYS 64 #define DPOW_MAXSIGLEN 128 +#define DEX_VERSION 0x0101 +#define DPOW_SOCK 7775 +#define DEX_SOCK 7774 +#define PUB_SOCK 7773 +#define REP_SOCK 7772 + struct dpow_coinentry { bits256 prev_hash; @@ -125,7 +131,7 @@ struct dpow_info struct dpow_checkpoint checkpoint,last,destchaintip,srcfifo[DPOW_FIFOSIZE],destfifo[DPOW_FIFOSIZE]; struct dpow_hashheight approved[DPOW_FIFOSIZE],notarized[DPOW_FIFOSIZE]; bits256 srctx[DPOW_MAXTX],desttx[DPOW_MAXTX]; - uint32_t SRCREALTIME,destupdated,srcconfirms,numdesttx,numsrctx,lastsplit,cancelratify,crcs[16]; + uint32_t SRCREALTIME,destupdated,srcconfirms,numdesttx,numsrctx,lastsplit,cancelratify; int32_t lastheight,maxblocks,SRCHEIGHT,SHORTFLAG,ratifying; struct pax_transaction *PAX; portable_mutex_t mutex; diff --git a/iguana/dpow/dpow_network.c b/iguana/dpow/dpow_network.c index a5e7c29b0..0d7a102b7 100755 --- a/iguana/dpow/dpow_network.c +++ b/iguana/dpow/dpow_network.c @@ -34,14 +34,20 @@ struct dpow_nanomsghdr uint8_t senderind,version0,version1,packet[]; } PACKED; +struct dex_nanomsghdr +{ + uint32_t size,datalen,crc32; + uint8_t version0,version1,packet[]; +} PACKED; + uint64_t dpow_ratifybest(uint64_t refmask,struct dpow_block *bp,int8_t *lastkp); struct dpow_block *dpow_heightfind(struct supernet_info *myinfo,struct dpow_info *dp,int32_t height); int32_t dpow_signedtxgen(struct supernet_info *myinfo,struct dpow_info *dp,struct iguana_info *coin,struct dpow_block *bp,int8_t bestk,uint64_t bestmask,int32_t myind,uint32_t deprec,int32_t src_or_dest,int32_t useratified); void dpow_sigscheck(struct supernet_info *myinfo,struct dpow_info *dp,struct dpow_block *bp,int32_t myind,int32_t src_or_dest,int8_t bestk,uint64_t bestmask,uint8_t pubkeys[64][33],int32_t numratified); -char *nanomsg_tcpname(char *str,char *ipaddr) +char *nanomsg_tcpname(char *str,char *ipaddr,uint16_t port) { - sprintf(str,"tcp://%s:7775",ipaddr); + sprintf(str,"tcp://%s:%u",ipaddr,port); return(str); } @@ -84,7 +90,10 @@ int32_t dpow_addnotary(struct supernet_info *myinfo,struct dpow_info *dp,char *i { ptr[n] = ipbits; if ( iter == 0 && strcmp(ipaddr,myinfo->ipaddr) != 0 ) - retval = nn_connect(myinfo->dpowsock,nanomsg_tcpname(str,ipaddr)); + { + retval = nn_connect(myinfo->dpowsock,nanomsg_tcpname(str,ipaddr,DPOW_SOCK)); + retval = nn_connect(myinfo->dexsock,nanomsg_tcpname(str,ipaddr,DEX_SOCK)); + } n++; qsort(ptr,n,sizeof(uint32_t),_increasing_ipbits); if ( iter == 0 ) @@ -112,35 +121,89 @@ void dpow_nanomsginit(struct supernet_info *myinfo,char *ipaddr) } if ( myinfo->dpowsock < 0 && (myinfo->dpowsock= nn_socket(AF_SP,NN_BUS)) >= 0 ) { - if ( nn_bind(myinfo->dpowsock,nanomsg_tcpname(str,myinfo->ipaddr)) < 0 ) + if ( nn_bind(myinfo->dpowsock,nanomsg_tcpname(str,myinfo->ipaddr,DPOW_SOCK)) < 0 ) { - printf("error binding to (%s)\n",nanomsg_tcpname(str,myinfo->ipaddr)); + printf("error binding to dpowsock (%s)\n",nanomsg_tcpname(str,myinfo->ipaddr,DPOW_SOCK)); nn_close(myinfo->dpowsock); myinfo->dpowsock = -1; } - timeout = 1000; - nn_setsockopt(myinfo->dpowsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); - myinfo->dpowipbits[0] = (uint32_t)calc_ipbits(myinfo->ipaddr); - myinfo->numdpowipbits = 1; + else + { + if ( myinfo->dexsock < 0 && (myinfo->dexsock= nn_socket(AF_SP,NN_BUS)) >= 0 ) + { + if ( nn_bind(myinfo->dexsock,nanomsg_tcpname(str,myinfo->ipaddr,DEX_SOCK)) < 0 ) + { + printf("error binding to dexsock (%s)\n",nanomsg_tcpname(str,myinfo->ipaddr,DEX_SOCK)); + nn_close(myinfo->dexsock); + myinfo->dexsock = -1; + nn_close(myinfo->dpowsock); + myinfo->dpowsock = -1; + } + else + { + if ( myinfo->pubsock < 0 && (myinfo->pubsock= nn_socket(AF_SP,NN_PUB)) >= 0 ) + { + if ( nn_bind(myinfo->pubsock,nanomsg_tcpname(str,myinfo->ipaddr,PUB_SOCK)) < 0 ) + { + printf("error binding to pubsock (%s)\n",nanomsg_tcpname(str,myinfo->ipaddr,PUB_SOCK)); + nn_close(myinfo->pubsock); + myinfo->pubsock = -1; + nn_close(myinfo->dexsock); + myinfo->dexsock = -1; + nn_close(myinfo->dpowsock); + myinfo->dpowsock = -1; + } + else + { + if ( myinfo->repsock < 0 && (myinfo->repsock= nn_socket(AF_SP,NN_REP)) >= 0 ) + { + if ( nn_bind(myinfo->repsock,nanomsg_tcpname(str,myinfo->ipaddr,REP_SOCK)) < 0 ) + { + printf("error binding to repsock (%s)\n",nanomsg_tcpname(str,myinfo->ipaddr,REP_SOCK)); + nn_close(myinfo->repsock); + myinfo->repsock = -1; + nn_close(myinfo->pubsock); + myinfo->pubsock = -1; + nn_close(myinfo->dexsock); + myinfo->dexsock = -1; + nn_close(myinfo->dpowsock); + myinfo->dpowsock = -1; + } + else + { + myinfo->dpowipbits[0] = (uint32_t)calc_ipbits(myinfo->ipaddr); + myinfo->numdpowipbits = 1; + timeout = 1000; + nn_setsockopt(myinfo->dpowsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); + nn_setsockopt(myinfo->dexsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); + nn_setsockopt(myinfo->repsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); + } + } + } + } + } + } + } } dpow_addnotary(myinfo,0,ipaddr); } -int32_t dpow_crc32find(struct supernet_info *myinfo,struct dpow_info *dp,uint32_t crc32,uint32_t channel) +int32_t dpow_crc32find(struct supernet_info *myinfo,uint32_t crc32) { int32_t i,firstz = -1; for (i=0; icrcs)/sizeof(*dp->crcs); i++) { - if ( dp->crcs[i] == crc32 ) + if ( myinfo->dexcrcs[i] == crc32 ) { //printf("NANODUPLICATE.%08x\n",crc32); return(-1); } - else if ( firstz < 0 && dp->crcs[i] == 0 ) + else if ( firstz < 0 && myinfo->dexcrcs[i] == 0 ) firstz = i; } if ( firstz < 0 ) - firstz = (rand() % (sizeof(dp->crcs)/sizeof(*dp->crcs))); + firstz = (rand() % (sizeof(myinfo->dexcrcs)/sizeof(*myinfo->dexcrcs))); + myinfo->dexcrcs[firstz] = crc32; return(firstz); } @@ -446,44 +509,41 @@ void dpow_send(struct supernet_info *myinfo,struct dpow_info *dp,struct dpow_blo { struct dpow_nanomsghdr *np; int32_t i,size,sentbytes = 0; uint32_t crc32; crc32 = calc_crc32(0,data,datalen); - //if ( (firstz= dpow_crc32find(myinfo,crc32,channel)) >= 0 ) + //dp->crcs[firstz] = crc32; + size = (int32_t)(sizeof(*np) + datalen); + np = calloc(1,size); // endian dependent! + if ( (np->numipbits= dp->numipbits) == 0 ) { - //dp->crcs[firstz] = crc32; - size = (int32_t)(sizeof(*np) + datalen); - np = calloc(1,size); // endian dependent! - if ( (np->numipbits= dp->numipbits) == 0 ) - { - dp->ipbits[0] = myinfo->myaddr.myipbits; - np->numipbits = dp->numipbits = 1; - } - np->senderind = bp->myind; - memcpy(np->ipbits,dp->ipbits,dp->numipbits * sizeof(*dp->ipbits)); - //for (i=0; inumipbits; i++) - // printf("%08x ",np->ipbits[i]); - //printf(" dpow_send.(%d) size.%d numipbits.%d myind.%d\n",datalen,size,np->numipbits,bp->myind); - if ( bp->isratify == 0 ) - dpow_nanoutxoset(&np->notarize,bp,0); - else dpow_nanoutxoset(&np->ratify,bp,1); - np->size = size; - np->datalen = datalen; - np->crc32 = crc32; - for (i=0; i<2; i++) - np->ratify.pendingcrcs[i] = bp->pendingcrcs[i]; - for (i=0; i<32; i++) - np->srchash.bytes[i] = dp->minerkey33[i+1]; - //np->srchash = srchash; - np->desthash = desthash; - np->channel = channel; - np->height = msgbits; - np->myipbits = myinfo->myaddr.myipbits; - strcpy(np->symbol,dp->symbol); - np->version0 = DPOW_VERSION & 0xff; - np->version1 = (DPOW_VERSION >> 8) & 0xff; - memcpy(np->packet,data,datalen); - sentbytes = nn_send(myinfo->dpowsock,np,size,0); - free(np); - //printf("NANOSEND ht.%d channel.%08x (%d) crc32.%08x datalen.%d\n",np->height,np->channel,size,np->crc32,datalen); + dp->ipbits[0] = myinfo->myaddr.myipbits; + np->numipbits = dp->numipbits = 1; } + np->senderind = bp->myind; + memcpy(np->ipbits,dp->ipbits,dp->numipbits * sizeof(*dp->ipbits)); + //for (i=0; inumipbits; i++) + // printf("%08x ",np->ipbits[i]); + //printf(" dpow_send.(%d) size.%d numipbits.%d myind.%d\n",datalen,size,np->numipbits,bp->myind); + if ( bp->isratify == 0 ) + dpow_nanoutxoset(&np->notarize,bp,0); + else dpow_nanoutxoset(&np->ratify,bp,1); + np->size = size; + np->datalen = datalen; + np->crc32 = crc32; + for (i=0; i<2; i++) + np->ratify.pendingcrcs[i] = bp->pendingcrcs[i]; + for (i=0; i<32; i++) + np->srchash.bytes[i] = dp->minerkey33[i+1]; + //np->srchash = srchash; + np->desthash = desthash; + np->channel = channel; + np->height = msgbits; + np->myipbits = myinfo->myaddr.myipbits; + strcpy(np->symbol,dp->symbol); + np->version0 = DPOW_VERSION & 0xff; + np->version1 = (DPOW_VERSION >> 8) & 0xff; + memcpy(np->packet,data,datalen); + sentbytes = nn_send(myinfo->dpowsock,np,size,0); + free(np); + //printf("NANOSEND ht.%d channel.%08x (%d) crc32.%08x datalen.%d\n",np->height,np->channel,size,np->crc32,datalen); } void dpow_ipbitsadd(struct supernet_info *myinfo,struct dpow_info *dp,uint32_t *ipbits,int32_t numipbits,int32_t fromid,uint32_t senderipbits) @@ -528,11 +588,32 @@ void dpow_ipbitsadd(struct supernet_info *myinfo,struct dpow_info *dp,uint32_t * //printf("recv numips.(%d %d)\n",myinfo->numdpowipbits,dp->numipbits); } +int32_t dex_packetcheck(struct supernet_info *myinfo,struct dex_nanomsghdr *dexp) +{ + int32_t firstz; uint32_t crc32; + if ( dexp->version0 == (DEX_VERSION & 0xff) && dexp->version1 == ((DEX_VERSION >> 8) & 0xff) ) + { + if ( dexp->datalen == (size - sizeof(*dexp)) ) + { + crc32 = calc_crc32(0,dexp->packet,dexp->datalen); + if ( dexp->crc32 == crc32 && (firstz= dpow_crc32find(myinfo,crc32)) >= 0 ) + return(0); + } + } + return(-1); +} + +void dex_packet(struct supernet_info *myinfo,struct dex_nanomsghdr *dexp,int32_t size) +{ + printf("DEX_PACKET.[%d]\n",size); +} + void dpow_nanomsg_update(struct supernet_info *myinfo) { - int32_t i,n=0,size,firstz = -1; uint32_t crc32; struct dpow_nanomsghdr *np; struct dpow_info *dp; struct dpow_block *bp; + int32_t i,n=0,num=0,size,firstz = -1; uint32_t crc32,r,m; struct dpow_nanomsghdr *np=0; struct dpow_info *dp; struct dpow_block *bp; struct dex_nanomsghdr *dexp = 0; while ( (size= nn_recv(myinfo->dpowsock,&np,NN_MSG,0)) >= 0 ) { + num++; if ( size >= 0 ) { if ( np->version0 == (DPOW_VERSION & 0xff) && np->version1 == ((DPOW_VERSION >> 8) & 0xff) ) @@ -550,7 +631,7 @@ void dpow_nanomsg_update(struct supernet_info *myinfo) break; } } - if ( dp != 0 && crc32 == np->crc32 )//&& (firstz= dpow_crc32find(myinfo,dp,crc32,np->channel)) >= 0 ) + if ( dp != 0 && crc32 == np->crc32 ) { //char str[65]; printf("%s RECV ht.%d ch.%08x (%d) crc32.%08x:%08x datalen.%d:%d firstz.%d\n",bits256_str(str,np->srchash),np->height,np->channel,size,np->crc32,crc32,np->datalen,(int32_t)(size - sizeof(*np)),firstz); if ( i == myinfo->numdpows ) @@ -573,14 +654,52 @@ void dpow_nanomsg_update(struct supernet_info *myinfo) } } //else printf("ignore np->datalen.%d %d (size %d - %ld)\n",np->datalen,(int32_t)(size-sizeof(*np)),size,sizeof(*np)); } - if ( np != 0 ) - nn_freemsg(np); } + if ( np != 0 ) + nn_freemsg(np), np = 0; if ( size == 0 || n++ > 100 ) break; } if ( 0 && n != 0 ) printf("nanoupdates.%d\n",n); + n = 0; + while ( (size= nn_recv(myinfo->dexsock,&dexp,NN_MSG,0)) >= 0 ) + { + num++; + if ( dex_packetcheck(myinfo,dexp) == 0 ) + { + printf("FROM BUS.%08x -> pub\n",dexp->crc32); + nn_send(myinfo->pubsock,dexp,size,0); + dex_packet(myinfo,dexp,size); + } + if ( dexp != 0 ) + nn_freemsg(dexp), dexp = 0; + if ( size == 0 || n++ > 100 ) + break; + } + n = 0; + if ( num == 0 ) + { + while ( (size= nn_recv(myinfo->repsock,&dexp,NN_MSG,0)) >= 0 ) + { + num++; + if ( dex_packetcheck(myinfo,dexp) == 0 ) + { + nn_send(myinfo->dexsock,dexp,size,0); + if ( (m= myinfo->numdpowipbits) > 0 ) + { + r = myinfo->dpowipbits[rand() % m]; + nn_send(myinfo->repsock,&r,sizeof(r),0); + printf("REP.%08x -> dexbus, rep.%08x",dexp->crc32,r); + } + dex_packet(myinfo,dexp,size); + } + if ( dexp != 0 ) + nn_freemsg(dexp), dexp = 0; + if ( size == 0 || n++ > 100 ) + break; + } + } } #else diff --git a/iguana/iguana777.h b/iguana/iguana777.h index e63073fd6..324d875c6 100755 --- a/iguana/iguana777.h +++ b/iguana/iguana777.h @@ -83,7 +83,7 @@ struct supernet_info char ipaddr[64],NXTAPIURL[512],secret[4096],password[4096],rpcsymbol[64],handle[1024],permanentfile[1024]; char *decryptstr; int32_t maxdelay,IAMRELAY,IAMNOTARY,IAMLP,publicRPC,basilisk_busy,genesisresults,remoteorigin; - uint32_t expiration,dirty,DEXactive,DEXpoll,totalcoins; + uint32_t expiration,dirty,DEXactive,DEXpoll,totalcoins,dexcrcs[1024]; uint16_t argport,rpcport; struct basilisk_info basilisks; struct exchange_info *tradingexchanges[SUPERNET_MAXEXCHANGES]; int32_t numexchanges; @@ -96,7 +96,7 @@ struct supernet_info void *ctx; uint8_t *pingbuf; FILE *dexfp; - struct dpow_info DPOWS[64]; int32_t numdpows,dpowsock; + struct dpow_info DPOWS[64]; int32_t numdpows,dpowsock,dexsock,pubsock,repsock,subsock,reqsock; struct delayedPoW_info dPoW; struct basilisk_spend *spends; int32_t numspends; //struct peggy_info *PEGS; diff --git a/iguana/main.c b/iguana/main.c index c2e0241df..f855318c9 100755 --- a/iguana/main.c +++ b/iguana/main.c @@ -1586,7 +1586,7 @@ void iguana_main(void *arg) myinfo = SuperNET_MYINFO(0); libgfshare_init(myinfo,myinfo->logs,myinfo->exps); myinfo->rpcport = IGUANA_RPCPORT; - myinfo->dpowsock = -1; + myinfo->dpowsock = myinfo->dexsock = myinfo->pubsock = myinfo->subsock = myinfo->reqsock = myinfo->repsock = -1; //myinfo->rpcport = IGUANA_NOTARYPORT; //myinfo->IAMNOTARY = 1; if ( arg != 0 )