diff --git a/iguana/exchanges/LP_commands.c b/iguana/exchanges/LP_commands.c index 1050c2736..ad13174e8 100644 --- a/iguana/exchanges/LP_commands.c +++ b/iguana/exchanges/LP_commands.c @@ -752,16 +752,21 @@ jpg(srcfile, destfile, power2=7, password, data="", required, ind=0)\n\ { if ( strcmp(method,"psock") == 0 ) { - if ( myipaddr == 0 || myipaddr[0] == 0 || strcmp(myipaddr,"127.0.0.1") == 0 ) - { - if ( LP_mypeer != 0 ) - myipaddr = LP_mypeer->ipaddr; - else printf("LP_psock dont have actual ipaddr?\n"); - } - if ( jint(argjson,"ispaired") != 0 ) - return(LP_psock(myipaddr,jint(argjson,"ispaired"))); - else return(clonestr("{\"error\":\"you are running an obsolete version, update\"}")); - } + int32_t psock; + if ( myipaddr == 0 || myipaddr[0] == 0 || strcmp(myipaddr,"127.0.0.1") == 0 ) + { + if ( LP_mypeer != 0 ) + myipaddr = LP_mypeer->ipaddr; + else printf("LP_psock dont have actual ipaddr?\n"); + } + if ( jint(argjson,"ispaired") != 0 ) + { + retstr = LP_psock(&psock,myipaddr,1,jint(argjson,"cmdchannel"),jbits256(argjson,"pubkey")); + //printf("LP_commands.(%s)\n",retstr); + return(retstr); + } + else return(clonestr("{\"error\":\"you are running an obsolete version, update\"}")); + } } else { diff --git a/iguana/exchanges/LP_include.h b/iguana/exchanges/LP_include.h index ed797f3ba..178cd1e3b 100644 --- a/iguana/exchanges/LP_include.h +++ b/iguana/exchanges/LP_include.h @@ -301,7 +301,7 @@ struct iguana_info portable_mutex_t txmutex,addrmutex; struct LP_transaction *transactions; struct LP_address *addresses; uint64_t txfee; int32_t numutxos,notarized,longestchain,firstrefht,firstscanht,lastscanht,height; uint16_t busport; - uint32_t txversion,dPoWtime,loadedcache,electrumlist,lastunspent,importedprivkey,lastpushtime,lastutxosync,addr_listunspent_requested,lastutxos,updaterate,counter,inactive,lastmempool,lastgetinfo,ratetime,heighttime,lastmonitor,obooktime; + uint32_t txversion,dPoWtime,lastresetutxo,loadedcache,electrumlist,lastunspent,importedprivkey,lastpushtime,lastutxosync,addr_listunspent_requested,lastutxos,updaterate,counter,inactive,lastmempool,lastgetinfo,ratetime,heighttime,lastmonitor,obooktime; uint8_t pubtype,p2shtype,isPoS,wiftype,wiftaddr,taddr,noimportprivkey_flag,userconfirms,isassetchain,maxconfirms; char symbol[128],smartaddr[64],userpass[1024],serverport[128],instantdex_address[64]; // portfolio @@ -361,9 +361,10 @@ struct LP_address struct LP_peerinfo { UT_hash_handle hh; + bits256 pubkey; uint64_t ip_port; uint32_t recvtime,numrecv,ipbits,errortime,errors,numpeers,needping,lasttime,connected,lastutxos,lastpeers,diduquery,good,sessionid; - int32_t pushsock,subsock,isLP; + int32_t pushsock,subsock,isLP,pairsock; uint16_t port,netid; char ipaddr[64]; }; @@ -431,7 +432,7 @@ struct LP_pubkey_info struct LP_pubswap *bobswaps,*aliceswaps; int64_t dynamictrust,unconfcredits; uint32_t timestamp,numerrors,lasttime,slowresponse; - int32_t istrusted; + int32_t istrusted,pairsock; uint8_t rmd160[20],sig[65],pubsecp[33],siglen; }; @@ -486,7 +487,7 @@ int32_t LP_iseligible(uint64_t *valp,uint64_t *val2p,int32_t iambob,char *symbol int32_t LP_pullsock_check(void *ctx,char **retstrp,char *myipaddr,int32_t pubsock,int32_t pullsock); int64_t LP_listunspent_parseitem(struct iguana_info *coin,bits256 *txidp,int32_t *voutp,int32_t *heightp,cJSON *item); void LP_unspents_cache(char *symbol,char *addr,char *arraystr,int32_t updatedflag); -uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired); +uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired,int32_t cmdchannel,char *ipaddr); //void LP_utxo_clientpublish(struct LP_utxoinfo *utxo); //int32_t LP_coinbus(uint16_t coin_busport); int32_t LP_nanomsg_recvs(void *ctx); @@ -544,9 +545,10 @@ int32_t LP_listunspent_both(char *symbol,char *coinaddr,int32_t fullflag); uint16_t LP_randpeer(char *destip); void LP_tradebot_pauseall(); void LP_portfolio_reset(); +struct LP_pubkey_info *LP_pubkeyadd(bits256 pubkey); uint32_t LP_atomic_locktime(char *base,char *rel); struct LP_pubkey_info *LP_pubkeyfind(bits256 pubkey); -char *issue_LP_psock(char *destip,uint16_t destport,int32_t ispaired); +char *issue_LP_psock(char *destip,uint16_t destport,int32_t ispaired,int32_t cmdchannel); char *LP_unspents_filestr(char *symbol,char *addr); cJSON *bitcoin_data2json(char *symbol,uint8_t taddr,uint8_t pubtype,uint8_t p2shtype,uint8_t isPoS,int32_t height,bits256 *txidp,struct iguana_msgtx *msgtx,uint8_t *extraspace,int32_t extralen,uint8_t *serialized,int32_t len,cJSON *vins,int32_t suppress_pubkeys,int32_t zcash); //int32_t LP_butxo_findeither(bits256 txid,int32_t vout); diff --git a/iguana/exchanges/LP_nativeDEX.c b/iguana/exchanges/LP_nativeDEX.c index ce82286d9..dc3067eb1 100644 --- a/iguana/exchanges/LP_nativeDEX.c +++ b/iguana/exchanges/LP_nativeDEX.c @@ -206,7 +206,7 @@ char *LP_command_process(void *ctx,char *myipaddr,int32_t pubsock,cJSON *argjson //LP_send(pubsock,retstr,(int32_t)strlen(retstr)+1,0); } } - else if ( LP_statslog_parse() > 0 ) + else if ( LP_statslog_parse() > 0 && 0 ) { memset(zero.bytes,0,sizeof(zero)); if ( (retjson= LP_statslog_disp(2000000000,2000000000,"",zero,0,0))) // pending swaps @@ -303,7 +303,7 @@ char *LP_process_message(void *ctx,char *typestr,char *myipaddr,int32_t pubsock, if ( jsonstr != 0 && argjson != 0 ) { len = (int32_t)strlen(jsonstr) + 1; - if ( (method= jstr(argjson,"method")) != 0 && strcmp(method,"broadcast") == 0 ) + if ( (method= jstr(argjson,"method")) != 0 && strcmp(method,"psock") != 0 && strcmp(method,"broadcast") == 0 ) { bits256 zero; cJSON *reqjson; char *cipherstr; int32_t cipherlen; uint8_t cipher[LP_ENCRYPTED_MAXSIZE]; if ( (reqjson= LP_dereference(argjson,"broadcast")) != 0 ) @@ -1138,9 +1138,29 @@ void LP_reserved_msgs(void *ignore) int32_t LP_reserved_msg(int32_t priority,char *base,char *rel,bits256 pubkey,char *msg) { - int32_t n = 0; + struct LP_pubkey_info *pubp; int32_t sentbytes,n = 0; if ( strcmp(G.USERPASS,"1d8b27b21efabcd96571cd56f91a40fb9aa4cc623d273c63bf9223dc6f8cd81f") == 0 ) return(-1); + if ( priority > 0 && bits256_nonz(pubkey) != 0 ) + { + if ( (pubp= LP_pubkeyfind(pubkey)) != 0 ) + { + if ( pubp->pairsock > 0 ) + { + if ( (sentbytes= nn_send(pubp->pairsock,msg,(int32_t)strlen(msg)+1,0)) < 0 ) + { + pubp->pairsock = -1; + LP_peer_pairsock(pubkey); + printf("mark cmdchannel %d closed sentbytes.%d\n",pubp->pairsock,sentbytes); + } + else + { + printf("sent %d bytes to cmdchannel.%d\n",sentbytes,pubp->pairsock); + return(sentbytes); + } + } + } + } portable_mutex_lock(&LP_reservedmutex); if ( num_Reserved_msgs[priority] < sizeof(Reserved_msgs[priority])/sizeof(*Reserved_msgs[priority]) ) { @@ -1312,7 +1332,7 @@ void LPinit(uint16_t myport,uint16_t mypullport,uint16_t mypubport,uint16_t mybu printf("got %s, initpeers. LP_mypubsock.%d pullsock.%d RPC_port.%u mypullport.%d mypubport.%d pushaddr.%s\n",myipaddr,LP_mypubsock,LP_mypullsock,RPC_port,mypullport,mypubport,pushaddr); LP_passphrase_init(passphrase,jstr(argjson,"gui"),juint(argjson,"netid"),jstr(argjson,"seednode")); #ifndef FROM_JS - if ( IAMLP != 0 && OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)LP_psockloop,(void *)myipaddr) != 0 ) + if ( OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)LP_psockloop,(void *)myipaddr) != 0 ) { printf("error launching LP_psockloop for (%s)\n",myipaddr); exit(-1); diff --git a/iguana/exchanges/LP_network.c b/iguana/exchanges/LP_network.c index 01fb592c7..5c06facc5 100644 --- a/iguana/exchanges/LP_network.c +++ b/iguana/exchanges/LP_network.c @@ -21,12 +21,12 @@ struct psock { uint32_t lasttime,lastping,errors; - int32_t publicsock,sendsock,ispaired; + int32_t publicsock,sendsock,ispaired,cmdchannel; uint16_t publicport,sendport; char sendaddr[128],publicaddr[128]; } *PSOCKS; -uint16_t Numpsocks,Psockport = MIN_PSOCK_PORT; +uint16_t Numpsocks,Psockport = MIN_PSOCK_PORT,Pcmdport = MAX_PSOCK_PORT; #ifdef FROM_JS @@ -299,16 +299,18 @@ void LP_queuesend(uint32_t crc32,int32_t pubsock,char *base,char *rel,uint8_t *m void LP_broadcast_finish(int32_t pubsock,char *base,char *rel,uint8_t *msg,cJSON *argjson,uint32_t crc32) { - int32_t msglen; + int32_t msglen; char *method; + if ( (method= jstr(argjson,"method")) == 0 ) + return; msg = (void *)jprint(argjson,0); msglen = (int32_t)strlen((char *)msg) + 1; if ( crc32 == 0 ) crc32 = calc_crc32(0,&msg[2],msglen - 2); //printf("crc32.%x IAMLP.%d pubsock.%d\n",crc32,G.LP_IAMLP,pubsock); #ifdef FROM_MARKETMAKER - if ( G.LP_IAMLP == 0 || pubsock < 0 ) + if ( (G.LP_IAMLP == 0 || pubsock < 0) && strcmp(method,"psock") != 0 ) #else - if ( IAMLP == 0 || pubsock < 0 ) + if ( (IAMLP == 0 || pubsock < 0 && strcmp(method,"psock") != 0 ) #endif { free(msg); @@ -412,10 +414,10 @@ uint32_t LP_swapsend(int32_t pairsock,struct basilisk_swap *swap,uint32_t msgbit return(nextbits); } -void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to work +void LP_psockloop(void *_ptr) { static struct nn_pollfd *pfds; - int32_t i,n,nonz,iter,retval,sentbytes,size=0,sendsock = -1; uint32_t now; struct psock *ptr=0; void *buf=0; char keepalive[512]; + int32_t nexti=0,j,i,n,nonz,iter,retval,sentbytes,size=0,sendsock = -1; uint32_t now; struct psock *ptr=0; void *buf=0; char keepalive[512]; void *ctx = bitcoin_ctx(); strcpy(LP_psockloop_stats.name,"LP_psockloop"); LP_psockloop_stats.threshold = 1000.; while ( LP_STOP_RECEIVED == 0 ) @@ -447,12 +449,14 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w memset(pfds,0,sizeof(*pfds) * ((Numpsocks*2 <= MAX_PSOCK_PORT) ? Numpsocks*2 : MAX_PSOCK_PORT)); for (iter=0; iter<2; iter++) { - for (i=n=0; ipublicsock; + //printf("check sock.%d\n",ptr->publicsock); pfds[n].events = POLLIN; } else @@ -460,16 +464,35 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w if ( pfds[n].fd != ptr->publicsock ) { printf("unexpected fd.%d mismatched publicsock.%d\n",pfds[n].fd,ptr->publicsock); + nexti = i+1; break; } else if ( (pfds[n].revents & POLLIN) != 0 ) { - //printf("publicsock.%d %s has pollin\n",ptr->publicsock,ptr->publicaddr); + //printf("cmd.%d publicsock.%d %s has pollin\n",ptr->cmdchannel,ptr->publicsock,ptr->publicaddr); buf = 0; if ( (size= nn_recv(ptr->publicsock,&buf,NN_MSG,0)) > 0 ) { ptr->lasttime = now; - sendsock = ptr->sendsock; + if ( ptr->cmdchannel == 0 ) + sendsock = ptr->sendsock; + else + { + char *retstr; cJSON *argjson; + //printf("nn_recv.(%s)\n",(char *)buf); + if ( (argjson= cJSON_Parse((char *)buf)) != 0 ) + { + if ( (retstr= LP_command_process(ctx,"127.0.0.0",ptr->publicsock,argjson,buf,size)) != 0 ) + { + printf("processed.(%s)\n",retstr); + if ( (size= nn_send(ptr->publicsock,retstr,(int32_t)strlen(retstr)+1,0)) <= 0 ) + printf("error sending result\n"); + free(retstr); + } + free_json(argjson); + } + } + nexti = i+1; break; } else if ( buf != 0 ) @@ -481,52 +504,57 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w } } n++; - if ( iter == 0 ) - { - pfds[n].fd = ptr->sendsock; - pfds[n].events = POLLIN; - } - else + if ( ptr->sendsock > 0 ) { - if ( pfds[n].fd != ptr->sendsock ) + if ( iter == 0 ) { - printf("unexpected fd.%d mismatched sendsock.%d\n",pfds[n].fd,ptr->sendsock); - break; + pfds[n].fd = ptr->sendsock; + pfds[n].events = POLLIN; } - else if ( (pfds[n].revents & POLLIN) != 0 ) + else { - if ( (size= nn_recv(ptr->sendsock,&buf,NN_MSG,0)) > 0 ) + if ( pfds[n].fd != ptr->sendsock ) { - //printf("%s paired has pollin (%s)\n",ptr->sendaddr,(char *)buf); - ptr->lasttime = now; - if ( ptr->ispaired != 0 ) - { - sendsock = ptr->publicsock; - break; - } + printf("unexpected fd.%d mismatched sendsock.%d\n",pfds[n].fd,ptr->sendsock); + nexti = i+1; + break; } - if ( buf != 0 ) + else if ( (pfds[n].revents & POLLIN) != 0 ) { - nn_freemsg(buf); - buf = 0; - size = 0; + if ( (size= nn_recv(ptr->sendsock,&buf,NN_MSG,0)) > 0 ) + { + printf("%s paired has pollin (%s)\n",ptr->sendaddr,(char *)buf); + ptr->lasttime = now; + if ( ptr->ispaired != 0 ) + { + sendsock = ptr->publicsock; + nexti = i+1; + break; + } + } + if ( buf != 0 ) + { + nn_freemsg(buf); + buf = 0; + size = 0; + } } } + n++; } - n++; } if ( iter == 0 ) { if ( (retval= nn_poll(pfds,n,1)) <= 0 ) { - if ( retval != 0 ) - printf("nn_poll retval.%d\n",retval); + //if ( retval != 0 ) + // printf("nn_poll retval.%d\n",retval); break; - } else printf("num pfds.%d retval.%d\n",n,retval); + } // else printf("num pfds.%d retval.%d\n",n,retval); } } - //free(pfds); - //printf("sendsock.%d Numpsocks.%d\n",sendsock,Numpsocks); + if ( sendsock >= 0 ) + printf("sendsock.%d Numpsocks.%d\n",sendsock,Numpsocks); if ( sendsock < 0 ) { for (i=nonz=0; i ptr->lasttime+PSOCK_KEEPALIVE ) { printf("PSOCKS[%d] of %d (%u %u) lag.%d IDLETIMEOUT\n",i,Numpsocks,ptr->publicport,ptr->sendport,now - ptr->lasttime); + if ( ptr->sendsock != ptr->publicsock && ptr->sendsock >= 0 ) + nn_close(ptr->sendsock); if ( ptr->publicsock >= 0 ) nn_close(ptr->publicsock); - if ( ptr->sendsock >= 0 ) - nn_close(ptr->sendsock); //portable_mutex_lock(&LP_psockmutex); if ( Numpsocks > 1 ) { @@ -573,13 +601,14 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w } } -void LP_psockadd(int32_t ispaired,int32_t publicsock,uint16_t recvport,int32_t sendsock,uint16_t sendport,char *subaddr,char *publicaddr) +void LP_psockadd(int32_t ispaired,int32_t publicsock,uint16_t recvport,int32_t sendsock,uint16_t sendport,char *subaddr,char *publicaddr,int32_t cmdchannel) { struct psock *ptr; portable_mutex_lock(&LP_psockmutex); PSOCKS = realloc(PSOCKS,sizeof(*PSOCKS) * (Numpsocks + 1)); ptr = &PSOCKS[Numpsocks++]; ptr->ispaired = ispaired; + ptr->cmdchannel = cmdchannel; ptr->publicsock = publicsock; ptr->publicport = recvport; ptr->sendsock = sendsock; @@ -609,65 +638,111 @@ int32_t LP_psockmark(char *publicaddr) return(retval); } -char *LP_psock(char *myipaddr,int32_t ispaired) +char *_LP_psock_create(int32_t *pullsockp,int32_t *pubsockp,char *ipaddr,uint16_t publicport,uint16_t subport,int32_t ispaired,int32_t cmdchannel,bits256 pubkey) { - char pushaddr[128],subaddr[128]; uint16_t i,publicport,subport,maxiters=100; int32_t timeout,pullsock=-1,pubsock=-1; cJSON *retjson=0; - retjson = cJSON_CreateObject(); - publicport = Psockport++; - subport = Psockport++; - for (i=0; i= 0 && (cmdchannel != 0 ||(pubsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PAIR)) >= 0) ) { - if ( publicport < MIN_PSOCK_PORT ) - publicport = MIN_PSOCK_PORT+1; - if ( subport <= publicport ) - subport = publicport + 1; - pullsock = pubsock = -1; - nanomsg_transportname(1,pushaddr,myipaddr,publicport); - nanomsg_transportname(1,subaddr,myipaddr,subport); - if ( (pullsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PULL)) >= 0 && (pubsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PAIR)) >= 0 ) + if ( nn_bind(pullsock,pushaddr) >= 0 && (cmdchannel != 0 || nn_bind(pubsock,subaddr) >= 0) ) { - if ( nn_bind(pullsock,pushaddr) >= 0 && nn_bind(pubsock,subaddr) >= 0 ) + arg = 100; + nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&arg,sizeof(arg)); + if ( pubsock >= 0 ) + nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_SNDTIMEO,&arg,sizeof(arg)); + arg = 1; + nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&arg,sizeof(arg)); + if ( pubsock >= 0 ) + nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVTIMEO,&arg,sizeof(arg)); + arg = 2; + nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_MAXTTL,&arg,sizeof(arg)); + if ( pubsock >= 0 ) + nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_MAXTTL,&arg,sizeof(arg)); + nanomsg_transportname(0,pushaddr,ipaddr,publicport); + nanomsg_transportname(0,subaddr,ipaddr,subport); + LP_psockadd(ispaired,pullsock,publicport,pubsock,subport,subaddr,pushaddr,cmdchannel); + if ( IAMLP != 0 && bits256_nonz(pubkey) != 0 ) { - timeout = 100; - nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); - nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); - if ( ispaired != 0 ) + char str[65]; + if ( (pubp= LP_pubkeyadd(pubkey)) != 0 ) { - //maxsize = 1024 * 1024; - //nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize)); - } - //if ( ispaired != 0 ) - { - nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); - nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); + if ( pubp->pairsock > 0 ) + { + printf("warning %s already has pairsock.%d, mark for purge\n",bits256_str(str,pubkey),pubp->pairsock); + for (i=0; ipairsock ) + { + PSOCKS[i].lasttime = (uint32_t)time(NULL) - PSOCK_KEEPALIVE - 1; + break; + } + } + printf("pairsock for %s <- %d\n",bits256_str(str,pubkey),pullsock); + pubp->pairsock = pullsock; } - timeout = 10; - nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_MAXTTL,&timeout,sizeof(timeout)); - nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_MAXTTL,&timeout,sizeof(timeout)); - nanomsg_transportname(0,pushaddr,myipaddr,publicport); - nanomsg_transportname(0,subaddr,myipaddr,subport); - LP_psockadd(ispaired,pullsock,publicport,pubsock,subport,subaddr,pushaddr); - jaddstr(retjson,"result","success"); - jaddstr(retjson,"LPipaddr",myipaddr); - jaddstr(retjson,"connectaddr",subaddr); - jaddnum(retjson,"connectport",subport); - jaddnum(retjson,"ispaired",ispaired); - jaddstr(retjson,"publicaddr",pushaddr); - jaddnum(retjson,"publicport",publicport); - printf("i.%d publicaddr.(%s) for subaddr.(%s), pullsock.%d pubsock.%d\n",i,pushaddr,subaddr,pullsock,pubsock); - break; - } else printf("bind error on %s or %s\n",pushaddr,subaddr); - if ( pullsock >= 0 ) - nn_close(pullsock); - if ( pubsock >= 0 ) - nn_close(pubsock); + } + retjson = cJSON_CreateObject(); + jaddstr(retjson,"result","success"); + jaddstr(retjson,"LPipaddr",ipaddr); + jaddstr(retjson,"connectaddr",subaddr); + jaddnum(retjson,"connectport",subport); + jaddnum(retjson,"ispaired",ispaired); + jaddnum(retjson,"cmdchannel",cmdchannel); + jaddstr(retjson,"publicaddr",pushaddr); + jaddnum(retjson,"publicport",publicport); + //printf("cmd.%d publicaddr.(%s) for subaddr.(%s), pullsock.%d pubsock.%d\n",cmdchannel,pushaddr,subaddr,pullsock,pubsock); + *pullsockp = pullsock; + *pubsockp = pubsock; + return(jprint(retjson,1)); + } else printf("bind error on %s or %s\n",pushaddr,subaddr); + if ( pullsock >= 0 ) + nn_close(pullsock); + if ( pubsock >= 0 ) + nn_close(pubsock); + } + return(0); +} + +char *LP_psock(int32_t *pullsockp,char *ipaddr,int32_t ispaired,int32_t cmdchannel,bits256 pubkey) +{ + char *retstr=0; uint16_t i,publicport,subport,maxport; int32_t pubsock=-1; + *pullsockp = -1; + //printf("LP_psock ipaddr.%s ispaird.%d cmdchannel.%d\n",ipaddr,ispaired,cmdchannel); + if ( cmdchannel == 0 ) + { + maxport = MAX_PSOCK_PORT; + publicport = Psockport++; + subport = Psockport++; + } + else + { + if ( cmdchannel != 0 && bits256_nonz(pubkey) == 0 ) + return(clonestr("{\"error\",\"cant do pairsock for null pubkey\"}")); + maxport = 65534; + publicport = subport = Pcmdport++; + } + for (i=0; i MAX_PSOCK_PORT ) + if ( Psockport >= MAX_PSOCK_PORT ) Psockport = MIN_PSOCK_PORT; - if ( i == maxiters ) - jaddstr(retjson,"error","cant find psock ports"); - return(jprint(retjson,1)); + if ( Pcmdport >= 65534 ) + Pcmdport = MAX_PSOCK_PORT; + return(clonestr("{\"error\",\"cant find psock ports\"}")); } /* @@ -681,23 +756,26 @@ char *LP_psock(char *myipaddr,int32_t ispaired) */ -char *issue_LP_psock(char *destip,uint16_t destport,int32_t ispaired) +char *issue_LP_psock(char *destip,uint16_t destport,int32_t ispaired,int32_t cmdchannel) { - char url[512],*retstr; - sprintf(url,"http://%s:%u/api/stats/psock?ispaired=%d",destip,destport-1,ispaired); + char str[65],url[512],*retstr; + sprintf(url,"http://%s:%u/api/stats/psock?ispaired=%d&cmdchannel=%d&pubkey=%s",destip,destport-1,ispaired,cmdchannel,bits256_str(str,G.LP_mypub25519)); //return(LP_issue_curl("psock",destip,destport,url)); retstr = issue_curlt(url,LP_HTTP_TIMEOUT*3); - printf("issue_LP_psock got (%s) from %s\n",retstr,destip); + //printf("issue_LP_psock got (%s) from %s\n",retstr,url); // this is needed?! return(retstr); } -uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired) +uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired,int32_t cmdchannel,char *ipaddr) { uint16_t publicport = 0; char *retstr,*addr; cJSON *retjson; struct LP_peerinfo *peer,*tmp; + connectaddr[0] = publicaddr[0] = 0; HASH_ITER(hh,LP_peerinfos,peer,tmp) { + if ( ipaddr != 0 && strcmp(ipaddr,peer->ipaddr) != 0 ) + continue; connectaddr[0] = publicaddr[0] = 0; - if ( peer->errors < LP_MAXPEER_ERRORS && (retstr= issue_LP_psock(peer->ipaddr,peer->port,ispaired)) != 0 ) + if ( peer->errors < LP_MAXPEER_ERRORS && (retstr= issue_LP_psock(peer->ipaddr,peer->port,ispaired,cmdchannel)) != 0 ) { if ( (retjson= cJSON_Parse(retstr)) != 0 ) { @@ -706,26 +784,22 @@ uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired) safecopy(publicaddr,addr,128); if ( (addr= jstr(retjson,"connectaddr")) != 0 ) safecopy(connectaddr,addr,128); - //if ( (addr= jstr(retjson,"connectaddr2")) != 0 ) - // safecopy(connectaddr2,addr,128); if ( publicaddr[0] != 0 && connectaddr[0] != 0 ) publicport = juint(retjson,"publicport"); free_json(retjson); } - printf("got.(%s) connect.%s public.%s\n",retstr,connectaddr,publicaddr); + //printf("got.(%s) connect.%s public.%s publicport.%u\n",retstr,connectaddr,publicaddr,publicport); free(retstr); + return(publicport); } else printf("error psock from %s:%u\n",peer->ipaddr,peer->port); - if ( publicport != 0 ) - break; } - return(publicport); + return(0); } int32_t LP_initpublicaddr(void *ctx,uint16_t *mypullportp,char *publicaddr,char *myipaddr,uint16_t mypullport,int32_t ispaired) { int32_t nntype,pullsock,timeout; char bindaddr[128],connectaddr[128]; *mypullportp = mypullport; - //connectaddr2[0] = 0; if ( ispaired == 0 ) { if ( LP_canbind != 0 ) @@ -736,7 +810,6 @@ int32_t LP_initpublicaddr(void *ctx,uint16_t *mypullportp,char *publicaddr,char { nanomsg_transportname(0,publicaddr,myipaddr,mypullport); nanomsg_transportname(1,bindaddr,myipaddr,mypullport); - //nanomsg_transportname2(1,bindaddr2,myipaddr,mypullport); } else { @@ -748,7 +821,7 @@ int32_t LP_initpublicaddr(void *ctx,uint16_t *mypullportp,char *publicaddr,char } while ( *mypullportp == 0 ) { - if ( (*mypullportp= LP_psock_get(connectaddr,publicaddr,ispaired)) != 0 ) + if ( (*mypullportp= LP_psock_get(connectaddr,publicaddr,ispaired,0,0)) != 0 ) break; sleep(10); printf("try to get publicaddr again\n"); @@ -767,8 +840,6 @@ int32_t LP_initpublicaddr(void *ctx,uint16_t *mypullportp,char *publicaddr,char } else { - //if ( connectaddr2[0] != 0 && nn_connect(pullsock,connectaddr2) > 0 ) - // printf("%s ",connectaddr2); printf("nntype.%d NN_PAIR.%d connect to %s connectsock.%d\n",nntype,NN_PAIR,connectaddr,pullsock); } } @@ -779,14 +850,10 @@ int32_t LP_initpublicaddr(void *ctx,uint16_t *mypullportp,char *publicaddr,char printf("bind to %s error for %s: %s\n",bindaddr,publicaddr,nn_strerror(nn_errno())); exit(-1); } - //if ( nn_bind(pullsock,bindaddr2) >= 0 ) - // printf("bound to %s\n",bindaddr2); } timeout = 100; nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); - //maxsize = 2 * 1024 * 1024; - //nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize)); if ( nntype == NN_SUB ) nn_setsockopt(pullsock,NN_SUB,NN_SUB_SUBSCRIBE,"",0); } diff --git a/iguana/exchanges/LP_ordermatch.c b/iguana/exchanges/LP_ordermatch.c index 1f3cbeefa..1bb578a21 100644 --- a/iguana/exchanges/LP_ordermatch.c +++ b/iguana/exchanges/LP_ordermatch.c @@ -478,13 +478,14 @@ int32_t LP_connectstartbob(void *ctx,int32_t pubsock,char *base,char *rel,double if ( (kmdcoin= LP_coinfind("KMD")) != 0 ) jadd(reqjson,"proof",LP_instantdex_txids(0,kmdcoin->smartaddr)); char str[65]; printf("BOB pubsock.%d binds to %d (%s)\n",pubsock,pair,bits256_str(str,qp->desthash)); - bits256 zero; - memset(zero.bytes,0,sizeof(zero)); - LP_reserved_msg(1,base,rel,zero,jprint(reqjson,0)); - sleep(1); LP_reserved_msg(1,qp->srccoin,qp->destcoin,qp->desthash,jprint(reqjson,0)); - sleep(1); - LP_reserved_msg(0,base,rel,zero,jprint(reqjson,0)); + if ( 0 ) + { + bits256 zero; + memset(zero.bytes,0,sizeof(zero)); + LP_reserved_msg(1,base,rel,zero,jprint(reqjson,0)); + LP_reserved_msg(0,base,rel,zero,jprint(reqjson,0)); + } free_json(reqjson); LP_importaddress(qp->destcoin,qp->destaddr); LP_otheraddress(qp->srccoin,otheraddr,qp->destcoin,qp->destaddr); @@ -512,6 +513,7 @@ char *LP_trade(void *ctx,char *myipaddr,int32_t mypubsock,struct LP_quoteinfo *q qp->aliceid = LP_aliceid_calc(qp->desttxid,qp->destvout,qp->feetxid,qp->feevout); if ( (qp->tradeid= tradeid) == 0 ) qp->tradeid = LP_rand(); + qp->srchash = destpubkey; LP_query(ctx,myipaddr,mypubsock,"request",qp); LP_Alicequery = *qp, LP_Alicemaxprice = maxprice, Alice_expiration = qp->timestamp + timeout, LP_Alicedestpubkey = destpubkey; char str[65]; printf("LP_trade %s/%s %.8f vol %.8f dest.(%s) maxprice %.8f\n",qp->srccoin,qp->destcoin,dstr(qp->satoshis),dstr(qp->destsatoshis),bits256_str(str,LP_Alicedestpubkey),maxprice); @@ -835,8 +837,6 @@ struct LP_quoteinfo *LP_trades_gotrequest(void *ctx,struct LP_quoteinfo *qp,stru qp = newqp; if ( (coin= LP_coinfind(qp->srccoin)) == 0 ) return(0); - //if ( strcmp(qp->srccoin,"GRS") == 0 || strcmp(qp->destcoin,"GRS") == 0 ) - // printf("LP_trades_gotrequest %s/%s myprice %.8f\n",qp->srccoin,qp->destcoin,LP_trades_bobprice(&bid,&ask,qp)); if ( (myprice= LP_trades_bobprice(&bid,&ask,qp)) == 0. ) return(0); autxo = &A; @@ -855,8 +855,6 @@ struct LP_quoteinfo *LP_trades_gotrequest(void *ctx,struct LP_quoteinfo *qp,stru memset(&qp->txid2,0,sizeof(qp->txid2)); qp->vout = qp->vout2 = -1; } else return(0); - //if ( strcmp(qp->srccoin,"GRS") == 0 || strcmp(qp->destcoin,"GRS") == 0 ) - // printf("LP_trades_gotrequest qprice %.8f vs myprice %.8f\n",qprice,myprice); if ( qprice > myprice ) { r = (LP_rand() % 100); @@ -873,7 +871,9 @@ struct LP_quoteinfo *LP_trades_gotrequest(void *ctx,struct LP_quoteinfo *qp,stru printf("request from blacklisted %s, ignore\n",bits256_str(str,qp->desthash)); return(0); } + printf("LP_address_utxo_reset\n"); LP_address_utxo_reset(coin); + printf("done LP_address_utxo_reset\n"); if ( (butxo= LP_address_myutxopair(butxo,1,utxos,max,LP_coinfind(qp->srccoin),qp->coinaddr,qp->txfee,dstr(qp->destsatoshis),price,qp->desttxfee)) != 0 ) { strcpy(qp->gui,G.gui); @@ -893,8 +893,6 @@ struct LP_quoteinfo *LP_trades_gotrequest(void *ctx,struct LP_quoteinfo *qp,stru } if ( (qprice= LP_trades_pricevalidate(qp,coin,myprice)) < 0. ) return(0); - //if ( strcmp(qp->srccoin,"GRS") == 0 || strcmp(qp->destcoin,"GRS") == 0 ) - // printf("final checks\n"); if ( LP_allocated(qp->txid,qp->vout) == 0 && LP_allocated(qp->txid2,qp->vout2) == 0 ) { reqjson = LP_quotejson(qp); @@ -905,15 +903,7 @@ struct LP_quoteinfo *LP_trades_gotrequest(void *ctx,struct LP_quoteinfo *qp,stru jaddnum(reqjson,"quotetime",qp->quotetime); jaddnum(reqjson,"pending",qp->timestamp + LP_RESERVETIME); jaddstr(reqjson,"method","reserved"); - bits256 zero; - memset(zero.bytes,0,sizeof(zero)); - LP_reserved_msg(1,qp->srccoin,qp->destcoin,zero,jprint(reqjson,0)); - if ( 0 )//if ( IAMLP == 0 ) - { - sleep(1); - LP_reserved_msg(1,qp->srccoin,qp->destcoin,qp->desthash,jprint(reqjson,0)); - } - //LP_reserved_msg(0,qp->srccoin,qp->destcoin,zero,jprint(reqjson,0)); + LP_reserved_msg(1,qp->srccoin,qp->destcoin,qp->desthash,jprint(reqjson,0)); free_json(reqjson); return(qp); } else printf("request processing selected ineligible utxos?\n"); @@ -1023,7 +1013,7 @@ void LP_tradesloop(void *ctx) now = (uint32_t)time(NULL); Q = qtp->Q; funcid = qtp->funcid; -//printf("dequeue %p funcid.%d aliceid.%llu iambob.%d\n",qtp,funcid,(long long)qtp->aliceid,qtp->iambob); +printf("dequeue %p funcid.%d aliceid.%llu iambob.%d\n",qtp,funcid,(long long)qtp->aliceid,qtp->iambob); portable_mutex_lock(&LP_tradesmutex); DL_DELETE(LP_tradesQ,qtp); HASH_FIND(hh,LP_trades,&qtp->aliceid,sizeof(qtp->aliceid),tp); @@ -1047,7 +1037,13 @@ void LP_tradesloop(void *ctx) } nonz++; tp->firstprocessed = tp->lastprocessed = (uint32_t)time(NULL); - //printf("iambob.%d funcid.%d vs %d\n",tp->iambob,funcid,LP_REQUEST); + if ( funcid == LP_CONNECT && tp->negotiationdone == 0 ) // bob all done + { + flag = 1; + tp->negotiationdone = now; + printf("bob sets negotiationdone.%u\n",now); + LP_trades_gotconnect(ctx,&tp->Q,&tp->Qs[LP_CONNECT],tp->pairstr); + } } continue; } @@ -1058,7 +1054,10 @@ void LP_tradesloop(void *ctx) //printf("finished dequeue %p funcid.%d aliceid.%llu iambob.%d\n",qtp,funcid,(long long)qtp->aliceid,qtp->iambob); free(qtp); if ( tp->negotiationdone != 0 ) + { + printf("iambob.%d negotiationdone.%u\n",tp->iambob,tp->negotiationdone); continue; + } flag = 0; if ( qtp->iambob == tp->iambob ) { @@ -1082,6 +1081,7 @@ void LP_tradesloop(void *ctx) { flag = 1; tp->negotiationdone = now; + printf("bob sets negotiationdone.%u\n",now); LP_trades_gotconnect(ctx,&tp->Q,&tp->Qs[LP_CONNECT],tp->pairstr); } } @@ -1090,7 +1090,7 @@ void LP_tradesloop(void *ctx) tp->lastprocessed = (uint32_t)time(NULL); nonz++; } - } + } else printf("qtp->iambob.%d vs tp->iambob.%d\n",qtp->iambob,tp->iambob); } HASH_ITER(hh,LP_trades,tp,tmp) { @@ -1110,6 +1110,7 @@ void LP_tradesloop(void *ctx) { if ( tp->connectsent == 0 ) { + tp->negotiationdone = (uint32_t)time(NULL); LP_Alicemaxprice = tp->bestprice; LP_reserved(ctx,LP_myipaddr,LP_mypubsock,&tp->Qs[LP_CONNECT]); // send LP_CONNECT tp->connectsent = now; @@ -1117,22 +1118,31 @@ void LP_tradesloop(void *ctx) } else if ( now < tp->firstprocessed+timeout && ((tp->firstprocessed - now) % 10) == 9 ) { - LP_Alicemaxprice = tp->bestprice; - LP_reserved(ctx,LP_myipaddr,LP_mypubsock,&tp->Qs[LP_CONNECT]); // send LP_CONNECT - printf("repeat LP_connect aliceid.%llu %.8f\n",(long long)tp->aliceid,tp->bestprice); + //LP_Alicemaxprice = tp->bestprice; + //LP_reserved(ctx,LP_myipaddr,LP_mypubsock,&tp->Qs[LP_CONNECT]); // send LP_CONNECT + printf("mark slow LP_connect aliceid.%llu %.8f\n",(long long)tp->aliceid,tp->bestprice); if ( (pubp= LP_pubkeyfind(tp->Qs[LP_CONNECT].srchash)) != 0 ) pubp->slowresponse++; } } } - if ( now > tp->firstprocessed+timeout*10 ) - { - //printf("purge swap aliceid.%llu\n",(long long)tp->aliceid); - portable_mutex_lock(&LP_tradesmutex); - HASH_DELETE(hh,LP_trades,tp); - portable_mutex_unlock(&LP_tradesmutex); - free(tp); - } + } + } + now = (uint32_t)time(NULL); + HASH_ITER(hh,LP_trades,tp,tmp) + { + timeout = LP_AUTOTRADE_TIMEOUT; + if ( (coin= LP_coinfind(tp->Q.srccoin)) != 0 && coin->electrum != 0 ) + timeout += LP_AUTOTRADE_TIMEOUT * .5; + if ( (coin= LP_coinfind(tp->Q.destcoin)) != 0 && coin->electrum != 0 ) + timeout += LP_AUTOTRADE_TIMEOUT * .5; + if ( now > tp->firstprocessed+timeout*10 ) + { + printf("purge swap aliceid.%llu\n",(long long)tp->aliceid); + portable_mutex_lock(&LP_tradesmutex); + HASH_DELETE(hh,LP_trades,tp); + portable_mutex_unlock(&LP_tradesmutex); + free(tp); } } if ( nonz == 0 ) @@ -1174,10 +1184,10 @@ int32_t LP_tradecommand(void *ctx,char *myipaddr,int32_t pubsock,cJSON *argjson, LP_tradecommand_log(argjson); rq = ((uint64_t)Q.R.requestid << 32) | Q.R.quoteid; printf("%-4d (%-10u %10u) %12s id.%-20llu %5s/%-5s %12.8f -> %12.8f (%11.8f) | RT.%d %d n%d\n",(uint32_t)time(NULL) % 3600,Q.R.requestid,Q.R.quoteid,method,(long long)Q.aliceid,Q.srccoin,Q.destcoin,dstr(Q.satoshis),dstr(Q.destsatoshis),(double)Q.destsatoshis/Q.satoshis,LP_RTcount,LP_swapscount,G.netid); - if ( Q.timestamp > 0 && time(NULL) > Q.timestamp + LP_AUTOTRADE_TIMEOUT*20 ) // eat expired packets + if ( Q.timestamp > 0 && time(NULL) > Q.timestamp + LP_AUTOTRADE_TIMEOUT*20 ) // eat expired packets, some old timestamps floating about? { - //printf("aliceid.%llu is expired by %d\n",(long long)Q.aliceid,(uint32_t)time(NULL) - (Q.timestamp + LP_AUTOTRADE_TIMEOUT*20)); - //return(1); + printf("aliceid.%llu is expired by %d\n",(long long)Q.aliceid,(uint32_t)time(NULL) - (Q.timestamp + LP_AUTOTRADE_TIMEOUT*20)); + return(1); } //LP_autoprices_update(method,Q.srccoin,dstr(Q.satoshis),Q.destcoin,dstr(Q.destsatoshis)); retval = 1; @@ -1187,7 +1197,7 @@ int32_t LP_tradecommand(void *ctx,char *myipaddr,int32_t pubsock,cJSON *argjson, { bestprice = LP_bob_competition(&counter,aliceid,qprice,1); //printf("%s lag %ld: aliceid.%llu price %.8f -> bestprice %.8f Alice max %.8f\n",jprint(argjson,0),Q.quotetime - (time(NULL)-20),(long long)aliceid,qprice,bestprice,LP_Alicemaxprice); - if ( 0 ) + if ( 1 ) { if ( LP_Alicemaxprice == 0. ) return(retval); @@ -1200,7 +1210,7 @@ int32_t LP_tradecommand(void *ctx,char *myipaddr,int32_t pubsock,cJSON *argjson, } else printf("got reserved response from destpubkey %s\n",bits256_str(str,Q.srchash)); } } - if ( bits256_cmp(G.LP_mypub25519,Q.desthash) == 0 && bits256_cmp(G.LP_mypub25519,Q.srchash) != 0 ) + if ( bits256_cmp(G.LP_mypub25519,Q.desthash) == 0 && bits256_cmp(G.LP_mypub25519,Q.srchash) != 0 ) // alice { if ( Qtrades == 0 ) { @@ -1216,7 +1226,7 @@ int32_t LP_tradecommand(void *ctx,char *myipaddr,int32_t pubsock,cJSON *argjson, else if ( strcmp(method,"connected") == 0 ) { bestprice = LP_bob_competition(&counter,aliceid,qprice,1000); - if ( bits256_cmp(G.LP_mypub25519,Q.desthash) == 0 && bits256_cmp(G.LP_mypub25519,Q.srchash) != 0 ) + if ( bits256_cmp(G.LP_mypub25519,Q.desthash) == 0 && bits256_cmp(G.LP_mypub25519,Q.srchash) != 0 ) // alice { static uint64_t rqs[1024]; for (i=0; i bestprice %.8f\n",jprint(argjson,0),Q.quotetime - (time(NULL)-20),(long long)aliceid,qprice,bestprice); - if ( Qtrades == 0 ) + if ( Qtrades == 0 || (bits256_cmp(Q.srchash,G.LP_mypub25519) == 0 && bits256_cmp(G.LP_mypub25519,Q.desthash) != 0) ) LP_trades_gotrequest(ctx,&Q,&Q2,jstr(argjson,"pair")); else LP_tradecommandQ(&Q,jstr(argjson,"pair"),LP_REQUEST); } else if ( strcmp(method,"connect") == 0 ) { LP_bob_competition(&counter,aliceid,qprice,1000); - if ( bits256_cmp(G.LP_mypub25519,Q.srchash) == 0 && bits256_cmp(G.LP_mypub25519,Q.desthash) != 0 ) + if ( bits256_cmp(G.LP_mypub25519,Q.srchash) == 0 && bits256_cmp(G.LP_mypub25519,Q.desthash) != 0 ) // bob { static uint64_t rqs[1024]; for (i=0; i 0 ) Q.othercredits = LP_instantdex_proofcheck(Q.destcoin,Q.destaddr,proof,num); - if ( 1 || Qtrades == 0 ) + if ( Qtrades == 0 ) LP_trades_gotconnect(ctx,&Q,&Q2,jstr(argjson,"pair")); else LP_tradecommandQ(&Q,jstr(argjson,"pair"),LP_CONNECT); } diff --git a/iguana/exchanges/LP_peers.c b/iguana/exchanges/LP_peers.c index bdac0d50e..95c8b6821 100644 --- a/iguana/exchanges/LP_peers.c +++ b/iguana/exchanges/LP_peers.c @@ -56,6 +56,58 @@ char *LP_peers() return(jprint(peersjson,1)); } +void LP_cmdchannel(struct LP_peerinfo *peer) +{ + char *hellostr = "{\"method\":\"hello\"}"; + char connectaddr[128],publicaddr[128],*retstr; int32_t pairsock=-1,pubsock,sentbytes=-2; uint16_t cmdport; + if ( bits256_nonz(G.LP_mypub25519) == 0 || strcmp(G.USERPASS,"1d8b27b21efabcd96571cd56f91a40fb9aa4cc623d273c63bf9223dc6f8cd81f") == 0 ) + return; + if ( (cmdport= LP_psock_get(connectaddr,publicaddr,1,1,peer->ipaddr)) != 0 ) + { + if ( (retstr= _LP_psock_create(&pairsock,&pubsock,peer->ipaddr,cmdport,cmdport,1,1,G.LP_mypub25519)) != 0 ) + { + if ( nn_connect(pairsock,connectaddr) < 0 ) + printf("error connecting cmdchannel with %s\n",connectaddr); + else + { + peer->pairsock = pairsock; + sentbytes = nn_send(peer->pairsock,hellostr,(int32_t)strlen(hellostr)+1,0); + printf("cmdchannel %d created %s sent.%d\n",peer->pairsock,retstr,sentbytes); + } + free(retstr); + } + } else printf("error getting cmdchannel with %s\n",peer->ipaddr); +} + +void LP_cmdchannels() +{ + struct LP_peerinfo *peer,*tmp; + if ( IAMLP == 0 ) + { + HASH_ITER(hh,LP_peerinfos,peer,tmp) + { + if ( peer->pairsock <= 0 ) + LP_cmdchannel(peer); + } + } +} + +void LP_peer_pairsock(bits256 pubkey) +{ + struct LP_peerinfo *peer,*tmp; + if ( IAMLP == 0 ) + { + HASH_ITER(hh,LP_peerinfos,peer,tmp) + { + if ( bits256_cmp(pubkey,peer->pubkey) == 0 ) + { + peer->pairsock = -1; + break; + } + } + } +} + struct LP_peerinfo *LP_addpeer(struct LP_peerinfo *mypeer,int32_t mypubsock,char *ipaddr,uint16_t port,uint16_t pushport,uint16_t subport,int32_t isLP,uint32_t sessionid,uint16_t netid) { uint32_t ipbits; int32_t valid,pushsock,subsock,timeout; char checkip[64],pushaddr[64],subaddr[64]; struct LP_peerinfo *peer = 0; @@ -79,6 +131,8 @@ struct LP_peerinfo *LP_addpeer(struct LP_peerinfo *mypeer,int32_t mypubsock,char if ( (peer->isLP= isLP) != 0 ) LP_numactive_LP++; } + if ( IAMLP == 0 && peer->pairsock <= 0 ) + LP_cmdchannel(peer); /*if ( numpeers > peer->numpeers ) peer->numpeers = numpeers; if ( numutxos > peer->numutxos ) @@ -86,7 +140,7 @@ struct LP_peerinfo *LP_addpeer(struct LP_peerinfo *mypeer,int32_t mypubsock,char if ( peer->sessionid == 0 ) peer->sessionid = sessionid;*/ } - else if ( IAMLP != 0 || LP_numactive_LP < 3 ) + else if ( IAMLP != 0 || LP_numactive_LP < 10 ) { //printf("addpeer (%s:%u) pushport.%u subport.%u\n",ipaddr,port,pushport,subport); peer = calloc(1,sizeof(*peer)); @@ -117,8 +171,6 @@ struct LP_peerinfo *LP_addpeer(struct LP_peerinfo *mypeer,int32_t mypubsock,char nn_setsockopt(pushsock,NN_SOL_SOCKET,NN_MAXTTL,&timeout,sizeof(timeout)); timeout = 100; nn_setsockopt(pushsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout)); - //maxsize = 2 * 1024 * 1024; - //nn_setsockopt(pushsock,NN_SOL_SOCKET,NN_SNDBUF,&maxsize,sizeof(maxsize)); printf("connected to push.(%s) pushsock.%d valid.%d | ",pushaddr,pushsock,valid); peer->connected = (uint32_t)time(NULL); peer->pushsock = pushsock; @@ -128,12 +180,9 @@ struct LP_peerinfo *LP_addpeer(struct LP_peerinfo *mypeer,int32_t mypubsock,char nn_setsockopt(subsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout)); nn_setsockopt(subsock,NN_SUB,NN_SUB_SUBSCRIBE,"",0); nanomsg_transportname(0,subaddr,peer->ipaddr,subport); - //nanomsg_transportname2(0,subaddr2,peer->ipaddr,subport); valid = 0; if ( nn_connect(subsock,subaddr) >= 0 ) valid++; - //if ( nn_connect(subsock,subaddr2) >= 0 ) - // valid++; if ( valid > 0 ) { peer->subsock = subsock; @@ -165,24 +214,8 @@ struct LP_peerinfo *LP_addpeer(struct LP_peerinfo *mypeer,int32_t mypubsock,char printf("_LPaddpeer %s -> numpeers.%d mypubsock.%d other.(%d)\n",ipaddr,mypeer->numpeers,mypubsock,isLP); } else peer->numpeers = 1; // will become mypeer portable_mutex_unlock(&LP_peermutex); - /*if ( IAMLP != 0 && mypubsock >= 0 ) - { - //struct iguana_info *coin,*ctmp; char busaddr[64]; // - //memset(zero.bytes,0,sizeof(zero)); - //LP_send(mypubsock,msg,(int32_t)strlen(msg)+1,1); - //LP_reserved_msg(0,"","",zero,jprint(LP_peerjson(peer),1)); - if ( 0 ) - { - HASH_ITER(hh,LP_coins,coin,ctmp) - { - if ( coin->bussock >= 0 ) - { - nanomsg_transportname(0,busaddr,peer->ipaddr,coin->busport); - nn_connect(coin->bussock,busaddr); - } - } - } - }*/ + if ( IAMLP == 0 && peer->pairsock <= 0 ) + LP_cmdchannel(peer); } else printf("%s invalid pushsock.%d or subsock.%d\n",peer->ipaddr,peer->pushsock,peer->subsock); } } else printf("LP_addpeer: checkip.(%s) vs (%s)\n",checkip,ipaddr); @@ -235,13 +268,18 @@ void LP_closepeers() return(bussock); }*/ -void LP_peer_recv(char *ipaddr,int32_t ismine) +void LP_peer_recv(char *ipaddr,int32_t ismine,struct LP_pubkey_info *pubp) { struct LP_peerinfo *peer; if ( (peer= LP_peerfind((uint32_t)calc_ipbits(ipaddr),RPC_port)) != 0 ) { peer->numrecv++; - //if ( ismine != 0 ) + if ( ismine != 0 && bits256_cmp(G.LP_mypub25519,pubp->pubkey) != 0 && (bits256_nonz(peer->pubkey) == 0 || pubp->pairsock <= 0) ) + { + peer->pubkey = pubp->pubkey; + pubp->pairsock = peer->pairsock; + char str[65]; printf("set pubkey for %s <- %s, pairsock.%d\n",ipaddr,bits256_str(str,pubp->pubkey),pubp->pairsock); + } peer->recvtime = (uint32_t)time(NULL); } } diff --git a/iguana/exchanges/LP_privkey.c b/iguana/exchanges/LP_privkey.c index 191c5fde1..f0c8c2719 100644 --- a/iguana/exchanges/LP_privkey.c +++ b/iguana/exchanges/LP_privkey.c @@ -417,6 +417,7 @@ int32_t LP_passphrase_init(char *passphrase,char *gui,uint16_t netid,char *seedn LP_priceinfos_clear(); G.USERPASS_COUNTER = counter; G.initializing = 0; + LP_cmdchannels(); return(0); } diff --git a/iguana/exchanges/LP_signatures.c b/iguana/exchanges/LP_signatures.c index 4ac5a051b..0cacc3ac5 100644 --- a/iguana/exchanges/LP_signatures.c +++ b/iguana/exchanges/LP_signatures.c @@ -547,7 +547,7 @@ void LP_notify_pubkeys(void *ctx,int32_t pubsock) } else printf("no LPipaddr\n"); } jaddnum(reqjson,"session",G.LP_sessionid); - LP_reserved_msg(0,"","",zero,jprint(reqjson,1)); + LP_reserved_msg(1,"","",zero,jprint(reqjson,1)); } char *LP_notify_recv(cJSON *argjson) @@ -561,7 +561,7 @@ char *LP_notify_recv(cJSON *argjson) if ( (ipaddr= jstr(argjson,"isLP")) != 0 ) { //printf("notify got isLP %s %d\n",ipaddr,jint(argjson,"ismine")); - LP_peer_recv(ipaddr,jint(argjson,"ismine")); + LP_peer_recv(ipaddr,jint(argjson,"ismine"),pubp); if ( IAMLP != 0 && G.LP_IAMLP == 0 && strcmp(ipaddr,LP_myipaddr) == 0 ) { if ( bits256_cmp(pub,G.LP_mypub25519) != 0 ) @@ -675,12 +675,11 @@ void LP_query(void *ctx,char *myipaddr,int32_t mypubsock,char *method,struct LP_ //if ( bits256_nonz(qp->srchash) == 0 || strcmp(method,"request") != 0 ) { memset(&zero,0,sizeof(zero)); - LP_reserved_msg(1,qp->srccoin,qp->destcoin,zero,clonestr(msg)); - if ( strcmp(method,"connect") == 0 ) + if ( bits256_nonz(qp->srchash) != 0 ) + LP_reserved_msg(1,qp->srccoin,qp->destcoin,qp->srchash,clonestr(msg)); + else { - sleep(1); LP_reserved_msg(1,qp->srccoin,qp->destcoin,zero,clonestr(msg)); - sleep(1); LP_reserved_msg(0,qp->srccoin,qp->destcoin,zero,clonestr(msg)); } free(msg); diff --git a/iguana/exchanges/LP_utxo.c b/iguana/exchanges/LP_utxo.c index 1bf47ebc2..4887f76af 100644 --- a/iguana/exchanges/LP_utxo.c +++ b/iguana/exchanges/LP_utxo.c @@ -447,7 +447,6 @@ struct LP_address *LP_address_utxo_reset(struct iguana_info *coin) { struct LP_address *ap; struct LP_address_utxo *up,*tmp; int32_t i,n,m,vout,height; cJSON *array,*item,*txobj; bits256 zero; int64_t value; bits256 txid; uint32_t now; LP_address(coin,coin->smartaddr); - //printf("call listunspent issue %s (%s)\n",coin->symbol,coin->smartaddr); memset(zero.bytes,0,sizeof(zero)); LP_listunspent_issue(coin->symbol,coin->smartaddr,2,zero,zero); if ( (ap= LP_addressfind(coin,coin->smartaddr)) == 0 ) @@ -455,8 +454,12 @@ struct LP_address *LP_address_utxo_reset(struct iguana_info *coin) printf("LP_address_utxo_reset: cant find address data\n"); return(0); } + if ( time(NULL) < coin->lastresetutxo+30 ) + return(ap); + coin->lastresetutxo = (uint32_t)time(NULL); if ( (array= LP_listunspent(coin->symbol,coin->smartaddr,zero,zero)) != 0 ) { + //printf("clear ap->utxos\n"); DL_FOREACH_SAFE(ap->utxos,up,tmp) { portable_mutex_lock(&coin->addrmutex); @@ -467,6 +470,7 @@ struct LP_address *LP_address_utxo_reset(struct iguana_info *coin) DL_APPEND(LP_garbage_collector2,up); portable_mutex_unlock(&LP_gcmutex); } + //printf("done clearing ap->utxos\n"); now = (uint32_t)time(NULL); if ( (n= cJSON_GetArraySize(array)) > 0 ) { diff --git a/iguana/exchanges/stats.c b/iguana/exchanges/stats.c index 922e4606e..d08202993 100644 --- a/iguana/exchanges/stats.c +++ b/iguana/exchanges/stats.c @@ -568,6 +568,7 @@ char *stats_rpcparse(char *retbuf,int32_t bufsize,int32_t *jsonflagp,int32_t *po free_json(json); if ( tmpjson != 0 ) free(tmpjson); + //printf("stats_JSON rpc return.(%s)\n",retstr); return(retstr); } free_json(argjson);