1894 lines
57 KiB
Python
1894 lines
57 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
|
|
# Implementation of the BLE protocol of the Wacom SmartPad devices.
|
|
#
|
|
# Each device has a different set of functionalities and uses different
|
|
# opcodes and message formats. So the entry point is the Protocol class, and
|
|
# the messages can be accessed by key.
|
|
#
|
|
# def my_callback(request, requires_reply=True, userdata=None, timeout=5):
|
|
# if request is not None:
|
|
# send_to_device(request)
|
|
# if requires_reply:
|
|
# return read_from device() or eventually_timeout()
|
|
#
|
|
# p = Protocol(ProtocolVersion.INTUOS_PRO, my_callback, userdata)
|
|
# m1 = P.get(Interactions.GET_NAME)
|
|
# m2 = P.get(Interactions.SET_NAME, 'SomeName')
|
|
#
|
|
# Each message is defined by a string (see INTERACTIONS) and takes the obvious
|
|
# parameters where applicable.
|
|
# The message itself is the protocol-specific Msg, a single logical interaction
|
|
# with the device.
|
|
#
|
|
# The data exchange is prompted by the execute() function, which sets the
|
|
# attributes on the message and can be chained where appropriate, e.g.
|
|
#
|
|
# name = m1.execute().name
|
|
# if name != 'SomeName':
|
|
# m2.execute()
|
|
#
|
|
# Because we generally expect everything to work fine and where it doesn't
|
|
# it affects a few things anyway, so error handling is via exceptions.
|
|
#
|
|
# sequence = [m1, m2, m3, ...]
|
|
# try:
|
|
# for msg in sequence:
|
|
# m.execute()
|
|
# except AuthorizationError:
|
|
# print("oops, we have the wrong uuid")
|
|
#
|
|
|
|
import binascii
|
|
import enum
|
|
import time
|
|
import logging
|
|
import errno
|
|
from collections import namedtuple
|
|
from .util import list2hex
|
|
|
|
logger = logging.getLogger('tuhi.protocol')
|
|
|
|
|
|
def little_u16(x):
|
|
'''
|
|
Convert to or from a 16-bit integer to a little-endian 2-byte array. If
|
|
passed an integer, the return value is a 2-byte array. If passed a
|
|
2-byte array, the return value is a 16-bit integer.
|
|
'''
|
|
if isinstance(x, int):
|
|
assert(x <= 0xffff and x >= 0x0000)
|
|
return x.to_bytes(2, byteorder='little')
|
|
else:
|
|
assert(len(x) == 2)
|
|
return int.from_bytes(x, byteorder='little')
|
|
|
|
|
|
def little_u32(x):
|
|
'''
|
|
Convert to or from a 16-bit integer to a little-endian 4-byte array. If
|
|
passed an integer, the return value is a 4-byte array. If passed a
|
|
4-byte array, the return value is a 16-bit integer.
|
|
'''
|
|
if isinstance(x, int):
|
|
assert(x <= 0xffffffff and x >= 0x00000000)
|
|
return x.to_bytes(4, byteorder='little')
|
|
else:
|
|
assert(len(x) == 4)
|
|
return int.from_bytes(x, byteorder='little')
|
|
|
|
|
|
def little_u64(x):
|
|
'''
|
|
Convert to or from a 64-bit integer to a little-endian 4-byte array. If
|
|
passed an integer, the return value is a 8-byte array. If passed a
|
|
4-byte array, the return value is a 64-bit integer.
|
|
'''
|
|
if isinstance(x, int):
|
|
assert(x <= 0xffffffffffffffff and x >= 0x0000000000000000)
|
|
return x.to_bytes(8, byteorder='little')
|
|
else:
|
|
assert(len(x) == 8)
|
|
return int.from_bytes(x, byteorder='little')
|
|
|
|
|
|
class Interactions(enum.Enum):
|
|
'''All possible interactions with a device. Not all of these
|
|
interactions may be available on any specific device.'''
|
|
CONNECT = enum.auto()
|
|
GET_NAME = enum.auto()
|
|
SET_NAME = enum.auto()
|
|
GET_TIME = enum.auto()
|
|
SET_TIME = enum.auto()
|
|
GET_FIRMWARE = enum.auto()
|
|
GET_BATTERY = enum.auto()
|
|
GET_WIDTH = enum.auto()
|
|
GET_HEIGHT = enum.auto()
|
|
SET_MODE = enum.auto()
|
|
GET_STROKES = enum.auto()
|
|
AVAILABLE_FILES_COUNT = enum.auto()
|
|
DOWNLOAD_OLDEST_FILE = enum.auto()
|
|
DELETE_OLDEST_FILE = enum.auto()
|
|
WAIT_FOR_END_READ = enum.auto()
|
|
REGISTER_PRESS_BUTTON = enum.auto()
|
|
REGISTER_WAIT_FOR_BUTTON = enum.auto()
|
|
REGISTER_COMPLETE = enum.auto()
|
|
SET_FILE_TRANSFER_REPORTING_TYPE = enum.auto()
|
|
GET_POINT_SIZE = enum.auto()
|
|
|
|
UNKNOWN_E3 = enum.auto()
|
|
|
|
|
|
def as_hex_string(data):
|
|
'''
|
|
Returns the given byte-like to a debugging hex string in the form::
|
|
|
|
12 ab 34 cd 05 ..
|
|
|
|
Supports bytes and lists of integers.
|
|
'''
|
|
if isinstance(data, bytes):
|
|
hx = binascii.hexlify(data).decode('ascii')
|
|
return ' '.join([''.join(x) for x in zip(hx[::2], hx[1::2])])
|
|
elif isinstance(data, list):
|
|
return ' '.join([f'{x:02x}' for x in data])
|
|
|
|
raise ValueError('Unsupported data format {data.__class__} for {data}')
|
|
|
|
|
|
def _get_protocol_dictionary(protocol):
|
|
'''
|
|
Returns a dict with the messages available for devices speaking that
|
|
particular protocol. These are classes, not objects, instantiate as
|
|
required. Usage::
|
|
|
|
pdict = get_protocol_dictionary(ProtocolVersion.ANY)
|
|
m = pdict[Interactions.GET_NAME]
|
|
print(m().execute().name)
|
|
m = pdict[Interactions.SET_NAME]
|
|
m('mynewname').execute()
|
|
|
|
The list of functions (``GET_NAME`` in the above example) depends on the
|
|
implementation state and may vary between devices.
|
|
'''
|
|
# Load all classes from this module
|
|
import sys
|
|
import inspect
|
|
classes = inspect.getmembers(sys.modules[__name__],
|
|
lambda member: inspect.isclass(member) and
|
|
member.__module__ == __name__) # NOQA
|
|
# Filter to the ones with Msg as base class
|
|
msgs = []
|
|
for name, cls in classes:
|
|
if cls == Msg:
|
|
continue
|
|
base_classes = inspect.getmro(cls)
|
|
if Msg not in base_classes:
|
|
continue
|
|
msgs.append(cls)
|
|
|
|
# Now compile the protocol-specific LUT for all functions that we
|
|
# suppport
|
|
pdict = {}
|
|
for cls in msgs:
|
|
assert cls.opcode is not None
|
|
assert cls.interaction
|
|
assert cls.protocol >= ProtocolVersion.ANY
|
|
|
|
if cls.protocol > protocol:
|
|
continue
|
|
|
|
# Only take the latest version of a message
|
|
if cls.interaction in pdict and cls.protocol < pdict[cls.interaction].protocol:
|
|
continue
|
|
pdict[cls.interaction] = cls
|
|
return pdict
|
|
|
|
|
|
@enum.unique
|
|
class ProtocolVersion(enum.IntEnum):
|
|
'''
|
|
Protocol version numbers, named after the devices first encountered
|
|
on. These version numbers are purely for sorting between devices, i.e.
|
|
do not use the numeric values of this enum. That value may change
|
|
if we discover more devices that have states in between.
|
|
|
|
The exact behavior of each protocol is varied, but
|
|
|
|
* opcodes may differ between protocol versions,
|
|
* the data inside a message may differ between versions, nd
|
|
* some functionality may only be available in some versions but not
|
|
others.
|
|
'''
|
|
ANY = 0
|
|
SPARK = 1
|
|
SLATE = 2
|
|
INTUOS_PRO = 3
|
|
|
|
@classmethod
|
|
def from_string(cls, string):
|
|
'''
|
|
Return the Enum value for the given string, allowing for different
|
|
spellings. Specifically: INTUOS_PRO, intuos_pro and intuos-pro are
|
|
all allowed for the ``INTUOS_PRO`` enum value.
|
|
|
|
:raise ValueError: if the name cannot be mapped.
|
|
'''
|
|
names = {e.name: e for e in cls}
|
|
if string in names:
|
|
return names[string]
|
|
|
|
names = {e.name.lower(): e for e in cls}
|
|
if string in names:
|
|
return names[string]
|
|
|
|
names = {e.name.lower().replace('_', '-'): e for e in cls}
|
|
if string in names:
|
|
return names[string]
|
|
|
|
raise ValueError(string)
|
|
|
|
|
|
class Mode(enum.IntEnum):
|
|
'''
|
|
The mode the tablet is in. ``LIVE`` mode is when the tablet reports pen
|
|
strokes immediately. ``IDLE`` is the live mode but without reporting.
|
|
``PAPER`` is the normal mode, i.e. where we download drawings.
|
|
'''
|
|
LIVE = 0x00
|
|
PAPER = 0x01
|
|
IDLE = 0x02
|
|
|
|
|
|
class Protocol(object):
|
|
'''
|
|
The main communication class.
|
|
|
|
:param protocol_version: a :class:`ProtocolVersion`
|
|
:param callback: the callback to invoke for any messages
|
|
:param userdata: optional data argument provided to the callback function
|
|
'''
|
|
def __init__(self, protocol_version, callback, userdata=None):
|
|
self.protocol_version = protocol_version
|
|
self.callback = callback
|
|
self.userdata = userdata
|
|
self.lut = _get_protocol_dictionary(protocol_version)
|
|
|
|
def get(self, key, *args, **kwargs):
|
|
'''
|
|
Return the message with the given :class:`Interactions` key. This
|
|
only returns the message but does not execute it. In most cases,
|
|
you want to use :func:`execute` instead.
|
|
'''
|
|
kwargs['callback'] = self.callback
|
|
kwargs['userdata'] = self.userdata
|
|
msg = self.lut[key]
|
|
return msg(*args, **kwargs)
|
|
|
|
def execute(self, key, *args, **kwargs):
|
|
'''
|
|
Execute the message with the given :class:`Interactions` key and (where
|
|
applicable) the arguments. This returns the already executed message
|
|
that has the attributes you'd expect.
|
|
'''
|
|
return self.get(key, *args, **kwargs).execute()
|
|
|
|
def parse_pen_data(self, data):
|
|
'''
|
|
Parse the given pen data. Returns a list of :class:`StrokeFile` objects.
|
|
'''
|
|
files = []
|
|
while data:
|
|
logger.debug(f'... remaining data ({len(data)}): {list2hex(data)}')
|
|
sf = StrokeFile(data)
|
|
files.append(sf)
|
|
data = data[sf.bytesize:]
|
|
return files
|
|
|
|
|
|
class NordicData(list):
|
|
'''
|
|
A set of bytes as expected by the Nordic controller on the device.
|
|
First byte is the opcode, second byte is the data length, rest is data.
|
|
|
|
This is an abstraction of a list. Instantiate with the full raw data,
|
|
the list contents will just be the data bytes:
|
|
|
|
>>> data = NordicData([0xab, 4, 0x1, 0x2, 0x3, 0x4])
|
|
>>> data
|
|
[1, 2, 3, 4]
|
|
>>> data.opcode
|
|
0xab
|
|
>>> data.length
|
|
4
|
|
>>> len(data)
|
|
4
|
|
|
|
.. attribute:: opcode
|
|
|
|
The opcode for this message
|
|
|
|
.. attribute:: length
|
|
|
|
The data length of this message. This field is guaranteed to be
|
|
equivalent to len(data) or an exception is raised.
|
|
|
|
'''
|
|
def __init__(self, bs):
|
|
data = bs[2:]
|
|
super().__init__(data)
|
|
self.opcode = bs[0]
|
|
self.length = bs[1]
|
|
if self.length != len(data):
|
|
raise UnexpectedDataError(bs, f'Invalid data: length field {self.length}, data length is {len(data)}')
|
|
|
|
def __repr__(self):
|
|
return f'{self.opcode:02x} / {self.length:02x} / {as_hex_string(self)}'
|
|
|
|
|
|
class ProtocolError(Exception):
|
|
'''
|
|
Base class for all Tuhi-protocol related errors.
|
|
'''
|
|
errno = errno.ENOSYS
|
|
|
|
def __init__(self, message=None):
|
|
self.message = message
|
|
|
|
|
|
class MissingReplyError(ProtocolError):
|
|
'''
|
|
Thrown when we expected a reply but never got one. Usually caused by a
|
|
timeout.
|
|
'''
|
|
errno = errno.ETIME
|
|
|
|
def __init__(self, request, message=None):
|
|
self.request = request
|
|
|
|
def __repr__(self):
|
|
return f'Missing reply for request {self.request}. {self.message}'
|
|
|
|
|
|
class AuthorizationError(ProtocolError):
|
|
'''
|
|
The device does not recognize our UUID.
|
|
'''
|
|
errno = errno.EACCES
|
|
|
|
|
|
class UnexpectedReply(ProtocolError):
|
|
'''
|
|
Exception thrown when the reply from the device does not match the
|
|
opcodes we expected.
|
|
|
|
This is not an error coming from the device, this is an
|
|
implementation bug.
|
|
|
|
.. attribute:: msg
|
|
|
|
The Message that caused the unexpected reply.
|
|
'''
|
|
errno = errno.EPROTO
|
|
|
|
def __init__(self, msg, message=None):
|
|
super().__init__(message)
|
|
self.msg = msg
|
|
|
|
def __repr__(self):
|
|
return f'{self.__class__}: {self.msg}: {self.message}'
|
|
|
|
|
|
class UnexpectedDataError(ProtocolError):
|
|
'''
|
|
Exception thrown when the data is invalid. This is either a bug in our
|
|
parsing or a genuine issue with the tablet, but more likely the former.
|
|
|
|
This is not an error coming from the device, this is an
|
|
implementation bug.
|
|
|
|
.. attribute:: bytes
|
|
|
|
The raw bytes that caused the unexpected data.
|
|
'''
|
|
errno = errno.EPROTO
|
|
|
|
def __init__(self, bytes, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.bytes = bytes
|
|
|
|
def __repr__(self):
|
|
return f'{self.__class__}: {self.bytes} - {self.message}'
|
|
|
|
|
|
class DeviceError(ProtocolError):
|
|
'''
|
|
The device replied with an error. Check the error code for which error
|
|
exactly happened.
|
|
|
|
.. attribute:: errorcode
|
|
|
|
An error code indicating which error occured on the device.
|
|
|
|
'''
|
|
errno = errno.EPROTO
|
|
|
|
class ErrorCode(enum.IntEnum):
|
|
'''
|
|
List of protocol errors as used by the Device.
|
|
|
|
The error code ``SUCCESS`` is provided for convenience only, it is
|
|
filtered by the implementation and not used in an actual exception.
|
|
'''
|
|
SUCCESS = 0x0
|
|
GENERAL_ERROR = 0x1
|
|
INVALID_STATE = 0x2
|
|
READ_ONLY_PARAM = 0x3
|
|
COMMAND_NOT_SUPPORTED = 0x4
|
|
|
|
def __init__(self, errorcode, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.errorcode = DeviceError.ErrorCode(errorcode)
|
|
|
|
# All the other errors are something the user can't do
|
|
# anything about.
|
|
if self.errorcode == DeviceError.ErrorCode.INVALID_STATE:
|
|
self.errno = errno.EBADE
|
|
|
|
|
|
class Msg(object):
|
|
'''
|
|
A single logical interaction (request + reply) with the Wacom device.
|
|
In some cases a :class:`Msg` may issue multiple requests and replies as
|
|
one logical unit, but that should be considered an implementation
|
|
detail.
|
|
|
|
:param callback: The callback to invoke to talk to the device. This
|
|
function must take one :class:`NordicData` and an
|
|
optional userdata argument and return one
|
|
:class:`NordicData` with the reply.
|
|
:param userdata: The data passed to the callback.
|
|
|
|
.. attribute:: request
|
|
|
|
The :class:`NordicData` sent to the device
|
|
|
|
.. attribute:: reply
|
|
|
|
The :class:`NordicData` returned to the device
|
|
|
|
.. attribute:: errorcode
|
|
|
|
The :class:`DeviceError.ErrorCodeNordicData` from this message.
|
|
|
|
'''
|
|
opcode = None
|
|
''' The message-specific opcode. Must be defined in the subclass '''
|
|
protocol = ProtocolVersion.ANY
|
|
'''Minimum supported protocol version'''
|
|
interaction = None
|
|
'''The dictionary name for this interaction (e.g. ``GET_TIME``). Must be
|
|
defined in the subclass'''
|
|
requires_reply = True
|
|
'''True if this message requires the caller to wait for a reply from the
|
|
device.'''
|
|
requires_request = True
|
|
'''True if this message sends something to the device.'''
|
|
|
|
OPCODE_NOOP = 'noop'
|
|
'''A custom opcode for a noop function. Used where functionality was
|
|
removed in later versions but to keep the caller stack simpler, we
|
|
just provide a noop Msg.'''
|
|
|
|
def __init__(self, callback, userdata=None, timeout=None):
|
|
super().__init__()
|
|
assert self.opcode is not None
|
|
assert self.protocol is not None
|
|
assert self.interaction is not None
|
|
self._callback = callback
|
|
self.errorcode = None
|
|
self.userdata = userdata
|
|
self.timeout = timeout
|
|
self.args = [0x00] # Empty messages don't exist
|
|
|
|
@property
|
|
def args(self):
|
|
'''
|
|
The arguments sent to the device as list of integers. Default is
|
|
[0x00], i.e. a message of length 1 with a constant 0 as argument.
|
|
'''
|
|
return self._args
|
|
|
|
@args.setter
|
|
def args(self, args):
|
|
self._args = args
|
|
|
|
def _handle_reply(self, reply):
|
|
'''Override this in the subclass to handle the reply.
|
|
|
|
This is the default reply handler that deals with the 0xb3 ACK/Error
|
|
messages and throws the respective exceptions.
|
|
|
|
:param reply: A :class:`NordicData` object
|
|
'''
|
|
if reply.opcode != 0xb3:
|
|
raise UnexpectedReply(self)
|
|
|
|
if reply[0] != 0x00:
|
|
raise DeviceError(reply[0])
|
|
|
|
def execute(self):
|
|
'''
|
|
The function to trigger the actual communication. This function
|
|
succeeds or raises a Wacom*Exception on error.
|
|
'''
|
|
if self.opcode == Msg.OPCODE_NOOP:
|
|
return self # allow chaining
|
|
|
|
self.request = NordicData([self.opcode, len(self.args or []), *(self.args or [])])
|
|
self.reply = self._callback(request=self.request if self.requires_request else None,
|
|
requires_reply=self.requires_reply,
|
|
timeout=self.timeout or None,
|
|
userdata=self.userdata)
|
|
if self.requires_reply:
|
|
if self.reply is None:
|
|
raise MissingReplyError(self.request)
|
|
try:
|
|
self._handle_reply(self.reply)
|
|
# no exception? we can assume success
|
|
self.errorcode = DeviceError.ErrorCode.SUCCESS
|
|
except DeviceError as e:
|
|
self.errorcode = e.errorcode
|
|
raise e
|
|
return self # allow chaining
|
|
|
|
def __repr__(self):
|
|
return f'{self.__class__}: {self.interaction} - {self.request} → {self.reply}'
|
|
|
|
|
|
class MsgConnectIntuosPro(Msg):
|
|
interaction = Interactions.CONNECT
|
|
opcode = 0xe6
|
|
protocol = ProtocolVersion.INTUOS_PRO
|
|
|
|
def __init__(self, uuid, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.uuid = uuid
|
|
self.args = [int(i) for i in binascii.unhexlify(uuid)]
|
|
if len(self.args) != 6:
|
|
raise ValueError('UUID must be 6 bytes long')
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode == 0x50:
|
|
# maybe check reply.data == the uuid we sent
|
|
pass # success
|
|
elif reply.opcode == 0x51:
|
|
# first 6 bytes are the uuuid we just sent
|
|
reason = reply[6]
|
|
if reason in [0x00, 0x03]: # invalid state
|
|
raise DeviceError(DeviceError.ErrorCode.INVALID_STATE)
|
|
elif reason in [0x01, 0x02]: # incorrect uuuid
|
|
raise AuthorizationError()
|
|
raise UnexpectedReply(reply, message=f'Unknown error code: {reason}')
|
|
else:
|
|
raise UnexpectedReply(reply)
|
|
|
|
|
|
class MsgConnectSpark(Msg):
|
|
interaction = Interactions.CONNECT
|
|
opcode = 0xe6
|
|
|
|
def __init__(self, uuid, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.uuid = uuid
|
|
self.args = [int(i) for i in binascii.unhexlify(uuid)]
|
|
if len(self.args) != 6:
|
|
raise ValueError('UUID must be 6 bytes long')
|
|
|
|
# uses the default 0xb3 handler
|
|
|
|
|
|
class MsgGetName(Msg):
|
|
'''
|
|
.. attribute:: name
|
|
|
|
The device name as reported by the device
|
|
'''
|
|
interaction = Interactions.GET_NAME
|
|
opcode = 0xbb
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode != 0xbc:
|
|
raise UnexpectedReply(f'Unknown reply: {reply.opcode}')
|
|
self.name = bytes(reply).decode('utf-8')
|
|
|
|
|
|
class MsgGetNameIntuosPro(Msg):
|
|
'''
|
|
.. attribute:: name
|
|
|
|
The device name as reported by the device
|
|
'''
|
|
interaction = Interactions.GET_NAME
|
|
opcode = 0xdb
|
|
protocol = ProtocolVersion.INTUOS_PRO
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode != 0xbc:
|
|
raise UnexpectedReply(self)
|
|
self.name = bytes(reply).decode('utf-8')
|
|
|
|
|
|
class MsgSetName(Msg):
|
|
'''
|
|
:param name: The device name to set on the device
|
|
|
|
.. attribute:: name
|
|
|
|
The device name as set with this request
|
|
'''
|
|
interaction = Interactions.SET_NAME
|
|
opcode = 0xbb
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def __init__(self, name, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
# On the Spark, the name needs a trailing linebreak, otherwise the
|
|
# firmware gets confused.
|
|
self.args = [ord(c) for c in name] + [0x0a]
|
|
|
|
# uses the default 0xb3 handler
|
|
|
|
|
|
class MsgSetNameIntuosPro(Msg):
|
|
'''
|
|
:param name: The device name to set on the device
|
|
|
|
.. attribute:: name
|
|
|
|
The device name as set with this request
|
|
'''
|
|
interaction = Interactions.SET_NAME
|
|
opcode = 0xdb
|
|
protocol = ProtocolVersion.INTUOS_PRO
|
|
|
|
def __init__(self, name, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.args = [ord(c) for c in name]
|
|
|
|
# uses the default 0xb3 handler
|
|
|
|
|
|
class MsgGetTime(Msg):
|
|
'''
|
|
.. attribute:: timestamp
|
|
|
|
The time in seconds since UNIX epoch
|
|
'''
|
|
interaction = Interactions.GET_TIME
|
|
opcode = 0xb6
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def _handle_reply(self, reply):
|
|
import calendar
|
|
|
|
if reply.opcode != 0xbd:
|
|
raise UnexpectedReply(self)
|
|
|
|
if reply.length != 6:
|
|
raise UnexpectedDataError(f'Invalid reply length: expected 6, have {reply.length}')
|
|
|
|
# Assumption: device is in UTC
|
|
str_timestamp = ''.join([f'{b:02x}' for b in reply])
|
|
t = time.strptime(str_timestamp, '%y%m%d%H%M%S')
|
|
self.timestamp = calendar.timegm(t)
|
|
|
|
|
|
class MsgGetTimeIntuosPro(Msg):
|
|
'''
|
|
.. attribute:: timestamp
|
|
|
|
The time in seconds since UNIX epoch
|
|
'''
|
|
interaction = Interactions.GET_TIME
|
|
opcode = 0xd6
|
|
protocol = ProtocolVersion.INTUOS_PRO
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode != 0xbd:
|
|
raise UnexpectedReply(self)
|
|
|
|
if reply.length != 6:
|
|
raise UnexpectedDataError(f'Invalid reply length: expected 6, have {reply.length}')
|
|
|
|
self.timestamp = little_u32(reply[0:4]) # bytes[5:6] are ms
|
|
|
|
|
|
class MsgSetTime(Msg):
|
|
'''
|
|
:param timestamp: The current time in seconds since UNIX epoch
|
|
|
|
.. attribute:: timestamp
|
|
|
|
The time in seconds since UNIX epoch
|
|
'''
|
|
interaction = Interactions.SET_TIME
|
|
opcode = 0xb6
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def __init__(self, timestamp, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.timestamp = int(timestamp)
|
|
# IntuosPro and later use the same request but a different time format
|
|
current_time = time.strftime('%y%m%d%H%M%S', time.gmtime(self.timestamp))
|
|
self.args = [int(i) for i in binascii.unhexlify(current_time)]
|
|
|
|
# uses the default 0xb3 handler
|
|
|
|
|
|
class MsgSetTimeIntuosPro(Msg):
|
|
'''
|
|
:param timestamp: The current time in seconds since UNIX epoch
|
|
|
|
.. attribute:: timestamp
|
|
|
|
The time in seconds since UNIX epoch
|
|
'''
|
|
interaction = Interactions.SET_TIME
|
|
opcode = 0xb6
|
|
protocol = ProtocolVersion.INTUOS_PRO
|
|
|
|
def __init__(self, timestamp, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.timestamp = int(timestamp)
|
|
self.args = list(little_u32(self.timestamp)) + [0x00, 0x00]
|
|
|
|
# uses the default 0xb3 handler
|
|
|
|
|
|
class MsgGetFirmwareVersion(Msg):
|
|
'''
|
|
.. attribute:: firmware
|
|
|
|
The firmware version as a string
|
|
'''
|
|
interaction = Interactions.GET_FIRMWARE
|
|
opcode = 0xb7
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.args = [0]
|
|
self._lo = None
|
|
self._hi = None
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode != 0xb8:
|
|
raise UnexpectedReply(self)
|
|
|
|
if self.args[0] == 0:
|
|
self._hi = ''.join([hex(d)[2:] for d in reply[1:]])
|
|
elif self.args[0] == 1:
|
|
self._lo = ''.join([hex(d)[2:] for d in reply[1:]])
|
|
|
|
if self._hi is not None and self._lo is not None:
|
|
self.firmware = f'{self._hi}-{self._lo}'
|
|
|
|
def execute(self):
|
|
# We need two requests with different args to get the full
|
|
# firmware information
|
|
self.args = [0]
|
|
super().execute()
|
|
self.args = [1]
|
|
super().execute()
|
|
return self
|
|
|
|
|
|
class MsgGetFirmwareVersionIntuosPro(Msg):
|
|
'''
|
|
.. attribute:: firmware
|
|
|
|
The firmware version as a string
|
|
'''
|
|
interaction = Interactions.GET_FIRMWARE
|
|
opcode = 0xb7
|
|
protocol = ProtocolVersion.INTUOS_PRO
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self._lo = None
|
|
self._hi = None
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode != 0xb8:
|
|
raise UnexpectedReply(self)
|
|
|
|
if self.args[0] == 0:
|
|
self._hi = ''.join([chr(d) for d in reply[1:]])
|
|
elif self.args[0] == 1:
|
|
self._lo = ''.join([chr(d) for d in reply[1:]])
|
|
|
|
if self._hi is not None and self._lo is not None:
|
|
self.firmware = f'{self._hi}-{self._lo}'
|
|
|
|
def execute(self):
|
|
# We need two requests with different args to get the full
|
|
# firmware information
|
|
self.args = [0]
|
|
super().execute()
|
|
self.args = [1]
|
|
super().execute()
|
|
return self
|
|
|
|
|
|
class MsgGetBattery(Msg):
|
|
'''
|
|
.. attribute:: battery_percent
|
|
|
|
The battery charge in percent
|
|
|
|
.. attribute:: battery_is_charging
|
|
|
|
``True`` if charging, ``False`` if discharging
|
|
'''
|
|
interaction = Interactions.GET_BATTERY
|
|
opcode = 0xb9
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode != 0xba:
|
|
raise UnexpectedReply(self)
|
|
|
|
self.battery_percent = int(reply[0])
|
|
self.battery_is_charging = reply[1] == 1
|
|
|
|
|
|
class MsgGetWidthSpark(Msg):
|
|
'''
|
|
This is a fake message. The Spark doesn't seem to have a getter for this
|
|
one, it just times out. We just hardcode the value here.
|
|
|
|
.. attribute:: width
|
|
|
|
The width of the tablet in points (see :class:`MsgGetPointSize`)
|
|
'''
|
|
interaction = Interactions.GET_WIDTH
|
|
opcode = Msg.OPCODE_NOOP
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.width = 21000
|
|
|
|
|
|
class MsgGetWidthSlate(Msg):
|
|
'''
|
|
.. attribute:: width
|
|
|
|
The width of the tablet in points (see :class:`MsgGetPointSize`)
|
|
'''
|
|
interaction = Interactions.GET_WIDTH
|
|
opcode = 0xea
|
|
protocol = ProtocolVersion.SLATE
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.args = [0x03, 0x00]
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode != 0xeb:
|
|
raise UnexpectedReply(self)
|
|
|
|
if little_u16(reply[0:2]) != 0x3 or len(reply) != 6:
|
|
raise UnexpectedDataError(reply)
|
|
|
|
self.width = little_u32(reply[2:6])
|
|
|
|
|
|
class MsgGetHeightSpark(Msg):
|
|
'''
|
|
This is a fake message. The Spark doesn't seem to have a getter for this
|
|
one, it just times out. We just hardcode the value here.
|
|
|
|
.. attribute:: height
|
|
|
|
The height of the tablet in points (see :class:`MsgGetPointSize`)
|
|
'''
|
|
interaction = Interactions.GET_HEIGHT
|
|
opcode = Msg.OPCODE_NOOP
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.height = 14800
|
|
|
|
|
|
class MsgGetHeightSlate(Msg):
|
|
'''
|
|
.. attribute:: height
|
|
|
|
The height of the tablet in points (see :class:`MsgGetPointSize`)
|
|
'''
|
|
interaction = Interactions.GET_HEIGHT
|
|
opcode = 0xea
|
|
protocol = ProtocolVersion.SLATE
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.args = little_u16(0x04)
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode != 0xeb:
|
|
raise UnexpectedReply(self)
|
|
|
|
if little_u16(reply[0:2]) != 0x4 or len(reply) != 6:
|
|
raise UnexpectedDataError(reply)
|
|
|
|
self.height = little_u32(reply[2:6])
|
|
|
|
|
|
class MsgGetPointSizeSpark(Msg):
|
|
'''
|
|
This is a fake message. The Spark and Slate doesn't seem to have a
|
|
getter for this one, it just times out. We just hardcode the value here.
|
|
|
|
.. attribute:: point_size
|
|
|
|
The point_size of the tablet in µm
|
|
'''
|
|
interaction = Interactions.GET_POINT_SIZE
|
|
opcode = Msg.OPCODE_NOOP
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.point_size = 10
|
|
|
|
|
|
class MsgGetPointSize(Msg):
|
|
'''
|
|
.. attribute:: point_size
|
|
|
|
The point size in micrometers
|
|
'''
|
|
interaction = Interactions.GET_POINT_SIZE
|
|
opcode = 0xea
|
|
protocol = ProtocolVersion.INTUOS_PRO
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.args = little_u16(0x14)
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode != 0xeb:
|
|
raise UnexpectedReply(self)
|
|
|
|
if little_u16(reply[0:2]) != 0x14:
|
|
raise UnexpectedDataError(reply)
|
|
|
|
# This is strange. The return value is supposed to be the point size
|
|
# but it's off by one. The IntuosPro returns 6 but a point size of 5
|
|
# matches the physical dimensions. So let's assume there's a bug in
|
|
# the firmware or the specs are wrong or something.
|
|
self.point_size = little_u32(reply[2:6]) - 1
|
|
|
|
|
|
class MsgUnknownE3Command(Msg):
|
|
interaction = Interactions.UNKNOWN_E3
|
|
opcode = 0xe3
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
# no arguments, uses the default 0xb3 handler
|
|
|
|
|
|
class MsgSetFileTransferReportingType(Msg):
|
|
'''
|
|
Changes where the device needs to send the data to.
|
|
0x00 is on the FFEE0003 GATT.
|
|
'''
|
|
interaction = Interactions.SET_FILE_TRANSFER_REPORTING_TYPE
|
|
opcode = 0xec
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.args = [0x06, 0x00, 0x00, 0x00, 0x00, 0x00]
|
|
|
|
# uses the default 0xb3 handler
|
|
|
|
|
|
class MsgSetMode(Msg):
|
|
'''
|
|
:param mode: one of :class:`Mode`
|
|
|
|
.. attribute:: mode
|
|
|
|
The :class:`Mode` of the tablet
|
|
'''
|
|
interaction = Interactions.SET_MODE
|
|
opcode = 0xb1
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def __init__(self, mode, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.mode = Mode(mode)
|
|
self.args = [int(mode)]
|
|
|
|
# uses the default 0xb3 handler
|
|
|
|
|
|
class MsgGetStrokesSpark(Msg):
|
|
'''
|
|
.. attribute:: count
|
|
|
|
The number of drawings available
|
|
|
|
.. attribute:: timestamp
|
|
|
|
The timestamp of the strokes sequence
|
|
'''
|
|
interaction = Interactions.GET_STROKES
|
|
opcode = 0xc5
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.count = 0
|
|
|
|
def _handle_reply(self, reply):
|
|
# This is an odd message, we have one request but one or two
|
|
# replies. The 0xc7 reply is sometimes missing, unclear though when.
|
|
if reply.opcode == 0xc7:
|
|
self.count = int.from_bytes(reply[0:4], byteorder='big')
|
|
|
|
# Re-execute the message but this time without a new request
|
|
self.requires_request = False
|
|
self.execute()
|
|
self.requires_request = True
|
|
elif reply.opcode == 0xcd:
|
|
import calendar
|
|
|
|
str_timestamp = ''.join([f'{d:02x}' for d in reply])
|
|
t = time.strptime(str_timestamp, '%y%m%d%H%M%S')
|
|
self.timestamp = calendar.timegm(t)
|
|
else:
|
|
raise UnexpectedReply(reply)
|
|
|
|
|
|
class MsgGetStrokesSlate(Msg):
|
|
'''
|
|
.. attribute:: count
|
|
|
|
The number of drawings available
|
|
|
|
.. attribute:: timestamp
|
|
|
|
The timestamp of the strokes sequence in seconds since UNIX epoch
|
|
'''
|
|
interaction = Interactions.GET_STROKES
|
|
opcode = 0xcc
|
|
protocol = ProtocolVersion.SLATE
|
|
|
|
def _handle_reply(self, reply):
|
|
import calendar
|
|
|
|
if reply.opcode != 0xcf:
|
|
raise UnexpectedReply(reply)
|
|
|
|
self.count = little_u32(reply[0:4])
|
|
str_timestamp = ''.join([f'{d:02x}' for d in reply[4:]])
|
|
t = time.strptime(str_timestamp, '%y%m%d%H%M%S')
|
|
self.timestamp = calendar.timegm(t)
|
|
|
|
|
|
class MsgGetStrokesIntuosPro(Msg):
|
|
'''
|
|
.. attribute:: count
|
|
|
|
The number of drawings available
|
|
|
|
.. attribute:: timestamp
|
|
|
|
The timestamp of the strokes sequence
|
|
'''
|
|
interaction = Interactions.GET_STROKES
|
|
opcode = 0xcc
|
|
protocol = ProtocolVersion.INTUOS_PRO
|
|
|
|
# same as the slate version, but the timestamp handling differs
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode != 0xcf:
|
|
raise UnexpectedReply(reply)
|
|
|
|
self.count = little_u32(reply[0:4])
|
|
seconds = little_u32(reply[4:8])
|
|
self.timestamp = seconds
|
|
|
|
|
|
class MsgAvailableFilesCount(Msg):
|
|
'''
|
|
.. attribute:: count
|
|
|
|
The number of drawings available
|
|
'''
|
|
interaction = Interactions.AVAILABLE_FILES_COUNT
|
|
opcode = 0xc1
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode != 0xc2:
|
|
raise UnexpectedReply(self)
|
|
|
|
self.count = int.from_bytes(reply[0:2], byteorder='big')
|
|
|
|
|
|
class MsgAvailableFilesCountSlate(Msg):
|
|
'''
|
|
.. attribute:: count
|
|
|
|
The number of drawings available
|
|
'''
|
|
interaction = Interactions.AVAILABLE_FILES_COUNT
|
|
opcode = 0xc1
|
|
protocol = ProtocolVersion.SLATE
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode != 0xc2:
|
|
raise UnexpectedReply(self)
|
|
|
|
self.count = little_u16(reply[0:2])
|
|
|
|
|
|
class MsgDownloadOldestFile(Msg):
|
|
interaction = Interactions.DOWNLOAD_OLDEST_FILE
|
|
opcode = 0xc3
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode != 0xc8:
|
|
raise UnexpectedReply(self)
|
|
|
|
if reply[0] != 0xbe:
|
|
raise UnexpectedDataError(reply)
|
|
|
|
|
|
class MsgWaitForEndRead(Msg):
|
|
'''
|
|
.. attribute:: crc
|
|
|
|
The checksum provided for the (out of band) pen data.
|
|
'''
|
|
interaction = Interactions.WAIT_FOR_END_READ
|
|
requires_request = False
|
|
opcode = 0x00 # unused
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(timeout=5, *args, **kwargs)
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode == 0xc8:
|
|
if reply[0] != 0xed:
|
|
raise UnexpectedDataError(reply, 'Expected c8 ed')
|
|
pass # nothing to do here
|
|
elif reply.opcode == 0xc9:
|
|
self.crc = int(binascii.hexlify(bytes(reply)), 16)
|
|
else:
|
|
raise UnexpectedReply(reply)
|
|
|
|
def execute(self):
|
|
# This is a double-reply , once c8, then c9
|
|
super().execute()
|
|
super().execute()
|
|
return self
|
|
|
|
|
|
class MsgWaitForEndReadSlate(Msg):
|
|
'''
|
|
.. attribute:: crc
|
|
|
|
The checksum provided for the (out of band) pen data.
|
|
'''
|
|
interaction = Interactions.WAIT_FOR_END_READ
|
|
requires_request = False
|
|
opcode = 0x00 # unused
|
|
protocol = ProtocolVersion.SLATE
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(timeout=5, *args, **kwargs)
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode == 0xc8:
|
|
if reply[0] != 0xed:
|
|
raise UnexpectedDataError(reply, 'Expected c8 ed')
|
|
crc = reply[1:]
|
|
crc.reverse()
|
|
self.crc = int(binascii.hexlify(bytes(crc)), 16)
|
|
else:
|
|
raise UnexpectedReply(reply)
|
|
|
|
|
|
class MsgDeleteOldestFile(Msg):
|
|
interaction = Interactions.DELETE_OLDEST_FILE
|
|
opcode = 0xca
|
|
protocol = ProtocolVersion.ANY
|
|
requires_reply = False
|
|
|
|
|
|
class MsgDeleteOldestFileSlate(Msg):
|
|
interaction = Interactions.DELETE_OLDEST_FILE
|
|
opcode = 0xca
|
|
protocol = ProtocolVersion.SLATE
|
|
|
|
# uses the default 0xb3 handler
|
|
|
|
|
|
class MsgRegisterComplete(Msg):
|
|
interaction = Interactions.REGISTER_COMPLETE
|
|
opcode = 0xe5
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
# uses the default 0xb3 handler
|
|
|
|
|
|
class MsgRegisterCompleteSlate(Msg):
|
|
'''A noop Msg. This message only exists for the Spark'''
|
|
interaction = Interactions.REGISTER_COMPLETE
|
|
opcode = Msg.OPCODE_NOOP
|
|
protocol = ProtocolVersion.SLATE
|
|
|
|
|
|
class MsgRegisterPressButtonSpark(Msg):
|
|
interaction = Interactions.REGISTER_PRESS_BUTTON
|
|
opcode = 0xe3
|
|
protocol = ProtocolVersion.ANY
|
|
# Does not require a reply, the reply is sent in response to the
|
|
# physical button press.
|
|
requires_reply = False
|
|
|
|
# uuid is unused, just there so it's compatible with the slate message
|
|
def __init__(self, uuid=None, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.args = [0x01]
|
|
self.uuid = uuid
|
|
|
|
|
|
class MsgRegisterPressButtonSlateOrIntuosPro(Msg):
|
|
interaction = Interactions.REGISTER_PRESS_BUTTON
|
|
opcode = 0xe7
|
|
protocol = ProtocolVersion.SLATE
|
|
# Does not require a reply, the reply is sent in response to the
|
|
# physical button press.
|
|
requires_reply = False
|
|
|
|
def __init__(self, uuid, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.uuid = uuid
|
|
self.args = [int(i) for i in binascii.unhexlify(uuid)]
|
|
|
|
|
|
class MsgRegisterWaitForButtonSpark(Msg):
|
|
'''
|
|
.. attribute:: protocol_version
|
|
|
|
The protocol version used by this device, according to this message.
|
|
|
|
'''
|
|
interaction = Interactions.REGISTER_WAIT_FOR_BUTTON
|
|
requires_request = False
|
|
opcode = 0x00 # unused
|
|
protocol = ProtocolVersion.ANY
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
kwargs['timeout'] = 10
|
|
super().__init__(*args, **kwargs)
|
|
self.protocol_version = ProtocolVersion.ANY
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode != 0xe4:
|
|
raise UnexpectedReply(reply)
|
|
self.protocol_version = ProtocolVersion.SPARK
|
|
|
|
|
|
class MsgRegisterWaitForButtonSlateOrIntuosPro(Msg):
|
|
'''
|
|
.. attribute:: protocol_version
|
|
|
|
The protocol version used by this device, according to this message.
|
|
|
|
'''
|
|
interaction = Interactions.REGISTER_WAIT_FOR_BUTTON
|
|
requires_request = False
|
|
opcode = 0x00 # unused
|
|
protocol = ProtocolVersion.SLATE
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
kwargs['timeout'] = 10
|
|
super().__init__(*args, **kwargs)
|
|
self.protocol_version = ProtocolVersion.ANY
|
|
|
|
def _handle_reply(self, reply):
|
|
if reply.opcode == 0xe4:
|
|
self.protocol_version = ProtocolVersion.SLATE
|
|
elif reply.opcode == 0x53:
|
|
self.protocol_version = ProtocolVersion.INTUOS_PRO
|
|
else:
|
|
raise UnexpectedReply(reply)
|
|
|
|
|
|
class StrokeParsingError(ProtocolError):
|
|
def __init__(self, message, data=[]):
|
|
self.message = message
|
|
self.data = data
|
|
|
|
def __repr__(self):
|
|
if self.data:
|
|
datastr = f' data: {list2hex(self.data)}'
|
|
else:
|
|
datastr = ''
|
|
return f'{self.message}{datastr}'
|
|
|
|
def __str__(self):
|
|
return self.__repr__()
|
|
|
|
|
|
class StrokeDataType(enum.Enum):
|
|
UNKNOWN = enum.auto()
|
|
FILE_HEADER = enum.auto()
|
|
STROKE_HEADER = enum.auto()
|
|
STROKE_END = enum.auto()
|
|
POINT = enum.auto()
|
|
DELTA = enum.auto()
|
|
EOF = enum.auto()
|
|
LOST_POINT = enum.auto()
|
|
|
|
@classmethod
|
|
def identify(cls, data):
|
|
'''
|
|
Returns the identified packet type for the next packet.
|
|
'''
|
|
header = data[0]
|
|
nbytes = bin(header).count('1')
|
|
payload = data[1:1 + nbytes]
|
|
|
|
# Note: the order of the checks below matters
|
|
|
|
# Known file format headers. This is just a version number, I think.
|
|
if data[0:4] == [0x67, 0x82, 0x69, 0x65] or \
|
|
data[0:4] == [0x62, 0x38, 0x62, 0x74]:
|
|
return StrokeDataType.FILE_HEADER
|
|
|
|
# End of stroke, but can sometimes mean end of file too
|
|
if data[0:7] == [0xfc, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff]:
|
|
return StrokeDataType.STROKE_END
|
|
|
|
if payload == [0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff]:
|
|
return StrokeDataType.EOF
|
|
|
|
# all special headers have the lowest two bits set
|
|
if header & 0x3 == 0:
|
|
return StrokeDataType.DELTA
|
|
|
|
if not payload:
|
|
return StrokeDataType.UNKNOWN
|
|
|
|
if payload[0] == 0xfa or payload[0:3] == [0xff, 0xee, 0xee]:
|
|
return StrokeDataType.STROKE_HEADER
|
|
|
|
if payload[0:2] == [0xff, 0xff]:
|
|
return StrokeDataType.POINT
|
|
|
|
if payload[0:2] == [0xdd, 0xdd]:
|
|
return StrokeDataType.LOST_POINT
|
|
|
|
return StrokeDataType.UNKNOWN
|
|
|
|
|
|
class StrokeFile(object):
|
|
'''
|
|
Represents a single file as coming from the device. Note that pen data
|
|
received from the device may include more than one file, this object is
|
|
merely the first represented in this file.
|
|
|
|
.. attribute:: bytesize
|
|
|
|
The length in bytes of the data consumed.
|
|
|
|
.. attribute:: timestamp
|
|
|
|
Creation time of the drawing (when the button was pressed) or None where
|
|
this is not supported by the device.
|
|
|
|
.. attribute:: strokes
|
|
|
|
A list of strokes, each a list of Point(x, y, p) namedtuples.
|
|
Coordinates for the points are in absolute device units.
|
|
|
|
'''
|
|
def __init__(self, data):
|
|
self.data = data
|
|
self.file_header = StrokeFileHeader(data[:16])
|
|
|
|
logger.debug(self.file_header)
|
|
|
|
self.bytesize = self.file_header.size
|
|
|
|
offset = self.file_header.size
|
|
self.timestamp = self.file_header.timestamp
|
|
self.bytesize += self._parse_data(data[offset:])
|
|
|
|
def _parse_data(self, data):
|
|
# the data formats we return
|
|
Stroke = namedtuple('Stroke', ['points'])
|
|
Point = namedtuple('Point', ['x', 'y', 'p'])
|
|
|
|
last_point = None # abs coords for most recent point
|
|
last_delta = Point(0, 0, 0) # delta accumulates
|
|
|
|
strokes = [] # all strokes
|
|
points = [] # Points of current strokes
|
|
|
|
consumed = 0
|
|
|
|
# Note about the below: this was largely reverse-engineered because
|
|
# the specs we have access to are either ambiguous or outright wrong.
|
|
#
|
|
# First byte is a bitmask that seems to indicate how many bytes.
|
|
#
|
|
# Where the header byte has the lowest two bits set, it can be
|
|
# one of several packages:
|
|
# - a StrokeHeader [0xfa] to indicate a new stroke
|
|
# - end of stroke - all payload bytes are 0xff
|
|
# - lost point [0xdd, 0xdd] - firmware couldn't record a point
|
|
# - a StrokePoint [0xff, 0xff] a fully specified point. Always
|
|
# the first after a StrokeHeader but may also appear elsewhere.
|
|
#
|
|
# Where the header byte has the lowest two bits on zero, it is
|
|
# a StrokeDelta, a variable sized payload following the header,
|
|
# values depend on the bits set in the header.
|
|
#
|
|
# In theory all the header packages should have a header of 0xff,
|
|
# but they don't. End of stroke may have 0xfc, a StrokePoint
|
|
# sometimes has 0xbf. It is unknown why.
|
|
#
|
|
# The StrokePoint is strange since if can sometimes contain deltas
|
|
# (bitmask 0xbf). So it's just a delta with an extra two bytes for
|
|
# headers, so what is the point of it? Presumably a firmware bug or
|
|
# something.
|
|
while data:
|
|
packet_type = StrokeDataType.identify(data)
|
|
logger.debug(f'Next data packet {packet_type.name}: {list2hex(data[:16])} …')
|
|
|
|
packet = None
|
|
if packet_type == StrokeDataType.UNKNOWN:
|
|
packet = StrokePacketUnknown(data)
|
|
elif packet_type == StrokeDataType.FILE_HEADER:
|
|
# This code shouldn't be triggered, we handle the file
|
|
# header outside this function.
|
|
packet = StrokeFileHeader(data)
|
|
logger.error(f'Unexpected file header at byte {consumed}: {packet}')
|
|
break
|
|
elif packet_type == StrokeDataType.STROKE_END:
|
|
packet = StrokeEndOfStroke(data)
|
|
if points:
|
|
strokes.append(Stroke(points))
|
|
points = []
|
|
elif packet_type == StrokeDataType.EOF:
|
|
# EOF means pack
|
|
packet = StrokeEOF(data)
|
|
if points:
|
|
strokes.append(Stroke(points))
|
|
points = []
|
|
data = data[packet.size:]
|
|
consumed += packet.size
|
|
break
|
|
elif packet_type == StrokeDataType.STROKE_HEADER:
|
|
# New stroke means resetting delta and storing the last
|
|
# stroke
|
|
packet = StrokeHeader(data)
|
|
last_delta = Point(0, 0, 0)
|
|
if points:
|
|
strokes.append(Stroke(points))
|
|
points = []
|
|
elif packet_type == StrokeDataType.LOST_POINT:
|
|
# We don't yet handle lost points
|
|
packet = StrokeLostPoint(data)
|
|
elif (packet_type == StrokeDataType.POINT or
|
|
packet_type == StrokeDataType.DELTA):
|
|
# POINT and DELTA *should* be two different packages but
|
|
# sometimes a POINT includes a delta for a coordinate. So
|
|
# all a POINT is is a delta with an added [0xff 0xff] after
|
|
# the header byte. The StrokePoint packet hides this so we
|
|
# can process both the same way.
|
|
if packet_type == StrokeDataType.POINT:
|
|
packet = StrokePoint(data)
|
|
if last_point is None:
|
|
last_point = Point(packet.x, packet.y, packet.p)
|
|
else:
|
|
packet = StrokeDelta(data)
|
|
|
|
# Compression algorithm in the device basically keeps a
|
|
# cumulative delta so that
|
|
# P0 = absolute x, y, z
|
|
# P1 = P0 + d1
|
|
# P2 = P0 + 2*d1 + d2
|
|
# P3 = P0 + 3*d1 + 2*d2 + d3
|
|
# And we use that here by just keeping the last delta
|
|
# around, adding to it where necessary and then adding it to
|
|
# the last point we have.
|
|
#
|
|
# Whenever we get an absolute coordinate, the delta resets
|
|
# to 0. Since this is per axis, our fictional P4 may be:
|
|
# P4(x) = P0 + 4*d1 + 3*d2 + 2*d3 + d4
|
|
# P4(y) = P0 + 4*d1 + 2*d3 ... d2 and d4 are missing (zero)
|
|
# P4(p) = P4(p) .... absolute
|
|
dx, dy, dp = last_delta
|
|
x, y, p = last_point
|
|
if packet.dx is not None:
|
|
dx += packet.dx
|
|
elif packet.x is not None:
|
|
x = packet.x
|
|
dx = 0
|
|
|
|
if packet.dy is not None:
|
|
dy += packet.dy
|
|
elif packet.y is not None:
|
|
y = packet.y
|
|
dy = 0
|
|
|
|
if packet.dp is not None:
|
|
dp += packet.dp
|
|
elif packet.p is not None:
|
|
p = packet.p
|
|
dp = 0
|
|
|
|
# dx,dy,dp ... are cumulative deltas for this packet
|
|
# x,y,p ... most recent known abs coordinates
|
|
# add those two together and we have the real coordinates
|
|
# and the baseline for the next point
|
|
last_delta = Point(dx, dy, dp)
|
|
current_point = Point(x, y, p)
|
|
last_point = Point(current_point.x + last_delta.x,
|
|
current_point.y + last_delta.y,
|
|
current_point.p + last_delta.p)
|
|
logger.debug(f'Calculated point: {last_point}')
|
|
points.append(last_point)
|
|
else:
|
|
# should never get here
|
|
raise StrokeParsingError(f'Failed to parse', data[:16])
|
|
|
|
logger.debug(f'Offset {consumed}: {packet}')
|
|
consumed += packet.size
|
|
data = data[packet.size:]
|
|
|
|
self.strokes = strokes
|
|
return consumed
|
|
|
|
|
|
class StrokePacket(object):
|
|
'''
|
|
.. attribute: size
|
|
|
|
Size of the packet in bytes
|
|
'''
|
|
def __init__(self):
|
|
self.size = 0
|
|
|
|
|
|
class StrokePacketUnknown(StrokePacket):
|
|
def __init__(self, data):
|
|
header = data[0]
|
|
nbytes = bin(header).count('1')
|
|
self.size = 1 + nbytes
|
|
self.data = data[:self.size]
|
|
|
|
def __repr__(self):
|
|
return f'Unknown packet: {list2hex(self.data)}'
|
|
|
|
|
|
class StrokeFileHeader(StrokePacket):
|
|
'''
|
|
Each data packet has a file header consisting of 4 bytes file version
|
|
number and optionally extra data.
|
|
|
|
.. attribute: timestamp
|
|
|
|
The timestamp of this drawing or ``None`` where not available.
|
|
|
|
.. attribute: nstrokes
|
|
|
|
The count of strokes within this drawing or ``None`` where not
|
|
available. This count is inaccurate anyway, so it should only be
|
|
used for basic internal checks.
|
|
|
|
'''
|
|
def __init__(self, data):
|
|
key = little_u32(data[:4])
|
|
file_formats = {
|
|
little_u32([0x67, 0x82, 0x69, 0x65]): self._parse_intuos_pro,
|
|
little_u32([0x62, 0x38, 0x62, 0x74]): self._parse_spark,
|
|
}
|
|
|
|
self.timestamp = None
|
|
self.nstrokes = None
|
|
|
|
try:
|
|
func = file_formats[key]
|
|
func(data)
|
|
except KeyError:
|
|
raise StrokeParsingError(f'Unknown file format:', data[:4])
|
|
|
|
def __repr__(self):
|
|
t = time.strftime("%y%m%d%H%M%S", time.gmtime(self.timestamp))
|
|
return f'FileHeader: time: {t}, stroke count: {self.nstrokes}'
|
|
|
|
def _parse_intuos_pro(self, data):
|
|
self.timestamp = int.from_bytes(data[4:8], byteorder='little')
|
|
# plus two bytes for ms, always zero
|
|
self.nstrokes = int.from_bytes(data[10:14], byteorder='little')
|
|
# plus two bytes always zero
|
|
self.size = 16
|
|
|
|
def _parse_spark(self, data):
|
|
self.size = 4
|
|
|
|
|
|
class StrokeHeader(StrokePacket):
|
|
'''
|
|
.. attribute:: pen_id
|
|
|
|
The pen serial number or 0 if none is set
|
|
|
|
.. attribute:: pen_type
|
|
|
|
The pen type
|
|
|
|
.. attribute:: timestamp
|
|
|
|
The timestamp of this stroke or None if none was recorded
|
|
|
|
.. attribute:: time_offset
|
|
|
|
The time offset in ms since powerup or None if this stroke has an
|
|
absolute timestamp.
|
|
|
|
.. attribute:: is_new_layer
|
|
|
|
True if this stroke is on a new layer
|
|
'''
|
|
def __init__(self, data):
|
|
header = data[0]
|
|
payload = data[1:]
|
|
self.size = bin(header).count('1') + 1
|
|
if payload[0] == 0xfa:
|
|
self._parse_intuos_pro(data, header, payload)
|
|
elif payload[0:3] == [0xff, 0xee, 0xee]:
|
|
self._parse_slate(data, header, payload)
|
|
else:
|
|
raise StrokeParsingError(f'Invalid StrokeHeader, expected ff fa or ff ee.', data[:8])
|
|
|
|
def _parse_slate(self, data, header, payload):
|
|
self.pen_id = 0
|
|
self.pen_type = 0
|
|
self.is_new_layer = False
|
|
|
|
self.timestamp = None
|
|
self.time_offset = little_u16(payload[4:6]) * 5 # in 5ms resolution
|
|
|
|
# On the first stroke after the file header, this packet is 6 bytes
|
|
# only. Other strokes have 8 bytes but the last two bytes are always
|
|
# zero.
|
|
|
|
def _parse_intuos_pro(self, data, header, payload):
|
|
flags = payload[1]
|
|
needs_pen_id = flags & 0x80
|
|
self.pen_type = flags & 0x3f
|
|
self.is_new_layer = (flags & 0x40) != 0
|
|
self.pen_id = 0
|
|
self.timestamp = int.from_bytes(payload[2:6], byteorder='little')
|
|
self.time_offset = None
|
|
# FIXME: plus two bytes for milis
|
|
self.size = bin(header).count('1') + 1
|
|
|
|
# if the pen id flag is set, the pen ID comes in the next 8-byte
|
|
# packet (plus 0xff header)
|
|
if needs_pen_id:
|
|
pen_packet = data[self.size + 1:]
|
|
if not pen_packet:
|
|
raise StrokeParsingError('Missing pen ID packet')
|
|
|
|
header = data[0]
|
|
if header != 0xff:
|
|
raise StrokeParsingError(f'Unexpected pen id packet header: {header}.', data[:9])
|
|
|
|
nbytes = bin(header).count('1')
|
|
self.pen_id = little_u64(pen_packet[:8])
|
|
self.size += 1 + nbytes
|
|
|
|
def __repr__(self):
|
|
if self.timestamp is not None:
|
|
t = time.strftime(f'%y%m%d%H%M%S', time.gmtime(self.timestamp))
|
|
else:
|
|
t = time.strftime(f'boot+{self.time_offset/1000}s')
|
|
return f'StrokeHeader: time: {t} new layer: {self.is_new_layer}, pen type: {self.pen_type}, pen id: {self.pen_id:#x}'
|
|
|
|
|
|
class StrokeDelta(object):
|
|
'''
|
|
.. attribute:: x
|
|
|
|
The absolute x coordinate or None if this is packet contains a delta
|
|
|
|
.. attribute:: y
|
|
|
|
The absolute y coordinate or None if this is packet contains a delta
|
|
|
|
.. attribute:: p
|
|
|
|
The absolute pressure coordinate or None if this is packet contains a delta
|
|
|
|
.. attribute:: dx
|
|
|
|
The x delta or None if this is packet contains an absolute
|
|
coordinate
|
|
|
|
.. attribute:: dy
|
|
|
|
The y delta or None if this is packet contains an absolute
|
|
coordinate
|
|
|
|
.. attribute:: dp
|
|
|
|
The pressure delta or None if this is packet contains an absolute
|
|
coordinate
|
|
'''
|
|
def __init__(self, data):
|
|
def extract(mask, databytes):
|
|
value = None
|
|
delta = None
|
|
size = 0
|
|
if mask == 0:
|
|
# No data for this coordinate
|
|
pass
|
|
elif mask == 1:
|
|
# Supposedly not implemented by any device.
|
|
#
|
|
# If this would exist, it would throw off the byte count
|
|
# anyway, so this cannot ever exist without breaking
|
|
# everything.
|
|
raise NotImplementedError('This device is not supposed to be exist')
|
|
elif mask == 2:
|
|
# 8 bit delta
|
|
delta = int.from_bytes(bytes([databytes[0]]), byteorder='little', signed=True)
|
|
if delta == 0:
|
|
raise StrokeParsingError(f'StrokeDelta: invalid delta of zero', data)
|
|
assert delta != 0
|
|
size = 1
|
|
elif mask == 3:
|
|
# full abs coordinate
|
|
value = little_u16(databytes[:2])
|
|
size = 2
|
|
return value, delta, size
|
|
|
|
if (data[0] & 0b11) != 0:
|
|
raise NotImplementedError(f'LSB two bits set in mask - this is not supposed to happen')
|
|
|
|
xmask = (data[0] & 0b00001100) >> 2
|
|
ymask = (data[0] & 0b00110000) >> 4
|
|
pmask = (data[0] & 0b11000000) >> 6
|
|
|
|
offset = 1
|
|
x, dx, size = extract(xmask, data[offset:])
|
|
offset += size
|
|
y, dy, size = extract(ymask, data[offset:])
|
|
offset += size
|
|
p, dp, size = extract(pmask, data[offset:])
|
|
offset += size
|
|
|
|
# Note: any of these will be None depending on the packet
|
|
self.dx = dx
|
|
self.dy = dy
|
|
self.dp = dp
|
|
self.x = x
|
|
self.y = y
|
|
self.p = p
|
|
|
|
self.size = offset
|
|
|
|
def __repr__(self):
|
|
def printstring(delta, abs):
|
|
return f'{delta:+5d}' if delta is not None \
|
|
else f'{abs:5d}' if abs is not None \
|
|
else ' ' # noqa
|
|
strx = printstring(self.dx, self.x)
|
|
stry = printstring(self.dy, self.y)
|
|
strp = printstring(self.dp, self.p)
|
|
|
|
return f'StrokeDelta: {strx}/{stry} pressure: {strp}'
|
|
|
|
|
|
class StrokePoint(StrokeDelta):
|
|
'''
|
|
A full point identified by three coordinates (x, y, pressure) in
|
|
absolute coordinates.
|
|
'''
|
|
def __init__(self, data):
|
|
header = data[0]
|
|
payload = data[1:]
|
|
if payload[:2] != [0xff, 0xff]:
|
|
raise StrokeParsingError(f'Invalid StrokePoint, expected ff ff ff', data[:9])
|
|
|
|
# This is a wrapper around StrokeDelta which does the mask parsing.
|
|
# In theory the StrokePoint would be a separate packet but it
|
|
# occasionally uses a header other than 0xff. Which means the packet
|
|
# is completely useless and shouldn't exist because now it's just a
|
|
# StrokeDelta in the form of [header, 0xff, 0xff, payload] and the
|
|
# 0xff just keep the room warm.
|
|
|
|
# StrokeDelta assumes the bottom two bits are unset
|
|
header &= ~0x3
|
|
super().__init__([header] + payload[2:])
|
|
self.size += 2
|
|
|
|
# self.x = little_u16(data[2:4])
|
|
# self.y = little_u16(data[4:6])
|
|
# self.pressure = little_u16(data[6:8])
|
|
|
|
def __repr__(self):
|
|
return f'StrokePoint: {self.x}/{self.y} pressure: {self.p}'
|
|
|
|
|
|
class StrokeEOF(StrokePacket):
|
|
def __init__(self, data):
|
|
header = data[0]
|
|
payload = data[1:]
|
|
nbytes = bin(header).count('1')
|
|
if payload[:nbytes] != [0xff] * nbytes:
|
|
raise StrokeParsingError(f'Invalid EOF, expected 0xff only', data[:9])
|
|
self.size = nbytes + 1
|
|
|
|
|
|
class StrokeEndOfStroke(StrokePacket):
|
|
def __init__(self, data):
|
|
header = data[0]
|
|
payload = data[1:]
|
|
nbytes = bin(header).count('1')
|
|
if payload[:nbytes] != [0xff] * nbytes:
|
|
raise StrokeParsingError(f'Invalid EndOfStroke, expected 0xff only', data[:9])
|
|
self.size = nbytes + 1
|
|
self.data = data[:self.size]
|
|
|
|
def __repr__(self):
|
|
return f'EndOfStroke: {list2hex(self.data)}'
|
|
|
|
|
|
class StrokeLostPoint(StrokePacket):
|
|
'''
|
|
Marker for lost points that the firmware couldn't record coordinates
|
|
for.
|
|
|
|
.. attribute:: nlost
|
|
|
|
The number of points not recorded.
|
|
'''
|
|
def __init__(self, data):
|
|
header = data[0]
|
|
payload = data[1:]
|
|
if payload[:2] != [0xdd, 0xdd]:
|
|
raise StrokeParsingError(f'Invalid StrokeLostPoint, expected ff dd dd', data[:9])
|
|
self.nlost = little_u16(payload[2:4])
|
|
self.size = bin(header).count('1') + 1
|