wacom: have subclasses to distinguish between protocols

The Intuos Pro Paper is close enough to the Slate but with some
subtleties. Instead of having a bunch of ifs, let's have nice subclasses
for the differences in the protocol.
This commit is contained in:
Benjamin Tissoires 2018-02-07 18:52:48 +01:00
parent e346787f5a
commit 2c77a2f36d
1 changed files with 109 additions and 64 deletions

View File

@ -101,6 +101,7 @@ class WacomProtocol(GObject.Object):
:param device: the BlueZDevice object that is this wacom device
'''
type = TuhiConfig.Protocol.UNKNOWN
__gsignals__ = {
'drawing':
@ -118,8 +119,6 @@ class WacomProtocol(GObject.Object):
self.nordic_answer = None
self.pen_data_buffer = []
self.orientation = ORIENTATION_PORTRAIT
self.width = WACOM_SLATE_WIDTH
self.height = WACOM_SLATE_HEIGHT
self.name = device.name
self._uuid = uuid
self.fw_logger = logging.getLogger('tuhi.fw')
@ -133,8 +132,9 @@ class WacomProtocol(GObject.Object):
device.connect_gatt_value(MYSTERIOUS_NOTIFICATION_CHRC_UUID,
self._on_mysterious_data_received)
def is_spark(self):
return MYSTERIOUS_NOTIFICATION_CHRC_UUID not in self.device.characteristics
@classmethod
def is_spark(cls, device):
return MYSTERIOUS_NOTIFICATION_CHRC_UUID not in device.characteristics
def _on_mysterious_data_received(self, name, value):
self.fw_logger.debug(f'mysterious: {binascii.hexlify(bytes(value))}')
@ -340,15 +340,11 @@ class WacomProtocol(GObject.Object):
def is_data_available(self):
data = self.send_nordic_command_sync(command=0xc1,
expected_opcode=0xc2)
n = 0
if self.is_spark():
n = int.from_bytes(data[0:2], byteorder='big')
else:
n = int.from_bytes(data[0:2], byteorder='little')
n = int.from_bytes(data[0:2], byteorder='little')
logger.debug(f'Drawings available: {n}')
return n > 0
def get_stroke_data_slate(self):
def get_stroke_data(self):
data = self.send_nordic_command_sync(command=0xcc,
expected_opcode=0xcf)
# logger.debug(f'cc returned {data} ')
@ -357,26 +353,6 @@ class WacomProtocol(GObject.Object):
timestamp = time.strptime(str_timestamp, '%y%m%d%H%M%S')
return count, timestamp
def get_stroke_data_spark(self):
data = self.send_nordic_command_sync(command=0xc5,
expected_opcode=[0xc7, 0xcd])
# FIXME: Sometimes the 0xc7 is missing on the spark? Not in any of
# the btsnoop logs but I only rarely get a c7 response here
count = 0
if data.opcode == 0xc7:
count = int.from_bytes(data[0:4], byteorder='little')
data = self.wait_nordic_data(0xcd, 5)
# logger.debug(f'cc returned {data} ')
str_timestamp = ''.join([f'{d:02x}' for d in data])
timestamp = time.strptime(str_timestamp, '%y%m%d%H%M%S')
return count, timestamp
def get_stroke_data(self):
if not self.is_spark():
return self.get_stroke_data_slate()
return self.get_stroke_data_spark()
def start_reading(self):
data = self.send_nordic_command_sync(command=0xc3,
expected_opcode=0xc8)
@ -388,27 +364,17 @@ class WacomProtocol(GObject.Object):
if data[0] != 0xed:
raise WacomException(f'unexpected answer: {data[0]:02x}')
crc = data[1:]
if self.is_spark():
data = self.wait_nordic_data(0xc9, 5)
crc = data
crc.reverse()
crc = int(binascii.hexlify(bytes(crc)), 16)
pen_data = self.pen_data_buffer
self.pen_data_buffer = []
if crc != binascii.crc32(bytes(pen_data)):
if not self.is_spark():
raise WacomCorruptDataException("CRCs don't match")
else:
logger.error("CRCs don't match")
raise WacomCorruptDataException("CRCs don't match")
return pen_data
def ack_transaction(self):
if self.is_spark():
opcode = None
else:
opcode = 0xb3
self.send_nordic_command_sync(command=0xca,
expected_opcode=opcode)
expected_opcode=0xb3)
def next_pen_data(self, data, offset):
debug_data = []
@ -538,8 +504,6 @@ class WacomProtocol(GObject.Object):
def retrieve_data(self):
try:
self.check_connection()
if self.is_spark():
self.e3_command()
self.set_time()
battery, charging = self.get_battery_info()
if charging:
@ -547,25 +511,24 @@ class WacomProtocol(GObject.Object):
else:
logger.debug(f'device is discharging: {battery}%')
self.emit('battery-status', battery, charging)
if not self.is_spark():
self.width = w = self.get_dimensions('width')
self.height = h = self.get_dimensions('height')
if self.orientation in [ORIENTATION_PORTRAIT,
ORIENTATION_UPSIDEDOWN_PORTRAIT]:
w = self.height
h = self.width
logger.debug(f'dimensions: {w}x{h}')
self.width = w = self.get_dimensions('width')
self.height = h = self.get_dimensions('height')
if self.orientation in [ORIENTATION_PORTRAIT,
ORIENTATION_UPSIDEDOWN_PORTRAIT]:
w = self.height
h = self.width
logger.debug(f'dimensions: {w}x{h}')
fw_high = self.get_firmware_version(0)
fw_low = self.get_firmware_version(1)
logger.debug(f'firmware is {fw_high}-{fw_low}')
self.ec_command()
fw_high = self.get_firmware_version(0)
fw_low = self.get_firmware_version(1)
logger.debug(f'firmware is {fw_high}-{fw_low}')
self.ec_command()
if self.read_offline_data() == 0:
logger.info('no data to retrieve')
except WacomEEAGAINException:
logger.warning('no data, please make sure the LED is blue and the button is pressed to switch it back to green')
def register_device_slate(self):
def register_device(self):
self.register_connection()
logger.info('Press the button now to confirm')
self.emit('button-press-required')
@ -585,7 +548,88 @@ class WacomProtocol(GObject.Object):
fw_low = self.get_firmware_version(1)
logger.info(f'firmware is {fw_high}-{fw_low}')
def register_device_spark(self):
class WacomProtocolSlate(WacomProtocol):
'''
Subclass to handle the communication oddities with the Wacom Slate-like
devices.
:param device: the BlueZDevice object that is this wacom device
'''
width = WACOM_SLATE_WIDTH
height = WACOM_SLATE_HEIGHT
type = TuhiConfig.Protocol.SLATE
class WacomProtocolSpark(WacomProtocol):
'''
Subclass to handle the communication oddities with the Wacom Spark-like
devices.
:param device: the BlueZDevice object that is this wacom device
'''
width = WACOM_SLATE_WIDTH
height = WACOM_SLATE_HEIGHT
type = TuhiConfig.Protocol.SPARK
def is_data_available(self):
data = self.send_nordic_command_sync(command=0xc1,
expected_opcode=0xc2)
n = int.from_bytes(data[0:2], byteorder='big')
logger.debug(f'Drawings available: {n}')
return n > 0
def get_stroke_data(self):
data = self.send_nordic_command_sync(command=0xc5,
expected_opcode=[0xc7, 0xcd])
# FIXME: Sometimes the 0xc7 is missing on the spark? Not in any of
# the btsnoop logs but I only rarely get a c7 response here
count = 0
if data.opcode == 0xc7:
count = int.from_bytes(data[0:4], byteorder='little')
data = self.wait_nordic_data(0xcd, 5)
# logger.debug(f'cc returned {data} ')
str_timestamp = ''.join([f'{d:02x}' for d in data])
timestamp = time.strptime(str_timestamp, '%y%m%d%H%M%S')
return count, timestamp
def wait_for_end_read(self):
data = self.wait_nordic_data(0xc8, 5)
if data[0] != 0xed:
raise WacomException(f'unexpected answer: {data[0]:02x}')
crc = data[1:]
data = self.wait_nordic_data(0xc9, 5)
crc = data
crc.reverse()
crc = int(binascii.hexlify(bytes(crc)), 16)
pen_data = self.pen_data_buffer
self.pen_data_buffer = []
if crc != binascii.crc32(bytes(pen_data)):
logger.error("CRCs don't match")
return pen_data
def ack_transaction(self):
self.send_nordic_command_sync(command=0xca,
expected_opcode=None)
def retrieve_data(self):
try:
self.check_connection()
self.e3_command()
self.set_time()
battery, charging = self.get_battery_info()
if charging:
logger.debug(f'device is plugged in and charged at {battery}%')
else:
logger.debug(f'device is discharging: {battery}%')
self.emit('battery-status', battery, charging)
if self.read_offline_data() == 0:
logger.info('no data to retrieve')
except WacomEEAGAINException:
logger.warning('no data, please make sure the LED is blue and the button is pressed to switch it back to green')
def register_device(self):
try:
self.check_connection()
except WacomWrongModeException:
@ -648,7 +692,10 @@ class WacomDevice(GObject.Object):
self._init_protocol()
def _init_protocol(self):
self._wacom_protocol = WacomProtocol(self._device, self._uuid)
if WacomProtocol.is_spark(self._device):
self._wacom_protocol = WacomProtocolSpark(self._device, self._uuid)
else:
self._wacom_protocol = WacomProtocolSlate(self._device, self._uuid)
self._wacom_protocol.connect('drawing', self._on_drawing_received)
self._wacom_protocol.connect('button-press-required', self._on_button_press_required)
self._wacom_protocol.connect('battery-status', self._on_battery_status)
@ -670,7 +717,8 @@ class WacomDevice(GObject.Object):
@GObject.Property
def protocol(self):
assert self._wacom_protocol is not None
if self._wacom_protocol.is_spark():
return self._wacom_protocol.type
if WacomProtocol.is_spark(self._device):
return TuhiConfig.Protocol.SPARK
return TuhiConfig.Protocol.SLATE
@ -678,10 +726,7 @@ class WacomDevice(GObject.Object):
self._uuid = uuid.uuid4().hex[:12]
logger.debug(f'{self._device.address}: registering device, assigned {self.uuid}')
self._init_protocol()
if self._wacom_protocol.is_spark():
self._wacom_protocol.register_device_spark()
else:
self._wacom_protocol.register_device_slate()
self._wacom_protocol.register_device()
logger.info('registration completed')
self.notify('uuid')