From cd84de4f3275d8b3b586b511c23996db5b16a10e Mon Sep 17 00:00:00 2001 From: Peter Hutterer Date: Wed, 28 Aug 2019 09:52:16 +1000 Subject: [PATCH] tools: use the shared dbus client bindings for kete Since most of the signals are propagated by the bindings we largely just need to hook up simple methods to log what is happening. One nasty hack: we import some of the bits into the kete namespace so tuhi-live continues to work. This can be fixed up in a follow-up commit. Signed-off-by: Peter Hutterer --- tools/kete.py | 387 +++++++------------------------------------------- 1 file changed, 48 insertions(+), 339 deletions(-) diff --git a/tools/kete.py b/tools/kete.py index dfaa327..f3e60cc 100755 --- a/tools/kete.py +++ b/tools/kete.py @@ -20,7 +20,6 @@ import errno import os import json import logging -import re import readline import struct import threading @@ -31,12 +30,17 @@ from pathlib import Path try: from tuhi.svg import JsonSvg + import tuhi.dbusclient except ModuleNotFoundError: # If PYTHONPATH isn't set up or we never installed Tuhi, the module # isn't available. And since we don't install kete, we can assume that # we're still in the git repo, so messing with the path is "fine". sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/..') # noqa from tuhi.svg import JsonSvg + import tuhi.dbusclient + + # get those into our namespace for tuhi-live's beneft + from tuhi.dbusclient import TUHI_DBUS_NAME, ROOT_PATH, ORG_FREEDESKTOP_TUHI1_MANAGER CONFIG_PATH = Path(xdg.BaseDirectory.xdg_data_home, 'tuhi-kete') @@ -98,13 +102,6 @@ logger = logging.getLogger('tuhi-kete') logger.addHandler(logger_handler) logger.setLevel(logging.INFO) -TUHI_DBUS_NAME = 'org.freedesktop.tuhi1' -ORG_FREEDESKTOP_TUHI1_MANAGER = 'org.freedesktop.tuhi1.Manager' -ORG_FREEDESKTOP_TUHI1_DEVICE = 'org.freedesktop.tuhi1.Device' -ROOT_PATH = '/org/freedesktop/tuhi1' - -ORG_BLUEZ_DEVICE1 = 'org.bluez.Device1' - # remove ':' from the completer delimiters of readline so we can match on # device addresses completer_delims = readline.get_completer_delims() @@ -118,340 +115,52 @@ def b2hex(bs): return ' '.join([''.join(s) for s in zip(hx[::2], hx[1::2])]) -class DBusError(Exception): - def __init__(self, message): - self.message = message - - -class _DBusObject(GObject.Object): - _connection = None - - def __init__(self, name, interface, objpath): - GObject.GObject.__init__(self) - - if _DBusObject._connection is None: - self._connect_to_session() - - self.interface = interface - self.objpath = objpath - - try: - self.proxy = Gio.DBusProxy.new_sync(self._connection, - Gio.DBusProxyFlags.NONE, None, - name, objpath, interface, None) - except GLib.Error as e: - if (e.domain == 'g-io-error-quark' and - e.code == Gio.IOErrorEnum.DBUS_ERROR): - raise DBusError(e.message) - else: - raise e - - if self.proxy.get_name_owner() is None: - raise DBusError(f'No-one is handling {name}, is the daemon running?') - - self.proxy.connect('g-properties-changed', self._on_properties_changed) - self.proxy.connect('g-signal', self._on_signal_received) - - def _connect_to_session(self): - try: - _DBusObject._connection = Gio.bus_get_sync(Gio.BusType.SESSION, None) - except GLib.Error as e: - if (e.domain == 'g-io-error-quark' and - e.code == Gio.IOErrorEnum.DBUS_ERROR): - raise DBusError(e.message) - else: - raise e - - def _on_properties_changed(self, proxy, changed_props, invalidated_props): - # Implement this in derived classes to respond to property changes - pass - - def _on_signal_received(self, proxy, sender, signal, parameters): - # Implement this in derived classes to respond to signals - pass - - def property(self, name): - p = self.proxy.get_cached_property(name) - if p is not None: - return p.unpack() - return p - - def terminate(self): - del(self.proxy) - - -class _DBusSystemObject(_DBusObject): - ''' - Same as the _DBusObject, but connects to the system bus instead - ''' - def __init__(self, name, interface, objpath): - self._connect_to_system() - super().__init__(name, interface, objpath) - - def _connect_to_system(self): - try: - self._connection = Gio.bus_get_sync(Gio.BusType.SYSTEM, None) - except GLib.Error as e: - if (e.domain == 'g-io-error-quark' and - e.code == Gio.IOErrorEnum.DBUS_ERROR): - raise DBusError(e.message) - else: - raise e - - -class BlueZDevice(_DBusSystemObject): - def __init__(self, objpath): - super().__init__('org.bluez', ORG_BLUEZ_DEVICE1, objpath) - self.proxy.connect('g-properties-changed', self._on_properties_changed) - - @GObject.Property - def connected(self): - return self.proxy.get_cached_property('Connected').unpack() - - def _on_properties_changed(self, obj, properties, invalidated_properties): - properties = properties.unpack() - - if 'Connected' in properties: - self.notify('connected') - - -class TuhiKeteDevice(_DBusObject): - def __init__(self, manager, objpath): - _DBusObject.__init__(self, TUHI_DBUS_NAME, - ORG_FREEDESKTOP_TUHI1_DEVICE, - objpath) - self.manager = manager - self.is_registering = False - self.live = False - self._bluez_device = BlueZDevice(self.property('BlueZDevice')) - self._bluez_device.connect('notify::connected', self._on_connected) - - @classmethod - def is_device_address(cls, string): - if re.match(r'[0-9a-f]{2}(:[0-9a-f]{2}){5}$', string.lower()): - return string - raise argparse.ArgumentTypeError(f'"{string}" is not a valid device address') - - @GObject.Property - def address(self): - return self._bluez_device.property('Address') - - @GObject.Property - def name(self): - return self._bluez_device.property('Name') - - @GObject.Property - def listening(self): - return self.property('Listening') - - @GObject.Property - def drawings_available(self): - return self.property('DrawingsAvailable') - - @GObject.Property - def battery_percent(self): - return self.property('BatteryPercent') - - @GObject.Property - def battery_state(self): - return self.property('BatteryState') - - @GObject.Property - def connected(self): - return self._bluez_device.connected - - def _on_connected(self, bluez_device, pspec): - self.notify('connected') - - def register(self): - logger.debug(f'{self}: Register') - # FIXME: Register() doesn't return anything useful yet, so we wait until - # the device is in the Manager's Devices property - self.s1 = self.manager.connect('notify::devices', self._on_mgr_devices_updated) - self.is_registering = True - self.proxy.Register() - - def start_listening(self): - self.proxy.StartListening() - - def stop_listening(self): - try: - self.proxy.StopListening() - except GLib.Error as e: - if (e.domain != 'g-dbus-error-quark' or - e.code != Gio.IOErrorEnum.EXISTS or - Gio.dbus_error_get_remote_error(e) != 'org.freedesktop.DBus.Error.ServiceUnknown'): - raise e - - def start_live(self, fd): - fd_list = Gio.UnixFDList.new() - fd_list.append(fd) - - res, fds = self.proxy.call_with_unix_fd_list_sync('org.freedesktop.tuhi1.Device.StartLive', - GLib.Variant('(h)', (fd,)), - Gio.DBusCallFlags.NO_AUTO_START, - -1, - fd_list, - None) - if res[0] == 0: - self.live = True - - def stop_live(self): - self.proxy.StopLive() - self.live = False - - def json(self, timestamp): - SUPPORTED_FILE_FORMAT = 1 - return self.proxy.GetJSONData('(ut)', SUPPORTED_FILE_FORMAT, timestamp) - - def _on_signal_received(self, proxy, sender, signal, parameters): - if signal == 'ButtonPressRequired': - logger.info(f'{self}: Press button on device now') - elif signal == 'ListeningStopped': - err = parameters[0] - if err == -errno.EACCES: - logger.error(f'{self}: wrong device, please re-register.') - elif err < 0: - logger.error(f'{self}: an error occured: {os.strerror(-err)}') - self.notify('listening') - elif signal == 'SyncState': - state = parameters[0] - if state: - logger.debug(f'{self}: Downloading from device') - else: - logger.debug(f'{self}: Download done') - - def _on_properties_changed(self, proxy, changed_props, invalidated_props): - if changed_props is None: - return - - changed_props = changed_props.unpack() - - if 'DrawingsAvailable' in changed_props: - self.notify('drawings-available') - elif 'Listening' in changed_props: - self.notify('listening') - elif 'BatteryPercent' in changed_props: - self.notify('battery-percent') - elif 'BatteryState' in changed_props: - self.notify('battery-state') - - def __repr__(self): - return f'{self.address} - {self.name}' - - def _on_mgr_devices_updated(self, manager, pspec): - if not self.is_registering: - return - - for d in manager.devices: - if d.address == self.address: - self.is_registering = False - self.manager.disconnect(self.s1) - del(self.s1) - logger.info(f'{self}: Registration successful') - - def terminate(self): - try: - self.manager.disconnect(self.s1) - except AttributeError: - pass - self._bluez_device.terminate() - super(TuhiKeteDevice, self).terminate() - - -class TuhiKeteManager(_DBusObject): - __gsignals__ = { - 'unregistered-device': - (GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)), - } - +class TuhiKeteManager(tuhi.dbusclient.TuhiDBusClientManager): def __init__(self): - _DBusObject.__init__(self, TUHI_DBUS_NAME, - ORG_FREEDESKTOP_TUHI1_MANAGER, - ROOT_PATH) + super().__init__() + self.connect('unregistered_device', self._on_unregistered_device) - self._devices = {} - self._unregistered_devices = {} + self.sigs = {} + for d in self.devices: + self.sigs[d] = [] + self._connect_device(d) - for objpath in self.property('Devices'): - device = TuhiKeteDevice(self, objpath) - self._devices[device.address] = device - - @GObject.Property - def devices(self): - return [v for k, v in self._devices.items()] - - @GObject.Property - def unregistered_devices(self): - return [v for k, v in self._unregistered_devices.items()] - - @GObject.Property - def searching(self): - return self.proxy.get_cached_property('Searching') - - def start_search(self): - self._unregistered_devices = {} - self.proxy.StartSearch() - - def stop_search(self): + def _disconnect_device_signals(self, device): try: - self.proxy.StopSearch() - except GLib.Error as e: - if (e.domain != 'g-dbus-error-quark' or - e.code != Gio.IOErrorEnum.EXISTS or - Gio.dbus_error_get_remote_error(e) != 'org.freedesktop.DBus.Error.ServiceUnknown'): - raise e - self._unregistered_devices = {} + for s in self.sigs[device]: + device.disconnect(s) + self.sigs[device] = [] + except KeyError: + pass - def terminate(self): - for dev in self._devices.values(): - dev.terminate() - self._devices = {} - self._unregistered_devices = {} - super(TuhiKeteManager, self).terminate() + def _on_unregistered_device(self, manager, device): + self._disconnect_device_signals(device) - def _on_properties_changed(self, proxy, changed_props, invalidated_props): - if changed_props is None: - return + def log_press_required(device): + logger.info(f'{device}: Press button on device now') + device.connect('button-press-required', log_press_required) - changed_props = changed_props.unpack() + def log_registered(device): + logger.info(f'{device}: Registration successful') + device.connect('registered', log_registered) + device.connect('registered', self._connect_device) - if 'Devices' in changed_props: - objpaths = changed_props['Devices'] - for objpath in objpaths: - try: - d = self._unregistered_devices[objpath] - self._devices[d.address] = d - del self._unregistered_devices[objpath] - except KeyError: - # if we called Register() on an existing device it's not - # in unregistered devices - pass - self.notify('devices') - if 'Searching' in changed_props: - self.notify('searching') + def _connect_device(self, device): + self._disconnect_device_signals(device) - def _handle_unregistered_device(self, objpath): - for addr, dev in self._devices.items(): - if dev.objpath == objpath: - self.emit('unregistered-device', dev) - return + def log_sync_state(device, pspec): + if device.sync_state: + logger.debug(f'{device}: Communicating with device') + else: + logger.debug(f'{device}: Communication complete') + device.connect('notify::sync-state', log_sync_state) - device = TuhiKeteDevice(self, objpath) - self._unregistered_devices[objpath] = device - - logger.debug(f'New unregistered device: {device}') - self.emit('unregistered-device', device) - - def _on_signal_received(self, proxy, sender, signal, parameters): - if signal == 'SearchStopped': - self.notify('searching') - elif signal == 'UnregisteredDevice': - objpath = parameters[0] - self._handle_unregistered_device(objpath) - - def __getitem__(self, btaddr): - return self._devices[btaddr] + def log_device_error(d, err): + if err == -errno.EACCES: + logger.error(f'{device}: wrong device, please re-register.') + elif err < 0: + logger.error(f'{device}: an error occured: {os.strerror(-err)}') + device.connect('device-error', log_device_error) class Worker(GObject.Object): @@ -743,7 +452,7 @@ class TuhiKeteShell(cmd.Cmd): readline.set_history_length(100) Gio.bus_watch_name(Gio.BusType.SESSION, - TUHI_DBUS_NAME, + tuhi.dbusclient.TUHI_DBUS_NAME, Gio.BusNameWatcherFlags.NONE, self._on_name_appeared, self._on_name_vanished) @@ -875,7 +584,7 @@ class TuhiKeteShell(cmd.Cmd): add_help=False) parser.add_argument('-h', action='help', help=argparse.SUPPRESS) parser.add_argument('address', metavar='12:34:56:AB:CD:EF', - type=TuhiKeteDevice.is_device_address, + type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address, default=None, help='the address of the device to listen to') parser.add_argument('mode', choices=['on', 'off'], nargs='?', @@ -986,7 +695,7 @@ class TuhiKeteShell(cmd.Cmd): add_help=False) parser.add_argument('-h', action='help', help=argparse.SUPPRESS) parser.add_argument('address', metavar='12:34:56:AB:CD:EF', - type=TuhiKeteDevice.is_device_address, + type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address, default=None, help='the address of the device to fetch drawing from') parser.add_argument('index', metavar='{|all}', @@ -1083,7 +792,7 @@ class TuhiKeteShell(cmd.Cmd): add_help=False) parser.add_argument('-h', action='help', help=argparse.SUPPRESS) parser.add_argument('address', metavar='12:34:56:AB:CD:EF', - type=TuhiKeteDevice.is_device_address, + type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address, default=None, help='the address of the device to register') @@ -1137,7 +846,7 @@ class TuhiKeteShell(cmd.Cmd): add_help=False) parser.add_argument('-h', action='help', help=argparse.SUPPRESS) parser.add_argument('address', metavar='12:34:56:AB:CD:EF', - type=TuhiKeteDevice.is_device_address, + type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address, default=None, nargs='?', help='the address of the device to listen to') @@ -1189,7 +898,7 @@ class TuhiKeteShell(cmd.Cmd): add_help=False) parser.add_argument('-h', action='help', help=argparse.SUPPRESS) parser.add_argument('address', metavar='12:34:56:AB:CD:EF', - type=TuhiKeteDevice.is_device_address, + type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address, default=None, nargs='?', help='the address of the device to listen to') parser.add_argument('mode', choices=['on', 'off'], nargs='?', @@ -1246,7 +955,7 @@ def main(args): with TuhiKeteShell() as shell: shell.run() - except DBusError as e: + except tuhi.dbusclient.DBusError as e: logger.error(e.message)