diff --git a/tools/kete.py b/tools/kete.py index dfaa327..f3e60cc 100755 --- a/tools/kete.py +++ b/tools/kete.py @@ -20,7 +20,6 @@ import errno import os import json import logging -import re import readline import struct import threading @@ -31,12 +30,17 @@ from pathlib import Path try: from tuhi.svg import JsonSvg + import tuhi.dbusclient except ModuleNotFoundError: # If PYTHONPATH isn't set up or we never installed Tuhi, the module # isn't available. And since we don't install kete, we can assume that # we're still in the git repo, so messing with the path is "fine". sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/..') # noqa from tuhi.svg import JsonSvg + import tuhi.dbusclient + + # get those into our namespace for tuhi-live's beneft + from tuhi.dbusclient import TUHI_DBUS_NAME, ROOT_PATH, ORG_FREEDESKTOP_TUHI1_MANAGER CONFIG_PATH = Path(xdg.BaseDirectory.xdg_data_home, 'tuhi-kete') @@ -98,13 +102,6 @@ logger = logging.getLogger('tuhi-kete') logger.addHandler(logger_handler) logger.setLevel(logging.INFO) -TUHI_DBUS_NAME = 'org.freedesktop.tuhi1' -ORG_FREEDESKTOP_TUHI1_MANAGER = 'org.freedesktop.tuhi1.Manager' -ORG_FREEDESKTOP_TUHI1_DEVICE = 'org.freedesktop.tuhi1.Device' -ROOT_PATH = '/org/freedesktop/tuhi1' - -ORG_BLUEZ_DEVICE1 = 'org.bluez.Device1' - # remove ':' from the completer delimiters of readline so we can match on # device addresses completer_delims = readline.get_completer_delims() @@ -118,340 +115,52 @@ def b2hex(bs): return ' '.join([''.join(s) for s in zip(hx[::2], hx[1::2])]) -class DBusError(Exception): - def __init__(self, message): - self.message = message - - -class _DBusObject(GObject.Object): - _connection = None - - def __init__(self, name, interface, objpath): - GObject.GObject.__init__(self) - - if _DBusObject._connection is None: - self._connect_to_session() - - self.interface = interface - self.objpath = objpath - - try: - self.proxy = Gio.DBusProxy.new_sync(self._connection, - Gio.DBusProxyFlags.NONE, None, - name, objpath, interface, None) - except GLib.Error as e: - if (e.domain == 'g-io-error-quark' and - e.code == Gio.IOErrorEnum.DBUS_ERROR): - raise DBusError(e.message) - else: - raise e - - if self.proxy.get_name_owner() is None: - raise DBusError(f'No-one is handling {name}, is the daemon running?') - - self.proxy.connect('g-properties-changed', self._on_properties_changed) - self.proxy.connect('g-signal', self._on_signal_received) - - def _connect_to_session(self): - try: - _DBusObject._connection = Gio.bus_get_sync(Gio.BusType.SESSION, None) - except GLib.Error as e: - if (e.domain == 'g-io-error-quark' and - e.code == Gio.IOErrorEnum.DBUS_ERROR): - raise DBusError(e.message) - else: - raise e - - def _on_properties_changed(self, proxy, changed_props, invalidated_props): - # Implement this in derived classes to respond to property changes - pass - - def _on_signal_received(self, proxy, sender, signal, parameters): - # Implement this in derived classes to respond to signals - pass - - def property(self, name): - p = self.proxy.get_cached_property(name) - if p is not None: - return p.unpack() - return p - - def terminate(self): - del(self.proxy) - - -class _DBusSystemObject(_DBusObject): - ''' - Same as the _DBusObject, but connects to the system bus instead - ''' - def __init__(self, name, interface, objpath): - self._connect_to_system() - super().__init__(name, interface, objpath) - - def _connect_to_system(self): - try: - self._connection = Gio.bus_get_sync(Gio.BusType.SYSTEM, None) - except GLib.Error as e: - if (e.domain == 'g-io-error-quark' and - e.code == Gio.IOErrorEnum.DBUS_ERROR): - raise DBusError(e.message) - else: - raise e - - -class BlueZDevice(_DBusSystemObject): - def __init__(self, objpath): - super().__init__('org.bluez', ORG_BLUEZ_DEVICE1, objpath) - self.proxy.connect('g-properties-changed', self._on_properties_changed) - - @GObject.Property - def connected(self): - return self.proxy.get_cached_property('Connected').unpack() - - def _on_properties_changed(self, obj, properties, invalidated_properties): - properties = properties.unpack() - - if 'Connected' in properties: - self.notify('connected') - - -class TuhiKeteDevice(_DBusObject): - def __init__(self, manager, objpath): - _DBusObject.__init__(self, TUHI_DBUS_NAME, - ORG_FREEDESKTOP_TUHI1_DEVICE, - objpath) - self.manager = manager - self.is_registering = False - self.live = False - self._bluez_device = BlueZDevice(self.property('BlueZDevice')) - self._bluez_device.connect('notify::connected', self._on_connected) - - @classmethod - def is_device_address(cls, string): - if re.match(r'[0-9a-f]{2}(:[0-9a-f]{2}){5}$', string.lower()): - return string - raise argparse.ArgumentTypeError(f'"{string}" is not a valid device address') - - @GObject.Property - def address(self): - return self._bluez_device.property('Address') - - @GObject.Property - def name(self): - return self._bluez_device.property('Name') - - @GObject.Property - def listening(self): - return self.property('Listening') - - @GObject.Property - def drawings_available(self): - return self.property('DrawingsAvailable') - - @GObject.Property - def battery_percent(self): - return self.property('BatteryPercent') - - @GObject.Property - def battery_state(self): - return self.property('BatteryState') - - @GObject.Property - def connected(self): - return self._bluez_device.connected - - def _on_connected(self, bluez_device, pspec): - self.notify('connected') - - def register(self): - logger.debug(f'{self}: Register') - # FIXME: Register() doesn't return anything useful yet, so we wait until - # the device is in the Manager's Devices property - self.s1 = self.manager.connect('notify::devices', self._on_mgr_devices_updated) - self.is_registering = True - self.proxy.Register() - - def start_listening(self): - self.proxy.StartListening() - - def stop_listening(self): - try: - self.proxy.StopListening() - except GLib.Error as e: - if (e.domain != 'g-dbus-error-quark' or - e.code != Gio.IOErrorEnum.EXISTS or - Gio.dbus_error_get_remote_error(e) != 'org.freedesktop.DBus.Error.ServiceUnknown'): - raise e - - def start_live(self, fd): - fd_list = Gio.UnixFDList.new() - fd_list.append(fd) - - res, fds = self.proxy.call_with_unix_fd_list_sync('org.freedesktop.tuhi1.Device.StartLive', - GLib.Variant('(h)', (fd,)), - Gio.DBusCallFlags.NO_AUTO_START, - -1, - fd_list, - None) - if res[0] == 0: - self.live = True - - def stop_live(self): - self.proxy.StopLive() - self.live = False - - def json(self, timestamp): - SUPPORTED_FILE_FORMAT = 1 - return self.proxy.GetJSONData('(ut)', SUPPORTED_FILE_FORMAT, timestamp) - - def _on_signal_received(self, proxy, sender, signal, parameters): - if signal == 'ButtonPressRequired': - logger.info(f'{self}: Press button on device now') - elif signal == 'ListeningStopped': - err = parameters[0] - if err == -errno.EACCES: - logger.error(f'{self}: wrong device, please re-register.') - elif err < 0: - logger.error(f'{self}: an error occured: {os.strerror(-err)}') - self.notify('listening') - elif signal == 'SyncState': - state = parameters[0] - if state: - logger.debug(f'{self}: Downloading from device') - else: - logger.debug(f'{self}: Download done') - - def _on_properties_changed(self, proxy, changed_props, invalidated_props): - if changed_props is None: - return - - changed_props = changed_props.unpack() - - if 'DrawingsAvailable' in changed_props: - self.notify('drawings-available') - elif 'Listening' in changed_props: - self.notify('listening') - elif 'BatteryPercent' in changed_props: - self.notify('battery-percent') - elif 'BatteryState' in changed_props: - self.notify('battery-state') - - def __repr__(self): - return f'{self.address} - {self.name}' - - def _on_mgr_devices_updated(self, manager, pspec): - if not self.is_registering: - return - - for d in manager.devices: - if d.address == self.address: - self.is_registering = False - self.manager.disconnect(self.s1) - del(self.s1) - logger.info(f'{self}: Registration successful') - - def terminate(self): - try: - self.manager.disconnect(self.s1) - except AttributeError: - pass - self._bluez_device.terminate() - super(TuhiKeteDevice, self).terminate() - - -class TuhiKeteManager(_DBusObject): - __gsignals__ = { - 'unregistered-device': - (GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)), - } - +class TuhiKeteManager(tuhi.dbusclient.TuhiDBusClientManager): def __init__(self): - _DBusObject.__init__(self, TUHI_DBUS_NAME, - ORG_FREEDESKTOP_TUHI1_MANAGER, - ROOT_PATH) + super().__init__() + self.connect('unregistered_device', self._on_unregistered_device) - self._devices = {} - self._unregistered_devices = {} + self.sigs = {} + for d in self.devices: + self.sigs[d] = [] + self._connect_device(d) - for objpath in self.property('Devices'): - device = TuhiKeteDevice(self, objpath) - self._devices[device.address] = device - - @GObject.Property - def devices(self): - return [v for k, v in self._devices.items()] - - @GObject.Property - def unregistered_devices(self): - return [v for k, v in self._unregistered_devices.items()] - - @GObject.Property - def searching(self): - return self.proxy.get_cached_property('Searching') - - def start_search(self): - self._unregistered_devices = {} - self.proxy.StartSearch() - - def stop_search(self): + def _disconnect_device_signals(self, device): try: - self.proxy.StopSearch() - except GLib.Error as e: - if (e.domain != 'g-dbus-error-quark' or - e.code != Gio.IOErrorEnum.EXISTS or - Gio.dbus_error_get_remote_error(e) != 'org.freedesktop.DBus.Error.ServiceUnknown'): - raise e - self._unregistered_devices = {} + for s in self.sigs[device]: + device.disconnect(s) + self.sigs[device] = [] + except KeyError: + pass - def terminate(self): - for dev in self._devices.values(): - dev.terminate() - self._devices = {} - self._unregistered_devices = {} - super(TuhiKeteManager, self).terminate() + def _on_unregistered_device(self, manager, device): + self._disconnect_device_signals(device) - def _on_properties_changed(self, proxy, changed_props, invalidated_props): - if changed_props is None: - return + def log_press_required(device): + logger.info(f'{device}: Press button on device now') + device.connect('button-press-required', log_press_required) - changed_props = changed_props.unpack() + def log_registered(device): + logger.info(f'{device}: Registration successful') + device.connect('registered', log_registered) + device.connect('registered', self._connect_device) - if 'Devices' in changed_props: - objpaths = changed_props['Devices'] - for objpath in objpaths: - try: - d = self._unregistered_devices[objpath] - self._devices[d.address] = d - del self._unregistered_devices[objpath] - except KeyError: - # if we called Register() on an existing device it's not - # in unregistered devices - pass - self.notify('devices') - if 'Searching' in changed_props: - self.notify('searching') + def _connect_device(self, device): + self._disconnect_device_signals(device) - def _handle_unregistered_device(self, objpath): - for addr, dev in self._devices.items(): - if dev.objpath == objpath: - self.emit('unregistered-device', dev) - return + def log_sync_state(device, pspec): + if device.sync_state: + logger.debug(f'{device}: Communicating with device') + else: + logger.debug(f'{device}: Communication complete') + device.connect('notify::sync-state', log_sync_state) - device = TuhiKeteDevice(self, objpath) - self._unregistered_devices[objpath] = device - - logger.debug(f'New unregistered device: {device}') - self.emit('unregistered-device', device) - - def _on_signal_received(self, proxy, sender, signal, parameters): - if signal == 'SearchStopped': - self.notify('searching') - elif signal == 'UnregisteredDevice': - objpath = parameters[0] - self._handle_unregistered_device(objpath) - - def __getitem__(self, btaddr): - return self._devices[btaddr] + def log_device_error(d, err): + if err == -errno.EACCES: + logger.error(f'{device}: wrong device, please re-register.') + elif err < 0: + logger.error(f'{device}: an error occured: {os.strerror(-err)}') + device.connect('device-error', log_device_error) class Worker(GObject.Object): @@ -743,7 +452,7 @@ class TuhiKeteShell(cmd.Cmd): readline.set_history_length(100) Gio.bus_watch_name(Gio.BusType.SESSION, - TUHI_DBUS_NAME, + tuhi.dbusclient.TUHI_DBUS_NAME, Gio.BusNameWatcherFlags.NONE, self._on_name_appeared, self._on_name_vanished) @@ -875,7 +584,7 @@ class TuhiKeteShell(cmd.Cmd): add_help=False) parser.add_argument('-h', action='help', help=argparse.SUPPRESS) parser.add_argument('address', metavar='12:34:56:AB:CD:EF', - type=TuhiKeteDevice.is_device_address, + type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address, default=None, help='the address of the device to listen to') parser.add_argument('mode', choices=['on', 'off'], nargs='?', @@ -986,7 +695,7 @@ class TuhiKeteShell(cmd.Cmd): add_help=False) parser.add_argument('-h', action='help', help=argparse.SUPPRESS) parser.add_argument('address', metavar='12:34:56:AB:CD:EF', - type=TuhiKeteDevice.is_device_address, + type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address, default=None, help='the address of the device to fetch drawing from') parser.add_argument('index', metavar='{|all}', @@ -1083,7 +792,7 @@ class TuhiKeteShell(cmd.Cmd): add_help=False) parser.add_argument('-h', action='help', help=argparse.SUPPRESS) parser.add_argument('address', metavar='12:34:56:AB:CD:EF', - type=TuhiKeteDevice.is_device_address, + type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address, default=None, help='the address of the device to register') @@ -1137,7 +846,7 @@ class TuhiKeteShell(cmd.Cmd): add_help=False) parser.add_argument('-h', action='help', help=argparse.SUPPRESS) parser.add_argument('address', metavar='12:34:56:AB:CD:EF', - type=TuhiKeteDevice.is_device_address, + type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address, default=None, nargs='?', help='the address of the device to listen to') @@ -1189,7 +898,7 @@ class TuhiKeteShell(cmd.Cmd): add_help=False) parser.add_argument('-h', action='help', help=argparse.SUPPRESS) parser.add_argument('address', metavar='12:34:56:AB:CD:EF', - type=TuhiKeteDevice.is_device_address, + type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address, default=None, nargs='?', help='the address of the device to listen to') parser.add_argument('mode', choices=['on', 'off'], nargs='?', @@ -1246,7 +955,7 @@ def main(args): with TuhiKeteShell() as shell: shell.run() - except DBusError as e: + except tuhi.dbusclient.DBusError as e: logger.error(e.message)