Browse Source

Merge pull request #573 from jl777/fifo

fix bugs with trade statemachine
automatically establish private comm channels with LP nodes, include auto-reconnect
dev
jl777 7 years ago
committed by GitHub
parent
commit
ec9c89c463
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      iguana/exchanges/LP_commands.c
  2. 12
      iguana/exchanges/LP_include.h
  3. 28
      iguana/exchanges/LP_nativeDEX.c
  4. 291
      iguana/exchanges/LP_network.c
  5. 102
      iguana/exchanges/LP_ordermatch.c
  6. 90
      iguana/exchanges/LP_peers.c
  7. 1
      iguana/exchanges/LP_privkey.c
  8. 11
      iguana/exchanges/LP_signatures.c
  9. 6
      iguana/exchanges/LP_utxo.c
  10. 1
      iguana/exchanges/stats.c

25
iguana/exchanges/LP_commands.c

@ -752,16 +752,21 @@ jpg(srcfile, destfile, power2=7, password, data="", required, ind=0)\n\
{
if ( strcmp(method,"psock") == 0 )
{
if ( myipaddr == 0 || myipaddr[0] == 0 || strcmp(myipaddr,"127.0.0.1") == 0 )
{
if ( LP_mypeer != 0 )
myipaddr = LP_mypeer->ipaddr;
else printf("LP_psock dont have actual ipaddr?\n");
}
if ( jint(argjson,"ispaired") != 0 )
return(LP_psock(myipaddr,jint(argjson,"ispaired")));
else return(clonestr("{\"error\":\"you are running an obsolete version, update\"}"));
}
int32_t psock;
if ( myipaddr == 0 || myipaddr[0] == 0 || strcmp(myipaddr,"127.0.0.1") == 0 )
{
if ( LP_mypeer != 0 )
myipaddr = LP_mypeer->ipaddr;
else printf("LP_psock dont have actual ipaddr?\n");
}
if ( jint(argjson,"ispaired") != 0 )
{
retstr = LP_psock(&psock,myipaddr,1,jint(argjson,"cmdchannel"),jbits256(argjson,"pubkey"));
//printf("LP_commands.(%s)\n",retstr);
return(retstr);
}
else return(clonestr("{\"error\":\"you are running an obsolete version, update\"}"));
}
}
else
{

12
iguana/exchanges/LP_include.h

@ -301,7 +301,7 @@ struct iguana_info
portable_mutex_t txmutex,addrmutex; struct LP_transaction *transactions; struct LP_address *addresses;
uint64_t txfee;
int32_t numutxos,notarized,longestchain,firstrefht,firstscanht,lastscanht,height; uint16_t busport;
uint32_t txversion,dPoWtime,loadedcache,electrumlist,lastunspent,importedprivkey,lastpushtime,lastutxosync,addr_listunspent_requested,lastutxos,updaterate,counter,inactive,lastmempool,lastgetinfo,ratetime,heighttime,lastmonitor,obooktime;
uint32_t txversion,dPoWtime,lastresetutxo,loadedcache,electrumlist,lastunspent,importedprivkey,lastpushtime,lastutxosync,addr_listunspent_requested,lastutxos,updaterate,counter,inactive,lastmempool,lastgetinfo,ratetime,heighttime,lastmonitor,obooktime;
uint8_t pubtype,p2shtype,isPoS,wiftype,wiftaddr,taddr,noimportprivkey_flag,userconfirms,isassetchain,maxconfirms;
char symbol[128],smartaddr[64],userpass[1024],serverport[128],instantdex_address[64];
// portfolio
@ -361,9 +361,10 @@ struct LP_address
struct LP_peerinfo
{
UT_hash_handle hh;
bits256 pubkey;
uint64_t ip_port;
uint32_t recvtime,numrecv,ipbits,errortime,errors,numpeers,needping,lasttime,connected,lastutxos,lastpeers,diduquery,good,sessionid;
int32_t pushsock,subsock,isLP;
int32_t pushsock,subsock,isLP,pairsock;
uint16_t port,netid;
char ipaddr[64];
};
@ -431,7 +432,7 @@ struct LP_pubkey_info
struct LP_pubswap *bobswaps,*aliceswaps;
int64_t dynamictrust,unconfcredits;
uint32_t timestamp,numerrors,lasttime,slowresponse;
int32_t istrusted;
int32_t istrusted,pairsock;
uint8_t rmd160[20],sig[65],pubsecp[33],siglen;
};
@ -486,7 +487,7 @@ int32_t LP_iseligible(uint64_t *valp,uint64_t *val2p,int32_t iambob,char *symbol
int32_t LP_pullsock_check(void *ctx,char **retstrp,char *myipaddr,int32_t pubsock,int32_t pullsock);
int64_t LP_listunspent_parseitem(struct iguana_info *coin,bits256 *txidp,int32_t *voutp,int32_t *heightp,cJSON *item);
void LP_unspents_cache(char *symbol,char *addr,char *arraystr,int32_t updatedflag);
uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired);
uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired,int32_t cmdchannel,char *ipaddr);
//void LP_utxo_clientpublish(struct LP_utxoinfo *utxo);
//int32_t LP_coinbus(uint16_t coin_busport);
int32_t LP_nanomsg_recvs(void *ctx);
@ -544,9 +545,10 @@ int32_t LP_listunspent_both(char *symbol,char *coinaddr,int32_t fullflag);
uint16_t LP_randpeer(char *destip);
void LP_tradebot_pauseall();
void LP_portfolio_reset();
struct LP_pubkey_info *LP_pubkeyadd(bits256 pubkey);
uint32_t LP_atomic_locktime(char *base,char *rel);
struct LP_pubkey_info *LP_pubkeyfind(bits256 pubkey);
char *issue_LP_psock(char *destip,uint16_t destport,int32_t ispaired);
char *issue_LP_psock(char *destip,uint16_t destport,int32_t ispaired,int32_t cmdchannel);
char *LP_unspents_filestr(char *symbol,char *addr);
cJSON *bitcoin_data2json(char *symbol,uint8_t taddr,uint8_t pubtype,uint8_t p2shtype,uint8_t isPoS,int32_t height,bits256 *txidp,struct iguana_msgtx *msgtx,uint8_t *extraspace,int32_t extralen,uint8_t *serialized,int32_t len,cJSON *vins,int32_t suppress_pubkeys,int32_t zcash);
//int32_t LP_butxo_findeither(bits256 txid,int32_t vout);

28
iguana/exchanges/LP_nativeDEX.c

