From d62a97a8cd5d0163289a9a3643962e784bdb7618 Mon Sep 17 00:00:00 2001 From: Benjamin Tissoires Date: Wed, 7 Feb 2018 18:52:48 +0100 Subject: [PATCH] wacom: have subclasses to distinguish between protocols The Intuos Pro Paper is close enough to the Slate but with some subtleties. Instead of having a bunch of ifs, let's have nice subclasses for the differences in the protocol. --- tuhi/wacom.py | 203 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 131 insertions(+), 72 deletions(-) diff --git a/tuhi/wacom.py b/tuhi/wacom.py index 22d8f54..41bc203 100644 --- a/tuhi/wacom.py +++ b/tuhi/wacom.py @@ -39,9 +39,6 @@ WACOM_OFFLINE_CHRC_PEN_DATA_UUID = 'ffee0003-bbaa-9988-7766-554433221100' MYSTERIOUS_NOTIFICATION_SERVICE_UUID = '3a340720-c572-11e5-86c5-0002a5d5c51b' MYSTERIOUS_NOTIFICATION_CHRC_UUID = '3a340721-c572-11e5-86c5-0002a5d5c51b' -WACOM_SLATE_WIDTH = 21600 -WACOM_SLATE_HEIGHT = 14800 - @enum.unique class Protocol(enum.Enum): @@ -102,8 +99,11 @@ class WacomProtocol(GObject.Object): ''' Internal class to handle the communication with the Wacom device. + :param device: the BlueZDevice object that is this wacom device + :param uuid: the UUID {to be} assigned to the device ''' + protocol = Protocol.UNKNOWN __gsignals__ = { 'drawing': @@ -120,8 +120,6 @@ class WacomProtocol(GObject.Object): self.device = device self.nordic_answer = None self.pen_data_buffer = [] - self.width = WACOM_SLATE_WIDTH - self.height = WACOM_SLATE_HEIGHT self.name = device.name self._uuid = uuid self.fw_logger = logging.getLogger('tuhi.fw') @@ -135,8 +133,9 @@ class WacomProtocol(GObject.Object): device.connect_gatt_value(MYSTERIOUS_NOTIFICATION_CHRC_UUID, self._on_mysterious_data_received) - def is_spark(self): - return MYSTERIOUS_NOTIFICATION_CHRC_UUID not in self.device.characteristics + @classmethod + def is_spark(cls, device): + return MYSTERIOUS_NOTIFICATION_CHRC_UUID not in device.characteristics def _on_mysterious_data_received(self, name, value): self.fw_logger.debug(f'mysterious: {binascii.hexlify(bytes(value))}') @@ -329,15 +328,11 @@ class WacomProtocol(GObject.Object): def is_data_available(self): data = self.send_nordic_command_sync(command=0xc1, expected_opcode=0xc2) - n = 0 - if self.is_spark(): - n = int.from_bytes(data[0:2], byteorder='big') - else: - n = int.from_bytes(data[0:2], byteorder='little') + n = int.from_bytes(data[0:2], byteorder='little') logger.debug(f'Drawings available: {n}') return n > 0 - def get_stroke_data_slate(self): + def get_stroke_data(self): data = self.send_nordic_command_sync(command=0xcc, expected_opcode=0xcf) # logger.debug(f'cc returned {data} ') @@ -346,26 +341,6 @@ class WacomProtocol(GObject.Object): timestamp = time.strptime(str_timestamp, '%y%m%d%H%M%S') return count, timestamp - def get_stroke_data_spark(self): - data = self.send_nordic_command_sync(command=0xc5, - expected_opcode=[0xc7, 0xcd]) - # FIXME: Sometimes the 0xc7 is missing on the spark? Not in any of - # the btsnoop logs but I only rarely get a c7 response here - count = 0 - if data.opcode == 0xc7: - count = int.from_bytes(data[0:4], byteorder='little') - data = self.wait_nordic_data(0xcd, 5) - # logger.debug(f'cc returned {data} ') - - str_timestamp = ''.join([f'{d:02x}' for d in data]) - timestamp = time.strptime(str_timestamp, '%y%m%d%H%M%S') - return count, timestamp - - def get_stroke_data(self): - if not self.is_spark(): - return self.get_stroke_data_slate() - return self.get_stroke_data_spark() - def start_reading(self): data = self.send_nordic_command_sync(command=0xc3, expected_opcode=0xc8) @@ -377,27 +352,17 @@ class WacomProtocol(GObject.Object): if data[0] != 0xed: raise WacomException(f'unexpected answer: {data[0]:02x}') crc = data[1:] - if self.is_spark(): - data = self.wait_nordic_data(0xc9, 5) - crc = data crc.reverse() crc = int(binascii.hexlify(bytes(crc)), 16) pen_data = self.pen_data_buffer self.pen_data_buffer = [] if crc != binascii.crc32(bytes(pen_data)): - if not self.is_spark(): - raise WacomCorruptDataException("CRCs don't match") - else: - logger.error("CRCs don't match") + raise WacomCorruptDataException("CRCs don't match") return pen_data def ack_transaction(self): - if self.is_spark(): - opcode = None - else: - opcode = 0xb3 self.send_nordic_command_sync(command=0xca, - expected_opcode=opcode) + expected_opcode=0xb3) def next_pen_data(self, data, offset): debug_data = [] @@ -527,8 +492,6 @@ class WacomProtocol(GObject.Object): def retrieve_data(self): try: self.check_connection() - if self.is_spark(): - self.e3_command() self.set_time() battery, charging = self.get_battery_info() if charging: @@ -536,21 +499,20 @@ class WacomProtocol(GObject.Object): else: logger.debug(f'device is discharging: {battery}%') self.emit('battery-status', battery, charging) - if not self.is_spark(): - self.width = w = self.get_dimensions('width') - self.height = h = self.get_dimensions('height') - logger.debug(f'dimensions: {w}x{h}') + self.width = w = self.get_dimensions('width') + self.height = h = self.get_dimensions('height') + logger.debug(f'dimensions: {w}x{h}') - fw_high = self.get_firmware_version(0) - fw_low = self.get_firmware_version(1) - logger.debug(f'firmware is {fw_high}-{fw_low}') - self.ec_command() + fw_high = self.get_firmware_version(0) + fw_low = self.get_firmware_version(1) + logger.debug(f'firmware is {fw_high}-{fw_low}') + self.ec_command() if self.read_offline_data() == 0: logger.info('no data to retrieve') except WacomEEAGAINException: logger.warning('no data, please make sure the LED is blue and the button is pressed to switch it back to green') - def register_device_slate(self): + def register_device(self): self.register_connection() logger.info('Press the button now to confirm') self.emit('button-press-required') @@ -571,7 +533,88 @@ class WacomProtocol(GObject.Object): fw_low = self.get_firmware_version(1) logger.info(f'firmware is {fw_high}-{fw_low}') - def register_device_spark(self): + +class WacomProtocolSlate(WacomProtocol): + ''' + Subclass to handle the communication oddities with the Wacom Slate-like + devices. + + :param device: the BlueZDevice object that is this wacom device + ''' + width = 21600 + height = 14800 + protocol = Protocol.SLATE + + +class WacomProtocolSpark(WacomProtocol): + ''' + Subclass to handle the communication oddities with the Wacom Spark-like + devices. + + :param device: the BlueZDevice object that is this wacom device + ''' + width = 21600 + height = 14800 + protocol = Protocol.SPARK + + def is_data_available(self): + data = self.send_nordic_command_sync(command=0xc1, + expected_opcode=0xc2) + n = int.from_bytes(data[0:2], byteorder='big') + logger.debug(f'Drawings available: {n}') + return n > 0 + + def get_stroke_data(self): + data = self.send_nordic_command_sync(command=0xc5, + expected_opcode=[0xc7, 0xcd]) + # FIXME: Sometimes the 0xc7 is missing on the spark? Not in any of + # the btsnoop logs but I only rarely get a c7 response here + count = 0 + if data.opcode == 0xc7: + count = int.from_bytes(data[0:4], byteorder='little') + data = self.wait_nordic_data(0xcd, 5) + # logger.debug(f'cc returned {data} ') + + str_timestamp = ''.join([f'{d:02x}' for d in data]) + timestamp = time.strptime(str_timestamp, '%y%m%d%H%M%S') + return count, timestamp + + def wait_for_end_read(self): + data = self.wait_nordic_data(0xc8, 5) + if data[0] != 0xed: + raise WacomException(f'unexpected answer: {data[0]:02x}') + crc = data[1:] + data = self.wait_nordic_data(0xc9, 5) + crc = data + crc.reverse() + crc = int(binascii.hexlify(bytes(crc)), 16) + pen_data = self.pen_data_buffer + self.pen_data_buffer = [] + if crc != binascii.crc32(bytes(pen_data)): + logger.error("CRCs don't match") + return pen_data + + def ack_transaction(self): + self.send_nordic_command_sync(command=0xca, + expected_opcode=None) + + def retrieve_data(self): + try: + self.check_connection() + self.e3_command() + self.set_time() + battery, charging = self.get_battery_info() + if charging: + logger.debug(f'device is plugged in and charged at {battery}%') + else: + logger.debug(f'device is discharging: {battery}%') + self.emit('battery-status', battery, charging) + if self.read_offline_data() == 0: + logger.info('no data to retrieve') + except WacomEEAGAINException: + logger.warning('no data, please make sure the LED is blue and the button is pressed to switch it back to green') + + def register_device(self): try: self.check_connection() except WacomWrongModeException: @@ -630,19 +673,40 @@ class WacomDevice(GObject.Object): except KeyError: # unregistered device self._uuid = None - self._protocol = None self._wacom_protocol = None else: self._uuid = self._config['uuid'] - try: - self._protocol = next(p for p in Protocol if p.value == self._config['Protocol']) - except StopIteration: - logger.error(f'Unknown protocol in configuration: {self._config["Protocol"]}') - raise WacomCorruptDataException(f'Unknown Protocol {self._config["Protocol"]}') self._init_protocol() def _init_protocol(self): - self._wacom_protocol = WacomProtocol(self._device, self._uuid) + protocol = Protocol.UNKNOWN + if self._config is not None: + try: + protocol = next(p for p in Protocol if p.value == self._config['Protocol']) + except StopIteration: + logger.error(f'Unknown protocol in configuration: {self._config["Protocol"]}') + raise WacomCorruptDataException(f'Unknown Protocol {self._config["Protocol"]}') + + if protocol == Protocol.UNKNOWN: + # we are registering a new device, or we might have an early + # config file from an older tuhi version + if WacomProtocol.is_spark(self._device): + protocol = Protocol.SPARK + else: + protocol = Protocol.SLATE + + if protocol == Protocol.SPARK: + self._wacom_protocol = WacomProtocolSpark(self._device, self._uuid) + elif protocol == Protocol.SLATE: + self._wacom_protocol = WacomProtocolSlate(self._device, self._uuid) + else: + # FIXME: change to an assert once intuos-pro is implemented, we + # never get here + logger.error(f'Unknown Protocol {protocol}') + raise WacomCorruptDataException(f'Protocol "{protocol}" not implemented') + + logger.debug(f'{self._device.name} is using {type(self._wacom_protocol)}') + self._wacom_protocol.connect('drawing', self._on_drawing_received) self._wacom_protocol.connect('button-press-required', self._on_button_press_required) self._wacom_protocol.connect('battery-status', self._on_battery_status) @@ -663,19 +727,14 @@ class WacomDevice(GObject.Object): @GObject.Property def protocol(self): - assert self._protocol is not None - return self._protocol + assert self._wacom_protocol is not None + return self._wacom_protocol.protocol def register_device(self): self._uuid = uuid.uuid4().hex[:12] logger.debug(f'{self._device.address}: registering device, assigned {self.uuid}') self._init_protocol() - if self._wacom_protocol.is_spark(): - self._wacom_protocol.register_device_spark() - self._protocol = Protocol.SPARK - else: - self._wacom_protocol.register_device_slate() - self._protocol = Protocol.SLATE + self._wacom_protocol.register_device() logger.info('registration completed') self.notify('uuid')