Browse Source

pylightning: Add notification subscription handlers

Just like we added the RPC methods, the notification handlers can also
be registered using a function decorator, and we auto-subscribe when
asked for a manifest.

Signed-off-by: Christian Decker <decker.christian@gmail.com>
plugin-7
Christian Decker 6 years ago
parent
commit
eee1e750e1
No known key found for this signature in database GPG Key ID: 1416D83DC4F0E86D
  1. 104
      contrib/pylightning/lightning/plugin.py

104
contrib/pylightning/lightning/plugin.py

@ -18,6 +18,9 @@ class Plugin(object):
self.methods = {} self.methods = {}
self.options = {} self.options = {}
# A dict from topics to handler functions
self.subscriptions = {}
if not stdout: if not stdout:
self.stdout = sys.stdout self.stdout = sys.stdout
if not stdin: if not stdin:
@ -60,6 +63,33 @@ class Plugin(object):
# Register the function with the name # Register the function with the name
self.methods[name] = func self.methods[name] = func
def add_subscription(self, topic, func):
"""Add a subscription to our list of subscriptions.
A subscription is an association between a topic and a handler
function. Adding a subscription means that we will
automatically subscribe to events from that topic with
`lightningd` and, upon receiving a matching notification, we
will call the associated handler. Notice that in order for the
automatic subscriptions to work, the handlers need to be
registered before we send our manifest, hence before
`Plugin.run` is called.
"""
if topic in self.subscriptions:
raise ValueError(
"Topic {} already has a handler".format(topic)
)
self.subscriptions[topic] = func
def subscribe(self, topic):
"""Function decorator to register a notification handler.
"""
def decorator(f):
self.add_subscription(topic, f)
return f
return decorator
def add_option(self, name, default, description): def add_option(self, name, default, description):
"""Add an option that we'd like to register with lightningd. """Add an option that we'd like to register with lightningd.
@ -98,17 +128,11 @@ class Plugin(object):
return f return f
return decorator return decorator
def _dispatch(self, request): def _exec_func(self, func, request):
name = request['method']
params = request['params'] params = request['params']
if name not in self.methods:
raise ValueError("No method {} found.".format(name))
args = params.copy() if isinstance(params, list) else [] args = params.copy() if isinstance(params, list) else []
kwargs = params.copy() if isinstance(params, dict) else {} kwargs = params.copy() if isinstance(params, dict) else {}
func = self.methods[name]
sig = inspect.signature(func) sig = inspect.signature(func)
if 'plugin' in sig.parameters: if 'plugin' in sig.parameters:
@ -121,6 +145,43 @@ class Plugin(object):
ba.apply_defaults() ba.apply_defaults()
return func(*ba.args, **ba.kwargs) return func(*ba.args, **ba.kwargs)
def _dispatch_request(self, request):
name = request['method']
if name not in self.methods:
raise ValueError("No method {} found.".format(name))
func = self.methods[name]
try:
result = {
'jsonrpc': '2.0',
'id': request['id'],
'result': self._exec_func(func, request)
}
except Exception as e:
result = {
'jsonrpc': '2.0',
'id': request['id'],
"error": "Error while processing {}: {}".format(
request['method'], repr(e)
),
}
self.log(traceback.format_exc())
json.dump(result, fp=self.stdout)
self.stdout.write('\n\n')
self.stdout.flush()
def _dispatch_notification(self, request):
name = request['method']
if name not in self.subscriptions:
raise ValueError("No subscription for {} found.".format(name))
func = self.subscriptions[name]
try:
self._exec_func(func, request)
except Exception as _:
self.log(traceback.format_exc())
def notify(self, method, params): def notify(self, method, params):
payload = { payload = {
'jsonrpc': '2.0', 'jsonrpc': '2.0',
@ -145,24 +206,14 @@ class Plugin(object):
for payload in msgs[:-1]: for payload in msgs[:-1]:
request = json.loads(payload) request = json.loads(payload)
try: # If this has an 'id'-field, it's a request and returns a
result = { # result. Otherwise it's a notification and it doesn't
"jsonrpc": "2.0", # return anything.
"result": self._dispatch(request), if 'id' in request:
"id": request['id'] self._dispatch_request(request)
} else:
except Exception as e: self._dispatch_notification(request)
result = {
"jsonrpc": "2.0",
"error": "Error while processing {}".format(
request['method']
),
"id": request['id']
}
self.log(traceback.format_exc())
json.dump(result, fp=self.stdout)
self.stdout.write('\n\n')
self.stdout.flush()
return msgs[-1] return msgs[-1]
def run(self): def run(self):
@ -204,6 +255,7 @@ class Plugin(object):
return { return {
'options': list(self.options.values()), 'options': list(self.options.values()),
'rpcmethods': methods, 'rpcmethods': methods,
'subscriptions': list(self.subscriptions.keys()),
} }
def _init(self, options, configuration, request): def _init(self, options, configuration, request):
@ -217,7 +269,7 @@ class Plugin(object):
if self.init: if self.init:
self.methods['init'] = self.init self.methods['init'] = self.init
self.init = None self.init = None
return self._dispatch(request) return self._exec_func(self.methods['init'], request)
return None return None

Loading…
Cancel
Save