You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

645 lines
21 KiB

13 years ago
#!/usr/bin/env python
# Copyright(C) 2011 thomasv@gitorious
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see
# <http://www.gnu.org/licenses/agpl.html>.
"""
Todo:
   * server should check and return bitcoind status
   * improve txpoint sorting
   * command to check cache
"""
13 years ago
import time, socket, operator, thread, ast, sys,re
13 years ago
import psycopg2, binascii
import bitcoinrpc
13 years ago
from Abe.abe import hash_to_address, decode_check_address
from Abe.DataStore import DataStore as Datastore_class
from Abe import DataStore, readconf, BCDataStream, deserialize, util, base58
import ConfigParser
config = ConfigParser.ConfigParser()
# set some defaults, which will be overwritten by the config file
config.add_section('server')
config.set('server','banner', 'Welcome to Electrum!')
config.set('server', 'host', 'localhost')
config.set('server', 'port', 50000)
config.set('server', 'password', '')
config.set('server', 'irc', 'yes')
config.set('server', 'cache', 'no')
config.set('server', 'ircname', 'Electrum server')
config.add_section('database')
config.set('database', 'type', 'psycopg2')
config.set('database', 'database', 'abe')
13 years ago
try:
f = open('/etc/electrum.conf','r')
config.readfp(f)
13 years ago
f.close()
except:
13 years ago
print "Could not read electrum.conf. I will use the default values."
13 years ago
stopping = False
block_number = -1
13 years ago
sessions = {}
dblock = thread.allocate_lock()
peer_list = {}
13 years ago
13 years ago
class MyStore(Datastore_class):
def import_tx(self, tx, is_coinbase):
tx_id = super(MyStore, self).import_tx(tx, is_coinbase)
if config.get('server', 'cache') == 'yes': self.update_tx_cache(tx_id)
def update_tx_cache(self, txid):
inrows = self.get_tx_inputs(txid, False)
for row in inrows:
_hash = store.binout(row[6])
address = hash_to_address(chr(0), _hash)
if self.tx_cache.has_key(address):
13 years ago
print "cache: invalidating", address, self.ismempool
self.tx_cache.pop(address)
outrows = self.get_tx_outputs(txid, False)
for row in outrows:
_hash = store.binout(row[6])
address = hash_to_address(chr(0), _hash)
if self.tx_cache.has_key(address):
13 years ago
print "cache: invalidating", address, self.ismempool
self.tx_cache.pop(address)
def safe_sql(self,sql, params=(), lock=True):
13 years ago
try:
if lock: dblock.acquire()
13 years ago
ret = self.selectall(sql,params)
if lock: dblock.release()
13 years ago
return ret
except:
print "sql error", sql
return []
def get_tx_outputs(self, tx_id, lock=True):
13 years ago
return self.safe_sql("""SELECT
txout.txout_pos,
txout.txout_scriptPubKey,
txout.txout_value,
nexttx.tx_hash,
nexttx.tx_id,
txin.txin_pos,
pubkey.pubkey_hash
FROM txout
LEFT JOIN txin ON (txin.txout_id = txout.txout_id)
LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
LEFT JOIN tx nexttx ON (txin.tx_id = nexttx.tx_id)
WHERE txout.tx_id = %d
ORDER BY txout.txout_pos
"""%(tx_id), (), lock)
13 years ago
def get_tx_inputs(self, tx_id, lock=True):
13 years ago
return self.safe_sql(""" SELECT
txin.txin_pos,
txin.txin_scriptSig,
txout.txout_value,
COALESCE(prevtx.tx_hash, u.txout_tx_hash),
prevtx.tx_id,
COALESCE(txout.txout_pos, u.txout_pos),
pubkey.pubkey_hash
FROM txin
LEFT JOIN txout ON (txout.txout_id = txin.txout_id)
LEFT JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
LEFT JOIN tx prevtx ON (txout.tx_id = prevtx.tx_id)
LEFT JOIN unlinked_txin u ON (u.txin_id = txin.txin_id)
WHERE txin.tx_id = %d
ORDER BY txin.txin_pos
"""%(tx_id,), (), lock)
13 years ago
def get_address_out_rows(self, dbhash):
return self.safe_sql(""" SELECT
b.block_nTime,
cc.chain_id,
b.block_height,
1,
b.block_hash,
tx.tx_hash,
tx.tx_id,
txin.txin_pos,
-prevout.txout_value
FROM chain_candidate cc
JOIN block b ON (b.block_id = cc.block_id)
JOIN block_tx ON (block_tx.block_id = b.block_id)
JOIN tx ON (tx.tx_id = block_tx.tx_id)
JOIN txin ON (txin.tx_id = tx.tx_id)
JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
13 years ago
WHERE pubkey.pubkey_hash = ?
AND cc.in_longest = 1""", (dbhash,))
13 years ago
def get_address_out_rows_memorypool(self, dbhash):
return self.safe_sql(""" SELECT
1,
tx.tx_hash,
tx.tx_id,
txin.txin_pos,
-prevout.txout_value
FROM tx
JOIN txin ON (txin.tx_id = tx.tx_id)
JOIN txout prevout ON (txin.txout_id = prevout.txout_id)
JOIN pubkey ON (pubkey.pubkey_id = prevout.pubkey_id)
13 years ago
WHERE pubkey.pubkey_hash = ? """, (dbhash,))
13 years ago
def get_address_in_rows(self, dbhash):
return self.safe_sql(""" SELECT
b.block_nTime,
cc.chain_id,
b.block_height,
0,
b.block_hash,
tx.tx_hash,
tx.tx_id,
txout.txout_pos,
txout.txout_value
FROM chain_candidate cc
JOIN block b ON (b.block_id = cc.block_id)
JOIN block_tx ON (block_tx.block_id = b.block_id)
JOIN tx ON (tx.tx_id = block_tx.tx_id)
JOIN txout ON (txout.tx_id = tx.tx_id)
JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
13 years ago
WHERE pubkey.pubkey_hash = ?
AND cc.in_longest = 1""", (dbhash,))
13 years ago
def get_address_in_rows_memorypool(self, dbhash):
return self.safe_sql( """ SELECT
0,
tx.tx_hash,
tx.tx_id,
txout.txout_pos,
txout.txout_value
FROM tx
JOIN txout ON (txout.tx_id = tx.tx_id)
JOIN pubkey ON (pubkey.pubkey_id = txout.pubkey_id)
13 years ago
WHERE pubkey.pubkey_hash = ? """, (dbhash,))
13 years ago
def get_txpoints(self, addr):
if config.get('server','cache') == 'yes':
cached_version = self.tx_cache.get( addr )
if cached_version is not None:
return cached_version
13 years ago
version, binaddr = decode_check_address(addr)
if binaddr is None:
return "err"
dbhash = self.binin(binaddr)
rows = []
rows += self.get_address_out_rows( dbhash )
rows += self.get_address_in_rows( dbhash )
txpoints = []
known_tx = []
for row in rows:
try:
nTime, chain_id, height, is_in, blk_hash, tx_hash, tx_id, pos, value = row
except:
print "cannot unpack row", row
break
tx_hash = self.hashout_hex(tx_hash)
txpoint = {
"nTime": int(nTime),
#"chain_id": int(chain_id),
"height": int(height),
"is_in": int(is_in),
"blk_hash": self.hashout_hex(blk_hash),
"tx_hash": tx_hash,
"tx_id": int(tx_id),
"pos": int(pos),
"value": int(value),
}
txpoints.append(txpoint)
known_tx.append(self.hashout_hex(tx_hash))
# todo: sort them really...
txpoints = sorted(txpoints, key=operator.itemgetter("nTime"))
# read memory pool
rows = []
rows += self.get_address_in_rows_memorypool( dbhash )
rows += self.get_address_out_rows_memorypool( dbhash )
13 years ago
address_has_mempool = False
13 years ago
for row in rows:
is_in, tx_hash, tx_id, pos, value = row
tx_hash = self.hashout_hex(tx_hash)
if tx_hash in known_tx:
continue
13 years ago
address_has_mempool = True
13 years ago
#print "mempool", tx_hash
txpoint = {
"nTime": 0,
#"chain_id": 1,
13 years ago
"height": 0,
"is_in": int(is_in),
"blk_hash": 'mempool',
13 years ago
"tx_hash": tx_hash,
"tx_id": int(tx_id),
"pos": int(pos),
"value": int(value),
}
txpoints.append(txpoint)
for txpoint in txpoints:
tx_id = txpoint['tx_id']
txinputs = []
inrows = self.get_tx_inputs(tx_id)
for row in inrows:
_hash = self.binout(row[6])
address = hash_to_address(chr(0), _hash)
txinputs.append(address)
txpoint['inputs'] = txinputs
txoutputs = []
outrows = self.get_tx_outputs(tx_id)
for row in outrows:
_hash = self.binout(row[6])
address = hash_to_address(chr(0), _hash)
txoutputs.append(address)
txpoint['outputs'] = txoutputs
# for all unspent inputs, I want their scriptpubkey. (actually I could deduce it from the address)
if not txpoint['is_in']:
# detect if already redeemed...
for row in outrows:
if row[6] == dbhash: break
else:
raise
#row = self.get_tx_output(tx_id,dbhash)
# pos, script, value, o_hash, o_id, o_pos, binaddr = row
# if not redeemed, we add the script
if row:
if not row[4]: txpoint['raw_scriptPubKey'] = row[1]
# cache result
13 years ago
if config.get('server','cache') == 'yes' and not address_has_mempool:
self.tx_cache[addr] = txpoints
return txpoints
13 years ago
def send_tx(tx):
    """Hand transaction tx to the local bitcoin daemon.

    Returns the daemon's reply to ``importtransaction``, or an
    ``"error: ..."`` string when the daemon cannot be reached or rejects
    the transaction.  Never raises for daemon-side failures, so the client
    always receives an answer.
    """
    import bitcoinrpc
    try:
        conn = bitcoinrpc.connect_to_local()
    except Exception:
        # Previously outside the try block: a failure here ended the client
        # thread without any reply.
        return "error: cannot connect to bitcoin daemon"
    try:
        return conn.importtransaction(tx)
    except Exception:
        return "error: transaction rejected by memorypool"