@ -206,7 +206,7 @@ char *LP_command_process(void *ctx,char *myipaddr,int32_t pubsock,cJSON *argjson
//LP_send(pubsock,retstr,(int32_t)strlen(retstr)+1,0);
}
}
else if ( LP_statslog_parse() > 0 )
else if ( LP_statslog_parse() > 0 && 0 )
{
memset(zero.bytes,0,sizeof(zero));
if ( (retjson= LP_statslog_disp(2000000000,2000000000,"",zero,0,0))) // pending swaps
@ -303,7 +303,7 @@ char *LP_process_message(void *ctx,char *typestr,char *myipaddr,int32_t pubsock,
if ( jsonstr != 0 && argjson != 0 )
{
len = (int32_t)strlen(jsonstr) + 1;
if ( (method= jstr(argjson,"method")) != 0 && strcmp(method,"broadcast") == 0 )
if ( (method= jstr(argjson,"method")) != 0 && strcmp(method,"psock") != 0 && strcmp(method,"broadcast") == 0 )
{
bits256 zero; cJSON *reqjson; char *cipherstr; int32_t cipherlen; uint8_t cipher[LP_ENCRYPTED_MAXSIZE];
if ( (reqjson= LP_dereference(argjson,"broadcast")) != 0 )
@ -1138,9 +1138,29 @@ void LP_reserved_msgs(void *ignore)
int32_t LP_reserved_msg(int32_t priority,char *base,char *rel,bits256 pubkey,char *msg)
{
int32_t n = 0;
struct LP_pubkey_info *pubp; int32_t sentbytes,n = 0;
if ( strcmp(G.USERPASS,"1d8b27b21efabcd96571cd56f91a40fb9aa4cc623d273c63bf9223dc6f8cd81f") == 0 )
return(-1);
if ( priority > 0 && bits256_nonz(pubkey) != 0 )
{
if ( (pubp= LP_pubkeyfind(pubkey)) != 0 )
{
if ( pubp->pairsock > 0 )
{
if ( (sentbytes= nn_send(pubp->pairsock,msg,(int32_t)strlen(msg)+1,0)) < 0 )
{
pubp->pairsock = -1;
LP_peer_pairsock(pubkey);
printf("mark cmdchannel %d closed sentbytes.%d\n",pubp->pairsock,sentbytes);
}
else
{
printf("sent %d bytes to cmdchannel.%d\n",sentbytes,pubp->pairsock);
return(sentbytes);
}
}
}
}
portable_mutex_lock(&LP_reservedmutex);
if ( num_Reserved_msgs[priority] < sizeof(Reserved_msgs[priority])/sizeof(*Reserved_msgs[priority]) )
{
@ -1312,7 +1332,7 @@ void LPinit(uint16_t myport,uint16_t mypullport,uint16_t mypubport,uint16_t mybu
printf("got %s, initpeers. LP_mypubsock.%d pullsock.%d RPC_port.%u mypullport.%d mypubport.%d pushaddr.%s\n",myipaddr,LP_mypubsock,LP_mypullsock,RPC_port,mypullport,mypubport,pushaddr);
LP_passphrase_init(passphrase,jstr(argjson,"gui"),juint(argjson,"netid"),jstr(argjson,"seednode"));
#ifndef FROM_JS
if ( IAMLP != 0 && OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)LP_psockloop,(void *)myipaddr) != 0 )
if ( OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)LP_psockloop,(void *)myipaddr) != 0 )
{
printf("error launching LP_psockloop for (%s)\n",myipaddr);
exit(-1);

291
iguana/exchanges/LP_network.c

@ -21,12 +21,12 @@
struct psock
{
uint32_t lasttime,lastping,errors;
int32_t publicsock,sendsock,ispaired;
int32_t publicsock,sendsock,ispaired,cmdchannel;
uint16_t publicport,sendport;
char sendaddr[128],publicaddr[128];
} *PSOCKS;
uint16_t Numpsocks,Psockport = MIN_PSOCK_PORT;
uint16_t Numpsocks,Psockport = MIN_PSOCK_PORT,Pcmdport = MAX_PSOCK_PORT;
#ifdef FROM_JS
@ -299,16 +299,18 @@ void LP_queuesend(uint32_t crc32,int32_t pubsock,char *base,char *rel,uint8_t *m
void LP_broadcast_finish(int32_t pubsock,char *base,char *rel,uint8_t *msg,cJSON *argjson,uint32_t crc32)
{
int32_t msglen;
int32_t msglen; char *method;
if ( (method= jstr(argjson,"method")) == 0 )
return;
msg = (void *)jprint(argjson,0);
msglen = (int32_t)strlen((char *)msg) + 1;
if ( crc32 == 0 )
crc32 = calc_crc32(0,&msg[2],msglen - 2);
//printf("crc32.%x IAMLP.%d pubsock.%d\n",crc32,G.LP_IAMLP,pubsock);
#ifdef FROM_MARKETMAKER
if ( G.LP_IAMLP == 0 || pubsock < 0 )
if ( (G.LP_IAMLP == 0 || pubsock < 0) && strcmp(method,"psock") != 0 )
#else
if ( IAMLP == 0 || pubsock < 0 )
if ( (IAMLP == 0 || pubsock < 0 && strcmp(method,"psock") != 0 )
#endif
{
free(msg);
@ -412,10 +414,10 @@ uint32_t LP_swapsend(int32_t pairsock,struct basilisk_swap *swap,uint32_t msgbit
return(nextbits);
}
void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to work
void LP_psockloop(void *_ptr)
{
static struct nn_pollfd *pfds;
int32_t i,n,nonz,iter,retval,sentbytes,size=0,sendsock = -1; uint32_t now; struct psock *ptr=0; void *buf=0; char keepalive[512];
int32_t nexti=0,j,i,n,nonz,iter,retval,sentbytes,size=0,sendsock = -1; uint32_t now; struct psock *ptr=0; void *buf=0; char keepalive[512]; void *ctx = bitcoin_ctx();
strcpy(LP_psockloop_stats.name,"LP_psockloop");
LP_psockloop_stats.threshold = 1000.;
while ( LP_STOP_RECEIVED == 0 )
@ -447,12 +449,14 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w
memset(pfds,0,sizeof(*pfds) * ((Numpsocks*2 <= MAX_PSOCK_PORT) ? Numpsocks*2 : MAX_PSOCK_PORT));
for (iter=0; iter<2; iter++)
{
for (i=n=0; i<Numpsocks; i++)
for (j=n=0; j<Numpsocks; j++)
{
i = (j + nexti) % Numpsocks;
ptr = &PSOCKS[i];
if ( iter == 0 )
{
pfds[n].fd = ptr->publicsock;
//printf("check sock.%d\n",ptr->publicsock);
pfds[n].events = POLLIN;
}
else
@ -460,16 +464,35 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w
if ( pfds[n].fd != ptr->publicsock )
{
printf("unexpected fd.%d mismatched publicsock.%d\n",pfds[n].fd,ptr->publicsock);
nexti = i+1;
break;
}
else if ( (pfds[n].revents & POLLIN) != 0 )
{
//printf("publicsock.%d %s has pollin\n",ptr->publicsock,ptr->publicaddr);
//printf("cmd.%d publicsock.%d %s has pollin\n",ptr->cmdchannel,ptr->publicsock,ptr->publicaddr);
buf = 0;
if ( (size= nn_recv(ptr->publicsock,&buf,NN_MSG,0)) > 0 )
{
ptr->lasttime = now;
sendsock = ptr->sendsock;
if ( ptr->cmdchannel == 0 )
sendsock = ptr->sendsock;
else
{
char *retstr; cJSON *argjson;
//printf("nn_recv.(%s)\n",(char *)buf);
if ( (argjson= cJSON_Parse((char *)buf)) != 0 )
{
if ( (retstr= LP_command_process(ctx,"127.0.0.0",ptr->publicsock,argjson,buf,size)) != 0 )
{
printf("processed.(%s)\n",retstr);
if ( (size= nn_send(ptr->publicsock,retstr,(int32_t)strlen(retstr)+1,0)) <= 0 )
printf("error sending result\n");
free(retstr);
}
free_json(argjson);
}
}
nexti = i+1;
break;
}
else if ( buf != 0 )
@ -481,52 +504,57 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w
}
}
n++;
if ( iter == 0 )
{
pfds[n].fd = ptr->sendsock;
pfds[n].events = POLLIN;
}
else
if ( ptr->sendsock > 0 )
{
if ( pfds[n].fd != ptr->sendsock )
if ( iter == 0 )
{
printf("unexpected fd.%d mismatched sendsock.%d\n",pfds[n].fd,ptr->sendsock);
break;
pfds[n].fd = ptr->sendsock;
pfds[n].events = POLLIN;
}
else if ( (pfds[n].revents & POLLIN) != 0 )
else
{
if ( (size= nn_recv(ptr->sendsock,&buf,NN_MSG,0)) > 0 )
if ( pfds[n].fd != ptr->sendsock )
{
//printf("%s paired has pollin (%s)\n",ptr->sendaddr,(char *)buf);
ptr->lasttime = now;
if ( ptr->ispaired != 0 )
{
sendsock = ptr->publicsock;
break;
}
printf("unexpected fd.%d mismatched sendsock.%d\n",pfds[n].fd,ptr->sendsock);
nexti = i+1;
break;
}
if ( buf != 0 )
else if ( (pfds[n].revents & POLLIN) != 0 )
{
nn_freemsg(buf);
buf = 0;
size = 0;
if ( (size= nn_recv(ptr->sendsock,&buf,NN_MSG,0)) > 0 )
{
printf("%s paired has pollin (%s)\n",ptr->sendaddr,(char *)buf);
ptr->lasttime = now;
if ( ptr->ispaired != 0 )
{
sendsock = ptr->publicsock;
nexti = i+1;
break;
}
}
if ( buf != 0 )
{
nn_freemsg(buf);
buf = 0;
size = 0;
}
}
}
n++;
}
n++;
}
if ( iter == 0 )
{
if ( (retval= nn_poll(pfds,n,1)) <= 0 )
{
if ( retval != 0 )
printf("nn_poll retval.%d\n",retval);
//if ( retval != 0 )
// printf("nn_poll retval.%d\n",retval);
break;
} else printf("num pfds.%d retval.%d\n",n,retval);
} // else printf("num pfds.%d retval.%d\n",n,retval);
}
}
//free(pfds);
//printf("sendsock.%d Numpsocks.%d\n",sendsock,Numpsocks);
if ( sendsock >= 0 )
printf("sendsock.%d Numpsocks.%d\n",sendsock,Numpsocks);
if ( sendsock < 0 )
{
for (i=nonz=0; i<Numpsocks; i++)
@ -537,10 +565,10 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w
if ( now > ptr->lasttime+PSOCK_KEEPALIVE )
{
printf("PSOCKS[%d] of %d (%u %u) lag.%d IDLETIMEOUT\n",i,Numpsocks,ptr->publicport,ptr->sendport,now - ptr->lasttime);
if ( ptr->sendsock != ptr->publicsock && ptr->sendsock >= 0 )
nn_close(ptr->sendsock);
if ( ptr->publicsock >= 0 )
nn_close(ptr->publicsock);
if ( ptr->sendsock >= 0 )
nn_close(ptr->sendsock);
//portable_mutex_lock(&LP_psockmutex);
if ( Numpsocks > 1 )
{
@ -573,13 +601,14 @@ void LP_psockloop(void *_ptr) // printouts seem to be needed for forwarding to w
}
}
void LP_psockadd(int32_t ispaired,int32_t publicsock,uint16_t recvport,int32_t sendsock,uint16_t sendport,char *subaddr,char *publicaddr)
void LP_psockadd(int32_t ispaired,int32_t publicsock,uint16_t recvport,int32_t sendsock,uint16_t sendport,char *subaddr,char *publicaddr,int32_t cmdchannel)
{
struct psock *ptr;
portable_mutex_lock(&LP_psockmutex);
PSOCKS = realloc(PSOCKS,sizeof(*PSOCKS) * (Numpsocks + 1));
ptr = &PSOCKS[Numpsocks++];
ptr->ispaired = ispaired;
ptr->cmdchannel = cmdchannel;
ptr->publicsock = publicsock;
ptr->publicport = recvport;
ptr->sendsock = sendsock;
@ -609,65 +638,111 @@ int32_t LP_psockmark(char *publicaddr)
return(retval);
}
char *LP_psock(char *myipaddr,int32_t ispaired)
char *_LP_psock_create(int32_t *pullsockp,int32_t *pubsockp,char *ipaddr,uint16_t publicport,uint16_t subport,int32_t ispaired,int32_t cmdchannel,bits256 pubkey)
{
char pushaddr[128],subaddr[128]; uint16_t i,publicport,subport,maxiters=100; int32_t timeout,pullsock=-1,pubsock=-1; cJSON *retjson=0;
retjson = cJSON_CreateObject();
publicport = Psockport++;
subport = Psockport++;
for (i=0; i<maxiters; i++,publicport+=2,subport+=2)
int32_t i,pullsock,bindflag=(IAMLP != 0),pubsock,arg; struct LP_pubkey_info *pubp; char pushaddr[128],subaddr[128]; cJSON *retjson = 0;
pullsock = pubsock = -1;
*pullsockp = *pubsockp = -1;
nanomsg_transportname(bindflag,pushaddr,ipaddr,publicport);
nanomsg_transportname(bindflag,subaddr,ipaddr,subport);
if ( (pullsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PULL)) >= 0 && (cmdchannel != 0 ||(pubsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PAIR)) >= 0) )
{
if ( publicport < MIN_PSOCK_PORT )
publicport = MIN_PSOCK_PORT+1;
if ( subport <= publicport )
subport = publicport + 1;
pullsock = pubsock = -1;
nanomsg_transportname(1,pushaddr,myipaddr,publicport);
nanomsg_transportname(1,subaddr,myipaddr,subport);
if ( (pullsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PULL)) >= 0 && (pubsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PAIR)) >= 0 )
if ( nn_bind(pullsock,pushaddr) >= 0 && (cmdchannel != 0 || nn_bind(pubsock,subaddr) >= 0) )
{
if ( nn_bind(pullsock,pushaddr) >= 0 && nn_bind(pubsock,subaddr) >= 0 )
arg = 100;
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&arg,sizeof(arg));
if ( pubsock >= 0 )
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_SNDTIMEO,&arg,sizeof(arg));
arg = 1;
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&arg,sizeof(arg));
if ( pubsock >= 0 )
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVTIMEO,&arg,sizeof(arg));
arg = 2;
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_MAXTTL,&arg,sizeof(arg));
if ( pubsock >= 0 )
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_MAXTTL,&arg,sizeof(arg));
nanomsg_transportname(0,pushaddr,ipaddr,publicport);
nanomsg_transportname(0,subaddr,ipaddr,subport);
LP_psockadd(ispaired,pullsock,publicport,pubsock,subport,subaddr,pushaddr,cmdchannel);
if ( IAMLP != 0 && bits256_nonz(pubkey) != 0 )
{
timeout = 100;
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
if ( ispaired != 0 )
char str[65];
if ( (pubp= LP_pubkeyadd(pubkey)) != 0 )
{
//maxsize = 1024 * 1024;
//nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize));
}
//if ( ispaired != 0 )
{
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
if ( pubp->pairsock > 0 )
{
printf("warning %s already has pairsock.%d, mark for purge\n",bits256_str(str,pubkey),pubp->pairsock);
for (i=0; i<Numpsocks; i++)
if ( PSOCKS[i].publicsock == pubp->pairsock )
{
PSOCKS[i].lasttime = (uint32_t)time(NULL) - PSOCK_KEEPALIVE - 1;
break;
}
}
printf("pairsock for %s <- %d\n",bits256_str(str,pubkey),pullsock);
pubp->pairsock = pullsock;
}
timeout = 10;
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_MAXTTL,&timeout,sizeof(timeout));
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_MAXTTL,&timeout,sizeof(timeout));
nanomsg_transportname(0,pushaddr,myipaddr,publicport);
nanomsg_transportname(0,subaddr,myipaddr,subport);
LP_psockadd(ispaired,pullsock,publicport,pubsock,subport,subaddr,pushaddr);
jaddstr(retjson,"result","success");
jaddstr(retjson,"LPipaddr",myipaddr);
jaddstr(retjson,"connectaddr",subaddr);
jaddnum(retjson,"connectport",subport);
jaddnum(retjson,"ispaired",ispaired);
jaddstr(retjson,"publicaddr",pushaddr);
jaddnum(retjson,"publicport",publicport);
printf("i.%d publicaddr.(%s) for subaddr.(%s), pullsock.%d pubsock.%d\n",i,pushaddr,subaddr,pullsock,pubsock);
break;
} else printf("bind error on %s or %s\n",pushaddr,subaddr);
if ( pullsock >= 0 )
nn_close(pullsock);
if ( pubsock >= 0 )
nn_close(pubsock);
}
retjson = cJSON_CreateObject();
jaddstr(retjson,"result","success");
jaddstr(retjson,"LPipaddr",ipaddr);
jaddstr(retjson,"connectaddr",subaddr);
jaddnum(retjson,"connectport",subport);
jaddnum(retjson,"ispaired",ispaired);
jaddnum(retjson,"cmdchannel",cmdchannel);
jaddstr(retjson,"publicaddr",pushaddr);
jaddnum(retjson,"publicport",publicport);
//printf("cmd.%d publicaddr.(%s) for subaddr.(%s), pullsock.%d pubsock.%d\n",cmdchannel,pushaddr,subaddr,pullsock,pubsock);
*pullsockp = pullsock;
*pubsockp = pubsock;
return(jprint(retjson,1));
} else printf("bind error on %s or %s\n",pushaddr,subaddr);
if ( pullsock >= 0 )
nn_close(pullsock);
if ( pubsock >= 0 )
nn_close(pubsock);
}
return(0);
}
char *LP_psock(int32_t *pullsockp,char *ipaddr,int32_t ispaired,int32_t cmdchannel,bits256 pubkey)
{
char *retstr=0; uint16_t i,publicport,subport,maxport; int32_t pubsock=-1;
*pullsockp = -1;
//printf("LP_psock ipaddr.%s ispaird.%d cmdchannel.%d\n",ipaddr,ispaired,cmdchannel);
if ( cmdchannel == 0 )
{
maxport = MAX_PSOCK_PORT;
publicport = Psockport++;
subport = Psockport++;
}
else
{
if ( cmdchannel != 0 && bits256_nonz(pubkey) == 0 )
return(clonestr("{\"error\",\"cant do pairsock for null pubkey\"}"));
maxport = 65534;
publicport = subport = Pcmdport++;
}
for (i=0; i<maxport; i++)
{
if ( publicport < MIN_PSOCK_PORT )
publicport = MIN_PSOCK_PORT+1;
if ( cmdchannel == 0 && subport <= publicport )
subport = publicport + 1;
if ( (retstr= _LP_psock_create(pullsockp,&pubsock,ipaddr,publicport,subport,ispaired,cmdchannel,pubkey)) != 0 )
{
//printf("LP_psock returns.(%s)\n",retstr);
return(retstr);
}
if ( cmdchannel == 0 )
publicport+=2, subport+=2;
else publicport++, subport++;
}
if ( Psockport > MAX_PSOCK_PORT )
if ( Psockport >= MAX_PSOCK_PORT )
Psockport = MIN_PSOCK_PORT;
if ( i == maxiters )
jaddstr(retjson,"error","cant find psock ports");
return(jprint(retjson,1));
if ( Pcmdport >= 65534 )
Pcmdport = MAX_PSOCK_PORT;
return(clonestr("{\"error\",\"cant find psock ports\"}"));
}
/*
@ -681,23 +756,26 @@ char *LP_psock(char *myipaddr,int32_t ispaired)
*/
char *issue_LP_psock(char *destip,uint16_t destport,int32_t ispaired)
char *issue_LP_psock(char *destip,uint16_t destport,int32_t ispaired,int32_t cmdchannel)
{
char url[512],*retstr;
sprintf(url,"http://%s:%u/api/stats/psock?ispaired=%d",destip,destport-1,ispaired);
char str[65],url[512],*retstr;
sprintf(url,"http://%s:%u/api/stats/psock?ispaired=%d&cmdchannel=%d&pubkey=%s",destip,destport-1,ispaired,cmdchannel,bits256_str(str,G.LP_mypub25519));
//return(LP_issue_curl("psock",destip,destport,url));
retstr = issue_curlt(url,LP_HTTP_TIMEOUT*3);
printf("issue_LP_psock got (%s) from %s\n",retstr,destip);
//printf("issue_LP_psock got (%s) from %s\n",retstr,url); // this is needed?!
return(retstr);
}
uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired)
uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired,int32_t cmdchannel,char *ipaddr)
{
uint16_t publicport = 0; char *retstr,*addr; cJSON *retjson; struct LP_peerinfo *peer,*tmp;
connectaddr[0] = publicaddr[0] = 0;
HASH_ITER(hh,LP_peerinfos,peer,tmp)
{
if ( ipaddr != 0 && strcmp(ipaddr,peer->ipaddr) != 0 )
continue;
connectaddr[0] = publicaddr[0] = 0;
if ( peer->errors < LP_MAXPEER_ERRORS && (retstr= issue_LP_psock(peer->ipaddr,peer->port,ispaired)) != 0 )
if ( peer->errors < LP_MAXPEER_ERRORS && (retstr= issue_LP_psock(peer->ipaddr,peer->port,ispaired,cmdchannel)) != 0 )
{
if ( (retjson= cJSON_Parse(retstr)) != 0 )
{
@ -706,26 +784,22 @@ uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired)
safecopy(publicaddr,addr,128);
if ( (addr= jstr(retjson,"connectaddr")) != 0 )
safecopy(connectaddr,addr,128);
//if ( (addr= jstr(retjson,"connectaddr2")) != 0 )
// safecopy(connectaddr2,addr,128);
if ( publicaddr[0] != 0 && connectaddr[0] != 0 )
publicport = juint(retjson,"publicport");
free_json(retjson);
}
printf("got.(%s) connect.%s public.%s\n",retstr,connectaddr,publicaddr);
//printf("got.(%s) connect.%s public.%s publicport.%u\n",retstr,connectaddr,publicaddr,publicport);
free(retstr);
return(publicport);
} else printf("error psock from %s:%u\n",peer->ipaddr,peer->port);
if ( publicport != 0 )
break;
}
return(publicport);
return(0);
}
int32_t LP_initpublicaddr(void *ctx,uint16_t *mypullportp,char *publicaddr,char *myipaddr,uint16_t mypullport,int32_t ispaired)
{
int32_t nntype,pullsock,timeout; char bindaddr[128],connectaddr[128];
*mypullportp = mypullport;
//connectaddr2[0] = 0;
if ( ispaired == 0 )
{
if ( LP_canbind != 0 )
@ -736,7 +810,6 @@ int32_t LP_initpublicaddr(void *ctx,uint16_t *mypullportp,char *publicaddr,char
{
nanomsg_transportname(0,publicaddr,myipaddr,mypullport);
nanomsg_transportname(1,bindaddr,myipaddr,mypullport);
//nanomsg_transportname2(1,bindaddr2,myipaddr,mypullport);
}
else
{
@ -748,7 +821,7 @@ int32_t LP_initpublicaddr(void *ctx,uint16_t *mypullportp,char *publicaddr,char
}
while ( *mypullportp == 0 )
{
if ( (*mypullportp= LP_psock_get(connectaddr,publicaddr,ispaired)) != 0 )
if ( (*mypullportp= LP_psock_get(connectaddr,publicaddr,ispaired,0,0)) != 0 )
break;
sleep(10);
printf("try to get publicaddr again\n");
@ -767,8 +840,6 @@ int32_t LP_initpublicaddr(void *ctx,uint16_t *mypullportp,char *publicaddr,char
}
else
{
//if ( connectaddr2[0] != 0 && nn_connect(pullsock,connectaddr2) > 0 )
// printf("%s ",connectaddr2);
printf("nntype.%d NN_PAIR.%d connect to %s connectsock.%d\n",nntype,NN_PAIR,connectaddr,pullsock);
}
}
@ -779,14 +850,10 @@ int32_t LP_initpublicaddr(void *ctx,uint16_t *mypullportp,char *publicaddr,char
printf("bind to %s error for %s: %s\n",bindaddr,publicaddr,nn_strerror(nn_errno()));
exit(-1);
}
//if ( nn_bind(pullsock,bindaddr2) >= 0 )
// printf("bound to %s\n",bindaddr2);
}
timeout = 100;
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
//maxsize = 2 * 1024 * 1024;
//nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize));
if ( nntype == NN_SUB )
nn_setsockopt(pullsock,NN_SUB,NN_SUB_SUBSCRIBE,"",0);
}

102
iguana/exchanges/LP_ordermatch.c

@ -478,13 +478,14 @@ int32_t LP_connectstartbob(void *ctx,int32_t pubsock,char *base,char *rel,double
if ( (kmdcoin= LP_coinfind("KMD")) != 0 )
jadd(reqjson,"proof",LP_instantdex_txids(0,kmdcoin->smartaddr));
char str[65]; printf("BOB pubsock.%d binds to %d (%s)\n",pubsock,pair,bits256_str(str,qp->desthash));
bits256 zero;
memset(zero.bytes,0,sizeof(zero));
LP_reserved_msg(1,base,rel,zero,jprint(reqjson,0));
sleep(1);
LP_reserved_msg(1,qp->srccoin,qp->destcoin,qp->desthash,jprint(reqjson,0));
sleep(1);
LP_reserved_msg(0,base,rel,zero,jprint(reqjson,0));
if ( 0 )
{
bits256 zero;
memset(zero.bytes,0,sizeof(zero));
LP_reserved_msg(1,base,rel,zero,jprint(reqjson,0));
LP_reserved_msg(0,base,rel,zero,jprint(reqjson,0));
}
free_json(reqjson);
LP_importaddress(qp->destcoin,qp->destaddr);
LP_otheraddress(qp->srccoin,otheraddr,qp->destcoin,qp->destaddr);
@ -512,6 +513,7 @@ char *LP_trade(void *ctx,char *myipaddr,int32_t mypubsock,struct LP_quoteinfo *q
qp->aliceid = LP_aliceid_calc(qp->desttxid,qp->destvout,qp->feetxid,qp->feevout);
if ( (qp->tradeid= tradeid) == 0 )
qp->tradeid = LP_rand();
qp->srchash = destpubkey;
LP_query(ctx,myipaddr,mypubsock,"request",qp);
LP_Alicequery = *qp, LP_Alicemaxprice = maxprice, Alice_expiration = qp->timestamp + timeout, LP_Alicedestpubkey = destpubkey;
char str[65]; printf("LP_trade %s/%s %.8f vol %.8f dest.(%s) maxprice %.8f\n",qp->srccoin,qp->destcoin,dstr(qp->satoshis),dstr(qp->destsatoshis),bits256_str(str,LP_Alicedestpubkey),maxprice);
@ -835,8 +837,6 @@ struct LP_quoteinfo *LP_trades_gotrequest(void *ctx,struct LP_quoteinfo *qp,stru
qp = newqp;
if ( (coin= LP_coinfind(qp->srccoin)) == 0 )
return(0);
//if ( strcmp(qp->srccoin,"GRS") == 0 || strcmp(qp->destcoin,"GRS") == 0 )
// printf("LP_trades_gotrequest %s/%s myprice %.8f\n",qp->srccoin,qp->destcoin,LP_trades_bobprice(&bid,&ask,qp));
if ( (myprice= LP_trades_bobprice(&bid,&ask,qp)) == 0. )
return(0);
autxo = &A;
@ -855,8 +855,6 @@ struct LP_quoteinfo *LP_trades_gotrequest(void *ctx,struct LP_quoteinfo *qp,stru
memset(&qp->txid2,0,sizeof(qp->txid2));
qp->vout = qp->vout2 = -1;
} else return(0);
//if ( strcmp(qp->srccoin,"GRS") == 0 || strcmp(qp->destcoin,"GRS") == 0 )
// printf("LP_trades_gotrequest qprice %.8f vs myprice %.8f\n",qprice,myprice);
if ( qprice > myprice )
{
r = (LP_rand() % 100);
@ -873,7 +871,9 @@ struct LP_quoteinfo *LP_trades_gotrequest(void *ctx,struct LP_quoteinfo *qp,stru
printf("request from blacklisted %s, ignore\n",bits256_str(str,qp->desthash));
return(0);
}
printf("LP_address_utxo_reset\n");
LP_address_utxo_reset(coin);
printf("done LP_address_utxo_reset\n");
if ( (butxo= LP_address_myutxopair(butxo,1,utxos,max,LP_coinfind(qp->srccoin),qp->coinaddr,qp->txfee,dstr(qp->destsatoshis),price,qp->desttxfee)) != 0 )
{
strcpy(qp->gui,G.gui);
@ -893,8 +893,6 @@ struct LP_quoteinfo *LP_trades_gotrequest(void *ctx,struct LP_quoteinfo *qp,stru
}
if ( (qprice= LP_trades_pricevalidate(qp,coin,myprice)) < 0. )
return(0);
//if ( strcmp(qp->srccoin,"GRS") == 0 || strcmp(qp->destcoin,"GRS") == 0 )
// printf("final checks\n");
if ( LP_allocated(qp->txid,qp->vout) == 0 && LP_allocated(qp->txid2,qp->vout2) == 0 )
{
reqjson = LP_quotejson(qp);
@ -905,15 +903,7 @@ struct LP_quoteinfo *LP_trades_gotrequest(void *ctx,struct LP_quoteinfo *qp,stru
jaddnum(reqjson,"quotetime",qp->quotetime);
jaddnum(reqjson,"pending",qp->timestamp + LP_RESERVETIME);
jaddstr(reqjson,"method","reserved");
bits256 zero;
memset(zero.bytes,0,sizeof(zero));
LP_reserved_msg(1,qp->srccoin,qp->destcoin,zero,jprint(reqjson,0));
if ( 0 )//if ( IAMLP == 0 )
{
sleep(1);
LP_reserved_msg(1,qp->srccoin,qp->destcoin,qp->desthash,jprint(reqjson,0));
}
//LP_reserved_msg(0,qp->srccoin,qp->destcoin,zero,jprint(reqjson,0));
LP_reserved_msg(1,qp->srccoin,qp->destcoin,qp->desthash,jprint(reqjson,0));
free_json(reqjson);
return(qp);
} else printf("request processing selected ineligible utxos?\n");
@ -1023,7 +1013,7 @@ void LP_tradesloop(void *ctx)
now = (uint32_t)time(NULL);
Q = qtp->Q;
funcid = qtp->funcid;
//printf("dequeue %p funcid.%d aliceid.%llu iambob.%d\n",qtp,funcid,(long long)qtp->aliceid,qtp->iambob);
printf("dequeue %p funcid.%d aliceid.%llu iambob.%d\n",qtp,funcid,(long long)qtp->aliceid,qtp->iambob);
portable_mutex_lock(&LP_tradesmutex);
DL_DELETE(LP_tradesQ,qtp);
HASH_FIND(hh,LP_trades,&qtp->aliceid,sizeof(qtp->aliceid),tp);
@ -1047,7 +1037,13 @@ void LP_tradesloop(void *ctx)
}
nonz++;
tp->firstprocessed = tp->lastprocessed = (uint32_t)time(NULL);
//printf("iambob.%d funcid.%d vs %d\n",tp->iambob,funcid,LP_REQUEST);
if ( funcid == LP_CONNECT && tp->negotiationdone == 0 ) // bob all done
{
flag = 1;
tp->negotiationdone = now;
printf("bob sets negotiationdone.%u\n",now);
LP_trades_gotconnect(ctx,&tp->Q,&tp->Qs[LP_CONNECT],tp->pairstr);
}
}
continue;
}
@ -1058,7 +1054,10 @@ void LP_tradesloop(void *ctx)
//printf("finished dequeue %p funcid.%d aliceid.%llu iambob.%d\n",qtp,funcid,(long long)qtp->aliceid,qtp->iambob);
free(qtp);
if ( tp->negotiationdone != 0 )
{
printf("iambob.%d negotiationdone.%u\n",tp->iambob,tp->negotiationdone);
continue;
}
flag = 0;
if ( qtp->iambob == tp->iambob )
{
@ -1082,6 +1081,7 @@ void LP_tradesloop(void *ctx)
{
flag = 1;
tp->negotiationdone = now;
printf("bob sets negotiationdone.%u\n",now);
LP_trades_gotconnect(ctx,&tp->Q,&tp->Qs[LP_CONNECT],tp->pairstr);
}
}
@ -1090,7 +1090,7 @@ void LP_tradesloop(void *ctx)
tp->lastprocessed = (uint32_t)time(NULL);
nonz++;
}
}
} else printf("qtp->iambob.%d vs tp->iambob.%d\n",qtp->iambob,tp->iambob);
}
HASH_ITER(hh,LP_trades,tp,tmp)
{
@ -1110,6 +1110,7 @@ void LP_tradesloop(void *ctx)
{
if ( tp->connectsent == 0 )
{
tp->negotiationdone = (uint32_t)time(NULL);
LP_Alicemaxprice = tp->bestprice;
LP_reserved(ctx,LP_myipaddr,LP_mypubsock,&tp->Qs[LP_CONNECT]); // send LP_CONNECT
tp->connectsent = now;
@ -1117,22 +1118,31 @@ void LP_tradesloop(void *ctx)
}
else if ( now < tp->firstprocessed+timeout && ((tp->firstprocessed - now) % 10) == 9 )
{
LP_Alicemaxprice = tp->bestprice;
LP_reserved(ctx,LP_myipaddr,LP_mypubsock,&tp->Qs[LP_CONNECT]); // send LP_CONNECT
printf("repeat LP_connect aliceid.%llu %.8f\n",(long long)tp->aliceid,tp->bestprice);
//LP_Alicemaxprice = tp->bestprice;
//LP_reserved(ctx,LP_myipaddr,LP_mypubsock,&tp->Qs[LP_CONNECT]); // send LP_CONNECT
printf("mark slow LP_connect aliceid.%llu %.8f\n",(long long)tp->aliceid,tp->bestprice);
if ( (pubp= LP_pubkeyfind(tp->Qs[LP_CONNECT].srchash)) != 0 )
pubp->slowresponse++;
}
}
}
if ( now > tp->firstprocessed+timeout*10 )
{
//printf("purge swap aliceid.%llu\n",(long long)tp->aliceid);
portable_mutex_lock(&LP_tradesmutex);
HASH_DELETE(hh,LP_trades,tp);
portable_mutex_unlock(&LP_tradesmutex);
free(tp);
}
}
}
now = (uint32_t)time(NULL);
HASH_ITER(hh,LP_trades,tp,tmp)
{
timeout = LP_AUTOTRADE_TIMEOUT;
if ( (coin= LP_coinfind(tp->Q.srccoin)) != 0 && coin->electrum != 0 )
timeout += LP_AUTOTRADE_TIMEOUT * .5;
if ( (coin= LP_coinfind(tp->Q.destcoin)) != 0 && coin->electrum != 0 )
timeout += LP_AUTOTRADE_TIMEOUT * .5;
if ( now > tp->firstprocessed+timeout*10 )
{
printf("purge swap aliceid.%llu\n",(long long)tp->aliceid);
portable_mutex_lock(&LP_tradesmutex);
HASH_DELETE(hh,LP_trades,tp);
portable_mutex_unlock(&LP_tradesmutex);
free(tp);
}
}
if ( nonz == 0 )
@ -1174,10 +1184,10 @@ int32_t LP_tradecommand(void *ctx,char *myipaddr,int32_t pubsock,cJSON *argjson,
LP_tradecommand_log(argjson);
rq = ((uint64_t)Q.R.requestid << 32) | Q.R.quoteid;
printf("%-4d (%-10u %10u) %12s id.%-20llu %5s/%-5s %12.8f -> %12.8f (%11.8f) | RT.%d %d n%d\n",(uint32_t)time(NULL) % 3600,Q.R.requestid,Q.R.quoteid,method,(long long)Q.aliceid,Q.srccoin,Q.destcoin,dstr(Q.satoshis),dstr(Q.destsatoshis),(double)Q.destsatoshis/Q.satoshis,LP_RTcount,LP_swapscount,G.netid);
if ( Q.timestamp > 0 && time(NULL) > Q.timestamp + LP_AUTOTRADE_TIMEOUT*20 ) // eat expired packets
if ( Q.timestamp > 0 && time(NULL) > Q.timestamp + LP_AUTOTRADE_TIMEOUT*20 ) // eat expired packets, some old timestamps floating about?
{
//printf("aliceid.%llu is expired by %d\n",(long long)Q.aliceid,(uint32_t)time(NULL) - (Q.timestamp + LP_AUTOTRADE_TIMEOUT*20));
//return(1);
printf("aliceid.%llu is expired by %d\n",(long long)Q.aliceid,(uint32_t)time(NULL) - (Q.timestamp + LP_AUTOTRADE_TIMEOUT*20));
return(1);
}
//LP_autoprices_update(method,Q.srccoin,dstr(Q.satoshis),Q.destcoin,dstr(Q.destsatoshis));
retval = 1;
@ -1187,7 +1197,7 @@ int32_t LP_tradecommand(void *ctx,char *myipaddr,int32_t pubsock,cJSON *argjson,
{
bestprice = LP_bob_competition(&counter,aliceid,qprice,1);
//printf("%s lag %ld: aliceid.%llu price %.8f -> bestprice %.8f Alice max %.8f\n",jprint(argjson,0),Q.quotetime - (time(NULL)-20),(long long)aliceid,qprice,bestprice,LP_Alicemaxprice);
if ( 0 )
if ( 1 )
{
if ( LP_Alicemaxprice == 0. )
return(retval);
@ -1200,7 +1210,7 @@ int32_t LP_tradecommand(void *ctx,char *myipaddr,int32_t pubsock,cJSON *argjson,
} else printf("got reserved response from destpubkey %s\n",bits256_str(str,Q.srchash));
}
}
if ( bits256_cmp(G.LP_mypub25519,Q.desthash) == 0 && bits256_cmp(G.LP_mypub25519,Q.srchash) != 0 )
if ( bits256_cmp(G.LP_mypub25519,Q.desthash) == 0 && bits256_cmp(G.LP_mypub25519,Q.srchash) != 0 ) // alice
{
if ( Qtrades == 0 )
{
@ -1216,7 +1226,7 @@ int32_t LP_tradecommand(void *ctx,char *myipaddr,int32_t pubsock,cJSON *argjson,
else if ( strcmp(method,"connected") == 0 )
{
bestprice = LP_bob_competition(&counter,aliceid,qprice,1000);
if ( bits256_cmp(G.LP_mypub25519,Q.desthash) == 0 && bits256_cmp(G.LP_mypub25519,Q.srchash) != 0 )
if ( bits256_cmp(G.LP_mypub25519,Q.desthash) == 0 && bits256_cmp(G.LP_mypub25519,Q.srchash) != 0 ) // alice
{
static uint64_t rqs[1024];
for (i=0; i<sizeof(rqs)/sizeof(*rqs); i++)
@ -1252,19 +1262,17 @@ int32_t LP_tradecommand(void *ctx,char *myipaddr,int32_t pubsock,cJSON *argjson,
printf("{\"error\":\"GAME can only be alice coin\"}\n");
return(retval);
}
if ( strcmp(method,"request") == 0 )
if ( strcmp(method,"request") == 0 ) // bob
{
bestprice = LP_bob_competition(&counter,aliceid,qprice,-1);
//if ( strcmp(Q.srccoin,"GRS") == 0 || strcmp(Q.destcoin,"GRS") == 0 )
// printf("%s lag %ld: aliceid.%llu price %.8f -> bestprice %.8f\n",jprint(argjson,0),Q.quotetime - (time(NULL)-20),(long long)aliceid,qprice,bestprice);
if ( Qtrades == 0 )
if ( Qtrades == 0 || (bits256_cmp(Q.srchash,G.LP_mypub25519) == 0 && bits256_cmp(G.LP_mypub25519,Q.desthash) != 0) )
LP_trades_gotrequest(ctx,&Q,&Q2,jstr(argjson,"pair"));
else LP_tradecommandQ(&Q,jstr(argjson,"pair"),LP_REQUEST);
}
else if ( strcmp(method,"connect") == 0 )
{
LP_bob_competition(&counter,aliceid,qprice,1000);
if ( bits256_cmp(G.LP_mypub25519,Q.srchash) == 0 && bits256_cmp(G.LP_mypub25519,Q.desthash) != 0 )
if ( bits256_cmp(G.LP_mypub25519,Q.srchash) == 0 && bits256_cmp(G.LP_mypub25519,Q.desthash) != 0 ) // bob
{
static uint64_t rqs[1024];
for (i=0; i<sizeof(rqs)/sizeof(*rqs); i++)
@ -1279,7 +1287,7 @@ int32_t LP_tradecommand(void *ctx,char *myipaddr,int32_t pubsock,cJSON *argjson,
printf("CONNECT.(%s)\n",jprint(argjson,0));
if ( (proof= jarray(&num,argjson,"proof")) != 0 && num > 0 )
Q.othercredits = LP_instantdex_proofcheck(Q.destcoin,Q.destaddr,proof,num);
if ( 1 || Qtrades == 0 )
if ( Qtrades == 0 )
LP_trades_gotconnect(ctx,&Q,&Q2,jstr(argjson,"pair"));
else LP_tradecommandQ(&Q,jstr(argjson,"pair"),LP_CONNECT);
}

90
iguana/exchanges/LP_peers.c

@ -56,6 +56,58 @@ char *LP_peers()
return(jprint(peersjson,1));
}
void LP_cmdchannel(struct LP_peerinfo *peer)
{
char *hellostr = "{\"method\":\"hello\"}";
char connectaddr[128],publicaddr[128],*retstr; int32_t pairsock=-1,pubsock,sentbytes=-2; uint16_t cmdport;
if ( bits256_nonz(G.LP_mypub25519) == 0 || strcmp(G.USERPASS,"1d8b27b21efabcd96571cd56f91a40fb9aa4cc623d273c63bf9223dc6f8cd81f") == 0 )
return;
if ( (cmdport= LP_psock_get(connectaddr,publicaddr,1,1,peer->ipaddr)) != 0 )
{
if ( (retstr= _LP_psock_create(&pairsock,&pubsock,peer->ipaddr,cmdport,cmdport,1,1,G.LP_mypub25519)) != 0 )
{
if ( nn_connect(pairsock,connectaddr) < 0 )
printf("error connecting cmdchannel with %s\n",connectaddr);
else
{
peer->pairsock = pairsock;
sentbytes = nn_send(peer->pairsock,hellostr,(int32_t)strlen(hellostr)+1,0);
printf("cmdchannel %d created %s sent.%d\n",peer->pairsock,retstr,sentbytes);
}
free(retstr);
}
} else printf("error getting cmdchannel with %s\n",peer->ipaddr);
}
void LP_cmdchannels()
{
struct LP_peerinfo *peer,*tmp;
if ( IAMLP == 0 )
{
HASH_ITER(hh,LP_peerinfos,peer,tmp)
{
if ( peer->pairsock <= 0 )
LP_cmdchannel(peer);
}
}
}
void LP_peer_pairsock(bits256 pubkey)
{
struct LP_peerinfo *peer,*tmp;
if ( IAMLP == 0 )
{
HASH_ITER(hh,LP_peerinfos,peer,tmp)
{
if ( bits256_cmp(pubkey,peer->pubkey) == 0 )
{
peer->pairsock = -1;
break;
}
}
}
}
struct LP_peerinfo *LP_addpeer(struct LP_peerinfo *mypeer,int32_t mypubsock,char *ipaddr,uint16_t port,uint16_t pushport,uint16_t subport,int32_t isLP,uint32_t sessionid,uint16_t netid)
{
uint32_t ipbits; int32_t valid,pushsock,subsock,timeout; char checkip[64],pushaddr[64],subaddr[64]; struct LP_peerinfo *peer = 0;
@ -79,6 +131,8 @@ struct LP_peerinfo *LP_addpeer(struct LP_peerinfo *mypeer,int32_t mypubsock,char
if ( (peer->isLP= isLP) != 0 )
LP_numactive_LP++;
}
if ( IAMLP == 0 && peer->pairsock <= 0 )
LP_cmdchannel(peer);
/*if ( numpeers > peer->numpeers )
peer->numpeers = numpeers;
if ( numutxos > peer->numutxos )
@ -86,7 +140,7 @@ struct LP_peerinfo *LP_addpeer(struct LP_peerinfo *mypeer,int32_t mypubsock,char
if ( peer->sessionid == 0 )
peer->sessionid = sessionid;*/
}
else if ( IAMLP != 0 || LP_numactive_LP < 3 )
else if ( IAMLP != 0 || LP_numactive_LP < 10 )
{
//printf("addpeer (%s:%u) pushport.%u subport.%u\n",ipaddr,port,pushport,subport);
peer = calloc(1,sizeof(*peer));
@ -117,8 +171,6 @@ struct LP_peerinfo *LP_addpeer(struct LP_peerinfo *mypeer,int32_t mypubsock,char
nn_setsockopt(pushsock,NN_SOL_SOCKET,NN_MAXTTL,&timeout,sizeof(timeout));
timeout = 100;
nn_setsockopt(pushsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
//maxsize = 2 * 1024 * 1024;
//nn_setsockopt(pushsock,NN_SOL_SOCKET,NN_SNDBUF,&maxsize,sizeof(maxsize));
printf("connected to push.(%s) pushsock.%d valid.%d | ",pushaddr,pushsock,valid);
peer->connected = (uint32_t)time(NULL);
peer->pushsock = pushsock;
@ -128,12 +180,9 @@ struct LP_peerinfo *LP_addpeer(struct LP_peerinfo *mypeer,int32_t mypubsock,char
nn_setsockopt(subsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(subsock,NN_SUB,NN_SUB_SUBSCRIBE,"",0);
nanomsg_transportname(0,subaddr,peer->ipaddr,subport);
//nanomsg_transportname2(0,subaddr2,peer->ipaddr,subport);
valid = 0;
if ( nn_connect(subsock,subaddr) >= 0 )
valid++;
//if ( nn_connect(subsock,subaddr2) >= 0 )
// valid++;
if ( valid > 0 )
{
peer->subsock = subsock;
@ -165,24 +214,8 @@ struct LP_peerinfo *LP_addpeer(struct LP_peerinfo *mypeer,int32_t mypubsock,char
printf("_LPaddpeer %s -> numpeers.%d mypubsock.%d other.(%d)\n",ipaddr,mypeer->numpeers,mypubsock,isLP);
} else peer->numpeers = 1; // will become mypeer
portable_mutex_unlock(&LP_peermutex);
/*if ( IAMLP != 0 && mypubsock >= 0 )
{
//struct iguana_info *coin,*ctmp; char busaddr[64]; //
//memset(zero.bytes,0,sizeof(zero));
//LP_send(mypubsock,msg,(int32_t)strlen(msg)+1,1);
//LP_reserved_msg(0,"","",zero,jprint(LP_peerjson(peer),1));
if ( 0 )
{
HASH_ITER(hh,LP_coins,coin,ctmp)
{
if ( coin->bussock >= 0 )
{
nanomsg_transportname(0,busaddr,peer->ipaddr,coin->busport);
nn_connect(coin->bussock,busaddr);
}
}
}
}*/
if ( IAMLP == 0 && peer->pairsock <= 0 )
LP_cmdchannel(peer);
} else printf("%s invalid pushsock.%d or subsock.%d\n",peer->ipaddr,peer->pushsock,peer->subsock);
}
} else printf("LP_addpeer: checkip.(%s) vs (%s)\n",checkip,ipaddr);
@ -235,13 +268,18 @@ void LP_closepeers()
return(bussock);
}*/
void LP_peer_recv(char *ipaddr,int32_t ismine)
void LP_peer_recv(char *ipaddr,int32_t ismine,struct LP_pubkey_info *pubp)
{
struct LP_peerinfo *peer;
if ( (peer= LP_peerfind((uint32_t)calc_ipbits(ipaddr),RPC_port)) != 0 )
{
peer->numrecv++;
//if ( ismine != 0 )
if ( ismine != 0 && bits256_cmp(G.LP_mypub25519,pubp->pubkey) != 0 && (bits256_nonz(peer->pubkey) == 0 || pubp->pairsock <= 0) )
{
peer->pubkey = pubp->pubkey;
pubp->pairsock = peer->pairsock;
char str[65]; printf("set pubkey for %s <- %s, pairsock.%d\n",ipaddr,bits256_str(str,pubp->pubkey),pubp->pairsock);
}
peer->recvtime = (uint32_t)time(NULL);
}
}

1
iguana/exchanges/LP_privkey.c

@ -417,6 +417,7 @@ int32_t LP_passphrase_init(char *passphrase,char *gui,uint16_t netid,char *seedn
LP_priceinfos_clear();
G.USERPASS_COUNTER = counter;
G.initializing = 0;
LP_cmdchannels();
return(0);
}

11
iguana/exchanges/LP_signatures.c

@ -547,7 +547,7 @@ void LP_notify_pubkeys(void *ctx,int32_t pubsock)
} else printf("no LPipaddr\n");
}
jaddnum(reqjson,"session",G.LP_sessionid);
LP_reserved_msg(0,"","",zero,jprint(reqjson,1));
LP_reserved_msg(1,"","",zero,jprint(reqjson,1));
}
char *LP_notify_recv(cJSON *argjson)
@ -561,7 +561,7 @@ char *LP_notify_recv(cJSON *argjson)
if ( (ipaddr= jstr(argjson,"isLP")) != 0 )
{
//printf("notify got isLP %s %d\n",ipaddr,jint(argjson,"ismine"));
LP_peer_recv(ipaddr,jint(argjson,"ismine"));
LP_peer_recv(ipaddr,jint(argjson,"ismine"),pubp);
if ( IAMLP != 0 && G.LP_IAMLP == 0 && strcmp(ipaddr,LP_myipaddr) == 0 )
{
if ( bits256_cmp(pub,G.LP_mypub25519) != 0 )
@ -675,12 +675,11 @@ void LP_query(void *ctx,char *myipaddr,int32_t mypubsock,char *method,struct LP_
//if ( bits256_nonz(qp->srchash) == 0 || strcmp(method,"request") != 0 )
{
memset(&zero,0,sizeof(zero));
LP_reserved_msg(1,qp->srccoin,qp->destcoin,zero,clonestr(msg));
if ( strcmp(method,"connect") == 0 )
if ( bits256_nonz(qp->srchash) != 0 )
LP_reserved_msg(1,qp->srccoin,qp->destcoin,qp->srchash,clonestr(msg));
else
{
sleep(1);
LP_reserved_msg(1,qp->srccoin,qp->destcoin,zero,clonestr(msg));
sleep(1);
LP_reserved_msg(0,qp->srccoin,qp->destcoin,zero,clonestr(msg));
}
free(msg);

6
iguana/exchanges/LP_utxo.c

@ -447,7 +447,6 @@ struct LP_address *LP_address_utxo_reset(struct iguana_info *coin)
{
struct LP_address *ap; struct LP_address_utxo *up,*tmp; int32_t i,n,m,vout,height; cJSON *array,*item,*txobj; bits256 zero; int64_t value; bits256 txid; uint32_t now;
LP_address(coin,coin->smartaddr);
//printf("call listunspent issue %s (%s)\n",coin->symbol,coin->smartaddr);
memset(zero.bytes,0,sizeof(zero));
LP_listunspent_issue(coin->symbol,coin->smartaddr,2,zero,zero);
if ( (ap= LP_addressfind(coin,coin->smartaddr)) == 0 )
@ -455,8 +454,12 @@ struct LP_address *LP_address_utxo_reset(struct iguana_info *coin)
printf("LP_address_utxo_reset: cant find address data\n");
return(0);
}
if ( time(NULL) < coin->lastresetutxo+30 )
return(ap);
coin->lastresetutxo = (uint32_t)time(NULL);
if ( (array= LP_listunspent(coin->symbol,coin->smartaddr,zero,zero)) != 0 )
{
//printf("clear ap->utxos\n");
DL_FOREACH_SAFE(ap->utxos,up,tmp)
{
portable_mutex_lock(&coin->addrmutex);
@ -467,6 +470,7 @@ struct LP_address *LP_address_utxo_reset(struct iguana_info *coin)
DL_APPEND(LP_garbage_collector2,up);
portable_mutex_unlock(&LP_gcmutex);
}
//printf("done clearing ap->utxos\n");
now = (uint32_t)time(NULL);
if ( (n= cJSON_GetArraySize(array)) > 0 )
{

1
iguana/exchanges/stats.c

@ -568,6 +568,7 @@ char *stats_rpcparse(char *retbuf,int32_t bufsize,int32_t *jsonflagp,int32_t *po
free_json(json);
if ( tmpjson != 0 )
free(tmpjson);
//printf("stats_JSON rpc return.(%s)\n",retstr);
return(retstr);
}
free_json(argjson);

Loading…
Cancel
Save