You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1002 lines
32 KiB

13 years ago
#!/usr/bin/env python
13 years ago
# Copyright(C) 2012 thomasv@gitorious
13 years ago
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see
# <http://www.gnu.org/licenses/agpl.html>.
"""
Todo:
* server should check and return bitcoind status..
* improve txpoint sorting
* command to check cache
mempool transactions do not need to be added to the database; it slows it down
13 years ago
"""
from Abe.abe import hash_to_address, decode_check_address
from Abe.DataStore import DataStore as Datastore_class
from Abe import DataStore, readconf, BCDataStream, deserialize, util, base58
13 years ago
import psycopg2, binascii
13 years ago
import thread, traceback, sys, urllib, operator
from json import dumps, loads
class MyStore(Datastore_class):
def __init__(self, config, address_queue):
conf = DataStore.CONFIG_DEFAULTS
args, argv = readconf.parse_argv( [], conf)
args.dbtype = config.get('database','type')
if args.dbtype == 'sqlite3':
args.connect_args = { 'database' : config.get('database','database') }
elif args.dbtype == 'MySQLdb':
args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
elif args.dbtype == 'psycopg2':
args.connect_args = { 'database' : config.get('database','database') }
Datastore_class.__init__(self,args)
self.tx_cache = {}
self.mempool_keys = {}
self.bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.get('bitcoind','password'), config.get('bitcoind','host'), config.get('bitcoind','port'))
self.address_queue = address_queue
self.dblock = thread.allocate_lock()
def import_block(self, b, chain_ids=frozenset()):
block_id = super(MyStore, self).import_block(b, chain_ids)
for pos in xrange(len(b['transactions'])):
tx = b['transactions'][pos]
if 'hash' not in tx:
tx['hash'] = util.double_sha256(tx['tx'])
tx_id = store.tx_find_id_and_value(tx)
if tx_id:
self.update_tx_cache(tx_id)
else:
print "error: import_block: no tx_id"
return block_id
def update_tx_cache(self, txid):
inrows = self.get_tx_inputs(txid, False)
for row in inrows:
_hash = self.binout(row[6])
address = hash_to_address(chr(0), _hash)
if self.tx_cache.has_key(address):
print "cache: invalidating", address
self.tx_cache.pop(address)
self.address_queue.put(address)
outrows = self.get_tx_outputs(txid, False)
for row in outrows:
_hash = self.binout(row[6])
address = hash_to_address(chr(0), _hash)
if self.tx_cache.has_key(address):
print "cache: invalidating", address
self.tx_cache.pop(address)
self.address_queue.put(address)
def safe_sql(self,sql, params=(), lock=True):
try:
if lock: self.dblock.acquire()
ret = self.selectall(sql,params)
if lock: self.dblock.release()
return ret
except:
print "sql error", sql
return []
def get_tx_outputs(self, tx_id, lock=True):
return self.safe_sql("""SELECT
txout.txout_pos,
txout.txout_scriptPubKey,
txout.txout_value,
nexttx.tx_hash,
nexttx.tx_id,
txin.txin_pos,
pubkey.pubkey_hash
FROM txout
LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
WHERE txout.tx_id = %d
ORDER BY txout.txout_pos
"""%(tx_id), (), lock)
def get_tx_inputs(self, tx_id, lock=True):
return self.safe_sql(""" SELECT
txin.txin_pos,
txin.txin_scriptSig,
txout.txout_value,
COALESCE(prevtx.tx_hash, u.txout_tx_hash),
prevtx.tx_id,
COALESCE(txout.txout_pos, u.txout_pos),
pubkey.pubkey_hash
FROM txin
LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
WHERE txin.tx_id = %d
ORDER BY txin.txin_pos
"""%(tx_id,), (), lock)
def get_address_out_rows(self, dbhash):
return self.safe_sql(""" SELECT
b.block_nTime,
cc.chain_id,
b.block_height,
1,
b.block_hash,
tx.tx_hash,
tx.tx_id,
txin.txin_pos,
-prevout.txout_value
FROM chain_candidate cc
JOIN block b ON (b.block_id = cc.block_id)
JOIN block_tx ON (block_tx.block_id = b.block_id)
JOIN tx ON (tx.tx_id = block_tx.tx_id)
JOIN txin ON (txin.tx_id = tx.tx_id)
JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
WHERE pubkey.pubkey_hash = ?
AND cc.in_longest = 1""", (dbhash,))
def get_address_out_rows_memorypool(self, dbhash):
return self.safe_sql(""" SELECT
1,
tx.tx_hash,
tx.tx_id,
txin.txin_pos,
-prevout.txout_value
FROM tx
JOIN txin ON (txin.tx_id = tx.tx_id)
JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
WHERE pubkey.pubkey_hash = ? """, (dbhash,))
def get_address_in_rows(self, dbhash):
return self.safe_sql(""" SELECT
b.block_nTime,
cc.chain_id,
b.block_height,
0,
b.block_hash,
tx.tx_hash,
tx.tx_id,
txout.txout_pos,
txout.txout_value
FROM chain_candidate cc
JOIN block b ON (b.block_id = cc.block_id)
JOIN block_tx ON (block_tx.block_id = b.block_id)
JOIN tx ON (tx.tx_id = block_tx.tx_id)
JOIN txout ON (txout.tx_id = tx.tx_id)
JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
WHERE pubkey.pubkey_hash = ?
AND cc.in_longest = 1""", (dbhash,))
def get_address_in_rows_memorypool(self, dbhash):
return self.safe_sql( """ SELECT
0,
tx.tx_hash,
tx.tx_id,
txout.txout_pos,
txout.txout_value
FROM tx
JOIN txout ON (txout.tx_id = tx.tx_id)
JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
WHERE pubkey.pubkey_hash = ? """, (dbhash,))
def get_history(self, addr):
cached_version = self.tx_cache.get( addr )
if cached_version is not None:
return cached_version
version, binaddr = decode_check_address(addr)
if binaddr is None:
return None
dbhash = self.binin(binaddr)
rows = []
rows += self.get_address_out_rows( dbhash )
rows += self.get_address_in_rows( dbhash )
txpoints = []
known_tx = []
for row in rows:
try:
nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
except:
print "cannot unpack row", row
break
tx_hash = self.hashout_hex(tx_hash)
txpoint = {
"nTime": int(nTime),
"height": int(height),
"is_in": int(is_in),
"blk_hash": self.hashout_hex(blk_hash),
"tx_hash": tx_hash,
"tx_id": int(tx_id),
"pos": int(pos),
"value": int(value),
}
txpoints.append(txpoint)
known_tx.append(self.hashout_hex(tx_hash))
# todo: sort them really...
txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
# read memory pool
rows = []
rows += self.get_address_in_rows_memorypool( dbhash )
rows += self.get_address_out_rows_memorypool( dbhash )
address_has_mempool = False
for row in rows:
is_in, tx_hash, tx_id, pos, value = row
tx_hash = self.hashout_hex(tx_hash)
if tx_hash in known_tx:
continue
# this means that pending transactions were added to the db, even if they are not returned by getmemorypool
address_has_mempool = True
# this means pending transactions are returned by getmemorypool
if tx_hash not in self.mempool_keys:
continue
#print "mempool", tx_hash
txpoint = {
"nTime": 0,
"height": 0,
"is_in": int(is_in),
"blk_hash": 'mempool',
"tx_hash": tx_hash,
"tx_id": int(tx_id),
"pos": int(pos),
"value": int(value),
}
txpoints.append(txpoint)
for txpoint in txpoints:
tx_id = txpoint['tx_id']
txinputs = []
inrows = self.get_tx_inputs(tx_id)
for row in inrows:
_hash = self.binout(row[6])
address = hash_to_address(chr(0), _hash)
txinputs.append(address)
txpoint['inputs'] = txinputs
txoutputs = []
outrows = self.get_tx_outputs(tx_id)
for row in outrows:
_hash = self.binout(row[6])
address = hash_to_address(chr(0), _hash)
txoutputs.append(address)
txpoint['outputs'] = txoutputs
# for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
if not txpoint['is_in']:
# detect if already redeemed...
for row in outrows:
if row[6] == dbhash: break
else:
raise
#row = self.get_tx_output(tx_id,dbhash)
# pos, script, value, o_hash, o_id, o_pos, binaddr = row
# if not redeemed, we add the script
if row:
if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
# cache result
if not address_has_mempool:
self.tx_cache[addr] = txpoints
return txpoints
def memorypool_update(store):
ds = BCDataStream.BCDataStream()
previous_transactions = store.mempool_keys
store.mempool_keys = []
postdata = dumps({"method": 'getmemorypool', 'params': [], 'id':'jsonrpc'})
respdata = urllib.urlopen(store.bitcoind_url, postdata).read()
r = loads(respdata)
if r['error'] != None:
return
v = r['result'].get('transactions')
for hextx in v:
ds.clear()
ds.write(hextx.decode('hex'))
tx = deserialize.parse_Transaction(ds)
tx['hash'] = util.double_sha256(tx['tx'])
tx_hash = store.hashin(tx['hash'])
def send_tx(self,tx):
postdata = dumps({"method": 'importtransaction', 'params': [tx], 'id':'jsonrpc'})
respdata = urllib.urlopen(self.bitcoind_url, postdata).read()
r = loads(respdata)
if r['error'] != None:
out = "error: transaction rejected by memorypool\n"+tx
else:
out = r['result']
return out
def main_iteration(store):
try:
store.dblock.acquire()
store.catch_up()
store.memorypool_update()
block_number = store.get_block_number(1)
except IOError:
print "IOError: cannot reach bitcoind"
block_number = 0
except:
traceback.print_exc(file=sys.stdout)
block_number = 0
finally:
store.dblock.release()
return block_number
# ---------------------------------------------------------------------------
# Runtime configuration and shared server state (module-level script code).
# ---------------------------------------------------------------------------
import time, json, socket, operator, thread, ast, sys, re, traceback
import ConfigParser
from json import dumps, loads
import urllib