def listen_thread(store):
    """Accept TCP clients on the configured host/port, one thread per client.

    ``store`` is accepted but not used in this function.  The loop ends once
    the module-level ``stopping`` flag is set; because accept() blocks, the
    flag is only re-checked after the next connection arrives.
    """
    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    bind_address = (config.get('server', 'host'), config.getint('server', 'port'))
    server_sock.bind(bind_address)
    server_sock.listen(1)
    while True:
        if stopping:
            break
        client_conn, client_addr = server_sock.accept()
        thread.start_new_thread(client_thread, (client_addr, client_conn))
def random_string(N):
    """Return a string of N random characters drawn from A-Z and 0-9.

    The result is used as a session identifier, so the characters come from
    the operating system's entropy source (random.SystemRandom) instead of
    the predictable default generator.
    """
    import random, string
    alphabet = string.ascii_uppercase + string.digits
    rng = random.SystemRandom()
    return ''.join(rng.choice(alphabet) for _ in range(N))
13 years ago
def client_thread(ipaddr,conn):
13 years ago
#print "client thread", ipaddr
13 years ago
try:
ipaddr = ipaddr[0]
msg = ''
while 1:
d = conn.recv(1024)
13 years ago
msg += d
if not d:
break
13 years ago
if d[-1]=='#':
13 years ago
break
13 years ago
13 years ago
#print msg
13 years ago
try:
cmd, data = ast.literal_eval(msg[:-1])
except:
print "syntax error", repr(msg)
conn.close()
return
13 years ago
if cmd=='b':
out = "%d"%block_number
13 years ago
elif cmd in ['session','new_session']:
session_id = random_string(10)
13 years ago
try:
if cmd == 'session':
addresses = ast.literal_eval(data)
version = "old"
else:
version, addresses = ast.literal_eval(data)
13 years ago
except:
print "error"
conn.close()
return
print time.asctime(), "new session", version, ipaddr, session_id, addresses[0] if addresses else addresses, len(addresses)
13 years ago
sessions[session_id] = { 'addresses':{}, 'version':version }
13 years ago
for a in addresses:
sessions[session_id]['addresses'][a] = ''
13 years ago
out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
sessions[session_id]['last_time'] = time.time()
13 years ago
13 years ago
elif cmd=='update_session':
try:
session_id, addresses = ast.literal_eval(data)
except:
print "error"
conn.close()
return
print time.asctime(), "update session", ipaddr, session_id, addresses[0] if addresses else addresses, len(addresses)
sessions[session_id]['addresses'] = {}
13 years ago
for a in addresses:
sessions[session_id]['addresses'][a] = ''
13 years ago
out = 'ok'
sessions[session_id]['last_time'] = time.time()
13 years ago
13 years ago
elif cmd=='poll':
13 years ago
session_id = data
session = sessions.get(session_id)
if session is None:
13 years ago
print time.asctime(), "session not found", session_id, ipaddr
13 years ago
out = repr( (-1, {}))
else:
13 years ago
t1 = time.time()
addresses = session['addresses']
session['last_time'] = time.time()
13 years ago
ret = {}
13 years ago
k = 0
13 years ago
for addr in addresses:
13 years ago
if store.tx_cache.get( addr ) is not None: k += 1
# get addtess status, i.e. the last block for that address.
tx_points = store.get_txpoints(addr)
if not tx_points:
status = None
else:
lastpoint = tx_points[-1]
status = lastpoint['blk_hash']
# this is a temporary hack; move it up once old clients have disappeared
if status == 'mempool' and session['version'] != "old":
status = status + ':%s'% lastpoint['tx_hash']
13 years ago
last_status = addresses.get( addr )
13 years ago
if last_status != status:
13 years ago
addresses[addr] = status
13 years ago
ret[addr] = status
13 years ago
if ret:
sessions[session_id]['addresses'] = addresses
out = repr( (block_number, ret ) )
13 years ago
t2 = time.time() - t1
if t2 > 10:
print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
13 years ago
elif cmd == 'h':
# history
address = data
out = repr( store.get_txpoints( address ) )
13 years ago
elif cmd == 'load':
if config.get('server','password') == data:
out = repr( len(sessions) )
else:
out = 'wrong password'
13 years ago
elif cmd =='tx':
out = send_tx(data)
print "sent tx:", out
13 years ago
elif cmd =='clear_cache':
if config.get('server','password') == data:
self.tx_cache = {}
out = 'ok'
else:
out = 'wrong password'
elif cmd =='get_cache':
try:
pw, addr = data
except:
addr = None
if addr:
if config.get('server','password') == pw:
out = store.tx_cache.get(addr)
out = repr(out)
else:
out = 'wrong password'
else:
out = "error: "+ repr(data)
elif cmd == 'stop':
global stopping
if config.get('server','password') == data:
stopping = True
out = 'ok'
else:
out = 'wrong password'
elif cmd == 'peers':
out = repr(peer_list.values())
13 years ago
else:
out = None
if out:
#print ipaddr, cmd, len(out)
try:
conn.send(out)
except:
print "error, could not send"
finally:
conn.close()
ds = BCDataStream.BCDataStream()
def memorypool_update(store):
conn = bitcoinrpc.connect_to_local()
13 years ago
try:
v = conn.getmemorypool()
except:
print "cannot contact bitcoin daemon"
13 years ago
return
v = v['transactions']
for hextx in v:
ds.clear()
ds.write(hextx.decode('hex'))
tx = deserialize.parse_Transaction(ds)
tx['hash'] = util.double_sha256(tx['tx'])
if store.tx_find_id_and_value(tx):
pass
else:
#print "new tx", tx['hash'][::-1].encode('hex')
13 years ago
store.import_tx(tx, False)
13 years ago
store.commit()
def clean_session_thread():
while not stopping:
time.sleep(30)
t = time.time()
for k,s in sessions.items():
t0 = s['last_time']
13 years ago
if t - t0 > 5*60:
print time.asctime(), "lost session",k
sessions.pop(k)
def irc_thread():
    """Advertise this server on IRC and collect the other servers as peers.

    Joins #electrum on irc.freenode.net under a random ``E_`` nick, asks for
    the channel's names every five minutes, sends WHO for every ``E_`` nick
    and records each answer in the module-level ``peer_list`` as
    ``nick -> (ip, host)``.  Reconnects after any error until ``stopping``
    is set.
    """
    global peer_list
    NICK = 'E_' + random_string(10)
    reconnect_delay = 10    # seconds to wait before reconnecting

    while not stopping:
        # Bound before the try so the cleanup below never hits an
        # unassigned name when socket creation or connect() fails.
        s = None
        sf = None
        try:
            s = socket.socket()
            s.connect(('irc.freenode.net', 6667))
            s.send('USER electrum 0 * :'+config.get('server','host')+' '+config.get('server','ircname')+'\n')
            s.send('NICK '+NICK+'\n')
            s.send('JOIN #electrum\n')
            sf = s.makefile('r', 0)
            t = 0
            while not stopping:
                line = sf.readline()
                if not line:
                    # EOF: the server closed the connection.
                    break
                line = line.rstrip('\r\n').split()
                if line:    # blank lines carry nothing to parse
                    if line[0] == 'PING':
                        s.send('PONG '+line[1]+'\n')
                    elif '353' in line:  # answer to /names
                        k = line.index('353')
                        for item in line[k+1:]:
                            if item[0:2] == 'E_':
                                s.send('WHO %s\n' % item)
                    elif '352' in line:  # answer to /who
                        # warning: this is a horrible hack which apparently
                        # works.  Offsets are relative to the '352' token;
                        # k+9 is presumably the first word of the real-name
                        # field, which the USER line above fills with the
                        # configured host -- confirm against the WHO reply.
                        k = line.index('352')
                        ip = socket.gethostbyname(line[k+4])
                        name = line[k+6]
                        host = line[k+9]
                        peer_list[name] = (ip, host)
                if time.time() - t > 5*60:
                    # Refresh: request the names again and start a new list.
                    s.send('NAMES #electrum\n')
                    t = time.time()
                    peer_list = {}
        except:
            traceback.print_exc(file=sys.stdout)
        finally:
            if sf is not None:
                sf.close()
            if s is not None:
                s.close()
        if not stopping:
            # Avoid a tight reconnect loop while the network is unavailable.
            time.sleep(reconnect_delay)
