Browse Source

Test

etomic
jl777 8 years ago
parent
commit
b2f5e3e426
  1. 8
      iguana/exchanges/LP_commands.c
  2. 11
      iguana/exchanges/LP_forwarding.c
  3. 2
      iguana/exchanges/LP_include.h
  4. 226
      iguana/exchanges/LP_nativeDEX.c
  5. 315
      iguana/exchanges/LP_network.c
  6. 42
      iguana/exchanges/LP_ordermatch.c
  7. 37
      iguana/exchanges/LP_rpc.c
  8. 11
      iguana/exchanges/LP_transaction.c

8
iguana/exchanges/LP_commands.c

@ -211,6 +211,12 @@ forwardhex(pubkey,hex)\n\
} else retstr = clonestr("{\"error\":\"cant recurse forwards\"}");
}
else if ( strcmp(method,"keepalive") == 0 )
{
printf("got keepalive lag.%d\n",(int32_t)time(NULL) - LP_deadman_switch);
LP_deadman_switch = (uint32_t)time(NULL);
return(0);
}
else if ( strcmp(method,"getpeers") == 0 )
return(LP_peers());
else if ( strcmp(method,"getutxos") == 0 )
@ -232,6 +238,8 @@ forwardhex(pubkey,hex)\n\
return(LP_lookup(jbits256(argjson,"client")));
else if ( strcmp(method,"forwardhex") == 0 )
retstr = LP_forwardhex(pubsock,jbits256(argjson,"pubkey"),jstr(argjson,"hex"));
else if ( strcmp(method,"psock") == 0 )
return(LP_psock(myipaddr,jint(argjson,"ispaired")));
else if ( strcmp(method,"notify") == 0 )
retstr = clonestr("{\"result\":\"success\",\"notify\":\"received\"}");
}

11
iguana/exchanges/LP_forwarding.c

@ -116,10 +116,13 @@ char *LP_register(bits256 pubkey,char *ipaddr,uint16_t port)
ptr->lasttime = (uint32_t)time(NULL);
if ( ptr->pushsock >= 0 )
{
nn_close(ptr->pushsock);
printf("recreate pushsock for %s\n",pushaddr);
if ( (ptr->pushsock= LP_pushsock_create(ptr,pushaddr)) < 0 )
return(clonestr("{\"error\":\"couldnt recreate pushsock\",\"registered\":0}"));
if ( strcmp(pushaddr,ptr->pushaddr) != 0 )
{
nn_close(ptr->pushsock);
printf("recreate pushsock for %s\n",pushaddr);
if ( (ptr->pushsock= LP_pushsock_create(ptr,pushaddr)) < 0 )
return(clonestr("{\"error\":\"couldnt recreate pushsock\",\"registered\":0}"));
} else printf("no need to create identical endpoint\n");
}
return(clonestr("{\"error\":\"already registered\",\"registered\":1}"));
}

2
iguana/exchanges/LP_include.h

@ -235,6 +235,8 @@ struct LP_peerinfo *LP_peerfind(uint32_t ipbits,uint16_t port);
char *LP_command_process(char *myipaddr,int32_t pubsock,cJSON *argjson,uint8_t *data,int32_t datalen,double profitmargin);
void LP_availableset(struct LP_utxoinfo *utxo);
int32_t LP_iseligible(uint64_t *valp,uint64_t *val2p,int32_t iambob,char *symbol,bits256 txid,int32_t vout,uint64_t satoshis,bits256 txid2,int32_t vout2,bits256 pubkey);
int32_t LP_pullsock_check(char **retstrp,char *myipaddr,int32_t pubsock,int32_t pullsock,double profitmargin);
uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired);
#endif

226
iguana/exchanges/LP_nativeDEX.c