config = ConfigParser.ConfigParser()
# set some defaults, which will be overwritten by the config file
config.add_section('server')
config.set('server','banner', 'Welcome to Electrum!')
config.set('server', 'host', 'localhost')
config.set('server', 'port', '50000')
config.set('server', 'password', '')
config.set('server', 'irc', 'yes')
config.set('server', 'ircname', 'Electrum server')
config.add_section('database')
config.set('database', 'type', 'psycopg2')
config.set('database', 'database', 'abe')

# overlay the defaults with the system-wide config file, if present
try:
    f = open('/etc/electrum.conf','r')
    config.readfp(f)
    f.close()
except:
    print "Could not read electrum.conf. I will use the default values."

# optional banner file overrides the configured banner text
try:
    f = open('/etc/electrum.banner','r')
    config.set('server','banner', f.read())
    f.close()
except:
    pass

password = config.get('server','password')  # admin password for stop/load RPCs
stopping = False       # set by cmd_stop(); all worker loops poll it
block_number = -1      # last block count reported by the store

sessions = {}                 # native + TCP sessions, keyed by session id
sessions_sub_numblocks = {}   # sessions that have subscribed to the service
m_sessions = [{}]             # served by http
peer_list = {}                # nick -> (ip, host), filled by irc_thread

wallets = {} # for ultra-light clients such as bccapi