13 years ago
import traceback
if __name__ == '__main__':
if len(sys.argv)>1:
cmd = sys.argv[1]
if cmd == 'load':
request = "('load','%s')#"%config.get('server','password')
elif cmd == 'peers':
request = "('peers','')#"
elif cmd == 'stop':
request = "('stop','%s')#"%config.get('server','password')
13 years ago
elif cmd == 'clear_cache':
request = "('clear_cache','%s')#"%config.get('server','password')
elif cmd == 'get_cache':
request = "('get_cache',('%s','%s'))#"%(config.get('server','password'),sys.argv[2])
elif cmd == 'h':
request = "('h','%s')#"%sys.argv[2]
elif cmd == 'b':
request = "('b','')#"
s = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
s.connect((config.get('server','host'), config.getint('server','port')))
s.send( request )
out = ''
while 1:
msg = s.recv(1024)
if msg: out += msg
else: break
s.close()
print out
sys.exit(0)
13 years ago
print "starting Electrum server"
13 years ago
print "cache:", config.get('server', 'cache')
13 years ago
conf = DataStore.CONFIG_DEFAULTS
args, argv = readconf.parse_argv( [], conf)
args.dbtype= config.get('database','type')
if args.dbtype == 'sqlite3':
args.connect_args = { 'database' : config.get('database','database') }
elif args.dbtype == 'MySQLdb':
args.connect_args = { 'db' : config.get('database','database'), 'user' : config.get('database','username'), 'passwd' : config.get('database','password') }
elif args.dbtype == 'psycopg2':
args.connect_args = { 'database' : config.get('database','database') }
13 years ago
store = MyStore(args)
store.tx_cache = {}
store.ismempool = False
13 years ago
13 years ago
thread.start_new_thread(listen_thread, (store,))
thread.start_new_thread(clean_session_thread, ())
if (config.get('server','irc') == 'yes' ):
thread.start_new_thread(irc_thread, ())
13 years ago
while not stopping:
13 years ago
try:
dblock.acquire()
store.catch_up()
store.ismempool = True
13 years ago
memorypool_update(store)
store.ismempool = False
block_number = store.get_block_number(1)
13 years ago
dblock.release()
except:
traceback.print_exc(file=sys.stdout)
time.sleep(10)
print "server stopped"