wacom: add a WacomPacket class
Instad of carrying around a lot of parameters, move this into a class with properties. Eventually we can subclass that one for the special opcodes (or something...)
This commit is contained in:
parent
23e18cea67
commit
43064b58fc
121
tuhi/wacom.py
121
tuhi/wacom.py
|
@ -148,6 +148,78 @@ class WacomCorruptDataException(WacomException):
|
|||
errno = errno.EPROTO
|
||||
|
||||
|
||||
class WacomPacket(GObject.Object):
|
||||
'''
|
||||
A single protocol packet of variable length. The protocol format is a
|
||||
single-byte bitmask followed by up to 8 bytes (depending on the number
|
||||
of 1-bits in the bitmask). Each byte represents the matching bit in the
|
||||
bitmask, i.e. the data is non-sparse.
|
||||
|
||||
If the bitmask has 0x1 and/or 0x2 set, those two bytes make up the
|
||||
opcode of the command. So the possible layouts are:
|
||||
|
||||
| bitmask | opcode1 | opcode2 | payload ...
|
||||
| bitmask | opcode1 | payload ...
|
||||
| bitmask | opcode2 | payload ...
|
||||
| bitmask | payload
|
||||
|
||||
On most normal packets containing motion data, the opcode is not
|
||||
present.
|
||||
|
||||
Attributes:
|
||||
bitmask .. single byte with a bitmask denoting the contents
|
||||
opcode ... the 16-bit opcode or None for 'special' packets. Note that
|
||||
the opcode is converted into an integer from the
|
||||
little-endian protocol format
|
||||
bytes .... a list of the payload bytes as sent by the device. This is
|
||||
a non-sparse list matching the number of set bits in the
|
||||
bitmask. it does not include the bitmask.
|
||||
args ..... a sparse list of the payload bytes, expanded to match the
|
||||
bitmask so that args[x] is the value for each bit x in
|
||||
bitmask. it does not include the bitmask.
|
||||
length ... length of the packet in bytes, including bitmask
|
||||
'''
|
||||
def __init__(self, data):
|
||||
self.bitmask = data[0]
|
||||
nbytes = bin(self.bitmask).count('1')
|
||||
self.bytes = data[1:1 + nbytes]
|
||||
self.length = nbytes + 1 # for the bitmask
|
||||
|
||||
idx = 0
|
||||
# 2-byte opcode, but only if the bitmask is set for either byte
|
||||
opcode = 0
|
||||
if self.bitmask & 0x1:
|
||||
opcode |= self.bytes[idx]
|
||||
idx += 1
|
||||
if self.bitmask & 0x2:
|
||||
opcode |= self.bytes[idx] << 8
|
||||
idx += 1
|
||||
|
||||
self.opcode = opcode if opcode else None
|
||||
|
||||
self.args = []
|
||||
vals = self.bytes.copy()
|
||||
mask = self.bitmask
|
||||
while mask != 0:
|
||||
self.args.append(vals.pop(0) if mask & 0x1 else 0x00)
|
||||
mask >>= 1
|
||||
|
||||
def __repr__(self):
|
||||
debug_data = []
|
||||
debug_data.append(f'{self.bitmask:02x} ({self.bitmask:08b}) |')
|
||||
if self.opcode:
|
||||
debug_data.append(f'{self.opcode:04x} |')
|
||||
else:
|
||||
debug_data.append(f' |')
|
||||
|
||||
for i in range(2, 8): # start at 2 to skip the opcode
|
||||
if self.bitmask & (1 << i):
|
||||
debug_data.append(f'{self.args[i]:02x}')
|
||||
else:
|
||||
debug_data.append(' ')
|
||||
return " ".join(debug_data)
|
||||
|
||||
|
||||
class WacomProtocolLowLevelComm(GObject.Object):
|
||||
'''
|
||||
Internal class to handle the communication with the Wacom device.
|
||||
|
@ -544,37 +616,6 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
|
|||
self.send_nordic_command_sync(command=0xca,
|
||||
expected_opcode=None)
|
||||
|
||||
def next_pen_data(self, data, offset):
|
||||
debug_data = []
|
||||
bitmask = data[offset]
|
||||
opcode = 0
|
||||
offset += 1
|
||||
debug_data.append(f'{bitmask:02x} ({bitmask:08b}) |')
|
||||
args_length = bin(bitmask).count('1')
|
||||
args = data[offset:offset + args_length]
|
||||
formatted_args = []
|
||||
n = 0
|
||||
for i in range(2):
|
||||
if (1 << i) & bitmask:
|
||||
debug_data.append(f'{args[n]:02x}')
|
||||
opcode |= args[n] << (i * 8)
|
||||
formatted_args.append(args[n])
|
||||
n += 1
|
||||
else:
|
||||
formatted_args.append(0)
|
||||
debug_data.append(' ')
|
||||
debug_data.append(f'|')
|
||||
for i in range(2, 8):
|
||||
if (1 << i) & bitmask:
|
||||
debug_data.append(f'{args[n]:02x}')
|
||||
formatted_args.append(args[n])
|
||||
n += 1
|
||||
else:
|
||||
formatted_args.append(0)
|
||||
debug_data.append(' ')
|
||||
logger.debug(f'{" ".join(debug_data)}')
|
||||
return bitmask, opcode, args, formatted_args, offset + args_length
|
||||
|
||||
def get_coordinate(self, bitmask, n, data, v, dv):
|
||||
# drop the first 2 bytes as they are not valuable here
|
||||
bitmask >>= 2
|
||||
|
@ -626,15 +667,17 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
|
|||
have_abs = 0x00 # bitmask 3-bits: pyx
|
||||
|
||||
while offset < len(data):
|
||||
bitmask, opcode, raw_args, args, offset = self.next_pen_data(data, offset)
|
||||
packet = WacomPacket(data[offset:])
|
||||
logger.debug(f'packet: {packet}')
|
||||
offset += packet.length
|
||||
|
||||
if self.parse_next_stroke_prefix(opcode, raw_args):
|
||||
if self.parse_next_stroke_prefix(packet.opcode, packet.bytes):
|
||||
stroke = drawing.new_stroke()
|
||||
continue
|
||||
if bytes(args) == b'\xff\xff\xff\xff\xff\xff\xff\xff':
|
||||
if bytes(packet.args) == b'\xff\xff\xff\xff\xff\xff\xff\xff':
|
||||
logger.info(f'end of sequence')
|
||||
continue
|
||||
if bytes(args) == b'\x00\x00\xff\xff\xff\xff\xff\xff':
|
||||
if bytes(packet.args) == b'\x00\x00\xff\xff\xff\xff\xff\xff':
|
||||
logger.info(f'end of stroke')
|
||||
stroke.seal()
|
||||
continue
|
||||
|
@ -643,9 +686,9 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
|
|||
if stroke is None:
|
||||
stroke = drawing.new_stroke()
|
||||
|
||||
x, dx, xrel = self.get_coordinate(bitmask, 0, args, x, dx)
|
||||
y, dy, yrel = self.get_coordinate(bitmask, 1, args, y, dy)
|
||||
p, dp, prel = self.get_coordinate(bitmask, 2, args, p, dp)
|
||||
x, dx, xrel = self.get_coordinate(packet.bitmask, 0, packet.args, x, dx)
|
||||
y, dy, yrel = self.get_coordinate(packet.bitmask, 1, packet.args, y, dy)
|
||||
p, dp, prel = self.get_coordinate(packet.bitmask, 2, packet.args, p, dp)
|
||||
|
||||
x += dx
|
||||
y += dy
|
||||
|
@ -656,7 +699,7 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
|
|||
pr = '*' if prel else ''
|
||||
logger.info(f'point at {x},{y} ({dx:+}{xr}, {dy:+}{yr}) with pressure {p} ({dp:+}{pr})')
|
||||
|
||||
if bitmask & 0b00111100 == 0:
|
||||
if packet.bitmask & 0b00111100 == 0:
|
||||
continue
|
||||
|
||||
if not xrel:
|
||||
|
|
Loading…
Reference in New Issue