wacom: switch to the new protocol stroke parsers

Signed-off-by: Peter Hutterer <peter.hutterer@who-t.net>
pull/180/head
Peter Hutterer 2019-08-20 12:18:45 +10:00 committed by Benjamin Tissoires
parent a24c54a570
commit fa888b742f
1 changed files with 10 additions and 150 deletions

View File

@ -14,7 +14,6 @@
import binascii
import enum
import inspect
import logging
import threading
import time
@ -25,7 +24,7 @@ from gi.repository import GObject
from .drawing import Drawing
from .uhid import UHIDDevice
import tuhi.protocol
from tuhi.protocol import NordicData, Interactions, Mode, ProtocolVersion
from tuhi.protocol import NordicData, Interactions, Mode, ProtocolVersion, StrokeFile
from .util import list2hex, flatten
logger = logging.getLogger('tuhi.wacom')
@ -348,80 +347,6 @@ class WacomPacket(GObject.Object):
return " ".join(debug_data)
class WacomPacketHandler(GObject.Object):
def process(self, packet, drawing):
raise NotImplementedError('This method must be implemented in the subclass')
class WacomPacketHandlerEndOfSequence(WacomPacketHandler):
def process(self, packet, drawing):
if bytes(packet.args) != b'\xff\xff\xff\xff\xff\xff\xff\xff':
return False
logger.info(f'end of sequence')
return True
class WacomPacketHandlerEndOfStroke(WacomPacketHandler):
def process(self, packet, drawing):
if bytes(packet.args) != b'\x00\x00\xff\xff\xff\xff\xff\xff':
return False
logger.info(f'end of stroke')
drawing.current_stroke.seal()
return True
class WacomPacketHandlerStrokePrefixSlate(WacomPacketHandler):
def process(self, packet, drawing):
if packet.opcode != 0xeeff:
return False
# some sort of headers
time_offset = int.from_bytes(packet.bytes[4:], byteorder='little')
logger.info(f'time offset since boot: {time_offset * 0.005} secs')
drawing.new_stroke()
return True
class WacomPacketHandlerStrokePrefixIntuosPro(WacomPacketHandler):
def process(self, packet, drawing):
if packet.opcode != 0x03fa:
return False
t = WacomProtocolIntuosPro.time_from_bytes(packet.bytes[2:])
t = time.strftime("%y%m%d%H%M%S", t)
logger.info(f'stroke time: {t}')
drawing.new_stroke()
return True
class WacomPacketHandlerStrokeTimestampIntuosPro(WacomPacketHandler):
def process(self, packet, drawing):
if packet.opcode != 0xc3fa:
return False
# First stroke timestamp, note this is less than the drawing's
# timestamp
# ff fa c3 <6-byte-timestamp>
t = WacomProtocolIntuosPro.time_from_bytes(packet.bytes[2:])
t = time.strftime("%y%m%d%H%M%S", t)
logger.info(f'stroke time: {t}')
return True
class WacomPacketHandlerUnknownFixedStrokeDataIntuosPro(WacomPacketHandler):
def process(self, packet, drawing):
if packet.opcode != 0x870a:
return False
# Unclear what this header is
expected_bytes = b'\xff\x0a\x87\x75\x80\x28\x42\x00\x10'
if bytes(packet.bytes) != expected_bytes:
logger.debug(f'Missing header, got {packet.bytes}')
return True
class WacomProtocolLowLevelComm(GObject.Object):
'''
Internal class to handle the communication with the Wacom device.
@ -589,26 +514,6 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
device.connect_gatt_value(WACOM_OFFLINE_CHRC_PEN_DATA_UUID,
self._on_pen_data_received)
# Instantiate all packet_handlers from our current object and its
# parent classes
self.packet_handlers = []
parents = inspect.getmro(self.__class__)
child = None
for cls in parents:
if cls.__name__ == 'WacomProtocolBase':
break
if (child is not None and
child.packet_handlers and
child.packet_handlers == cls.packet_handlers):
raise NotImplementedError(f'Subclass {child} must override packet_handlers')
for handler in cls.packet_handlers:
h = handler()
self.packet_handlers.append(h)
child = cls
@GObject.property
def dimensions(self):
return (self.width, self.height)
@ -820,59 +725,20 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
'''
:param timestamp: seconds since UNIX epoch
'''
x, y, p = 0, 0, 0
dx, dy, dp = 0, 0, 0
stroke = None
success, offset = self.parse_pen_data_prefix(data)
if not success:
return None
f = StrokeFile(data)
drawing = Drawing(self.device.name, (self.width, self.height), timestamp)
ps = self.point_size
while offset < len(data):
packet = WacomPacket(data[offset:])
logger.debug(f'packet: {packet}')
offset += packet.length
has_handler = False
for handler in self.packet_handlers:
if handler.process(packet, drawing):
has_handler = True
break
if has_handler:
continue
stroke = drawing.current_stroke
if stroke is None:
stroke = drawing.new_stroke()
# data is in device units
x, dx, xrel = self.get_coordinate(packet.bitmask, 0, packet.args, x, dx)
y, dy, yrel = self.get_coordinate(packet.bitmask, 1, packet.args, y, dy)
p, dp, prel = self.get_coordinate(packet.bitmask, 2, packet.args, p, dp)
x += dx
y += dy
p += dp
xr = '*' if xrel else ''
yr = '*' if yrel else ''
pr = '*' if prel else ''
logger.info(f'point at {x},{y} ({dx:+}{xr}, {dy:+}{yr}) with pressure {p} ({dp:+}{pr})')
if packet.bitmask & 0b00111100 == 0:
continue
def normalize(p):
NORMALIZED_RANGE = 0x10000
return NORMALIZED_RANGE * p / self.pressure
# now convert to actual µm
stroke.new_abs((x * ps, y * ps), normalize(p))
def normalize(p):
NORMALIZED_RANGE = 0x10000
return NORMALIZED_RANGE * p / self.pressure
for s in f.strokes:
stroke = drawing.new_stroke()
for p in s.points:
stroke.new_abs((p.x * ps, p.y * ps), normalize(p.p))
stroke.seal()
drawing.seal()
return drawing
@ -933,8 +799,6 @@ class WacomProtocolSpark(WacomProtocolBase):
pressure = 1023
protocol = ProtocolVersion.SPARK
packet_handlers = [WacomPacketHandlerEndOfStroke,
WacomPacketHandlerEndOfSequence]
orientation = 'portrait'
@ -961,7 +825,6 @@ class WacomProtocolSlate(WacomProtocolSpark):
pressure = 2047
protocol = ProtocolVersion.SLATE
packet_handlers = [WacomPacketHandlerStrokePrefixSlate]
orientation = 'portrait'
@ -1044,9 +907,6 @@ class WacomProtocolIntuosPro(WacomProtocolSlate):
pressure = 8192
protocol = ProtocolVersion.INTUOS_PRO
packet_handlers = [WacomPacketHandlerStrokePrefixIntuosPro,
WacomPacketHandlerStrokeTimestampIntuosPro,
WacomPacketHandlerUnknownFixedStrokeDataIntuosPro]
orientation = 'landscape'