from Queue import Queue
input_queue = Queue()     # requests read from persistent TCP clients
output_queue = Queue()    # replies/notifications to persistent TCP clients
address_queue = Queue()   # addresses invalidated by the store's cache
13 years ago
def random_string(N):
    """Return a random identifier of length N over A-Z and 0-9."""
    import random, string
    alphabet = string.ascii_uppercase + string.digits
    return ''.join(random.choice(alphabet) for _ in range(N))
def cmd_stop(_, __, pw):
    """Admin RPC: request a server shutdown if `pw` matches the password."""
    global stopping
    if pw != password:
        return 'wrong password'
    stopping = True
    return 'ok'
def cmd_load(_, __, pw):
    """Admin RPC: report the number of active sessions (password protected)."""
    if pw != password:
        return 'wrong password'
    return repr(len(sessions))
def modified_addresses(session):
    """Recompute the status of every address in `session`.

    Updates the session's stored statuses in place and returns
    (changed, addresses) where `changed` maps each address whose status
    differs from the stored one to its new status.
    """
    started = time.time()
    addresses = session['addresses']
    session['last_time'] = time.time()
    changed = {}
    for addr in addresses:
        status = get_address_status(addr)
        msg_id, last_status = addresses.get(addr)
        if last_status != status:
            addresses[addr] = msg_id, status
            changed[addr] = status
    elapsed = time.time() - started
    #if elapsed > 10: print "high load:", "%d addresses" % len(addresses), elapsed
    return changed, addresses
