Browse Source

lightning channels list: add mock server for testing

dependabot/pip/contrib/deterministic-build/ecdsa-0.13.3
Janus 7 years ago
committed by ThomasV
parent
commit
d055507003
  1. 67
      gui/qt/lightning_channels_list.py

67
gui/qt/lightning_channels_list.py

@ -13,7 +13,6 @@ datatable = OrderedDict([])
class MyTableRow(QtWidgets.QTreeWidgetItem):
def __init__(self, di):
strs = [str(di[mapping[key]]) for key in range(len(mapping))]
print(strs)
super(MyTableRow, self).__init__(strs)
assert isinstance(di, dict)
self.di = di
@ -47,8 +46,8 @@ class LightningChannelsList(QtWidgets.QWidget):
cur = self._tv.currentItem()
channel_point = cur["channel_point"]
def close():
params = [str(channel_point)] + (["--force"] if cur["active"] else [])
lightningCall(lightningRpc, "closechannel")(*params)
params = [str(channel_point)] + (["--force"] if not cur["active"] else []) # TODO test if force is being used correctly
lightningCall(self.lightningRpc, "closechannel")(*params)
menu.addAction("Close channel", close)
menu.exec_(self._tv.viewport().mapToGlobal(position))
def lightningWorkerHandler(self, sourceClassName, obj):
@ -97,10 +96,11 @@ class LightningChannelsList(QtWidgets.QWidget):
timer = QtCore.QTimer(self)
timer.timeout.connect(tick)
timer.start(20000)
timer.start(5000)
lightningWorker.subscribe(self.lightningWorkerHandler)
lightningRpc.subscribe(self.lightningRpcHandler)
self.lightningRpc = lightningRpc
self._tv=QtWidgets.QTreeWidget(self)
self._tv.setHeaderLabels([mapping[i] for i in range(len(mapping))])
@ -140,13 +140,70 @@ class LightningChannelsList(QtWidgets.QWidget):
self.resize(2500,1000)
class MockLightningWorker:
def subscribe(self, handler):
pass
if __name__=="__main__":
import queue, threading, asyncio
from sys import argv, exit
import signal , traceback, os
loop = asyncio.new_event_loop()
async def loopstop():
loop.stop()
def signal_handler(signal, frame):
asyncio.run_coroutine_threadsafe(loopstop(), loop)
signal.signal(signal.SIGINT, signal_handler)
a=QtWidgets.QApplication(argv)
w=LightningChannelsList()
gotReplyHandlerLock = threading.Lock()
gotReplyHandlerLock.acquire()
replyHandler = None
class MockLightningRPC:
def __init__(self, q):
self.queue = q
def subscribe(self, handler):
global replyHandler
replyHandler = handler
gotReplyHandlerLock.release()
q = queue.Queue()
w=LightningChannelsList(None, MockLightningWorker(), MockLightningRPC(q))
w.show()
w.raise_()
async def the_job():
try:
acquired_once = False
while loop.is_running():
try:
cmd = q.get_nowait()
except queue.Empty:
await asyncio.sleep(1)
continue
if not acquired_once:
gotReplyHandlerLock.acquire()
acquired_once = True
if cmd[0] == "listchannels":
#replyHandler("listchannels", Exception("Test exception"))
replyHandler("listchannels", {"channels": [{"channel_point": binascii.hexlify(os.urandom(32)).decode("ascii"), "active": True}]})
elif cmd[0] == "openchannel":
replyHandler("openchannel", {})
else:
print("mock rpc server ignoring", cmd[0])
except:
traceback.print_exc()
def asyncioThread():
loop.create_task(the_job())
loop.run_forever()
threading.Thread(target=asyncioThread).start()
exit(a.exec_())

Loading…
Cancel
Save