kete: parse incoming uhid data

This commit is contained in:
Benjamin Tissoires 2018-02-13 20:01:56 +01:00 committed by Peter Hutterer
parent d295e12310
commit 99a2f7bdc5
1 changed files with 34 additions and 2 deletions

View File

@ -14,6 +14,7 @@
from gi.repository import GObject, Gio, GLib
import sys
import argparse
import binascii
import cmd
import errno
import os
@ -21,6 +22,7 @@ import json
import logging
import re
import readline
import struct
import threading
import time
import svgwrite
@ -80,6 +82,12 @@ completer_delims = completer_delims.replace(':', '')
readline.set_completer_delims(completer_delims)
def b2hex(bs):
'''Convert bytes() to a two-letter hex string in the form "1a 2b c3"'''
hx = binascii.hexlify(bs).decode('ascii')
return ' '.join([''.join(s) for s in zip(hx[::2], hx[1::2])])
class DBusError(Exception):
def __init__(self, message):
self.message = message
@ -613,8 +621,31 @@ class LiveChanger(Worker):
if self.device is None:
return
logger.debug(f'{self.device}: starting live mode')
self.device.start_live(-1)
read_fd, write_fd = os.pipe()
logger.info(f'{self.device}: starting live mode, please press button on device')
self._cb = GLib.io_add_watch(read_fd, GLib.IO_IN, self._on_uhid_data)
self.device.start_live(write_fd)
def _on_uhid_data(self, source, cb_condition):
buf = os.read(source, 4380)
header = '< L'
uhid_type = struct.unpack_from(header, buf)[0]
if uhid_type == 11: # UHID_CREATE2
fmt = '< L 128s 64s 64s H H L L L L 4096s'
uhid_type, name, phys, uniq, rdesc_size, bus, vid, pid, version, country, rdesc = struct.unpack_from(fmt, buf)
name = name.rstrip(b'\x00')
rdesc = rdesc[:rdesc_size]
logger.info(f'Live mode started for device {name} with rdesc {b2hex(rdesc)}')
elif uhid_type == 12: # UHID_INPUT2
fmt = '< L H 4096s'
uhid_type, data_len, data = struct.unpack_from(fmt, buf)
data = data[:data_len]
logger.info(f'Live data: {b2hex(data)}')
return True
def stop(self):
logger.debug(f'{self.device}: stopping live mode')
@ -625,6 +656,7 @@ class LiveChanger(Worker):
e.code != Gio.IOErrorEnum.EXISTS or
Gio.dbus_error_get_remote_error(e) != 'org.freedesktop.DBus.Error.ServiceUnknown'):
raise e
GLib.source_remove(self._cb)
class TuhiKeteShellLogHandler(logging.StreamHandler):