def poll_session(session_id):
# native
session = sessions.get(session_id)
if session is None:
print time.asctime(), "session not found", session_id
return -1, {}
else:
ret, addresses = modified_addresses(session)
if ret: sessions[session_id]['addresses'] = addresses
return repr( (block_number,ret))
def poll_session_json(session_id, message_id):
    """HTTP/JSON poll: a list of {'id','result'} updates for every changed
    address, plus a numblocks update when subscribed and changed."""
    session = m_sessions[0].get(session_id)
    if session is None:
        raise BaseException("session not found %s" % session_id)

    out = []
    changed, addresses = modified_addresses(session)
    if changed:
        m_sessions[0][session_id]['addresses'] = addresses
        for addr in changed:
            msg_id, status = addresses[addr]
            out.append({'id': msg_id, 'result': status})

    msg_id, last_nb = session.get('numblocks')
    if last_nb and last_nb != block_number:
        m_sessions[0][session_id]['numblocks'] = msg_id, block_number
        out.append({'id': msg_id, 'result': block_number})
    return out
13 years ago
def do_update_address(addr):
    """An address was involved in a transaction: push its fresh status to
    every persistent session that subscribed to it.  The store's cache
    keeps repeated status lookups cheap across sessions."""
    for session_id in sessions.keys():
        session = sessions[session_id]
        if session.get('type') != 'persistent':
            continue
        if addr not in session['addresses'].keys():
            continue
        status = get_address_status(addr)
        message_id, last_status = session['addresses'][addr]
        if last_status != status:
            #print "sending new status for %s:"%addr, status
            send_status(session_id, message_id, addr, status)
            sessions[session_id]['addresses'][addr] = (message_id, status)
13 years ago
def get_address_status(addr):
    """Status of an address: the hash of the last block that touched it,
    None when it has no history, or 'mempool:<n>' with pending txs."""
    tx_points = store.get_history(addr)
    if not tx_points:
        return None
    status = tx_points[-1]['blk_hash']
    # temporary hack; move it up once old clients have disappeared
    if status == 'mempool':  # and session['version'] != "old":
        status += ':%d' % len(tx_points)
    return status
def send_numblocks(session_id):
    """Queue a block-count notification for a subscribed TCP session."""
    message_id = sessions_sub_numblocks[session_id]
    payload = json.dumps({'id': message_id, 'result': block_number})
    output_queue.put((session_id, payload))
def send_status(session_id, message_id, address, status):
    """Queue an address-status notification for a TCP session."""
    payload = json.dumps({'id': message_id, 'result': status})
    output_queue.put((session_id, payload))
def address_get_history_json(_, message_id, address):
    """HTTP/JSON handler: the full history of one address."""
    return store.get_history(address)
def subscribe_to_numblocks(session_id, message_id):
    """Register a TCP session for block-count updates; send one immediately."""
    sessions_sub_numblocks[session_id] = message_id
    send_numblocks(session_id)
def subscribe_to_numblocks_json(session_id, message_id):
    """HTTP/JSON handler: subscribe the session to block-count updates."""
    global m_sessions
    m_sessions[0][session_id]['numblocks'] = message_id, block_number
    return block_number
def subscribe_to_address(session_id, message_id, address):
    """Register an address with a TCP session and send its current status."""
    status = get_address_status(address)
    session = sessions[session_id]
    session['addresses'][address] = (message_id, status)
    session['last_time'] = time.time()
    send_status(session_id, message_id, address, status)
def add_address_to_session_json(session_id, message_id, address):
    """HTTP/JSON handler: subscribe a session to an address; return status."""
    global m_sessions
    msess = m_sessions[0]
    status = get_address_status(address)
    msess[session_id]['addresses'][address] = (message_id, status)
    msess[session_id]['last_time'] = time.time()
    m_sessions[0] = msess
    return status
13 years ago
def add_address_to_session(session_id, address):
    """Native-protocol handler: subscribe a session to an address (no
    message id on this protocol); return the address status."""
    status = get_address_status(address)
    session = sessions[session_id]
    session['addresses'][address] = ("", status)
    session['last_time'] = time.time()
    return status
def new_session(version, addresses):
    """Create a polled (native-protocol) session watching `addresses`.
    Returns repr((session_id, banner)) for the client."""
    session_id = random_string(10)
    session = {'addresses': {}, 'version': version}
    for a in addresses:
        session['addresses'][a] = ('', '')
    sessions[session_id] = session
    out = repr((session_id, config.get('server', 'banner').replace('\\n', '\n')))
    session['last_time'] = time.time()
    return out
