wacom: have subclasses to distinguish between protocols
The Intuos Pro Paper is close enough to the Slate but with some subtleties. Instead of having a bunch of ifs, let's have nice subclasses for the differences in the protocol.
This commit is contained in:
parent
1f843b1434
commit
d62a97a8cd
203
tuhi/wacom.py
203
tuhi/wacom.py
|
@ -39,9 +39,6 @@ WACOM_OFFLINE_CHRC_PEN_DATA_UUID = 'ffee0003-bbaa-9988-7766-554433221100'
|
||||||
MYSTERIOUS_NOTIFICATION_SERVICE_UUID = '3a340720-c572-11e5-86c5-0002a5d5c51b'
|
MYSTERIOUS_NOTIFICATION_SERVICE_UUID = '3a340720-c572-11e5-86c5-0002a5d5c51b'
|
||||||
MYSTERIOUS_NOTIFICATION_CHRC_UUID = '3a340721-c572-11e5-86c5-0002a5d5c51b'
|
MYSTERIOUS_NOTIFICATION_CHRC_UUID = '3a340721-c572-11e5-86c5-0002a5d5c51b'
|
||||||
|
|
||||||
WACOM_SLATE_WIDTH = 21600
|
|
||||||
WACOM_SLATE_HEIGHT = 14800
|
|
||||||
|
|
||||||
|
|
||||||
@enum.unique
|
@enum.unique
|
||||||
class Protocol(enum.Enum):
|
class Protocol(enum.Enum):
|
||||||
|
@ -102,8 +99,11 @@ class WacomProtocol(GObject.Object):
|
||||||
'''
|
'''
|
||||||
Internal class to handle the communication with the Wacom device.
|
Internal class to handle the communication with the Wacom device.
|
||||||
|
|
||||||
|
|
||||||
:param device: the BlueZDevice object that is this wacom device
|
:param device: the BlueZDevice object that is this wacom device
|
||||||
|
:param uuid: the UUID {to be} assigned to the device
|
||||||
'''
|
'''
|
||||||
|
protocol = Protocol.UNKNOWN
|
||||||
|
|
||||||
__gsignals__ = {
|
__gsignals__ = {
|
||||||
'drawing':
|
'drawing':
|
||||||
|
@ -120,8 +120,6 @@ class WacomProtocol(GObject.Object):
|
||||||
self.device = device
|
self.device = device
|
||||||
self.nordic_answer = None
|
self.nordic_answer = None
|
||||||
self.pen_data_buffer = []
|
self.pen_data_buffer = []
|
||||||
self.width = WACOM_SLATE_WIDTH
|
|
||||||
self.height = WACOM_SLATE_HEIGHT
|
|
||||||
self.name = device.name
|
self.name = device.name
|
||||||
self._uuid = uuid
|
self._uuid = uuid
|
||||||
self.fw_logger = logging.getLogger('tuhi.fw')
|
self.fw_logger = logging.getLogger('tuhi.fw')
|
||||||
|
@ -135,8 +133,9 @@ class WacomProtocol(GObject.Object):
|
||||||
device.connect_gatt_value(MYSTERIOUS_NOTIFICATION_CHRC_UUID,
|
device.connect_gatt_value(MYSTERIOUS_NOTIFICATION_CHRC_UUID,
|
||||||
self._on_mysterious_data_received)
|
self._on_mysterious_data_received)
|
||||||
|
|
||||||
def is_spark(self):
|
@classmethod
|
||||||
return MYSTERIOUS_NOTIFICATION_CHRC_UUID not in self.device.characteristics
|
def is_spark(cls, device):
|
||||||
|
return MYSTERIOUS_NOTIFICATION_CHRC_UUID not in device.characteristics
|
||||||
|
|
||||||
def _on_mysterious_data_received(self, name, value):
|
def _on_mysterious_data_received(self, name, value):
|
||||||
self.fw_logger.debug(f'mysterious: {binascii.hexlify(bytes(value))}')
|
self.fw_logger.debug(f'mysterious: {binascii.hexlify(bytes(value))}')
|
||||||
|
@ -329,15 +328,11 @@ class WacomProtocol(GObject.Object):
|
||||||
def is_data_available(self):
|
def is_data_available(self):
|
||||||
data = self.send_nordic_command_sync(command=0xc1,
|
data = self.send_nordic_command_sync(command=0xc1,
|
||||||
expected_opcode=0xc2)
|
expected_opcode=0xc2)
|
||||||
n = 0
|
n = int.from_bytes(data[0:2], byteorder='little')
|
||||||
if self.is_spark():
|
|
||||||
n = int.from_bytes(data[0:2], byteorder='big')
|
|
||||||
else:
|
|
||||||
n = int.from_bytes(data[0:2], byteorder='little')
|
|
||||||
logger.debug(f'Drawings available: {n}')
|
logger.debug(f'Drawings available: {n}')
|
||||||
return n > 0
|
return n > 0
|
||||||
|
|
||||||
def get_stroke_data_slate(self):
|
def get_stroke_data(self):
|
||||||
data = self.send_nordic_command_sync(command=0xcc,
|
data = self.send_nordic_command_sync(command=0xcc,
|
||||||
expected_opcode=0xcf)
|
expected_opcode=0xcf)
|
||||||
# logger.debug(f'cc returned {data} ')
|
# logger.debug(f'cc returned {data} ')
|
||||||
|
@ -346,26 +341,6 @@ class WacomProtocol(GObject.Object):
|
||||||
timestamp = time.strptime(str_timestamp, '%y%m%d%H%M%S')
|
timestamp = time.strptime(str_timestamp, '%y%m%d%H%M%S')
|
||||||
return count, timestamp
|
return count, timestamp
|
||||||
|
|
||||||
def get_stroke_data_spark(self):
|
|
||||||
data = self.send_nordic_command_sync(command=0xc5,
|
|
||||||
expected_opcode=[0xc7, 0xcd])
|
|
||||||
# FIXME: Sometimes the 0xc7 is missing on the spark? Not in any of
|
|
||||||
# the btsnoop logs but I only rarely get a c7 response here
|
|
||||||
count = 0
|
|
||||||
if data.opcode == 0xc7:
|
|
||||||
count = int.from_bytes(data[0:4], byteorder='little')
|
|
||||||
data = self.wait_nordic_data(0xcd, 5)
|
|
||||||
# logger.debug(f'cc returned {data} ')
|
|
||||||
|
|
||||||
str_timestamp = ''.join([f'{d:02x}' for d in data])
|
|
||||||
timestamp = time.strptime(str_timestamp, '%y%m%d%H%M%S')
|
|
||||||
return count, timestamp
|
|
||||||
|
|
||||||
def get_stroke_data(self):
|
|
||||||
if not self.is_spark():
|
|
||||||
return self.get_stroke_data_slate()
|
|
||||||
return self.get_stroke_data_spark()
|
|
||||||
|
|
||||||
def start_reading(self):
|
def start_reading(self):
|
||||||
data = self.send_nordic_command_sync(command=0xc3,
|
data = self.send_nordic_command_sync(command=0xc3,
|
||||||
expected_opcode=0xc8)
|
expected_opcode=0xc8)
|
||||||
|
@ -377,27 +352,17 @@ class WacomProtocol(GObject.Object):
|
||||||
if data[0] != 0xed:
|
if data[0] != 0xed:
|
||||||
raise WacomException(f'unexpected answer: {data[0]:02x}')
|
raise WacomException(f'unexpected answer: {data[0]:02x}')
|
||||||
crc = data[1:]
|
crc = data[1:]
|
||||||
if self.is_spark():
|
|
||||||
data = self.wait_nordic_data(0xc9, 5)
|
|
||||||
crc = data
|
|
||||||
crc.reverse()
|
crc.reverse()
|
||||||
crc = int(binascii.hexlify(bytes(crc)), 16)
|
crc = int(binascii.hexlify(bytes(crc)), 16)
|
||||||
pen_data = self.pen_data_buffer
|
pen_data = self.pen_data_buffer
|
||||||
self.pen_data_buffer = []
|
self.pen_data_buffer = []
|
||||||
if crc != binascii.crc32(bytes(pen_data)):
|
if crc != binascii.crc32(bytes(pen_data)):
|
||||||
if not self.is_spark():
|
raise WacomCorruptDataException("CRCs don't match")
|
||||||
raise WacomCorruptDataException("CRCs don't match")
|
|
||||||
else:
|
|
||||||
logger.error("CRCs don't match")
|
|
||||||
return pen_data
|
return pen_data
|
||||||
|
|
||||||
def ack_transaction(self):
|
def ack_transaction(self):
|
||||||
if self.is_spark():
|
|
||||||
opcode = None
|
|
||||||
else:
|
|
||||||
opcode = 0xb3
|
|
||||||
self.send_nordic_command_sync(command=0xca,
|
self.send_nordic_command_sync(command=0xca,
|
||||||
expected_opcode=opcode)
|
expected_opcode=0xb3)
|
||||||
|
|
||||||
def next_pen_data(self, data, offset):
|
def next_pen_data(self, data, offset):
|
||||||
debug_data = []
|
debug_data = []
|
||||||
|
@ -527,8 +492,6 @@ class WacomProtocol(GObject.Object):
|
||||||
def retrieve_data(self):
|
def retrieve_data(self):
|
||||||
try:
|
try:
|
||||||
self.check_connection()
|
self.check_connection()
|
||||||
if self.is_spark():
|
|
||||||
self.e3_command()
|
|
||||||
self.set_time()
|
self.set_time()
|
||||||
battery, charging = self.get_battery_info()
|
battery, charging = self.get_battery_info()
|
||||||
if charging:
|
if charging:
|
||||||
|
@ -536,21 +499,20 @@ class WacomProtocol(GObject.Object):
|
||||||
else:
|
else:
|
||||||
logger.debug(f'device is discharging: {battery}%')
|
logger.debug(f'device is discharging: {battery}%')
|
||||||
self.emit('battery-status', battery, charging)
|
self.emit('battery-status', battery, charging)
|
||||||
if not self.is_spark():
|
self.width = w = self.get_dimensions('width')
|
||||||
self.width = w = self.get_dimensions('width')
|
self.height = h = self.get_dimensions('height')
|
||||||
self.height = h = self.get_dimensions('height')
|
logger.debug(f'dimensions: {w}x{h}')
|
||||||
logger.debug(f'dimensions: {w}x{h}')
|
|
||||||
|
|
||||||
fw_high = self.get_firmware_version(0)
|
fw_high = self.get_firmware_version(0)
|
||||||
fw_low = self.get_firmware_version(1)
|
fw_low = self.get_firmware_version(1)
|
||||||
logger.debug(f'firmware is {fw_high}-{fw_low}')
|
logger.debug(f'firmware is {fw_high}-{fw_low}')
|
||||||
self.ec_command()
|
self.ec_command()
|
||||||
if self.read_offline_data() == 0:
|
if self.read_offline_data() == 0:
|
||||||
logger.info('no data to retrieve')
|
logger.info('no data to retrieve')
|
||||||
except WacomEEAGAINException:
|
except WacomEEAGAINException:
|
||||||
logger.warning('no data, please make sure the LED is blue and the button is pressed to switch it back to green')
|
logger.warning('no data, please make sure the LED is blue and the button is pressed to switch it back to green')
|
||||||
|
|
||||||
def register_device_slate(self):
|
def register_device(self):
|
||||||
self.register_connection()
|
self.register_connection()
|
||||||
logger.info('Press the button now to confirm')
|
logger.info('Press the button now to confirm')
|
||||||
self.emit('button-press-required')
|
self.emit('button-press-required')
|
||||||
|
@ -571,7 +533,88 @@ class WacomProtocol(GObject.Object):
|
||||||
fw_low = self.get_firmware_version(1)
|
fw_low = self.get_firmware_version(1)
|
||||||
logger.info(f'firmware is {fw_high}-{fw_low}')
|
logger.info(f'firmware is {fw_high}-{fw_low}')
|
||||||
|
|
||||||
def register_device_spark(self):
|
|
||||||
|
class WacomProtocolSlate(WacomProtocol):
|
||||||
|
'''
|
||||||
|
Subclass to handle the communication oddities with the Wacom Slate-like
|
||||||
|
devices.
|
||||||
|
|
||||||
|
:param device: the BlueZDevice object that is this wacom device
|
||||||
|
'''
|
||||||
|
width = 21600
|
||||||
|
height = 14800
|
||||||
|
protocol = Protocol.SLATE
|
||||||
|
|
||||||
|
|
||||||
|
class WacomProtocolSpark(WacomProtocol):
|
||||||
|
'''
|
||||||
|
Subclass to handle the communication oddities with the Wacom Spark-like
|
||||||
|
devices.
|
||||||
|
|
||||||
|
:param device: the BlueZDevice object that is this wacom device
|
||||||
|
'''
|
||||||
|
width = 21600
|
||||||
|
height = 14800
|
||||||
|
protocol = Protocol.SPARK
|
||||||
|
|
||||||
|
def is_data_available(self):
|
||||||
|
data = self.send_nordic_command_sync(command=0xc1,
|
||||||
|
expected_opcode=0xc2)
|
||||||
|
n = int.from_bytes(data[0:2], byteorder='big')
|
||||||
|
logger.debug(f'Drawings available: {n}')
|
||||||
|
return n > 0
|
||||||
|
|
||||||
|
def get_stroke_data(self):
|
||||||
|
data = self.send_nordic_command_sync(command=0xc5,
|
||||||
|
expected_opcode=[0xc7, 0xcd])
|
||||||
|
# FIXME: Sometimes the 0xc7 is missing on the spark? Not in any of
|
||||||
|
# the btsnoop logs but I only rarely get a c7 response here
|
||||||
|
count = 0
|
||||||
|
if data.opcode == 0xc7:
|
||||||
|
count = int.from_bytes(data[0:4], byteorder='little')
|
||||||
|
data = self.wait_nordic_data(0xcd, 5)
|
||||||
|
# logger.debug(f'cc returned {data} ')
|
||||||
|
|
||||||
|
str_timestamp = ''.join([f'{d:02x}' for d in data])
|
||||||
|
timestamp = time.strptime(str_timestamp, '%y%m%d%H%M%S')
|
||||||
|
return count, timestamp
|
||||||
|
|
||||||
|
def wait_for_end_read(self):
|
||||||
|
data = self.wait_nordic_data(0xc8, 5)
|
||||||
|
if data[0] != 0xed:
|
||||||
|
raise WacomException(f'unexpected answer: {data[0]:02x}')
|
||||||
|
crc = data[1:]
|
||||||
|
data = self.wait_nordic_data(0xc9, 5)
|
||||||
|
crc = data
|
||||||
|
crc.reverse()
|
||||||
|
crc = int(binascii.hexlify(bytes(crc)), 16)
|
||||||
|
pen_data = self.pen_data_buffer
|
||||||
|
self.pen_data_buffer = []
|
||||||
|
if crc != binascii.crc32(bytes(pen_data)):
|
||||||
|
logger.error("CRCs don't match")
|
||||||
|
return pen_data
|
||||||
|
|
||||||
|
def ack_transaction(self):
|
||||||
|
self.send_nordic_command_sync(command=0xca,
|
||||||
|
expected_opcode=None)
|
||||||
|
|
||||||
|
def retrieve_data(self):
|
||||||
|
try:
|
||||||
|
self.check_connection()
|
||||||
|
self.e3_command()
|
||||||
|
self.set_time()
|
||||||
|
battery, charging = self.get_battery_info()
|
||||||
|
if charging:
|
||||||
|
logger.debug(f'device is plugged in and charged at {battery}%')
|
||||||
|
else:
|
||||||
|
logger.debug(f'device is discharging: {battery}%')
|
||||||
|
self.emit('battery-status', battery, charging)
|
||||||
|
if self.read_offline_data() == 0:
|
||||||
|
logger.info('no data to retrieve')
|
||||||
|
except WacomEEAGAINException:
|
||||||
|
logger.warning('no data, please make sure the LED is blue and the button is pressed to switch it back to green')
|
||||||
|
|
||||||
|
def register_device(self):
|
||||||
try:
|
try:
|
||||||
self.check_connection()
|
self.check_connection()
|
||||||
except WacomWrongModeException:
|
except WacomWrongModeException:
|
||||||
|
@ -630,19 +673,40 @@ class WacomDevice(GObject.Object):
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# unregistered device
|
# unregistered device
|
||||||
self._uuid = None
|
self._uuid = None
|
||||||
self._protocol = None
|
|
||||||
self._wacom_protocol = None
|
self._wacom_protocol = None
|
||||||
else:
|
else:
|
||||||
self._uuid = self._config['uuid']
|
self._uuid = self._config['uuid']
|
||||||
try:
|
|
||||||
self._protocol = next(p for p in Protocol if p.value == self._config['Protocol'])
|
|
||||||
except StopIteration:
|
|
||||||
logger.error(f'Unknown protocol in configuration: {self._config["Protocol"]}')
|
|
||||||
raise WacomCorruptDataException(f'Unknown Protocol {self._config["Protocol"]}')
|
|
||||||
self._init_protocol()
|
self._init_protocol()
|
||||||
|
|
||||||
def _init_protocol(self):
|
def _init_protocol(self):
|
||||||
self._wacom_protocol = WacomProtocol(self._device, self._uuid)
|
protocol = Protocol.UNKNOWN
|
||||||
|
if self._config is not None:
|
||||||
|
try:
|
||||||
|
protocol = next(p for p in Protocol if p.value == self._config['Protocol'])
|
||||||
|
except StopIteration:
|
||||||
|
logger.error(f'Unknown protocol in configuration: {self._config["Protocol"]}')
|
||||||
|
raise WacomCorruptDataException(f'Unknown Protocol {self._config["Protocol"]}')
|
||||||
|
|
||||||
|
if protocol == Protocol.UNKNOWN:
|
||||||
|
# we are registering a new device, or we might have an early
|
||||||
|
# config file from an older tuhi version
|
||||||
|
if WacomProtocol.is_spark(self._device):
|
||||||
|
protocol = Protocol.SPARK
|
||||||
|
else:
|
||||||
|
protocol = Protocol.SLATE
|
||||||
|
|
||||||
|
if protocol == Protocol.SPARK:
|
||||||
|
self._wacom_protocol = WacomProtocolSpark(self._device, self._uuid)
|
||||||
|
elif protocol == Protocol.SLATE:
|
||||||
|
self._wacom_protocol = WacomProtocolSlate(self._device, self._uuid)
|
||||||
|
else:
|
||||||
|
# FIXME: change to an assert once intuos-pro is implemented, we
|
||||||
|
# never get here
|
||||||
|
logger.error(f'Unknown Protocol {protocol}')
|
||||||
|
raise WacomCorruptDataException(f'Protocol "{protocol}" not implemented')
|
||||||
|
|
||||||
|
logger.debug(f'{self._device.name} is using {type(self._wacom_protocol)}')
|
||||||
|
|
||||||
self._wacom_protocol.connect('drawing', self._on_drawing_received)
|
self._wacom_protocol.connect('drawing', self._on_drawing_received)
|
||||||
self._wacom_protocol.connect('button-press-required', self._on_button_press_required)
|
self._wacom_protocol.connect('button-press-required', self._on_button_press_required)
|
||||||
self._wacom_protocol.connect('battery-status', self._on_battery_status)
|
self._wacom_protocol.connect('battery-status', self._on_battery_status)
|
||||||
|
@ -663,19 +727,14 @@ class WacomDevice(GObject.Object):
|
||||||
|
|
||||||
@GObject.Property
|
@GObject.Property
|
||||||
def protocol(self):
|
def protocol(self):
|
||||||
assert self._protocol is not None
|
assert self._wacom_protocol is not None
|
||||||
return self._protocol
|
return self._wacom_protocol.protocol
|
||||||
|
|
||||||
def register_device(self):
|
def register_device(self):
|
||||||
self._uuid = uuid.uuid4().hex[:12]
|
self._uuid = uuid.uuid4().hex[:12]
|
||||||
logger.debug(f'{self._device.address}: registering device, assigned {self.uuid}')
|
logger.debug(f'{self._device.address}: registering device, assigned {self.uuid}')
|
||||||
self._init_protocol()
|
self._init_protocol()
|
||||||
if self._wacom_protocol.is_spark():
|
self._wacom_protocol.register_device()
|
||||||
self._wacom_protocol.register_device_spark()
|
|
||||||
self._protocol = Protocol.SPARK
|
|
||||||
else:
|
|
||||||
self._wacom_protocol.register_device_slate()
|
|
||||||
self._protocol = Protocol.SLATE
|
|
||||||
logger.info('registration completed')
|
logger.info('registration completed')
|
||||||
self.notify('uuid')
|
self.notify('uuid')
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue