From e21f04bb8bc133f15b5fdba7bed828b9a8c0bb55 Mon Sep 17 00:00:00 2001 From: Peter Hutterer Date: Mon, 22 Jan 2018 13:28:35 +1000 Subject: [PATCH] Add tuhi-kete as a CLI interface to tuhi --- tuhi-kete.py | 282 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 282 insertions(+) create mode 100755 tuhi-kete.py diff --git a/tuhi-kete.py b/tuhi-kete.py new file mode 100755 index 0000000..7f7febf --- /dev/null +++ b/tuhi-kete.py @@ -0,0 +1,282 @@ +#!/usr/bin/env python3 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# + +from gi.repository import GObject, Gio, GLib +import sys +import argparse +import logging +import select + +logging.basicConfig(format='%(levelname)s: %(message)s', + level=logging.INFO) +logger = logging.getLogger('tuhi-kete') + +TUHI_DBUS_NAME = 'org.freedesktop.tuhi1' +ORG_FREEDESKTOP_TUHI1_MANAGER = 'org.freedesktop.tuhi1.Manager' +ORG_FREEDESKTOP_TUHI1_DEVICE = 'org.freedesktop.tuhi1.Device' +ROOT_PATH = '/org/freedesktop/tuhi1' + + +class DBusError(Exception): + def __init__(self, message): + self.message = message + + +class _DBusObject(GObject.Object): + _connection = None + + def __init__(self, name, interface, objpath): + GObject.GObject.__init__(self) + + if _DBusObject._connection is None: + self._connect_to_session() + + self.interface = interface + self.objpath = objpath + + try: + self.proxy = Gio.DBusProxy.new_sync(_DBusObject._connection, + Gio.DBusProxyFlags.NONE, None, + name, objpath, interface, None) + except GLib.Error as e: + if (e.domain == 'g-io-error-quark' and + e.code == Gio.IOErrorEnum.DBUS_ERROR): + raise DBusError(e.message) + else: + raise e + + if self.proxy.get_name_owner() is None: + raise DBusError('{} is not owned'.format(name)) + + self.proxy.connect('g-properties-changed', self._on_properties_changed) + self.proxy.connect('g-signal', self._on_signal_received) + + def _connect_to_session(self): + try: + _DBusObject._connection = Gio.bus_get_sync(Gio.BusType.SESSION, None) + except GLib.Error as e: + if (e.domain == 'g-io-error-quark' and + e.code == Gio.IOErrorEnum.DBUS_ERROR): + raise DBusError(e.message) + else: + raise e + + def _on_properties_changed(self, proxy, changed_props, invalidated_props): + # Implement this in derived classes to respond to property changes + pass + + def _on_signal_received(self, proxy, sender, signal, parameters): + # Implement this in derived classes to respond to signals + pass + + def property(self, name): + p = self.proxy.get_cached_property(name) + if p is not None: + return p.unpack() + return p + + +class TuhiKeteDevice(_DBusObject): + def __init__(self, manager, objpath): + _DBusObject.__init__(self, TUHI_DBUS_NAME, + ORG_FREEDESKTOP_TUHI1_DEVICE, + objpath) + self.manager = manager + self.is_pairing = False + + @GObject.Property + def address(self): + return self.property('Address') + + @GObject.Property + def name(self): + return self.property('Name') + + def pair(self): + logger.debug('{}: Pairing'.format(self)) + # FIXME: Pair() doesn't return anything useful yet, so we wait until + # the device is in the Manager's Devices property + self.manager.connect('notify::devices', self._on_mgr_devices_updated) + self.is_pairing = True + self.proxy.Pair() + + def _on_signal_received(self, proxy, sender, signal, parameters): + if signal == 'ButtonPressRequired': + print("{}: Press button on device now".format(self)) + + def __repr__(self): + return '{} - {}'.format(self.address, self.name) + + def _on_mgr_devices_updated(self, manager, pspec): + if not self.is_pairing: + return + + for d in manager.devices: + if d.address == self.address: + self.is_pairing = False + print('{}: Pairing successful'.format(self)) + + +class TuhiKeteManager(_DBusObject): + __gsignals__ = { + "pairable-device": + (GObject.SIGNAL_RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)), + } + + def __init__(self): + _DBusObject.__init__(self, TUHI_DBUS_NAME, + ORG_FREEDESKTOP_TUHI1_MANAGER, + ROOT_PATH) + + self._devices = {} + self._searching = False + for objpath in self.property('Devices'): + device = TuhiKeteDevice(self, objpath) + self._devices[device.address] = device + + @GObject.Property + def devices(self): + return [v for k, v in self._devices.items()] + + @GObject.Property + def searching(self): + return self._searching + + def start_search(self): + self.proxy.StartSearch() + + def stop_search(self): + self.proxy.StopSearch() + + def _on_signal_received(self, proxy, sender, signal, parameters): + if signal == 'SearchStopped': + self._searching = False + self.notify('searching') + elif signal == 'PairableDevice': + objpath = parameters[0] + device = TuhiKeteDevice(self, objpath) + logger.debug('Found pairable device: {}'.format(device)) + self.emit('pairable-device', device) + + def __getitem__(self, btaddr): + return self._devices[btaddr] + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + return self + + +class Searcher(GObject.Object): + def __init__(self, manager, address=None): + GObject.GObject.__init__(self) + self.manager = manager + self.mainloop = GObject.MainLoop() + self.address = address + self.is_pairing = False + + def run(self): + self.manager.connect('notify::searching', self._on_notify_search) + self.manager.connect('pairable-device', self._on_pairable_device) + self.manager.start_search() + logger.debug('Started searching') + try: + self.mainloop.run() + except KeyboardInterrupt: + self.manager.stop_search() + + if self.manager.searching: + logger.debug('Stopping search') + self.manager.stop_search() + + def _on_notify_search(self, manager, pspec): + logger.info('Search timeout') + if not self.is_pairing: + self.mainloop.quit() + + def _on_pairable_device(self, manager, device): + print('Pairable device: {}'.format(device)) + + if self.address is None: + print('Connect to device? [y/N] ', end='') + sys.stdout.flush() + i, o, e = select.select([sys.stdin], [], [], 5) + if i: + answer = sys.stdin.readline().strip() + if answer.lower() == 'y': + self.address = device.address + else: + print('timed out') + + if device.address == self.address: + self.is_pairing = True + device.pair() + + +def print_device(d): + print('{}: {}'.format(d.address, d.name)) + + +def cmd_list(manager, args): + for d in manager.devices: + print_device(d) + + +def cmd_pair(manager, args): + Searcher(manager, args.address).run() + + +def parse_list(parser): + sub = parser.add_parser('list', help='list known devices') + sub.set_defaults(func=cmd_list) + + +def parse_pair(parser): + sub = parser.add_parser('pair', help='pair a new device') + sub.add_argument('address', metavar='12:34:56:AB:CD:EF', type=str, + nargs='?', default=None, + help='the address of the device to pair') + sub.set_defaults(func=cmd_pair) + + +def parse(args): + desc = 'Commandline client to the Tuhi DBus daemon' + parser = argparse.ArgumentParser(description=desc) + parser.add_argument('-v', '--verbose', + help='Show some debugging informations', + action='store_true', + default=False) + + subparser = parser.add_subparsers(help='Available commands') + parse_list(subparser) + parse_pair(subparser) + + return parser.parse_args(args[1:]) + + +def main(args): + args = parse(args) + if args.verbose: + logger.setLevel(logging.DEBUG) + + try: + with TuhiKeteManager() as mgr: + args.func(mgr, args) + + except DBusError as e: + logger.error(e.message) + + +if __name__ == "__main__": + main(sys.argv)