#!/usr/bin/env python3 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # from gi.repository import GObject, Gio, GLib import argparse import errno import os import logging import re logger = logging.getLogger('tuhi.dbusclient') TUHI_DBUS_NAME = 'org.freedesktop.tuhi1' ORG_FREEDESKTOP_TUHI1_MANAGER = 'org.freedesktop.tuhi1.Manager' ORG_FREEDESKTOP_TUHI1_DEVICE = 'org.freedesktop.tuhi1.Device' ROOT_PATH = '/org/freedesktop/tuhi1' ORG_BLUEZ_DEVICE1 = 'org.bluez.Device1' class DBusError(Exception): def __init__(self, message): self.message = message class _DBusObject(GObject.Object): _connection = None def __init__(self, name, interface, objpath): super().__init__() # this is not handled asynchronously because if we fail to # get the session bus, we have other issues if _DBusObject._connection is None: self._connect_to_session() self.interface = interface self.objpath = objpath self._online = False self._name = name try: self._connect() except DBusError: self._reconnect_timer = GObject.timeout_add_seconds(2, self._on_reconnect_timer) def _connect(self): try: self.proxy = Gio.DBusProxy.new_sync(self._connection, Gio.DBusProxyFlags.NONE, None, self._name, self.objpath, self.interface, None) if self.proxy.get_name_owner() is None: raise DBusError(f'No-one is handling {self._name}, is the daemon running?') self._online = True self.notify('online') except GLib.Error as e: if (e.domain == 'g-io-error-quark' and e.code == Gio.IOErrorEnum.DBUS_ERROR): raise DBusError(e.message) else: raise e self.proxy.connect('g-properties-changed', self._on_properties_changed) self.proxy.connect('g-signal', self._on_signal_received) def _on_reconnect_timer(self): try: logger.debug('reconnecting') self._connect() return False except DBusError: return True @GObject.Property def online(self): return self._online def _connect_to_session(self): try: _DBusObject._connection = Gio.bus_get_sync(Gio.BusType.SESSION, None) except GLib.Error as e: if (e.domain == 'g-io-error-quark' and e.code == Gio.IOErrorEnum.DBUS_ERROR): raise DBusError(e.message) else: raise e def _on_properties_changed(self, proxy, changed_props, invalidated_props): # Implement this in derived classes to respond to property changes pass def _on_signal_received(self, proxy, sender, signal, parameters): # Implement this in derived classes to respond to signals pass def property(self, name): p = self.proxy.get_cached_property(name) if p is not None: return p.unpack() return p def terminate(self): del(self.proxy) class _DBusSystemObject(_DBusObject): ''' Same as the _DBusObject, but connects to the system bus instead ''' def __init__(self, name, interface, objpath): self._connect_to_system() super().__init__(name, interface, objpath) def _connect_to_system(self): try: self._connection = Gio.bus_get_sync(Gio.BusType.SYSTEM, None) except GLib.Error as e: if (e.domain == 'g-io-error-quark' and e.code == Gio.IOErrorEnum.DBUS_ERROR): raise DBusError(e.message) else: raise e class BlueZDevice(_DBusSystemObject): def __init__(self, objpath): super().__init__('org.bluez', ORG_BLUEZ_DEVICE1, objpath) self.proxy.connect('g-properties-changed', self._on_properties_changed) @GObject.Property def connected(self): return self.proxy.get_cached_property('Connected').unpack() def _on_properties_changed(self, obj, properties, invalidated_properties): properties = properties.unpack() if 'Connected' in properties: self.notify('connected') class TuhiDBusClientDevice(_DBusObject): __gsignals__ = { 'button-press-required': (GObject.SignalFlags.RUN_FIRST, None, ()), 'registered': (GObject.SignalFlags.RUN_FIRST, None, ()), 'device-error': (GObject.SignalFlags.RUN_FIRST, None, (int,)), } def __init__(self, manager, objpath): super().__init__(TUHI_DBUS_NAME, ORG_FREEDESKTOP_TUHI1_DEVICE, objpath) self.manager = manager self.is_registering = False self._bluez_device = BlueZDevice(self.property('BlueZDevice')) self._bluez_device.connect('notify::connected', self._on_connected) self._sync_state = 0 @classmethod def is_device_address(cls, string): if re.match(r'[0-9a-f]{2}(:[0-9a-f]{2}){5}$', string.lower()): return string raise argparse.ArgumentTypeError(f'"{string}" is not a valid device address') @GObject.Property def address(self): return self._bluez_device.property('Address') @GObject.Property def name(self): return self._bluez_device.property('Name') @GObject.Property def dimensions(self): return self.property('Dimensions') @GObject.Property def listening(self): return self.property('Listening') @GObject.Property def drawings_available(self): return self.property('DrawingsAvailable') @GObject.Property def battery_percent(self): return self.property('BatteryPercent') @GObject.Property def battery_state(self): return self.property('BatteryState') @GObject.Property def connected(self): return self._bluez_device.connected @GObject.Property def sync_state(self): return self._sync_state @GObject.Property def live(self): return self.property('Live') def _on_connected(self, bluez_device, pspec): self.notify('connected') def register(self): logger.debug(f'{self}: Register') # FIXME: Register() doesn't return anything useful yet, so we wait until # the device is in the Manager's Devices property self.s1 = self.manager.connect('notify::devices', self._on_mgr_devices_updated) self.is_registering = True self.proxy.Register() def start_listening(self): self.proxy.StartListening() def stop_listening(self): try: self.proxy.StopListening() except GLib.Error as e: if (e.domain != 'g-dbus-error-quark' or e.code != Gio.IOErrorEnum.EXISTS or Gio.dbus_error_get_remote_error(e) != 'org.freedesktop.DBus.Error.ServiceUnknown'): raise e def json(self, timestamp): SUPPORTED_FILE_FORMAT = 1 return self.proxy.GetJSONData('(ut)', SUPPORTED_FILE_FORMAT, timestamp) def _on_signal_received(self, proxy, sender, signal, parameters): if signal == 'ButtonPressRequired': logger.info(f'{self}: Press button on device now') self.emit('button-press-required') elif signal == 'ListeningStopped': err = parameters[0] if err == -errno.EACCES: logger.error(f'{self}: wrong device, please re-register.') elif err < 0: logger.error(f'{self}: an error occured: {os.strerror(-err)}') self.emit('device-error', err) self.notify('listening') elif signal == 'SyncState': self._sync_state = parameters[0] self.notify('sync-state') def _on_properties_changed(self, proxy, changed_props, invalidated_props): if changed_props is None: return changed_props = changed_props.unpack() if 'DrawingsAvailable' in changed_props: self.notify('drawings-available') elif 'Listening' in changed_props: self.notify('listening') elif 'BatteryPercent' in changed_props: self.notify('battery-percent') elif 'BatteryState' in changed_props: self.notify('battery-state') elif 'Live' in changed_props: self.notify('live') def __repr__(self): return f'{self.address} - {self.name}' def _on_mgr_devices_updated(self, manager, pspec): if not self.is_registering: return for d in manager.devices: if d.address == self.address: self.is_registering = False self.manager.disconnect(self.s1) del(self.s1) logger.info(f'{self}: Registration successful') self.emit('registered') def start_live(self, fd): fd_list = Gio.UnixFDList.new() fd_list.append(fd) res, fds = self.proxy.call_with_unix_fd_list_sync('org.freedesktop.tuhi1.Device.StartLive', GLib.Variant('(h)', (fd,)), Gio.DBusCallFlags.NO_AUTO_START, -1, fd_list, None) def stop_live(self): self.proxy.StopLive() def terminate(self): try: self.manager.disconnect(self.s1) except AttributeError: pass self._bluez_device.terminate() super().terminate() class TuhiDBusClientManager(_DBusObject): __gsignals__ = { 'unregistered-device': (GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)), } def __init__(self): super().__init__(TUHI_DBUS_NAME, ORG_FREEDESKTOP_TUHI1_MANAGER, ROOT_PATH) self._devices = {} self._unregistered_devices = {} logger.info('starting up') if not self.online: self.connect('notify::online', self._init) else: self._init() def _init(self, *args, **kwargs): logger.info('manager is online') for objpath in self.property('Devices'): device = TuhiDBusClientDevice(self, objpath) self._devices[device.address] = device @GObject.Property def devices(self): return [v for k, v in self._devices.items()] @GObject.Property def unregistered_devices(self): return [v for k, v in self._unregistered_devices.items()] @GObject.Property def searching(self): return self.proxy.get_cached_property('Searching') def start_search(self): self._unregistered_devices = {} self.proxy.StartSearch() def stop_search(self): try: self.proxy.StopSearch() except GLib.Error as e: if (e.domain != 'g-dbus-error-quark' or e.code != Gio.IOErrorEnum.EXISTS or Gio.dbus_error_get_remote_error(e) != 'org.freedesktop.DBus.Error.ServiceUnknown'): raise e self._unregistered_devices = {} def terminate(self): for dev in self._devices.values(): dev.terminate() self._devices = {} self._unregistered_devices = {} super().terminate() def _on_properties_changed(self, proxy, changed_props, invalidated_props): if changed_props is None: return changed_props = changed_props.unpack() if 'Devices' in changed_props: objpaths = changed_props['Devices'] for objpath in objpaths: try: d = self._unregistered_devices[objpath] self._devices[d.address] = d del self._unregistered_devices[objpath] except KeyError: # if we called Register() on an existing device it's not # in unregistered devices pass self.notify('devices') if 'Searching' in changed_props: self.notify('searching') def _handle_unregistered_device(self, objpath): for addr, dev in self._devices.items(): if dev.objpath == objpath: self.emit('unregistered-device', dev) return device = TuhiDBusClientDevice(self, objpath) self._unregistered_devices[objpath] = device logger.debug(f'New unregistered device: {device}') self.emit('unregistered-device', device) def _on_signal_received(self, proxy, sender, signal, parameters): if signal == 'SearchStopped': self.notify('searching') elif signal == 'UnregisteredDevice': objpath = parameters[0] self._handle_unregistered_device(objpath) def __getitem__(self, btaddr): return self._devices[btaddr]