mirror of https://github.com/labapart/gattlib
228 lines
7.8 KiB
Python
228 lines
7.8 KiB
Python
#
|
|
# SPDX-License-Identifier: BSD-3-Clause
|
|
#
|
|
# Copyright (c) 2016-2021, Olivier Martin <olivier@labapart.org>
|
|
#
|
|
|
|
import logging
|
|
import uuid
|
|
|
|
from gattlib import *
|
|
from .exception import handle_return, DeviceError
|
|
from .gatt import GattService, GattCharacteristic
|
|
from .uuid import gattlib_uuid_to_int
|
|
|
|
CONNECTION_OPTIONS_LEGACY_BDADDR_LE_PUBLIC = (1 << 0)
|
|
CONNECTION_OPTIONS_LEGACY_BDADDR_LE_RANDOM = (1 << 1)
|
|
CONNECTION_OPTIONS_LEGACY_BT_SEC_LOW = (1 << 2)
|
|
CONNECTION_OPTIONS_LEGACY_BT_SEC_MEDIUM = (1 << 3)
|
|
CONNECTION_OPTIONS_LEGACY_BT_SEC_HIGH = (1 << 4)
|
|
|
|
CONNECTION_OPTIONS_LEGACY_DEFAULT = \
|
|
CONNECTION_OPTIONS_LEGACY_BDADDR_LE_PUBLIC | \
|
|
CONNECTION_OPTIONS_LEGACY_BDADDR_LE_RANDOM | \
|
|
CONNECTION_OPTIONS_LEGACY_BT_SEC_LOW
|
|
|
|
|
|
class Device:
|
|
|
|
def __init__(self, adapter, addr, name=None):
|
|
self._adapter = adapter
|
|
if type(addr) == str:
|
|
self._addr = addr.encode("utf-8")
|
|
else:
|
|
self._addr = addr
|
|
self._name = name
|
|
self._connection = None
|
|
|
|
# Keep track if notification handler has been initialized
|
|
self._is_notification_init = False
|
|
|
|
# Dictionnary for GATT characteristic callback
|
|
self._gatt_characteristic_callbacks = {}
|
|
|
|
@property
|
|
def id(self):
|
|
return self._addr.decode("utf-8")
|
|
|
|
@property
|
|
def connection(self):
|
|
if self._connection:
|
|
return self._connection
|
|
else:
|
|
return c_void_p(None)
|
|
|
|
def connect(self, options=CONNECTION_OPTIONS_LEGACY_DEFAULT):
|
|
if self._adapter:
|
|
adapter_name = self._adapter.name
|
|
else:
|
|
adapter_name = None
|
|
|
|
self._connection = gattlib_connect(adapter_name, self._addr, options)
|
|
if self._connection == 0:
|
|
raise DeviceError()
|
|
|
|
# Disable until https://github.com/labapart/gattlib/issues/75 is resolved
|
|
# @property
|
|
# def rssi(self):
|
|
# _rssi = c_int16(0)
|
|
# ret = gattlib_get_rssi(self._connection, byref(_rssi))
|
|
# handle_return(ret)
|
|
# return _rssi.value
|
|
|
|
@staticmethod
|
|
def on_disconnection(user_data):
|
|
this = user_data
|
|
|
|
this.disconnection_callback(this.disconnection_user_data)
|
|
|
|
def register_on_disconnect(self, callback, user_data):
|
|
self.disconnection_callback = callback
|
|
self.disconnection_user_data = user_data
|
|
|
|
gattlib_register_on_disconnect(self.connection, Device.on_disconnection, self)
|
|
|
|
def disconnect(self):
|
|
ret = gattlib_disconnect(self.connection)
|
|
handle_return(ret)
|
|
|
|
def discover(self):
|
|
#
|
|
# Discover GATT Services
|
|
#
|
|
_services = POINTER(GattlibPrimaryService)()
|
|
_services_count = c_int(0)
|
|
ret = gattlib_discover_primary(self.connection, byref(_services), byref(_services_count))
|
|
handle_return(ret)
|
|
|
|
self._services = {}
|
|
for i in range(0, _services_count.value):
|
|
service = GattService(self, _services[i])
|
|
self._services[service.short_uuid] = service
|
|
|
|
logging.debug("Service UUID:0x%x" % service.short_uuid)
|
|
|
|
#
|
|
# Discover GATT Characteristics
|
|
#
|
|
_characteristics = POINTER(GattlibCharacteristic)()
|
|
_characteristics_count = c_int(0)
|
|
ret = gattlib_discover_char(self.connection, byref(_characteristics), byref(_characteristics_count))
|
|
handle_return(ret)
|
|
|
|
self._characteristics = {}
|
|
for i in range(0, _characteristics_count.value):
|
|
characteristic = GattCharacteristic(self, _characteristics[i])
|
|
self._characteristics[characteristic.short_uuid] = characteristic
|
|
|
|
logging.debug("Characteristic UUID:0x%x" % characteristic.short_uuid)
|
|
|
|
def get_advertisement_data(self):
|
|
_advertisement_data = POINTER(GattlibAdvertisementData)()
|
|
_advertisement_data_count = c_size_t(0)
|
|
_manufacturer_id = c_uint16(0)
|
|
_manufacturer_data = c_void_p(None)
|
|
_manufacturer_data_len = c_size_t(0)
|
|
|
|
if self._connection is None:
|
|
ret = gattlib_get_advertisement_data_from_mac(self._adapter._adapter, self._addr,
|
|
byref(_advertisement_data), byref(_advertisement_data_count),
|
|
byref(_manufacturer_id),
|
|
byref(_manufacturer_data), byref(_manufacturer_data_len))
|
|
else:
|
|
ret = gattlib_get_advertisement_data(self._connection,
|
|
byref(_advertisement_data), byref(_advertisement_data_count),
|
|
byref(_manufacturer_id),
|
|
byref(_manufacturer_data), byref(_manufacturer_data_len))
|
|
|
|
handle_return(ret)
|
|
|
|
advertisement_data = {}
|
|
manufacturer_data = None
|
|
|
|
for i in range(0, _advertisement_data_count.value):
|
|
service_data = _advertisement_data[i]
|
|
uuid = gattlib_uuid_to_int(service_data.uuid)
|
|
|
|
pointer_type = POINTER(c_byte * service_data.data_length)
|
|
c_bytearray = cast(service_data.data, pointer_type)
|
|
|
|
data = bytearray(service_data.data_length)
|
|
for i in range(service_data.data_length):
|
|
data[i] = c_bytearray.contents[i] & 0xFF
|
|
|
|
advertisement_data[uuid] = data
|
|
|
|
if _manufacturer_data_len.value > 0:
|
|
pointer_type = POINTER(c_byte * _manufacturer_data_len.value)
|
|
c_bytearray = cast(_manufacturer_data, pointer_type)
|
|
|
|
manufacturer_data = bytearray(_manufacturer_data_len.value)
|
|
for i in range(_manufacturer_data_len.value):
|
|
manufacturer_data[i] = c_bytearray.contents[i] & 0xFF
|
|
|
|
return advertisement_data, _manufacturer_id.value, manufacturer_data
|
|
|
|
@property
|
|
def services(self):
|
|
if not hasattr(self, '_services'):
|
|
logging.warning("Start GATT discovery implicitly")
|
|
self.discover()
|
|
|
|
return self._services
|
|
|
|
@property
|
|
def characteristics(self):
|
|
if not hasattr(self, '_characteristics'):
|
|
logging.warning("Start GATT discovery implicitly")
|
|
self.discover()
|
|
|
|
return self._characteristics
|
|
|
|
@staticmethod
|
|
def notification_callback(uuid_str, data, data_len, user_data):
|
|
this = user_data
|
|
|
|
notification_uuid = uuid.UUID(uuid_str)
|
|
|
|
short_uuid = notification_uuid.int
|
|
if short_uuid not in this._gatt_characteristic_callbacks:
|
|
raise RuntimeError("UUID '%s' is expected to be part of the notification list")
|
|
else:
|
|
characteristic_callback = this._gatt_characteristic_callbacks[short_uuid]
|
|
|
|
# value = bytearray(data_len)
|
|
# for i in range(data_len):
|
|
# value[i] = data[i]
|
|
|
|
pointer_type = POINTER(c_ubyte * data_len)
|
|
c_bytearray = cast(data, pointer_type)
|
|
|
|
value = bytearray(data_len)
|
|
for i in range(data_len):
|
|
value[i] = c_bytearray.contents[i]
|
|
|
|
# Call GATT characteristic Notification callback
|
|
characteristic_callback['callback'](value, characteristic_callback['user_data'])
|
|
|
|
def _notification_init(self):
|
|
if self._is_notification_init:
|
|
return
|
|
|
|
self._is_notification_init = True
|
|
|
|
gattlib_register_notification(self._connection, Device.notification_callback, self)
|
|
|
|
def _notification_add_gatt_characteristic_callback(self, gatt_characteristic, callback, user_data):
|
|
if not self._is_notification_init:
|
|
self._notification_init()
|
|
|
|
self._gatt_characteristic_callbacks[gatt_characteristic.short_uuid] = { 'callback': callback, 'user_data': user_data }
|
|
|
|
def __str__(self):
|
|
name = self._name
|
|
if name:
|
|
return str(name)
|
|
else:
|
|
return str(self._addr)
|