diff --git a/iguana/exchanges/bitcoin.c b/iguana/exchanges/bitcoin.c index 283d29684..d1aa93c26 100755 --- a/iguana/exchanges/bitcoin.c +++ b/iguana/exchanges/bitcoin.c @@ -336,10 +336,18 @@ static char *BASERELS[][2] = { {"btcd","btc"}, {"nxt","btc"}, {"asset","btc"} }; double UPDATE(struct exchange_info *exchange,char *base,char *rel,struct exchange_quote *bidasks,int32_t maxdepth,double commission,cJSON *argjson,int32_t invert) { - cJSON *retjson,*bids,*asks; double hbla; + struct iguana_bundlereq *req; cJSON *retjson,*bids,*asks; double hbla; struct iguana_info *coin; struct supernet_info *myinfo; + myinfo = SuperNET_MYINFO(0); + coin = iguana_coinfind("BTCD"); + while ( (req= queue_dequeue(&exchange->recvQ,0)) != 0 ) + { + if ( instantdex_recvquotes(coin,req,req->hashes,req->n) != 0 ) + myfree(req->hashes,(req->n+1) * sizeof(*req->hashes)), req->hashes = 0; + } + iguana_inv2poll(myinfo,coin); bids = cJSON_CreateArray(); asks = cJSON_CreateArray(); - instantdex_offerfind(SuperNET_MYINFO(0),exchange,bids,asks,0,base,rel,1,0); + instantdex_offerfind(myinfo,exchange,bids,asks,0,base,rel,1,0); //printf("bids.(%s) asks.(%s)\n",jprint(bids,0),jprint(asks,0)); retjson = cJSON_CreateObject(); cJSON_AddItemToObject(retjson,"bids",bids); @@ -453,7 +461,7 @@ uint64_t TRADE(int32_t dotrade,char **retstrp,struct exchange_info *exchange,cha jaddstr(json,"BTC",myinfo->myaddr.BTC); jaddnum(json,"minperc",jdouble(argjson,"minperc")); printf("trade dir.%d (%s/%s) %.6f vol %.8f\n",dir,base,"BTC",price,volume); - if ( (str= instantdex_createaccept(myinfo,&ap,exchange,base,"BTC",price,volume,-dir,dir > 0 ? "BTC" : base,INSTANTDEX_OFFERDURATION,myinfo->myaddr.nxt64bits,0,jdouble(argjson,"minperc"))) != 0 && ap != 0 ) + if ( (str= instantdex_createaccept(myinfo,&ap,exchange,base,"BTC",price,volume,-dir,dir > 0 ? "BTC" : base,INSTANTDEX_OFFERDURATION,myinfo->myaddr.nxt64bits,1,jdouble(argjson,"minperc"))) != 0 && ap != 0 ) retstr = instantdex_checkoffer(myinfo,&txid,exchange,ap,json), free(str); else printf("null return queueaccept\n"); if ( retstrp != 0 ) diff --git a/iguana/exchanges777.h b/iguana/exchanges777.h index 353cfa8a4..bc5241e1a 100755 --- a/iguana/exchanges777.h +++ b/iguana/exchanges777.h @@ -64,10 +64,10 @@ struct exchange_info { struct exchange_funcs issue; char name[16],apikey[MAX_JSON_FIELD],apisecret[MAX_JSON_FIELD],tradepassword[MAX_JSON_FIELD],userid[MAX_JSON_FIELD]; - uint32_t exchangeid,pollgap,lastpoll; + uint32_t exchangeid,pollgap,lastpoll; portable_mutex_t mutex; uint64_t lastnonce,exchangebits; double commission; void *privatedata; - CURL *cHandle; queue_t requestQ,pricesQ,statemachineQ,tradebotsQ,acceptableQ,historyQ; + CURL *cHandle; queue_t requestQ,pricesQ,statemachineQ,tradebotsQ,acceptableQ,historyQ,recvQ; }; struct instantdex_msghdr diff --git a/iguana/iguana777.c b/iguana/iguana777.c index 1313c3a2d..7bd00634b 100755 --- a/iguana/iguana777.c +++ b/iguana/iguana777.c @@ -746,7 +746,6 @@ void iguana_coinloop(void *arg) flag += iguana_processrecv(myinfo,coin); } coin->idletime = (uint32_t)time(NULL); - iguana_inv2poll(myinfo,coin); } } if ( flag == 0 && coin->isRT == 0 ) diff --git a/iguana/iguana777.h b/iguana/iguana777.h index 1c6967ebe..09bafaab4 100755 --- a/iguana/iguana777.h +++ b/iguana/iguana777.h @@ -979,6 +979,8 @@ int32_t iguana_inv2packet(uint8_t *serialized,int32_t maxsize,int32_t type,bits2 int32_t instantdex_inv2data(struct supernet_info *myinfo,struct iguana_info *coin,struct iguana_peer *addr,struct exchange_info *exchange); struct iguana_bundlereq *instantdex_recvquotes(struct iguana_info *coin,struct iguana_bundlereq *req,bits256 *encodedhash,int32_t n); struct exchange_info *exchange_create(char *exchangestr,cJSON *argjson); +int32_t iguana_inv2poll(struct supernet_info *myinfo,struct iguana_info *coin); +struct iguana_bundlereq *iguana_bundlereq(struct iguana_info *coin,struct iguana_peer *addr,int32_t type,int32_t datalen); extern int32_t HDRnet,netBLOCKS; diff --git a/iguana/iguana_exchanges.c b/iguana/iguana_exchanges.c index b8d76bdb1..67c344026 100755 --- a/iguana/iguana_exchanges.c +++ b/iguana/iguana_exchanges.c @@ -849,6 +849,15 @@ struct exchange_info *exchanges777_find(char *exchangestr) return(0); } +void iguana_gotquotesM(struct iguana_info *coin,struct iguana_peer *addr,bits256 *quotes,int32_t n) +{ + struct iguana_bundlereq *req; struct exchange_info *exchange = exchanges777_find("bitcoin"); + //printf("got %d quotes from %s\n",n,addr->ipaddr); + req = iguana_bundlereq(coin,addr,'Q',0); + req->hashes = quotes, req->n = n; + queue_enqueue("recvQ",&exchange->recvQ,&req->DL,0); +} + struct exchange_info *exchange_create(char *exchangestr,cJSON *argjson) { static int didinit; @@ -885,11 +894,13 @@ struct exchange_info *exchange_create(char *exchangestr,cJSON *argjson) return(0); } exchange = calloc(1,sizeof(*exchange)); + portable_mutex_init(&exchange->mutex); exchange->issue = *Exchange_funcs[i]; iguana_initQ(&exchange->pricesQ,"prices"); iguana_initQ(&exchange->requestQ,"request"); iguana_initQ(&exchange->acceptableQ,"acceptable"); iguana_initQ(&exchange->tradebotsQ,"tradebots"); + iguana_initQ(&exchange->recvQ,"recvQ"); iguana_initQ(&exchange->historyQ,"history"); iguana_initQ(&exchange->statemachineQ,"statemachineQ"); exchange->exchangeid = exchangeid; diff --git a/iguana/iguana_instantdex.c b/iguana/iguana_instantdex.c index 0e4a645d2..e8b408ac7 100755 --- a/iguana/iguana_instantdex.c +++ b/iguana/iguana_instantdex.c @@ -652,6 +652,7 @@ struct bitcoin_swapinfo *instantdex_historyfind(struct supernet_info *myinfo,str { struct bitcoin_swapinfo PAD,*swap,*retswap = 0; uint32_t now; now = (uint32_t)time(NULL); + portable_mutex_lock(&exchange->mutex); memset(&PAD,0,sizeof(PAD)); queue_enqueue("historyQ",&exchange->historyQ,&PAD.DL,0); while ( (swap= queue_dequeue(&exchange->historyQ,0)) != 0 && swap != &PAD ) @@ -660,6 +661,7 @@ struct bitcoin_swapinfo *instantdex_historyfind(struct supernet_info *myinfo,str retswap = swap; queue_enqueue("historyQ",&exchange->historyQ,&swap->DL,0); } + portable_mutex_unlock(&exchange->mutex); return(retswap); } @@ -667,6 +669,7 @@ struct bitcoin_swapinfo *instantdex_statemachinefind(struct supernet_info *myinf { struct bitcoin_swapinfo PAD,*swap,*retswap = 0; uint32_t now; now = (uint32_t)time(NULL); + portable_mutex_lock(&exchange->mutex); memset(&PAD,0,sizeof(PAD)); queue_enqueue("statemachineQ",&exchange->statemachineQ,&PAD.DL,0); while ( (swap= queue_dequeue(&exchange->statemachineQ,0)) != 0 && swap != &PAD ) @@ -691,6 +694,7 @@ struct bitcoin_swapinfo *instantdex_statemachinefind(struct supernet_info *myinf queue_enqueue("statemachineQ",&exchange->statemachineQ,&swap->DL,0); } //printf("found statemachine.%p\n",retswap); + portable_mutex_unlock(&exchange->mutex); return(retswap); } @@ -699,6 +703,8 @@ struct instantdex_accept *instantdex_offerfind(struct supernet_info *ignore,stru struct instantdex_accept PAD,*ap,*retap = 0; uint32_t now; cJSON *item,*offerobj; char *type; if ( exchange == 0 ) return(0); + printf("offerfind.%d\n",queue_size(&exchange->acceptableQ)); + portable_mutex_lock(&exchange->mutex); now = (uint32_t)time(NULL); memset(&PAD,0,sizeof(PAD)); queue_enqueue("acceptableQ",&exchange->acceptableQ,&PAD.DL,0); @@ -737,6 +743,8 @@ struct instantdex_accept *instantdex_offerfind(struct supernet_info *ignore,stru } } else free(ap); } + portable_mutex_unlock(&exchange->mutex); + printf("done offerfind\n"); return(retap); } @@ -745,7 +753,8 @@ int32_t instantdex_peerhas_clear(struct iguana_info *coin,struct iguana_peer *ad struct instantdex_accept PAD,*ap; struct exchange_info *exchange; int32_t ind,num = 0; if ( addr != 0 && (exchange= exchanges777_find("bitcoin")) != 0 ) { - //printf("clear all bits for addrind.%d\n",addr->addrind); + printf("clear all bits for addrind.%d\n",addr->addrind); + portable_mutex_lock(&exchange->mutex); ind = addr->addrind; memset(&PAD,0,sizeof(PAD)); queue_enqueue("acceptableQ",&exchange->acceptableQ,&PAD.DL,0); @@ -754,6 +763,8 @@ int32_t instantdex_peerhas_clear(struct iguana_info *coin,struct iguana_peer *ad CLEARBIT(ap->peerhas,ind); queue_enqueue("acceptableQ",&exchange->acceptableQ,&ap->DL,0); } + portable_mutex_unlock(&exchange->mutex); + printf("done clear all bits for addrind.%d\n",addr->addrind); } return(num); } @@ -816,10 +827,12 @@ bits256 instantdex_encodehash(char *base,char *rel,int64_t price,uint64_t orderi int32_t instantdex_inv2data(struct supernet_info *myinfo,struct iguana_info *coin,struct iguana_peer *addr,struct exchange_info *exchange) { - struct instantdex_accept PAD,*ap; uint32_t now,n=0,len; bits256 hashes[100]; uint8_t serialized[100*36 + 1024]; + struct instantdex_accept PAD,*ap; uint32_t now,n=0,len; bits256 encodedhash,hashes[100]; uint8_t serialized[100*36 + 1024]; //printf("instantdex_inv2data exchange.%p (%s)\n",exchange,addr->ipaddr); if ( exchange == 0 ) return(0); + printf("instantdex_inv2data\n"); + portable_mutex_lock(&exchange->mutex); now = (uint32_t)time(NULL); memset(&PAD,0,sizeof(PAD)); queue_enqueue("acceptableQ",&exchange->acceptableQ,&PAD.DL,0); @@ -827,14 +840,17 @@ int32_t instantdex_inv2data(struct supernet_info *myinfo,struct iguana_info *coi { if ( now < ap->offer.expiration && ap->dead == 0 ) { + encodedhash = instantdex_encodehash(ap->offer.base,ap->offer.rel,ap->offer.price64*instantdex_bidaskdir(&ap->offer),ap->orderid,ap->offer.account); if ( n < sizeof(hashes)/sizeof(*hashes) )//&& GETBIT(ap->peerhas,addr->addrind) == 0 ) { - hashes[n++] = instantdex_encodehash(ap->offer.base,ap->offer.rel,ap->offer.price64*instantdex_bidaskdir(&ap->offer),ap->orderid,ap->offer.account); + hashes[n++] = encodedhash; printf("%llu ",(long long)ap->orderid); } queue_enqueue("acceptableQ",&exchange->acceptableQ,&ap->DL,0); } else free(ap); } + portable_mutex_unlock(&exchange->mutex); + printf("done instantdex_inv2data\n"); if ( n > 0 ) { len = iguana_inv2packet(serialized,sizeof(serialized),MSG_QUOTE,hashes,n); @@ -902,7 +918,7 @@ int32_t instantdex_quotep2p(struct supernet_info *myinfo,struct iguana_info *coi encodedhash = instantdex_encodehash(A.offer.base,A.offer.rel,A.offer.price64 * instantdex_bidaskdir(&A.offer),A.orderid,A.offer.account); if ( (ap= instantdex_quotefind(myinfo,coin,addr,encodedhash)) == 0 ) { - //printf("add quote here!\n"); + printf("add quote here!\n"); if ( exchange != 0 ) { ap = calloc(1,sizeof(*ap)); @@ -948,6 +964,8 @@ struct instantdex_accept *instantdex_acceptable(struct supernet_info *myinfo,str printf("instantdex_acceptable null exchange\n"); return(0); } + printf("instantdex_acceptable\n"); + portable_mutex_lock(&exchange->mutex); aveprice = 0;//instantdex_avehbla(myinfo,retvals,A->offer.base,A->offer.rel,dstr(A->offer.basevolume64)); now = (uint32_t)time(NULL); memset(&PAD,0,sizeof(PAD)); @@ -992,6 +1010,8 @@ struct instantdex_accept *instantdex_acceptable(struct supernet_info *myinfo,str queue_enqueue("acceptableQ",&exchange->acceptableQ,&ap->DL,0); else free(ap); } + portable_mutex_unlock(&exchange->mutex); + printf("done instantdex_acceptable\n"); return(retap); } @@ -1209,6 +1229,7 @@ char *instantdex_gotoffer(struct supernet_info *myinfo,struct exchange_info *exc } else //if ( (retstr= instantdex_addfeetx(myinfo,newjson,ap,swap,"BOB_gotoffer","ALICE_gotoffer")) == 0 ) { + printf("add to both queues\n"); queue_enqueue("acceptableQ",&exchange->acceptableQ,&swap->DL,0); queue_enqueue("statemachineQ",&exchange->statemachineQ,&swap->DL,0); if ( (retstr= instantdex_choosei(swap,newjson,argjson,serdata,serdatalen)) != 0 ) @@ -1382,15 +1403,18 @@ char *instantdex_createaccept(struct supernet_info *myinfo,struct instantdex_acc printf("myside.(%s) != base.%s or rel.%s\n",mysidestr,base,rel); } instantdex_acceptset(ap,base,rel,duration,myside,acceptdir,price,basevolume,account,0,minperc); - instantdex_propagate(myinfo,exchange,ap); - if ( queueflag != 0 ) + if ( instantdex_offerfind(myinfo,exchange,0,0,ap->orderid,ap->offer.base,ap->offer.rel,1,0) == 0 ) { - printf("acceptableQ <- %llu\n",(long long)ap->orderid); - queue_enqueue("acceptableQ",&exchange->acceptableQ,&ap->DL,0); - } - retstr = jprint(instantdex_acceptjson(ap),1); - //printf("acceptableQ %llu (%s)\n",(long long)ap->orderid,retstr); - return(retstr); + instantdex_propagate(myinfo,exchange,ap); + if ( queueflag != 0 ) + { + printf("acceptableQ <- %llu\n",(long long)ap->orderid); + queue_enqueue("acceptableQ",&exchange->acceptableQ,&ap->DL,0); + } + retstr = jprint(instantdex_acceptjson(ap),1); + //printf("acceptableQ %llu (%s)\n",(long long)ap->orderid,retstr); + return(retstr); + } else return(0); } else return(clonestr("{\"error\":\"invalid exchange\"}")); } diff --git a/iguana/iguana_recv.c b/iguana/iguana_recv.c index b96b90d9c..ed69958d2 100755 --- a/iguana/iguana_recv.c +++ b/iguana/iguana_recv.c @@ -496,15 +496,6 @@ void iguana_gottxidsM(struct iguana_info *coin,struct iguana_peer *addr,bits256 queue_enqueue("recvQ",&coin->recvQ,&req->DL,0); } -void iguana_gotquotesM(struct iguana_info *coin,struct iguana_peer *addr,bits256 *quotes,int32_t n) -{ - struct iguana_bundlereq *req; - //printf("got %d quotes from %s\n",n,addr->ipaddr); - req = iguana_bundlereq(coin,addr,'Q',0); - req->hashes = quotes, req->n = n; - queue_enqueue("recvQ",&coin->recvQ,&req->DL,0); -} - void iguana_gotheadersM(struct iguana_info *coin,struct iguana_peer *addr,struct iguana_block *blocks,int32_t n) { struct iguana_bundlereq *req; int32_t i,num; @@ -1306,11 +1297,11 @@ int32_t iguana_processrecvQ(struct iguana_info *coin,int32_t *newhwmp) // single if ( (req= iguana_recvtxids(coin,req,req->hashes,req->n)) != 0 ) myfree(req->hashes,(req->n+1) * sizeof(*req->hashes)), req->hashes = 0; } - else if ( req->type == 'Q' ) // quotes from inv + /*else if ( req->type == 'Q' ) // quotes from inv { if ( (req= instantdex_recvquotes(coin,req,req->hashes,req->n)) != 0 ) myfree(req->hashes,(req->n+1) * sizeof(*req->hashes)), req->hashes = 0; - } + }*/ else printf("iguana_updatebundles unknown type.%c\n",req->type);//, getchar(); //fprintf(stderr,"finished coin->recvQ\n"); if ( req != 0 ) diff --git a/iguana/main.c b/iguana/main.c index 854a474b1..6bf297b2b 100755 --- a/iguana/main.c +++ b/iguana/main.c @@ -1159,7 +1159,7 @@ void iguana_appletests(struct supernet_info *myinfo) if ( 1 && (str= SuperNET_JSON(myinfo,cJSON_Parse("{\"RELAY\":1,\"VALIDATE\":1,\"prefetchlag\":-1,\"agent\":\"iguana\",\"method\":\"addcoin\",\"startpend\":4,\"endpend\":4,\"services\":129,\"maxpeers\":64,\"newcoin\":\"BTCD\",\"active\":1,\"numhelpers\":4,\"poll\":100}"),0,myinfo->rpcport)) != 0 ) { free(str); - if ( 1 && (str= SuperNET_JSON(myinfo,cJSON_Parse("{\"RELAY\":1,\"VALIDATE\":1,\"prefetchlag\":-1,\"agent\":\"iguana\",\"method\":\"addcoin\",\"startpend\":4,\"endpend\":4,\"services\":129,\"maxpeers\":64,\"newcoin\":\"BTC\",\"active\":1,\"numhelpers\":4,\"poll\":100}"),0,myinfo->rpcport)) != 0 ) + if ( 0 && (str= SuperNET_JSON(myinfo,cJSON_Parse("{\"RELAY\":1,\"VALIDATE\":1,\"prefetchlag\":-1,\"agent\":\"iguana\",\"method\":\"addcoin\",\"startpend\":4,\"endpend\":4,\"services\":129,\"maxpeers\":64,\"newcoin\":\"BTC\",\"active\":1,\"numhelpers\":4,\"poll\":100}"),0,myinfo->rpcport)) != 0 ) { free(str); if ( 0 && (str= SuperNET_JSON(myinfo,cJSON_Parse("{\"agent\":\"SuperNET\",\"method\":\"login\",\"handle\":\"alice\",\"password\":\"alice\",\"passphrase\":\"alice\"}"),0,myinfo->rpcport)) != 0 )