Browse Source

subscribed services

283
ThomasV 13 years ago
parent
commit
58f180078e
  1. 97
      server/server.py

97
server/server.py

@ -75,12 +75,19 @@ bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.g
stopping = False stopping = False
block_number = -1 block_number = -1
old_block_number = -1
sessions = {} sessions = {}
sessions_sub_numblocks = [] # sessions that have subscribed to the service
dblock = thread.allocate_lock() dblock = thread.allocate_lock()
peer_list = {} peer_list = {}
wallets = {} # for ultra-light clients such as bccapi wallets = {} # for ultra-light clients such as bccapi
from Queue import Queue
input_queue = Queue()
output_queue = Queue()
class MyStore(Datastore_class): class MyStore(Datastore_class):
def import_tx(self, tx, is_coinbase): def import_tx(self, tx, is_coinbase):
@ -408,18 +415,7 @@ def poll_session(session_id):
k = 0 k = 0
for addr in addresses: for addr in addresses:
if store.tx_cache.get( addr ) is not None: k += 1 if store.tx_cache.get( addr ) is not None: k += 1
status = get_address_status( addr )
# get addtess status, i.e. the last block for that address.
tx_points = store.get_history(addr)
if not tx_points:
status = None
else:
lastpoint = tx_points[-1]
status = lastpoint['blk_hash']
# this is a temporary hack; move it up once old clients have disappeared
if status == 'mempool' and session['version'] != "old":
status = status + ':%d'% len(tx_points)
last_status = addresses.get( addr ) last_status = addresses.get( addr )
if last_status != status: if last_status != status:
addresses[addr] = status addresses[addr] = status
@ -433,6 +429,36 @@ def poll_session(session_id):
return out return out
def get_address_status(addr):
# get addtess status, i.e. the last block for that address.
tx_points = store.get_history(addr)
if not tx_points:
status = None
else:
lastpoint = tx_points[-1]
status = lastpoint['blk_hash']
# this is a temporary hack; move it up once old clients have disappeared
if status == 'mempool': # and session['version'] != "old":
status = status + ':%d'% len(tx_points)
return status
def send_numblocks(session_id):
out = json.dumps( {'method':'numblocks.subscribe', 'result':block_number} )
output_queue.put((session_id, out))
def subscribe_to_numblocks(session_id):
sessions_sub_numblocks.append(session_id)
send_numblocks(session_id)
def subscribe_to_address(session_id, address):
#print "%s subscribing to %s"%(session_id,address)
status = get_address_status(address)
sessions[session_id]['type'] = 'subscribe'
sessions[session_id]['addresses'][address] = status
sessions[session_id]['last_time'] = time.time()
out = json.dumps( { 'method':'address.subscribe', 'address':address, 'status':status } )
output_queue.put((session_id, out))
def new_session(version, addresses): def new_session(version, addresses):
session_id = random_string(10) session_id = random_string(10)
@ -443,11 +469,6 @@ def new_session(version, addresses):
sessions[session_id]['last_time'] = time.time() sessions[session_id]['last_time'] = time.time()
return out return out
def subscribe_to_address(session_id, address):
sessions[session_id]['addresses'][address] = ''
sessions[session_id]['last_time'] = time.time()
def update_session(session_id,addresses): def update_session(session_id,addresses):
sessions[session_id]['addresses'] = {} sessions[session_id]['addresses'] = {}
for a in addresses: for a in addresses:
@ -611,6 +632,9 @@ def do_command(cmd, data, ipaddr):
#################################################################### ####################################################################
def tcp_server_thread(): def tcp_server_thread():
thread.start_new_thread(process_input_queue, ())
thread.start_new_thread(process_output_queue, ())
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((config.get('server','host'), 50001)) s.bind((config.get('server','host'), 50001))
@ -624,19 +648,19 @@ def tcp_server_thread():
traceback.print_exc(file=sys.stdout) traceback.print_exc(file=sys.stdout)
# one thread per client. put requests in a queue.
def tcp_client_thread(ipaddr,conn): def tcp_client_thread(ipaddr,conn):
""" use a persistent connection. put commands in a queue.""" """ use a persistent connection. put commands in a queue."""
print "persistent client thread", ipaddr print "persistent client thread", ipaddr
global sessions global sessions
session_id = random_string(10) session_id = random_string(10)
sessions[session_id] = { 'addresses':{}, 'version':'unknown' } sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown' }
ipaddr = ipaddr[0] ipaddr = ipaddr[0]
msg = '' msg = ''
while True: while not stopping:
d = conn.recv(1024) d = conn.recv(1024)
msg += d msg += d
if not d: break if not d: break
@ -655,17 +679,40 @@ def tcp_client_thread(ipaddr,conn):
print "syntax error", repr(c), ipaddr print "syntax error", repr(c), ipaddr
continue continue
# add to queue
input_queue.put((session_id, cmd, data))
# read commands from the input queue. perform requests, etc. this should be called from the main thread.
def process_input_queue():
while not stopping:
session_id, cmd, data = input_queue.get()
out = None out = None
if cmd == 'blockchain.address.subscribe': if cmd == 'address.subscribe':
subscribe_to_address(session_id,data) subscribe_to_address(session_id,data)
elif cmd == 'numblocks.subscribe':
subscribe_to_numblocks(session_id)
elif cmd == 'client.version': elif cmd == 'client.version':
sessions[session_id]['version'] = data sessions[session_id]['version'] = data
elif cmd == 'server.banner': elif cmd == 'server.banner':
out = json.dumps( { 'method':'server.banner', 'result':config.get('server','banner').replace('\\n','\n') } ) out = json.dumps( { 'method':'server.banner', 'result':config.get('server','banner').replace('\\n','\n') } )
elif cmd == 'address.get_history':
address = data
out = json.dumps( { 'method':'address.get_history', 'address':address, 'result':store.get_history( address ) } )
elif cmd == 'transaction.broadcast':
out = json.dumps( { 'method':'transaction.broadcast', 'result':send_tx(data) } )
else: else:
print "unknown command", cmd print "unknown command", cmd
if out: if out:
output_queue.put((session_id, out))
# this is a separate thread
def process_output_queue():
while not stopping:
session_id, out = output_queue.get()
session = sessions.get(session_id)
if session:
conn = session.get('conn')
conn.send(out+'\n') conn.send(out+'\n')
@ -835,6 +882,12 @@ if __name__ == '__main__':
store.catch_up() store.catch_up()
memorypool_update(store) memorypool_update(store)
block_number = store.get_block_number(1) block_number = store.get_block_number(1)
if block_number != old_block_number:
old_block_number = block_number
for session_id in sessions_sub_numblocks:
send_numblocks(session_id)
except IOError: except IOError:
print "IOError: cannot reach bitcoind" print "IOError: cannot reach bitcoind"
block_number = 0 block_number = 0

Loading…
Cancel
Save