From 99a2f7bdc5d8fb057f9ddf9ffe9185bd28704233 Mon Sep 17 00:00:00 2001 From: Benjamin Tissoires Date: Tue, 13 Feb 2018 20:01:56 +0100 Subject: [PATCH] kete: parse incoming uhid data --- tools/kete.py | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/tools/kete.py b/tools/kete.py index ae4d31c..6d77c74 100755 --- a/tools/kete.py +++ b/tools/kete.py @@ -14,6 +14,7 @@ from gi.repository import GObject, Gio, GLib import sys import argparse +import binascii import cmd import errno import os @@ -21,6 +22,7 @@ import json import logging import re import readline +import struct import threading import time import svgwrite @@ -80,6 +82,12 @@ completer_delims = completer_delims.replace(':', '') readline.set_completer_delims(completer_delims) +def b2hex(bs): + '''Convert bytes() to a two-letter hex string in the form "1a 2b c3"''' + hx = binascii.hexlify(bs).decode('ascii') + return ' '.join([''.join(s) for s in zip(hx[::2], hx[1::2])]) + + class DBusError(Exception): def __init__(self, message): self.message = message @@ -613,8 +621,31 @@ class LiveChanger(Worker): if self.device is None: return - logger.debug(f'{self.device}: starting live mode') - self.device.start_live(-1) + read_fd, write_fd = os.pipe() + + logger.info(f'{self.device}: starting live mode, please press button on device') + self._cb = GLib.io_add_watch(read_fd, GLib.IO_IN, self._on_uhid_data) + self.device.start_live(write_fd) + + def _on_uhid_data(self, source, cb_condition): + buf = os.read(source, 4380) + + header = '< L' + uhid_type = struct.unpack_from(header, buf)[0] + + if uhid_type == 11: # UHID_CREATE2 + fmt = '< L 128s 64s 64s H H L L L L 4096s' + uhid_type, name, phys, uniq, rdesc_size, bus, vid, pid, version, country, rdesc = struct.unpack_from(fmt, buf) + name = name.rstrip(b'\x00') + rdesc = rdesc[:rdesc_size] + logger.info(f'Live mode started for device {name} with rdesc {b2hex(rdesc)}') + elif uhid_type == 12: # UHID_INPUT2 + fmt = '< L H 4096s' + uhid_type, data_len, data = struct.unpack_from(fmt, buf) + data = data[:data_len] + logger.info(f'Live data: {b2hex(data)}') + + return True def stop(self): logger.debug(f'{self.device}: stopping live mode') @@ -625,6 +656,7 @@ class LiveChanger(Worker): e.code != Gio.IOErrorEnum.EXISTS or Gio.dbus_error_get_remote_error(e) != 'org.freedesktop.DBus.Error.ServiceUnknown'): raise e + GLib.source_remove(self._cb) class TuhiKeteShellLogHandler(logging.StreamHandler):