tuhi/tuhi/ble.py

465 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
import logging
from functools import partial
from gi.repository import GObject, Gio, GLib
logger = logging.getLogger('tuhi.ble')
ORG_BLUEZ_GATTCHARACTERISTIC1 = 'org.bluez.GattCharacteristic1'
ORG_BLUEZ_GATTSERVICE1 = 'org.bluez.GattService1'
ORG_BLUEZ_DEVICE1 = 'org.bluez.Device1'
ORG_BLUEZ_ADAPTER1 = 'org.bluez.Adapter1'
class BlueZCharacteristic(GObject.Object):
'''
Abstraction for a org.bluez.GattCharacteristic1 object.
Use start_notify() to receive notifications about the characteristics.
Hook up a property with connect_property() first.
'''
def __init__(self, obj):
'''
:param obj: the org.bluez.GattCharacteristic1 DBus proxy object
'''
self.obj = obj
assert(self.interface is not None)
assert(self.uuid is not None)
self._property_callbacks = {}
self.interface.connect('g-properties-changed',
self._on_properties_changed)
@GObject.Property
def interface(self):
return self.obj.get_interface(ORG_BLUEZ_GATTCHARACTERISTIC1)
@GObject.Property
def objpath(self):
return self.obj.get_object_path()
@GObject.Property
def uuid(self):
return self.interface.get_cached_property('UUID').unpack()
def connect_property(self, propname, callback):
'''
Connect the property with the given name to the callback function
provide. When the property chages, callback is invoked as:
callback(propname, value)
The common way is connect_property('Value', do_something) to get
notified about Value changes on this characteristic.
'''
self._property_callbacks[propname] = callback
def start_notify(self):
self.interface.StartNotify()
def write_value(self, data):
return self.interface.WriteValue('(aya{sv})', data, {})
def _on_properties_changed(self, obj, properties, invalidated_properties):
properties = properties.unpack()
for name, value in properties.items():
try:
self._property_callbacks[name](name, value)
except KeyError:
pass
def __repr__(self):
return f'Characteristic {self.uuid}:{self.objpath}'
class BlueZDevice(GObject.Object):
'''
Abstraction for a org.bluez.Device1 object
The device initializes itself based on the given object manager and
object, specifically: it resolves its gatt characteristics.
To connect to the real device, call connect_to_device(). The 'connected'
and 'disconnected' signals are emitted when the connection is
established.
The device's characteristics are in self.characteristics[uuid]
'''
__gsignals__ = {
'connected':
(GObject.SignalFlags.RUN_FIRST, None, ()),
'disconnected':
(GObject.SignalFlags.RUN_FIRST, None, ()),
'updated':
(GObject.SignalFlags.RUN_FIRST, None, ()),
}
def __init__(self, om, obj):
'''
:param om: The ObjectManager for name org.bluez path /
:param obj: The org.bluez.Device1 DBus proxy object
'''
GObject.Object.__init__(self)
self.obj = obj
self.om = om
self.logger = logger.getChild(self.address)
assert(self.interface is not None)
self.logger.debug(f'Device {self.objpath} - {self.name}')
self.characteristics = {}
self._resolve_gatt_characteristics()
self.interface.connect('g-properties-changed', self._on_properties_changed)
if self.connected:
self.emit('connected')
@GObject.Property
def objpath(self):
return self.obj.get_object_path()
@GObject.Property
def interface(self):
return self.obj.get_interface(ORG_BLUEZ_DEVICE1)
@GObject.Property
def name(self):
try:
return self.interface.get_cached_property('Name').unpack()
except AttributeError:
return 'UNKNOWN'
@GObject.Property
def address(self):
return self.interface.get_cached_property('Address').unpack()
@GObject.Property
def uuids(self):
return self.interface.get_cached_property('UUIDs').unpack()
@GObject.Property
def vendor_id(self):
md = self.interface.get_cached_property('ManufacturerData')
if md is None:
return None
try:
return next(iter(dict(md)))
except StopIteration:
# dict is empty
pass
return None
@GObject.Property
def connected(self):
return (self.interface.get_cached_property('Connected').unpack() and
self.interface.get_cached_property('ServicesResolved').unpack())
@GObject.Property
def manufacturer_data(self):
md = self.interface.get_cached_property('ManufacturerData')
if md is None:
return None
try:
return next(iter(dict(md).values()))
except StopIteration:
# dict is empty
pass
return None
def _resolve_gatt_characteristics(self):
'''
Resolve the GattCharacteristics.
'''
objs = self.om.get_objects(interface=ORG_BLUEZ_GATTCHARACTERISTIC1,
base_path=self.objpath)
for obj in objs:
i = obj.get_interface(ORG_BLUEZ_GATTCHARACTERISTIC1)
uuid = i.get_cached_property('UUID').unpack()
if uuid in self.characteristics:
continue
self.characteristics[uuid] = BlueZCharacteristic(obj)
self.logger.debug(f'GattCharacteristic: {uuid}')
def connect_device(self):
'''
Connect to the bluetooth device via bluez. This function is
asynchronous and returns immediately.
'''
i = self.obj.get_interface(ORG_BLUEZ_DEVICE1)
if self.connected:
self.logger.info('Device is already connected')
self.emit('connected')
return
self.logger.debug('Connecting')
i.Connect(result_handler=self._on_connect_result)
def _on_connect_result(self, obj, result, user_data):
if (isinstance(result, GLib.Error) and
result.domain == 'g-io-error-quark' and
result.code == Gio.IOErrorEnum.DBUS_ERROR and
Gio.dbus_error_get_remote_error(result) == 'org.bluez.Error.Failed' and
'Operation already in progress' in result.message):
self.logger.debug('Already connecting')
elif isinstance(result, Exception):
self.logger.error(f'Connection failed: {result}')
def disconnect_device(self):
'''
Disconnect the bluetooth device via bluez. This function is
asynchronous and returns immediately.
'''
i = self.obj.get_interface(ORG_BLUEZ_DEVICE1)
if not i.get_cached_property('Connected').get_boolean():
self.logger.info('Device is already disconnected')
self.emit('disconnected')
return
self.logger.debug('Disconnecting')
i.Disconnect(result_handler=self._on_disconnect_result)
def _on_disconnect_result(self, obj, result, user_data):
if isinstance(result, Exception):
self.logger.error(f'Disconnection failed: {result}')
def _on_properties_changed(self, obj, properties, invalidated_properties):
properties = properties.unpack()
if 'Connected' in properties:
if properties['Connected']:
self.logger.debug('Connection established')
else:
self.logger.debug('Disconnected')
self.emit('disconnected')
if 'ServicesResolved' in properties:
if properties['ServicesResolved']:
self._resolve_gatt_characteristics()
self.emit('connected')
if 'RSSI' in properties:
self.emit('updated')
if 'ManufacturerData' in properties:
self.notify('manufacturer-data')
def connect_gatt_value(self, uuid, callback):
'''
Connects Value property changes of the given GATT Characteristics
UUID to the callback.
'''
try:
chrc = self.characteristics[uuid]
chrc.connect_property('Value', callback)
chrc.start_notify()
except KeyError:
pass
def __repr__(self):
return f'Device {self.name}:{self.objpath}'
class BlueZObjectManager:
'''
Namespace to encapsulate our modification to the object manager.
'''
@classmethod
def instance(cls):
proxy = Gio.DBusObjectManagerClient.new_for_bus_sync(
Gio.BusType.SYSTEM,
Gio.DBusObjectManagerClientFlags.NONE,
'org.bluez',
'/',
None,
None,
None)
# Replace the object managers get_objects() with our pimped one.
proxy.get_objects_unsorted = proxy.get_objects
proxy.get_objects = partial(cls.get_objects, proxy)
return proxy
def get_objects(self, interface=None, base_path=None):
'''
Get objects sorted by their object path.
Optional arguments can be used to filter the returned object list.
:param interface: filter objects by interface, default is None
:param base_path: filter objects by object path, default is None
(the objects path has to start with `base_path`)
'''
def base_path_filter(obj):
return obj.get_object_path().startswith(base_path)
def interface_filter(obj):
return obj.get_interface(interface) is not None
objs = self.get_objects_unsorted()
if base_path is not None:
objs = filter(base_path_filter, objs)
if interface is not None:
objs = filter(interface_filter, objs)
return sorted(objs, key=lambda obj: obj.get_object_path())
class BlueZDeviceManager(GObject.Object):
'''
Manager object that connects to org.bluez's root object and handles the
devices.
'''
__gsignals__ = {
'device-added':
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)),
'device-updated':
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)),
'discovery-started':
(GObject.SignalFlags.RUN_FIRST, None, ()),
'discovery-stopped':
(GObject.SignalFlags.RUN_FIRST, None, ()),
}
def __init__(self, **kwargs):
GObject.Object.__init__(self, **kwargs)
self.devices = []
self._discovery = False
def connect_to_bluez(self):
'''
Connect to bluez's DBus interface. Once called, devices will be
resolved as they come in. The device-added signal is emitted for
each device.
'''
self._om = BlueZObjectManager.instance()
self._om.connect('object-added', self._on_om_object_added)
self._om.connect('object-removed', self._on_om_object_removed)
for obj in self._om.get_objects():
self._process_object(obj)
def _discovery_timeout_expired(self):
self.stop_discovery()
return False
def start_discovery(self, timeout=0):
'''
Start discovery mode, terminating after the specified timeout (in
seconds). If timeout is 0, no timeout is imposed and the discovery
mode stays on.
This emits the discovery-started signal
'''
self.emit('discovery-started')
if self._discovery:
return
self._discovery = True
for obj in self._om.get_objects(interface=ORG_BLUEZ_ADAPTER1):
i = obj.get_interface(ORG_BLUEZ_ADAPTER1)
# remove the duplicate data filter so we get notifications as they come in
i.SetDiscoveryFilter('(a{sv})', {'DuplicateData': GLib.Variant.new_boolean(False)})
objpath = obj.get_object_path()
try:
i.StartDiscovery()
logger.debug(f'{objpath}: Discovery started (timeout {timeout})')
except GLib.Error as e:
if (e.domain == 'g-io-error-quark' and
e.code == Gio.IOErrorEnum.DBUS_ERROR and
Gio.dbus_error_get_remote_error(e) == 'org.bluez.Error.InProgress'):
logger.debug(f'{objpath}: Already listening')
if timeout > 0:
GObject.timeout_add_seconds(timeout, self._discovery_timeout_expired)
# FIXME: Any errors up to here should trigger discovery-stopped
# signal with the status code
def stop_discovery(self):
'''
Stop an ongoing discovery mode. Any errors are logged but ignored.
This emits the discovery-stopped signal
'''
if not self._discovery:
return
self._discovery = False
for obj in self._om.get_objects(interface=ORG_BLUEZ_ADAPTER1):
i = obj.get_interface(ORG_BLUEZ_ADAPTER1)
objpath = obj.get_object_path()
try:
i.StopDiscovery()
logger.debug(f'{objpath}: Discovery stopped')
except GLib.Error as e:
logger.debug(f'{objpath}: Failed to stop discovery ({e})')
# reset the discovery filters
i.SetDiscoveryFilter('(a{sv})', {})
self.emit('discovery-stopped')
def _on_device_updated(self, device):
'''Callback for Device's properties-changed'''
# logger.debug(f'Object updated: {device.name}')
self.emit('device-updated', device)
def _on_om_object_added(self, om, obj):
'''Callback for ObjectManager's object-added'''
objpath = obj.get_object_path()
logger.debug(f'Object added: {objpath}')
self._process_object(obj, event=True)
def _on_om_object_removed(self, om, obj):
'''Callback for ObjectManager's object-removed'''
objpath = obj.get_object_path()
logger.debug(f'Object removed: {objpath}')
def _process_object(self, obj, event=True):
'''Process a single DBusProxyObject'''
if obj.get_interface(ORG_BLUEZ_ADAPTER1) is not None:
self._process_adapter(obj)
elif obj.get_interface(ORG_BLUEZ_DEVICE1) is not None:
self._process_device(obj)
elif obj.get_interface(ORG_BLUEZ_GATTCHARACTERISTIC1) is not None:
self._process_characteristic(obj)
def _process_adapter(self, obj):
objpath = obj.get_object_path()
logger.debug(f'Adapter: {objpath}')
def _process_device(self, obj):
dev = BlueZDevice(self._om, obj)
self.devices.append(dev)
dev.connect('updated', self._on_device_updated)
self.emit('device-added', dev)
def _process_characteristic(self, obj):
objpath = obj.get_object_path()
logger.debug(f'Characteristic {objpath}')