tools: use the shared dbus client bindings for kete

Since most of the signals are propagated by the bindings we largely just need
to hook up simple methods to log what is happening.

One nasty hack: we import some of the bits into the kete namespace so
tuhi-live continues to work. This can be fixed up in a follow-up commit.

Signed-off-by: Peter Hutterer <peter.hutterer@who-t.net>
This commit is contained in:
Peter Hutterer 2019-08-28 09:52:16 +10:00 committed by Benjamin Tissoires
parent 07cefa3c26
commit cd84de4f32
1 changed files with 48 additions and 339 deletions

View File

@ -20,7 +20,6 @@ import errno
import os
import json
import logging
import re
import readline
import struct
import threading
@ -31,12 +30,17 @@ from pathlib import Path
try:
from tuhi.svg import JsonSvg
import tuhi.dbusclient
except ModuleNotFoundError:
# If PYTHONPATH isn't set up or we never installed Tuhi, the module
# isn't available. And since we don't install kete, we can assume that
# we're still in the git repo, so messing with the path is "fine".
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/..') # noqa
from tuhi.svg import JsonSvg
import tuhi.dbusclient
# get those into our namespace for tuhi-live's beneft
from tuhi.dbusclient import TUHI_DBUS_NAME, ROOT_PATH, ORG_FREEDESKTOP_TUHI1_MANAGER
CONFIG_PATH = Path(xdg.BaseDirectory.xdg_data_home, 'tuhi-kete')
@ -98,13 +102,6 @@ logger = logging.getLogger('tuhi-kete')
logger.addHandler(logger_handler)
logger.setLevel(logging.INFO)
TUHI_DBUS_NAME = 'org.freedesktop.tuhi1'
ORG_FREEDESKTOP_TUHI1_MANAGER = 'org.freedesktop.tuhi1.Manager'
ORG_FREEDESKTOP_TUHI1_DEVICE = 'org.freedesktop.tuhi1.Device'
ROOT_PATH = '/org/freedesktop/tuhi1'
ORG_BLUEZ_DEVICE1 = 'org.bluez.Device1'
# remove ':' from the completer delimiters of readline so we can match on
# device addresses
completer_delims = readline.get_completer_delims()
@ -118,340 +115,52 @@ def b2hex(bs):
return ' '.join([''.join(s) for s in zip(hx[::2], hx[1::2])])
class DBusError(Exception):
def __init__(self, message):
self.message = message
class _DBusObject(GObject.Object):
_connection = None
def __init__(self, name, interface, objpath):
GObject.GObject.__init__(self)
if _DBusObject._connection is None:
self._connect_to_session()
self.interface = interface
self.objpath = objpath
try:
self.proxy = Gio.DBusProxy.new_sync(self._connection,
Gio.DBusProxyFlags.NONE, None,
name, objpath, interface, None)
except GLib.Error as e:
if (e.domain == 'g-io-error-quark' and
e.code == Gio.IOErrorEnum.DBUS_ERROR):
raise DBusError(e.message)
else:
raise e
if self.proxy.get_name_owner() is None:
raise DBusError(f'No-one is handling {name}, is the daemon running?')
self.proxy.connect('g-properties-changed', self._on_properties_changed)
self.proxy.connect('g-signal', self._on_signal_received)
def _connect_to_session(self):
try:
_DBusObject._connection = Gio.bus_get_sync(Gio.BusType.SESSION, None)
except GLib.Error as e:
if (e.domain == 'g-io-error-quark' and
e.code == Gio.IOErrorEnum.DBUS_ERROR):
raise DBusError(e.message)
else:
raise e
def _on_properties_changed(self, proxy, changed_props, invalidated_props):
# Implement this in derived classes to respond to property changes
pass
def _on_signal_received(self, proxy, sender, signal, parameters):
# Implement this in derived classes to respond to signals
pass
def property(self, name):
p = self.proxy.get_cached_property(name)
if p is not None:
return p.unpack()
return p
def terminate(self):
del(self.proxy)
class _DBusSystemObject(_DBusObject):
'''
Same as the _DBusObject, but connects to the system bus instead
'''
def __init__(self, name, interface, objpath):
self._connect_to_system()
super().__init__(name, interface, objpath)
def _connect_to_system(self):
try:
self._connection = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
except GLib.Error as e:
if (e.domain == 'g-io-error-quark' and
e.code == Gio.IOErrorEnum.DBUS_ERROR):
raise DBusError(e.message)
else:
raise e
class BlueZDevice(_DBusSystemObject):
def __init__(self, objpath):
super().__init__('org.bluez', ORG_BLUEZ_DEVICE1, objpath)
self.proxy.connect('g-properties-changed', self._on_properties_changed)
@GObject.Property
def connected(self):
return self.proxy.get_cached_property('Connected').unpack()
def _on_properties_changed(self, obj, properties, invalidated_properties):
properties = properties.unpack()
if 'Connected' in properties:
self.notify('connected')
class TuhiKeteDevice(_DBusObject):
def __init__(self, manager, objpath):
_DBusObject.__init__(self, TUHI_DBUS_NAME,
ORG_FREEDESKTOP_TUHI1_DEVICE,
objpath)
self.manager = manager
self.is_registering = False
self.live = False
self._bluez_device = BlueZDevice(self.property('BlueZDevice'))
self._bluez_device.connect('notify::connected', self._on_connected)
@classmethod
def is_device_address(cls, string):
if re.match(r'[0-9a-f]{2}(:[0-9a-f]{2}){5}$', string.lower()):
return string
raise argparse.ArgumentTypeError(f'"{string}" is not a valid device address')
@GObject.Property
def address(self):
return self._bluez_device.property('Address')
@GObject.Property
def name(self):
return self._bluez_device.property('Name')
@GObject.Property
def listening(self):
return self.property('Listening')
@GObject.Property
def drawings_available(self):
return self.property('DrawingsAvailable')
@GObject.Property
def battery_percent(self):
return self.property('BatteryPercent')
@GObject.Property
def battery_state(self):
return self.property('BatteryState')
@GObject.Property
def connected(self):
return self._bluez_device.connected
def _on_connected(self, bluez_device, pspec):
self.notify('connected')
def register(self):
logger.debug(f'{self}: Register')
# FIXME: Register() doesn't return anything useful yet, so we wait until
# the device is in the Manager's Devices property
self.s1 = self.manager.connect('notify::devices', self._on_mgr_devices_updated)
self.is_registering = True
self.proxy.Register()
def start_listening(self):
self.proxy.StartListening()
def stop_listening(self):
try:
self.proxy.StopListening()
except GLib.Error as e:
if (e.domain != 'g-dbus-error-quark' or
e.code != Gio.IOErrorEnum.EXISTS or
Gio.dbus_error_get_remote_error(e) != 'org.freedesktop.DBus.Error.ServiceUnknown'):
raise e
def start_live(self, fd):
fd_list = Gio.UnixFDList.new()
fd_list.append(fd)
res, fds = self.proxy.call_with_unix_fd_list_sync('org.freedesktop.tuhi1.Device.StartLive',
GLib.Variant('(h)', (fd,)),
Gio.DBusCallFlags.NO_AUTO_START,
-1,
fd_list,
None)
if res[0] == 0:
self.live = True
def stop_live(self):
self.proxy.StopLive()
self.live = False
def json(self, timestamp):
SUPPORTED_FILE_FORMAT = 1
return self.proxy.GetJSONData('(ut)', SUPPORTED_FILE_FORMAT, timestamp)
def _on_signal_received(self, proxy, sender, signal, parameters):
if signal == 'ButtonPressRequired':
logger.info(f'{self}: Press button on device now')
elif signal == 'ListeningStopped':
err = parameters[0]
if err == -errno.EACCES:
logger.error(f'{self}: wrong device, please re-register.')
elif err < 0:
logger.error(f'{self}: an error occured: {os.strerror(-err)}')
self.notify('listening')
elif signal == 'SyncState':
state = parameters[0]
if state:
logger.debug(f'{self}: Downloading from device')
else:
logger.debug(f'{self}: Download done')
def _on_properties_changed(self, proxy, changed_props, invalidated_props):
if changed_props is None:
return
changed_props = changed_props.unpack()
if 'DrawingsAvailable' in changed_props:
self.notify('drawings-available')
elif 'Listening' in changed_props:
self.notify('listening')
elif 'BatteryPercent' in changed_props:
self.notify('battery-percent')
elif 'BatteryState' in changed_props:
self.notify('battery-state')
def __repr__(self):
return f'{self.address} - {self.name}'
def _on_mgr_devices_updated(self, manager, pspec):
if not self.is_registering:
return
for d in manager.devices:
if d.address == self.address:
self.is_registering = False
self.manager.disconnect(self.s1)
del(self.s1)
logger.info(f'{self}: Registration successful')
def terminate(self):
try:
self.manager.disconnect(self.s1)
except AttributeError:
pass
self._bluez_device.terminate()
super(TuhiKeteDevice, self).terminate()
class TuhiKeteManager(_DBusObject):
__gsignals__ = {
'unregistered-device':
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)),
}
class TuhiKeteManager(tuhi.dbusclient.TuhiDBusClientManager):
def __init__(self):
_DBusObject.__init__(self, TUHI_DBUS_NAME,
ORG_FREEDESKTOP_TUHI1_MANAGER,
ROOT_PATH)
super().__init__()
self.connect('unregistered_device', self._on_unregistered_device)
self._devices = {}
self._unregistered_devices = {}
self.sigs = {}
for d in self.devices:
self.sigs[d] = []
self._connect_device(d)
for objpath in self.property('Devices'):
device = TuhiKeteDevice(self, objpath)
self._devices[device.address] = device
@GObject.Property
def devices(self):
return [v for k, v in self._devices.items()]
@GObject.Property
def unregistered_devices(self):
return [v for k, v in self._unregistered_devices.items()]
@GObject.Property
def searching(self):
return self.proxy.get_cached_property('Searching')
def start_search(self):
self._unregistered_devices = {}
self.proxy.StartSearch()
def stop_search(self):
def _disconnect_device_signals(self, device):
try:
self.proxy.StopSearch()
except GLib.Error as e:
if (e.domain != 'g-dbus-error-quark' or
e.code != Gio.IOErrorEnum.EXISTS or
Gio.dbus_error_get_remote_error(e) != 'org.freedesktop.DBus.Error.ServiceUnknown'):
raise e
self._unregistered_devices = {}
for s in self.sigs[device]:
device.disconnect(s)
self.sigs[device] = []
except KeyError:
pass
def terminate(self):
for dev in self._devices.values():
dev.terminate()
self._devices = {}
self._unregistered_devices = {}
super(TuhiKeteManager, self).terminate()
def _on_unregistered_device(self, manager, device):
self._disconnect_device_signals(device)
def _on_properties_changed(self, proxy, changed_props, invalidated_props):
if changed_props is None:
return
def log_press_required(device):
logger.info(f'{device}: Press button on device now')
device.connect('button-press-required', log_press_required)
changed_props = changed_props.unpack()
def log_registered(device):
logger.info(f'{device}: Registration successful')
device.connect('registered', log_registered)
device.connect('registered', self._connect_device)
if 'Devices' in changed_props:
objpaths = changed_props['Devices']
for objpath in objpaths:
try:
d = self._unregistered_devices[objpath]
self._devices[d.address] = d
del self._unregistered_devices[objpath]
except KeyError:
# if we called Register() on an existing device it's not
# in unregistered devices
pass
self.notify('devices')
if 'Searching' in changed_props:
self.notify('searching')
def _connect_device(self, device):
self._disconnect_device_signals(device)
def _handle_unregistered_device(self, objpath):
for addr, dev in self._devices.items():
if dev.objpath == objpath:
self.emit('unregistered-device', dev)
return
def log_sync_state(device, pspec):
if device.sync_state:
logger.debug(f'{device}: Communicating with device')
else:
logger.debug(f'{device}: Communication complete')
device.connect('notify::sync-state', log_sync_state)
device = TuhiKeteDevice(self, objpath)
self._unregistered_devices[objpath] = device
logger.debug(f'New unregistered device: {device}')
self.emit('unregistered-device', device)
def _on_signal_received(self, proxy, sender, signal, parameters):
if signal == 'SearchStopped':
self.notify('searching')
elif signal == 'UnregisteredDevice':
objpath = parameters[0]
self._handle_unregistered_device(objpath)
def __getitem__(self, btaddr):
return self._devices[btaddr]
def log_device_error(d, err):
if err == -errno.EACCES:
logger.error(f'{device}: wrong device, please re-register.')
elif err < 0:
logger.error(f'{device}: an error occured: {os.strerror(-err)}')
device.connect('device-error', log_device_error)
class Worker(GObject.Object):
@ -743,7 +452,7 @@ class TuhiKeteShell(cmd.Cmd):
readline.set_history_length(100)
Gio.bus_watch_name(Gio.BusType.SESSION,
TUHI_DBUS_NAME,
tuhi.dbusclient.TUHI_DBUS_NAME,
Gio.BusNameWatcherFlags.NONE,
self._on_name_appeared,
self._on_name_vanished)
@ -875,7 +584,7 @@ class TuhiKeteShell(cmd.Cmd):
add_help=False)
parser.add_argument('-h', action='help', help=argparse.SUPPRESS)
parser.add_argument('address', metavar='12:34:56:AB:CD:EF',
type=TuhiKeteDevice.is_device_address,
type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address,
default=None,
help='the address of the device to listen to')
parser.add_argument('mode', choices=['on', 'off'], nargs='?',
@ -986,7 +695,7 @@ class TuhiKeteShell(cmd.Cmd):
add_help=False)
parser.add_argument('-h', action='help', help=argparse.SUPPRESS)
parser.add_argument('address', metavar='12:34:56:AB:CD:EF',
type=TuhiKeteDevice.is_device_address,
type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address,
default=None,
help='the address of the device to fetch drawing from')
parser.add_argument('index', metavar='{<index>|all}',
@ -1083,7 +792,7 @@ class TuhiKeteShell(cmd.Cmd):
add_help=False)
parser.add_argument('-h', action='help', help=argparse.SUPPRESS)
parser.add_argument('address', metavar='12:34:56:AB:CD:EF',
type=TuhiKeteDevice.is_device_address,
type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address,
default=None,
help='the address of the device to register')
@ -1137,7 +846,7 @@ class TuhiKeteShell(cmd.Cmd):
add_help=False)
parser.add_argument('-h', action='help', help=argparse.SUPPRESS)
parser.add_argument('address', metavar='12:34:56:AB:CD:EF',
type=TuhiKeteDevice.is_device_address,
type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address,
default=None, nargs='?',
help='the address of the device to listen to')
@ -1189,7 +898,7 @@ class TuhiKeteShell(cmd.Cmd):
add_help=False)
parser.add_argument('-h', action='help', help=argparse.SUPPRESS)
parser.add_argument('address', metavar='12:34:56:AB:CD:EF',
type=TuhiKeteDevice.is_device_address,
type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address,
default=None, nargs='?',
help='the address of the device to listen to')
parser.add_argument('mode', choices=['on', 'off'], nargs='?',
@ -1246,7 +955,7 @@ def main(args):
with TuhiKeteShell() as shell:
shell.run()
except DBusError as e:
except tuhi.dbusclient.DBusError as e:
logger.error(e.message)