wacom: Split out the Nordic communication protocol in its own class
So the low level bits are not in the same class than the protocols
This commit is contained in:
parent
82e7dbf515
commit
f6e7c72d9d
150
tuhi/wacom.py
150
tuhi/wacom.py
|
@ -95,83 +95,23 @@ class WacomCorruptDataException(WacomException):
|
|||
errno = errno.EPROTO
|
||||
|
||||
|
||||
class WacomProtocol(GObject.Object):
|
||||
class WacomProtocolLowLevelComm(GObject.Object):
|
||||
'''
|
||||
Internal class to handle the communication with the Wacom device.
|
||||
No-one should directly instanciate this.
|
||||
|
||||
|
||||
:param device: the BlueZDevice object that is this wacom device
|
||||
:param uuid: the UUID {to be} assigned to the device
|
||||
'''
|
||||
protocol = Protocol.UNKNOWN
|
||||
|
||||
__gsignals__ = {
|
||||
'drawing':
|
||||
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)),
|
||||
'button-press-required':
|
||||
(GObject.SignalFlags.RUN_FIRST, None, ()),
|
||||
# battery level in %, boolean for is-charging
|
||||
"battery-status":
|
||||
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_INT, GObject.TYPE_BOOLEAN)),
|
||||
}
|
||||
|
||||
def __init__(self, device, uuid):
|
||||
def __init__(self, device):
|
||||
GObject.Object.__init__(self)
|
||||
self.device = device
|
||||
self.nordic_answer = None
|
||||
self.pen_data_buffer = []
|
||||
self.name = device.name
|
||||
self._uuid = uuid
|
||||
self.fw_logger = logging.getLogger('tuhi.fw')
|
||||
|
||||
device.connect_gatt_value(WACOM_CHRC_LIVE_PEN_DATA_UUID,
|
||||
self._on_pen_data_changed)
|
||||
device.connect_gatt_value(WACOM_OFFLINE_CHRC_PEN_DATA_UUID,
|
||||
self._on_pen_data_received)
|
||||
device.connect_gatt_value(NORDIC_UART_CHRC_RX_UUID,
|
||||
self._on_nordic_data_received)
|
||||
device.connect_gatt_value(MYSTERIOUS_NOTIFICATION_CHRC_UUID,
|
||||
self._on_mysterious_data_received)
|
||||
|
||||
@classmethod
|
||||
def is_spark(cls, device):
|
||||
return MYSTERIOUS_NOTIFICATION_CHRC_UUID not in device.characteristics
|
||||
|
||||
def _on_mysterious_data_received(self, name, value):
|
||||
self.fw_logger.debug(f'mysterious: {binascii.hexlify(bytes(value))}')
|
||||
|
||||
def _on_pen_data_changed(self, name, value):
|
||||
logger.debug(binascii.hexlify(bytes(value)))
|
||||
|
||||
if value[0] == 0x10:
|
||||
pressure = int.from_bytes(value[2:4], byteorder='little')
|
||||
buttons = int(value[10])
|
||||
logger.info(f'New Pen Data: pressure: {pressure}, button: {buttons}')
|
||||
elif value[0] == 0xa2:
|
||||
# entering proximity event
|
||||
length = value[1]
|
||||
pen_id = binascii.hexlify(bytes(value[2:]))
|
||||
logger.info(f'Pen {pen_id} entered proximity')
|
||||
elif value[0] == 0xa1:
|
||||
# data event
|
||||
length = value[1]
|
||||
if length % 6 != 0:
|
||||
logger.error(f'wrong data: {binascii.hexlify(bytes(value))}')
|
||||
return
|
||||
data = value[2:]
|
||||
while data:
|
||||
if bytes(data) == b'\xff\xff\xff\xff\xff\xff':
|
||||
logger.info(f'Pen left proximity')
|
||||
else:
|
||||
x = int.from_bytes(data[0:2], byteorder='little')
|
||||
y = int.from_bytes(data[2:4], byteorder='little')
|
||||
pressure = int.from_bytes(data[4:6], byteorder='little')
|
||||
self.logger.info(f'New Pen Data: ({x},{y}), pressure: {pressure}')
|
||||
data = data[6:]
|
||||
|
||||
def _on_pen_data_received(self, name, data):
|
||||
self.fw_logger.debug(f'RX Pen <-- {list2hex(data)}')
|
||||
self.pen_data_buffer.extend(data)
|
||||
|
||||
def _on_nordic_data_received(self, name, value):
|
||||
self.fw_logger.debug(f'RX Nordic <-- {list2hex(value)}')
|
||||
|
@ -185,7 +125,7 @@ class WacomProtocol(GObject.Object):
|
|||
|
||||
def check_nordic_incoming(self):
|
||||
if self.nordic_answer is None:
|
||||
raise WacomTimeoutException(f'{self.name}: Timeout while reading data')
|
||||
raise WacomTimeoutException(f'{self.device.name}: Timeout while reading data')
|
||||
|
||||
answer = self.nordic_answer
|
||||
self.nordic_answer = None
|
||||
|
@ -243,6 +183,80 @@ class WacomProtocol(GObject.Object):
|
|||
|
||||
return args
|
||||
|
||||
|
||||
class WacomProtocolBase(WacomProtocolLowLevelComm):
|
||||
'''
|
||||
Internal class to handle the basic communications with the Wacom device.
|
||||
No-one should directly instanciate this.
|
||||
|
||||
|
||||
:param device: the BlueZDevice object that is this wacom device
|
||||
:param uuid: the UUID {to be} assigned to the device
|
||||
'''
|
||||
protocol = Protocol.UNKNOWN
|
||||
|
||||
__gsignals__ = {
|
||||
'drawing':
|
||||
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)),
|
||||
'button-press-required':
|
||||
(GObject.SignalFlags.RUN_FIRST, None, ()),
|
||||
# battery level in %, boolean for is-charging
|
||||
"battery-status":
|
||||
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_INT, GObject.TYPE_BOOLEAN)),
|
||||
}
|
||||
|
||||
def __init__(self, device, uuid):
|
||||
super().__init__(device)
|
||||
self._uuid = uuid
|
||||
self.pen_data_buffer = []
|
||||
|
||||
device.connect_gatt_value(WACOM_CHRC_LIVE_PEN_DATA_UUID,
|
||||
self._on_pen_data_changed)
|
||||
device.connect_gatt_value(WACOM_OFFLINE_CHRC_PEN_DATA_UUID,
|
||||
self._on_pen_data_received)
|
||||
device.connect_gatt_value(MYSTERIOUS_NOTIFICATION_CHRC_UUID,
|
||||
self._on_mysterious_data_received)
|
||||
|
||||
@classmethod
|
||||
def is_spark(cls, device):
|
||||
return MYSTERIOUS_NOTIFICATION_CHRC_UUID not in device.characteristics
|
||||
|
||||
def _on_mysterious_data_received(self, name, value):
|
||||
self.fw_logger.debug(f'mysterious: {binascii.hexlify(bytes(value))}')
|
||||
|
||||
def _on_pen_data_changed(self, name, value):
|
||||
logger.debug(binascii.hexlify(bytes(value)))
|
||||
|
||||
if value[0] == 0x10:
|
||||
pressure = int.from_bytes(value[2:4], byteorder='little')
|
||||
buttons = int(value[10])
|
||||
logger.info(f'New Pen Data: pressure: {pressure}, button: {buttons}')
|
||||
elif value[0] == 0xa2:
|
||||
# entering proximity event
|
||||
length = value[1]
|
||||
pen_id = binascii.hexlify(bytes(value[2:]))
|
||||
logger.info(f'Pen {pen_id} entered proximity')
|
||||
elif value[0] == 0xa1:
|
||||
# data event
|
||||
length = value[1]
|
||||
if length % 6 != 0:
|
||||
logger.error(f'wrong data: {binascii.hexlify(bytes(value))}')
|
||||
return
|
||||
data = value[2:]
|
||||
while data:
|
||||
if bytes(data) == b'\xff\xff\xff\xff\xff\xff':
|
||||
logger.info(f'Pen left proximity')
|
||||
else:
|
||||
x = int.from_bytes(data[0:2], byteorder='little')
|
||||
y = int.from_bytes(data[2:4], byteorder='little')
|
||||
pressure = int.from_bytes(data[4:6], byteorder='little')
|
||||
self.logger.info(f'New Pen Data: ({x},{y}), pressure: {pressure}')
|
||||
data = data[6:]
|
||||
|
||||
def _on_pen_data_received(self, name, data):
|
||||
self.fw_logger.debug(f'RX Pen <-- {list2hex(data)}')
|
||||
self.pen_data_buffer.extend(data)
|
||||
|
||||
def check_connection(self):
|
||||
args = [int(i) for i in binascii.unhexlify(self._uuid)]
|
||||
self.send_nordic_command_sync(command=0xe6,
|
||||
|
@ -429,7 +443,7 @@ class WacomProtocol(GObject.Object):
|
|||
bitmask, opcode, raw_args, args, offset = self.next_pen_data(data, offset)
|
||||
if opcode == 0x3800:
|
||||
logger.info(f'beginning of sequence')
|
||||
drawing = Drawing(self.name, (self.width, self.height), timestamp)
|
||||
drawing = Drawing(self.device.name, (self.width, self.height), timestamp)
|
||||
drawings.append(drawing)
|
||||
continue
|
||||
elif opcode == 0xeeff:
|
||||
|
@ -531,7 +545,7 @@ class WacomProtocol(GObject.Object):
|
|||
logger.info(f'firmware is {fw_high}-{fw_low}')
|
||||
|
||||
|
||||
class WacomProtocolSlate(WacomProtocol):
|
||||
class WacomProtocolSlate(WacomProtocolBase):
|
||||
'''
|
||||
Subclass to handle the communication oddities with the Wacom Slate-like
|
||||
devices.
|
||||
|
@ -543,7 +557,7 @@ class WacomProtocolSlate(WacomProtocol):
|
|||
protocol = Protocol.SLATE
|
||||
|
||||
|
||||
class WacomProtocolSpark(WacomProtocol):
|
||||
class WacomProtocolSpark(WacomProtocolBase):
|
||||
'''
|
||||
Subclass to handle the communication oddities with the Wacom Spark-like
|
||||
devices.
|
||||
|
@ -687,7 +701,7 @@ class WacomDevice(GObject.Object):
|
|||
if protocol == Protocol.UNKNOWN:
|
||||
# we are registering a new device, or we might have an early
|
||||
# config file from an older tuhi version
|
||||
if WacomProtocol.is_spark(self._device):
|
||||
if WacomProtocolBase.is_spark(self._device):
|
||||
protocol = Protocol.SPARK
|
||||
else:
|
||||
protocol = Protocol.SLATE
|
||||
|
|
Loading…
Reference in New Issue