wacom: use the new protocol implemenation for all "normal" messages

This is an in-situ replacement of the old functionality vs the new one,
resulting in a bit of duplication. Now that the Protocol takes care of the
device-specifics most of our functions look identical. This will be fixed in a
follow-up commit.

Missing from the conversion here is the initial register handling (too
convoluted right now) and the parsing of the pen data.

Signed-off-by: Peter Hutterer <peter.hutterer@who-t.net>
pull/153/head
Peter Hutterer 2019-08-02 15:50:10 +10:00
parent bcbb0982c8
commit 023decb1d4
1 changed files with 56 additions and 131 deletions

View File

@ -13,7 +13,6 @@
import binascii
import calendar
import enum
import inspect
import logging
@ -25,7 +24,8 @@ import errno
from gi.repository import GObject
from .drawing import Drawing
from .uhid import UHIDDevice
from tuhi.protocol import NordicData
import tuhi.protocol
from tuhi.protocol import NordicData, Interactions, Mode, ProtocolVersion
logger = logging.getLogger('tuhi.wacom')
@ -621,8 +621,10 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_INT, GObject.TYPE_BOOLEAN)),
}
def __init__(self, device, uuid):
def __init__(self, device, uuid, protocol_version=ProtocolVersion.ANY):
super().__init__(device)
self.p = tuhi.protocol.Protocol(protocol_version, self.nordic_data_exchange)
self._uuid = uuid
self._timestamp = 0
self.pen_data_buffer = []
@ -704,11 +706,10 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
self._last_pen_data_time = time.time()
def check_connection(self):
args = [int(i) for i in binascii.unhexlify(self._uuid)]
self.send_nordic_command_sync(command=0xe6, arguments=args)
self.p.execute(Interactions.CONNECT, self._uuid)
def e3_command(self):
self.send_nordic_command_sync(command=0xe3)
self.p.execute(Interactions.UNKNOWN_E3)
def time_to_bytes(self):
# Device time is UTC
@ -722,75 +723,42 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
return time.strptime(str_timestamp, '%y%m%d%H%M%S')
def set_time(self):
args = self.time_to_bytes()
self.send_nordic_command_sync(command=0xb6, arguments=args)
self.p.execute(Interactions.SET_TIME, time.time())
def read_time(self):
data = self.send_nordic_command_sync(command=0xb6,
expected_opcode=0xbd)
ts = self.time_from_bytes(data)
logger.debug(f'device time: UTC {time.strftime("%y%m%d%H%M%S", ts)}')
ts = self.p.execute(Interactions.GET_TIME).timestamp
t = time.gmtime(ts)
logger.debug(f'device time: UTC {time.strftime("%y%m%d%H%M%S", t)}')
tdelta = time.mktime(time.gmtime()) - time.mktime(ts)
tdelta = time.mktime(time.gmtime()) - time.mktime(t)
if abs(tdelta) > 300:
logger.error(f'device time is out by more than 5 minutes')
def get_battery_info(self):
data = self.send_nordic_command_sync(command=0xb9,
expected_opcode=0xba)
battery, charging = int(data[0]), data[1] == 1
logger.info(f'device battery: {battery}% ({"dis" if not charging else ""}charging)')
return battery, charging
msg = self.p.execute(Interactions.GET_BATTERY)
logger.info(f'device battery: {msg.battery_percent}% ({"dis" if not msg.battery_is_charging else ""}charging)')
return msg.battery_percent, msg.battery_is_charging
def get_firmware_version(self):
hi = self.send_nordic_command_sync(command=0xb7,
expected_opcode=0xb8,
arguments=(0,))
lo = self.send_nordic_command_sync(command=0xb7,
expected_opcode=0xb8,
arguments=(1,))
fw_hi = ''.join([hex(d)[2:] for d in hi[1:]])
fw_lo = ''.join([hex(d)[2:] for d in lo[1:]])
fw = f'{fw_hi}-{fw_lo}'
fw = self.p.execute(Interactions.GET_FIRMWARE).firmware
logger.info(f'firmware is {fw}')
return fw
def get_name(self):
data = self.send_nordic_command_sync(command=0xbb,
expected_opcode=0xbc)
name = bytes(data)
name = self.p.execute(Interactions.GET_NAME).name
logger.info(f'device name is {name}')
return name
def get_dimensions(self):
args = [3, 0x00] # width
data = self.send_nordic_command_sync(command=0xea,
expected_opcode=0xeb,
arguments=args)
if len(data) != 6:
str_data = binascii.hexlify(bytes(data))
raise WacomCorruptDataException(f'unexpected answer for get_dimensions: {str_data}')
width = int.from_bytes(data[2:4], byteorder='little')
args = [4, 0x00] # height
data = self.send_nordic_command_sync(command=0xea,
expected_opcode=0xeb,
arguments=args)
if len(data) != 6:
str_data = binascii.hexlify(bytes(data))
raise WacomCorruptDataException(f'unexpected answer for get_dimensions: {str_data}')
height = int.from_bytes(data[2:4], byteorder='little')
logger.info(f'dimensions: {width}x{height}')
return width, height
msg = self.p.execute(Interactions.GET_DIMENSIONS)
logger.info(f'dimensions: {msg.width}x{msg.height}')
return msg.width, msg.height
def ec_command(self):
args = [0x06, 0x00, 0x00, 0x00, 0x00, 0x00]
self.send_nordic_command_sync(command=0xec, arguments=args)
self.p.execute(Interactions.UNKNOWN_EC)
def start_live(self, fd):
self.send_nordic_command_sync(command=0xb1)
self.p.execute(Interactions.SET_MODE, Mode.LIVE)
logger.debug(f'Starting wacom live mode on fd: {fd}')
rdesc = wacom_live_rdesc_template[:]
@ -807,40 +775,23 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
self._uhid_device = uhid_device
def stop_live(self):
args = [0x02]
self.send_nordic_command_sync(command=0xb1, arguments=args)
self.p.execute(Interactions.SET_MODE, Mode.IDLE)
def b1_command(self):
args = [0x01]
self.send_nordic_command_sync(command=0xb1, arguments=args)
self.p.execute(Interactions.SET_MODE, Mode.PAPER).execute()
def is_data_available(self):
data = self.send_nordic_command_sync(command=0xc1,
expected_opcode=0xc2)
n = int.from_bytes(data[0:2], byteorder='big')
n = self.p.execute(Interactions.GET_DATA_AVAILABLE).count
logger.debug(f'Drawings available: {n}')
return n > 0
def get_stroke_data(self):
data = self.send_nordic_command_sync(command=0xc5,
expected_opcode=[0xc7, 0xcd])
# FIXME: Sometimes the 0xc7 is missing on the spark? Not in any of
# the btsnoop logs but I only rarely get a c7 response here
count = 0
if data.opcode == 0xc7:
count = int.from_bytes(data[0:4], byteorder='little')
data = self.wait_nordic_data(0xcd, 5)
# logger.debug(f'cc returned {data} ')
str_timestamp = ''.join([f'{d:02x}' for d in data])
timestamp = time.strptime(str_timestamp, '%y%m%d%H%M%S')
return count, timestamp
msg = self.p.execute(Interactions.GET_STROKES)
# logger.debug(f'cc returned {data} ')
return msg.count, msg.timestamp
def start_reading(self):
data = self.send_nordic_command_sync(command=0xc3,
expected_opcode=0xc8)
if data[0] != 0xbe:
raise WacomException(f'unexpected answer: {data[0]:02x}')
self.p.execute(Interactions.START_READING)
def wait_nordic_unless_pen_data(self, opcode, timeout=None):
data = None
@ -882,8 +833,7 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
logger.warning('no data, please make sure the LED is blue and the button is pressed to switch it back to green')
def ack_transaction(self):
self.send_nordic_command_sync(command=0xca,
expected_opcode=None)
self.p.execute(Interactions.ACK_TRANSACTION)
def get_coordinate(self, bitmask, n, data, v, dv):
# drop the first 2 bytes as they are not valuable here
@ -976,12 +926,13 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
transaction_count = 0
while self.is_data_available():
count, timestamp = self.get_stroke_data()
logger.info(f'receiving {count} bytes drawn on UTC {time.strftime("%y%m%d%H%M%S", timestamp)}')
t = time.gmtime(timestamp)
logger.info(f'receiving {count} bytes drawn on UTC {time.strftime("%y%m%d%H%M%S", t)}')
self.start_reading()
pen_data = self.wait_for_end_read()
str_pen = binascii.hexlify(bytes(pen_data))
logger.info(f'received {str_pen}')
drawing = self.parse_pen_data(pen_data, timestamp)
drawing = self.parse_pen_data(pen_data, t)
if drawing:
self.emit('drawing', drawing)
self.ack_transaction()
@ -989,16 +940,10 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
return transaction_count
def set_name(self, name):
# On the Spark, the name needs a trailing linebreak, otherwise the
# firmware gets confused.
args = [ord(c) for c in name] + [0x0a]
data = self.send_nordic_command_sync(command=0xbb,
arguments=args)
return bytes(data)
self.p.execute(Interactions.SET_NAME, name)
def register_device_finish(self):
self.send_nordic_command_sync(command=0xe5,
arguments=None)
self.p.execute(Interactions.REGISTER_COMPLETE)
self.set_time()
self.read_time()
self.get_name()
@ -1036,6 +981,10 @@ class WacomProtocolSpark(WacomProtocolBase):
packet_handlers = [WacomPacketHandlerEndOfStroke,
WacomPacketHandlerEndOfSequence]
def __init__(self, device, uuid, protocol_version=ProtocolVersion.SPARK):
assert(protocol_version >= ProtocolVersion.SPARK)
super().__init__(device, uuid, protocol_version=protocol_version)
class WacomProtocolSlate(WacomProtocolSpark):
'''
@ -1057,9 +1006,9 @@ class WacomProtocolSlate(WacomProtocolSpark):
protocol = Protocol.SLATE
packet_handlers = [WacomPacketHandlerStrokePrefixSlate]
def __init__(self, device, uuid):
super().__init__(device, uuid)
def __init__(self, device, uuid, protocol_version=ProtocolVersion.SLATE):
assert(protocol_version >= ProtocolVersion.SLATE)
super().__init__(device, uuid, protocol_version=protocol_version)
device.connect_gatt_value(SYSEVENT_NOTIFICATION_CHRC_UUID,
self._on_sysevent_data_received)
@ -1078,22 +1027,17 @@ class WacomProtocolSlate(WacomProtocolSpark):
self.fw_logger.sysevent.recv(value)
def ack_transaction(self):
self.send_nordic_command_sync(command=0xca)
self.p.execute(Interactions.ACK_TRANSACTION)
def is_data_available(self):
data = self.send_nordic_command_sync(command=0xc1,
expected_opcode=0xc2)
n = int.from_bytes(data[0:2], byteorder='little')
n = self.p.execute(Interactions.GET_DATA_AVAILABLE).count
logger.debug(f'Drawings available: {n}')
return n > 0
def get_stroke_data(self):
data = self.send_nordic_command_sync(command=0xcc,
expected_opcode=0xcf)
msg = self.p.execute(Interactions.GET_STROKES)
# logger.debug(f'cc returned {data} ')
count = int.from_bytes(data[0:4], byteorder='little')
timestamp = self.time_from_bytes(data[4:])
return count, timestamp
return msg.count, msg.timestamp
def register_device_finish(self):
self.set_time()
@ -1164,8 +1108,9 @@ class WacomProtocolIntuosPro(WacomProtocolSlate):
WacomPacketHandlerStrokeTimestampIntuosPro,
WacomPacketHandlerUnknownFixedStrokeDataIntuosPro]
def __init__(self, device, uuid):
super().__init__(device, uuid)
def __init__(self, device, uuid, protocol_version=ProtocolVersion.INTUOS_PRO):
assert(protocol_version >= ProtocolVersion.INTUOS_PRO)
super().__init__(device, uuid, protocol_version=protocol_version)
def time_to_bytes(self):
t = int(time.time())
@ -1179,44 +1124,24 @@ class WacomProtocolIntuosPro(WacomProtocolSlate):
# set_time is identical to spark/slate except the timestamp format
def read_time(self):
data = self.send_nordic_command_sync(command=0xd6,
expected_opcode=0xbd)
timestamp = self.p.execute(Interactions.GET_TIME).timestamp
# Last two bytes are unknown
ts = time.strftime('%y-%m-%d %H:%M:%S', self.time_from_bytes(data))
logger.debug(f'device time: {ts}')
t = time.strftime('%y-%m-%d %H:%M:%S', time.localtime(timestamp))
logger.debug(f'device time: {t}')
def get_firmware_version(self):
hi = self.send_nordic_command_sync(command=0xb7,
expected_opcode=0xb8,
arguments=(0,))
lo = self.send_nordic_command_sync(command=0xb7,
expected_opcode=0xb8,
arguments=(1,))
fw_hi = ''.join([chr(d) for d in hi[1:]])
fw_lo = ''.join([chr(d) for d in lo[1:]])
fw = f'{fw_hi}-{fw_lo}'
fw = self.p.execute(Interactions.GET_FIRMWARE).firmware
logger.info(f'firmware is {fw}')
return fw
def get_name(self):
data = self.send_nordic_command_sync(command=0xdb,
expected_opcode=0xbc)
name = bytes(data)
name = self.p.execute(Interactions.GET_NAME).name
logger.info(f'device name is {name}')
return name
def set_name(self, name):
args = [ord(c) for c in name]
data = self.send_nordic_command_sync(command=0xbb,
arguments=args)
return bytes(data)
self.p.execute(Interactions.SET_NAME, name)
def check_connection(self):
args = [int(i) for i in binascii.unhexlify(self._uuid)]
self.send_nordic_command_sync(command=0xe6,
expected_opcode=[0x50, 0x51],
arguments=args)
self.p.execute(Interactions.CONNECT, self._uuid)
def parse_pen_data_prefix(self, data):
expected_prefix = b'\x67\x82\x69\x65'