From 8bd79d6b79590be789eba57a432b0d5679ffa3e1 Mon Sep 17 00:00:00 2001 From: Benjamin Tissoires Date: Fri, 26 Jan 2018 11:20:36 +0100 Subject: [PATCH] kete: add a prompt for interactive commands Only list and listen commands are currently implemented. The Ctrl-C handling has been a little bit tricky. The default GLib mainloop tends to add its own SIGINT handler, which prevents us to gracefully handle the KeyboardInterrupt exception during cmdloop(). So we need to create the mainloop in TuhiKeteShellWorker directly, but bypassing the GLib.Mainloop() python facility. --- tools/tuhi-kete.py | 134 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 131 insertions(+), 3 deletions(-) diff --git a/tools/tuhi-kete.py b/tools/tuhi-kete.py index 3ba0d03..589d6b9 100755 --- a/tools/tuhi-kete.py +++ b/tools/tuhi-kete.py @@ -14,11 +14,13 @@ from gi.repository import GObject, Gio, GLib import sys import argparse +import cmd import os import json import logging import re import select +import threading import time import svgwrite @@ -189,7 +191,7 @@ class TuhiKeteManager(_DBusObject): None, self._on_name_vanished) - self.mainloop = GObject.MainLoop() + self.mainloop = None self._devices = {} self._pairable_devices = {} for objpath in self.property('Devices'): @@ -213,6 +215,9 @@ class TuhiKeteManager(_DBusObject): self._pairable_devices = {} def run(self): + if self.mainloop is None: + self.mainloop = GObject.MainLoop() + try: self.mainloop.run() except KeyboardInterrupt: @@ -220,7 +225,8 @@ class TuhiKeteManager(_DBusObject): self.mainloop.quit() def quit(self): - self.mainloop.quit() + if self.mainloop is not None: + self.mainloop.quit() def _on_properties_changed(self, proxy, changed_props, invalidated_props): if changed_props is None: @@ -265,6 +271,10 @@ class TuhiKeteManager(_DBusObject): pass +class Args(object): + pass + + class Worker(GObject.Object): """Implements a command to be executed. Subclasses need to overwrite run() that will be executed @@ -480,6 +490,118 @@ class Printer(Worker): print(d) +class TuhiKeteShell(cmd.Cmd): + intro = 'Tuhi shell control' + prompt = 'tuhi> ' + + def __init__(self, manager, completekey='tab', stdin=None, stdout=None): + super(TuhiKeteShell, self).__init__(completekey, stdin, stdout) + self._manager = manager + self._workers = [] + + def emptyline(self): + # make sure we do not re-enter the last typed command + pass + + def do_EOF(self, arg): + '''leave the shell''' + print('\n\r', end='') # to remove the appended weird char + return self.do_exit(arg) + + def do_exit(self, args): + '''leave the shell''' + for worker in self._workers: + worker.stop() + return True + + def run(self, init=None): + try: + self.cmdloop(init) + except KeyboardInterrupt as e: + print("^C") + self.run('') + + def start_worker(self, worker_class, args=None): + worker = worker_class(self._manager, args) + worker.run() + self._workers.append(worker) + + def do_list(self, arg): + '''list known devices''' + self.start_worker(Printer) + + _listen_usage = 'Usage: listen 12:34:56:AB:CD:EF [on|off]' + def do_listen(self, args): + '''Listen to a specific device: + Usage: listen 12:34:56:AB:CD:EF [on|off]''' + + if args is '': + print(self._listen_usage) + return + + args = args.split(' ') + address = args[0] + try: + mode = args[1] + except IndexError: + mode = 'on' + + if mode != 'on' and mode != 'off': + print(self._listen_usage) + return + + for d in self._manager.devices: + if d.address == address: + if mode == 'on' and d.listening: + print(f'Already listening on {address}') + return + elif mode == 'off' and not d.listening: + print(f'Not listening on {address}') + return + break + else: + print(f'Device {address} not found') + return + + if mode == 'off': + for worker in [w for w in self._workers if isinstance(w, Listener)]: + if worker.device.address == address: + worker.stop() + self._workers.remove(worker) + break + return + + wargs = Args() + wargs.address = address + self.start_worker(Listener, wargs) + + +class TuhiKeteShellWorker(Worker): + def __init__(self, manager, args): + super(TuhiKeteShellWorker, self).__init__(manager) + + def start_mainloop(self): + # we can not call GLib.MainLoop() here or it will install a unix signal + # handler for SIGINT, and we will not be able to catch + # KeyboardInterrupt in cmdloop() + mainloop = GLib.MainLoop.new(None, False) + + mainloop.run() + + def start(self): + self._glib_thread = threading.Thread(target=self.start_mainloop) + self._glib_thread.daemon = True + self._glib_thread.start() + + self.run() + + self.stop() + + def run(self): + self._shell = TuhiKeteShell(self.manager) + self._shell.run() + + def parse_list(parser): sub = parser.add_parser('list', help='list known devices') sub.set_defaults(worker=Printer) @@ -515,6 +637,11 @@ def parse_fetch(parser): sub.set_defaults(worker=Fetcher) +def parse_shell(parser): + sub = parser.add_parser('shell', help='run a bash-like shell') + sub.set_defaults(worker=TuhiKeteShellWorker) + + def parse(args): desc = 'Commandline client to the Tuhi DBus daemon' parser = argparse.ArgumentParser(description=desc) @@ -528,6 +655,7 @@ def parse(args): parse_pair(subparser) parse_listen(subparser) parse_fetch(subparser) + parse_shell(subparser) return parser.parse_args(args[1:]) @@ -538,7 +666,7 @@ def main(args): logger.setLevel(logging.DEBUG) if not hasattr(args, 'worker'): - args.worker = Printer + args.worker = TuhiKeteShellWorker try: with TuhiKeteManager() as mgr: