Browse Source

Merge branch 'block_server' into develop

master
Neil Booth 8 years ago
parent
commit
b06b090188
  1. 2
      electrumx_server.py
  2. 62
      server/block_processor.py
  3. 1
      server/db.py
  4. 59
      server/protocol.py

2
electrumx_server.py

@ -17,7 +17,7 @@ import traceback
from functools import partial from functools import partial
from server.env import Env from server.env import Env
from server.block_processor import BlockServer from server.protocol import BlockServer
def close_loop(loop): def close_loop(loop):

62
server/block_processor.py

@ -9,18 +9,15 @@
import array import array
import ast
import asyncio import asyncio
import ssl
import struct import struct
import time import time
from bisect import bisect_left from bisect import bisect_left
from collections import defaultdict, namedtuple from collections import defaultdict
from functools import partial from functools import partial
from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY from server.cache import FSCache, UTXOCache, NO_CACHE_ENTRY
from server.daemon import Daemon, DaemonError from server.daemon import Daemon, DaemonError
from server.protocol import ElectrumX, LocalRPC, JSONRPC
from lib.hash import hash_to_str from lib.hash import hash_to_str
from lib.tx import Deserializer from lib.tx import Deserializer
from lib.util import chunks, LoggedClass from lib.util import chunks, LoggedClass
@ -808,60 +805,3 @@ class BlockProcessor(server.db.DB):
Can be positive or negative. Can be positive or negative.
''' '''
return self.mempool.value(hash168) return self.mempool.value(hash168)
class BlockServer(BlockProcessor):
'''Like BlockProcessor but also starts servers when caught up.'''
def __init__(self, env):
'''on_update is awaitable, and called only when caught up with the
daemon and a new block arrives or the mempool is updated.'''
super().__init__(env)
self.servers = []
async def caught_up(self, mempool_hashes):
await super().caught_up(mempool_hashes)
if not self.servers:
await self.start_servers()
ElectrumX.notify(self.height, self.touched)
async def start_servers(self):
'''Start listening on RPC, TCP and SSL ports.
Does not start a server if the port wasn't specified.
'''
env = self.env
loop = asyncio.get_event_loop()
JSONRPC.init(self, self.daemon, self.coin)
protocol = LocalRPC
if env.rpc_port is not None:
host = 'localhost'
rpc_server = loop.create_server(protocol, host, env.rpc_port)
self.servers.append(await rpc_server)
self.logger.info('RPC server listening on {}:{:d}'
.format(host, env.rpc_port))
protocol = partial(ElectrumX, env)
if env.tcp_port is not None:
tcp_server = loop.create_server(protocol, env.host, env.tcp_port)
self.servers.append(await tcp_server)
self.logger.info('TCP server listening on {}:{:d}'
.format(env.host, env.tcp_port))
if env.ssl_port is not None:
# FIXME: update if we want to require Python >= 3.5.3
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_cert_chain(env.ssl_certfile,
keyfile=env.ssl_keyfile)
ssl_server = loop.create_server(protocol, env.host, env.ssl_port,
ssl=ssl_context)
self.servers.append(await ssl_server)
self.logger.info('SSL server listening on {}:{:d}'
.format(env.host, env.ssl_port))
def stop(self):
'''Close the listening servers.'''
for server in self.servers:
server.close()

1
server/db.py

@ -7,6 +7,7 @@
'''Interface to the blockchain database.''' '''Interface to the blockchain database.'''
import array import array
import ast import ast
import os import os

59
server/protocol.py

@ -11,10 +11,12 @@
import asyncio import asyncio
import codecs import codecs
import json import json
import ssl
import traceback import traceback
from collections import namedtuple from collections import namedtuple
from functools import partial from functools import partial
from server.block_processor import BlockProcessor
from server.daemon import DaemonError from server.daemon import DaemonError
from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash from lib.hash import sha256, double_sha256, hash_to_str, hex_str_to_hash
from lib.util import LoggedClass from lib.util import LoggedClass
@ -30,6 +32,63 @@ def json_notification(method, params):
return {'id': None, 'method': method, 'params': params} return {'id': None, 'method': method, 'params': params}
class BlockServer(BlockProcessor):
'''Like BlockProcessor but also starts servers when caught up.'''
def __init__(self, env):
'''on_update is awaitable, and called only when caught up with the
daemon and a new block arrives or the mempool is updated.'''
super().__init__(env)
self.servers = []
async def caught_up(self, mempool_hashes):
await super().caught_up(mempool_hashes)
if not self.servers:
await self.start_servers()
ElectrumX.notify(self.height, self.touched)
async def start_servers(self):
'''Start listening on RPC, TCP and SSL ports.
Does not start a server if the port wasn't specified.
'''
env = self.env
loop = asyncio.get_event_loop()
JSONRPC.init(self, self.daemon, self.coin)
protocol = LocalRPC
if env.rpc_port is not None:
host = 'localhost'
rpc_server = loop.create_server(protocol, host, env.rpc_port)
self.servers.append(await rpc_server)
self.logger.info('RPC server listening on {}:{:d}'
.format(host, env.rpc_port))
protocol = partial(ElectrumX, env)
if env.tcp_port is not None:
tcp_server = loop.create_server(protocol, env.host, env.tcp_port)
self.servers.append(await tcp_server)
self.logger.info('TCP server listening on {}:{:d}'
.format(env.host, env.tcp_port))
if env.ssl_port is not None:
# FIXME: update if we want to require Python >= 3.5.3
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
ssl_context.load_cert_chain(env.ssl_certfile,
keyfile=env.ssl_keyfile)
ssl_server = loop.create_server(protocol, env.host, env.ssl_port,
ssl=ssl_context)
self.servers.append(await ssl_server)
self.logger.info('SSL server listening on {}:{:d}'
.format(env.host, env.ssl_port))
def stop(self):
'''Close the listening servers.'''
for server in self.servers:
server.close()
AsyncTask = namedtuple('AsyncTask', 'session job') AsyncTask = namedtuple('AsyncTask', 'session job')
class SessionManager(LoggedClass): class SessionManager(LoggedClass):

Loading…
Cancel
Save