Browse Source

Merge pull request #152 from jl777/dev

Dev
win-cross
jl777 8 years ago
committed by GitHub
parent
commit
9a1fe05ee1
  1. 51
      iguana/dpow/dpow_network.c

51
iguana/dpow/dpow_network.c

@ -24,6 +24,7 @@ struct dex_nanomsghdr
void dex_init(struct supernet_info *myinfo) void dex_init(struct supernet_info *myinfo)
{ {
return;
strcpy(myinfo->dexseed_ipaddr,"78.47.196.146"); strcpy(myinfo->dexseed_ipaddr,"78.47.196.146");
myinfo->dexipbits[0] = (uint32_t)calc_ipbits(myinfo->dexseed_ipaddr); myinfo->dexipbits[0] = (uint32_t)calc_ipbits(myinfo->dexseed_ipaddr);
myinfo->numdexipbits = 1; myinfo->numdexipbits = 1;
@ -57,6 +58,8 @@ void dex_packet(struct supernet_info *myinfo,struct dex_nanomsghdr *dexp,int32_t
void dex_reqsend(struct supernet_info *myinfo,uint8_t *data,int32_t datalen) void dex_reqsend(struct supernet_info *myinfo,uint8_t *data,int32_t datalen)
{ {
struct dex_nanomsghdr *dexp; char ipaddr[64],str[128]; int32_t timeout,i,n,size,recvbytes,sentbytes = 0; uint32_t crc32,*retptr,ipbits; struct dex_nanomsghdr *dexp; char ipaddr[64],str[128]; int32_t timeout,i,n,size,recvbytes,sentbytes = 0; uint32_t crc32,*retptr,ipbits;
printf("dex_reqsend not active yet\n");
return;
if ( myinfo->reqsock < 0 && (myinfo->reqsock= nn_socket(AF_SP,NN_REQ)) >= 0 ) if ( myinfo->reqsock < 0 && (myinfo->reqsock= nn_socket(AF_SP,NN_REQ)) >= 0 )
{ {
if ( nn_connect(myinfo->reqsock,nanomsg_tcpname(str,myinfo->dexseed_ipaddr,REP_SOCK)) < 0 ) if ( nn_connect(myinfo->reqsock,nanomsg_tcpname(str,myinfo->dexseed_ipaddr,REP_SOCK)) < 0 )
@ -166,6 +169,7 @@ int32_t dex_packetcheck(struct supernet_info *myinfo,struct dex_nanomsghdr *dexp
void dex_subsock_poll(struct supernet_info *myinfo) void dex_subsock_poll(struct supernet_info *myinfo)
{ {
int32_t size,n=0; struct dex_nanomsghdr *dexp; int32_t size,n=0; struct dex_nanomsghdr *dexp;
printf("subsock poll not active yet\n");
while ( (size= nn_recv(myinfo->subsock,&dexp,NN_MSG,0)) >= 0 ) while ( (size= nn_recv(myinfo->subsock,&dexp,NN_MSG,0)) >= 0 )
{ {
n++; n++;
@ -210,9 +214,9 @@ void dpow_sigscheck(struct supernet_info *myinfo,struct dpow_info *dp,struct dpo
int32_t dpow_addnotary(struct supernet_info *myinfo,struct dpow_info *dp,char *ipaddr) int32_t dpow_addnotary(struct supernet_info *myinfo,struct dpow_info *dp,char *ipaddr)
{ {
char str[512]; uint32_t ipbits,*ptr; int32_t i,iter,n,retval = -1; char str[512]; uint32_t ipbits,*ptr; int32_t i,iter,n,retval = -1;
portable_mutex_lock(&myinfo->notarymutex);
if ( myinfo->dpowsock >= 0 && myinfo->dexsock >= 0 ) if ( myinfo->dpowsock >= 0 && myinfo->dexsock >= 0 )
{ {
portable_mutex_lock(&myinfo->notarymutex);
ipbits = (uint32_t)calc_ipbits(ipaddr); ipbits = (uint32_t)calc_ipbits(ipaddr);
for (iter=0; iter<2; iter++) for (iter=0; iter<2; iter++)
{ {
@ -250,8 +254,8 @@ int32_t dpow_addnotary(struct supernet_info *myinfo,struct dpow_info *dp,char *i
if ( dp == 0 ) if ( dp == 0 )
break; break;
} }
portable_mutex_unlock(&myinfo->notarymutex);
} }
portable_mutex_unlock(&myinfo->notarymutex);
return(retval); return(retval);
} }
@ -263,6 +267,7 @@ void dpow_nanomsginit(struct supernet_info *myinfo,char *ipaddr)
printf("need to set ipaddr before nanomsg\n"); printf("need to set ipaddr before nanomsg\n");
return; return;
} }
portable_mutex_lock(&myinfo->notarymutex);
if ( myinfo->dpowsock < 0 && (myinfo->dpowsock= nn_socket(AF_SP,NN_BUS)) >= 0 ) if ( myinfo->dpowsock < 0 && (myinfo->dpowsock= nn_socket(AF_SP,NN_BUS)) >= 0 )
{ {
if ( nn_bind(myinfo->dpowsock,nanomsg_tcpname(str,myinfo->ipaddr,DPOW_SOCK)) < 0 ) if ( nn_bind(myinfo->dpowsock,nanomsg_tcpname(str,myinfo->ipaddr,DPOW_SOCK)) < 0 )
@ -339,7 +344,8 @@ void dpow_nanomsginit(struct supernet_info *myinfo,char *ipaddr)
printf("RCVBUF.%d\n",nn_setsockopt(myinfo->dpowsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize))); printf("RCVBUF.%d\n",nn_setsockopt(myinfo->dpowsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize)));
myinfo->nanoinit = (uint32_t)time(NULL); myinfo->nanoinit = (uint32_t)time(NULL);
} }
} else printf("error creating nanosocket\n"); } //else printf("error creating nanosocket\n");
portable_mutex_unlock(&myinfo->notarymutex);
dpow_addnotary(myinfo,0,ipaddr); dpow_addnotary(myinfo,0,ipaddr);
} }
@ -385,9 +391,9 @@ void dpow_bestconsensus(struct dpow_block *bp)
} }
if ( besti >= 0 && bestks[besti] >= 0 && masks[besti] != 0 && (recvmask & masks[besti]) == masks[besti] ) if ( besti >= 0 && bestks[besti] >= 0 && masks[besti] != 0 && (recvmask & masks[besti]) == masks[besti] )
{ {
bp->bestmask = masks[besti]; bp->notaries[bp->myind].bestmask = bp->bestmask = masks[besti];
bp->bestk = bestks[besti]; bp->notaries[bp->myind].bestk = bp->bestk = bestks[besti];
printf("set best to (%d %llx) recv.%llx\n",bp->bestk,(long long)bp->bestmask,(long long)recvmask); //printf("set best.%d to (%d %llx) recv.%llx\n",best,bp->bestk,(long long)bp->bestmask,(long long)recvmask);
} }
bp->recvmask = recvmask; bp->recvmask = recvmask;
if ( bp->bestmask == 0 )//|| (time(NULL) / 180) != bp->lastepoch ) if ( bp->bestmask == 0 )//|| (time(NULL) / 180) != bp->lastepoch )
@ -690,13 +696,13 @@ void dpow_notarize_update(struct supernet_info *myinfo,struct dpow_info *dp,stru
{ {
bestmatches++; bestmatches++;
paxbestmatches++; paxbestmatches++;
} //else printf("?%x ",bp->notaries[i].paxwdcrc);
} }
} }
} if ( 0 && bp->myind <= 2 && bp->notaries[i].paxwdcrc != 0 )
if ( bp->myind <= 2 && bp->notaries[i].paxwdcrc != 0 )
printf("%d.(%x %d %llx r%llx) ",i,bp->notaries[i].paxwdcrc,bp->notaries[i].bestk,(long long)bp->notaries[i].bestmask,(long long)bp->notaries[i].recvmask); printf("%d.(%x %d %llx r%llx) ",i,bp->notaries[i].paxwdcrc,bp->notaries[i].bestk,(long long)bp->notaries[i].bestmask,(long long)bp->notaries[i].recvmask);
} }
if ( bp->myind <= 2 ) if ( 0 && bp->myind <= 2 )
printf("recv.%llx best.(%d %llx) m.%d p.%d:%d b.%d\n",(long long)bp->recvmask,bp->bestk,(long long)bp->bestmask,matches,paxmatches,paxbestmatches,bestmatches); printf("recv.%llx best.(%d %llx) m.%d p.%d:%d b.%d\n",(long long)bp->recvmask,bp->bestk,(long long)bp->bestmask,matches,paxmatches,paxbestmatches,bestmatches);
if ( bestmatches >= bp->minsigs && paxbestmatches >= bp->minsigs ) if ( bestmatches >= bp->minsigs && paxbestmatches >= bp->minsigs )
{ {
@ -719,7 +725,7 @@ void dpow_notarize_update(struct supernet_info *myinfo,struct dpow_info *dp,stru
} //else printf("destmask.%llx != bestmask.%llx\n",(long long)bp->destsigsmasks[bp->bestk],(long long)bp->bestmask); } //else printf("destmask.%llx != bestmask.%llx\n",(long long)bp->destsigsmasks[bp->bestk],(long long)bp->bestmask);
} }
} }
if ( (rand() % 30) == 0 ) //if ( (rand() % 30) == 0 )
printf("[%d] ips.%d %s NOTARIZE.%d matches.%d paxmatches.%d bestmatches.%d bestk.%d %llx recv.%llx sigmasks.(%llx %llx) senderind.%d state.%x (%x %x %x) pax.%x\n",bp->myind,dp->numipbits,dp->symbol,bp->minsigs,matches,paxmatches,bestmatches,bp->bestk,(long long)bp->bestmask,(long long)bp->recvmask,(long long)(bp->bestk>=0?bp->destsigsmasks[bp->bestk]:0),(long long)(bp->bestk>=0?bp->srcsigsmasks[bp->bestk]:0),senderind,bp->state,bp->hashmsg.uints[0],bp->desttxid.uints[0],bp->srctxid.uints[0],bp->paxwdcrc); printf("[%d] ips.%d %s NOTARIZE.%d matches.%d paxmatches.%d bestmatches.%d bestk.%d %llx recv.%llx sigmasks.(%llx %llx) senderind.%d state.%x (%x %x %x) pax.%x\n",bp->myind,dp->numipbits,dp->symbol,bp->minsigs,matches,paxmatches,bestmatches,bp->bestk,(long long)bp->bestmask,(long long)bp->recvmask,(long long)(bp->bestk>=0?bp->destsigsmasks[bp->bestk]:0),(long long)(bp->bestk>=0?bp->srcsigsmasks[bp->bestk]:0),senderind,bp->state,bp->hashmsg.uints[0],bp->desttxid.uints[0],bp->srctxid.uints[0],bp->paxwdcrc);
} }
} }
@ -733,7 +739,7 @@ void dpow_nanoutxoget(struct supernet_info *myinfo,struct dpow_info *dp,struct d
else else
{ {
dpow_notarize_update(myinfo,dp,bp,senderind,(int8_t)np->bestk,np->bestmask,np->recvmask,np->srcutxo,np->srcvout,np->destutxo,np->destvout,np->siglens,np->sigs,np->paxwdcrc); dpow_notarize_update(myinfo,dp,bp,senderind,(int8_t)np->bestk,np->bestmask,np->recvmask,np->srcutxo,np->srcvout,np->destutxo,np->destvout,np->siglens,np->sigs,np->paxwdcrc);
if ( bp->myind <= 2 ) if ( 0 && bp->myind <= 2 )
printf("lag.[%d] RECV.%d r%llx (%d %llx) %llx/%llx\n",(int32_t)(time(NULL)-channel),senderind,(long long)np->recvmask,(int8_t)np->bestk,(long long)np->bestmask,(long long)np->srcutxo.txid,(long long)np->destutxo.txid); printf("lag.[%d] RECV.%d r%llx (%d %llx) %llx/%llx\n",(int32_t)(time(NULL)-channel),senderind,(long long)np->recvmask,(int8_t)np->bestk,(long long)np->bestmask,(long long)np->srcutxo.txid,(long long)np->destutxo.txid);
} }
//dpow_bestmask_update(myinfo,dp,bp,nn_senderind,nn_bestk,nn_bestmask,nn_recvmask); //dpow_bestmask_update(myinfo,dp,bp,nn_senderind,nn_bestk,nn_bestmask,nn_recvmask);
@ -744,8 +750,8 @@ void dpow_send(struct supernet_info *myinfo,struct dpow_info *dp,struct dpow_blo
struct dpow_nanomsghdr *np; int32_t i,size,extralen=0,sentbytes = 0; uint32_t crc32,paxwdcrc; uint8_t extras[10000]; struct dpow_nanomsghdr *np; int32_t i,size,extralen=0,sentbytes = 0; uint32_t crc32,paxwdcrc; uint8_t extras[10000];
if ( bp->myind < 0 ) if ( bp->myind < 0 )
return; return;
//if ( time(NULL) < myinfo->nanoinit+5 ) if ( time(NULL) < myinfo->nanoinit+5 )
// return; return;
crc32 = calc_crc32(0,data,datalen); crc32 = calc_crc32(0,data,datalen);
//dp->crcs[firstz] = crc32; //dp->crcs[firstz] = crc32;
size = (int32_t)(sizeof(*np) + datalen); size = (int32_t)(sizeof(*np) + datalen);
@ -800,15 +806,11 @@ void dpow_send(struct supernet_info *myinfo,struct dpow_info *dp,struct dpow_blo
sentbytes = nn_send(myinfo->dpowsock,np,size,0); sentbytes = nn_send(myinfo->dpowsock,np,size,0);
break; break;
} }
} usleep(1000);
if ( myinfo->dexsock >= 0 )
{
printf("SEND DEX PACKET\n");
nn_send(myinfo->dexsock,np,size,0);
} }
portable_mutex_unlock(&myinfo->dpowmutex); portable_mutex_unlock(&myinfo->dpowmutex);
free(np); free(np);
if ( bp->myind <= 2 ) if ( 0 && bp->myind <= 2 )
printf("%d NANOSEND.%d ht.%d channel.%08x (%d) pax.%08x datalen.%d (%d %llx) (%d %llx) recv.%llx\n",i,sentbytes,np->height,np->channel,size,np->notarize.paxwdcrc,datalen,(int8_t)np->notarize.bestk,(long long)np->notarize.bestmask,bp->notaries[bp->myind].bestk,(long long)bp->notaries[bp->myind].bestmask,(long long)bp->recvmask); printf("%d NANOSEND.%d ht.%d channel.%08x (%d) pax.%08x datalen.%d (%d %llx) (%d %llx) recv.%llx\n",i,sentbytes,np->height,np->channel,size,np->notarize.paxwdcrc,datalen,(int8_t)np->notarize.bestk,(long long)np->notarize.bestmask,bp->notaries[bp->myind].bestk,(long long)bp->notaries[bp->myind].bestmask,(long long)bp->recvmask);
} }
@ -867,6 +869,7 @@ int32_t dpow_nanomsg_update(struct supernet_info *myinfo)
pfd.events = NN_POLLIN; pfd.events = NN_POLLIN;
if ( nn_poll(&pfd,1,100) > 0 ) if ( nn_poll(&pfd,1,100) > 0 )
break; break;
usleep(1000);
} }
if ( i < 100 && (size= nn_recv(myinfo->dpowsock,&np,NN_MSG,0)) >= 0 ) if ( i < 100 && (size= nn_recv(myinfo->dpowsock,&np,NN_MSG,0)) >= 0 )
{ {
@ -917,25 +920,25 @@ int32_t dpow_nanomsg_update(struct supernet_info *myinfo)
} else printf("illegal size.%d\n",size); } else printf("illegal size.%d\n",size);
if ( np != 0 ) if ( np != 0 )
nn_freemsg(np), np = 0; nn_freemsg(np), np = 0;
} else printf("no packets\n"); } //else printf("no packets\n");
n = 0; n = 0;
if ( myinfo->dexsock >= 0 ) if ( 0 && myinfo->dexsock >= 0 )
{ {
if ( (size= nn_recv(myinfo->dexsock,&dexp,NN_MSG,0)) >= 0 ) if ( (size= nn_recv(myinfo->dexsock,&dexp,NN_MSG,0)) >= 0 )
{ {
num++; num++;
/*if ( dex_packetcheck(myinfo,dexp,size) == 0 ) if ( dex_packetcheck(myinfo,dexp,size) == 0 )
{ {
printf("FROM BUS.%08x -> pub\n",dexp->crc32); printf("FROM BUS.%08x -> pub\n",dexp->crc32);
nn_send(myinfo->pubsock,dexp,size,0); nn_send(myinfo->pubsock,dexp,size,0);
dex_packet(myinfo,dexp,size); dex_packet(myinfo,dexp,size);
}*/ }
printf("GOT DEX PACKET.%d\n",size); printf("GOT DEX PACKET.%d\n",size);
if ( dexp != 0 ) if ( dexp != 0 )
nn_freemsg(dexp), dexp = 0; nn_freemsg(dexp), dexp = 0;
} }
} }
if ( myinfo->repsock >= 0 ) if ( 0 && myinfo->repsock >= 0 )
{ {
if ( (size= nn_recv(myinfo->repsock,&dexp,NN_MSG,0)) >= 0 ) if ( (size= nn_recv(myinfo->repsock,&dexp,NN_MSG,0)) >= 0 )
{ {

Loading…
Cancel
Save