kete: add a prompt for interactive commands

Only list and listen commands are currently implemented.

The Ctrl-C handling has been a little bit tricky. The default GLib
mainloop tends to add its own SIGINT handler, which prevents us to
gracefully handle the KeyboardInterrupt exception during cmdloop().

So we need to create the mainloop in TuhiKeteShellWorker directly,
but bypassing the GLib.Mainloop() python facility.
This commit is contained in:
Benjamin Tissoires 2018-01-26 11:20:36 +01:00 committed by Peter Hutterer
parent e1e5a9357b
commit 8bd79d6b79
1 changed files with 131 additions and 3 deletions

View File

@ -14,11 +14,13 @@
from gi.repository import GObject, Gio, GLib from gi.repository import GObject, Gio, GLib
import sys import sys
import argparse import argparse
import cmd
import os import os
import json import json
import logging import logging
import re import re
import select import select
import threading
import time import time
import svgwrite import svgwrite
@ -189,7 +191,7 @@ class TuhiKeteManager(_DBusObject):
None, None,
self._on_name_vanished) self._on_name_vanished)
self.mainloop = GObject.MainLoop() self.mainloop = None
self._devices = {} self._devices = {}
self._pairable_devices = {} self._pairable_devices = {}
for objpath in self.property('Devices'): for objpath in self.property('Devices'):
@ -213,6 +215,9 @@ class TuhiKeteManager(_DBusObject):
self._pairable_devices = {} self._pairable_devices = {}
def run(self): def run(self):
if self.mainloop is None:
self.mainloop = GObject.MainLoop()
try: try:
self.mainloop.run() self.mainloop.run()
except KeyboardInterrupt: except KeyboardInterrupt:
@ -220,6 +225,7 @@ class TuhiKeteManager(_DBusObject):
self.mainloop.quit() self.mainloop.quit()
def quit(self): def quit(self):
if self.mainloop is not None:
self.mainloop.quit() self.mainloop.quit()
def _on_properties_changed(self, proxy, changed_props, invalidated_props): def _on_properties_changed(self, proxy, changed_props, invalidated_props):
@ -265,6 +271,10 @@ class TuhiKeteManager(_DBusObject):
pass pass
class Args(object):
pass
class Worker(GObject.Object): class Worker(GObject.Object):
"""Implements a command to be executed. """Implements a command to be executed.
Subclasses need to overwrite run() that will be executed Subclasses need to overwrite run() that will be executed
@ -480,6 +490,118 @@ class Printer(Worker):
print(d) print(d)
class TuhiKeteShell(cmd.Cmd):
intro = 'Tuhi shell control'
prompt = 'tuhi> '
def __init__(self, manager, completekey='tab', stdin=None, stdout=None):
super(TuhiKeteShell, self).__init__(completekey, stdin, stdout)
self._manager = manager
self._workers = []
def emptyline(self):
# make sure we do not re-enter the last typed command
pass
def do_EOF(self, arg):
'''leave the shell'''
print('\n\r', end='') # to remove the appended weird char
return self.do_exit(arg)
def do_exit(self, args):
'''leave the shell'''
for worker in self._workers:
worker.stop()
return True
def run(self, init=None):
try:
self.cmdloop(init)
except KeyboardInterrupt as e:
print("^C")
self.run('')
def start_worker(self, worker_class, args=None):
worker = worker_class(self._manager, args)
worker.run()
self._workers.append(worker)
def do_list(self, arg):
'''list known devices'''
self.start_worker(Printer)
_listen_usage = 'Usage: listen 12:34:56:AB:CD:EF [on|off]'
def do_listen(self, args):
'''Listen to a specific device:
Usage: listen 12:34:56:AB:CD:EF [on|off]'''
if args is '':
print(self._listen_usage)
return
args = args.split(' ')
address = args[0]
try:
mode = args[1]
except IndexError:
mode = 'on'
if mode != 'on' and mode != 'off':
print(self._listen_usage)
return
for d in self._manager.devices:
if d.address == address:
if mode == 'on' and d.listening:
print(f'Already listening on {address}')
return
elif mode == 'off' and not d.listening:
print(f'Not listening on {address}')
return
break
else:
print(f'Device {address} not found')
return
if mode == 'off':
for worker in [w for w in self._workers if isinstance(w, Listener)]:
if worker.device.address == address:
worker.stop()
self._workers.remove(worker)
break
return
wargs = Args()
wargs.address = address
self.start_worker(Listener, wargs)
class TuhiKeteShellWorker(Worker):
def __init__(self, manager, args):
super(TuhiKeteShellWorker, self).__init__(manager)
def start_mainloop(self):
# we can not call GLib.MainLoop() here or it will install a unix signal
# handler for SIGINT, and we will not be able to catch
# KeyboardInterrupt in cmdloop()
mainloop = GLib.MainLoop.new(None, False)
mainloop.run()
def start(self):
self._glib_thread = threading.Thread(target=self.start_mainloop)
self._glib_thread.daemon = True
self._glib_thread.start()
self.run()
self.stop()
def run(self):
self._shell = TuhiKeteShell(self.manager)
self._shell.run()
def parse_list(parser): def parse_list(parser):
sub = parser.add_parser('list', help='list known devices') sub = parser.add_parser('list', help='list known devices')
sub.set_defaults(worker=Printer) sub.set_defaults(worker=Printer)
@ -515,6 +637,11 @@ def parse_fetch(parser):
sub.set_defaults(worker=Fetcher) sub.set_defaults(worker=Fetcher)
def parse_shell(parser):
sub = parser.add_parser('shell', help='run a bash-like shell')
sub.set_defaults(worker=TuhiKeteShellWorker)
def parse(args): def parse(args):
desc = 'Commandline client to the Tuhi DBus daemon' desc = 'Commandline client to the Tuhi DBus daemon'
parser = argparse.ArgumentParser(description=desc) parser = argparse.ArgumentParser(description=desc)
@ -528,6 +655,7 @@ def parse(args):
parse_pair(subparser) parse_pair(subparser)
parse_listen(subparser) parse_listen(subparser)
parse_fetch(subparser) parse_fetch(subparser)
parse_shell(subparser)
return parser.parse_args(args[1:]) return parser.parse_args(args[1:])
@ -538,7 +666,7 @@ def main(args):
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
if not hasattr(args, 'worker'): if not hasattr(args, 'worker'):
args.worker = Printer args.worker = TuhiKeteShellWorker
try: try:
with TuhiKeteManager() as mgr: with TuhiKeteManager() as mgr: