wacom: move the fw logging into a custom class

Instead of just doing a single log line, let's log this to a yaml file so we
have a permanent record of the interactions with the device for bug reports.
This commit is contained in:
Peter Hutterer 2019-07-14 19:07:32 +10:00 committed by Benjamin Tissoires
parent bcdf730a72
commit b6494efd40
1 changed files with 155 additions and 5 deletions

View File

@ -17,6 +17,7 @@ import calendar
import enum
import inspect
import logging
import os
import threading
import time
import uuid
@ -123,7 +124,156 @@ def list2hex(l):
return ' '.join([f'{x:02x}' for x in l])
def list2hexlist(l):
'''Converts a list of integers to a two-letter prefixed hex string in the form
"[0x1a, 0x32, 0xab]"'''
return '[' + ', '.join([f'{x:#04x}' for x in l]) + ']'
class DataLogger(object):
'''
A wrapper to log data transfer between the device and Tuhi. Use as::
logger = DataLogger()
logger.nordic.send([1, 2, 3...])
logger.nordic.recv([1, 2, 3...])
This uses a logger for stdout, but it also writes the log files to disk
for future re-use.
'''
class _Nordic(object):
source = 'NORDIC'
def __init__(self, parent):
self.parent = parent
def recv(self, data):
return self.parent._recv(self.source, data)
def send(self, data):
return self.parent._send(self.source, data)
class _Pen(object):
source = 'PEN'
def __init__(self, parent):
self.parent = parent
def recv(self, data):
return self.parent._recv(self.source, data)
class _Mysterious(object):
source = 'MYSTERIOUS'
def __init__(self, parent):
self.parent = parent
def recv(self, data):
return self.parent._recv(self.source, data)
commands = {
0xb1: 'start/stop live',
0xb6: 'set time',
0xb7: 'get firmware',
0xb9: 'read battery info',
0xbb: 'get/set name',
0xc1: 'check for data',
0xc3: 'start reading',
0xc5: 'fetch data',
0xc8: 'end of data',
0xca: 'ack transaction',
0xcc: 'fetch data',
0xea: 'get dimensions',
0xe5: 'finish registering',
0xe6: 'check connection',
0xdb: 'get name',
}
def __init__(self, bluez_device):
import xdg.BaseDirectory
self.logger = logging.getLogger('tuhi.fw')
self.device = bluez_device
self.btaddr = bluez_device.address
self.logdir = os.path.join(xdg.BaseDirectory.xdg_data_home, 'tuhi', self.btaddr)
os.makedirs(self.logdir, exist_ok=True)
bluez_device.connect('connected', self._on_bluez_connected)
bluez_device.connect('disconnected', self._on_bluez_disconnected)
self.nordic = DataLogger._Nordic(self)
self.pen = DataLogger._Pen(self)
self.mysterious = DataLogger._Mysterious(self)
self.logfile = None
def _on_bluez_connected(self, bluez_device):
self._init_file()
def _on_bluez_disconnected(self, bluez_device):
self._close_file()
def _init_file(self):
if self.logfile is not None:
return
timestamp = time.strftime('%Y%m%d-%H%M%S')
fname = f'log-{timestamp}.yaml'
path = os.path.join(self.logdir, fname)
self.logfile = open(path, 'w+')
self.logfile.write(f'name: {self.device.name}\n')
self.logfile.write(f'bluetooth: {self.btaddr}\n')
self.logfile.write(f'time: {time.strftime("%Y-%m-%d %H:%M:%S")}\n')
self.logfile.write(f'data:\n')
def _close_file(self):
if self.logfile is None:
return
self.logfile.write(f'# closed at {time.strftime("%Y-%m-%d %H:%M:%S")}')
self.logfile.close()
self.logfile = None
def _recv(self, source, data):
if source in ['NORDIC', 'PEN']:
def _convert(values): return list2hex(values)
convert = _convert
else:
def _convert(values): return binascii.hexlify(bytes(values))
convert = _convert
self.logger.debug(f'{self.btaddr}: RX {source} <-- {convert(data)}')
self._init_file()
if data[0] in self.commands:
self.logfile.write(f'# {self.commands[data[0]]}\n')
self.logfile.write(f' - recv: {list2hexlist(data)}\n')
if source != 'NORDIC':
self.logfile.write(f' source: {source}')
def _send(self, source, data):
command = data[0]
arguments = data[1:]
if data[0] in self.commands:
self.logger.debug(f'command: {self.commands[data[0]]}')
self.logger.debug(f'{self.btaddr}: TX {source} --> {command:02x} / {len(arguments):02x} / {list2hex(arguments)}')
self._init_file()
if data[0] in self.commands:
self.logfile.write(f'# {self.commands[data[0]]}\n')
self.logfile.write(f' - send: {list2hexlist(data)}\n')
if source != 'NORDIC':
self.logfile.write(f' source: {source}')
class NordicData(list):
'''A set of bytes as expected by the Nordic controller on the device.
First byte is the opcode, second byte is the length, rest is data.
This is an abstraction of a list. Instantiate with the full raw data,
the list contents will just be the data bytes.
'''
def __init__(self, bs):
super().__init__(bs[2:])
self.opcode = bs[0]
@ -313,21 +463,21 @@ class WacomProtocolLowLevelComm(GObject.Object):
GObject.Object.__init__(self)
self.device = device
self.nordic_answer = []
self.fw_logger = logging.getLogger('tuhi.fw')
self.fw_logger = DataLogger(device)
self.nordic_event = threading.Semaphore(value=0)
device.connect_gatt_value(NORDIC_UART_CHRC_RX_UUID,
self._on_nordic_data_received)
def _on_nordic_data_received(self, name, value):
self.fw_logger.debug(f'RX Nordic <-- {list2hex(value)}')
self.fw_logger.nordic.recv(value)
self.nordic_answer += value
self.nordic_event.release()
def send_nordic_command(self, command, arguments):
chrc = self.device.characteristics[NORDIC_UART_CHRC_TX_UUID]
data = [command, len(arguments), *arguments]
self.fw_logger.debug(f'TX Nordic --> {command:02x} / {len(arguments):02x} / {list2hex(arguments)}')
self.fw_logger.nordic.send(data)
chrc.write_value(data)
def check_nordic_incoming(self):
@ -546,7 +696,7 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
self._timestamp += 5
def _on_pen_data_received(self, name, data):
self.fw_logger.debug(f'RX Pen <-- {list2hex(data)}')
self.fw_logger.pen.recv(data)
self.pen_data_buffer.extend(data)
self._last_pen_data_time = time.time()
@ -906,7 +1056,7 @@ class WacomProtocolSlate(WacomProtocolSpark):
return super().live_mode(mode, uhid)
def _on_mysterious_data_received(self, name, value):
self.fw_logger.debug(f'mysterious: {binascii.hexlify(bytes(value))}')
self.fw_logger.mysterious.recv(value)
def ack_transaction(self):
self.send_nordic_command_sync(command=0xca)