def client_version_json(session_id, _, version):
    """HTTP/JSON handler: record the client's reported version string."""
    global m_sessions
    msess = m_sessions[0]
    msess[session_id]['version'] = version
    m_sessions[0] = msess
def create_session_json(_, __):
sessions = m_sessions[0]
session_id = random_string(10)
print "creating session", session_id
sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
sessions[session_id]['last_time'] = time.time()
m_sessions[0] = sessions
return session_id
def get_banner(_, __):
    """HTTP/JSON handler: the banner, with escaped newlines expanded."""
    return config.get('server', 'banner').replace('\\n', '\n')
13 years ago
def update_session(session_id, addresses):
    """deprecated in 0.42 -- replace a session's whole address list."""
    session = sessions[session_id]
    session['addresses'] = {}
    for a in addresses:
        session['addresses'][a] = ''
    session['last_time'] = time.time()
    return 'ok'
13 years ago
13 years ago
def native_server_thread():
    # Accept loop for the legacy "native" protocol: one request per
    # connection, handled by a short-lived native_client_thread.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((config.get('server','host'), config.getint('server','port')))
    s.listen(1)
    while not stopping:
        conn, addr = s.accept()
        try:
            thread.start_new_thread(native_client_thread, (addr, conn,))
        except:
            # can't start new thread if there is no memory..
            traceback.print_exc(file=sys.stdout)
13 years ago
def native_client_thread(ipaddr, conn):
    # Handle one native-protocol request: read until '#', parse the
    # "(cmd, data)" Python literal, dispatch it, send back the reply.
    #print "client thread", ipaddr
    try:
        ipaddr = ipaddr[0]
        msg = ''
        while 1:
            d = conn.recv(1024)
            msg += d
            if not d:
                # peer closed the connection before a terminator arrived
                break
            if '#' in msg:
                # '#' marks the end of the request; drop anything after it
                msg = msg.split('#', 1)[0]
                break
        try:
            cmd, data = ast.literal_eval(msg)
        except:
            print "syntax error", repr(msg), ipaddr
            conn.close()
            return
        out = do_command(cmd, data, ipaddr)
        if out:
            #print ipaddr, cmd, len(out)
            try:
                conn.send(out)
            except:
                print "error, could not send"
    finally:
        conn.close()
13 years ago
def timestr():
    """Current local time formatted as "[dd/mm/yyyy-HH:MM:SS]" for logs."""
    log_format = "[%d/%m/%Y-%H:%M:%S]"
    return time.strftime(log_format)
13 years ago
# used by the native handler
def do_command(cmd, data, ipaddr):
13 years ago
if cmd=='b':
out = "%d"%block_number
13 years ago
elif cmd in ['session','new_session']:
try:
if cmd == 'session':
addresses = ast.literal_eval(data)
version = "old"
else:
version, addresses = ast.literal_eval(data)
if version[0]=="0": version = "v" + version
except:
print "error", data
return None
print timestr(), "new session", ipaddr, addresses[0] if addresses else addresses, len(addresses), version
out = new_session(version, addresses)
13 years ago
13 years ago
elif cmd=='address.subscribe':
try:
session_id, addr = ast.literal_eval(data)
except:
traceback.print_exc(file=sys.stdout)
print data
13 years ago
return None
13 years ago
out = add_address_to_session(session_id,addr)
13 years ago
elif cmd=='update_session':
try:
session_id, addresses = ast.literal_eval(data)
except:
traceback.print_exc(file=sys.stdout)
return None
print timestr(), "update session", ipaddr, addresses[0] if addresses else addresses, len(addresses)
13 years ago
out = update_session(session_id,addresses)
elif cmd=='poll':
out = poll_session(data)
elif cmd == 'h':
# history
address = data
out = repr( store.get_history( address ) )
elif cmd == 'load':
13 years ago
out = cmd_load(None,None,data)
13 years ago
elif cmd =='tx':
out = store.send_tx(data)
print timestr(), "sent tx:", ipaddr, out
elif cmd == 'stop':
out = cmd_stop(data)
13 years ago
elif cmd == 'peers':
out = repr(peer_list.values())
else:
out = None
return out
13 years ago
13 years ago
####################################################################
def tcp_server_thread():
    # Accept loop for the persistent TCP (newline-delimited JSON) protocol
    # on port 50001; also starts the request/reply queue workers.
    thread.start_new_thread(process_input_queue, ())
    thread.start_new_thread(process_output_queue, ())
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((config.get('server','host'), 50001))
    s.listen(1)
    while not stopping:
        conn, addr = s.accept()
        try:
            thread.start_new_thread(tcp_client_thread, (addr, conn,))
        except:
            # can't start new thread if there is no memory..
            traceback.print_exc(file=sys.stdout)
