diff --git a/tuhi.py b/tuhi.py index e142e95..7776a73 100755 --- a/tuhi.py +++ b/tuhi.py @@ -17,6 +17,7 @@ from gi.repository import GObject from tuhi.dbusserver import TuhiDBusServer from tuhi.ble import BlueZDeviceManager +from tuhi.wacom import WacomDevice logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger('tuhi') diff --git a/tuhi/wacom.py b/tuhi/wacom.py new file mode 100644 index 0000000..910dc0e --- /dev/null +++ b/tuhi/wacom.py @@ -0,0 +1,543 @@ +#!/usr/bin/env python3 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# + + +import binascii +import logging +import threading +import sys +import time +from gi.repository import GObject + +from tuhi.dbusserver import TuhiDBusServer +from tuhi.ble import BlueZDeviceManager + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger('wacom') + +WACOM_COMPANY_ID = 0x4755 +NORDIC_UART_SERVICE_UUID = '6e400001-b5a3-f393-e0a9-e50e24dcca9e' +NORDIC_UART_CHRC_TX_UUID = '6e400002-b5a3-f393-e0a9-e50e24dcca9e' +NORDIC_UART_CHRC_RX_UUID = '6e400003-b5a3-f393-e0a9-e50e24dcca9e' + +WACOM_LIVE_SERVICE_UUID = '00001523-1212-efde-1523-785feabcd123' +WACOM_CHRC_LIVE_PEN_DATA_UUID = '00001524-1212-efde-1523-785feabcd123' + +WACOM_OFFLINE_SERVICE_UUID = 'ffee0001-bbaa-9988-7766-554433221100' +WACOM_OFFLINE_CHRC_PEN_DATA_UUID = 'ffee0003-bbaa-9988-7766-554433221100' + +WACOM_SLATE_WIDTH = 14800 +WACOM_SLATE_HEIGHT = 21600 + +# FIXME: this should be generated once and stored for future use (dconf?) +SMARTPAD_UUID = 'dead00beef00' +SMARTPAD_UUID = '1d6adc5fac76' +SMARTPAD_UUID = '4810d75d5d4d' + +def signed_char_to_int(v): + if v & 0x80: + return v - (1 << 8) + return v + + +def b2hex(bs): + '''Convert bytes() to a two-letter hex string in the form "1a 2b c3"''' + hx = binascii.hexlify(bs).decode("ascii") + return ' '.join([''.join(s) for s in zip(hx[::2], hx[1::2])]) + + +def list2hex(l): + '''Converts a list of integers to a two-letter hex string in the form + "1a 2b c3"''' + return ' '.join(['{:02x}'.format(x) for x in l]) + + +class NordicData(list): + def __init__(self, bs): + super().__init__(bs[2:]) + self.opcode = bs[0] + self.length = bs[1] + +class Stroke(object): + def __init__(self): + self.points = [] + + def add_pos(self, x, y): + self.points.append(('M', x, y)) + + def add_rel(self, x, y, p=None): + self.points.append(('l', x, y, p)) + + +class Drawing(list): + def __init__(self, size, timestamp): + super().__init__() + self.timestamp = timestamp + self.size = size + + +class WacomException(Exception): + pass + + +class WacomEEAGAINException(WacomException): + pass + + +class WacomNotPairedException(WacomException): + pass + + +class WacomTimeoutException(WacomException): + pass + + +class WacomCorruptDataException(WacomException): + pass + + +class WacomDevice(GObject.Object): + def __init__(self, device): + self.device = device + self.nordic_answer = None + self.pen_data_buffer = [] + self.thread = None + self.width = WACOM_SLATE_WIDTH + self.height = WACOM_SLATE_HEIGHT + self.name = device.name + + device.connect_gatt_value(WACOM_CHRC_LIVE_PEN_DATA_UUID, + self._on_pen_data_changed) + device.connect_gatt_value(WACOM_OFFLINE_CHRC_PEN_DATA_UUID, + self._on_pen_data_received) + device.connect_gatt_value(NORDIC_UART_CHRC_RX_UUID, + self._on_nordic_data_received) + + def is_slate(self): + return self.name == "Bamboo Slate" + + def _on_pen_data_changed(self, name, value): + logger.debug(binascii.hexlify(bytes(value))) + + if value[0] == 0x10: + pressure = (value[3] << 8) | value[2] + buttons = int(value[10]) + logger.info(f'New Pen Data: pressure: {pressure}, button: {buttons}') + elif value[0] == 0xa2: + # entering proximity event + length = value[1] + pen_id = binascii.hexlify(bytes(value[2:])) + logger.info(f'Pen {pen_id} entered proximity') + elif value[0] == 0xa1: + # data event + length = value[1] + if length % 6 != 0: + logger.error(f'wrong data: {binascii.hexlify(bytes(value))}') + return + data = value[2:] + while data: + if bytes(data) == b'\xff\xff\xff\xff\xff\xff': + logger.info(f'Pen left proximity') + else: + y = (data[1] << 8) | data[0] + x = (data[3] << 8) | data[2] + pressure = (data[5] << 8) | data[4] + logger.info(f'New Pen Data: ({x},{y}), pressure: {pressure}') + data = data[6:] + + def _on_pen_data_received(self, name, data): + logger.debug(f"received pen data: {data}") + self.pen_data_buffer.extend(data) + + def _on_nordic_data_received(self, name, value): + logger.debug(f"received nordic data: {value}") + self.nordic_answer = value + + def send_nordic_command(self, command, arguments): + chrc = self.device.characteristics[NORDIC_UART_CHRC_TX_UUID] + data = [command, len(arguments), *arguments] + logger.debug(f'sending ' + + f'{command:02x} / {len(arguments):02x} / {list2hex(arguments)}') + chrc.write_value(data) + + def check_nordic_incoming(self): + if self.nordic_answer is None: + raise WacomTimeoutException(f"{self.name}:" + + " Timeout while reading data") + + answer = self.nordic_answer + self.nordic_answer = None + length = answer[1] + args = answer[2:] + if length != len(args): + raise WacomException(f"error while processing answer, should" + + " get an answer of size {length} instead" + + " of {len(args)}") + return NordicData(answer) + + def wait_nordic_data(self, expected_opcode, timeout): + t = time.time() + while self.nordic_answer is None and time.time() - t < timeout: + time.sleep(0.1) + + data = self.check_nordic_incoming() + + logger.debug(f'received {data.opcode:02x} / {data.length:02x} / {b2hex(bytes(data))}') + + if isinstance(expected_opcode, list): + if data.opcode not in expected_opcode: + raise WacomException(f"unexpected opcode: {data.opcode:02x}") + else: + if data.opcode != expected_opcode: + raise WacomException(f"unexpected opcode: {data.opcode:02x}") + + return data + + def check_ack(self, data): + if len(data) != 1: + str_b = binascii.hexlify(bytes(data)) + raise WacomException(f"unexpected data: {str_b}") + if data[0] == 0x07: + raise WacomNotPairedException(f"wrong device, please redo pairing") + if data[0] == 0x02: + raise WacomEEAGAINException(f"unexpected answer: {data[0]:02x}") + + def send_nordic_command_sync(self, + command, + expected_opcode, + arguments=None): + if arguments is None: + arguments = [0x00] + + self.send_nordic_command(command, arguments) + + if expected_opcode is None: + return None + + args = self.wait_nordic_data(expected_opcode, 5) + + if expected_opcode == 0xb3: # generic ACK + self.check_ack(args) + + return args + + def check_connection(self): + args = [int(i) for i in binascii.unhexlify(SMARTPAD_UUID)] + self.send_nordic_command_sync(command=0xe6, + expected_opcode=0xb3, + arguments=args) + + def register_connection(self): + args = [int(i) for i in binascii.unhexlify(SMARTPAD_UUID)] + self.send_nordic_command(command=0xe7, + arguments=args) + + def e3_command(self): + self.send_nordic_command_sync(command=0xe3, + expected_opcode=0xb3) + + def set_time(self): + self.current_time = time.strftime("%y%m%d%H%M%S") + args = [int(i) for i in binascii.unhexlify(self.current_time)] + self.send_nordic_command_sync(command=0xb6, + expected_opcode=0xb3, + arguments=args) + + def read_time(self): + data = self.send_nordic_command_sync(command=0xb6, + expected_opcode=0xbd) + logger.debug(f'b6 returned {data}') + # FIXME: check if data matches self.current_time + + def get_battery_info(self): + data = self.send_nordic_command_sync(command=0xb9, + expected_opcode=0xba) + return int(data[0]), data[1] == 1 + + def get_firmware_version(self, arg): + data = self.send_nordic_command_sync(command=0xb7, + expected_opcode=0xb8, + arguments=(arg,)) + fw = ''.join([hex(d)[2:] for d in data]) + return fw.upper() + + def bb_command(self): + data = self.send_nordic_command_sync(command=0xbb, + expected_opcode=0xbc) + logger.debug(f'bb returned {data}') + + def get_dimensions(self, arg): + possible_args = { + 'height': 3, + 'width': 4, + } + args = [possible_args[arg], 0x00] + data = self.send_nordic_command_sync(command=0xea, + expected_opcode=0xeb, + arguments=args) + if len(data) != 6: + str_data = binascii.hexlify(bytes(data)) + raise WacomCorruptDataException(f'unexpected answer for ' + + f'get_dimensions: {str_data}') + return (data[3] << 8) | data[2] + + def ec_command(self): + args = [0x06, 0x00, 0x00, 0x00, 0x00, 0x00] + self.send_nordic_command_sync(command=0xec, + expected_opcode=0xb3, + arguments=args) + + def start_live(self): + self.send_nordic_command_sync(command=0xb1, + expected_opcode=0xb3) + + def stop_live(self): + args = [0x02] + self.send_nordic_command_sync(command=0xb1, + expected_opcode=0xb3, + arguments=args) + + def b1_command(self): + args = [0x01] + self.send_nordic_command_sync(command=0xb1, + expected_opcode=0xb3, + arguments=args) + + def is_data_available(self): + data = self.send_nordic_command_sync(command=0xc1, + expected_opcode=0xc2) + n = data[1] | (data[0] << 8) + logger.debug(f'Drawings available: {n}') + return n > 0 + + def get_stroke_data_slate(self): + data = self.send_nordic_command_sync(command=0xcc, + expected_opcode=0xcf) + # logger.debug(f'cc returned {data} ') + count = 0 + for i in range(4): + count |= data[i] << (i * 8) + str_timestamp = ''.join([hex(d)[2:] for d in data[4:]]) + timestamp = time.strptime(str_timestamp, "%y%m%d%H%M%S") + return count, timestamp + + def get_stroke_data_spark(self): + data = self.send_nordic_command_sync(command=0xc5, + expected_opcode=[0xc7, 0xcd]) + # FIXME: Sometimes the 0xc7 is missing on the spark? Not in any of + # the btsnoop logs but I only rarely get a c7 response here + count = 0 + if data.opcode == 0xc7: + for i in range(4): + count |= data[i] << (i * 8) + data = self.wait_nordic_data(0xcd, 5) + # logger.debug(f'cc returned {data} ') + + str_timestamp = ''.join([hex(d)[2:] for d in data]) + timestamp = time.strptime(str_timestamp, "%y%m%d%H%M%S") + return count, timestamp + + def get_stroke_data(self): + if self.is_slate(): + return self.get_stroke_data_slate() + return self.get_stroke_data_spark() + + def start_reading(self): + data = self.send_nordic_command_sync(command=0xc3, + expected_opcode=0xc8) + if data[0] != 0xbe: + raise WacomException(f"unexpected answer: {data[0]:02x}") + + def wait_for_end_read(self): + data = self.wait_nordic_data(0xc8, 5) + if data[0] != 0xed: + raise WacomException(f"unexpected answer: {data[0]:02x}") + crc = data[1:] + if not self.is_slate(): + data = self.wait_nordic_data(0xc9, 5) + crc = data + crc.reverse() + crc = int(binascii.hexlify(bytes(crc)), 16) + pen_data = self.pen_data_buffer + self.pen_data_buffer = [] + if crc != binascii.crc32(bytes(pen_data)): + if self.is_slate(): + raise WacomCorruptDataException("CRCs don't match") + else: + logger.error("CRCs don't match") + return pen_data + + def ack_transaction(self): + if self.is_slate(): + opcode = 0xb3 + else: + opcode = None + self.send_nordic_command_sync(command=0xca, + expected_opcode=opcode) + + def next_pen_data(self, data, offset): + debug_data = [] + bitmask = data[offset] + opcode = 0 + offset += 1 + debug_data.append(f'{bitmask:02x} ({bitmask:08b})') + debug_data.append('|') + args_length = bin(bitmask).count('1') + args = data[offset:offset + args_length] + formatted_args = [] + n = 0 + for i in range(2): + if (1 << i) & bitmask: + debug_data.append(f'{args[n]:02x}') + opcode |= args[n] << (i * 8) + formatted_args.append(args[n]) + n += 1 + else: + formatted_args.append(0) + debug_data.append(' ') + debug_data.append(f'|') + for i in range(2, 8): + if (1 << i) & bitmask: + debug_data.append(f'{args[n]:02x}') + formatted_args.append(args[n]) + n += 1 + else: + formatted_args.append(0) + debug_data.append(' ') + logger.debug(f'{" ".join(debug_data)}') + return bitmask, opcode, formatted_args, offset + args_length + + def get_coordinate(self, bitmask, n, data, v, dv): + # drop the first 2 bytes as they are not valuable here + bitmask >>= 2 + data = data[2:] + is_rel = False + + full_coord_bitmask = 0b11 << (2 * n) + delta_coord_bitmask = 0b10 << (2 * n) + if (bitmask & full_coord_bitmask) == full_coord_bitmask: + v = (data[2 * n + 1] << 8) | data[2 * n] + dv = 0 + elif bitmask & delta_coord_bitmask: + dv += signed_char_to_int(data[2 * n + 1]) + is_rel = True + return v, dv, is_rel + + def parse_pen_data(self, data, timestamp): + offset = 0 + x, y, p = 0, 0, 0 + dx, dy, dp = 0, 0, 0 + + drawings = [] + drawing = None + stroke = None + while offset < len(data): + bitmask, opcode, args, offset = self.next_pen_data(data, offset) + if opcode == 0x3800: + logger.info(f'beginning of sequence') + drawing = Drawing((self.width, self.height), timestamp) + drawings.append(drawing) + continue + elif opcode == 0xeeff: + # some sort of headers + stroke = Stroke() + drawing.append(stroke) + continue + if bytes(args) == b'\xff\xff\xff\xff\xff\xff\xff\xff': + logger.info(f'end of sequence') + continue + if bytes(args) == b'\x00\x00\xff\xff\xff\xff\xff\xff': + logger.info(f'end of stroke') + stroke = None + continue + + if stroke is None: + stroke = Stroke() + drawing.append(stroke) + + y, dy, yrel = self.get_coordinate(bitmask, 0, args, y, dy) + x, dx, xrel = self.get_coordinate(bitmask, 1, args, x, dx) + p, dp, prel = self.get_coordinate(bitmask, 2, args, p, dp) + + x += dx + y += dy + p += dp + + logger.info(f'point at {x},{y} ({dx:+}, {dy:+}) ' + + f'with pressure {p} ({dp:+})') + + if bitmask & 0b00111100 == 0: + continue + if xrel or yrel or prel: + stroke.add_rel(dx, dy, dp) + else: + stroke.add_pos(x, y) + + return drawings + + def read_offline_data(self): + self.b1_command() + transaction_count = 0 + while self.is_data_available(): + count, timestamp = self.get_stroke_data() + logger.info(f"receiving {count} bytes" + + f" drawn on {time.asctime(timestamp)}") + self.start_reading() + pen_data = self.wait_for_end_read() + str_pen = binascii.hexlify(bytes(pen_data)) + logger.info(f"received {str_pen}") + prefix = pen_data[:4] + # not sure if we really need this check + # note: \x38\x62\x74 translates to '8bt' + if bytes(prefix) == b'\x62\x38\x62\x74': + drawings = self.parse_pen_data(pen_data, timestamp) + # FIXME: Do something with the drawing + self.ack_transaction() + transaction_count += 1 + return transaction_count + + def retrieve_data(self): + try: + self.check_connection() + if not self.is_slate(): + self.e3_command() + self.set_time() + battery, charging = self.get_battery_info() + if charging: + logger.debug(f'device is plugged in and charged at {battery}%') + else: + logger.debug(f'device is discharging: {battery}%') + if self.is_slate(): + self.width = self.get_dimensions('width') + self.height = self.get_dimensions('height') + self.debug(f'dimensions: {self.width}x{self.height}') + + fw_high = self.get_firmware_version(0) + fw_low = self.get_firmware_version(1) + self.debug(f'firmware is {fw_high}-{fw_low}') + self.ec_command() + if self.read_offline_data() == 0: + logger.info("no data to retrieve") + except WacomEEAGAINException: + logger.warning("no data, please make sure the LED is blue and" + + "the button is pressed to switch it back to green") + + def run(self): + time.sleep(2) + logger.debug('{}: starting'.format(self.device.address)) + self.retrieve_data() + + def start(self): + self.thread = threading.Thread(target=self.run) + self.thread.start() +