From f6e7c72d9d5f4917ecf5c46bd7f57884ae7e8f13 Mon Sep 17 00:00:00 2001 From: Benjamin Tissoires Date: Thu, 8 Feb 2018 13:33:28 +0100 Subject: [PATCH] wacom: Split out the Nordic communication protocol in its own class So the low level bits are not in the same class than the protocols --- tuhi/wacom.py | 150 +++++++++++++++++++++++++++----------------------- 1 file changed, 82 insertions(+), 68 deletions(-) diff --git a/tuhi/wacom.py b/tuhi/wacom.py index 9044b73..0080b35 100644 --- a/tuhi/wacom.py +++ b/tuhi/wacom.py @@ -95,83 +95,23 @@ class WacomCorruptDataException(WacomException): errno = errno.EPROTO -class WacomProtocol(GObject.Object): +class WacomProtocolLowLevelComm(GObject.Object): ''' Internal class to handle the communication with the Wacom device. + No-one should directly instanciate this. :param device: the BlueZDevice object that is this wacom device - :param uuid: the UUID {to be} assigned to the device ''' - protocol = Protocol.UNKNOWN - __gsignals__ = { - 'drawing': - (GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)), - 'button-press-required': - (GObject.SignalFlags.RUN_FIRST, None, ()), - # battery level in %, boolean for is-charging - "battery-status": - (GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_INT, GObject.TYPE_BOOLEAN)), - } - - def __init__(self, device, uuid): + def __init__(self, device): GObject.Object.__init__(self) self.device = device self.nordic_answer = None - self.pen_data_buffer = [] - self.name = device.name - self._uuid = uuid self.fw_logger = logging.getLogger('tuhi.fw') - device.connect_gatt_value(WACOM_CHRC_LIVE_PEN_DATA_UUID, - self._on_pen_data_changed) - device.connect_gatt_value(WACOM_OFFLINE_CHRC_PEN_DATA_UUID, - self._on_pen_data_received) device.connect_gatt_value(NORDIC_UART_CHRC_RX_UUID, self._on_nordic_data_received) - device.connect_gatt_value(MYSTERIOUS_NOTIFICATION_CHRC_UUID, - self._on_mysterious_data_received) - - @classmethod - def is_spark(cls, device): - return MYSTERIOUS_NOTIFICATION_CHRC_UUID not in device.characteristics - - def _on_mysterious_data_received(self, name, value): - self.fw_logger.debug(f'mysterious: {binascii.hexlify(bytes(value))}') - - def _on_pen_data_changed(self, name, value): - logger.debug(binascii.hexlify(bytes(value))) - - if value[0] == 0x10: - pressure = int.from_bytes(value[2:4], byteorder='little') - buttons = int(value[10]) - logger.info(f'New Pen Data: pressure: {pressure}, button: {buttons}') - elif value[0] == 0xa2: - # entering proximity event - length = value[1] - pen_id = binascii.hexlify(bytes(value[2:])) - logger.info(f'Pen {pen_id} entered proximity') - elif value[0] == 0xa1: - # data event - length = value[1] - if length % 6 != 0: - logger.error(f'wrong data: {binascii.hexlify(bytes(value))}') - return - data = value[2:] - while data: - if bytes(data) == b'\xff\xff\xff\xff\xff\xff': - logger.info(f'Pen left proximity') - else: - x = int.from_bytes(data[0:2], byteorder='little') - y = int.from_bytes(data[2:4], byteorder='little') - pressure = int.from_bytes(data[4:6], byteorder='little') - self.logger.info(f'New Pen Data: ({x},{y}), pressure: {pressure}') - data = data[6:] - - def _on_pen_data_received(self, name, data): - self.fw_logger.debug(f'RX Pen <-- {list2hex(data)}') - self.pen_data_buffer.extend(data) def _on_nordic_data_received(self, name, value): self.fw_logger.debug(f'RX Nordic <-- {list2hex(value)}') @@ -185,7 +125,7 @@ class WacomProtocol(GObject.Object): def check_nordic_incoming(self): if self.nordic_answer is None: - raise WacomTimeoutException(f'{self.name}: Timeout while reading data') + raise WacomTimeoutException(f'{self.device.name}: Timeout while reading data') answer = self.nordic_answer self.nordic_answer = None @@ -243,6 +183,80 @@ class WacomProtocol(GObject.Object): return args + +class WacomProtocolBase(WacomProtocolLowLevelComm): + ''' + Internal class to handle the basic communications with the Wacom device. + No-one should directly instanciate this. + + + :param device: the BlueZDevice object that is this wacom device + :param uuid: the UUID {to be} assigned to the device + ''' + protocol = Protocol.UNKNOWN + + __gsignals__ = { + 'drawing': + (GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)), + 'button-press-required': + (GObject.SignalFlags.RUN_FIRST, None, ()), + # battery level in %, boolean for is-charging + "battery-status": + (GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_INT, GObject.TYPE_BOOLEAN)), + } + + def __init__(self, device, uuid): + super().__init__(device) + self._uuid = uuid + self.pen_data_buffer = [] + + device.connect_gatt_value(WACOM_CHRC_LIVE_PEN_DATA_UUID, + self._on_pen_data_changed) + device.connect_gatt_value(WACOM_OFFLINE_CHRC_PEN_DATA_UUID, + self._on_pen_data_received) + device.connect_gatt_value(MYSTERIOUS_NOTIFICATION_CHRC_UUID, + self._on_mysterious_data_received) + + @classmethod + def is_spark(cls, device): + return MYSTERIOUS_NOTIFICATION_CHRC_UUID not in device.characteristics + + def _on_mysterious_data_received(self, name, value): + self.fw_logger.debug(f'mysterious: {binascii.hexlify(bytes(value))}') + + def _on_pen_data_changed(self, name, value): + logger.debug(binascii.hexlify(bytes(value))) + + if value[0] == 0x10: + pressure = int.from_bytes(value[2:4], byteorder='little') + buttons = int(value[10]) + logger.info(f'New Pen Data: pressure: {pressure}, button: {buttons}') + elif value[0] == 0xa2: + # entering proximity event + length = value[1] + pen_id = binascii.hexlify(bytes(value[2:])) + logger.info(f'Pen {pen_id} entered proximity') + elif value[0] == 0xa1: + # data event + length = value[1] + if length % 6 != 0: + logger.error(f'wrong data: {binascii.hexlify(bytes(value))}') + return + data = value[2:] + while data: + if bytes(data) == b'\xff\xff\xff\xff\xff\xff': + logger.info(f'Pen left proximity') + else: + x = int.from_bytes(data[0:2], byteorder='little') + y = int.from_bytes(data[2:4], byteorder='little') + pressure = int.from_bytes(data[4:6], byteorder='little') + self.logger.info(f'New Pen Data: ({x},{y}), pressure: {pressure}') + data = data[6:] + + def _on_pen_data_received(self, name, data): + self.fw_logger.debug(f'RX Pen <-- {list2hex(data)}') + self.pen_data_buffer.extend(data) + def check_connection(self): args = [int(i) for i in binascii.unhexlify(self._uuid)] self.send_nordic_command_sync(command=0xe6, @@ -429,7 +443,7 @@ class WacomProtocol(GObject.Object): bitmask, opcode, raw_args, args, offset = self.next_pen_data(data, offset) if opcode == 0x3800: logger.info(f'beginning of sequence') - drawing = Drawing(self.name, (self.width, self.height), timestamp) + drawing = Drawing(self.device.name, (self.width, self.height), timestamp) drawings.append(drawing) continue elif opcode == 0xeeff: @@ -531,7 +545,7 @@ class WacomProtocol(GObject.Object): logger.info(f'firmware is {fw_high}-{fw_low}') -class WacomProtocolSlate(WacomProtocol): +class WacomProtocolSlate(WacomProtocolBase): ''' Subclass to handle the communication oddities with the Wacom Slate-like devices. @@ -543,7 +557,7 @@ class WacomProtocolSlate(WacomProtocol): protocol = Protocol.SLATE -class WacomProtocolSpark(WacomProtocol): +class WacomProtocolSpark(WacomProtocolBase): ''' Subclass to handle the communication oddities with the Wacom Spark-like devices. @@ -687,7 +701,7 @@ class WacomDevice(GObject.Object): if protocol == Protocol.UNKNOWN: # we are registering a new device, or we might have an early # config file from an older tuhi version - if WacomProtocol.is_spark(self._device): + if WacomProtocolBase.is_spark(self._device): protocol = Protocol.SPARK else: protocol = Protocol.SLATE