diff --git a/electrum b/electrum index a449de3c5..74b0077cc 100755 --- a/electrum +++ b/electrum @@ -114,15 +114,16 @@ def run_command(cmd, password=None, args=[]): cmd_runner.password = password if cmd.requires_network and not options.offline: - cmd_runner.network = xmlrpclib.ServerProxy('http://localhost:8000') while True: try: - if cmd_runner.network.ping() == 'pong': - break + cmd_runner.network = NetworkProxy(config) + cmd_runner.network.start() + break except socket.error: if cmd.name != 'daemon': - start_daemon() + print "starting daemon" + start_daemon(config) else: print "Daemon not running" sys.exit(1) @@ -133,6 +134,7 @@ def run_command(cmd, password=None, args=[]): else: cmd_runner.network = None + try: result = func(*args[1:]) except Exception: @@ -153,50 +155,7 @@ def run_command(cmd, password=None, args=[]): -def start_server(): - network = Network(config) - if not network.start(wait=True): - print_msg("Not connected, aborting.") - sys.exit(1) - print_msg("Network daemon connected to " + network.interface.connection_msg) - from SimpleXMLRPCServer import SimpleXMLRPCServer - server = SimpleXMLRPCServer(('localhost',8000), allow_none=True, logRequests=False) - server.network = network - server.register_function(lambda: 'pong', 'ping') - server.register_function(network.synchronous_get, 'synchronous_get') - server.register_function(network.get_servers, 'get_servers') - server.register_function(network.main_server, 'main_server') - server.register_function(network.send, 'send') - server.register_function(network.subscribe, 'subscribe') - server.register_function(network.is_connected, 'is_connected') - server.register_function(network.is_up_to_date, 'is_up_to_date') - server.register_function(lambda: setattr(server,'running', False), 'stop') - return server - -def start_daemon(): - pid = os.fork() - if (pid == 0): # The first child. - os.chdir("/") - os.setsid() - os.umask(0) - pid2 = os.fork() - if (pid2 == 0): # Second child - server = start_server() - server.running = True - timeout = 60 - t0 = time.time() - server.socket.settimeout(timeout) - while server.running: - server.handle_request() - t = time.time() - if t - t0 > 0.9*timeout: - break - if not server.network.is_connected(): - break - t0 = t - sys.exit(0) - time.sleep(2) if __name__ == '__main__': diff --git a/lib/__init__.py b/lib/__init__.py index ddf27d559..d01b64572 100644 --- a/lib/__init__.py +++ b/lib/__init__.py @@ -14,3 +14,4 @@ from plugins import BasePlugin from mnemonic import mn_encode as mnemonic_encode from mnemonic import mn_decode as mnemonic_decode from commands import Commands, known_commands +from daemon import start_daemon, NetworkProxy diff --git a/lib/daemon.py b/lib/daemon.py new file mode 100644 index 000000000..b26ef88a0 --- /dev/null +++ b/lib/daemon.py @@ -0,0 +1,307 @@ +#!/usr/bin/env python +# +# Electrum - lightweight Bitcoin client +# Copyright (C) 2014 Thomas Voegtlin +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import socket +import select +import time +import sys +import os +import threading +import traceback +import json +import Queue +from network import Network + + + +class NetworkProxy(threading.Thread): + # connects to daemon + # sends requests, runs callbacks + + def __init__(self, config): + threading.Thread.__init__(self) + self.daemon = True + self.config = config + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.connect(('', 8000)) + self.message_id = 0 + self.unanswered_requests = {} + self.subscriptions = {} + self.debug = True + self.lock = threading.Lock() + + + def parse_json(self, message): + s = message.find('\n') + if s==-1: + return None, message + j = json.loads( message[0:s] ) + return j, message[s+1:] + + + def run(self): + # read responses and trigger callbacks + message = '' + while True: + try: + data = self.socket.recv(1024) + except: + data = '' + if not data: + break + + message += data + while True: + response, message = self.parse_json(message) + if response is not None: + self.process(response) + else: + break + + print "NetworkProxy: exiting" + + + def process(self, response): + # runs callbacks + #print "<--", response + + msg_id = response.get('id') + with self.lock: + method, params, callback = self.unanswered_requests.pop(msg_id) + + result = response.get('result') + callback({'method':method, 'params':params, 'result':result, 'id':msg_id}) + + + def send(self, messages, callback): + # detect if it is a subscription + with self.lock: + if self.subscriptions.get(callback) is None: + self.subscriptions[callback] = [] + for message in messages: + if message not in self.subscriptions[callback]: + self.subscriptions[callback].append(message) + + self.do_send( messages, callback ) + + + def do_send(self, messages, callback): + """return the ids of the requests that we sent""" + out = '' + ids = [] + for m in messages: + method, params = m + request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } ) + self.unanswered_requests[self.message_id] = method, params, callback + ids.append(self.message_id) + # print "-->", request + self.message_id += 1 + out += request + '\n' + while out: + sent = self.socket.send( out ) + out = out[sent:] + return ids + + + def synchronous_get(self, requests, timeout=100000000): + queue = Queue.Queue() + ids = self.do_send(requests, queue.put) + id2 = ids[:] + res = {} + while ids: + r = queue.get(True, timeout) + _id = r.get('id') + if _id in ids: + ids.remove(_id) + res[_id] = r.get('result') + out = [] + for _id in id2: + out.append(res[_id]) + return out + + + def get_servers(self): + return self.synchronous_get([('network.getservers',[])])[0] + + def stop(self): + return self.synchronous_get([('network.shutdown',[])])[0] + + + + + + +class ClientThread(threading.Thread): + # read messages from client (socket), and sends them to Network + # responses are sent back on the same socket + + def __init__(self, server, network, socket): + threading.Thread.__init__(self) + self.server = server + self.daemon = True + self.s = socket + self.s.settimeout(0.1) + self.network = network + self.queue = Queue.Queue() + self.unanswered_requests = {} + + + def run(self): + message = '' + while True: + self.send_responses() + try: + data = self.s.recv(1024) + except socket.timeout: + continue + + if not data: + break + message += data + + while True: + cmd, message = self.parse_json(message) + if not cmd: + break + self.process(cmd) + + #print "client thread terminating" + + + def parse_json(self, message): + n = message.find('\n') + if n==-1: + return None, message + j = json.loads( message[0:n] ) + return j, message[n+1:] + + + def process(self, request): + #print "<--", request + method = request['method'] + params = request['params'] + _id = request['id'] + + if method.startswith('network.'): + if method == 'network.shutdown': + self.server.running = False + r = {'id':_id, 'result':True} + elif method == 'network.getservers': + servers = self.network.get_servers() + r = {'id':_id, 'result':servers} + else: + r = {'id':_id, 'error':'unknown method'} + self.queue.put(r) + return + + def cb(i,r): + _id = r.get('id') + if _id is not None: + my_id = self.unanswered_requests.pop(_id) + r['id'] = my_id + self.queue.put(r) + + new_id = self.network.interface.send([(method, params)], cb) [0] + self.unanswered_requests[new_id] = _id + + + def send_responses(self): + while True: + try: + r = self.queue.get_nowait() + except Queue.Empty: + break + out = json.dumps(r) + '\n' + while out: + n = self.s.send(out) + out = out[n:] + #print "-->", r + + +#Server: +# start network() object +# accept connections, forward requests + + +class NetworkServer: + + def __init__(self, config): + network = Network(config) + if not network.start(wait=True): + print_msg("Not connected, aborting.") + sys.exit(1) + self.network = network + self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server.bind(('', 8000)) + self.server.listen(5) + self.server.settimeout(1) + self.running = False + self.timeout = 60 + + + def main_loop(self): + self.running = True + t = time.time() + while self.running: + try: + connection, address = self.server.accept() + except socket.timeout: + if time.time() - t > self.timeout: + break + continue + t = time.time() + client = ClientThread(self, self.network, connection) + client.start() + #print "Done." + + + + +def start_daemon(config): + pid = os.fork() + if (pid == 0): # The first child. + os.chdir("/") + os.setsid() + os.umask(0) + pid2 = os.fork() + if (pid2 == 0): # Second child + server = NetworkServer(config) + try: + server.main_loop() + except KeyboardInterrupt: + print "Ctrl C - Stopping server" + sys.exit(1) + + sys.exit(0) + + # should use a signal + time.sleep(2) + + + +if __name__ == '__main__': + import simple_config + config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.net:50002:s'}) + server = NetworkServer(config) + try: + server.main_loop() + except KeyboardInterrupt: + print "Ctrl C - Stopping server" + sys.exit(1) diff --git a/lib/network.py b/lib/network.py index aab98acf0..6ac7ec5d4 100644 --- a/lib/network.py +++ b/lib/network.py @@ -413,24 +413,14 @@ class Network(threading.Thread): -class NetworkProxy: - # interface to the network object. - # handle subscriptions and callbacks - # the network object can be jsonrpc server - def __init__(self, network): - self.network = network - - - - if __name__ == "__main__": - import simple_config - config = simple_config.SimpleConfig({'verbose':True, 'server':'ecdsa.org:50002:s'}) - network = Network(config) + network = NetworkProxy({}) network.start() + print network.get_servers() - while 1: - time.sleep(1) - - + q = Queue.Queue() + network.send([('blockchain.headers.subscribe',[])], q.put) + while True: + r = q.get(timeout=10000) + print r diff --git a/setup.py b/setup.py index b79f65901..f10e0644d 100644 --- a/setup.py +++ b/setup.py @@ -71,6 +71,7 @@ setup( 'electrum.blockchain', 'electrum.bmp', 'electrum.commands', + 'electrum.daemon', 'electrum.i18n', 'electrum.interface', 'electrum.mnemonic',