13 years ago
def close_session(session_id):
    """Drop a TCP session and any numblocks subscription it holds."""
    #print "lost connection", session_id
    # fix: pop with a default -- the recv loop and the output thread can
    # both reach here for the same session, and a plain pop() raised
    # KeyError on the second call
    sessions.pop(session_id, None)
    if session_id in sessions_sub_numblocks:
        sessions_sub_numblocks.pop(session_id)
13 years ago
# one thread per client. put requests in a queue.
def tcp_client_thread(ipaddr, conn):
    """ use a persistent connection. put commands in a queue."""
    print timestr(), "TCP session", ipaddr
    global sessions
    session_id = random_string(10)
    sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown', 'type':'persistent' }
    ipaddr = ipaddr[0]
    msg = ''
    while not stopping:
        try:
            d = conn.recv(1024)
        except socket.error:
            d = ''
        if not d:
            # peer closed the connection (or socket error): drop the session
            close_session(session_id)
            break
        msg += d
        # process every complete (newline-terminated) line in the buffer
        while True:
            s = msg.find('\n')
            if s ==-1:
                break
            else:
                c = msg[0:s].strip()
                msg = msg[s+1:]
                if c == 'quit':
                    conn.close()
                    close_session(session_id)
                    return
                try:
                    c = json.loads(c)
                except:
                    print "json error", repr(c)
                    continue
                try:
                    message_id = c.get('id')
                    method = c.get('method')
                    params = c.get('params')
                except:
                    print "syntax error", repr(c), ipaddr
                    continue
                # add to queue
                input_queue.put((session_id, message_id, method, params))
# read commands from the input queue. perform requests, etc. this should be called from the main thread.
def process_input_queue():
while not stopping:
session_id, message_id, method, data = input_queue.get()
13 years ago
if session_id not in sessions.keys():
continue
out = None
if method == 'address.subscribe':
address = data[0]
subscribe_to_address(session_id,message_id,address)
elif method == 'numblocks.subscribe':
subscribe_to_numblocks(session_id,message_id)
elif method == 'client.version':
sessions[session_id]['version'] = data[0]
elif method == 'server.banner':
out = { 'result':config.get('server','banner').replace('\\n','\n') }
elif method == 'server.peers':
out = { 'result':peer_list.values() }
elif method == 'address.get_history':
address = data[0]
out = { 'result':store.get_history( address ) }
elif method == 'transaction.broadcast':
13 years ago
postdata = dumps({"method": 'importtransaction', 'params': [data], 'id':'jsonrpc'})
txo = urllib.urlopen(bitcoind_url, postdata).read()
13 years ago
print "sent tx:", txo
13 years ago
out = json.loads(txo)
else:
print "unknown command", method
if out:
out['id'] = message_id
out = json.dumps( out )
output_queue.put((session_id, out))
13 years ago
# this is a separate thread
def process_output_queue():
    """Worker loop: deliver queued replies and notifications to their
    sessions; a failed send closes the session."""
    while not stopping:
        session_id, out = output_queue.get()
        session = sessions.get(session_id)
        if not session:
            continue
        try:
            session.get('conn').send(out + '\n')
        except:
            close_session(session_id)
