From 8d88b0702da6102f2f2aeae801ab679382506367 Mon Sep 17 00:00:00 2001 From: ThomasV Date: Mon, 19 Mar 2012 21:19:36 +0300 Subject: [PATCH] stratum http server --- server/StratumJSONRPCServer.py | 296 +++++++++++++++++++++++++++++++++ server/server.py | 140 ++++++++++++---- 2 files changed, 401 insertions(+), 35 deletions(-) create mode 100644 server/StratumJSONRPCServer.py diff --git a/server/StratumJSONRPCServer.py b/server/StratumJSONRPCServer.py new file mode 100644 index 000000000..15cdbf65a --- /dev/null +++ b/server/StratumJSONRPCServer.py @@ -0,0 +1,296 @@ +import jsonrpclib +from jsonrpclib import Fault +from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS +import SimpleXMLRPCServer +import SocketServer +import socket +import logging +import os +import types +import traceback +import sys +try: + import fcntl +except ImportError: + # For Windows + fcntl = None + +import json + +def get_version(request): + # must be a dict + if 'jsonrpc' in request.keys(): + return 2.0 + if 'id' in request.keys(): + return 1.0 + return None + +def validate_request(request): + if type(request) is not types.DictType: + fault = Fault( + -32600, 'Request must be {}, not %s.' % type(request) + ) + return fault + rpcid = request.get('id', None) + version = get_version(request) + if not version: + fault = Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid) + return fault + request.setdefault('params', []) + method = request.get('method', None) + params = request.get('params') + param_types = (types.ListType, types.DictType, types.TupleType) + if not method or type(method) not in types.StringTypes or \ + type(params) not in param_types: + fault = Fault( + -32600, 'Invalid request parameters or method.', rpcid=rpcid + ) + return fault + return True + +class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher): + + def __init__(self, encoding=None): + SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, + allow_none=True, + encoding=encoding) + + def _marshaled_dispatch(self, data, dispatch_method = None): + response = None + try: + request = jsonrpclib.loads(data) + except Exception, e: + fault = Fault(-32700, 'Request %s invalid. (%s)' % (data, e)) + response = fault.response() + return response + + responses = [] + if type(request) is not types.ListType: + request = [ request ] + + for req_entry in request: + result = validate_request(req_entry) + if type(result) is Fault: + responses.append(result.response()) + continue + resp_entry = self._marshaled_single_dispatch(req_entry) + if resp_entry is not None: + responses.append(resp_entry) + + # poll + r = self._marshaled_single_dispatch({'method':'session.poll', 'params':[], 'id':'z' }) + r = jsonrpclib.loads(r) + r = r.get('result') + for item in r: + responses.append(json.dumps(item)) + + if len(responses) > 1: + response = '[%s]' % ','.join(responses) + elif len(responses) == 1: + response = responses[0] + else: + response = '' + + return response + + def _marshaled_single_dispatch(self, request): + # TODO - Use the multiprocessing and skip the response if + # it is a notification + # Put in support for custom dispatcher here + # (See SimpleXMLRPCServer._marshaled_dispatch) + method = request.get('method') + params = request.get('params') + if params is None: params=[] + params = [ self.session_id, request['id'] ] + params + print method, params + try: + response = self._dispatch(method, params) + except: + exc_type, exc_value, exc_tb = sys.exc_info() + fault = Fault(-32603, '%s:%s' % (exc_type, exc_value)) + return fault.response() + if 'id' not in request.keys() or request['id'] == None: + # It's a notification + return None + + try: + response = jsonrpclib.dumps(response, + methodresponse=True, + rpcid=request['id'] + ) + return response + except: + exc_type, exc_value, exc_tb = sys.exc_info() + fault = Fault(-32603, '%s:%s' % (exc_type, exc_value)) + return fault.response() + + def _dispatch(self, method, params): + func = None + try: + func = self.funcs[method] + except KeyError: + if self.instance is not None: + if hasattr(self.instance, '_dispatch'): + return self.instance._dispatch(method, params) + else: + try: + func = SimpleXMLRPCServer.resolve_dotted_attribute( + self.instance, + method, + True + ) + except AttributeError: + pass + if func is not None: + try: + if type(params) is types.ListType: + response = func(*params) + else: + response = func(**params) + return response + except TypeError: + return Fault(-32602, 'Invalid parameters.') + except: + err_lines = traceback.format_exc().splitlines() + trace_string = '%s | %s' % (err_lines[-3], err_lines[-1]) + fault = jsonrpclib.Fault(-32603, 'Server error: %s' % + trace_string) + return fault + else: + return Fault(-32601, 'Method %s not supported.' % method) + +class StratumJSONRPCRequestHandler( + SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): + + def do_GET(self): + if not self.is_rpc_path_valid(): + self.report_404() + return + try: + print "GET" + + self.server.session_id = None + c = self.headers.get('cookie') + if c: + if c[0:8]=='SESSION=': + print "found cookie", c[8:] + self.server.session_id = c[8:] + + if self.server.session_id is None: + r = self.server._marshaled_single_dispatch({'method':'session.create', 'params':[], 'id':'z' }) + r = jsonrpclib.loads(r) + self.server.session_id = r.get('result') + print "setting cookie", self.server.session_id + + data = json.dumps([]) + response = self.server._marshaled_dispatch(data) + self.send_response(200) + except Exception, e: + self.send_response(500) + err_lines = traceback.format_exc().splitlines() + trace_string = '%s | %s' % (err_lines[-3], err_lines[-1]) + fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string) + response = fault.response() + print "500", trace_string + if response == None: + response = '' + + if hasattr(self.server, 'session_id'): + if self.server.session_id: + self.send_header("Set-Cookie", "SESSION=%s"%self.server.session_id) + self.session_id = None + + self.send_header("Content-type", "application/json-rpc") + self.send_header("Content-length", str(len(response))) + self.end_headers() + self.wfile.write(response) + self.wfile.flush() + self.connection.shutdown(1) + + + def do_POST(self): + if not self.is_rpc_path_valid(): + self.report_404() + return + try: + max_chunk_size = 10*1024*1024 + size_remaining = int(self.headers["content-length"]) + L = [] + while size_remaining: + chunk_size = min(size_remaining, max_chunk_size) + L.append(self.rfile.read(chunk_size)) + size_remaining -= len(L[-1]) + data = ''.join(L) + + self.server.session_id = None + c = self.headers.get('cookie') + if c: + if c[0:8]=='SESSION=': + print "found cookie", c[8:] + self.server.session_id = c[8:] + + if self.server.session_id is None: + r = self.server._marshaled_single_dispatch({'method':'session.create', 'params':[], 'id':'z' }) + r = jsonrpclib.loads(r) + self.server.session_id = r.get('result') + #print "setting cookie", self.server.session_id + + response = self.server._marshaled_dispatch(data) + self.send_response(200) + except Exception, e: + self.send_response(500) + err_lines = traceback.format_exc().splitlines() + trace_string = '%s | %s' % (err_lines[-3], err_lines[-1]) + fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string) + response = fault.response() + print "500", trace_string + if response == None: + response = '' + + if hasattr(self.server, 'session_id'): + if self.server.session_id: + self.send_header("Set-Cookie", "SESSION=%s"%self.server.session_id) + self.session_id = None + + self.send_header("Content-type", "application/json-rpc") + self.send_header("Content-length", str(len(response))) + self.end_headers() + self.wfile.write(response) + self.wfile.flush() + self.connection.shutdown(1) + + +class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher): + + allow_reuse_address = True + + def __init__(self, addr, requestHandler=StratumJSONRPCRequestHandler, + logRequests=True, encoding=None, bind_and_activate=True, + address_family=socket.AF_INET): + self.logRequests = logRequests + StratumJSONRPCDispatcher.__init__(self, encoding) + # TCPServer.__init__ has an extra parameter on 2.6+, so + # check Python version and decide on how to call it + vi = sys.version_info + self.address_family = address_family + if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX: + # Unix sockets can't be bound if they already exist in the + # filesystem. The convention of e.g. X11 is to unlink + # before binding again. + if os.path.exists(addr): + try: + os.unlink(addr) + except OSError: + logging.warning("Could not unlink socket %s", addr) + # if python 2.5 and lower + if vi[0] < 3 and vi[1] < 6: + SocketServer.TCPServer.__init__(self, addr, requestHandler) + else: + SocketServer.TCPServer.__init__(self, addr, requestHandler, + bind_and_activate) + if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'): + flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD) + flags |= fcntl.FD_CLOEXEC + fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags) + + diff --git a/server/server.py b/server/server.py index a3c3bec3f..cf1326207 100755 --- a/server/server.py +++ b/server/server.py @@ -78,6 +78,8 @@ old_block_number = -1 sessions = {} sessions_sub_numblocks = {} # sessions that have subscribed to the service +m_sessions = [{}] # served by http + dblock = thread.allocate_lock() peer_list = {} @@ -88,6 +90,8 @@ input_queue = Queue() output_queue = Queue() address_queue = Queue() + + class MyStore(Datastore_class): def import_block(self, b, chain_ids=frozenset()): @@ -384,41 +388,39 @@ def random_string(N): -def cmd_stop(data): +def cmd_stop(_,__,pw): global stopping - if password == data: + if password == pw: stopping = True return 'ok' else: return 'wrong password' -def cmd_load(pw): +def cmd_load(_,__,pw): if password == pw: return repr( len(sessions) ) else: return 'wrong password' -def clear_cache(pw): +def clear_cache(_,__,pw): if password == pw: store.tx_cache = {} return 'ok' else: return 'wrong password' -def get_cache(pw,addr): +def get_cache(_,__,pw,addr): if password == pw: return store.tx_cache.get(addr) else: return 'wrong password' -def poll_session(session_id): - session = sessions.get(session_id) - if session is None: - print time.asctime(), "session not found", session_id - out = repr( (-1, {})) - else: + + +def modified_addresses(session): + if 1: t1 = time.time() addresses = session['addresses'] session['last_time'] = time.time() @@ -427,16 +429,47 @@ def poll_session(session_id): for addr in addresses: if store.tx_cache.get( addr ) is not None: k += 1 status = get_address_status( addr ) - last_status = addresses.get( addr ) + msg_id, last_status = addresses.get( addr ) if last_status != status: - addresses[addr] = status + addresses[addr] = msg_id, status ret[addr] = status - if ret: - sessions[session_id]['addresses'] = addresses - out = repr( (block_number, ret ) ) + t2 = time.time() - t1 - if t2 > 10: - print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2 + #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2 + return ret, addresses + + +def poll_session(session_id): + # native + session = sessions.get(session_id) + if session is None: + print time.asctime(), "session not found", session_id + return -1, {} + else: + ret, addresses = modified_addresses(session) + if ret: sessions[session_id]['addresses'] = addresses + return repr( (block_number,ret)) + + +def poll_session_json(session_id, message_id): + session = m_sessions[0].get(session_id) + if session is None: + raise BaseException("session not found %s"%session_id) + else: + print "poll: session found", session_id + out = [] + ret, addresses = modified_addresses(session) + if ret: + m_sessions[0][session_id]['addresses'] = addresses + for addr in ret: + msg_id, status = addresses[addr] + out.append( { 'id':msg_id, 'result':status } ) + + msg_id, last_nb = session.get('numblocks') + if last_nb: + if last_nb != block_number: + m_sessions[0][session_id]['numblocks'] = msg_id, block_number + out.append( {'id':msg_id, 'result':block_number} ) return out @@ -444,6 +477,7 @@ def poll_session(session_id): def do_update_address(addr): # an address was involved in a transaction; we check if it was subscribed to in a session # the address can be subscribed in several sessions; the cache should ensure that we don't do redundant requests + for session_id in sessions.keys(): session = sessions[session_id] if session.get('type') != 'persistent': continue @@ -457,7 +491,6 @@ def do_update_address(addr): send_status(session_id,message_id,addr,status) sessions[session_id]['addresses'][addr] = (message_id,status) - def get_address_status(addr): # get address status, i.e. the last block for that address. tx_points = store.get_history(addr) @@ -481,19 +514,36 @@ def send_status(session_id, message_id, address, status): out = json.dumps( { 'id':message_id, 'result':status } ) output_queue.put((session_id, out)) +def address_get_history_json(_,message_id,address): + return store.get_history(address) + def subscribe_to_numblocks(session_id, message_id): sessions_sub_numblocks[session_id] = message_id send_numblocks(session_id) +def subscribe_to_numblocks_json(session_id, message_id): + global m_sessions + m_sessions[0][session_id]['numblocks'] = message_id,block_number + return block_number + def subscribe_to_address(session_id, message_id, address): status = get_address_status(address) sessions[session_id]['addresses'][address] = (message_id, status) sessions[session_id]['last_time'] = time.time() send_status(session_id, message_id, address, status) +def add_address_to_session_json(session_id, message_id, address): + global m_sessions + sessions = m_sessions[0] + status = get_address_status(address) + sessions[session_id]['addresses'][address] = (message_id, status) + sessions[session_id]['last_time'] = time.time() + m_sessions[0] = sessions + return status + def add_address_to_session(session_id, address): status = get_address_status(address) - sessions[session_id]['addresses'][addr] = status + sessions[session_id]['addresses'][addr] = ("", status) sessions[session_id]['last_time'] = time.time() return status @@ -501,13 +551,30 @@ def new_session(version, addresses): session_id = random_string(10) sessions[session_id] = { 'addresses':{}, 'version':version } for a in addresses: - sessions[session_id]['addresses'][a] = '' + sessions[session_id]['addresses'][a] = ('','') out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) ) sessions[session_id]['last_time'] = time.time() return out -def get_banner(): - print "get banner" + +def client_version_json(session_id, _, version): + global m_sessions + sessions = m_sessions[0] + sessions[session_id]['version'] = version + m_sessions[0] = sessions + +def create_session_json(_, __): + sessions = m_sessions[0] + session_id = random_string(10) + print "creating session", session_id + sessions[session_id] = { 'addresses':{}, 'numblocks':('','') } + sessions[session_id]['last_time'] = time.time() + m_sessions[0] = sessions + return session_id + + + +def get_banner(_,__): return config.get('server','banner').replace('\\n','\n') def update_session(session_id,addresses): @@ -892,26 +959,29 @@ def irc_thread(): s.close() +def get_peers_json(_,__): + return peer_list.values() def http_server_thread(store): # see http://code.google.com/p/jsonrpclib/ from SocketServer import ThreadingMixIn - from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer - class SimpleThreadedJSONRPCServer(ThreadingMixIn, SimpleJSONRPCServer): pass - server = SimpleThreadedJSONRPCServer(( config.get('server','host'), 8081)) - server.register_function(lambda : peer_list.values(), 'server.peers') + from StratumJSONRPCServer import StratumJSONRPCServer + class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass + server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081)) + server.register_function(get_peers_json, 'server.peers') server.register_function(cmd_stop, 'stop') server.register_function(cmd_load, 'load') - server.register_function(lambda : block_number, 'blocks') server.register_function(clear_cache, 'clear_cache') server.register_function(get_cache, 'get_cache') server.register_function(get_banner, 'server.banner') server.register_function(send_tx, 'transaction.broadcast') - server.register_function(store.get_history, 'address.get_history') - server.register_function(add_address_to_session, 'address.subscribe') - server.register_function(new_session, 'session.new') - server.register_function(update_session, 'session.update') - server.register_function(poll_session, 'session.poll') + server.register_function(address_get_history_json, 'address.get_history') + server.register_function(add_address_to_session_json, 'address.subscribe') + server.register_function(create_session_json, 'session.create') #internal message + server.register_function(poll_session_json, 'session.poll') + server.register_function(subscribe_to_numblocks_json, 'numblocks.subscribe') + server.register_function(client_version_json, 'client.version') + server.register_function(lambda a,b:None, 'ping') server.serve_forever() @@ -927,7 +997,7 @@ if __name__ == '__main__': if cmd == 'load': out = server.load(password) elif cmd == 'peers': - out = server.peers() + out = server.server.peers() elif cmd == 'stop': out = server.stop(password) elif cmd == 'clear_cache': @@ -939,7 +1009,7 @@ if __name__ == '__main__': elif cmd == 'tx': out = server.transaction.broadcast(sys.argv[2]) elif cmd == 'b': - out = server.blocks() + out = server.numblocks.subscribe() else: out = "Unknown command: '%s'" % cmd print out