@ -20,12 +20,13 @@
#include <stdio.h>
#include "LP_include.h"
portable_mutex_t LP_peermutex,LP_UTXOmutex,LP_utxomutex,LP_commandmutex,LP_cachemutex,LP_swaplistmutex,LP_forwardmutex,LP_pubkeymutex,LP_networkmutex;
portable_mutex_t LP_peermutex,LP_UTXOmutex,LP_utxomutex,LP_commandmutex,LP_cachemutex,LP_swaplistmutex,LP_forwardmutex,LP_pubkeymutex,LP_networkmutex,LP_psockmutex;
int32_t LP_canbind;
#include "LP_network.c"
struct LP_utxoinfo *LP_utxoinfos[2],*LP_utxoinfos2[2];
struct LP_peerinfo *LP_peerinfos,*LP_mypeer;
struct LP_forwardinfo *LP_forwardinfos;
char *activecoins[] = { "BTC", "KMD" };
char GLOBAL_DBDIR[] = { "DB" };
@ -33,6 +34,7 @@ char USERPASS[65],USERPASS_WIFSTR[64],USERHOME[512] = { "/root" };
char *default_LPnodes[] = { "5.9.253.195", "5.9.253.196", "5.9.253.197", "5.9.253.198", "5.9.253.199", "5.9.253.200", "5.9.253.201", "5.9.253.202", "5.9.253.203", "5.9.253.204" }; //
uint32_t LP_deadman_switch;
int32_t LP_mypubsock = -1;
int32_t USERPASS_COUNTER,IAMLP = 0;
double LP_profitratio = 1.;
@ -91,7 +93,7 @@ char *LP_command_process(char *myipaddr,int32_t pubsock,cJSON *argjson,uint8_t *
return(retstr);
}
void LP_process_message(char *typestr,char *myipaddr,int32_t pubsock,double profitmargin,void *ptr,int32_t recvlen,int32_t recvsock)
char *LP_process_message(char *typestr,char *myipaddr,int32_t pubsock,double profitmargin,void *ptr,int32_t recvlen,int32_t recvsock)
{
int32_t len,datalen=0; char *retstr=0,*jsonstr=0; cJSON *argjson,*reqjson;
if ( (datalen= is_hexstr((char *)ptr,0)) > 0 )
@ -111,7 +113,8 @@ void LP_process_message(char *typestr,char *myipaddr,int32_t pubsock,double prof
{
//printf("got forwardhex\n");
if ( (retstr= LP_forwardhex(pubsock,jbits256(argjson,"pubkey"),jstr(argjson,"hex"))) != 0 )
free(retstr), retstr = 0;
{
}
}
else if ( jstr(argjson,"method") != 0 && strcmp(jstr(argjson,"method"),"publish") == 0 )
{
@ -135,7 +138,6 @@ void LP_process_message(char *typestr,char *myipaddr,int32_t pubsock,double prof
printf("%d got REQ.(%s) -> (%s)\n",recvsock,jprint(argjson,0),retstr);
LP_send(recvsock,retstr,0);
}
free(retstr);
}
else if ( strcmp("PULL",typestr) == 0 )
{
@ -149,26 +151,29 @@ void LP_process_message(char *typestr,char *myipaddr,int32_t pubsock,double prof
free(jsonstr);
if ( ptr != 0 )
nn_freemsg(ptr), ptr = 0;
return(retstr);
}
int32_t LP_pullsock_check(char *myipaddr,int32_t pubsock,int32_t pullsock,double profitmargin)
int32_t LP_pullsock_check(char **retstrp,char *myipaddr,int32_t pubsock,int32_t pullsock,double profitmargin)
{
void *ptr; int32_t recvlen,nonz = 0;
*retstrp = 0;
while ( pullsock >= 0 && (recvlen= nn_recv(pullsock,&ptr,NN_MSG,0)) >= 0 )
{
nonz++;
LP_process_message("PULL",myipaddr,pubsock,profitmargin,ptr,recvlen,pullsock);
*retstrp = LP_process_message("PULL",myipaddr,pubsock,profitmargin,ptr,recvlen,pullsock);
}
return(nonz);
}
int32_t LP_subsock_check(char *myipaddr,int32_t pubsock,int32_t sock,double profitmargin)
{
int32_t recvlen,nonz = 0; void *ptr;
int32_t recvlen,nonz = 0; void *ptr; char *retstr;
while ( sock >= 0 && (recvlen= nn_recv(sock,&ptr,NN_MSG,0)) >= 0 )
{
nonz++;
LP_process_message("SUB",myipaddr,pubsock,profitmargin,ptr,recvlen,sock);
if ( (retstr= LP_process_message("SUB",myipaddr,pubsock,profitmargin,ptr,recvlen,sock)) != 0 )
free(retstr);
/*if ( (argjson= cJSON_Parse((char *)ptr)) != 0 )
{
printf("%s SUB.[%d] %s\n",myipaddr,recvsize,jprint(argjson,0));
@ -298,40 +303,18 @@ int32_t LP_mainloop_iter(char *myipaddr,struct LP_peerinfo *mypeer,int32_t pubso
free(retstr);
}
}
nonz += LP_pullsock_check(myipaddr,pubsock,pullsock,profitmargin);
nonz += LP_pullsock_check(&retstr,myipaddr,pubsock,pullsock,profitmargin);
if ( retstr != 0 )
free(retstr);
if ( IAMLP != 0 && (counter % 600) == 42 )
LP_hellos();
counter++;
return(nonz);
}
void LP_mainloop(char *myipaddr,struct LP_peerinfo *mypeer,uint16_t mypubport,int32_t pubsock,char *pushaddr,uint16_t pushport,int32_t pullsock,uint16_t myport,char *passphrase,double profitmargin,cJSON *coins,char *seednode)
void LP_initcoins(int32_t pubsock,cJSON *coins,char *passphrase)
{
uint8_t r; int32_t i,n,j; cJSON *item;
if ( IAMLP != 0 )
{
if ( seednode == 0 || seednode[0] == 0 )
{
for (i=0; i<sizeof(default_LPnodes)/sizeof(*default_LPnodes); i++)
{
if ( (rand() % 100) > 25 )
continue;
LP_peersquery(mypeer,pubsock,default_LPnodes[i],myport,mypeer->ipaddr,myport,profitmargin);
}
} else LP_peersquery(mypeer,pubsock,seednode,myport,mypeer->ipaddr,myport,profitmargin);
}
else
{
if ( seednode == 0 || seednode[0] == 0 )
{
OS_randombytes((void *)&r,sizeof(r));
for (j=0; j<sizeof(default_LPnodes)/sizeof(*default_LPnodes); j++)
{
i = (r + j) % (sizeof(default_LPnodes)/sizeof(*default_LPnodes));
LP_peersquery(mypeer,pubsock,default_LPnodes[i],myport,"127.0.0.1",myport,profitmargin);
}
} else LP_peersquery(mypeer,pubsock,seednode,myport,"127.0.0.1",myport,profitmargin);
}
int32_t i,n; cJSON *item;
for (i=0; i<sizeof(activecoins)/sizeof(*activecoins); i++)
{
fprintf(stderr,"%s ",activecoins[i]);
@ -350,50 +333,64 @@ void LP_mainloop(char *myipaddr,struct LP_peerinfo *mypeer,uint16_t mypubport,in
}
fprintf(stderr,"privkey updates\n");
LP_privkey_updates(pubsock,passphrase,1);
if ( OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)stats_rpcloop,(void *)&myport) != 0 )
{
printf("error launching stats rpcloop for port.%u\n",myport);
exit(-1);
}
while ( 1 )
{
if ( 0 && (rand() % 100) == 0 )
printf("mainloop\n");
if ( LP_mainloop_iter(myipaddr,mypeer,pubsock,pushaddr,pushport,pullsock,myport,passphrase,profitmargin) == 0 )
usleep(100000);
}
}
void nn_tests(int32_t pullsock,char *pushaddr)
void LP_initpeers(int32_t pubsock,struct LP_peerinfo *mypeer,char *myipaddr,uint16_t myport,char *seednode,double profitmargin)
{
int32_t sock,n,timeout,m=0; char msg[512]; //void *ptr;
if ( (sock= nn_socket(AF_SP,LP_COMMAND_SENDSOCK)) >= 0 )
int32_t i,j; uint32_t r;
if ( IAMLP != 0 )
{
if ( nn_connect(sock,pushaddr) < 0 )
printf("connect error %s\n",nn_strerror(nn_errno()));
else
LP_mypeer = mypeer = LP_addpeer(mypeer,pubsock,myipaddr,myport,0,0,profitmargin,0,0);
if ( myipaddr == 0 || mypeer == 0 )
{
sleep(2);
timeout = 1;
nn_setsockopt(sock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(sock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
//n = nn_send(sock,"nn_tests",(int32_t)strlen("nn_tests")+1,0*NN_DONTWAIT);
sprintf(msg,"{\"method\":\"nn_tests\",\"ipaddr\":\"%s\"}",pushaddr);
n = LP_send(sock,msg,0);//
sleep(1);
//m = nn_recv(pullsock,&ptr,NN_MSG,0);
LP_pullsock_check("127.0.0.1",-1,pullsock,0.);
printf(">>>>>>>>>>>>>>>>>>>>>> sent %d bytes -> %d, recv.%d\n",n,pullsock,m);
printf("couldnt get myipaddr or null mypeer.%p\n",mypeer);
exit(-1);
}
if ( seednode == 0 || seednode[0] == 0 )
{
for (i=0; i<sizeof(default_LPnodes)/sizeof(*default_LPnodes); i++)
{
if ( (rand() % 100) > 25 )
continue;
LP_peersquery(mypeer,pubsock,default_LPnodes[i],myport,mypeer->ipaddr,myport,profitmargin);
}
} else LP_peersquery(mypeer,pubsock,seednode,myport,mypeer->ipaddr,myport,profitmargin);
}
else
{
if ( myipaddr == 0 )
{
printf("couldnt get myipaddr\n");
exit(-1);
}
if ( seednode == 0 || seednode[0] == 0 )
{
OS_randombytes((void *)&r,sizeof(r));
for (j=0; j<sizeof(default_LPnodes)/sizeof(*default_LPnodes); j++)
{
i = (r + j) % (sizeof(default_LPnodes)/sizeof(*default_LPnodes));
LP_peersquery(mypeer,pubsock,default_LPnodes[i],myport,"127.0.0.1",myport,profitmargin);
}
} else LP_peersquery(mypeer,pubsock,seednode,myport,"127.0.0.1",myport,profitmargin);
}
}
void LPinit(uint16_t myport,uint16_t mypullport,uint16_t mypubport,double profitmargin,char *passphrase,int32_t amclient,char *userhome,cJSON *argjson)
{
char *myipaddr=0; long filesize,n; int32_t maxsize,timeout,pullsock=-1,pubsock=-1; struct LP_peerinfo *mypeer=0; char pushaddr[128],subaddr[128],bindaddr[128];
char *myipaddr=0; long filesize,n; int32_t timeout,pullsock=-1,pubsock=-1; struct LP_peerinfo *mypeer=0; char pushaddr[128],subaddr[128],bindaddr[128];
IAMLP = !amclient;
#ifndef __linux__
if ( IAMLP != 0 )
{
printf("must run a unix node for LP node\n");
exit(-1);
}
#endif
LP_profitratio += profitmargin;
OS_randombytes((void *)&n,sizeof(n));
if ( jobj(argjson,"canbind") == 0 )
LP_canbind = IAMLP;
else LP_canbind = jint(argjson,"canbind");
srand((int32_t)n);
if ( userhome != 0 && userhome[0] != 0 )
{
@ -410,6 +407,7 @@ void LPinit(uint16_t myport,uint16_t mypullport,uint16_t mypubport,double profit
portable_mutex_init(&LP_cachemutex);
portable_mutex_init(&LP_networkmutex);
portable_mutex_init(&LP_forwardmutex);
portable_mutex_init(&LP_psockmutex);
portable_mutex_init(&LP_pubkeymutex);
if ( profitmargin == 0. || profitmargin == 0.01 )
{
@ -425,65 +423,59 @@ void LPinit(uint16_t myport,uint16_t mypullport,uint16_t mypubport,double profit
myipaddr[--n] = 0;
} else printf("error getting myipaddr\n");
} else printf("error issuing curl\n");
nanomsg_transportname(0,pushaddr,myipaddr,mypullport);
if ( (pullsock= nn_socket(AF_SP,LP_COMMAND_RECVSOCK)) >= 0 )
{
timeout = 1;
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_MAXTTL,&timeout,sizeof(timeout));
nanomsg_transportname(1,bindaddr,myipaddr,mypullport);
if ( nn_bind(pullsock,bindaddr) >= 0 )
{
maxsize = 2 * 1024 * 1024;
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize));
//LP_pullsock_check(myipaddr,-1,pullsock,0.);
}
else
{
printf("bind to %s error for %s: %s\n",bindaddr,pushaddr,nn_strerror(nn_errno()));
exit(-1);
}
}
//nn_tests(pullsock,pushaddr);
printf("my command address is (%s) pullsock.%d\n",pushaddr,pullsock);
if ( IAMLP != 0 )
{
if ( myipaddr != 0 )
pubsock = -1;
nanomsg_transportname(0,subaddr,myipaddr,mypubport);
nanomsg_transportname(1,bindaddr,myipaddr,mypubport);
if ( (pubsock= nn_socket(AF_SP,NN_PUB)) >= 0 )
{
pubsock = -1;
nanomsg_transportname(0,subaddr,myipaddr,mypubport);
nanomsg_transportname(1,bindaddr,myipaddr,mypubport);
if ( (pubsock= nn_socket(AF_SP,NN_PUB)) >= 0 )
if ( nn_bind(pubsock,bindaddr) >= 0 )
{
if ( nn_bind(pubsock,bindaddr) >= 0 )
{
timeout = 10;
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
}
else
{
printf("error binding to (%s).%d (%s).%d\n",pushaddr,pullsock,subaddr,pubsock);
if ( pubsock >= 0 )
nn_close(pubsock), pubsock = -1;
}
} else printf("error getting sockets %d %d\n",pullsock,pubsock);
printf(">>>>>>>>> myipaddr.%s (%s %s) pubsock.%d pullsock.%d\n",myipaddr,pushaddr,subaddr,pubsock,pullsock);
LP_mypubsock = pubsock;
LP_mypeer = mypeer = LP_addpeer(mypeer,pubsock,myipaddr,myport,0,0,profitmargin,0,0);
}
if ( myipaddr == 0 || mypeer == 0 )
{
printf("couldnt get myipaddr or null mypeer.%p\n",mypeer);
exit(-1);
}
timeout = 10;
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
}
else
{
printf("error binding to (%s).%d\n",subaddr,pubsock);
if ( pubsock >= 0 )
nn_close(pubsock), pubsock = -1;
}
} else printf("error getting pubsock %d\n",pubsock);
printf(">>>>>>>>> myipaddr.%s (%s) pullsock.%d\n",myipaddr,subaddr,pubsock);
LP_mypubsock = pubsock;
}
else if ( myipaddr == 0 )
LP_initpeers(pubsock,mypeer,myipaddr,myport,jstr(argjson,"seednode"),profitmargin);
pullsock = LP_initpublicaddr(&mypullport,pushaddr,myipaddr,mypullport,0);
LP_deadman_switch = (uint32_t)time(NULL);
printf("my command address is (%s) pullsock.%d pullport.%u\n",pushaddr,pullsock,mypullport);
LP_initcoins(pubsock,jobj(argjson,"coins"),passphrase);
if ( IAMLP != 0 && OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)LP_psockloop,(void *)&myipaddr) != 0 )
{
printf("couldnt get myipaddr\n");
printf("error launching LP_psockloop for (%s)\n",myipaddr);
exit(-1);
}
LP_mainloop(myipaddr,mypeer,mypubport,pubsock,pushaddr,mypullport,pullsock,myport,passphrase,profitmargin,jobj(argjson,"coins"),jstr(argjson,"seednode"));
if ( OS_thread_create(malloc(sizeof(pthread_t)),NULL,(void *)stats_rpcloop,(void *)&myport) != 0 )
{
printf("error launching stats rpcloop for port.%u\n",myport);
exit(-1);
}
while ( 1 )
{
if ( 0 && (rand() % 100) == 0 )
printf("mainloop\n");
if ( LP_mainloop_iter(myipaddr,mypeer,pubsock,pushaddr,mypullport,pullsock,myport,passphrase,profitmargin) == 0 )
usleep(100000);
if ( LP_canbind == 0 && LP_deadman_switch < time(NULL)-777 )
{
printf("DEAD man's switch activated, register forwarding again\n");
if ( pullsock >= 0 )
nn_close(pullsock);
pullsock = LP_initpublicaddr(&mypullport,pushaddr,myipaddr,mypullport,0);
LP_deadman_switch = (uint32_t)time(NULL);
LP_forwarding_register(LP_mypubkey,pushaddr,mypullport,100000);
}
}
}

315
iguana/exchanges/LP_network.c

@ -17,7 +17,17 @@
// LP_network.c
// marketmaker
//
// jl777: might need to queue outbound packets and send via separate thread
#define PSOCK_IDLETIMEOUT (2 * INSTANTDEX_LOCKTIME + 600)
struct psock
{
uint32_t lasttime,lastping;
int32_t recvsock,sendsock,ispaired;
uint16_t recvport,sendport;
char sendaddr[128];
} *PSOCKS;
uint16_t Numpsocks,Psockport = 10000;
char *nanomsg_transportname(int32_t bindflag,char *str,char *ipaddr,uint16_t port)
{
@ -43,7 +53,7 @@ int32_t LP_send(int32_t sock,char *msg,int32_t freeflag)
//portable_mutex_lock(&LP_networkmutex);
if ( nn_poll(&pfd,1,1) > 0 )
{
if ( (sentbytes= nn_send(sock,msg,len,0*NN_DONTWAIT)) != len )
if ( (sentbytes= nn_send(sock,msg,len,0)) != len )
printf("LP_send sent %d instead of %d\n",sentbytes,len);
//else printf("SENT.(%s)\n",msg);
if ( freeflag != 0 )
@ -86,3 +96,304 @@ uint32_t LP_swapsend(int32_t pairsock,struct basilisk_swap *swap,uint32_t msgbit
free(buf);
return(nextbits);
}
void LP_psockloop(void *_ptr)
{
int32_t i,n,nonz,iter,size=0,sentbytes,sendsock = -1; uint32_t now; struct psock *ptr=0; void *buf=0; struct nn_pollfd pfd,*pfds; char keepalive[512];//,*myipaddr = _ptr;
while ( 1 )
{
now = (uint32_t)time(NULL);
if ( buf != 0 && ptr != 0 && sendsock >= 0 )
{
if ( size > 0 )
{
pfd.fd = ptr->sendsock;
pfd.events = NN_POLLOUT;
if ( nn_poll(&pfd,1,1) > 0 )
{
if ( (sentbytes= nn_send(sendsock,buf,size,0)) > 0 )
{
ptr->lasttime = now;
printf("PSOCKS (%d %d %d) -> %d/%d bytes\n",ptr->recvsock,ptr->sendsock,sendsock,size,sentbytes);
} else printf("send error to %s\n",ptr->sendaddr);
if ( buf != 0 )
{
if ( buf != keepalive )
nn_freemsg(buf);
buf = 0;
size = 0;
ptr = 0;
sendsock = -1;
}
}
}
}
else if ( Numpsocks > 0 )
{
pfds = calloc(Numpsocks,sizeof(*pfds) * 2);
portable_mutex_lock(&LP_psockmutex);
for (iter=0; iter<2; iter++)
{
for (i=n=0; i<Numpsocks; i++)
{
ptr = &PSOCKS[i];
if ( iter == 0 )
{
pfds[n].fd = ptr->recvsock;
pfds[n].events = POLLIN;
}
else
{
if ( pfds[n].fd != ptr->recvsock )
{
printf("unexpected fd.%d mismatched recvsock.%d\n",pfds[n].fd,ptr->recvsock);
break;
}
else if ( (pfds[n].revents & POLLIN) != 0 )
{
if ( (size= nn_recv(ptr->recvsock,&buf,NN_MSG,0)) > 0 )
{
ptr->lasttime = now;
sendsock = ptr->sendsock;
break;
}
}
}
n++;
if ( ptr->ispaired != 0 )
{
if ( iter == 0 )
{
pfds[n].fd = ptr->sendsock;
pfds[n].events = POLLIN;
}
else
{
if ( pfds[n].fd != ptr->sendsock )
{
printf("unexpected fd.%d mismatched sendsock.%d\n",pfds[n].fd,ptr->sendsock);
break;
}
else if ( (pfds[n].revents & POLLIN) != 0 )
{
ptr->lasttime = now;
sendsock = ptr->recvsock;
}
}
n++;
}
}
if ( iter == 0 && nn_poll(pfds,n,10) <= 0 )
break;
}
portable_mutex_unlock(&LP_psockmutex);
if ( sendsock < 0 )
{
for (i=nonz=0; i<Numpsocks; i++)
{
if ( i < Numpsocks )
{
ptr = &PSOCKS[i];
if ( now > ptr->lasttime+PSOCK_IDLETIMEOUT )
{
printf("PSOCKS[%d] of %d (%u %u) lag.%d IDLETIMEOUT\n",i,Numpsocks,ptr->recvport,ptr->sendport,now - ptr->lasttime);
if ( ptr->recvsock >= 0 )
nn_close(ptr->recvsock);
if ( ptr->sendsock >= 0 )
nn_close(ptr->sendsock);
portable_mutex_lock(&LP_psockmutex);
if ( Numpsocks > 1 )
{
PSOCKS[i] = PSOCKS[--Numpsocks];
memset(&PSOCKS[Numpsocks],0,sizeof(*ptr));
} else Numpsocks = 0;
portable_mutex_unlock(&LP_psockmutex);
break;
}
else if ( now > ptr->lastping+600 )
{
ptr->lastping = now;
sendsock = ptr->sendsock;
printf("keep %s alive\n",ptr->sendaddr);
sprintf(keepalive,"{\"method\":\"keepalive\",\"endpoint\":\"%s\"}",ptr->sendaddr);
size = (int32_t)strlen(keepalive) + 1;
buf = keepalive;
break;
}
}
}
if ( nonz == 0 && i == Numpsocks )
usleep(100000);
}
} else usleep(100000);
}
}
void LP_psockadd(int32_t ispaired,int32_t recvsock,uint16_t recvport,int32_t sendsock,uint16_t sendport,char *subaddr)
{
struct psock *ptr;
portable_mutex_lock(&LP_psockmutex);
PSOCKS = realloc(PSOCKS,sizeof(*PSOCKS) * (Numpsocks + 1));
ptr = &PSOCKS[Numpsocks++];
ptr->ispaired = ispaired;
ptr->recvsock = recvsock;
ptr->recvport = recvport;
ptr->sendsock = sendsock;
ptr->sendport = sendport;
safecopy(ptr->sendaddr,subaddr,sizeof(ptr->sendaddr));
ptr->lasttime = (uint32_t)time(NULL);
portable_mutex_unlock(&LP_psockmutex);
}
char *LP_psock(char *myipaddr,int32_t ispaired)
{
char pushaddr[128],subaddr[128]; uint16_t i,pushport,subport; int32_t timeout,maxsize,pullsock=-1,pubsock=-1; cJSON *retjson=0;
retjson = cJSON_CreateObject();
pushport = Psockport++;
subport = Psockport++;
for (i=0; i<100; i++)
{
if ( pushport < 10000 )
pushport = 10001;
if ( subport <= pushport )
subport = pushport + 1;
pullsock = pubsock = -1;
nanomsg_transportname(1,pushaddr,myipaddr,pushport), pushport += 2;
nanomsg_transportname(1,subaddr,myipaddr,subport), subport += 2;
if ( (pullsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PULL)) >= 0 && (pubsock= nn_socket(AF_SP,ispaired!=0?NN_PAIR:NN_PUB)) >= 0 )
{
if ( nn_bind(pullsock,pushaddr) >= 0 && nn_bind(pubsock,subaddr) >= 0 )
{
timeout = 1;
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
maxsize = 1024 * 1024;
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize));
if ( ispaired != 0 )
{
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize));
nn_setsockopt(pubsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
}
LP_psockadd(ispaired,pullsock,pushport,pubsock,subport,subaddr);
jaddstr(retjson,"result","success");
jaddstr(retjson,"connectaddr",subaddr);
jaddstr(retjson,"LPipaddr",myipaddr);
jaddnum(retjson,"connectport",subport);
jaddnum(retjson,"ispaired",ispaired);
jaddstr(retjson,"publicaddr",pushaddr);
jaddnum(retjson,"publicport",pushport);
break;
}
if ( pullsock >= 0 )
nn_close(pullsock);
if ( pubsock >= 0 )
nn_close(pubsock);
}
}
if ( Psockport > 60000 )
Psockport = 10000;
if ( i == 100 )
jaddstr(retjson,"error","cant find psock ports");
return(jprint(retjson,1));
}
/*
LP_pushaddr_get makes transparent the fact that most nodes cannot bind()!
The idea is to create an LP node NN_PAIR sock that the LP node binds to and client node connects to. Additionally, the LP node creates an NN_PULL that other nodes can NN_PUSH to and returns this address in pushaddr/retval for the client node to register with. The desired result is that other than the initial LP node, all the other nodes do a normal NN_PUSH, requiring no change to the NN_PUSH/NN_PULL logic. Of course, the initial LP node needs to autoforward all packets from the public NN_PULL to the NN_PUB
similar to LP_pushaddr_get, create an NN_PAIR for DEX atomic data, can be assumed to have a max lifetime of 2*INSTANTDEX_LOCKTIME
both are combined in LP_psock_get
*/
int32_t nn_tests(int32_t pullsock,char *pushaddr,int32_t nnother)
{
int32_t sock,n,timeout,retval = -1; char msg[512],*retstr;
printf("nn_tests.(%s)\n",pushaddr);
if ( (sock= nn_socket(AF_SP,nnother)) >= 0 )
{
if ( nn_connect(sock,pushaddr) < 0 )
printf("connect error %s\n",nn_strerror(nn_errno()));
else
{
timeout = 1;
//nn_setsockopt(sock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(sock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
sprintf(msg,"{\"method\":\"nn_tests\",\"ipaddr\":\"%s\"}",pushaddr);
n = LP_send(sock,msg,0);
LP_pullsock_check(&retstr,"127.0.0.1",-1,pullsock,0.);
printf(">>>>>>>>>>>>>>>>>>>>>> sent %d bytes -> %d (%s)\n",n,pullsock,retstr!=0?retstr:"");
if ( retstr != 0 )
{
free(retstr);
retval = 0;
}
}
nn_close(sock);
}
return(retval);
}
int32_t LP_initpublicaddr(uint16_t *mypullportp,char *publicaddr,char *myipaddr,uint16_t mypullport,int32_t ispaired)
{
int32_t nntype,pullsock,timeout,maxsize; char bindaddr[128],connectaddr[128];
*mypullportp = mypullport;
if ( ispaired == 0 )
{
if ( LP_canbind != 0 )
nntype = LP_COMMAND_RECVSOCK;
else nntype = NN_SUB;
}
else nntype = NN_PAIR;
if ( LP_canbind != 0 )
{
nanomsg_transportname(0,publicaddr,myipaddr,mypullport);
nanomsg_transportname(1,bindaddr,myipaddr,mypullport);
}
else
{
*mypullportp = 0;
while ( *mypullportp == 0 )
{
*mypullportp = LP_psock_get(connectaddr,publicaddr,ispaired);
sleep(10);
printf("try to get publicaddr again\n");
}
}
if ( (pullsock= nn_socket(AF_SP,nntype)) >= 0 )
{
timeout = 1;
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_MAXTTL,&timeout,sizeof(timeout));
maxsize = 2 * 1024 * 1024;
nn_setsockopt(pullsock,NN_SOL_SOCKET,NN_RCVBUF,&maxsize,sizeof(maxsize));
if ( LP_canbind == 0 )
{
if ( nn_connect(pullsock,connectaddr) < 0 )
{
printf("bind to %s error for %s: %s\n",connectaddr,publicaddr,nn_strerror(nn_errno()));
exit(-1);
}
if ( nntype == NN_SUB )
nn_setsockopt(pullsock,NN_SUB,NN_SUB_SUBSCRIBE,"",0);
}
else
{
if ( nn_bind(pullsock,bindaddr) < 0 )
{
printf("bind to %s error for %s: %s\n",bindaddr,publicaddr,nn_strerror(nn_errno()));
exit(-1);
}
}
}
if ( ispaired == 0 && nn_tests(pullsock,publicaddr,LP_COMMAND_SENDSOCK) < 0 )
{
printf("command socket didnt work\n");
exit(-1);
}
return(pullsock);
}

42
iguana/exchanges/LP_ordermatch.c

@ -338,24 +338,32 @@ double LP_query(char *myipaddr,int32_t mypubsock,double profitmargin,char *metho
return(price);
}
int32_t LP_nanobind(int32_t pair,char *pairstr,char *myipaddr)
int32_t LP_nanobind(char *pairstr,char *myipaddr)
{
int32_t i,timeout,r; char bindaddr[128];
for (i=0; i<10; i++)
int32_t i,timeout,r,pairsock = -1; uint16_t mypullport; char bindaddr[128];
if ( LP_canbind != 0 )
{
r = (10000 + (rand() % 50000)) & 0xffff;
nanomsg_transportname(0,pairstr,myipaddr,r);
nanomsg_transportname(1,bindaddr,myipaddr,r);
if ( nn_bind(pair,bindaddr) >= 0 )
if ( (pairsock= nn_socket(AF_SP,NN_PAIR)) < 0 )
printf("error creating utxo->pair\n");
else
{
timeout = 100;
nn_setsockopt(pair,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(pair,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
printf("nanobind %s to %d\n",pairstr,pair);
return(0);
} else printf("error binding to %s for %s\n",bindaddr,pairstr);
}
return(-1);
for (i=0; i<10; i++)
{
r = (10000 + (rand() % 50000)) & 0xffff;
nanomsg_transportname(0,pairstr,myipaddr,r);
nanomsg_transportname(1,bindaddr,myipaddr,r);
if ( nn_bind(pairsock,bindaddr) >= 0 )
{
timeout = 100;
nn_setsockopt(pairsock,NN_SOL_SOCKET,NN_SNDTIMEO,&timeout,sizeof(timeout));
nn_setsockopt(pairsock,NN_SOL_SOCKET,NN_RCVTIMEO,&timeout,sizeof(timeout));
printf("nanobind %s to %d\n",pairstr,pairsock);
return(pairsock);
} else printf("error binding to %s for %s\n",bindaddr,pairstr);
}
}
} else pairsock = LP_initpublicaddr(&mypullport,pairstr,myipaddr,0,0);
return(pairsock);
}
int32_t LP_connectstartbob(int32_t pubsock,struct LP_utxoinfo *utxo,cJSON *argjson,char *myipaddr,char *base,char *rel,double profitmargin,double price,struct LP_quoteinfo *qp)
@ -366,9 +374,7 @@ int32_t LP_connectstartbob(int32_t pubsock,struct LP_utxoinfo *utxo,cJSON *argjs
privkey = LP_privkey(utxo->coinaddr);
if ( bits256_nonz(privkey) != 0 && qp->quotetime >= qp->timestamp-3 && qp->quotetime <= utxo->T.swappending && bits256_cmp(LP_mypubkey,qp->srchash) == 0 )
{
if ( (pair= nn_socket(AF_SP,NN_PAIR)) < 0 )
printf("error creating utxo->pair\n");
else if ( LP_nanobind(pair,pairstr,myipaddr) >= 0 )
if ( (pair= LP_nanobind(pairstr,myipaddr)) >= 0 )
{
LP_requestinit(&qp->R,qp->srchash,qp->desthash,base,qp->satoshis,rel,qp->destsatoshis,qp->timestamp,qp->quotetime,DEXselector);
swap = LP_swapinit(1,0,privkey,&qp->R,qp);

37
iguana/exchanges/LP_rpc.c

@ -109,12 +109,47 @@ char *issue_LP_notifyutxo(char *destip,uint16_t destport,struct LP_utxoinfo *utx
char *issue_LP_register(char *destip,uint16_t destport,bits256 pubkey,char *ipaddr,uint16_t pushport)
{
char url[512],str[65],*retstr;
sprintf(url,"http://%s:%u/api/stats/register?client=%s&pushaddr=%s&pushport=%u",destip,destport,bits256_str(str,pubkey),ipaddr,pushport);
sprintf(url,"http://%s:%u/api/stats/register?client=%s&pushaddr=%s&pushport=%u",destip,destport,bits256_str(str,pubkey),ipaddr,pushport);
retstr = issue_curlt(url,LP_HTTP_TIMEOUT);
//printf("getutxo.(%s) -> (%s)\n",url,retstr!=0?retstr:"");
return(retstr);
}
char *issue_LP_psock(char *destip,uint16_t destport,int32_t ispaired)
{
char url[512],*retstr;
sprintf(url,"http://%s:%u/api/stats/psock?ispaired=%d",destip,destport,ispaired);
retstr = issue_curlt(url,LP_HTTP_TIMEOUT);
//printf("getutxo.(%s) -> (%s)\n",url,retstr!=0?retstr:"");
return(retstr);
}
uint16_t LP_psock_get(char *connectaddr,char *publicaddr,int32_t ispaired)
{
uint16_t publicport = 0; char *retstr,*addr; cJSON *retjson; struct LP_peerinfo *peer,*tmp;
connectaddr[0] = publicaddr[0] = 0;
HASH_ITER(hh,LP_peerinfos,peer,tmp)
{
if ( (retstr= issue_LP_psock(peer->ipaddr,peer->port,ispaired)) != 0 )
{
if ( (retjson= cJSON_Parse(retstr)) != 0 )
{
if ( (addr= jstr(retjson,"publicaddr")) != 0 )
safecopy(publicaddr,addr,128);
if ( (addr= jstr(retjson,"connectaddr")) != 0 )
safecopy(connectaddr,addr,128);
if ( publicaddr[0] != 0 && connectaddr[0] != 0 )
publicport = juint(retjson,"publicport");
free_json(retjson);
}
free(retstr);
}
if ( publicport != 0 )
break;
}
return(publicport);
}
char *issue_LP_lookup(char *destip,uint16_t destport,bits256 pubkey)
{
char url[512],str[65];

11
iguana/exchanges/LP_transaction.c

@ -735,7 +735,7 @@ char *basilisk_swap_bobtxspend(bits256 *signedtxidp,uint64_t txfee,char *name,ch
memset(signedtxidp,0,sizeof(*signedtxidp));
if ( finalseqid == 0 )
locktime = expiration;
printf("bobtxspend.%s redeem.[%d]\n",symbol,redeemlen);
//printf("bobtxspend.%s redeem.[%d]\n",symbol,redeemlen);
if ( redeemlen < 0 )
return(0);
#ifndef BASILISK_DISABLESENDTX
@ -758,11 +758,8 @@ char *basilisk_swap_bobtxspend(bits256 *signedtxidp,uint64_t txfee,char *name,ch
else printf("unexpected small value %.8f vs txfee %.8f\n",dstr(value),dstr(txfee));
if ( destamountp != 0 )
*destamountp = satoshis;
printf("destamountp %p\n",destamountp);
timestamp = (uint32_t)time(NULL);
printf("t %u V.%p %ld\n",timestamp,V,sizeof(V));
memset(V,0,sizeof(V));
printf("cleared V\n");
privkeys = cJSON_CreateArray();
if ( privkey2p != 0 )
{
@ -772,7 +769,6 @@ char *basilisk_swap_bobtxspend(bits256 *signedtxidp,uint64_t txfee,char *name,ch
jaddistr(privkeys,wifstr);
V[0].N = V[0].M = 2;
} else V[0].N = V[0].M = 1;
printf("what?\n");
V[0].signers[0].privkey = privkey;
bitcoin_pubkey33(ctx,V[0].signers[0].pubkey,privkey);
bitcoin_priv2wif(wifstr,privkey,wiftype);
@ -795,7 +791,6 @@ char *basilisk_swap_bobtxspend(bits256 *signedtxidp,uint64_t txfee,char *name,ch
jaddnum(item,"vout",vout);
bitcoin_address(tmpaddr,pubtype,pubkey33,33);
bitcoin_addr2rmd160(&addrtype,rmd160,tmpaddr);
printf("before redeem\n");
if ( redeemlen != 0 )
{
init_hexbytes_noT(hexstr,redeemscript,redeemlen);
@ -835,13 +830,12 @@ char *basilisk_swap_bobtxspend(bits256 *signedtxidp,uint64_t txfee,char *name,ch
changelen = bitcoin_standardspend(changescript,0,changermd160);
txobj = bitcoin_txoutput(txobj,changescript,changelen,change);
}
printf("call json2hex\n");
if ( (rawtxbytes= bitcoin_json2hex(isPoS,&txid,txobj,V)) != 0 )
{
char str[65];
completed = 0;
memset(signedtxidp,0,sizeof(*signedtxidp));
printf("locktime.%u sequenceid.%x rawtx.(%s) vins.(%s)\n",locktime,sequenceid,rawtxbytes,jprint(vins,0));
//printf("locktime.%u sequenceid.%x rawtx.(%s) vins.(%s)\n",locktime,sequenceid,rawtxbytes,jprint(vins,0));
if ( (completed= iguana_signrawtransaction(ctx,symbol,pubtype,p2shtype,isPoS,1000000,&msgtx,&signedtx,signedtxidp,V,1,rawtxbytes,vins,privkeys)) < 0 )
//if ( (signedtx= LP_signrawtx(symbol,signedtxidp,&completed,vins,rawtxbytes,privkeys,V)) == 0 )
printf("couldnt sign transaction.%s %s\n",name,bits256_str(str,*signedtxidp));
@ -1501,7 +1495,6 @@ void basilisk_alicepayment(struct basilisk_swap *swap,struct iguana_info *coin,s
int32_t basilisk_alicetxs(int32_t pairsock,struct basilisk_swap *swap,uint8_t *data,int32_t maxlen)
{
char coinaddr[64]; int32_t i,retval = -1;
printf("alicetx\n");
if ( swap->alicepayment.I.datalen == 0 )
basilisk_alicepayment(swap,swap->alicepayment.coin,&swap->alicepayment,swap->I.pubAm,swap->I.pubBn);
if ( swap->alicepayment.I.datalen == 0 || swap->alicepayment.I.spendlen == 0 )

Loading…
Cancel
Save