13 years ago
####################################################################
13 years ago
def clean_session_thread():
while not stopping:
time.sleep(30)
t = time.time()
for k,s in sessions.items():
13 years ago
if s.get('type') == 'persistent': continue
t0 = s['last_time']
13 years ago
if t - t0 > 5*60:
sessions.pop(k)
13 years ago
print "lost session", k
def irc_thread():
    """Advertise this server on Freenode #electrum and harvest the peer
    list from other E_* nicks via NAMES/WHO replies."""
    global peer_list
    NICK = 'E_' + random_string(10)
    while not stopping:
        # fix: sf and s were unbound in `finally` when connect() (or any
        # earlier call) failed, turning a transient network error into a
        # fatal NameError
        s = None
        sf = None
        try:
            s = socket.socket()
            s.connect(('irc.freenode.net', 6667))
            s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
            s.send('NICK '+NICK+'\n')
            s.send('JOIN #electrum\n')
            sf = s.makefile('r', 0)
            t = 0
            while not stopping:
                line = sf.readline()
                line = line.rstrip('\r\n')
                line = line.split()
                if not line:
                    continue  # fix: empty lines caused an IndexError below
                if line[0] == 'PING':
                    s.send('PONG '+line[1]+'\n')
                elif '353' in line:  # answer to /names
                    k = line.index('353')
                    for item in line[k+1:]:
                        if item[0:2] == 'E_':
                            s.send('WHO %s\n' % item)
                elif '352' in line:  # answer to /who
                    # warning: this is a horrible hack which apparently works
                    k = line.index('352')
                    ip = line[k+4]
                    ip = socket.gethostbyname(ip)
                    name = line[k+6]
                    host = line[k+9]
                    peer_list[name] = (ip, host)
                # re-poll the channel and rebuild the peer list every 5 min
                if time.time() - t > 5*60:
                    s.send('NAMES #electrum\n')
                    t = time.time()
                    peer_list = {}
        except:
            traceback.print_exc(file=sys.stdout)
        finally:
            if sf is not None:
                sf.close()
            if s is not None:
                s.close()
def get_peers_json(_, __):
    """HTTP/JSON handler: the current IRC-discovered peer list."""
    return peer_list.values()
13 years ago
def http_server_thread():
    # JSON-RPC over HTTP front-end on port 8081.
    # see http://code.google.com/p/jsonrpclib/
    from SocketServer import ThreadingMixIn
    from StratumJSONRPCServer import StratumJSONRPCServer
    class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
    server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
    # protocol methods
    server.register_function(get_peers_json, 'server.peers')
    server.register_function(cmd_stop, 'stop')
    server.register_function(cmd_load, 'load')
    server.register_function(get_banner, 'server.banner')
    server.register_function(lambda a,b,c: store.send_tx(c), 'transaction.broadcast')
    server.register_function(address_get_history_json, 'address.get_history')
    server.register_function(add_address_to_session_json, 'address.subscribe')
    server.register_function(subscribe_to_numblocks_json, 'numblocks.subscribe')
    server.register_function(client_version_json, 'client.version')
    server.register_function(create_session_json, 'session.create') # internal message (not part of protocol)
    server.register_function(poll_session_json, 'session.poll') # internal message (not part of protocol)
    server.serve_forever()
13 years ago
if __name__ == '__main__':
    # With arguments, act as a command-line client against a running
    # server's JSON-RPC interface and exit.
    if len(sys.argv)>1:
        import jsonrpclib
        server = jsonrpclib.Server('http://%s:8081'%config.get('server','host'))
        cmd = sys.argv[1]
        if cmd == 'load':
            out = server.load(password)
        elif cmd == 'peers':
            out = server.server.peers()
        elif cmd == 'stop':
            out = server.stop(password)
        elif cmd == 'clear_cache':
            out = server.clear_cache(password)
        elif cmd == 'get_cache':
            out = server.get_cache(password,sys.argv[2])
        elif cmd == 'h':
            out = server.address.get_history(sys.argv[2])
        elif cmd == 'tx':
            out = server.transaction.broadcast(sys.argv[2])
        elif cmd == 'b':
            out = server.numblocks.subscribe()
        else:
            out = "Unknown command: '%s'" % cmd
        print out
        sys.exit(0)

    # backend
    # from db import MyStore
    store = MyStore(config,address_queue)

    # supported protocols (one listener thread each)
    thread.start_new_thread(native_server_thread, ())
    thread.start_new_thread(tcp_server_thread, ())
    thread.start_new_thread(http_server_thread, ())
    thread.start_new_thread(clean_session_thread, ())

    if (config.get('server','irc') == 'yes' ):
        thread.start_new_thread(irc_thread, ())

    print "starting Electrum server"
    old_block_number = None
    # main loop: import new blocks / refresh mempool, then notify
    # numblocks subscribers and push invalidated-address updates
    while not stopping:
        block_number = store.main_iteration()

        if block_number != old_block_number:
            old_block_number = block_number
            for session_id in sessions_sub_numblocks.keys():
                send_numblocks(session_id)
        # drain the queue of addresses invalidated by the store's cache
        while True:
            try:
                addr = address_queue.get(False)
            except:
                break
            do_update_address(addr)

        time.sleep(10)
    print "server stopped"