
1930 lines
58 KiB

#!/usr/bin/env python3
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# Implementation of the BLE protocol of the Wacom SmartPad devices.
# Each device has a different set of functionalities and uses different
# opcodes and message formats. So the entry point is the Protocol class, and
# the messages can be accessed by key.
# def my_callback(request, requires_reply=True, userdata=None, timeout=5):
# if request is not None:
# send_to_device(request)
# if requires_reply:
# return read_from device() or eventually_timeout()
# p = Protocol(ProtocolVersion.INTUOS_PRO, my_callback, userdata)
# m1 = P.get(Interactions.GET_NAME)
# m2 = P.get(Interactions.SET_NAME, 'SomeName')
# Each message is defined by a string (see INTERACTIONS) and takes the obvious
# parameters where applicable.
# The message itself is the protocol-specific Msg, a single logical interaction
# with the device.
# The data exchange is prompted by the execute() function, which sets the
# attributes on the message and can be chained where appropriate, e.g.
# name = m1.execute().name
# if name != 'SomeName':
# m2.execute()
# Because we generally expect everything to work fine and where it doesn't
# it affects a few things anyway, so error handling is via exceptions.
# sequence = [m1, m2, m3, ...]
# try:
# for msg in sequence:
# m.execute()
# except AuthorizationError:
# print("oops, we have the wrong uuid")
import binascii
import enum
import time
import logging
import errno
from collections import namedtuple
from .util import list2hex
logger = logging.getLogger('tuhi.protocol')
def little_u16(x):
Convert to or from a 16-bit integer to a little-endian 2-byte array. If
passed an integer, the return value is a 2-byte array. If passed a
2-byte array, the return value is a 16-bit integer.
if isinstance(x, int):
assert(x <= 0xffff and x >= 0x0000)
return x.to_bytes(2, byteorder='little')
assert(len(x) == 2)
return int.from_bytes(x, byteorder='little')
def little_u32(x):
Convert to or from a 16-bit integer to a little-endian 4-byte array. If
passed an integer, the return value is a 4-byte array. If passed a
4-byte array, the return value is a 16-bit integer.
if isinstance(x, int):
assert(x <= 0xffffffff and x >= 0x00000000)
return x.to_bytes(4, byteorder='little')
assert(len(x) == 4)
return int.from_bytes(x, byteorder='little')
def little_u64(x):
Convert to or from a 64-bit integer to a little-endian 4-byte array. If
passed an integer, the return value is a 8-byte array. If passed a
4-byte array, the return value is a 64-bit integer.
if isinstance(x, int):
assert(x <= 0xffffffffffffffff and x >= 0x0000000000000000)
return x.to_bytes(8, byteorder='little')
assert(len(x) == 8)
return int.from_bytes(x, byteorder='little')
class Interactions(enum.Enum):
'''All possible interactions with a device. Not all of these
interactions may be available on any specific device.'''
CONNECT = enum.auto()
GET_NAME = enum.auto()
SET_NAME = enum.auto()
GET_TIME = enum.auto()
SET_TIME = enum.auto()
GET_FIRMWARE = enum.auto()
GET_BATTERY = enum.auto()
GET_WIDTH = enum.auto()
GET_HEIGHT = enum.auto()
SET_MODE = enum.auto()
GET_STROKES = enum.auto()
DELETE_OLDEST_FILE = enum.auto()
WAIT_FOR_END_READ = enum.auto()
GET_POINT_SIZE = enum.auto()
UNKNOWN_E3 = enum.auto()
def as_hex_string(data):
Returns the given byte-like to a debugging hex string in the form::
12 ab 34 cd 05 ..
Supports bytes and lists of integers.
if isinstance(data, bytes):
hx = binascii.hexlify(data).decode('ascii')
return ' '.join([''.join(x) for x in zip(hx[::2], hx[1::2])])
elif isinstance(data, list):
return ' '.join([f'{x:02x}' for x in data])
raise ValueError('Unsupported data format {data.__class__} for {data}')
def _get_protocol_dictionary(protocol):
Returns a dict with the messages available for devices speaking that
particular protocol. These are classes, not objects, instantiate as
required. Usage::
pdict = get_protocol_dictionary(ProtocolVersion.ANY)
m = pdict[Interactions.GET_NAME]
m = pdict[Interactions.SET_NAME]
The list of functions (``GET_NAME`` in the above example) depends on the
implementation state and may vary between devices.
# Load all classes from this module
import sys
import inspect
classes = inspect.getmembers(sys.modules[__name__],
lambda member: inspect.isclass(member) and
member.__module__ == __name__) # NOQA
# Filter to the ones with Msg as base class
msgs = []
for name, cls in classes:
if cls == Msg:
base_classes = inspect.getmro(cls)
if Msg not in base_classes:
# Now compile the protocol-specific LUT for all functions that we
# suppport
pdict = {}
for cls in msgs:
assert cls.opcode is not None
assert cls.interaction
assert cls.protocol >= ProtocolVersion.ANY
if cls.protocol > protocol:
# Only take the latest version of a message
if cls.interaction in pdict and cls.protocol < pdict[cls.interaction].protocol:
pdict[cls.interaction] = cls
return pdict
class ProtocolVersion(enum.IntEnum):
Protocol version numbers, named after the devices first encountered
on. These version numbers are purely for sorting between devices, i.e.
do not use the numeric values of this enum. That value may change
if we discover more devices that have states in between.
The exact behavior of each protocol is varied, but
* opcodes may differ between protocol versions,
* the data inside a message may differ between versions, nd
* some functionality may only be available in some versions but not
ANY = 0
def from_string(cls, string):
Return the Enum value for the given string, allowing for different
spellings. Specifically: INTUOS_PRO, intuos_pro and intuos-pro are
all allowed for the ``INTUOS_PRO`` enum value.
:raise ValueError: if the name cannot be mapped.
names = {e.name: e for e in cls}
if string in names:
return names[string]
names = {e.name.lower(): e for e in cls}
if string in names:
return names[string]
names = {e.name.lower().replace('_', '-'): e for e in cls}
if string in names:
return names[string]
raise ValueError(string)
class Mode(enum.IntEnum):
The mode the tablet is in. ``LIVE`` mode is when the tablet reports pen
strokes immediately. ``IDLE`` is the live mode but without reporting.
``PAPER`` is the normal mode, i.e. where we download drawings.
LIVE = 0x00
PAPER = 0x01
IDLE = 0x02
class Protocol(object):
The main communication class.
:param protocol_version: a :class:`ProtocolVersion`
:param callback: the callback to invoke for any messages
:param userdata: optional data argument provided to the callback function
def __init__(self, protocol_version, callback, userdata=None):
self.protocol_version = protocol_version
self.callback = callback
self.userdata = userdata
self.lut = _get_protocol_dictionary(protocol_version)
def get(self, key, *args, **kwargs):
Return the message with the given :class:`Interactions` key. This
only returns the message but does not execute it. In most cases,
you want to use :func:`execute` instead.
kwargs['callback'] = self.callback
kwargs['userdata'] = self.userdata
msg = self.lut[key]
return msg(*args, **kwargs)
def execute(self, key, *args, **kwargs):
Execute the message with the given :class:`Interactions` key and (where
applicable) the arguments. This returns the already executed message
that has the attributes you'd expect.
return self.get(key, *args, **kwargs).execute()
def parse_pen_data(self, data):
Parse the given pen data. Returns a list of :class:`StrokeFile` objects.
files = []
while data:
logger.debug(f'... remaining data ({len(data)}): {list2hex(data)}')
sf = StrokeFile(data)
data = data[sf.bytesize:]
return files
class NordicData(list):
A set of bytes as expected by the Nordic controller on the device.
First byte is the opcode, second byte is the data length, rest is data.
This is an abstraction of a list. Instantiate with the full raw data,
the list contents will just be the data bytes:
>>> data = NordicData([0xab, 4, 0x1, 0x2, 0x3, 0x4])
>>> data
[1, 2, 3, 4]
>>> data.opcode
>>> data.length
>>> len(data)
.. attribute:: opcode
The opcode for this message
.. attribute:: length
The data length of this message. This field is guaranteed to be
equivalent to len(data) or an exception is raised.
def __init__(self, bs):
data = bs[2:]
self.opcode = bs[0]
self.length = bs[1]
if self.length != len(data):
raise UnexpectedDataError(bs, f'Invalid data: length field {self.length}, data length is {len(data)}')
def __repr__(self):
return f'{self.opcode:02x} / {self.length:02x} / {as_hex_string(self)}'
class ProtocolError(Exception):
Base class for all Tuhi-protocol related errors.
errno = errno.ENOSYS
def __init__(self, message=None):
self.message = message
class MissingReplyError(ProtocolError):
Thrown when we expected a reply but never got one. Usually caused by a
errno = errno.ETIME
def __init__(self, request, message=None):
self.request = request
def __repr__(self):
return f'Missing reply for request {self.request}. {self.message}'
class AuthorizationError(ProtocolError):
The device does not recognize our UUID.
errno = errno.EACCES
class UnexpectedReply(ProtocolError):
Exception thrown when the reply from the device does not match the
opcodes we expected.
This is not an error coming from the device, this is an
implementation bug.
.. attribute:: msg
The Message that caused the unexpected reply.
errno = errno.EPROTO
def __init__(self, msg, message=None):
self.msg = msg
def __repr__(self):
return f'{self.__class__}: {self.msg}: {self.message}'
class UnexpectedDataError(ProtocolError):
Exception thrown when the data is invalid. This is either a bug in our
parsing or a genuine issue with the tablet, but more likely the former.
This is not an error coming from the device, this is an
implementation bug.
.. attribute:: bytes
The raw bytes that caused the unexpected data.
errno = errno.EPROTO
def __init__(self, bytes, *args, **kwargs):
super().__init__(*args, **kwargs)
self.bytes = bytes
def __repr__(self):
return f'{self.__class__}: {self.bytes} - {self.message}'
class DeviceError(ProtocolError):
The device replied with an error. Check the error code for which error
exactly happened.
.. attribute:: errorcode
An error code indicating which error occured on the device.
errno = errno.EPROTO
class ErrorCode(enum.IntEnum):
List of protocol errors as used by the Device.
The error code ``SUCCESS`` is provided for convenience only, it is
filtered by the implementation and not used in an actual exception.
def __init__(self, errorcode, *args, **kwargs):
super().__init__(*args, **kwargs)
self.errorcode = DeviceError.ErrorCode(errorcode)
# All the other errors are something the user can't do
# anything about.
if self.errorcode == DeviceError.ErrorCode.INVALID_STATE:
self.errno = errno.EBADE
def __repr__(self):
return f'DeviceError.{self.errorcode.name}'
def __str__(self):
return repr(self)
class Msg(object):
A single logical interaction (request + reply) with the Wacom device.
In some cases a :class:`Msg` may issue multiple requests and replies as
one logical unit, but that should be considered an implementation
:param callback: The callback to invoke to talk to the device. This
function must take one :class:`NordicData` and an
optional userdata argument and return one
:class:`NordicData` with the reply.
:param userdata: The data passed to the callback.
.. attribute:: request
The :class:`NordicData` sent to the device
.. attribute:: reply
The :class:`NordicData` returned to the device
.. attribute:: errorcode
The :class:`DeviceError.ErrorCodeNordicData` from this message.
opcode = None
''' The message-specific opcode. Must be defined in the subclass '''
protocol = ProtocolVersion.ANY
'''Minimum supported protocol version'''
interaction = None
'''The dictionary name for this interaction (e.g. ``GET_TIME``). Must be
defined in the subclass'''
requires_reply = True
'''True if this message requires the caller to wait for a reply from the
requires_request = True
'''True if this message sends something to the device.'''
OPCODE_NOOP = 'noop'
'''A custom opcode for a noop function. Used where functionality was
removed in later versions but to keep the caller stack simpler, we
just provide a noop Msg.'''
def __init__(self, callback, userdata=None, timeout=None):
assert self.opcode is not None
assert self.protocol is not None
assert self.interaction is not None
self._callback = callback
self.errorcode = None
self.userdata = userdata
self.timeout = timeout
self.args = [0x00] # Empty messages don't exist
def args(self):
The arguments sent to the device as list of integers. Default is
[0x00], i.e. a message of length 1 with a constant 0 as argument.
return self._args
def args(self, args):
self._args = args
def _handle_reply(self, reply):
'''Override this in the subclass to handle the reply.
This is the default reply handler that deals with the 0xb3 ACK/Error
messages and throws the respective exceptions.
:param reply: A :class:`NordicData` object
if reply.opcode != 0xb3:
raise UnexpectedReply(self)
if reply[0] != 0x00:
raise DeviceError(reply[0])
def execute(self):
The function to trigger the actual communication. This function
succeeds or raises a Wacom*Exception on error.
if self.opcode == Msg.OPCODE_NOOP:
return self # allow chaining
self.request = NordicData([self.opcode, len(self.args or []), *(self.args or [])])
self.reply = self._callback(request=self.request if self.requires_request else None,
timeout=self.timeout or None,
if self.requires_reply:
if self.reply is None:
raise MissingReplyError(self.request)
# no exception? we can assume success
self.errorcode = DeviceError.ErrorCode.SUCCESS
except DeviceError as e:
self.errorcode = e.errorcode
raise e
return self # allow chaining
def __repr__(self):
return f'{self.__class__}: {self.interaction} - {self.request}{self.reply}'
class MsgConnectIntuosPro(Msg):
interaction = Interactions.CONNECT
opcode = 0xe6
protocol = ProtocolVersion.INTUOS_PRO
def __init__(self, uuid, *args, **kwargs):
super().__init__(*args, **kwargs)
self.uuid = uuid
self.args = [int(i) for i in binascii.unhexlify(uuid)]
if len(self.args) != 6:
raise ValueError('UUID must be 6 bytes long')
def _handle_reply(self, reply):
if reply.opcode == 0x50:
# maybe check reply.data == the uuid we sent
pass # success
elif reply.opcode == 0x51:
# first 6 bytes are the uuuid we just sent
reason = reply[6]
if reason in [0x00, 0x03]: # invalid state
raise DeviceError(DeviceError.ErrorCode.INVALID_STATE)
elif reason in [0x01, 0x02]: # incorrect uuuid
raise AuthorizationError()
raise UnexpectedReply(reply, message=f'Unknown error code: {reason}')
raise UnexpectedReply(reply)
class MsgConnectSpark(Msg):
interaction = Interactions.CONNECT
opcode = 0xe6
def __init__(self, uuid, *args, **kwargs):
super().__init__(*args, **kwargs)
self.uuid = uuid
self.args = [int(i) for i in binascii.unhexlify(uuid)]
if len(self.args) != 6:
raise ValueError('UUID must be 6 bytes long')
def _handle_reply(self, reply):
except DeviceError as e:
if e.errorcode == DeviceError.ErrorCode.GENERAL_ERROR:
raise AuthorizationError()
raise e
class MsgConnectSlate(Msg):
interaction = Interactions.CONNECT
opcode = 0xe6
protocol = ProtocolVersion.SLATE
def __init__(self, uuid, *args, **kwargs):
super().__init__(*args, **kwargs)
self.uuid = uuid
self.args = [int(i) for i in binascii.unhexlify(uuid)]
if len(self.args) != 6:
raise ValueError('UUID must be 6 bytes long')
def _handle_reply(self, reply):
except DeviceError as e:
# Same as spark but we get 0x7 as error code
if e.errorcode == DeviceError.ErrorCode.AUTHORIZATION_ERROR:
raise AuthorizationError()
raise e
class MsgGetName(Msg):
.. attribute:: name
The device name as reported by the device
interaction = Interactions.GET_NAME
opcode = 0xbb
protocol = ProtocolVersion.ANY
def _handle_reply(self, reply):
if reply.opcode != 0xbc:
raise UnexpectedReply(f'Unknown reply: {reply.opcode}')
self.name = bytes(reply).decode('utf-8')
class MsgGetNameIntuosPro(Msg):
.. attribute:: name
The device name as reported by the device
interaction = Interactions.GET_NAME
opcode = 0xdb
protocol = ProtocolVersion.INTUOS_PRO
def _handle_reply(self, reply):
if reply.opcode != 0xbc:
raise UnexpectedReply(self)
self.name = bytes(reply).decode('utf-8')
class MsgSetName(Msg):
:param name: The device name to set on the device
.. attribute:: name
The device name as set with this request
interaction = Interactions.SET_NAME
opcode = 0xbb
protocol = ProtocolVersion.ANY
def __init__(self, name, *args, **kwargs):
super().__init__(*args, **kwargs)
# On the Spark, the name needs a trailing linebreak, otherwise the
# firmware gets confused.
self.args = [ord(c) for c in name] + [0x0a]
# uses the default 0xb3 handler
class MsgSetNameIntuosPro(Msg):
:param name: The device name to set on the device
.. attribute:: name
The device name as set with this request
interaction = Interactions.SET_NAME
opcode = 0xdb
protocol = ProtocolVersion.INTUOS_PRO
def __init__(self, name, *args, **kwargs):
super().__init__(*args, **kwargs)
self.args = [ord(c) for c in name]
# uses the default 0xb3 handler
class MsgGetTime(Msg):
.. attribute:: timestamp
The time in seconds since UNIX epoch
interaction = Interactions.GET_TIME
opcode = 0xb6
protocol = ProtocolVersion.ANY
def _handle_reply(self, reply):
import calendar
if reply.opcode != 0xbd:
raise UnexpectedReply(self)
if reply.length != 6:
raise UnexpectedDataError(f'Invalid reply length: expected 6, have {reply.length}')
# Assumption: device is in UTC
str_timestamp = ''.join([f'{b:02x}' for b in reply])
t = time.strptime(str_timestamp, '%y%m%d%H%M%S')
self.timestamp = calendar.timegm(t)
class MsgGetTimeIntuosPro(Msg):
.. attribute:: timestamp
The time in seconds since UNIX epoch
interaction = Interactions.GET_TIME
opcode = 0xd6
protocol = ProtocolVersion.INTUOS_PRO
def _handle_reply(self, reply):
if reply.opcode != 0xbd:
raise UnexpectedReply(self)
if reply.length != 6:
raise UnexpectedDataError(f'Invalid reply length: expected 6, have {reply.length}')
self.timestamp = little_u32(reply[0:4]) # bytes[5:6] are ms
class MsgSetTime(Msg):
:param timestamp: The current time in seconds since UNIX epoch
.. attribute:: timestamp
The time in seconds since UNIX epoch
interaction = Interactions.SET_TIME
opcode = 0xb6
protocol = ProtocolVersion.ANY
def __init__(self, timestamp, *args, **kwargs):
super().__init__(*args, **kwargs)
self.timestamp = int(timestamp)
# IntuosPro and later use the same request but a different time format
current_time = time.strftime('%y%m%d%H%M%S', time.gmtime(self.timestamp))
self.args = [int(i) for i in binascii.unhexlify(current_time)]
# uses the default 0xb3 handler
class MsgSetTimeIntuosPro(Msg):
:param timestamp: The current time in seconds since UNIX epoch
.. attribute:: timestamp
The time in seconds since UNIX epoch
interaction = Interactions.SET_TIME
opcode = 0xb6
protocol = ProtocolVersion.INTUOS_PRO
def __init__(self, timestamp, *args, **kwargs):
super().__init__(*args, **kwargs)
self.timestamp = int(timestamp)
self.args = list(little_u32(self.timestamp)) + [0x00, 0x00]
# uses the default 0xb3 handler
class MsgGetFirmwareVersion(Msg):
.. attribute:: firmware
The firmware version as a string
interaction = Interactions.GET_FIRMWARE
opcode = 0xb7
protocol = ProtocolVersion.ANY
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.args = [0]
self._lo = None
self._hi = None
def _handle_reply(self, reply):
if reply.opcode != 0xb8:
raise UnexpectedReply(self)
if self.args[0] == 0:
self._hi = ''.join([hex(d)[2:] for d in reply[1:]])
elif self.args[0] == 1:
self._lo = ''.join([hex(d)[2:] for d in reply[1:]])
if self._hi is not None and self._lo is not None:
self.firmware = f'{self._hi}-{self._lo}'
def execute(self):
# We need two requests with different args to get the full
# firmware information
self.args = [0]
self.args = [1]
return self
class MsgGetFirmwareVersionIntuosPro(Msg):
.. attribute:: firmware
The firmware version as a string
interaction = Interactions.GET_FIRMWARE
opcode = 0xb7
protocol = ProtocolVersion.INTUOS_PRO
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._lo = None
self._hi = None
def _handle_reply(self, reply):
if reply.opcode != 0xb8:
raise UnexpectedReply(self)
if self.args[0] == 0:
self._hi = ''.join([chr(d) for d in reply[1:]])
elif self.args[0] == 1:
self._lo = ''.join([chr(d) for d in reply[1:]])
if self._hi is not None and self._lo is not None:
self.firmware = f'{self._hi}-{self._lo}'
def execute(self):
# We need two requests with different args to get the full
# firmware information
self.args = [0]
self.args = [1]
return self
class MsgGetBattery(Msg):
.. attribute:: battery_percent
The battery charge in percent
.. attribute:: battery_is_charging
``True`` if charging, ``False`` if discharging
interaction = Interactions.GET_BATTERY
opcode = 0xb9
protocol = ProtocolVersion.ANY
def _handle_reply(self, reply):
if reply.opcode != 0xba:
raise UnexpectedReply(self)
self.battery_percent = int(reply[0])
self.battery_is_charging = reply[1] == 1
class MsgGetWidthSpark(Msg):
This is a fake message. The Spark doesn't seem to have a getter for this
one, it just times out. We just hardcode the value here.
.. attribute:: width
The width of the tablet in points (see :class:`MsgGetPointSize`)
interaction = Interactions.GET_WIDTH
opcode = Msg.OPCODE_NOOP
protocol = ProtocolVersion.ANY
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.width = 21000
class MsgGetWidthSlate(Msg):
.. attribute:: width
The width of the tablet in points (see :class:`MsgGetPointSize`)
interaction = Interactions.GET_WIDTH
opcode = 0xea
protocol = ProtocolVersion.SLATE
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.args = [0x03, 0x00]
def _handle_reply(self, reply):
if reply.opcode != 0xeb:
raise UnexpectedReply(self)
if little_u16(reply[0:2]) != 0x3 or len(reply) != 6:
raise UnexpectedDataError(reply)
self.width = little_u32(reply[2:6])
class MsgGetHeightSpark(Msg):
This is a fake message. The Spark doesn't seem to have a getter for this
one, it just times out. We just hardcode the value here.
.. attribute:: height
The height of the tablet in points (see :class:`MsgGetPointSize`)
interaction = Interactions.GET_HEIGHT
opcode = Msg.OPCODE_NOOP
protocol = ProtocolVersion.ANY
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.height = 14800
class MsgGetHeightSlate(Msg):
.. attribute:: height
The height of the tablet in points (see :class:`MsgGetPointSize`)
interaction = Interactions.GET_HEIGHT
opcode = 0xea
protocol = ProtocolVersion.SLATE
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.args = little_u16(0x04)
def _handle_reply(self, reply):
if reply.opcode != 0xeb:
raise UnexpectedReply(self)
if little_u16(reply[0:2]) != 0x4 or len(reply) != 6:
raise UnexpectedDataError(reply)
self.height = little_u32(reply[2:6])
class MsgGetPointSizeSpark(Msg):
This is a fake message. The Spark and Slate doesn't seem to have a
getter for this one, it just times out. We just hardcode the value here.
.. attribute:: point_size
The point_size of the tablet in µm
interaction = Interactions.GET_POINT_SIZE
opcode = Msg.OPCODE_NOOP
protocol = ProtocolVersion.ANY
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.point_size = 10
class MsgGetPointSize(Msg):
.. attribute:: point_size
The point size in micrometers
interaction = Interactions.GET_POINT_SIZE
opcode = 0xea
protocol = ProtocolVersion.INTUOS_PRO
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.args = little_u16(0x14)
def _handle_reply(self, reply):
if reply.opcode != 0xeb:
raise UnexpectedReply(self)
if little_u16(reply[0:2]) != 0x14:
raise UnexpectedDataError(reply)
# This is strange. The return value is supposed to be the point size
# but it's off by one. The IntuosPro returns 6 but a point size of 5
# matches the physical dimensions. So let's assume there's a bug in
# the firmware or the specs are wrong or something.
self.point_size = little_u32(reply[2:6]) - 1
class MsgUnknownE3Command(Msg):
interaction = Interactions.UNKNOWN_E3
opcode = 0xe3
protocol = ProtocolVersion.ANY
# no arguments, uses the default 0xb3 handler
class MsgSetFileTransferReportingType(Msg):
Changes where the device needs to send the data to.
0x00 is on the FFEE0003 GATT.
interaction = Interactions.SET_FILE_TRANSFER_REPORTING_TYPE
opcode = 0xec
protocol = ProtocolVersion.ANY
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.args = [0x06, 0x00, 0x00, 0x00, 0x00, 0x00]
# uses the default 0xb3 handler
class MsgSetMode(Msg):
:param mode: one of :class:`Mode`
.. attribute:: mode
The :class:`Mode` of the tablet
interaction = Interactions.SET_MODE
opcode = 0xb1
protocol = ProtocolVersion.ANY
def __init__(self, mode, *args, **kwargs):
super().__init__(*args, **kwargs)
self.mode = Mode(mode)
self.args = [int(mode)]
# uses the default 0xb3 handler
class MsgGetStrokesSpark(Msg):
.. attribute:: count
The number of drawings available
.. attribute:: timestamp
The timestamp of the strokes sequence
interaction = Interactions.GET_STROKES
opcode = 0xc5
protocol = ProtocolVersion.ANY
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.count = 0
def _handle_reply(self, reply):
# This is an odd message, we have one request but one or two
# replies. The 0xc7 reply is sometimes missing, unclear though when.
if reply.opcode == 0xc7:
self.count = int.from_bytes(reply[0:4], byteorder='big')
# Re-execute the message but this time without a new request
self.requires_request = False
self.requires_request = True
elif reply.opcode == 0xcd:
import calendar
str_timestamp = ''.join([f'{d:02x}' for d in reply])
t = time.strptime(str_timestamp, '%y%m%d%H%M%S')
self.timestamp = calendar.timegm(t)
raise UnexpectedReply(reply)
class MsgGetStrokesSlate(Msg):
.. attribute:: count
The number of drawings available
.. attribute:: timestamp
The timestamp of the strokes sequence in seconds since UNIX epoch
interaction = Interactions.GET_STROKES
opcode = 0xcc
protocol = ProtocolVersion.SLATE
def _handle_reply(self, reply):
import calendar
if reply.opcode != 0xcf:
raise UnexpectedReply(reply)
self.count = little_u32(reply[0:4])
str_timestamp = ''.join([f'{d:02x}' for d in reply[4:]])
t = time.strptime(str_timestamp, '%y%m%d%H%M%S')
self.timestamp = calendar.timegm(t)
class MsgGetStrokesIntuosPro(Msg):
.. attribute:: count
The number of drawings available
.. attribute:: timestamp
The timestamp of the strokes sequence
interaction = Interactions.GET_STROKES
opcode = 0xcc
protocol = ProtocolVersion.INTUOS_PRO
# same as the slate version, but the timestamp handling differs
def _handle_reply(self, reply):
if reply.opcode != 0xcf:
raise UnexpectedReply(reply)
self.count = little_u32(reply[0:4])
seconds = little_u32(reply[4:8])
self.timestamp = seconds
class MsgAvailableFilesCount(Msg):
.. attribute:: count
The number of drawings available
interaction = Interactions.AVAILABLE_FILES_COUNT
opcode = 0xc1
protocol = ProtocolVersion.ANY
def _handle_reply(self, reply):
if reply.opcode != 0xc2:
raise UnexpectedReply(self)
self.count = int.from_bytes(reply[0:2], byteorder='big')
class MsgAvailableFilesCountSlate(Msg):
.. attribute:: count
The number of drawings available
interaction = Interactions.AVAILABLE_FILES_COUNT
opcode = 0xc1
protocol = ProtocolVersion.SLATE
def _handle_reply(self, reply):
if reply.opcode != 0xc2:
raise UnexpectedReply(self)
self.count = little_u16(reply[0:2])
class MsgDownloadOldestFile(Msg):
interaction = Interactions.DOWNLOAD_OLDEST_FILE
opcode = 0xc3
protocol = ProtocolVersion.ANY
def _handle_reply(self, reply):
if reply.opcode != 0xc8:
raise UnexpectedReply(self)
if reply[0] != 0xbe:
raise UnexpectedDataError(reply)
class MsgWaitForEndRead(Msg):
.. attribute:: crc
The checksum provided for the (out of band) pen data.
interaction = Interactions.WAIT_FOR_END_READ
requires_request = False
opcode = 0x00 # unused
protocol = ProtocolVersion.ANY
def __init__(self, *args, **kwargs):
super().__init__(timeout=5, *args, **kwargs)
def _handle_reply(self, reply):
if reply.opcode == 0xc8:
if reply[0] != 0xed:
raise UnexpectedDataError(reply, 'Expected c8 ed')
pass # nothing to do here
elif reply.opcode == 0xc9:
self.crc = int(binascii.hexlify(bytes(reply)), 16)
raise UnexpectedReply(reply)
def execute(self):
# This is a double-reply , once c8, then c9
return self
class MsgWaitForEndReadSlate(Msg):
.. attribute:: crc
The checksum provided for the (out of band) pen data.
interaction = Interactions.WAIT_FOR_END_READ
requires_request = False
opcode = 0x00 # unused
protocol = ProtocolVersion.SLATE
def __init__(self, *args, **kwargs):
super().__init__(timeout=5, *args, **kwargs)
def _handle_reply(self, reply):
if reply.opcode == 0xc8:
if reply[0] != 0xed:
raise UnexpectedDataError(reply, 'Expected c8 ed')
crc = reply[1:]
self.crc = int(binascii.hexlify(bytes(crc)), 16)
raise UnexpectedReply(reply)
class MsgDeleteOldestFile(Msg):
interaction = Interactions.DELETE_OLDEST_FILE
opcode = 0xca
protocol = ProtocolVersion.ANY
requires_reply = False
class MsgDeleteOldestFileSlate(Msg):
interaction = Interactions.DELETE_OLDEST_FILE
opcode = 0xca
protocol = ProtocolVersion.SLATE
# uses the default 0xb3 handler
class MsgRegisterComplete(Msg):
interaction = Interactions.REGISTER_COMPLETE
opcode = 0xe5
protocol = ProtocolVersion.ANY
# uses the default 0xb3 handler
class MsgRegisterCompleteSlate(Msg):
'''A noop Msg. This message only exists for the Spark'''
interaction = Interactions.REGISTER_COMPLETE
opcode = Msg.OPCODE_NOOP
protocol = ProtocolVersion.SLATE
class MsgRegisterPressButtonSpark(Msg):
interaction = Interactions.REGISTER_PRESS_BUTTON
opcode = 0xe3
protocol = ProtocolVersion.ANY
# Does not require a reply, the reply is sent in response to the
# physical button press.
requires_reply = False
# uuid is unused, just there so it's compatible with the slate message
def __init__(self, uuid=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.args = [0x01]
self.uuid = uuid
class MsgRegisterPressButtonSlateOrIntuosPro(Msg):
interaction = Interactions.REGISTER_PRESS_BUTTON
opcode = 0xe7
protocol = ProtocolVersion.SLATE
# Does not require a reply, the reply is sent in response to the
# physical button press.
requires_reply = False
def __init__(self, uuid, *args, **kwargs):
super().__init__(*args, **kwargs)
self.uuid = uuid
self.args = [int(i) for i in binascii.unhexlify(uuid)]
class MsgRegisterWaitForButtonSpark(Msg):
.. attribute:: protocol_version
The protocol version used by this device, according to this message.
interaction = Interactions.REGISTER_WAIT_FOR_BUTTON
requires_request = False
opcode = 0x00 # unused
protocol = ProtocolVersion.ANY
def __init__(self, *args, **kwargs):
kwargs['timeout'] = 10
super().__init__(*args, **kwargs)
self.protocol_version = ProtocolVersion.ANY
def _handle_reply(self, reply):
if reply.opcode != 0xe4:
raise UnexpectedReply(reply)
self.protocol_version = ProtocolVersion.SPARK
class MsgRegisterWaitForButtonSlateOrIntuosPro(Msg):
.. attribute:: protocol_version
The protocol version used by this device, according to this message.
interaction = Interactions.REGISTER_WAIT_FOR_BUTTON
requires_request = False
opcode = 0x00 # unused
protocol = ProtocolVersion.SLATE
def __init__(self, *args, **kwargs):
kwargs['timeout'] = 10
super().__init__(*args, **kwargs)
self.protocol_version = ProtocolVersion.ANY
def _handle_reply(self, reply):
if reply.opcode == 0xe4:
self.protocol_version = ProtocolVersion.SLATE
elif reply.opcode == 0x53:
self.protocol_version = ProtocolVersion.INTUOS_PRO
raise UnexpectedReply(reply)
class StrokeParsingError(ProtocolError):
def __init__(self, message, data=[]):
self.message = message
self.data = data
def __repr__(self):
if self.data:
datastr = f' data: {list2hex(self.data)}'
datastr = ''
return f'{self.message}{datastr}'
def __str__(self):
return self.__repr__()
class StrokeDataType(enum.Enum):
UNKNOWN = enum.auto()
FILE_HEADER = enum.auto()
STROKE_HEADER = enum.auto()
STROKE_END = enum.auto()
POINT = enum.auto()
DELTA = enum.auto()
EOF = enum.auto()
LOST_POINT = enum.auto()
def identify(cls, data):
Returns the identified packet type for the next packet.
header = data[0]
nbytes = bin(header).count('1')
payload = data[1:1 + nbytes]
# Note: the order of the checks below matters
# Known file format headers. This is just a version number, I think.
if data[0:4] == [0x67, 0x82, 0x69, 0x65] or \
data[0:4] == [0x62, 0x38, 0x62, 0x74]:
return StrokeDataType.FILE_HEADER
# End of stroke, but can sometimes mean end of file too
if data[0:7] == [0xfc, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff]:
return StrokeDataType.STROKE_END
if payload == [0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff]:
return StrokeDataType.EOF
# all special headers have the lowest two bits set
if header & 0x3 == 0:
return StrokeDataType.DELTA
if not payload:
return StrokeDataType.UNKNOWN
if payload[0] == 0xfa or payload[0:3] == [0xff, 0xee, 0xee]:
return StrokeDataType.STROKE_HEADER
if payload[0:2] == [0xff, 0xff]:
return StrokeDataType.POINT
if payload[0:2] == [0xdd, 0xdd]:
return StrokeDataType.LOST_POINT
return StrokeDataType.UNKNOWN
class StrokeFile(object):
Represents a single file as coming from the device. Note that pen data
received from the device may include more than one file, this object is
merely the first represented in this file.
.. attribute:: bytesize
The length in bytes of the data consumed.
.. attribute:: timestamp
Creation time of the drawing (when the button was pressed) or None where
this is not supported by the device.
.. attribute:: strokes
A list of strokes, each a list of Point(x, y, p) namedtuples.
Coordinates for the points are in absolute device units.
def __init__(self, data):
self.data = data
self.file_header = StrokeFileHeader(data[:16])
self.bytesize = self.file_header.size
offset = self.file_header.size
self.timestamp = self.file_header.timestamp
self.bytesize += self._parse_data(data[offset:])
def _parse_data(self, data):
# the data formats we return
Stroke = namedtuple('Stroke', ['points'])
Point = namedtuple('Point', ['x', 'y', 'p'])
# The Spark can have a delta on the first point in a file. Let's
# default to 0, 0, 0 because I don't know what else could be
# sensible here.
last_point = Point(0, 0, 0) # abs coords for most recent point
last_delta = Point(0, 0, 0) # delta accumulates
strokes = [] # all strokes
points = [] # Points of current strokes
consumed = 0
# Note about the below: this was largely reverse-engineered because
# the specs we have access to are either ambiguous or outright wrong.
# First byte is a bitmask that seems to indicate how many bytes.
# Where the header byte has the lowest two bits set, it can be
# one of several packages:
# - a StrokeHeader [0xfa] to indicate a new stroke
# - end of stroke - all payload bytes are 0xff
# - lost point [0xdd, 0xdd] - firmware couldn't record a point
# - a StrokePoint [0xff, 0xff] a fully specified point. Always
# the first after a StrokeHeader but may also appear elsewhere.
# Where the header byte has the lowest two bits on zero, it is
# a StrokeDelta, a variable sized payload following the header,
# values depend on the bits set in the header.
# In theory all the header packages should have a header of 0xff,
# but they don't. End of stroke may have 0xfc, a StrokePoint
# sometimes has 0xbf. It is unknown why.
# The StrokePoint is strange since if can sometimes contain deltas
# (bitmask 0xbf). So it's just a delta with an extra two bytes for
# headers, so what is the point of it? Presumably a firmware bug or
# something.
while data:
packet_type = StrokeDataType.identify(data)
logger.debug(f'Next data packet {packet_type.name}: {list2hex(data[:16])}')
packet = None
if packet_type == StrokeDataType.UNKNOWN:
packet = StrokePacketUnknown(data)
elif packet_type == StrokeDataType.FILE_HEADER:
# This code shouldn't be triggered, we handle the file
# header outside this function.
packet = StrokeFileHeader(data)
logger.error(f'Unexpected file header at byte {consumed}: {packet}')
elif packet_type == StrokeDataType.STROKE_END:
packet = StrokeEndOfStroke(data)
if points:
points = []
elif packet_type == StrokeDataType.EOF:
# EOF means pack
packet = StrokeEOF(data)
if points:
points = []
data = data[packet.size:]
consumed += packet.size
elif packet_type == StrokeDataType.STROKE_HEADER:
# New stroke means resetting delta and storing the last
# stroke
packet = StrokeHeader(data)
last_delta = Point(0, 0, 0)
if points:
points = []
elif packet_type == StrokeDataType.LOST_POINT:
# We don't yet handle lost points
packet = StrokeLostPoint(data)
elif (packet_type == StrokeDataType.POINT or
packet_type == StrokeDataType.DELTA):
# POINT and DELTA *should* be two different packages but
# sometimes a POINT includes a delta for a coordinate. So
# all a POINT is is a delta with an added [0xff 0xff] after
# the header byte. The StrokePoint packet hides this so we
# can process both the same way.
if packet_type == StrokeDataType.POINT:
packet = StrokePoint(data)
packet = StrokeDelta(data)
# Compression algorithm in the device basically keeps a
# cumulative delta so that
# P0 = absolute x, y, z
# P1 = P0 + d1
# P2 = P0 + 2*d1 + d2
# P3 = P0 + 3*d1 + 2*d2 + d3
# And we use that here by just keeping the last delta
# around, adding to it where necessary and then adding it to
# the last point we have.
# Whenever we get an absolute coordinate, the delta resets
# to 0. Since this is per axis, our fictional P4 may be:
# P4(x) = P0 + 4*d1 + 3*d2 + 2*d3 + d4
# P4(y) = P0 + 4*d1 + 2*d3 ... d2 and d4 are missing (zero)
# P4(p) = P4(p) .... absolute
dx, dy, dp = last_delta
x, y, p = last_point
if packet.dx is not None:
dx += packet.dx
elif packet.x is not None:
x = packet.x
dx = 0
if packet.dy is not None:
dy += packet.dy
elif packet.y is not None:
y = packet.y
dy = 0
if packet.dp is not None:
dp += packet.dp
elif packet.p is not None:
p = packet.p
dp = 0
# dx,dy,dp ... are cumulative deltas for this packet
# x,y,p ... most recent known abs coordinates
# add those two together and we have the real coordinates
# and the baseline for the next point
last_delta = Point(dx, dy, dp)
current_point = Point(x, y, p)
last_point = Point(current_point.x + last_delta.x,
current_point.y + last_delta.y,
current_point.p + last_delta.p)
logger.debug(f'Calculated point: {last_point}')
# should never get here
raise StrokeParsingError(f'Failed to parse', data[:16])
logger.debug(f'Offset {consumed}: {packet}')
consumed += packet.size
data = data[packet.size:]
self.strokes = strokes
return consumed
class StrokePacket(object):
.. attribute: size
Size of the packet in bytes
def __init__(self):
self.size = 0
class StrokePacketUnknown(StrokePacket):
def __init__(self, data):
header = data[0]
nbytes = bin(header).count('1')
self.size = 1 + nbytes
self.data = data[:self.size]
def __repr__(self):
return f'Unknown packet: {list2hex(self.data)}'
class StrokeFileHeader(StrokePacket):
Each data packet has a file header consisting of 4 bytes file version
number and optionally extra data.
.. attribute: timestamp
The timestamp of this drawing or ``None`` where not available.
.. attribute: nstrokes
The count of strokes within this drawing or ``None`` where not
available. This count is inaccurate anyway, so it should only be
used for basic internal checks.
def __init__(self, data):
key = little_u32(data[:4])
file_formats = {
little_u32([0x67, 0x82, 0x69, 0x65]): self._parse_intuos_pro,
little_u32([0x62, 0x38, 0x62, 0x74]): self._parse_spark,
self.timestamp = None
self.nstrokes = None
func = file_formats[key]
except KeyError:
raise StrokeParsingError(f'Unknown file format:', data[:4])
def __repr__(self):
t = time.strftime("%y%m%d%H%M%S", time.gmtime(self.timestamp))
return f'FileHeader: time: {t}, stroke count: {self.nstrokes}'
def _parse_intuos_pro(self, data):
self.timestamp = int.from_bytes(data[4:8], byteorder='little')
# plus two bytes for ms, always zero
self.nstrokes = int.from_bytes(data[10:14], byteorder='little')
# plus two bytes always zero
self.size = 16
def _parse_spark(self, data):
self.size = 4
class StrokeHeader(StrokePacket):
.. attribute:: pen_id
The pen serial number or 0 if none is set
.. attribute:: pen_type
The pen type
.. attribute:: timestamp
The timestamp of this stroke or None if none was recorded
.. attribute:: time_offset
The time offset in ms since powerup or None if this stroke has an
absolute timestamp.
.. attribute:: is_new_layer
True if this stroke is on a new layer
def __init__(self, data):
header = data[0]
payload = data[1:]
self.size = bin(header).count('1') + 1
if payload[0] == 0xfa:
self._parse_intuos_pro(data, header, payload)
elif payload[0:3] == [0xff, 0xee, 0xee]:
self._parse_slate(data, header, payload)
raise StrokeParsingError(f'Invalid StrokeHeader, expected ff fa or ff ee.', data[:8])
def _parse_slate(self, data, header, payload):
self.pen_id = 0
self.pen_type = 0
self.is_new_layer = False
self.timestamp = None
self.time_offset = little_u16(payload[4:6]) * 5 # in 5ms resolution
# On the first stroke after the file header, this packet is 6 bytes
# only. Other strokes have 8 bytes but the last two bytes are always
# zero.
def _parse_intuos_pro(self, data, header, payload):
flags = payload[1]
needs_pen_id = flags & 0x80
self.pen_type = flags & 0x3f
self.is_new_layer = (flags & 0x40) != 0
self.pen_id = 0
self.timestamp = int.from_bytes(payload[2:6], byteorder='little')
self.time_offset = None
# FIXME: plus two bytes for milis
self.size = bin(header).count('1') + 1
# if the pen id flag is set, the pen ID comes in the next 8-byte
# packet (plus 0xff header)
if needs_pen_id:
pen_packet = data[self.size + 1:]
if not pen_packet:
raise StrokeParsingError('Missing pen ID packet')
header = data[0]
if header != 0xff:
raise StrokeParsingError(f'Unexpected pen id packet header: {header}.', data[:9])
nbytes = bin(header).count('1')
self.pen_id = little_u64(pen_packet[:8])
self.size += 1 + nbytes
def __repr__(self):
if self.timestamp is not None:
t = time.strftime(f'%y%m%d%H%M%S', time.gmtime(self.timestamp))
t = time.strftime(f'boot+{self.time_offset/1000}s')
return f'StrokeHeader: time: {t} new layer: {self.is_new_layer}, pen type: {self.pen_type}, pen id: {self.pen_id:#x}'
class StrokeDelta(object):
.. attribute:: x
The absolute x coordinate or None if this is packet contains a delta
.. attribute:: y
The absolute y coordinate or None if this is packet contains a delta
.. attribute:: p
The absolute pressure coordinate or None if this is packet contains a delta
.. attribute:: dx
The x delta or None if this is packet contains an absolute
.. attribute:: dy
The y delta or None if this is packet contains an absolute
.. attribute:: dp
The pressure delta or None if this is packet contains an absolute
def __init__(self, data):
def extract(mask, databytes):
value = None
delta = None
size = 0
if mask == 0:
# No data for this coordinate
elif mask == 1:
# Supposedly not implemented by any device.
# If this would exist, it would throw off the byte count
# anyway, so this cannot ever exist without breaking
# everything.
raise NotImplementedError('This device is not supposed to be exist')
elif mask == 2:
# 8 bit delta
delta = int.from_bytes(bytes([databytes[0]]), byteorder='little', signed=True)
if delta == 0:
raise StrokeParsingError(f'StrokeDelta: invalid delta of zero', data)
assert delta != 0
size = 1
elif mask == 3:
# full abs coordinate
value = little_u16(databytes[:2])
size = 2
return value, delta, size
if (data[0] & 0b11) != 0:
raise NotImplementedError(f'LSB two bits set in mask - this is not supposed to happen')
xmask = (data[0] & 0b00001100) >> 2
ymask = (data[0] & 0b00110000) >> 4
pmask = (data[0] & 0b11000000) >> 6
offset = 1
x, dx, size = extract(xmask, data[offset:])
offset += size
y, dy, size = extract(ymask, data[offset:])
offset += size
p, dp, size = extract(pmask, data[offset:])
offset += size
# Note: any of these will be None depending on the packet
self.dx = dx
self.dy = dy
self.dp = dp
self.x = x
self.y = y
self.p = p
self.size = offset
def __repr__(self):
def printstring(delta, abs):
return f'{delta:+5d}' if delta is not None \
else f'{abs:5d}' if abs is not None \
else ' ' # noqa
strx = printstring(self.dx, self.x)
stry = printstring(self.dy, self.y)
strp = printstring(self.dp, self.p)
return f'StrokeDelta: {strx}/{stry} pressure: {strp}'
class StrokePoint(StrokeDelta):
A full point identified by three coordinates (x, y, pressure) in
absolute coordinates.
def __init__(self, data):
header = data[0]
payload = data[1:]
if payload[:2] != [0xff, 0xff]:
raise StrokeParsingError(f'Invalid StrokePoint, expected ff ff ff', data[:9])
# This is a wrapper around StrokeDelta which does the mask parsing.
# In theory the StrokePoint would be a separate packet but it
# occasionally uses a header other than 0xff. Which means the packet
# is completely useless and shouldn't exist because now it's just a
# StrokeDelta in the form of [header, 0xff, 0xff, payload] and the
# 0xff just keep the room warm.
# StrokeDelta assumes the bottom two bits are unset
header &= ~0x3
super().__init__([header] + payload[2:])
self.size += 2
# self.x = little_u16(data[2:4])
# self.y = little_u16(data[4:6])
# self.pressure = little_u16(data[6:8])
def __repr__(self):
return f'StrokePoint: {self.x}/{self.y} pressure: {self.p}'
class StrokeEOF(StrokePacket):
def __init__(self, data):
header = data[0]
payload = data[1:]
nbytes = bin(header).count('1')
if payload[:nbytes] != [0xff] * nbytes:
raise StrokeParsingError(f'Invalid EOF, expected 0xff only', data[:9])
self.size = nbytes + 1
class StrokeEndOfStroke(StrokePacket):
def __init__(self, data):
header = data[0]
payload = data[1:]
nbytes = bin(header).count('1')
if payload[:nbytes] != [0xff] * nbytes:
raise StrokeParsingError(f'Invalid EndOfStroke, expected 0xff only', data[:9])
self.size = nbytes + 1
self.data = data[:self.size]
def __repr__(self):
return f'EndOfStroke: {list2hex(self.data)}'
class StrokeLostPoint(StrokePacket):
Marker for lost points that the firmware couldn't record coordinates
.. attribute:: nlost
The number of points not recorded.
def __init__(self, data):
header = data[0]
payload = data[1:]
if payload[:2] != [0xdd, 0xdd]:
raise StrokeParsingError(f'Invalid StrokeLostPoint, expected ff dd dd', data[:9])
self.nlost = little_u16(payload[2:4])
self.size = bin(header).count('1') + 1