diff --git a/tuhi/wacom.py b/tuhi/wacom.py index cbd23e5..14e8594 100644 --- a/tuhi/wacom.py +++ b/tuhi/wacom.py @@ -14,7 +14,6 @@ import binascii import enum -import inspect import logging import threading import time @@ -25,7 +24,7 @@ from gi.repository import GObject from .drawing import Drawing from .uhid import UHIDDevice import tuhi.protocol -from tuhi.protocol import NordicData, Interactions, Mode, ProtocolVersion +from tuhi.protocol import NordicData, Interactions, Mode, ProtocolVersion, StrokeFile from .util import list2hex, flatten logger = logging.getLogger('tuhi.wacom') @@ -348,80 +347,6 @@ class WacomPacket(GObject.Object): return " ".join(debug_data) -class WacomPacketHandler(GObject.Object): - def process(self, packet, drawing): - raise NotImplementedError('This method must be implemented in the subclass') - - -class WacomPacketHandlerEndOfSequence(WacomPacketHandler): - def process(self, packet, drawing): - if bytes(packet.args) != b'\xff\xff\xff\xff\xff\xff\xff\xff': - return False - - logger.info(f'end of sequence') - return True - - -class WacomPacketHandlerEndOfStroke(WacomPacketHandler): - def process(self, packet, drawing): - if bytes(packet.args) != b'\x00\x00\xff\xff\xff\xff\xff\xff': - return False - - logger.info(f'end of stroke') - drawing.current_stroke.seal() - return True - - -class WacomPacketHandlerStrokePrefixSlate(WacomPacketHandler): - def process(self, packet, drawing): - if packet.opcode != 0xeeff: - return False - - # some sort of headers - time_offset = int.from_bytes(packet.bytes[4:], byteorder='little') - logger.info(f'time offset since boot: {time_offset * 0.005} secs') - drawing.new_stroke() - return True - - -class WacomPacketHandlerStrokePrefixIntuosPro(WacomPacketHandler): - def process(self, packet, drawing): - if packet.opcode != 0x03fa: - return False - - t = WacomProtocolIntuosPro.time_from_bytes(packet.bytes[2:]) - t = time.strftime("%y%m%d%H%M%S", t) - logger.info(f'stroke time: {t}') - drawing.new_stroke() - return True - - -class WacomPacketHandlerStrokeTimestampIntuosPro(WacomPacketHandler): - def process(self, packet, drawing): - if packet.opcode != 0xc3fa: - return False - - # First stroke timestamp, note this is less than the drawing's - # timestamp - # ff fa c3 <6-byte-timestamp> - t = WacomProtocolIntuosPro.time_from_bytes(packet.bytes[2:]) - t = time.strftime("%y%m%d%H%M%S", t) - logger.info(f'stroke time: {t}') - return True - - -class WacomPacketHandlerUnknownFixedStrokeDataIntuosPro(WacomPacketHandler): - def process(self, packet, drawing): - if packet.opcode != 0x870a: - return False - - # Unclear what this header is - expected_bytes = b'\xff\x0a\x87\x75\x80\x28\x42\x00\x10' - if bytes(packet.bytes) != expected_bytes: - logger.debug(f'Missing header, got {packet.bytes}') - return True - - class WacomProtocolLowLevelComm(GObject.Object): ''' Internal class to handle the communication with the Wacom device. @@ -589,26 +514,6 @@ class WacomProtocolBase(WacomProtocolLowLevelComm): device.connect_gatt_value(WACOM_OFFLINE_CHRC_PEN_DATA_UUID, self._on_pen_data_received) - # Instantiate all packet_handlers from our current object and its - # parent classes - self.packet_handlers = [] - parents = inspect.getmro(self.__class__) - child = None - for cls in parents: - if cls.__name__ == 'WacomProtocolBase': - break - - if (child is not None and - child.packet_handlers and - child.packet_handlers == cls.packet_handlers): - raise NotImplementedError(f'Subclass {child} must override packet_handlers') - - for handler in cls.packet_handlers: - h = handler() - self.packet_handlers.append(h) - - child = cls - @GObject.property def dimensions(self): return (self.width, self.height) @@ -820,59 +725,20 @@ class WacomProtocolBase(WacomProtocolLowLevelComm): ''' :param timestamp: seconds since UNIX epoch ''' - x, y, p = 0, 0, 0 - dx, dy, dp = 0, 0, 0 - - stroke = None - - success, offset = self.parse_pen_data_prefix(data) - if not success: - return None + f = StrokeFile(data) drawing = Drawing(self.device.name, (self.width, self.height), timestamp) ps = self.point_size - while offset < len(data): - packet = WacomPacket(data[offset:]) - logger.debug(f'packet: {packet}') - offset += packet.length - - has_handler = False - for handler in self.packet_handlers: - if handler.process(packet, drawing): - has_handler = True - break - if has_handler: - continue - - stroke = drawing.current_stroke - if stroke is None: - stroke = drawing.new_stroke() - - # data is in device units - x, dx, xrel = self.get_coordinate(packet.bitmask, 0, packet.args, x, dx) - y, dy, yrel = self.get_coordinate(packet.bitmask, 1, packet.args, y, dy) - p, dp, prel = self.get_coordinate(packet.bitmask, 2, packet.args, p, dp) - - x += dx - y += dy - p += dp - - xr = '*' if xrel else '' - yr = '*' if yrel else '' - pr = '*' if prel else '' - logger.info(f'point at {x},{y} ({dx:+}{xr}, {dy:+}{yr}) with pressure {p} ({dp:+}{pr})') - - if packet.bitmask & 0b00111100 == 0: - continue - - def normalize(p): - NORMALIZED_RANGE = 0x10000 - return NORMALIZED_RANGE * p / self.pressure - - # now convert to actual µm - stroke.new_abs((x * ps, y * ps), normalize(p)) + def normalize(p): + NORMALIZED_RANGE = 0x10000 + return NORMALIZED_RANGE * p / self.pressure + for s in f.strokes: + stroke = drawing.new_stroke() + for p in s.points: + stroke.new_abs((p.x * ps, p.y * ps), normalize(p.p)) + stroke.seal() drawing.seal() return drawing @@ -933,8 +799,6 @@ class WacomProtocolSpark(WacomProtocolBase): pressure = 1023 protocol = ProtocolVersion.SPARK - packet_handlers = [WacomPacketHandlerEndOfStroke, - WacomPacketHandlerEndOfSequence] orientation = 'portrait' @@ -961,7 +825,6 @@ class WacomProtocolSlate(WacomProtocolSpark): pressure = 2047 protocol = ProtocolVersion.SLATE - packet_handlers = [WacomPacketHandlerStrokePrefixSlate] orientation = 'portrait' @@ -1044,9 +907,6 @@ class WacomProtocolIntuosPro(WacomProtocolSlate): pressure = 8192 protocol = ProtocolVersion.INTUOS_PRO - packet_handlers = [WacomPacketHandlerStrokePrefixIntuosPro, - WacomPacketHandlerStrokeTimestampIntuosPro, - WacomPacketHandlerUnknownFixedStrokeDataIntuosPro] orientation = 'landscape'