diff --git a/server/server.py b/server/server.py index 9b2fc491e..5802a8d34 100755 --- a/server/server.py +++ b/server/server.py @@ -25,7 +25,7 @@ Todo: """ -import time, socket, operator, thread, ast, sys,re +import time, json, socket, operator, thread, ast, sys,re import psycopg2, binascii from Abe.abe import hash_to_address, decode_check_address @@ -353,7 +353,7 @@ def send_tx(tx): respdata = urllib.urlopen(bitcoind_url, postdata).read() r = loads(respdata) if r['error'] != None: - out = "error: transaction rejected by memorypool" + out = "error: transaction rejected by memorypool\n"+tx else: out = r['result'] return out @@ -443,6 +443,11 @@ def new_session(version, addresses): sessions[session_id]['last_time'] = time.time() return out +def subscribe_to_address(session_id, address): + sessions[session_id]['addresses'][address] = '' + sessions[session_id]['last_time'] = time.time() + + def update_session(session_id,addresses): sessions[session_id]['addresses'] = {} for a in addresses: @@ -450,7 +455,7 @@ def update_session(session_id,addresses): sessions[session_id]['last_time'] = time.time() return 'ok' -def listen_thread(store): +def native_server_thread(): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((config.get('server','host'), config.getint('server','port'))) @@ -458,14 +463,13 @@ def listen_thread(store): while not stopping: conn, addr = s.accept() try: - thread.start_new_thread(client_thread, (addr, conn,)) + thread.start_new_thread(native_client_thread, (addr, conn,)) except: # can't start new thread if there is no memory.. traceback.print_exc(file=sys.stdout) - -def client_thread(ipaddr,conn): +def native_client_thread(ipaddr,conn): #print "client thread", ipaddr try: ipaddr = ipaddr[0] @@ -497,6 +501,8 @@ def client_thread(ipaddr,conn): conn.close() + +# used by the native handler def do_command(cmd, data, ipaddr): timestr = time.strftime("[%d/%m/%Y-%H:%M:%S]") @@ -602,6 +608,70 @@ def do_command(cmd, data, ipaddr): +#################################################################### + +def tcp_server_thread(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind((config.get('server','host'), 50001)) + s.listen(1) + while not stopping: + conn, addr = s.accept() + try: + thread.start_new_thread(tcp_client_thread, (addr, conn,)) + except: + # can't start new thread if there is no memory.. + traceback.print_exc(file=sys.stdout) + + + +def tcp_client_thread(ipaddr,conn): + """ use a persistent connection. put commands in a queue.""" + print "persistent client thread", ipaddr + global sessions + + session_id = random_string(10) + sessions[session_id] = { 'addresses':{}, 'version':'unknown' } + + ipaddr = ipaddr[0] + msg = '' + + while True: + d = conn.recv(1024) + msg += d + if not d: break + while True: + s = msg.find('\n') + if s ==-1: + break + else: + c = msg[0:s] + msg = msg[s+1:] + c = json.loads(c) + try: + cmd = c['method'] + data = c['params'] + except: + print "syntax error", repr(c), ipaddr + continue + + out = None + if cmd == 'blockchain.address.subscribe': + subscribe_to_address(session_id,data) + elif cmd == 'client.version': + sessions[session_id]['version'] = data + elif cmd == 'server.banner': + out = json.dumps( { 'method':'server.banner', 'result':config.get('server','banner').replace('\\n','\n') } ) + else: + print "unknown command", cmd + + if out: + conn.send(out+'\n') + + + +#################################################################### + def memorypool_update(store): ds = BCDataStream.BCDataStream() @@ -683,7 +753,7 @@ def irc_thread(): -def jsonrpc_thread(store): +def http_server_thread(store): # see http://code.google.com/p/jsonrpclib/ from SocketServer import ThreadingMixIn from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer @@ -750,8 +820,11 @@ if __name__ == '__main__': store.tx_cache = {} store.mempool_keys = {} - thread.start_new_thread(listen_thread, (store,)) - thread.start_new_thread(jsonrpc_thread, (store,)) + # supported protocols + thread.start_new_thread(native_server_thread, ()) + thread.start_new_thread(tcp_server_thread, ()) + thread.start_new_thread(http_server_thread, (store,)) + thread.start_new_thread(clean_session_thread, ()) if (config.get('server','irc') == 'yes' ): thread.start_new_thread(irc_thread, ())