tuhi/tuhi/base.py

478 lines
17 KiB
Python

#!/usr/bin/env python3
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
import argparse
import enum
import logging
import sys
import time
import xdg.BaseDirectory
from pathlib import Path
try:
from gi.repository import GObject, GLib
except Exception as e:
print('************ Importing gi.repository failed **********')
print('* This is an issue with the gi module, not with tuhi *')
print('******************************************************')
print('The full exception is below:')
print('')
raise e
from tuhi.dbusserver import TuhiDBusServer
from tuhi.ble import BlueZDeviceManager
from tuhi.wacom import WacomDevice, DeviceMode
from tuhi.config import TuhiConfig
DEFAULT_CONFIG_PATH = Path(xdg.BaseDirectory.xdg_data_home, 'tuhi')
logger = logging.getLogger('tuhi')
WACOM_COMPANY_IDS = [0x4755, 0x4157, 0x424d]
class TuhiDevice(GObject.Object):
'''
Glue object to combine the backend bluez DBus object (that talks to the
real device) with the frontend DBusServer object that exports the device
over Tuhi's DBus interface
'''
class BatteryState(enum.Enum):
UNKNOWN = 0
CHARGING = 1
DISCHARGING = 2
__gsignals__ = {
# Signal sent when an error occurs on the device itself.
# Argument is a Wacom*Exception
'device-error':
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)),
}
BATTERY_UPDATE_MIN_INTERVAL = 300
def __init__(self, bluez_device, config, uuid=None, mode=DeviceMode.LISTEN):
GObject.Object.__init__(self)
self.config = config
self._wacom_device = None
# We need either uuid or registered as false
assert uuid is not None or mode == DeviceMode.REGISTER
self._mode = mode
self._battery_state = TuhiDevice.BatteryState.UNKNOWN
self._battery_percent = 0
self._last_battery_update_time = 0
self._battery_timer_source = None
self._signals = {'connected': None,
'disconnected': None}
self._bluez_device = bluez_device
self._tuhi_dbus_device = None
@GObject.Property
def dimensions(self):
if self._wacom_device is None:
return 0, 0
return self._wacom_device.dimensions
@GObject.Property
def mode(self):
return self._mode
@mode.setter
def mode(self, mode):
if self._mode != mode:
self._mode = mode
self.notify('registered')
@GObject.Property
def registered(self):
return self.mode == DeviceMode.LISTEN
@GObject.Property
def name(self):
return self._bluez_device.name
@GObject.Property
def address(self):
return self._bluez_device.address
@GObject.Property
def bluez_device(self):
return self._bluez_device
@GObject.Property
def dbus_device(self):
return self._tuhi_dbus_device
@dbus_device.setter
def dbus_device(self, device):
assert self._tuhi_dbus_device is None
self._tuhi_dbus_device = device
self._tuhi_dbus_device.connect('register-requested', self._on_register_requested)
self._tuhi_dbus_device.connect('notify::listening', self._on_listening_updated)
self._tuhi_dbus_device.connect('notify::live', self._on_live_updated)
drawings = self.config.load_drawings(self.address)
if drawings:
logger.debug(f'{self.address}: loaded {len(drawings)} drawings from disk')
for d in drawings:
self._tuhi_dbus_device.add_drawing(d)
@GObject.Property
def listening(self):
return self._tuhi_dbus_device.listening
@GObject.Property
def live(self):
return self._tuhi_dbus_device.live
@GObject.Property
def battery_percent(self):
return self._battery_percent
@battery_percent.setter
def battery_percent(self, value):
self._battery_percent = value
@GObject.Property
def battery_state(self):
return self._battery_state
@battery_state.setter
def battery_state(self, value):
self._battery_state = value
@GObject.Property
def sync_state(self):
return self._sync_state
def _connect_device(self, mode):
if self._signals['connected'] is None:
self._signals['connected'] = self._bluez_device.connect('connected', self._on_bluez_device_connected, mode)
if self._signals['disconnected'] is None:
self._signals['disconnected'] = self._bluez_device.connect('disconnected', self._on_bluez_device_disconnected)
self._bluez_device.connect_device()
def register(self):
self._connect_device(DeviceMode.REGISTER)
def listen(self):
self._connect_device(DeviceMode.LISTEN)
def _on_bluez_device_connected(self, bluez_device, mode):
logger.debug(f'{bluez_device.address}: connected for {mode}')
if self._wacom_device is None:
self._wacom_device = WacomDevice(bluez_device, self.config)
self._wacom_device.connect('drawing', self._on_drawing_received)
self._wacom_device.connect('done', self._on_fetching_finished, bluez_device)
self._wacom_device.connect('button-press-required', self._on_button_press_required)
self._wacom_device.connect('notify::uuid', self._on_uuid_updated, bluez_device)
self._wacom_device.connect('battery-status', self._on_battery_status, bluez_device)
self._wacom_device.connect('notify::sync-state', self._on_sync_state)
self._wacom_device.connect('notify::dimensions', self._on_dimensions)
if mode == DeviceMode.REGISTER:
self._wacom_device.start_register()
elif mode == DeviceMode.LIVE:
self._wacom_device.start_live(self._tuhi_dbus_device.uhid_fd)
else:
self._wacom_device.start_listen()
try:
bluez_device.disconnect(self._signals['connected'])
self._signals['connected'] = None
except KeyError:
pass
def _on_dimensions(self, device, pspec):
self.notify('dimensions')
def _on_sync_state(self, device, pspec):
self._sync_state = device.sync_state
self.notify('sync-state')
def _on_bluez_device_disconnected(self, bluez_device):
logger.debug(f'{bluez_device.address}: disconnected')
try:
bluez_device.disconnect(self._signals['disconnected'])
self._signals['disconnected'] = None
except KeyError:
pass
def _on_register_requested(self, dbus_device):
# FIXME: this needs to throw an exception/return the value
if self.mode == DeviceMode.LISTEN:
return
self.register()
def _on_drawing_received(self, device, drawing):
logger.debug('Drawing received')
self._tuhi_dbus_device.add_drawing(drawing)
self.config.store_drawing(self.address, drawing)
def _on_fetching_finished(self, device, exception, bluez_device):
if self.live:
return
bluez_device.disconnect_device()
if exception is not None:
logger.info(exception)
self.emit('device-error', exception)
def _on_button_press_required(self, device):
self._tuhi_dbus_device.notify_button_press_required()
def _on_uuid_updated(self, wacom_device, pspec, bluez_device):
self.config.new_device(bluez_device.address, wacom_device.uuid, wacom_device.protocol)
# FIXME: we have registered and that *should* set us to listen. But
# the ManufacturerData doesn't update until (some time into) the
# next connection request.
self.mode = DeviceMode.LISTEN
def _on_listening_updated(self, dbus_device, pspec):
# Callback when a DBus client calls Start/Stop listening
self.notify('listening')
def _on_live_updated(self, dbus_device, pspec):
if self.live:
self._connect_device(DeviceMode.LIVE)
else:
if self._wacom_device is not None:
self._wacom_device.stop_live()
def _on_battery_status(self, wacom_device, percent, is_charging, bluez_device):
if is_charging:
self.battery_state = TuhiDevice.BatteryState.CHARGING
else:
self.battery_state = TuhiDevice.BatteryState.DISCHARGING
self.battery_percent = percent
# If we don't get battery updates for a while, switch the state
# to unknown
if self._battery_timer_source is not None:
GObject.source_remove(self._battery_timer_source)
self._battery_timer_source = \
GObject.timeout_add_seconds(self.BATTERY_UPDATE_MIN_INTERVAL,
self._on_battery_timeout)
self._last_battery_update_time = time.time()
def _on_battery_timeout(self):
if self._last_battery_update_time < time.time() - self.BATTERY_UPDATE_MIN_INTERVAL:
self.battery_state = TuhiDevice.BatteryState.UNKNOWN
self._battery_timer_source = None # gets auto-destroyed
return False
class Tuhi(GObject.Object):
'''
The Tuhi object is the main entry point and glue object between the
backend and the DBus server.
'''
__gsignals__ = {
'device-added':
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)),
'device-connected':
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)),
'terminate':
(GObject.SignalFlags.RUN_FIRST, None, ()),
}
def __init__(self, config_dir=None):
GObject.Object.__init__(self)
self.server = TuhiDBusServer()
self.server.connect('bus-name-acquired', self._on_tuhi_bus_name_acquired)
self.server.connect('bus-name-lost', self._on_tuhi_bus_name_lost)
self.server.connect('search-start-requested', self._on_start_search_requested)
self.server.connect('search-stop-requested', self._on_stop_search_requested)
self.bluez = BlueZDeviceManager()
self.bluez.connect('discovery-started', self._on_bluez_discovery_started)
self.bluez.connect('discovery-stopped', self._on_bluez_discovery_stopped)
self.config = TuhiConfig()
self.devices = {}
self._search_stop_handler = None
def _on_tuhi_bus_name_acquired(self, dbus_server):
self.bluez.connect_to_bluez()
for dev in self.bluez.devices:
self._add_device(self.bluez, dev)
self.bluez.connect('device-added',
lambda mgr, dev: self._add_device(mgr, dev, True))
self.bluez.connect('device-updated',
lambda mgr, dev: self._add_device(mgr, dev, True))
def _on_tuhi_bus_name_lost(self, dbus_server):
self.emit('terminate')
def _on_start_search_requested(self, dbus_server, stop_handler):
self._search_stop_handler = stop_handler
self.bluez.start_discovery()
def _on_stop_search_requested(self, dbus_server):
# If you request to stop, you get a successful stop and we ignore
# anything the server does underneath
self._search_stop_handler(0)
self._search_stop_handler = None
self.bluez.stop_discovery()
self._search_device_handler = None
unregistered = [addr for (addr, d) in self.devices.items() if not d.registered]
for addr in unregistered:
del self.devices[addr]
def _on_bluez_discovery_started(self, manager):
# Something else may turn discovery mode on, we don't care about
# it then
if not self._search_stop_handler:
return
def _on_bluez_discovery_stopped(self, manager):
if self._search_stop_handler is not None:
self._search_stop_handler(0)
# restart discovery if some users are already in the listening mode
self._on_listening_updated(None, None)
def _add_device(self, manager, bluez_device, from_live_update=False):
'''
Process a new BlueZ device that may be one of our devices.
This function is called once during intial setup to enumerate the
BlueZ devices and for every BlueZ device property change. Including
RSSI which will give you a value every second or so.
.. :param from_live_update: True if this function was called from a BlueZ
device property update. False when called during the initial setup
stage.
'''
# We have a reverse-engineered protocol. Let's not talk to anyone
# who doesn't look like we know them to avoid potentially bricking a
# device. If the vendor id is None it may still be one of our
# devices, provided it's been registered previously.
if bluez_device.vendor_id is not None and bluez_device.vendor_id not in WACOM_COMPANY_IDS:
return
# check if the device is already known to us
try:
config = self.config.devices[bluez_device.address]
uuid = config['uuid']
except KeyError:
if bluez_device.vendor_id is None:
return
uuid = None
# if we got here from a currently live BlueZ device,
# ManufacturerData is reliable. Else, consider the device not in
# register mode
#
# When the device is in register mode (blue light blinking), the
# manufacturer is merely 4 bytes. This will reset to 7 bytes even
# when the device simply times out and does not register fully.
if from_live_update and len(bluez_device.manufacturer_data or []) == 4:
mode = DeviceMode.REGISTER
else:
mode = DeviceMode.LISTEN
if uuid is None:
logger.info(f'{bluez_device.address}: device without config, must be registered first')
return
logger.debug(f'{bluez_device.address}: UUID {uuid} protocol: {config["Protocol"]}')
# create the device if unknown from us
if bluez_device.address not in self.devices:
d = TuhiDevice(bluez_device, self.config, uuid, mode)
d.dbus_device = self.server.create_device(d)
d.connect('notify::listening', self._on_listening_updated)
self.devices[bluez_device.address] = d
d = self.devices[bluez_device.address]
if mode == DeviceMode.REGISTER:
d.mode = mode
logger.debug(f'{bluez_device.objpath}: call Register() on device')
elif d.listening:
d.listen()
def _on_listening_updated(self, tuhi_dbus_device, pspec):
listen = self._search_stop_handler is not None
for dev in self.devices.values():
if dev.listening:
listen = True
break
if listen:
self.bluez.start_discovery()
else:
self.bluez.stop_discovery()
def setup_logging(config_dir):
session_log_file = Path(config_dir, 'session-logs', f'tuhi-{time.strftime("%y-%m-%d-%H:%M:%S")}.log')
session_log_file.parent.mkdir(parents=True, exist_ok=True)
formatter = logging.Formatter(fmt='%(asctime)s %(levelname)s: %(name)s: %(message)s',
datefmt='%H:%M:%S')
fh = logging.FileHandler(session_log_file)
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.addHandler(fh)
logger.info(f'Session log: {session_log_file}')
def main(args=sys.argv):
if sys.version_info < (3, 6):
sys.exit('Python 3.6 or later required')
desc = 'Daemon to extract the pen stroke data from Wacom SmartPad devices'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('-v', '--verbose',
help='Show some debugging informations',
action='store_true',
default=False)
parser.add_argument('--config-dir',
help='Base directory for configuration',
type=str,
default=DEFAULT_CONFIG_PATH)
parser.add_argument('--peek',
help='Download first drawing only but do not remove it from the device',
action='store_true',
default=False)
ns = parser.parse_args(args[1:])
TuhiConfig.set_base_path(ns.config_dir)
TuhiConfig().peek_at_drawing = ns.peek
setup_logging(ns.config_dir)
if ns.verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
try:
mainloop = GLib.MainLoop()
tuhi = Tuhi(config_dir=ns.config_dir)
tuhi.connect('terminate', lambda tuhi: mainloop.quit())
mainloop.run()
except KeyboardInterrupt:
pass