wacom: move protocol instantiation into a class
During the first attempt to register, we might not know before hand the protocol of the device. Have a special class for it, and then instantiate the correct protocol before continuing. This makes the 'Protocol' entry in the config file mandatory
This commit is contained in:
parent
59db1a28ae
commit
c2bba6bd43
107
tuhi/wacom.py
107
tuhi/wacom.py
|
@ -184,6 +184,52 @@ class WacomProtocolLowLevelComm(GObject.Object):
|
|||
return args
|
||||
|
||||
|
||||
class WacomRegisterHelper(WacomProtocolLowLevelComm):
|
||||
'''
|
||||
Class used to register a device. This class is only useful for
|
||||
the very first register commands and attempts to detect the type of
|
||||
device based on the responses.
|
||||
|
||||
Once register_device has finished, the correct protocol is returned.
|
||||
This may later be used for init_protocol() to instantiate the
|
||||
right class.
|
||||
'''
|
||||
__gsignals__ = {
|
||||
'button-press-required':
|
||||
(GObject.SignalFlags.RUN_FIRST, None, ()),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def is_spark(cls, device):
|
||||
return MYSTERIOUS_NOTIFICATION_CHRC_UUID not in device.characteristics
|
||||
|
||||
def register_connection(self, uuid):
|
||||
args = [int(i) for i in binascii.unhexlify(uuid)]
|
||||
|
||||
if self.is_spark(self.device):
|
||||
try:
|
||||
self.send_nordic_command_sync(command=0xe6,
|
||||
expected_opcode=0xb3,
|
||||
arguments=args)
|
||||
except WacomWrongModeException:
|
||||
# this is expected
|
||||
pass
|
||||
self.send_nordic_command(command=0xe3,
|
||||
arguments=[0x01])
|
||||
return Protocol.SPARK
|
||||
|
||||
self.send_nordic_command(command=0xe7, arguments=args)
|
||||
|
||||
return Protocol.SLATE
|
||||
|
||||
def register_device(self, uuid):
|
||||
protocol = self.register_connection(uuid)
|
||||
logger.info('Press the button now to confirm')
|
||||
self.emit('button-press-required')
|
||||
|
||||
return protocol
|
||||
|
||||
|
||||
class WacomProtocolBase(WacomProtocolLowLevelComm):
|
||||
'''
|
||||
Internal class to handle the basic communications with the Wacom device.
|
||||
|
@ -198,8 +244,6 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
|
|||
__gsignals__ = {
|
||||
'drawing':
|
||||
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)),
|
||||
'button-press-required':
|
||||
(GObject.SignalFlags.RUN_FIRST, None, ()),
|
||||
# battery level in %, boolean for is-charging
|
||||
"battery-status":
|
||||
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_INT, GObject.TYPE_BOOLEAN)),
|
||||
|
@ -215,10 +259,6 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
|
|||
device.connect_gatt_value(WACOM_OFFLINE_CHRC_PEN_DATA_UUID,
|
||||
self._on_pen_data_received)
|
||||
|
||||
@classmethod
|
||||
def is_spark(cls, device):
|
||||
return MYSTERIOUS_NOTIFICATION_CHRC_UUID not in device.characteristics
|
||||
|
||||
def _on_pen_data_changed(self, name, value):
|
||||
logger.debug(binascii.hexlify(bytes(value)))
|
||||
|
||||
|
@ -258,11 +298,6 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
|
|||
expected_opcode=0xb3,
|
||||
arguments=args)
|
||||
|
||||
def register_connection(self):
|
||||
args = [int(i) for i in binascii.unhexlify(self._uuid)]
|
||||
self.send_nordic_command(command=0xe7,
|
||||
arguments=args)
|
||||
|
||||
def e3_command(self):
|
||||
self.send_nordic_command_sync(command=0xe3,
|
||||
expected_opcode=0xb3)
|
||||
|
@ -522,16 +557,7 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
|
|||
transaction_count += 1
|
||||
return transaction_count
|
||||
|
||||
def register_device(self):
|
||||
try:
|
||||
self.check_connection()
|
||||
except WacomWrongModeException:
|
||||
# this is expected
|
||||
pass
|
||||
self.send_nordic_command(command=0xe3,
|
||||
arguments=[0x01])
|
||||
logger.info('Press the button now to confirm')
|
||||
self.emit('button-press-required')
|
||||
def register_device_finish(self):
|
||||
# Wait for the button confirmation event, or any error
|
||||
data = self.wait_nordic_data([0xe4, 0xb3], 10)
|
||||
if data.opcode == 0xb3:
|
||||
|
@ -602,10 +628,8 @@ class WacomProtocolSlate(WacomProtocolSpark):
|
|||
timestamp = time.strptime(str_timestamp, '%y%m%d%H%M%S')
|
||||
return count, timestamp
|
||||
|
||||
def register_device(self):
|
||||
self.register_connection()
|
||||
logger.info('Press the button now to confirm')
|
||||
self.emit('button-press-required')
|
||||
def register_device_finish(self):
|
||||
# Wait for the button confirmation event, or any error
|
||||
self.wait_nordic_data(0xe4, 10)
|
||||
self.set_time()
|
||||
self.read_time()
|
||||
|
@ -683,34 +707,30 @@ class WacomDevice(GObject.Object):
|
|||
self.thread = None
|
||||
self._is_running = False
|
||||
self._config = None
|
||||
self._wacom_protocol = None
|
||||
|
||||
try:
|
||||
self._config = config.devices[device.address]
|
||||
except KeyError:
|
||||
# unregistered device
|
||||
self._uuid = None
|
||||
self._wacom_protocol = None
|
||||
else:
|
||||
self._uuid = self._config['uuid']
|
||||
self._init_protocol()
|
||||
|
||||
def _init_protocol(self):
|
||||
protocol = Protocol.UNKNOWN
|
||||
if self._config is not None:
|
||||
# retrieve the protocol from the config file
|
||||
protocol = Protocol.UNKNOWN
|
||||
try:
|
||||
protocol = next(p for p in Protocol if p.value == self._config['Protocol'])
|
||||
except StopIteration:
|
||||
logger.error(f'Unknown protocol in configuration: {self._config["Protocol"]}')
|
||||
raise WacomCorruptDataException(f'Unknown Protocol {self._config["Protocol"]}')
|
||||
|
||||
if protocol == Protocol.UNKNOWN:
|
||||
# we are registering a new device, or we might have an early
|
||||
# config file from an older tuhi version
|
||||
if WacomProtocolBase.is_spark(self._device):
|
||||
protocol = Protocol.SPARK
|
||||
else:
|
||||
protocol = Protocol.SLATE
|
||||
if protocol == Protocol.UNKNOWN:
|
||||
raise WacomCorruptDataException(f'Missing Protocol entry from config file. Please delete config file and re-register device')
|
||||
|
||||
self._init_protocol(protocol)
|
||||
|
||||
def _init_protocol(self, protocol):
|
||||
if protocol == Protocol.SPARK:
|
||||
self._wacom_protocol = WacomProtocolSpark(self._device, self._uuid)
|
||||
elif protocol == Protocol.SLATE:
|
||||
|
@ -724,7 +744,6 @@ class WacomDevice(GObject.Object):
|
|||
logger.debug(f'{self._device.name} is using {type(self._wacom_protocol)}')
|
||||
|
||||
self._wacom_protocol.connect('drawing', self._on_drawing_received)
|
||||
self._wacom_protocol.connect('button-press-required', self._on_button_press_required)
|
||||
self._wacom_protocol.connect('battery-status', self._on_battery_status)
|
||||
|
||||
def _on_drawing_received(self, protocol, drawing):
|
||||
|
@ -749,8 +768,16 @@ class WacomDevice(GObject.Object):
|
|||
def register_device(self):
|
||||
self._uuid = uuid.uuid4().hex[:12]
|
||||
logger.debug(f'{self._device.address}: registering device, assigned {self.uuid}')
|
||||
self._init_protocol()
|
||||
self._wacom_protocol.register_device()
|
||||
|
||||
wp = WacomRegisterHelper(self._device)
|
||||
s = wp.connect('button-press-required', self._on_button_press_required)
|
||||
protocol = wp.register_device(self._uuid)
|
||||
wp.disconnect(s)
|
||||
del wp
|
||||
|
||||
self._init_protocol(protocol)
|
||||
self._wacom_protocol.register_device_finish()
|
||||
|
||||
logger.info('registration completed')
|
||||
self.notify('uuid')
|
||||
|
||||
|
|
Loading…
Reference in New Issue