Compare commits

..

No commits in common. "master" and "0.2" have entirely different histories.
master ... 0.2

38 changed files with 1121 additions and 1629 deletions

18
.circleci/config.yml Normal file
View File

@ -0,0 +1,18 @@
version: 2
jobs:
build:
working_directory: ~/tuhi
docker:
- image: fedora:latest
steps:
- run:
command: |
dnf install -y meson gettext python3-devel pygobject3-devel python3-flake8 desktop-file-utils libappstream-glib python3-pytest python3-pyxdg python3-pyyaml
- checkout
- run:
command: |
meson builddir
ninja -C builddir test
- store_artifacts:
path: ~/tuhi/builddir/meson-logs

View File

@ -1,26 +0,0 @@
on: [ push, pull_request ]
env:
CFLAGS: "-Werror -Wall -Wextra -Wno-error=sign-compare -Wno-error=unused-parameter -Wno-error=missing-field-initializers"
UBUNTU_PACKAGES: meson gettext python3-dev python-gi-dev flake8 desktop-file-utils libappstream-glib-dev appstream-util python3-pytest python3-xdg python3-yaml python3-svgwrite python3-cairo
jobs:
meson_test:
runs-on: ubuntu-20.04
steps:
- name: Install dependencies
run: |
sudo apt-get update -yq
sudo apt-get install -yq --no-install-suggests --no-install-recommends $UBUNTU_PACKAGES
- uses: actions/checkout@v3
- name: meson
run: meson builddir
- name: ninja
run: ninja -C builddir test
- name: capture build logs
uses: actions/upload-artifact@v3
if: ${{ always() }} # even if we fail
with:
name: meson logs
path: |
builddir/meson-logs

View File

@ -26,7 +26,7 @@ To build and run Tuhi from the repository directly:
```
$> git clone http://github.com/tuhiproject/tuhi
$> cd tuhi
$> meson setup builddir
$> meson builddir
$> ninja -C builddir
$> ./builddir/tuhi.devel
```
@ -41,7 +41,7 @@ To install and run Tuhi:
```
$> git clone http://github.com/tuhiproject/tuhi
$> cd tuhi
$> meson setup builddir
$> meson builddir
$> ninja -C builddir install
```

View File

@ -9,6 +9,5 @@
<file preprocess="xml-stripblanks">ui/SetupPerspective.ui</file>
<file preprocess="xml-stripblanks">ui/ErrorPerspective.ui</file>
<file preprocess="xml-stripblanks">input-tablet-missing-symbolic.svg</file>
<file preprocess="xml-stripblanks">ui/AppMenu.ui</file>
</gresource>
</gresources>

View File

@ -8,7 +8,7 @@
<property name="type_hint">normal</property>
<property name="program_name">Tuhi</property>
<property name="version">@version@</property>
<property name="copyright">Copyright © 2020 Tuhi Developers</property>
<property name="copyright">Copyright © 2019 Tuhi Developers</property>
<property name="website">@url@</property>
<property name="website_label" translatable="yes">Visit Tuhis website</property>
<property name="logo_icon_name">org.freedesktop.Tuhi</property>

View File

@ -1,26 +0,0 @@
<interface>
<menu id="primary-menu">
<section>
<item>
<attribute name="label" translatable="yes">Portrait</attribute>
<attribute name="action">win.orientation</attribute>
<attribute name="target">portrait</attribute>
</item>
<item>
<attribute name="label" translatable="yes">Landscape</attribute>
<attribute name="action">win.orientation</attribute>
<attribute name="target">landscape</attribute>
</item>
</section>
<section>
<item>
<attribute name="label" translatable="yes">Help</attribute>
<attribute name="action">app.help</attribute>
</item>
<item>
<attribute name="label" translatable="yes">About</attribute>
<attribute name="action">app.about</attribute>
</item>
</section>
</menu>
</interface>

View File

@ -140,7 +140,7 @@
</object>
<packing>
<property name="name">page0</property>
<property name="title">page0</property>
<property name="title" translatable="yes">page0</property>
</packing>
</child>
</template>

View File

@ -22,7 +22,7 @@
Tuhi connects to tablets of the Wacom Ink range. It allows you to download the drawings stored on those devices as SVGs for processing later.
Tuhi is a DBus server that needs to be running for the Tuhi GUI to connect to it. Connecting to the DBus server should take less than a second. If you read this far, your Tuhi DBus server is not running or responding and needs to be restarted.</property>
Tuhi is a DBus server that needs to be running for the Tuhi GUI to connect to it. Connecting to the DBus server should take less than a second. If you read this far, your Tuhi DBus server is not running or responsing and needs to be restarted.</property>
<property name="wrap">True</property>
<property name="max_width_chars">55</property>
</object>

View File

@ -118,7 +118,7 @@
</object>
<packing>
<property name="name">page0</property>
<property name="title">page0</property>
<property name="title" translatable="yes">page0</property>
</packing>
</child>
<child>
@ -185,7 +185,7 @@
</object>
<packing>
<property name="name">page1</property>
<property name="title">page1</property>
<property name="title" translatable="yes">page1</property>
<property name="position">1</property>
</packing>
</child>
@ -255,7 +255,7 @@
</object>
<packing>
<property name="name">page2</property>
<property name="title">page2</property>
<property name="title" translatable="yes">page2</property>
<property name="position">2</property>
</packing>
</child>

View File

@ -1,12 +1,13 @@
project('tuhi',
version: '0.6',
version: '0.2',
license: 'GPLv2',
meson_version: '>= 0.50.0')
meson_version: '>= 0.48.0')
# The tag date of the project_version(), update when the version bumps.
version_date='2022-04-28'
version_date='2019-08-26'
# Dependencies
dependency('pygobject-3.0', version: '>= 3.30', required: true)
dependency('python3', required: true)
dependency('pygobject-3.0', required: true)
prefix = get_option('prefix')
datadir = join_paths(prefix, get_option('datadir'))
@ -23,32 +24,13 @@ libexecdir = join_paths(get_option('prefix'), get_option('libexecdir'), 'tuhi')
i18n = import('i18n')
# Workaround for https://github.com/mesonbuild/meson/issues/6165
find_program('gettext')
subdir('po')
subdir('data')
# Find the directory to install our Python code
pymod = import('python')
# external python modules that are required for running Tuhi
python_modules = [
'svgwrite',
'xdg',
'gi',
'cairo',
]
if meson.version().version_compare('>=0.51')
py3 = pymod.find_installation(modules: python_modules)
else
py3 = pymod.find_installation()
foreach module: python_modules
if run_command(py3, '-c', 'import @0@'.format(module)).returncode() != 0
error('Failed to find required python module \'@0@\'.'.format(module))
endif
endforeach
endif
py3 = pymod.find_installation()
python_dir = py3.get_install_dir()
install_subdir('tuhi',
install_dir: python_dir,
@ -139,7 +121,7 @@ appdata = i18n.merge_file(input: appdata_intl,
install_data('data/org.freedesktop.Tuhi.svg', install_dir: icondir)
flake8 = find_program('flake8-3', 'flake8', required: false)
flake8 = find_program('flake8-3', required: false)
if flake8.found()
test('flake8', flake8,
args: ['--ignore=E501,W504',

View File

@ -1,18 +1,17 @@
{
"app-id": "org.freedesktop.Tuhi",
"runtime": "org.gnome.Platform",
"runtime-version": "42",
"runtime-version": "3.32",
"sdk": "org.gnome.Sdk",
"command": "tuhi",
"finish-args": [
"finish-args": [
"--share=ipc",
"--socket=x11",
"--socket=wayland",
"--talk-name=org.freedesktop.tuhi1",
"--own-name=org.freedesktop.tuhi1",
"--system-talk-name=org.bluez",
"--filesystem=home"
],
"--system-talk-name=org.bluez"
],
"modules": [
{
"name": "pyxdg",
@ -20,9 +19,7 @@
"sources": [
{
"type": "git",
"url": "https://gitlab.freedesktop.org/xdg/pyxdg.git",
"tag": "rel-0.27",
"commit": "f097a66923a65e93640c48da83e6e9cfbddd86ba"
"url": "git://anongit.freedesktop.org/xdg/pyxdg"
}
],
"build-commands": [
@ -35,8 +32,8 @@
"sources": [
{
"type": "archive",
"url": "https://github.com/pyparsing/pyparsing/releases/download/pyparsing_2.4.7/pyparsing-2.4.7.tar.gz",
"sha512": "0b9f8f18907f65cb3af1b48ed57989e183f28d71646f2b2f820e772476f596ca15ee1a689f3042f18458206457f4683d10daa6e73dfd3ae82d5e4405882f9dd2"
"url": "https://github.com/pyparsing/pyparsing/releases/download/pyparsing_2.4.2/pyparsing-2.4.2.tar.gz",
"sha512": "27e5959eb1cf0c4d899746d2d32f5f000c3753278bdbbb670d24a077053e5c08caf8429f684186c502f6d9bf358702e0a8b3fea40cd2b50807cf02ea38c750dd"
}
],
"build-commands": [
@ -49,9 +46,7 @@
"sources": [
{
"type": "git",
"url": "https://github.com/mozman/svgwrite.git",
"tag": "v1.4.2",
"commit": "e2617741ab018956e638e18aa21827405bd8edd1"
"url": "https://github.com/mozman/svgwrite.git"
}
],
"build-commands": [

View File

@ -1,4 +1 @@
# Language list must be in alphabetical order
it
pl
tr

View File

@ -2,15 +2,16 @@ data/org.freedesktop.Tuhi.appdata.xml.in.in
data/org.freedesktop.Tuhi.desktop.in
data/ui/AboutDialog.ui.in
data/ui/AppMenu.ui
data/ui/Drawing.ui
data/ui/DrawingPerspective.ui
data/ui/ErrorPerspective.ui
data/ui/MainWindow.ui
data/ui/SetupPerspective.ui
tuhi/gui/application.py
tuhi/gui/config.py
tuhi/gui/drawing.py
tuhi/gui/drawingperspective.py
tuhi/gui/window.py
tuhigui/application.py
tuhigui/config.py
tuhigui/drawing.py
tuhigui/drawingperspective.py
tuhigui/svg.py
tuhigui/tuhi.py
tuhigui/window.py

193
po/it.po
View File

@ -1,193 +0,0 @@
# Italian translation for tuhi.
# Copyright © 2019 the tuhi authors.
# This file is distributed under the same license as the tuhi package.
# ALBANO BATTISTELLA <albano_battistella@hotmail.com>, 2020,2022.
#
msgid ""
msgstr ""
"Project-Id-Version: tuhi\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2022-04-28 09:34+1000\n"
"PO-Revision-Date: 2022-04-25 18:23+0100\n"
"Last-Translator: Albano Battistella <piotrd>\n"
"Language-Team: Italian <>\n"
"Language: it\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Plural-Forms: nplurals=3; plural=(n==1 ? 0 : n%10>=2 && n%10<=4 && (n%100<10 "
"|| n%100>=20) ? 1 : 2);\n"
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:7
#: data/org.freedesktop.Tuhi.desktop.in:3
msgid "Tuhi"
msgstr "Tuhi"
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:8
#: data/org.freedesktop.Tuhi.desktop.in:4
msgid "Utility to download drawings from the Wacom Ink range of devices"
msgstr ""
"Utilità per scaricare i disegni dalla gamma Inchiostro di dispositivi Wacom"
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:10
msgid ""
"Tuhi is a graphical user interface to download drawings stored on tablet "
"devices from the Wacom Ink range, e.g. Intuos Pro Paper or Bamboo Slate."
msgstr ""
"Tuhi è un'interfaccia utente grafica per scaricare disegni archiviati su "
"tablet dispositivi della gamma di Inchiostro Wacom, ad esempio Intuos Pro "
"Paper o Bamboo Slate."
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:15
#, fuzzy
msgid ""
"Tuhi requires Tuhi, the daemon to actually communicate with the devices. "
"ThiGui is merely a front end to Tuhi, Tuhi must be installed and running "
"when Tuhi is launched."
msgstr ""
"Tuhi richiede Tuhi, il demone per comunicare effettivamente con i "
"dispositivi.ThiGui è semplicemente un front-end di Tuhi, Tuhi deve essere "
"installato e funzionante quando Tuhi viene lanciato."
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:32
msgid "Tuhi's main window"
msgstr "Finestra principale di Tuhi"
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:36
msgid "Tuhi's main window (zoomed)"
msgstr "Finestra principale di Tuhi (ingrandita)"
#. Translators: Search terms to find this application. Do NOT translate or localize the semicolons! The list MUST also end with a semicolon!
#: data/org.freedesktop.Tuhi.desktop.in:12
msgid "tablet;wacom;ink;"
msgstr "tablet;wacom;inchiostro;"
#: data/ui/AboutDialog.ui.in:13
msgid "Visit Tuhis website"
msgstr "Visita il sito web di Tuhi"
#: data/ui/AppMenu.ui:5
msgid "Portrait"
msgstr "Ritratto"
#: data/ui/AppMenu.ui:10
msgid "Landscape"
msgstr "Paesaggio"
#: data/ui/AppMenu.ui:17
msgid "Help"
msgstr "Aiuto"
#: data/ui/AppMenu.ui:21
msgid "About"
msgstr "Informazioni"
#: data/ui/DrawingPerspective.ui:68
msgid "Undo delete drawing"
msgstr "Annulla cancellazione del disegno"
#: data/ui/DrawingPerspective.ui:132
msgid "Press the button on the device to synchronize drawings"
msgstr "Premere il pulsante sul dispositivo per sincronizzare i disegni"
#: data/ui/ErrorPerspective.ui:21
#, fuzzy
msgid ""
"TuhiGUI is an interactive GUI to download data from Tuhi.\n"
"\n"
"Tuhi connects to tablets of the Wacom Ink range. It allows you to download "
"the drawings stored on those devices as SVGs for processing later.\n"
"\n"
"Tuhi is a DBus server that needs to be running for the Tuhi GUI to connect "
"to it. Connecting to the DBus server should take less than a second. If you "
"read this far, your Tuhi DBus server is not running or responding and needs "
"to be restarted."
msgstr ""
"TuhiGUI è una GUI interattiva per scaricare dati da Tuhi.\n"
"\n"
"Tuhi si collega ai tablet della gamma Wacom. Ti permette di scaricare i "
"disegni memorizzati su tali dispositivi come SVG per l'elaborazione "
"successiva.\n"
"\n"
"Tuhi è un server DBus che deve essere in esecuzione perché la GUI Tuhi possa "
"connettersi ad esso. La connessione al server DBus dovrebbe richiedere meno "
"di un secondo. Se hai letto fin qui, il tuo server Tuhi DBus non è in "
"esecuzione o non risponde e ha bisogno di essere riavviato."
#: data/ui/ErrorPerspective.ui:69
msgid "Connecting to Tuhi"
msgstr "Connessione a Tuhi"
#: data/ui/ErrorPerspective.ui:96
msgid ""
"This should take less than a second. Make sure the Tuhi DBus server is "
"running."
msgstr ""
"Questo dovrebbe richiedere meno di un secondo. Assicurati che il server Tuhi "
"DBus sia in esecuzione."
#: data/ui/MainWindow.ui:166
msgid "Authorization error while connecting to the device "
msgstr "Errore di autorizzazione durante la connessione al dispositivo "
#: data/ui/MainWindow.ui:176
msgid "Register"
msgstr "Registro"
#: data/ui/SetupPerspective.ui:7
msgid "Initial Device Setup"
msgstr "Configurazione iniziale del dispositivo"
#: data/ui/SetupPerspective.ui:30
msgid "Quit"
msgstr "Esci"
#: data/ui/SetupPerspective.ui:70
msgid "Hold the button on the device until the blue light is flashing."
msgstr ""
"Tieni premuto il pulsante sul dispositivo finché la luce blu non lampeggia."
#: data/ui/SetupPerspective.ui:103
msgid "Searching for device"
msgstr "Ricerca del dispositivo"
#: data/ui/SetupPerspective.ui:137
msgid "Connecting to LE Paper"
msgstr "Collegamento a LE Paper"
#: data/ui/SetupPerspective.ui:170
msgid "Connecting to device..."
msgstr "Connessione al dispositivo..."
#: data/ui/SetupPerspective.ui:206
msgid "Press the button on the device now!"
msgstr "Premi ora il pulsante sul dispositivo!"
#: data/ui/SetupPerspective.ui:240
msgid "waiting for reply"
msgstr "in attesa di risposta"
#. Translators: the default filename to save to
#: tuhi/gui/drawing.py:121
msgid "untitled.svg"
msgstr "senza titolo.svg"
#. Translators: filter name to show all/any files
#: tuhi/gui/drawing.py:125
msgid "Any files"
msgstr "Qualsiasi file"
#. Translators: filter to show svg files only
#: tuhi/gui/drawing.py:129
msgid "SVG files"
msgstr "File SVG"
#. Translators: filter to show png files only
#: tuhi/gui/drawing.py:133
msgid "PNG files"
msgstr "File PNG"
#: tuhi/gui/window.py:68
#, python-brace-format
msgid "Connecting to {device.name}"
msgstr "Connessione a {device.name}"

193
po/pl.po
View File

@ -1,193 +0,0 @@
# Polish translation for tuhi.
# Copyright © 2019 the tuhi authors.
# This file is distributed under the same license as the tuhi package.
# Piotr Drąg <piotrdrag@gmail.com>, 2019.
# Aviary.pl <community-poland@mozilla.org>, 2019.
#
msgid ""
msgstr ""
"Project-Id-Version: tuhi\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2022-04-28 09:34+1000\n"
"PO-Revision-Date: 2019-10-24 16:23+0200\n"
"Last-Translator: Piotr Drąg <piotrdrag@gmail.com>\n"
"Language-Team: Polish <community-poland@mozilla.org>\n"
"Language: pl\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Plural-Forms: nplurals=3; plural=(n==1 ? 0 : n%10>=2 && n%10<=4 && (n%100<10 "
"|| n%100>=20) ? 1 : 2);\n"
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:7
#: data/org.freedesktop.Tuhi.desktop.in:3
msgid "Tuhi"
msgstr "Tuhi"
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:8
#: data/org.freedesktop.Tuhi.desktop.in:4
msgid "Utility to download drawings from the Wacom Ink range of devices"
msgstr "Narzędzie do pobierania rysunków z rodziny urządzeń Wacom Ink"
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:10
msgid ""
"Tuhi is a graphical user interface to download drawings stored on tablet "
"devices from the Wacom Ink range, e.g. Intuos Pro Paper or Bamboo Slate."
msgstr ""
"Tuhi to graficzny interfejs użytkownika do pobierania rysunków "
"przechowywanych na tabletach z rodziny Wacom Ink, np. Intuos Pro Paper "
"i Bamboo Slate."
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:15
msgid ""
"Tuhi requires Tuhi, the daemon to actually communicate with the devices. "
"ThiGui is merely a front end to Tuhi, Tuhi must be installed and running "
"when Tuhi is launched."
msgstr ""
"TuhiGUI wymaga Tuhi, usługi komunikującej się z urządzeniem. TuhiGUI to "
"interfejs dla usługi Tuhi, która musi być zainstalowana i uruchomiona, aby "
"narzędzie Tuhi mogło działać."
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:32
msgid "Tuhi's main window"
msgstr "Główne okno Tuhi"
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:36
msgid "Tuhi's main window (zoomed)"
msgstr "Główne okno Tuhi (powiększone)"
#. Translators: Search terms to find this application. Do NOT translate or localize the semicolons! The list MUST also end with a semicolon!
#: data/org.freedesktop.Tuhi.desktop.in:12
msgid "tablet;wacom;ink;"
msgstr "tablet;wacom;ink;"
#: data/ui/AboutDialog.ui.in:13
msgid "Visit Tuhis website"
msgstr "Witryna programu Tuhi"
#: data/ui/AppMenu.ui:5
msgid "Portrait"
msgstr "Pionowo"
#: data/ui/AppMenu.ui:10
msgid "Landscape"
msgstr "Poziomo"
#: data/ui/AppMenu.ui:17
msgid "Help"
msgstr "Pomoc"
#: data/ui/AppMenu.ui:21
msgid "About"
msgstr "O programie"
#: data/ui/DrawingPerspective.ui:68
msgid "Undo delete drawing"
msgstr "Cofnij usunięcie rysunku"
#: data/ui/DrawingPerspective.ui:132
msgid "Press the button on the device to synchronize drawings"
msgstr "Proszę nacisnąć przycisk na urządzeniu, aby zsynchronizować rysunki"
#: data/ui/ErrorPerspective.ui:21
#, fuzzy
msgid ""
"TuhiGUI is an interactive GUI to download data from Tuhi.\n"
"\n"
"Tuhi connects to tablets of the Wacom Ink range. It allows you to download "
"the drawings stored on those devices as SVGs for processing later.\n"
"\n"
"Tuhi is a DBus server that needs to be running for the Tuhi GUI to connect "
"to it. Connecting to the DBus server should take less than a second. If you "
"read this far, your Tuhi DBus server is not running or responding and needs "
"to be restarted."
msgstr ""
"TuhiGUI to interaktywny interfejs użytkownika do pobierania danych z usługi "
"Tuhi.\n"
"\n"
"Tuhi łączy się z tabletami z rodziny Wacom Ink. Umożliwia pobieranie obrazów "
"przechowywanych na tych urządzeniach jako pliki SVG do przetwarzania "
"w późniejszym czasie.\n"
"\n"
"Tuhi to serwer D-Bus, który musi być uruchomiony, aby interfejs Tuhi mógł "
"się z nim połączyć. Połączenie z serwerem D-Bus powinno zająć mniej niż "
"sekundę. Jeśli jeszcze czytasz ten tekst, to znaczy że serwer D-Bus Tuhi nie "
"jest uruchomiony lub nie odpowiada i wymaga ponownego uruchomienia."
#: data/ui/ErrorPerspective.ui:69
msgid "Connecting to Tuhi"
msgstr "Łączenie z usługą Tuhi"
#: data/ui/ErrorPerspective.ui:96
msgid ""
"This should take less than a second. Make sure the Tuhi DBus server is "
"running."
msgstr ""
"To powinno zająć mniej niż sekundę. Proszę się upewnić, że serwer D-Bus Tuhi "
"jest uruchomiony."
#: data/ui/MainWindow.ui:166
msgid "Authorization error while connecting to the device "
msgstr "Błąd upoważnienia podczas łączenia z urządzeniem "
#: data/ui/MainWindow.ui:176
msgid "Register"
msgstr "Zarejestruj"
#: data/ui/SetupPerspective.ui:7
msgid "Initial Device Setup"
msgstr "Pierwsza konfiguracja urządzenia"
#: data/ui/SetupPerspective.ui:30
msgid "Quit"
msgstr "Zakończ"
#: data/ui/SetupPerspective.ui:70
msgid "Hold the button on the device until the blue light is flashing."
msgstr ""
"Proszę przytrzymać przycisk na urządzeniu, aż niebieska dioda zacznie migać."
#: data/ui/SetupPerspective.ui:103
msgid "Searching for device"
msgstr "Wyszukiwanie urządzenia"
#: data/ui/SetupPerspective.ui:137
msgid "Connecting to LE Paper"
msgstr "Łączenie z LE Paper"
#: data/ui/SetupPerspective.ui:170
msgid "Connecting to device..."
msgstr "Łączenie z urządzeniem…"
#: data/ui/SetupPerspective.ui:206
msgid "Press the button on the device now!"
msgstr "Proszę teraz nacisnąć przycisk na urządzeniu."
#: data/ui/SetupPerspective.ui:240
msgid "waiting for reply"
msgstr "oczekiwanie na odpowiedź"
#. Translators: the default filename to save to
#: tuhi/gui/drawing.py:121
msgid "untitled.svg"
msgstr "bez tytułu.svg"
#. Translators: filter name to show all/any files
#: tuhi/gui/drawing.py:125
msgid "Any files"
msgstr "Wszystkie pliki"
#. Translators: filter to show svg files only
#: tuhi/gui/drawing.py:129
msgid "SVG files"
msgstr "Pliki SVG"
#. Translators: filter to show png files only
#: tuhi/gui/drawing.py:133
msgid "PNG files"
msgstr "Pliki PNG"
#: tuhi/gui/window.py:68
#, python-brace-format
msgid "Connecting to {device.name}"
msgstr "Łączenie z urządzeniem {device.name}"

190
po/tr.po
View File

@ -1,190 +0,0 @@
# Turkish translation for tuhi.
# Copyright © 2019 the tuhi authors.
# This file is distributed under the same license as the tuhi package.
# Gündüzhan Gündüz <gunduzhan@gmail.com>, 2021.
#
msgid ""
msgstr ""
"Project-Id-Version: tuhi\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2022-04-28 09:34+1000\n"
"PO-Revision-Date: \n"
"Last-Translator: Gündüzhan Gündüz <gunduzhan@gmail.com>\n"
"Language-Team: Turkish <gnome-turk@gnome.org>\n"
"Language: tr\n"
"MIME-Version: 1.0\n"
"Content-Type: text/plain; charset=UTF-8\n"
"Content-Transfer-Encoding: 8bit\n"
"Plural-Forms: nplurals=3; plural=(n==1 ? 0 : n%10>=2 && n%10<=4 && (n%100<10 "
"|| n%100>=20) ? 1 : 2);\n"
"X-Generator: Poedit 2.4.1\n"
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:7
#: data/org.freedesktop.Tuhi.desktop.in:3
msgid "Tuhi"
msgstr "Tuhi"
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:8
#: data/org.freedesktop.Tuhi.desktop.in:4
msgid "Utility to download drawings from the Wacom Ink range of devices"
msgstr "Wacom Ink cihazlarınızdan çizimlerinizi indirmek için bir araç"
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:10
msgid ""
"Tuhi is a graphical user interface to download drawings stored on tablet "
"devices from the Wacom Ink range, e.g. Intuos Pro Paper or Bamboo Slate."
msgstr ""
"Tuhi, Wacom Ink serisi cihazlarda depolanan çizimlerinizi indirmenize olanak "
"sağlayan bir grafik kullanıcı arayüzüdür. Ör. Intuos Pro Paper veya Bamboo "
"Slate."
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:15
#, fuzzy
msgid ""
"Tuhi requires Tuhi, the daemon to actually communicate with the devices. "
"ThiGui is merely a front end to Tuhi, Tuhi must be installed and running "
"when Tuhi is launched."
msgstr ""
"Tuhi cihazınız ile iletişim kurmak için arka plan programı Tuhi'ye ihtiyaç "
"duyar. ThiGui, Tuhi için sadece bir başlangıç aşamasıdır. Tuhi "
"başlatıldığında kurulmalı ve çalıştırılmalıdır."
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:32
msgid "Tuhi's main window"
msgstr "Tuhi ana penceresi"
#: data/org.freedesktop.Tuhi.appdata.xml.in.in:36
msgid "Tuhi's main window (zoomed)"
msgstr "Tuhi ana penceresi (yakın)"
#. Translators: Search terms to find this application. Do NOT translate or localize the semicolons! The list MUST also end with a semicolon!
#: data/org.freedesktop.Tuhi.desktop.in:12
msgid "tablet;wacom;ink;"
msgstr "tablet;wacom;ink;"
#: data/ui/AboutDialog.ui.in:13
msgid "Visit Tuhis website"
msgstr "Tuhi'nin sitesini ziyaret edin"
#: data/ui/AppMenu.ui:5
msgid "Portrait"
msgstr "Portre"
#: data/ui/AppMenu.ui:10
msgid "Landscape"
msgstr "Yatay"
#: data/ui/AppMenu.ui:17
msgid "Help"
msgstr "Yardım"
#: data/ui/AppMenu.ui:21
msgid "About"
msgstr "Hakkında"
#: data/ui/DrawingPerspective.ui:68
msgid "Undo delete drawing"
msgstr "Silinen çizimi geri al"
#: data/ui/DrawingPerspective.ui:132
msgid "Press the button on the device to synchronize drawings"
msgstr "Çizimlerinizi senkron etmek için cihazdaki düğmeye basın"
#: data/ui/ErrorPerspective.ui:21
#, fuzzy
msgid ""
"TuhiGUI is an interactive GUI to download data from Tuhi.\n"
"\n"
"Tuhi connects to tablets of the Wacom Ink range. It allows you to download "
"the drawings stored on those devices as SVGs for processing later.\n"
"\n"
"Tuhi is a DBus server that needs to be running for the Tuhi GUI to connect "
"to it. Connecting to the DBus server should take less than a second. If you "
"read this far, your Tuhi DBus server is not running or responding and needs "
"to be restarted."
msgstr ""
"TuhiGUI, Tuhi'den çizimlerinizi indirmek için bir araçtır.\n"
"\n"
"Tuhi Wacom Ink serisi cihazlara bağlanır. Ayrıca cihazlarda depolanan "
"çizimlerinizi işlemek üzere SVG olarak indirmenizi sağlar.\n"
"\n"
"Tuhi, TuhiGUI'nin bağlanabilmesi için çalışan bir DBus sunucusudur. DBus "
"sunucusuna bağlanmak saniyeden çok daha kısa sürede gerçekleşir. Eğer buraya "
"kadar okuduysanız, Tuhi DBus sunucusu çalışmıyor, yanıt vermiyor ve yeniden "
"başlatılması gerekiyordur."
#: data/ui/ErrorPerspective.ui:69
msgid "Connecting to Tuhi"
msgstr "Tuhi'ye bağlanılıyor"
#: data/ui/ErrorPerspective.ui:96
msgid ""
"This should take less than a second. Make sure the Tuhi DBus server is "
"running."
msgstr ""
"Bir saniyeden daha kısa sürer. Tuhi DBus sunucusunun çalıştığından emin olun."
#: data/ui/MainWindow.ui:166
msgid "Authorization error while connecting to the device "
msgstr "Cihaza bağlanırken yetkilendirme hatası oluştu "
#: data/ui/MainWindow.ui:176
msgid "Register"
msgstr "Kayıt"
#: data/ui/SetupPerspective.ui:7
msgid "Initial Device Setup"
msgstr "İlk Cihaz Kurulumu"
#: data/ui/SetupPerspective.ui:30
msgid "Quit"
msgstr "Çıkış"
#: data/ui/SetupPerspective.ui:70
msgid "Hold the button on the device until the blue light is flashing."
msgstr "Mavı ışık yanıp sönmeye başlayana kadar düğmeye basılı tutun."
#: data/ui/SetupPerspective.ui:103
msgid "Searching for device"
msgstr "Cihaz aranıyor"
#: data/ui/SetupPerspective.ui:137
msgid "Connecting to LE Paper"
msgstr "LE Paper'a bağlanıyor"
#: data/ui/SetupPerspective.ui:170
msgid "Connecting to device..."
msgstr "Cihaza bağlanıyor ..."
#: data/ui/SetupPerspective.ui:206
msgid "Press the button on the device now!"
msgstr "Şimdi cihazdaki düğmeye basın!"
#: data/ui/SetupPerspective.ui:240
msgid "waiting for reply"
msgstr "yanıt bekleniyor"
#. Translators: the default filename to save to
#: tuhi/gui/drawing.py:121
msgid "untitled.svg"
msgstr "yenibelge.svg"
#. Translators: filter name to show all/any files
#: tuhi/gui/drawing.py:125
msgid "Any files"
msgstr "Herhangi bir dosya"
#. Translators: filter to show svg files only
#: tuhi/gui/drawing.py:129
msgid "SVG files"
msgstr "SVG dosyaları"
#. Translators: filter to show png files only
#: tuhi/gui/drawing.py:133
msgid "PNG files"
msgstr "PNG dosyaları"
#: tuhi/gui/window.py:68
#, python-brace-format
msgid "Connecting to {device.name}"
msgstr "{device.name} cihaz bağlantısı kuruluyor"

269
test/test_messages.py Executable file → Normal file
View File

@ -19,19 +19,19 @@
import calendar
import os
import pytest
import sys
import unittest
import time
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/..') # noqa
from tuhi.protocol import * # noqa
from tuhi.protocol import *
SUCCESS = NordicData([0xb3, 0x1, 0x00])
class TestUtils(object):
class TestUtils(unittest.TestCase):
def test_hex_string(self):
values = [
([0x00, 0x12], '00 12'),
@ -44,12 +44,12 @@ class TestUtils(object):
]
for v in values:
assert as_hex_string(v[0]) == v[1]
self.assertEqual(as_hex_string(v[0]), v[1])
with pytest.raises(ValueError):
with self.assertRaises(ValueError):
as_hex_string(1)
with pytest.raises(ValueError):
with self.assertRaises(ValueError):
as_hex_string('0x00')
def test_protocol_version(self):
@ -64,14 +64,14 @@ class TestUtils(object):
]
for v in values:
assert ProtocolVersion.from_string(v[0]) == v[1]
self.assertEqual(ProtocolVersion.from_string(v[0]), v[1])
# No real reason we couldn't support those but right now they
# aren't, so let's test for it.
with pytest.raises(ValueError):
with self.assertRaises(ValueError):
ProtocolVersion.from_string('Slate')
with pytest.raises(ValueError):
with self.assertRaises(ValueError):
ProtocolVersion.from_string('IntuosPro')
def test_little_u16(self):
@ -81,12 +81,12 @@ class TestUtils(object):
]
for v in values:
assert little_u16(v[0]) == bytes(v[1])
assert little_u16(v[1]) == v[0]
self.assertEqual(little_u16(v[0]), bytes(v[1]))
self.assertEqual(little_u16(v[1]), v[0])
invalid = [0x10000, -1, [0x00, 0x00, 0x00]]
for v in invalid:
with pytest.raises(AssertionError):
with self.assertRaises(AssertionError):
little_u16(v)
def test_little_u32(self):
@ -98,20 +98,20 @@ class TestUtils(object):
]
for v in values:
assert little_u32(v[0]) == bytes(v[1])
assert little_u32(v[1]) == v[0]
self.assertEqual(little_u32(v[0]), bytes(v[1]))
self.assertEqual(little_u32(v[1]), v[0])
invalid = [0x100000000, -1, [0x00, 0x00, 0x00, 0x00, 0x00]]
for v in invalid:
with pytest.raises(AssertionError):
with self.assertRaises(AssertionError):
little_u32(v)
class TestProtocolAny(object):
class TestProtocolAny(unittest.TestCase):
protocol_version = ProtocolVersion.ANY
def test_get_protocol(self):
assert Protocol(self.protocol_version, callback=None) is not None
self.assertIsNotNone(Protocol(self.protocol_version, callback=None))
def test_has_all_messages(self):
p = Protocol(self.protocol_version, callback=None)
@ -136,47 +136,47 @@ class TestProtocolAny(object):
def test_connect(self, cb=None):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xe6
assert request.length == 6
self.assertEqual(request.opcode, 0xe6)
self.assertEqual(request.length, 6)
return SUCCESS
if cb is None:
cb = _cb
p = Protocol(self.protocol_version, callback=cb)
with pytest.raises(TypeError):
with self.assertRaises(TypeError):
p.execute(Interactions.CONNECT) # missing argument
uuid = 'abcdef123456'
msg = p.execute(Interactions.CONNECT, uuid)
assert msg.uuid == uuid
self.assertEqual(msg.uuid, uuid)
with pytest.raises(ValueError):
with self.assertRaises(ValueError):
p.execute(Interactions.CONNECT, 'too-long-an-id')
with pytest.raises(binascii.Error):
with self.assertRaises(binascii.Error):
uuid = 'uvwxyz123456'
p.execute(Interactions.CONNECT, uuid)
def test_get_name(self, cb=None, name='test dev name\x0a'):
def test_get_name(self, cb=None, name='test dev name'):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xbb
assert request.length == 1
assert request[0] == 0x00
self.assertEqual(request.opcode, 0xbb)
self.assertEqual(request.length, 1)
self.assertEqual(request[0], 0x00)
return NordicData([0xbc, len(name)] + list(bytes(name, encoding='ascii')))
cb = cb or _cb
p = Protocol(self.protocol_version, callback=cb)
msg = p.execute(Interactions.GET_NAME)
assert msg.name == name
self.assertEqual(msg.name, name)
def test_set_name(self, cb=None, name='test dev name'):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xbb
assert request.length == len(name) + 1
assert request[-1] == 0xa # spark needs a trailing linebreak
assert bytes(request[:-1]).decode('utf-8') == name
self.assertEqual(request.opcode, 0xbb)
self.assertEqual(request.length, len(name) + 1)
self.assertEqual(request[-1], 0xa) # spark needs a trailing linebreak
self.assertEqual(bytes(request[:-1]).decode('utf-8'), name)
return SUCCESS
cb = cb or _cb
@ -186,8 +186,8 @@ class TestProtocolAny(object):
def test_get_time(self, cb=None, ts=time.time()):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xb6
assert request.length == 1
self.assertEqual(request.opcode, 0xb6)
self.assertEqual(request.length, 1)
t = time.strftime('%y%m%d%H%M%S', time.gmtime(ts))
t = [int(i) for i in binascii.unhexlify(t)]
return NordicData([0xbd, len(t)] + t)
@ -196,15 +196,15 @@ class TestProtocolAny(object):
p = Protocol(self.protocol_version, callback=cb)
msg = p.execute(Interactions.GET_TIME)
assert msg.timestamp == int(ts)
self.assertEqual(msg.timestamp, int(ts))
def test_set_time(self, cb=None, ts=time.time()):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xb6
assert request.length == 6
self.assertEqual(request.opcode, 0xb6)
self.assertEqual(request.length, 6)
str_timestamp = ''.join([f'{b:02x}' for b in request])
t = calendar.timegm(time.strptime(str_timestamp, '%y%m%d%H%M%S'))
assert int(t) == int(ts)
self.assertEqual(int(t), int(ts))
return SUCCESS
cb = cb or _cb
@ -214,8 +214,8 @@ class TestProtocolAny(object):
def test_get_fw(self, cb=None, fw='abcdef-123456'):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xb7
assert request.length == 1
self.assertEqual(request.opcode, 0xb7)
self.assertEqual(request.length, 1)
data = [int(c, 16) for c in fw.split('-')[request[0]]]
return NordicData([0xb8, len(data) + 1, 0x00] + data)
@ -223,43 +223,43 @@ class TestProtocolAny(object):
p = Protocol(self.protocol_version, callback=cb)
msg = p.execute(Interactions.GET_FIRMWARE)
assert msg.firmware == fw
self.assertEqual(msg.firmware, fw)
def test_get_battery(self, cb=None, battery=(1, 78)):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xb9
assert request.length == 1
self.assertEqual(request.opcode, 0xb9)
self.assertEqual(request.length, 1)
return NordicData([0xba, 2, battery[1], battery[0]])
cb = cb or _cb
p = Protocol(self.protocol_version, callback=cb)
msg = p.execute(Interactions.GET_BATTERY)
assert msg.battery_is_charging == battery[0]
assert msg.battery_percent == battery[1]
self.assertEqual(msg.battery_is_charging, battery[0])
self.assertEqual(msg.battery_percent, battery[1])
def test_get_width(self, cb=None):
# this is hardcoded for the spark
p = Protocol(self.protocol_version, callback=None)
msg = p.execute(Interactions.GET_WIDTH)
assert msg.width == 21000
self.assertEqual(msg.width, 21000)
def test_get_height(self, cb=None):
# this is hardcoded for the spark
p = Protocol(self.protocol_version, callback=None)
msg = p.execute(Interactions.GET_HEIGHT)
assert msg.height == 14800
self.assertEqual(msg.height, 14800)
def test_get_point_size(self, cb=None):
# this is hardcoded for the spark
p = Protocol(self.protocol_version, callback=None)
msg = p.execute(Interactions.GET_POINT_SIZE)
assert msg.point_size == 10
self.assertEqual(msg.point_size, 10)
def test_unknown_e3(self, cb=None):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xe3
assert request.length == 1
self.assertEqual(request.opcode, 0xe3)
self.assertEqual(request.length, 1)
return SUCCESS
cb = cb or _cb
@ -269,9 +269,9 @@ class TestProtocolAny(object):
def test_filetransfer_reporting_type(self, cb=None):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xec
assert request.length == 6
assert request, [0x06, 0x00, 0x00, 0x00, 0x00 == 0x00]
self.assertEqual(request.opcode, 0xec)
self.assertEqual(request.length, 6)
self.assertEqual(request, [0x06, 0x00, 0x00, 0x00, 0x00, 0x00])
return SUCCESS
cb = cb or _cb
@ -284,9 +284,9 @@ class TestProtocolAny(object):
mode = Mode.LIVE
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xb1
assert request.length == 1
assert request[0] == mode
self.assertEqual(request.opcode, 0xb1)
self.assertEqual(request.length, 1)
self.assertEqual(request[0], mode)
return SUCCESS
cb = cb or _cb
@ -299,9 +299,9 @@ class TestProtocolAny(object):
# this is a weird double call, see the protocol
# We reply 0xc7 first, and then 0xcd
if request is not None:
assert request.opcode == 0xc5
assert request.length == 1
assert request[0] == 0x00
self.assertEqual(request.opcode, 0xc5)
self.assertEqual(request.length, 1)
self.assertEqual(request[0], 0x00)
data = list(count.to_bytes(4, byteorder='big'))
return NordicData([0xc7, len(data)] + data)
else:
@ -313,14 +313,14 @@ class TestProtocolAny(object):
p = Protocol(self.protocol_version, callback=cb)
msg = p.execute(Interactions.GET_STROKES)
assert msg.count == count
assert msg.timestamp == int(ts)
self.assertEqual(msg.count, count)
self.assertEqual(msg.timestamp, int(ts))
def test_available_files_count(self, cb=None, ndata=1234):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xc1
assert request.length == 1
assert request[0] == 0x00
self.assertEqual(request.opcode, 0xc1)
self.assertEqual(request.length, 1)
self.assertEqual(request[0], 0x00)
data = list(ndata.to_bytes(2, byteorder='big'))
return NordicData([0xc2, len(data)] + data)
@ -328,13 +328,13 @@ class TestProtocolAny(object):
p = Protocol(self.protocol_version, callback=cb)
msg = p.execute(Interactions.AVAILABLE_FILES_COUNT)
assert msg.count == ndata
self.assertEqual(msg.count, ndata)
def test_download_oldest_file(self, cb=None):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xc3
assert request.length == 1
assert request[0] == 0x00
self.assertEqual(request.opcode, 0xc3)
self.assertEqual(request.length, 1)
self.assertEqual(request[0], 0x00)
return NordicData([0xc8, 1, 0xbe])
cb = cb or _cb
@ -344,9 +344,9 @@ class TestProtocolAny(object):
def test_delete_oldest_file(self, cb=None):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xca
assert request.length == 1
assert request[0] == 0x00
self.assertEqual(request.opcode, 0xca)
self.assertEqual(request.length, 1)
self.assertEqual(request[0], 0x00)
# no reply
cb = cb or _cb
@ -356,9 +356,9 @@ class TestProtocolAny(object):
def test_register_complete(self, cb=None):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xe5
assert request.length == 1
assert request[0] == 0x00
self.assertEqual(request.opcode, 0xe5)
self.assertEqual(request.length, 1)
self.assertEqual(request[0], 0x00)
return SUCCESS
cb = cb or _cb
@ -368,35 +368,16 @@ class TestProtocolAny(object):
def test_register_press_button(self, cb=None, uuid=None):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xe3
assert request.length == 1
assert request[0] == 0x01
self.assertEqual(request.opcode, 0xe3)
self.assertEqual(request.length, 1)
self.assertEqual(request[0], 0x01)
# no reply
cb = cb or _cb
p = Protocol(self.protocol_version, callback=cb)
msg = p.execute(Interactions.REGISTER_PRESS_BUTTON, uuid=uuid)
assert msg.uuid == uuid
def test_error_invalid_state(self):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
return NordicData([0xb3, 0x1, 0x1])
p = Protocol(self.protocol_version, callback=_cb)
# a "random" collection of requests that we want to check for
with pytest.raises(DeviceError) as cm:
p.execute(Interactions.CONNECT, uuid='abcdef123456')
assert cm.value.errorcode == DeviceError.ErrorCode.GENERAL_ERROR
with pytest.raises(DeviceError) as cm:
p.execute(Interactions.GET_STROKES)
assert cm.value.errorcode == DeviceError.ErrorCode.GENERAL_ERROR
with pytest.raises(DeviceError) as cm:
p.execute(Interactions.SET_MODE, Mode.PAPER)
assert cm.value.errorcode == DeviceError.ErrorCode.GENERAL_ERROR
self.assertEqual(msg.uuid, uuid)
class TestProtocolSpark(TestProtocolAny):
@ -404,14 +385,14 @@ class TestProtocolSpark(TestProtocolAny):
def test_register_wait_for_button(self, cb=None):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request is None
self.assertIsNone(request)
return NordicData([0xe4, 0x00])
cb = cb or _cb
p = Protocol(self.protocol_version, callback=cb)
msg = p.execute(Interactions.REGISTER_WAIT_FOR_BUTTON)
assert msg.protocol_version == self.protocol_version
self.assertEqual(msg.protocol_version, self.protocol_version)
class TestProtocolSlate(TestProtocolSpark):
@ -419,9 +400,9 @@ class TestProtocolSlate(TestProtocolSpark):
def test_get_width(self, cb=None, width=1234):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xea
assert request.length == 2
assert request[0] == 3
self.assertEqual(request.opcode, 0xea)
self.assertEqual(request.length, 2)
self.assertEqual(request[0], 3)
data = [0x03, 0x00] + list(width.to_bytes(4, byteorder='little'))
return NordicData([0xeb, len(data)] + data)
@ -430,13 +411,13 @@ class TestProtocolSlate(TestProtocolSpark):
p = Protocol(self.protocol_version, callback=cb)
msg = p.execute(Interactions.GET_WIDTH)
assert msg.width == width
self.assertEqual(msg.width, width)
def test_get_height(self, cb=None, height=4321):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xea
assert request.length == 2
assert request[0] == 4
self.assertEqual(request.opcode, 0xea)
self.assertEqual(request.length, 2)
self.assertEqual(request[0], 4)
data = [0x04, 0x00] + list(height.to_bytes(4, byteorder='little'))
return NordicData([0xeb, len(data)] + data)
@ -445,13 +426,13 @@ class TestProtocolSlate(TestProtocolSpark):
p = Protocol(self.protocol_version, callback=cb)
msg = p.execute(Interactions.GET_HEIGHT)
assert msg.height == height
self.assertEqual(msg.height, height)
def test_get_strokes(self, cb=None, count=1024, ts=time.time()):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xcc
assert request.length == 1
assert request[0] == 0x00
self.assertEqual(request.opcode, 0xcc)
self.assertEqual(request.length, 1)
self.assertEqual(request[0], 0x00)
c = list(count.to_bytes(4, byteorder='little'))
t = time.strftime('%y%m%d%H%M%S', time.gmtime(ts))
t = [int(i) for i in binascii.unhexlify(t)]
@ -462,9 +443,9 @@ class TestProtocolSlate(TestProtocolSpark):
def test_available_files_count(self, cb=None, ndata=1234):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xc1
assert request.length == 1
assert request[0] == 0x00
self.assertEqual(request.opcode, 0xc1)
self.assertEqual(request.length, 1)
self.assertEqual(request[0], 0x00)
data = list(ndata.to_bytes(2, byteorder='little'))
return NordicData([0xc2, len(data)] + data)
@ -472,24 +453,24 @@ class TestProtocolSlate(TestProtocolSpark):
def test_delete_oldest_file(self, cb=None):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xca
assert request.length == 1
assert request[0] == 0x00
self.assertEqual(request.opcode, 0xca)
self.assertEqual(request.length, 1)
self.assertEqual(request[0], 0x00)
return SUCCESS
super().test_delete_oldest_file(cb or _cb)
def test_register_press_button(self, cb=None, uuid='abcdef123456'):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xe7
assert request.length == 6
self.assertEqual(request.opcode, 0xe7)
self.assertEqual(request.length, 6)
# no reply
super().test_register_press_button(cb or _cb, uuid)
def test_register_wait_for_button(self, cb=None):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request is None
self.assertIsNone(request)
return NordicData([0xe4, 0x00])
super().test_register_wait_for_button(cb or _cb)
@ -500,34 +481,34 @@ class TestProtocolIntuosPro(TestProtocolSlate):
def test_connect(self, cb=None):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xe6
assert request.length == 6
self.assertEqual(request.opcode, 0xe6)
self.assertEqual(request.length, 6)
return NordicData([0x50, 0x06] + request) # replies with the uuid
super().test_connect(cb or _cb)
def test_get_name(self, cb=None, name='test dev name'):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xdb
assert request.length == 1
assert request[0] == 0x00
self.assertEqual(request.opcode, 0xdb)
self.assertEqual(request.length, 1)
self.assertEqual(request[0], 0x00)
return NordicData([0xbc, len(name)] + list(bytes(name, encoding='ascii')))
super().test_get_name(cb or _cb, name=name)
def test_set_name(self, cb=None, name='test dev name'):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xdb
assert request.length == len(name)
assert bytes(request).decode('utf-8') == name
self.assertEqual(request.opcode, 0xdb)
self.assertEqual(request.length, len(name))
self.assertEqual(bytes(request).decode('utf-8'), name)
return SUCCESS
super().test_set_name(cb or _cb, name=name)
def test_get_time(self, cb=None, ts=time.time()):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xd6
assert request.length == 1
self.assertEqual(request.opcode, 0xd6)
self.assertEqual(request.length, 1)
t = list(int(ts).to_bytes(length=4, byteorder='little')) + [0x00, 0x00]
return NordicData([0xbd, len(t)] + t)
@ -535,18 +516,18 @@ class TestProtocolIntuosPro(TestProtocolSlate):
def test_set_time(self, cb=None, ts=time.time()):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xb6
assert request.length == 6
self.assertEqual(request.opcode, 0xb6)
self.assertEqual(request.length, 6)
t = int.from_bytes(request[0:4], byteorder='little')
assert int(t) == int(ts)
self.assertEqual(int(t), int(ts))
return SUCCESS
super().test_set_time(cb or _cb, ts=ts)
def test_get_fw(self, cb=None, fw='anything-string'):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xb7
assert request.length == 1
self.assertEqual(request.opcode, 0xb7)
self.assertEqual(request.length, 1)
data = bytes(fw.split('-')[request[0]].encode('utf8'))
return NordicData([0xb8, len(data) + 1, 0x00] + list(data))
@ -554,9 +535,9 @@ class TestProtocolIntuosPro(TestProtocolSlate):
def test_get_strokes(self, cb=None, count=1024, ts=time.time()):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xcc
assert request.length == 1
assert request[0] == 0x00
self.assertEqual(request.opcode, 0xcc)
self.assertEqual(request.length, 1)
self.assertEqual(request[0], 0x00)
c = list(count.to_bytes(4, byteorder='little'))
t = list(int(ts).to_bytes(4, byteorder='little'))
data = c + t
@ -566,16 +547,16 @@ class TestProtocolIntuosPro(TestProtocolSlate):
def test_register_wait_for_button(self, cb=None):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request is None
self.assertIsNone(request)
return NordicData([0x53, 0x00])
super().test_register_wait_for_button(cb or _cb)
def test_get_point_size(self, cb=None, pointsize=12):
def _cb(request, requires_reply=True, userdata=None, timeout=5):
assert request.opcode == 0xea
assert request.length == 2
assert request[0] == 0x14
self.assertEqual(request.opcode, 0xea)
self.assertEqual(request.length, 2)
self.assertEqual(request[0], 0x14)
ps = little_u32(pointsize)
return NordicData([0xeb, 6, 0x14, 0x00] + list(ps))
@ -583,4 +564,8 @@ class TestProtocolIntuosPro(TestProtocolSlate):
p = Protocol(self.protocol_version, callback=cb)
msg = p.execute(Interactions.GET_POINT_SIZE)
assert msg.point_size == pointsize - 1
self.assertEqual(msg.point_size, pointsize - 1)
if __name__ == "__main__":
unittest.main(sys.argv[1:])

View File

@ -18,8 +18,8 @@
#
import os
import pytest
import sys
import unittest
import xdg.BaseDirectory
from pathlib import Path
import yaml
@ -27,54 +27,39 @@ import logging
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/..') # noqa
from tuhi.protocol import * # noqa
from tuhi.util import flatten # noqa
from tuhi.protocol import *
from tuhi.util import flatten
logger = logging.getLogger('tuhi') # piggyback the debug messages
logger.setLevel(logging.DEBUG)
def pytest_generate_tests(metafunc):
# for any test function that takes a "logfile" argument return the list
# of all current logfiles in XDG_DATA_HOME/tuhi
# This means the test gets better the more logfiles are present on the
# user's machine.
if 'logfile' in metafunc.fixturenames:
basedir = Path(xdg.BaseDirectory.xdg_data_home) / 'tuhi'
def loads_and_has_data(filename):
with open(filename) as fd:
try:
yml = yaml.load(fd, Loader=yaml.Loader)
return yml is not None
except Exception as e:
logger.error(f'Exception triggered by file {filename}')
raise e
logfiles = [f for f in basedir.glob('**/raw/log-*.yaml') if loads_and_has_data(f)]
metafunc.parametrize('logfile', logfiles)
def test_log_files(logfile):
def load_pen_data(filename):
class TestLogFiles(unittest.TestCase):
'''
Special test class that loads a yaml file created by tuhi compiles
the StrokeData from it. This class autogenerates its own tests, see
the main() handling.
'''
def load_pen_data(self, filename):
with open(filename) as fd:
yml = yaml.load(fd, Loader=yaml.Loader)
# all recv lists that have source PEN
pendata = [d['recv'] for d in yml['data'] if 'recv' in d and 'source' in d and d['source'] == 'PEN']
return list(flatten(pendata))
data = load_pen_data(logfile)
if not data: # Recordings without Pen data can be skipped
pytest.skip('Recording without pen data')
StrokeFile(data)
def _test_file(self, fname):
data = self.load_pen_data(fname)
if not data: # Recordings without Pen data can be skipped
raise unittest.SkipTest()
StrokeFile(data)
class TestStrokeParsers(object):
class TestStrokeParsers(unittest.TestCase):
def test_identify_file_header(self):
data = [0x67, 0x82, 0x69, 0x65]
assert StrokeDataType.identify(data) == StrokeDataType.FILE_HEADER
self.assertEqual(StrokeDataType.identify(data), StrokeDataType.FILE_HEADER)
data = [0x62, 0x38, 0x62, 0x74]
assert StrokeDataType.identify(data) == StrokeDataType.FILE_HEADER
self.assertEqual(StrokeDataType.identify(data), StrokeDataType.FILE_HEADER)
others = [
# with header
@ -92,54 +77,54 @@ class TestStrokeParsers(object):
[0x62, 0x38, 0x62, 0x73],
]
for data in others:
assert StrokeDataType.identify(data) != StrokeDataType.FILE_HEADER, data
self.assertNotEqual(StrokeDataType.identify(data), StrokeDataType.FILE_HEADER, msg=data)
def test_identify_stroke_header(self):
data = [0xff, 0xfa] # two bytes are enough to identify
assert StrokeDataType.identify(data) == StrokeDataType.STROKE_HEADER
self.assertEqual(StrokeDataType.identify(data), StrokeDataType.STROKE_HEADER)
data = [0x3, 0xfa] # lowest bits set, not a correct packet but identify doesn't care
assert StrokeDataType.identify(data) == StrokeDataType.STROKE_HEADER
self.assertEqual(StrokeDataType.identify(data), StrokeDataType.STROKE_HEADER)
data = [0xfc, 0xfa] # lowest bits unset, must be something else
assert StrokeDataType.identify(data) != StrokeDataType.STROKE_HEADER
self.assertNotEqual(StrokeDataType.identify(data), StrokeDataType.STROKE_HEADER)
def test_identify_stroke_point(self):
data = [0xff, 0xff, 0xff] # three bytes are enough to identify
assert StrokeDataType.identify(data) == StrokeDataType.POINT
self.assertEqual(StrokeDataType.identify(data), StrokeDataType.POINT)
data = [0xff, 0xff, 0xff, 1, 2, 3, 4, 5, 6]
assert StrokeDataType.identify(data) == StrokeDataType.POINT
self.assertEqual(StrokeDataType.identify(data), StrokeDataType.POINT)
# wrong header, but observed in the wild
data = [0xbf, 0xff, 0xff, 1, 2, 3, 4, 5, 6]
assert StrokeDataType.identify(data) == StrokeDataType.POINT
self.assertEqual(StrokeDataType.identify(data), StrokeDataType.POINT)
data = [0xfc, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff] # stroke end
assert StrokeDataType.identify(data) != StrokeDataType.POINT
self.assertNotEqual(StrokeDataType.identify(data), StrokeDataType.POINT)
data = [0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff] # EOF
assert StrokeDataType.identify(data) != StrokeDataType.POINT
self.assertNotEqual(StrokeDataType.identify(data), StrokeDataType.POINT)
def test_identify_stroke_lost_point(self):
data = [0xff, 0xdd, 0xdd]
assert StrokeDataType.identify(data) == StrokeDataType.LOST_POINT
self.assertEqual(StrokeDataType.identify(data), StrokeDataType.LOST_POINT)
def test_identify_eof(self):
data = [0xff] * 9
assert StrokeDataType.identify(data) == StrokeDataType.EOF
self.assertEqual(StrokeDataType.identify(data), StrokeDataType.EOF)
def test_identify_stroke_end(self):
data = [0xfc, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff]
assert StrokeDataType.identify(data) == StrokeDataType.STROKE_END
self.assertEqual(StrokeDataType.identify(data), StrokeDataType.STROKE_END)
def test_identify_delta(self):
for i in range(256):
data = [i, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6]
if i & 0x3 == 0:
assert StrokeDataType.identify(data), StrokeDataType.DELTA == f'packet: {data}'
self.assertEqual(StrokeDataType.identify(data), StrokeDataType.DELTA, f'packet: {data}')
else:
assert StrokeDataType.identify(data), StrokeDataType.DELTA != f'packet: {data}'
self.assertNotEqual(StrokeDataType.identify(data), StrokeDataType.DELTA, f'packet: {data}')
def test_parse_stroke_header(self):
F_NEW_LAYER = 0x40
@ -149,81 +134,81 @@ class TestStrokeParsers(object):
data = [0xff, 0xfa, flags, 0x1f, 0x73, 0x53, 0x5d, 0x2e, 0x01]
packet = StrokeHeader(data)
assert packet.size == 9
assert packet.is_new_layer == 1
assert packet.pen_id == 0
assert packet.pen_type == pen_type
assert packet.timestamp == 1565750047
self.assertEqual(packet.size, 9)
self.assertEqual(packet.is_new_layer, 1)
self.assertEqual(packet.pen_id, 0)
self.assertEqual(packet.pen_type, pen_type)
self.assertEqual(packet.timestamp, 1565750047)
# new layer off
flags = pen_type
data = [0xff, 0xfa, flags, 0x1f, 0x73, 0x53, 0x5d, 0x2e, 0x01]
packet = StrokeHeader(data)
assert packet.size == 9
assert packet.is_new_layer == 0
assert packet.pen_id == 0
assert packet.pen_type == pen_type
assert packet.timestamp == 1565750047
self.assertEqual(packet.size, 9)
self.assertEqual(packet.is_new_layer, 0)
self.assertEqual(packet.pen_id, 0)
self.assertEqual(packet.pen_type, pen_type)
self.assertEqual(packet.timestamp, 1565750047)
# pen type change
pen_type = 1
flags = F_NEW_LAYER | pen_type
data = [0xff, 0xfa, flags, 0x1f, 0x73, 0x53, 0x5d, 0x2e, 0x01]
packet = StrokeHeader(data)
assert packet.size == 9
assert packet.is_new_layer == 1
assert packet.pen_id == 0
assert packet.pen_type == pen_type
assert packet.timestamp == 1565750047
self.assertEqual(packet.size, 9)
self.assertEqual(packet.is_new_layer, 1)
self.assertEqual(packet.pen_id, 0)
self.assertEqual(packet.pen_type, pen_type)
self.assertEqual(packet.timestamp, 1565750047)
# with pen id
flags = F_NEW_LAYER | F_PEN_ID | pen_type
pen_id = [0xff, 0x0a, 0x87, 0x75, 0x80, 0x28, 0x42, 0x00, 0x10]
data = [0xff, 0xfa, flags, 0x1f, 0x73, 0x53, 0x5d, 0x2e, 0x01] + pen_id
packet = StrokeHeader(data)
assert packet.size == 18
assert packet.is_new_layer == 1
assert packet.pen_id == 0x100042288075870a
assert packet.pen_type == pen_type
assert packet.timestamp == 1565750047
self.assertEqual(packet.size, 18)
self.assertEqual(packet.is_new_layer, 1)
self.assertEqual(packet.pen_id, 0x100042288075870a)
self.assertEqual(packet.pen_type, pen_type)
self.assertEqual(packet.timestamp, 1565750047)
def test_parse_stroke_point(self):
# 0xff means 2 bytes each for abs coords
data = [0xff, 0xff, 0xff, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6]
packet = StrokePoint(data)
assert packet.size == 9
assert packet.x == 0x0201
assert packet.y == 0x0403
assert packet.p == 0x0605
assert packet.dx is None
assert packet.dy is None
assert packet.dp is None
self.assertEqual(packet.size, 9)
self.assertEqual(packet.x, 0x0201)
self.assertEqual(packet.y, 0x0403)
self.assertEqual(packet.p, 0x0605)
self.assertIsNone(packet.dx)
self.assertIsNone(packet.dy)
self.assertIsNone(packet.dp)
# 0xbf means: 1 byte for pressure delta, i.e. the 0x6 is skipped
data = [0xbf, 0xff, 0xff, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6]
packet = StrokePoint(data)
assert packet.size == 8
assert packet.x == 0x0201
assert packet.y == 0x0403
assert packet.p is None
assert packet.dx is None
assert packet.dy is None
assert packet.dp == 0x5
self.assertEqual(packet.size, 8)
self.assertEqual(packet.x, 0x0201)
self.assertEqual(packet.y, 0x0403)
self.assertIsNone(packet.p)
self.assertIsNone(packet.dx)
self.assertIsNone(packet.dy)
self.assertEqual(packet.dp, 0x5)
def test_parse_lost_point(self):
data = [0xff, 0xdd, 0xdd, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6]
packet = StrokeLostPoint(data)
assert packet.size == 9
assert packet.nlost == 0x0201
self.assertEqual(packet.size, 9)
self.assertEqual(packet.nlost, 0x0201)
def test_parse_eof(self):
data = [0xff] * 9
packet = StrokeEOF(data)
assert packet.size == 9
self.assertEqual(packet.size, 9)
data = [0xfc] + [0xff] * 6
packet = StrokeEOF(data)
assert packet.size == 7
self.assertEqual(packet.size, 7)
def test_parse_delta(self):
x_delta = 0b00001000 # noqa
@ -236,93 +221,93 @@ class TestStrokeParsers(object):
flags = x_delta
data = [flags, 1]
packet = StrokeDelta(data)
assert packet.size == len(data)
assert packet.dx == 1
assert packet.dy is None
assert packet.dp is None
assert packet.x is None
assert packet.y is None
assert packet.p is None
self.assertEqual(packet.size, len(data))
self.assertEqual(packet.dx, 1)
self.assertIsNone(packet.dy)
self.assertIsNone(packet.dp)
self.assertIsNone(packet.x)
self.assertIsNone(packet.y)
self.assertIsNone(packet.p)
flags = y_delta
data = [flags, 2]
packet = StrokeDelta(data)
assert packet.size == len(data)
assert packet.dx is None
assert packet.dy == 2
assert packet.dp is None
assert packet.x is None
assert packet.y is None
assert packet.p is None
self.assertEqual(packet.size, len(data))
self.assertIsNone(packet.dx)
self.assertEqual(packet.dy, 2)
self.assertIsNone(packet.dp)
self.assertIsNone(packet.x)
self.assertIsNone(packet.y)
self.assertIsNone(packet.p)
flags = p_delta
data = [flags, 3]
packet = StrokeDelta(data)
assert packet.size == len(data)
assert packet.dx is None
assert packet.dy is None
assert packet.dp == 3
assert packet.x is None
assert packet.y is None
assert packet.p is None
self.assertEqual(packet.size, len(data))
self.assertIsNone(packet.dx)
self.assertIsNone(packet.dy)
self.assertEqual(packet.dp, 3)
self.assertIsNone(packet.x)
self.assertIsNone(packet.y)
self.assertIsNone(packet.p)
flags = x_delta | p_delta
data = [flags, 3, 5]
packet = StrokeDelta(data)
assert packet.size == len(data)
assert packet.dx == 3
assert packet.dy is None
assert packet.dp == 5
assert packet.x is None
assert packet.y is None
assert packet.p is None
self.assertEqual(packet.size, len(data))
self.assertEqual(packet.dx, 3)
self.assertIsNone(packet.dy)
self.assertEqual(packet.dp, 5)
self.assertIsNone(packet.x)
self.assertIsNone(packet.y)
self.assertIsNone(packet.p)
flags = x_delta | y_delta | p_delta
data = [flags, 3, 5, 7]
packet = StrokeDelta(data)
assert packet.size == len(data)
assert packet.dx == 3
assert packet.dy == 5
assert packet.dp == 7
assert packet.x is None
assert packet.y is None
assert packet.p is None
self.assertEqual(packet.size, len(data))
self.assertEqual(packet.dx, 3)
self.assertEqual(packet.dy, 5)
self.assertEqual(packet.dp, 7)
self.assertIsNone(packet.x)
self.assertIsNone(packet.y)
self.assertIsNone(packet.p)
flags = x_abs | y_abs | p_abs
data = [flags, 1, 2, 3, 4, 5, 6]
packet = StrokeDelta(data)
assert packet.size == len(data)
assert packet.x == 0x0201
assert packet.y == 0x0403
assert packet.p == 0x0605
assert packet.dx is None
assert packet.dy is None
assert packet.dp is None
self.assertEqual(packet.size, len(data))
self.assertEqual(packet.x, 0x0201)
self.assertEqual(packet.y, 0x0403)
self.assertEqual(packet.p, 0x0605)
self.assertIsNone(packet.dx)
self.assertIsNone(packet.dy)
self.assertIsNone(packet.dp)
flags = y_abs
data = [flags, 2, 3]
packet = StrokeDelta(data)
assert packet.size == len(data)
assert packet.x is None
assert packet.y == 0x0302
assert packet.p is None
assert packet.dx is None
assert packet.dy is None
assert packet.dp is None
self.assertEqual(packet.size, len(data))
self.assertIsNone(packet.x)
self.assertEqual(packet.y, 0x0302)
self.assertIsNone(packet.p)
self.assertIsNone(packet.dx)
self.assertIsNone(packet.dy)
self.assertIsNone(packet.dp)
flags = x_abs | y_delta | p_delta
data = [flags, 2, 3, 4, 5]
packet = StrokeDelta(data)
assert packet.size == len(data)
assert packet.x == 0x0302
assert packet.y is None
assert packet.p is None
assert packet.dx is None
assert packet.dy == 4
assert packet.dp == 5
self.assertEqual(packet.size, len(data))
self.assertEqual(packet.x, 0x0302)
self.assertIsNone(packet.y)
self.assertIsNone(packet.p)
self.assertIsNone(packet.dx)
self.assertEqual(packet.dy, 4)
self.assertEqual(packet.dp, 5)
class TestStrokes(object):
class TestStrokes(unittest.TestCase):
def test_single_stroke(self):
data = '''
67 82 69 65 22 73 53 5d 00 00 02 00 00 00 00 00 ff fa c3 1f
@ -443,3 +428,39 @@ class TestStrokes(object):
p = Protocol(ProtocolVersion.INTUOS_PRO, None, None)
p.parse_pen_data(b)
# How does this work?
# The test generater spits out a simple test function that just calls the
# real test function
#
# Then we search for all yaml files with logs we have and generate a unique
# test name (based on the timestamp) for that file. Result: we have
# a unittests function for each log file found in the directory.
def generator(logfile):
def test(self):
self._test_file(logfile)
return test
def search_for_tests():
basedir = Path(xdg.BaseDirectory.xdg_data_home) / 'tuhi'
for logfile in basedir.glob('**/raw/log-*.yaml'):
with open(logfile) as fd:
try:
yml = yaml.load(fd, Loader=yaml.Loader)
if not yml:
continue
timestamp = yml['time']
test_name = f'test_log_{timestamp}'
test = generator(logfile)
setattr(TestLogFiles, test_name, test)
except Exception as e:
logger.error(f'Exception triggered by file {logfile}')
raise e
search_for_tests()
if __name__ == '__main__':
unittest.main()

View File

@ -1,49 +0,0 @@
#!/usr/bin/env python3
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
from pathlib import Path
import argparse
import json
import os
import sys
# This tool isn't installed, so we can assume that the tuhi module is always
# in the parent directory
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/..') # noqa
from tuhi.export import JsonSvg, JsonPng
parser = argparse.ArgumentParser(description='Converter tool from Tuhi JSON files to SVG or PNG.')
parser.add_argument('filename', help='The JSON file to export ($HOME/.local/share/tuhi/*.json)')
parser.add_argument('--format',
help='The format to generate. Default: svg',
default='svg',
choices=['svg', 'png'])
parser.add_argument('--output',
type=str,
help='The output file name. Default: "$PWD/inputfile.suffix"',
default=None)
parser.add_argument('--orientation',
help='The orientation of the image',
default='landscape',
choices=['landscape', 'portrait', 'reverse-landscape', 'reverse-portrait'])
ns = parser.parse_args()
if ns.output is None:
ns.output = f"{Path(ns.filename).stem}.{ns.format}"
js = json.load(open(ns.filename))
if ns.format == 'svg':
JsonSvg(js, ns.orientation, ns.output)
elif ns.format == 'png':
JsonPng(js, ns.orientation, ns.output)

View File

@ -20,6 +20,7 @@ import errno
import os
import json
import logging
import re
import readline
import struct
import threading
@ -29,15 +30,13 @@ import configparser
from pathlib import Path
try:
from tuhi.export import JsonSvg, JsonPng
import tuhi.dbusclient
from tuhi.svg import JsonSvg
except ModuleNotFoundError:
# If PYTHONPATH isn't set up or we never installed Tuhi, the module
# isn't available. And since we don't install kete, we can assume that
# we're still in the git repo, so messing with the path is "fine".
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/..') # noqa
from tuhi.export import JsonSvg, JsonPng
import tuhi.dbusclient
from tuhi.svg import JsonSvg
CONFIG_PATH = Path(xdg.BaseDirectory.xdg_data_home, 'tuhi-kete')
@ -99,6 +98,13 @@ logger = logging.getLogger('tuhi-kete')
logger.addHandler(logger_handler)
logger.setLevel(logging.INFO)
TUHI_DBUS_NAME = 'org.freedesktop.tuhi1'
ORG_FREEDESKTOP_TUHI1_MANAGER = 'org.freedesktop.tuhi1.Manager'
ORG_FREEDESKTOP_TUHI1_DEVICE = 'org.freedesktop.tuhi1.Device'
ROOT_PATH = '/org/freedesktop/tuhi1'
ORG_BLUEZ_DEVICE1 = 'org.bluez.Device1'
# remove ':' from the completer delimiters of readline so we can match on
# device addresses
completer_delims = readline.get_completer_delims()
@ -112,52 +118,340 @@ def b2hex(bs):
return ' '.join([''.join(s) for s in zip(hx[::2], hx[1::2])])
class TuhiKeteManager(tuhi.dbusclient.TuhiDBusClientManager):
def __init__(self):
super().__init__()
self.connect('unregistered_device', self._on_unregistered_device)
class DBusError(Exception):
def __init__(self, message):
self.message = message
self.sigs = {}
for d in self.devices:
self.sigs[d] = []
self._connect_device(d)
def _disconnect_device_signals(self, device):
class _DBusObject(GObject.Object):
_connection = None
def __init__(self, name, interface, objpath):
GObject.GObject.__init__(self)
if _DBusObject._connection is None:
self._connect_to_session()
self.interface = interface
self.objpath = objpath
try:
for s in self.sigs[device]:
device.disconnect(s)
self.sigs[device] = []
except KeyError:
pass
def _on_unregistered_device(self, manager, device):
self._disconnect_device_signals(device)
def log_press_required(device):
logger.info(f'{device}: Press button on device now')
device.connect('button-press-required', log_press_required)
def log_registered(device):
logger.info(f'{device}: Registration successful')
device.connect('registered', log_registered)
device.connect('registered', self._connect_device)
def _connect_device(self, device):
self._disconnect_device_signals(device)
def log_sync_state(device, pspec):
if device.sync_state:
logger.debug(f'{device}: Communicating with device')
self.proxy = Gio.DBusProxy.new_sync(self._connection,
Gio.DBusProxyFlags.NONE, None,
name, objpath, interface, None)
except GLib.Error as e:
if (e.domain == 'g-io-error-quark' and
e.code == Gio.IOErrorEnum.DBUS_ERROR):
raise DBusError(e.message)
else:
logger.debug(f'{device}: Communication complete')
device.connect('notify::sync-state', log_sync_state)
raise e
def log_device_error(d, err):
if self.proxy.get_name_owner() is None:
raise DBusError(f'No-one is handling {name}, is the daemon running?')
self.proxy.connect('g-properties-changed', self._on_properties_changed)
self.proxy.connect('g-signal', self._on_signal_received)
def _connect_to_session(self):
try:
_DBusObject._connection = Gio.bus_get_sync(Gio.BusType.SESSION, None)
except GLib.Error as e:
if (e.domain == 'g-io-error-quark' and
e.code == Gio.IOErrorEnum.DBUS_ERROR):
raise DBusError(e.message)
else:
raise e
def _on_properties_changed(self, proxy, changed_props, invalidated_props):
# Implement this in derived classes to respond to property changes
pass
def _on_signal_received(self, proxy, sender, signal, parameters):
# Implement this in derived classes to respond to signals
pass
def property(self, name):
p = self.proxy.get_cached_property(name)
if p is not None:
return p.unpack()
return p
def terminate(self):
del(self.proxy)
class _DBusSystemObject(_DBusObject):
'''
Same as the _DBusObject, but connects to the system bus instead
'''
def __init__(self, name, interface, objpath):
self._connect_to_system()
super().__init__(name, interface, objpath)
def _connect_to_system(self):
try:
self._connection = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
except GLib.Error as e:
if (e.domain == 'g-io-error-quark' and
e.code == Gio.IOErrorEnum.DBUS_ERROR):
raise DBusError(e.message)
else:
raise e
class BlueZDevice(_DBusSystemObject):
def __init__(self, objpath):
super().__init__('org.bluez', ORG_BLUEZ_DEVICE1, objpath)
self.proxy.connect('g-properties-changed', self._on_properties_changed)
@GObject.Property
def connected(self):
return self.proxy.get_cached_property('Connected').unpack()
def _on_properties_changed(self, obj, properties, invalidated_properties):
properties = properties.unpack()
if 'Connected' in properties:
self.notify('connected')
class TuhiKeteDevice(_DBusObject):
def __init__(self, manager, objpath):
_DBusObject.__init__(self, TUHI_DBUS_NAME,
ORG_FREEDESKTOP_TUHI1_DEVICE,
objpath)
self.manager = manager
self.is_registering = False
self.live = False
self._bluez_device = BlueZDevice(self.property('BlueZDevice'))
self._bluez_device.connect('notify::connected', self._on_connected)
@classmethod
def is_device_address(cls, string):
if re.match(r'[0-9a-f]{2}(:[0-9a-f]{2}){5}$', string.lower()):
return string
raise argparse.ArgumentTypeError(f'"{string}" is not a valid device address')
@GObject.Property
def address(self):
return self._bluez_device.property('Address')
@GObject.Property
def name(self):
return self._bluez_device.property('Name')
@GObject.Property
def listening(self):
return self.property('Listening')
@GObject.Property
def drawings_available(self):
return self.property('DrawingsAvailable')
@GObject.Property
def battery_percent(self):
return self.property('BatteryPercent')
@GObject.Property
def battery_state(self):
return self.property('BatteryState')
@GObject.Property
def connected(self):
return self._bluez_device.connected
def _on_connected(self, bluez_device, pspec):
self.notify('connected')
def register(self):
logger.debug(f'{self}: Register')
# FIXME: Register() doesn't return anything useful yet, so we wait until
# the device is in the Manager's Devices property
self.s1 = self.manager.connect('notify::devices', self._on_mgr_devices_updated)
self.is_registering = True
self.proxy.Register()
def start_listening(self):
self.proxy.StartListening()
def stop_listening(self):
try:
self.proxy.StopListening()
except GLib.Error as e:
if (e.domain != 'g-dbus-error-quark' or
e.code != Gio.IOErrorEnum.EXISTS or
Gio.dbus_error_get_remote_error(e) != 'org.freedesktop.DBus.Error.ServiceUnknown'):
raise e
def start_live(self, fd):
fd_list = Gio.UnixFDList.new()
fd_list.append(fd)
res, fds = self.proxy.call_with_unix_fd_list_sync('org.freedesktop.tuhi1.Device.StartLive',
GLib.Variant('(h)', (fd,)),
Gio.DBusCallFlags.NO_AUTO_START,
-1,
fd_list,
None)
if res[0] == 0:
self.live = True
def stop_live(self):
self.proxy.StopLive()
self.live = False
def json(self, timestamp):
SUPPORTED_FILE_FORMAT = 1
return self.proxy.GetJSONData('(ut)', SUPPORTED_FILE_FORMAT, timestamp)
def _on_signal_received(self, proxy, sender, signal, parameters):
if signal == 'ButtonPressRequired':
logger.info(f'{self}: Press button on device now')
elif signal == 'ListeningStopped':
err = parameters[0]
if err == -errno.EACCES:
logger.error(f'{device}: wrong device, please re-register.')
logger.error(f'{self}: wrong device, please re-register.')
elif err < 0:
logger.error(f'{device}: an error occured: {os.strerror(-err)}')
device.connect('device-error', log_device_error)
logger.error(f'{self}: an error occured: {os.strerror(-err)}')
self.notify('listening')
elif signal == 'SyncState':
state = parameters[0]
if state:
logger.debug(f'{self}: Downloading from device')
else:
logger.debug(f'{self}: Download done')
def _on_properties_changed(self, proxy, changed_props, invalidated_props):
if changed_props is None:
return
changed_props = changed_props.unpack()
if 'DrawingsAvailable' in changed_props:
self.notify('drawings-available')
elif 'Listening' in changed_props:
self.notify('listening')
elif 'BatteryPercent' in changed_props:
self.notify('battery-percent')
elif 'BatteryState' in changed_props:
self.notify('battery-state')
def __repr__(self):
return f'{self.address} - {self.name}'
def _on_mgr_devices_updated(self, manager, pspec):
if not self.is_registering:
return
for d in manager.devices:
if d.address == self.address:
self.is_registering = False
self.manager.disconnect(self.s1)
del(self.s1)
logger.info(f'{self}: Registration successful')
def terminate(self):
try:
self.manager.disconnect(self.s1)
except AttributeError:
pass
self._bluez_device.terminate()
super(TuhiKeteDevice, self).terminate()
class TuhiKeteManager(_DBusObject):
__gsignals__ = {
'unregistered-device':
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)),
}
def __init__(self):
_DBusObject.__init__(self, TUHI_DBUS_NAME,
ORG_FREEDESKTOP_TUHI1_MANAGER,
ROOT_PATH)
self._devices = {}
self._unregistered_devices = {}
for objpath in self.property('Devices'):
device = TuhiKeteDevice(self, objpath)
self._devices[device.address] = device
@GObject.Property
def devices(self):
return [v for k, v in self._devices.items()]
@GObject.Property
def unregistered_devices(self):
return [v for k, v in self._unregistered_devices.items()]
@GObject.Property
def searching(self):
return self.proxy.get_cached_property('Searching')
def start_search(self):
self._unregistered_devices = {}
self.proxy.StartSearch()
def stop_search(self):
try:
self.proxy.StopSearch()
except GLib.Error as e:
if (e.domain != 'g-dbus-error-quark' or
e.code != Gio.IOErrorEnum.EXISTS or
Gio.dbus_error_get_remote_error(e) != 'org.freedesktop.DBus.Error.ServiceUnknown'):
raise e
self._unregistered_devices = {}
def terminate(self):
for dev in self._devices.values():
dev.terminate()
self._devices = {}
self._unregistered_devices = {}
super(TuhiKeteManager, self).terminate()
def _on_properties_changed(self, proxy, changed_props, invalidated_props):
if changed_props is None:
return
changed_props = changed_props.unpack()
if 'Devices' in changed_props:
objpaths = changed_props['Devices']
for objpath in objpaths:
try:
d = self._unregistered_devices[objpath]
self._devices[d.address] = d
del self._unregistered_devices[objpath]
except KeyError:
# if we called Register() on an existing device it's not
# in unregistered devices
pass
self.notify('devices')
if 'Searching' in changed_props:
self.notify('searching')
def _handle_unregistered_device(self, objpath):
for addr, dev in self._devices.items():
if dev.objpath == objpath:
self.emit('unregistered-device', dev)
return
device = TuhiKeteDevice(self, objpath)
self._unregistered_devices[objpath] = device
logger.debug(f'New unregistered device: {device}')
self.emit('unregistered-device', device)
def _on_signal_received(self, proxy, sender, signal, parameters):
if signal == 'SearchStopped':
self.notify('searching')
elif signal == 'UnregisteredDevice':
objpath = parameters[0]
self._handle_unregistered_device(objpath)
def __getitem__(self, btaddr):
return self._devices[btaddr]
class Worker(GObject.Object):
@ -208,7 +502,7 @@ class Searcher(Worker):
logger.error('Another client is already searching')
return
logger.debug('Starting searching')
logger.debug(f'Starting searching')
self.manager.start_search()
def stop(self):
@ -289,7 +583,6 @@ class Fetcher(Worker):
super(Fetcher, self).__init__(manager)
self.device = None
self.timestamps = None
self.format = args.format
address = args.address
index = args.index
@ -327,12 +620,8 @@ class Fetcher(Worker):
data = json.loads(jsondata)
t = time.localtime(data['timestamp'])
t = time.strftime('%Y-%m-%d-%H-%M', t)
if self.format == 'png':
path = f'{data["devicename"]}-{t}.png'
JsonPng(data, self.orientation, filename=path)
else:
path = f'{data["devicename"]}-{t}.svg'
JsonSvg(data, self.orientation, filename=path)
path = f'{data["devicename"]}-{t}.svg'
JsonSvg(data, self.orientation, filename=path)
logger.info(f'{data["devicename"]}: saved file "{path}"')
@ -454,7 +743,7 @@ class TuhiKeteShell(cmd.Cmd):
readline.set_history_length(100)
Gio.bus_watch_name(Gio.BusType.SESSION,
tuhi.dbusclient.TUHI_DBUS_NAME,
TUHI_DBUS_NAME,
Gio.BusNameWatcherFlags.NONE,
self._on_name_appeared,
self._on_name_vanished)
@ -586,7 +875,7 @@ class TuhiKeteShell(cmd.Cmd):
add_help=False)
parser.add_argument('-h', action='help', help=argparse.SUPPRESS)
parser.add_argument('address', metavar='12:34:56:AB:CD:EF',
type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address,
type=TuhiKeteDevice.is_device_address,
default=None,
help='the address of the device to listen to')
parser.add_argument('mode', choices=['on', 'off'], nargs='?',
@ -697,17 +986,13 @@ class TuhiKeteShell(cmd.Cmd):
add_help=False)
parser.add_argument('-h', action='help', help=argparse.SUPPRESS)
parser.add_argument('address', metavar='12:34:56:AB:CD:EF',
type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address,
type=TuhiKeteDevice.is_device_address,
default=None,
help='the address of the device to fetch drawing from')
parser.add_argument('index', metavar='{<index>|all}',
type=is_index_or_all,
const='all', nargs='?', default='all',
help='the index of the drawing to fetch or a literal "all"')
parser.add_argument('--format', metavar='{svg|png}',
default='svg',
choices=['svg', 'png'],
help='output file format')
try:
parsed_args = parser.parse_args(args.split())
@ -798,7 +1083,7 @@ class TuhiKeteShell(cmd.Cmd):
add_help=False)
parser.add_argument('-h', action='help', help=argparse.SUPPRESS)
parser.add_argument('address', metavar='12:34:56:AB:CD:EF',
type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address,
type=TuhiKeteDevice.is_device_address,
default=None,
help='the address of the device to register')
@ -852,7 +1137,7 @@ class TuhiKeteShell(cmd.Cmd):
add_help=False)
parser.add_argument('-h', action='help', help=argparse.SUPPRESS)
parser.add_argument('address', metavar='12:34:56:AB:CD:EF',
type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address,
type=TuhiKeteDevice.is_device_address,
default=None, nargs='?',
help='the address of the device to listen to')
@ -904,7 +1189,7 @@ class TuhiKeteShell(cmd.Cmd):
add_help=False)
parser.add_argument('-h', action='help', help=argparse.SUPPRESS)
parser.add_argument('address', metavar='12:34:56:AB:CD:EF',
type=tuhi.dbusclient.TuhiDBusClientDevice.is_device_address,
type=TuhiKeteDevice.is_device_address,
default=None, nargs='?',
help='the address of the device to listen to')
parser.add_argument('mode', choices=['on', 'off'], nargs='?',
@ -961,7 +1246,7 @@ def main(args):
with TuhiKeteShell() as shell:
shell.run()
except tuhi.dbusclient.DBusError as e:
except DBusError as e:
logger.error(e.message)

View File

@ -28,11 +28,11 @@ import logging
# This tool isn't installed, so we can assume that the tuhi module is always
# in the parent directory
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/..') # noqa
from tuhi.util import flatten # noqa
from tuhi.drawing import Drawing # noqa
from tuhi.protocol import StrokeFile # noqa
from tuhi.export import JsonSvg, JsonPng # noqa
from tuhi.wacom import WacomProtocolSpark, WacomProtocolIntuosPro, WacomProtocolSlate # noqa
from tuhi.util import flatten
from tuhi.drawing import Drawing
from tuhi.protocol import StrokeFile
from tuhi.svg import JsonSvg
from tuhi.wacom import WacomProtocolSpark, WacomProtocolIntuosPro, WacomProtocolSlate
logging.basicConfig(format='%(asctime)s %(levelname)s: %(name)s: %(message)s',
level=logging.INFO,
@ -68,7 +68,6 @@ def parse_file(filename, file_format, tablet_model, orientation):
# gotta convert to Drawings, then to json string, then to json, then
# to svg. ffs.
svgname = f'{stem}.svg'
pngname = f'{stem}.png'
jsonname = f'{stem}.json'
d = Drawing(svgname, (width * point_size, height * point_size), timestamp)
@ -86,13 +85,10 @@ def parse_file(filename, file_format, tablet_model, orientation):
with open(jsonname, 'w') as fd:
fd.write(d.to_json())
return
else:
from io import StringIO
js = json.load(StringIO(d.to_json()))
if file_format == 'svg':
JsonSvg(js, orientation, d.name)
elif file_format == 'png':
JsonPng(js, orientation, pngname)
from io import StringIO
js = json.load(StringIO(d.to_json()))
JsonSvg(js, orientation, d.name)
def fetch_files():
@ -146,7 +142,7 @@ def main(args=sys.argv):
parser.add_argument('--format',
help='The format to generate. Default: svg',
default='svg',
choices=['svg', 'png', 'json'])
choices=['svg', 'json'])
ns = parser.parse_args(args[1:])
if ns.verbose:

View File

@ -19,16 +19,6 @@ import sys
import multiprocessing
from multiprocessing import reduction
try:
import tuhi.dbusclient
except ModuleNotFoundError:
# If PYTHONPATH isn't set up or we never installed Tuhi, the module
# isn't available. And since we don't install tuhi-live, we can assume that
# we're still in the git repo, so messing with the path is "fine".
sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)) + '/..') # noqa
import tuhi.dbusclient
manager = None
logger = None
@ -45,8 +35,10 @@ def open_uhid_process(queue_in, conn_out):
def maybe_start_tuhi(queue):
sys.path
try:
should_start, args = queue.get()
should_start, verbose = queue.get()
except KeyboardInterrupt:
return 0
@ -56,15 +48,14 @@ def maybe_start_tuhi(queue):
sys.path.append(os.getcwd())
import tuhi.base
import signal
# we don't want to kill Tuhi on ctrl+c because we won't be able to reset
# live mode. Instead we rely on tuhi-live to take us down when it exits
signal.signal(signal.SIGINT, signal.SIG_IGN)
args = ['tuhi-live'] + args # argparse in tuhi.base.main skips argv[0]
tuhi.base.main(args)
if verbose:
tuhi.base.logger.setLevel(logging.DEBUG)
t = tuhi.base.Tuhi()
while True:
try:
t.run()
except KeyboardInterrupt:
pass
def start_tuhi_server(args):
@ -93,7 +84,7 @@ def start_tuhi_server(args):
except GLib.Error as e:
if (e.domain == 'g-io-error-quark' and
e.code == Gio.IOErrorEnum.DBUS_ERROR):
raise tuhi.dbusclient.DBusError(e.message)
raise kete.DBusError(e.message)
else:
raise e
@ -102,50 +93,50 @@ def start_tuhi_server(args):
try:
proxy = Gio.DBusProxy.new_sync(connection,
Gio.DBusProxyFlags.NONE, None,
tuhi.dbusclient.TUHI_DBUS_NAME,
tuhi.dbusclient.ROOT_PATH,
tuhi.dbusclient.ORG_FREEDESKTOP_TUHI1_MANAGER,
kete.TUHI_DBUS_NAME,
kete.ROOT_PATH,
kete.ORG_FREEDESKTOP_TUHI1_MANAGER,
None)
except GLib.Error as e:
if (e.domain == 'g-io-error-quark' and
e.code == Gio.IOErrorEnum.DBUS_ERROR):
raise tuhi.dbusclient.DBusError(e.message)
raise kete.DBusError(e.message)
else:
raise e
started = proxy.get_name_owner() is not None
if not started:
print(f'No-one is handling {tuhi.dbusclient.TUHI_DBUS_NAME}, attempting to start a daemon')
print(f'No-one is handling {kete.TUHI_DBUS_NAME}, attempting to start a daemon')
queue.put((not started, args))
queue.put((not started, args.verbose))
def run_live(request_fd_queue, conn_fd):
import kete
from gi.repository import Gio, GLib
def on_name_appeared(connection, name, client):
global manager
logger.info('Connected to the Tuhi daemon')
manager = tuhi.dbusclient.TuhiDBusClientManager()
manager = kete.TuhiKeteManager()
for device in manager.devices:
if device.live:
logger.info(f'{device} is already live, stopping first')
device.stop_live()
logger.info(f'starting live on {device}, please press button on the device')
request_fd_queue.put(os.getpid())
fd = reduction.recv_handle(conn_fd)
device.start_live(fd)
Gio.bus_watch_name(Gio.BusType.SESSION,
tuhi.dbusclient.TUHI_DBUS_NAME,
kete.TUHI_DBUS_NAME,
Gio.BusNameWatcherFlags.NONE,
on_name_appeared,
None)
mainloop = GLib.MainLoop()
connected_devices = 0
def on_disconnect(dev, pspec):
mainloop.quit()
@ -156,6 +147,10 @@ def run_live(request_fd_queue, conn_fd):
except KeyboardInterrupt:
pass
finally:
for device in manager.devices:
if device.live:
connected_devices += 1
for device in manager.devices:
if device.live and device.connected:
logger.info(f'stopping live on {device}')
@ -183,33 +178,30 @@ def drop_privileges():
os.setresuid(uid, uid, uid)
pw = pwd.getpwuid(uid)
path = os.environ['PATH']
display = os.environ['DISPLAY']
# we completely clear the environment and start a new and controlled one
os.environ.clear()
os.environ['XDG_RUNTIME_DIR'] = f'/run/user/{uid}'
os.environ['HOME'] = pw.pw_dir
os.environ['PATH'] = path
os.environ['DISPLAY'] = display
def parse(args):
parser = argparse.ArgumentParser(description='Tool to start live mode')
parser.add_argument('--flatpak-compatibility-mode',
help='Use the flatpak xdg directories',
desc = 'tool to start the live mode on all devices tuhi knows about'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('-v', '--verbose',
help='Show some debugging informations',
action='store_true',
default=False)
ns, remaining_args = parser.parse_known_args(args[1:])
return ns, remaining_args
return parser.parse_args(args[1:])
def main(args=sys.argv):
if not os.geteuid() == 0:
sys.exit('Script must be run as root')
our_args, remaining_args = parse(args)
args = parse(args)
request_fd_queue = multiprocessing.Queue()
conn_in, conn_out = multiprocessing.Pipe()
@ -219,19 +211,7 @@ def main(args=sys.argv):
drop_privileges()
if our_args.flatpak_compatibility_mode:
from pathlib import Path
# tuhi-live is usually started through sudo, so let's get to the
# user's home directory here.
userhome = Path(os.path.expanduser('~' + os.getlogin()))
basedir = userhome / '.var' / 'app' / 'org.freedesktop.Tuhi'
print(f'Using flatpak xdg dirs in {basedir}')
os.environ['XDG_DATA_HOME'] = os.fspath(basedir / 'data')
os.environ['XDG_CONFIG_HOME'] = os.fspath(basedir / 'config')
os.environ['XDG_CACHE_HOME'] = os.fspath(basedir / 'cache')
start_tuhi_server(remaining_args)
start_tuhi_server(args)
run_live(request_fd_queue, conn_in)

View File

@ -5,15 +5,9 @@ import sys
import os
from pathlib import Path
try:
# 3.30 is the first one with Gtk.Template
gi.check_version('3.30') # NOQA
except ValueError as e:
print(e, file=sys.stderr)
sys.exit(1)
gi.require_version('Gio', '2.0') # NOQA
from gi.repository import Gio
gi.require_version('Gtk', '3.0') # NOQA
from gi.repository import Gio, Gtk, Gdk
@devel@ # NOQA
@ -21,12 +15,61 @@ resource = Gio.resource_load(os.fspath(Path('@pkgdatadir@', 'tuhi.gresource')))
Gio.Resource._register(resource)
def install_excepthook():
old_hook = sys.excepthook
def new_hook(etype, evalue, etb):
old_hook(etype, evalue, etb)
while Gtk.main_level():
Gtk.main_quit()
sys.exit()
sys.excepthook = new_hook
def gtk_style():
css = b"""
flowboxchild:selected {
background-color: white;
}
.bg-white {
background-color: white;
}
.bg-paper {
border-radius: 5px;
background-color: #ebe9e8;
}
.drawing {
background-color: white;
border-radius: 5px;
}
"""
screen = Gdk.Screen.get_default()
if screen is None:
print('Error: Unable to connect to screen. Make sure DISPLAY or WAYLAND_DISPLAY are set', file=sys.stderr)
sys.exit(1)
style_provider = Gtk.CssProvider()
style_provider.load_from_data(css)
Gtk.StyleContext.add_provider_for_screen(
screen,
style_provider,
Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION
)
if __name__ == "__main__":
import gettext
import locale
import signal
from tuhi.gui.application import Application
install_excepthook()
gtk_style()
locale.bindtextdomain('tuhi', '@localedir@')
locale.textdomain('tuhi')
gettext.bindtextdomain('tuhi', '@localedir@')
from tuhi.gui.application import main
main(sys.argv)
gettext.textdomain('tuhi')
signal.signal(signal.SIGINT, signal.SIG_DFL)
exit_status = Application().run(sys.argv)
sys.exit(exit_status)

24
tuhi.in
View File

@ -14,7 +14,6 @@
import sys
import subprocess
from pathlib import Path
import argparse
tuhi_server = Path('@libexecdir@', 'tuhi-server')
tuhi_gui = Path('@libexecdir@', 'tuhi-gui')
@ -23,27 +22,10 @@ tuhi_gui = Path('@libexecdir@', 'tuhi-gui')
@devel@ # NOQA
if __name__ == '__main__':
if sys.version_info < (3, 6):
sys.exit('Python 3.6 or later required')
parser = argparse.ArgumentParser(description='Tuhi')
parser.add_argument('--flatpak-compatibility-mode',
help='Use the flatpak xdg directories',
action='store_true',
default=False)
ns, remainder = parser.parse_known_args()
if ns.flatpak_compatibility_mode:
import os
basedir = Path.home() / '.var' / 'app' / 'org.freedesktop.Tuhi'
print(f'Using flatpak xdg dirs in {basedir}')
os.environ['XDG_DATA_HOME'] = os.fspath(basedir / 'data')
os.environ['XDG_CONFIG_HOME'] = os.fspath(basedir / 'config')
os.environ['XDG_CACHE_HOME'] = os.fspath(basedir / 'cache')
tuhi = subprocess.Popen([tuhi_server] + remainder)
args = sys.argv[1:]
tuhi = subprocess.Popen([tuhi_server] + args)
try:
subprocess.run([tuhi_gui] + remainder)
subprocess.run([tuhi_gui] + args)
except KeyboardInterrupt:
pass
tuhi.terminate()

View File

@ -21,11 +21,11 @@ from pathlib import Path
try:
from gi.repository import GObject, GLib
except Exception as e:
print('************ Importing gi.repository failed **********')
print('* This is an issue with the gi module, not with tuhi *')
print('******************************************************')
print('The full exception is below:')
print('')
print(f'************ Importing gi.repository failed **********')
print(f'* This is an issue with the gi module, not with tuhi *')
print(f'******************************************************')
print(f'The full exception is below:')
print(f'')
raise e
from tuhi.dbusserver import TuhiDBusServer
@ -37,7 +37,7 @@ DEFAULT_CONFIG_PATH = Path(xdg.BaseDirectory.xdg_data_home, 'tuhi')
logger = logging.getLogger('tuhi')
WACOM_COMPANY_IDS = [0x4755, 0x4157, 0x424d]
WACOM_COMPANY_IDS = [0x4755, 0x4157]
class TuhiDevice(GObject.Object):

View File

@ -206,11 +206,11 @@ class BlueZDevice(GObject.Object):
'''
i = self.obj.get_interface(ORG_BLUEZ_DEVICE1)
if self.connected:
self.logger.info('Device is already connected')
self.logger.info(f'Device is already connected')
self.emit('connected')
return
self.logger.debug('Connecting')
self.logger.info(f'Connecting')
i.Connect(result_handler=self._on_connect_result)
def _on_connect_result(self, obj, result, user_data):
@ -219,7 +219,7 @@ class BlueZDevice(GObject.Object):
result.code == Gio.IOErrorEnum.DBUS_ERROR and
Gio.dbus_error_get_remote_error(result) == 'org.bluez.Error.Failed' and
'Operation already in progress' in result.message):
self.logger.debug('Already connecting')
self.logger.debug(f'Already connecting')
elif isinstance(result, Exception):
self.logger.error(f'Connection failed: {result}')
@ -230,11 +230,11 @@ class BlueZDevice(GObject.Object):
'''
i = self.obj.get_interface(ORG_BLUEZ_DEVICE1)
if not i.get_cached_property('Connected').get_boolean():
self.logger.info('Device is already disconnected')
self.logger.info(f'Device is already disconnected')
self.emit('disconnected')
return
self.logger.debug('Disconnecting')
self.logger.info(f'Disconnecting')
i.Disconnect(result_handler=self._on_disconnect_result)
def _on_disconnect_result(self, obj, result, user_data):
@ -246,9 +246,9 @@ class BlueZDevice(GObject.Object):
if 'Connected' in properties:
if properties['Connected']:
self.logger.debug('Connection established')
self.logger.info('Connection established')
else:
self.logger.debug('Disconnected')
self.logger.info('Disconnected')
self.emit('disconnected')
if 'ServicesResolved' in properties:
if properties['ServicesResolved']:

View File

@ -426,7 +426,7 @@ class TuhiDBusDevice(_TuhiDBus):
fds_list = message.get_unix_fd_list()
if fds_list is None or fds_list.get_length() != 1:
logger.error('uhid fds not provided')
logger.error(f'uhid fds not provided')
result = GLib.Variant.new_int32(-errno.EINVAL)
invocation.return_value(GLib.Variant.new_tuple(result))
return

View File

@ -1,160 +0,0 @@
#!/usr/bin/env python3
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
from gi.repository import GObject
import svgwrite
from svgwrite import mm
import cairo
class ImageExportBase(GObject.Object):
def __init__(self, json, orientation, filename, *args, **kwargs):
super().__init__(*args, **kwargs)
self.json = json
self.timestamp = json['timestamp']
self.filename = filename
self.orientation = orientation.lower()
self._convert()
@property
def output_dimensions(self):
dimensions = self.json['dimensions']
if dimensions == [0, 0]:
width, height = 100, 100
else:
# Original dimensions are too big for most Standards
# so we scale them down
width = dimensions[0] / self._output_scaling_factor
height = dimensions[1] / self._output_scaling_factor
if self.orientation in ['portrait', 'reverse-portrait']:
return height, width
else:
return width, height
@property
def output_strokes(self):
width, height = self.output_dimensions
strokes = []
for s in self.json['strokes']:
points_with_sk_width = []
for p in s['points']:
x, y = p['position']
# Scaling coordinates
x = x / self._output_scaling_factor
y = y / self._output_scaling_factor
if self.orientation == 'reverse-portrait':
x, y = y, height - x
elif self.orientation == 'portrait':
x, y = width - y, x
elif self.orientation == 'reverse-landscape':
x, y = width - x, height - y
# Pressure normalized range is [0, 0xffff]
delta = (p['pressure'] - 0x8000) / 0x8000
stroke_width = self._base_pen_width + self._pen_pressure_width_factor * delta
points_with_sk_width.append((x, y, stroke_width))
strokes.append(points_with_sk_width)
return strokes
class JsonSvg(ImageExportBase):
_output_scaling_factor = 1000
_base_pen_width = 0.4
_pen_pressure_width_factor = 0.2
# Change this value down to reduce size, change it up to improve accuracy. measured in px
_width_precision = 10
def _convert(self):
width, height = self.output_dimensions
size = width * mm, height * mm
# Make sure to set viewBox here so mm doesn't have to be specified in all later parts
svg = svgwrite.Drawing(filename=self.filename, size=size, viewBox=(f'0 0 {width} {height}'))
g = svgwrite.container.Group(id='layer0')
for sk_num, stroke_points in enumerate(self.output_strokes):
path = None
stroke_width_p = None
for i, (x, y, stroke_width) in enumerate(stroke_points):
if not x or not y:
continue
# Reduce precision of the width
stroke_width = int(stroke_width * self._width_precision) / self._width_precision
# Create a new path per object and per unique width
if stroke_width_p != stroke_width:
if path:
g.add(path)
# Reduce width by mm to px at 96dpi (see SVG/CSS specification)
width_px = stroke_width * 0.26458
path = svg.path(id=f'sk_{sk_num}_{i}', style=f'fill:none;stroke:black;stroke-width:{width_px}')
stroke_width_p = stroke_width
path.push("M", f'{x:.2f}', f'{y:.2f}')
else:
# Continue writing segment line with next coords
path.push("L", f'{x:.2f}', f'{y:.2f}')
if path:
g.add(path)
svg.add(g)
svg.save()
class JsonPng(ImageExportBase):
_output_scaling_factor = 100
_base_pen_width = 3
_pen_pressure_width_factor = 1
def _convert(self):
width, height = self.output_dimensions
width, height = int(width), int(height)
surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, width, height)
ctx = cairo.Context(surface)
# Paint a transparent background
ctx.set_source_rgba(0, 0, 0, 0)
ctx.paint()
ctx.set_antialias(cairo.Antialias.DEFAULT)
ctx.set_line_join(cairo.LINE_JOIN_ROUND)
ctx.set_source_rgb(0, 0, 0)
for sk_num, stroke_points in enumerate(self.output_strokes):
for i, (x, y, stroke_width) in enumerate(stroke_points):
ctx.set_line_width(stroke_width)
if i == 0:
ctx.move_to(x, y)
else:
ctx.line_to(x, y)
ctx.stroke()
surface.write_to_png(self.filename)

View File

@ -11,17 +11,18 @@
# GNU General Public License for more details.
#
from gi.repository import Gio, GLib, Gtk
import logging
from .window import MainWindow
from .config import Config
import xdg.BaseDirectory
from pathlib import Path
import logging
import sys
import xdg.BaseDirectory
import gi
gi.require_version("Gio", "2.0")
gi.require_version("Gtk", "3.0")
from gi.repository import Gio, GLib, Gtk, Gdk # NOQA
logging.basicConfig(format='%(asctime)s %(levelname)s: %(name)s: %(message)s',
level=logging.INFO,
@ -51,9 +52,6 @@ class Application(Gtk.Application):
GLib.OptionFlags.NONE,
GLib.OptionArg.NONE,
'download first drawing only but do not remove it from the device')
self.set_accels_for_action('app.quit', ['<Ctrl>Q'])
self._tuhi = None
def do_startup(self):
@ -104,61 +102,3 @@ class Application(Gtk.Application):
def _help(self, action, param):
import time
Gtk.show_uri(None, 'https://github.com/tuhiproject/tuhi/wiki', time.time())
def install_excepthook():
old_hook = sys.excepthook
def new_hook(etype, evalue, etb):
old_hook(etype, evalue, etb)
while Gtk.main_level():
Gtk.main_quit()
sys.exit()
sys.excepthook = new_hook
def gtk_style():
css = b"""
flowboxchild:selected {
background-color: white;
}
.bg-white {
background-color: white;
}
.bg-paper {
border-radius: 5px;
background-color: #ebe9e8;
}
.drawing {
background-color: white;
border-radius: 5px;
}
"""
screen = Gdk.Screen.get_default()
if screen is None:
print('Error: Unable to connect to screen. Make sure DISPLAY or WAYLAND_DISPLAY are set', file=sys.stderr)
sys.exit(1)
style_provider = Gtk.CssProvider()
style_provider.load_from_data(css)
Gtk.StyleContext.add_provider_for_screen(screen,
style_provider,
Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION)
def main(argv):
if sys.version_info < (3, 6):
sys.exit('Python 3.6 or later required')
import gettext
import locale
import signal
install_excepthook()
gtk_style()
locale.textdomain('tuhi')
gettext.textdomain('tuhi')
signal.signal(signal.SIGINT, signal.SIG_DFL)
exit_status = Application().run(argv)
sys.exit(exit_status)

View File

@ -46,7 +46,7 @@ class Config(GObject.Object):
if not self.path.exists():
return
logger.debug('configuration found')
logger.debug(f'configuration found')
self.config.read(self.path)
def _load_cached_drawings(self):
@ -69,7 +69,7 @@ class Config(GObject.Object):
self.config[section][key] = value
self._write()
@GObject.Property
@GObject.property
def orientation(self):
try:
return self.config['Device']['Orientation']
@ -81,7 +81,7 @@ class Config(GObject.Object):
assert(orientation in ['landscape', 'portrait'])
self._add_key('Device', 'Orientation', orientation)
@GObject.Property
@GObject.property
def drawings(self):
return self._drawings

View File

@ -12,21 +12,18 @@
#
from gettext import gettext as _
from gi.repository import GObject, Gtk, GdkPixbuf, Gdk
import xdg.BaseDirectory
import os
from pathlib import Path
from .config import Config
from tuhi.export import JsonSvg, JsonPng
from tuhi.svg import JsonSvg
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import GObject, Gtk, GdkPixbuf, Gdk # NOQA
DATA_PATH = Path(xdg.BaseDirectory.xdg_cache_home, 'tuhi')
SVG_DATA_PATH = Path(DATA_PATH, 'svg')
PNG_DATA_PATH = Path(DATA_PATH, 'png')
DATA_PATH = Path(xdg.BaseDirectory.xdg_cache_home, 'tuhi', 'svg')
@Gtk.Template(resource_path='/org/freedesktop/Tuhi/ui/Drawing.ui')
@ -42,8 +39,7 @@ class Drawing(Gtk.EventBox):
super().__init__()
self.orientation = Config().orientation
Config().connect('notify::orientation', self._on_orientation_changed)
SVG_DATA_PATH.mkdir(parents=True, exist_ok=True)
PNG_DATA_PATH.mkdir(parents=True, exist_ok=True)
DATA_PATH.mkdir(parents=True, exist_ok=True)
self.json_data = json_data
self._zoom = zoom
@ -59,12 +55,8 @@ class Drawing(Gtk.EventBox):
self.redraw()
def process_svg(self):
path = os.fspath(Path(SVG_DATA_PATH, f'{self.json_data["timestamp"]}.svg'))
self.svg = JsonSvg(
self.json_data,
self.orientation,
path
)
path = os.fspath(Path(DATA_PATH, f'{self.json_data["timestamp"]}.svg'))
self.svg = JsonSvg(self.json_data, self.orientation, path)
width, height = -1, -1
if 'portrait' in self.orientation:
height = 1000
@ -75,14 +67,6 @@ class Drawing(Gtk.EventBox):
height=height,
preserve_aspect_ratio=True)
def process_png(self):
path = os.fspath(Path(PNG_DATA_PATH, f'{self.json_data["timestamp"]}.png'))
self.png = JsonPng(
self.json_data,
self.orientation,
path
)
def redraw(self):
ratio = self.pixbuf.get_height() / self.pixbuf.get_width()
base = 250 + self.zoom * 50
@ -112,9 +96,11 @@ class Drawing(Gtk.EventBox):
@Gtk.Template.Callback('_on_download_button_clicked')
def _on_download_button_clicked(self, button):
dialog = Gtk.FileChooserNative()
dialog.set_action(Gtk.FileChooserAction.SAVE)
dialog.set_transient_for(self.get_toplevel())
dialog = Gtk.FileChooserDialog(_('Please choose a file'),
None,
Gtk.FileChooserAction.SAVE,
(Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL,
Gtk.STOCK_SAVE, Gtk.ResponseType.OK))
dialog.set_do_overwrite_confirmation(True)
# Translators: the default filename to save to
@ -128,33 +114,21 @@ class Drawing(Gtk.EventBox):
# Translators: filter to show svg files only
filter_svg.set_name(_('SVG files'))
filter_svg.add_pattern('*.svg')
filter_png = Gtk.FileFilter()
# Translators: filter to show png files only
filter_png.set_name(_('PNG files'))
filter_png.add_pattern('*.png')
dialog.add_filter(filter_svg)
dialog.add_filter(filter_png)
dialog.add_filter(filter_any)
response = dialog.run()
if response == Gtk.ResponseType.ACCEPT:
if response == Gtk.ResponseType.OK:
import shutil
file = dialog.get_filename()
# regenerate the SVG based on the current rotation.
# where we used the orientation buttons, we haven't updated the
# file itself.
self.process_svg()
if file.lower().endswith('.png'):
# regenerate the PNG based on the current rotation.
# where we used the orientation buttons, we haven't updated the
# file itself.
self.process_png()
shutil.move(self.png.filename, file)
else:
# regenerate the SVG based on the current rotation.
# where we used the orientation buttons, we haven't updated the
# file itself.
self.process_svg()
shutil.copyfile(self.svg.filename, file)
# FIXME: error handling
file = dialog.get_filename()
shutil.copyfile(self.svg.filename, file)
# FIXME: error handling
dialog.destroy()

View File

@ -11,15 +11,15 @@
# GNU General Public License for more details.
#
from gi.repository import GObject, Gtk
from .drawing import Drawing
from .config import Config
import time
import gi
import logging
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import GObject, Gtk # NOQA
logger = logging.getLogger('tuhi.gui.drawingperspective')
@ -55,7 +55,7 @@ class Flowbox(Gtk.Box):
self.flowbox_drawings.remove(child)
self.flowbox_drawings.foreach(delete_matching_child, drawing)
@GObject.Property
@GObject.property
def is_empty(self):
return not self.flowbox_drawings.get_children()
@ -74,13 +74,7 @@ class DrawingPerspective(Gtk.Stack):
super().__init__(*args, **kwargs)
self.known_drawings = {} # type {timestamp: Drawing()}
self.flowboxes = {}
# Add an expanding emtpy label to the bottom - this pushes all the
# real stuff up to the top, forcing a nice alignment
fake_label = Gtk.Label("")
fake_label.show()
self.box_all_drawings.pack_end(fake_label, expand=True, fill=True, padding=100)
self._zoom = 0
self._want_listen = True
def _cache_drawings(self, device, pspec):
# The config backend filters duplicates anyway, so don't care here
@ -92,7 +86,7 @@ class DrawingPerspective(Gtk.Stack):
def _hash(drawing):
return time.strftime('%Y%m', time.gmtime(drawing.timestamp))
for js in sorted(config.drawings, key=lambda j: j['timestamp']):
for js in sorted(config.drawings, reverse=True, key=lambda j: j['timestamp']):
ts = js['timestamp']
if ts in self.known_drawings:
continue
@ -107,7 +101,7 @@ class DrawingPerspective(Gtk.Stack):
except KeyError:
fb = Flowbox(time.gmtime(drawing.timestamp))
self.flowboxes[key] = fb
self.box_all_drawings.pack_end(fb, expand=False, fill=True, padding=0)
self.box_all_drawings.add(fb)
finally:
fb.insert(drawing)
@ -132,13 +126,8 @@ class DrawingPerspective(Gtk.Stack):
def device(self, device):
self._device = device
self._signals = []
sig = device.connect('notify::connected', self._on_connected)
self._signals.append(sig)
sig = device.connect('notify::listening', self._on_listening_stopped)
self._signals.append(sig)
sig = device.connect('device-error', self._on_device_error)
self._signals.append(sig)
device.connect('notify::connected', self._on_connected)
device.connect('notify::listening', self._on_listening_stopped)
# This is a bit convoluted. We need to cache all drawings
# because Tuhi doesn't have guaranteed storage. So any json that
@ -179,20 +168,11 @@ class DrawingPerspective(Gtk.Stack):
pass
def _on_listening_stopped(self, device, pspec):
if not device.listening and self._want_listen:
if not device.listening:
logger.debug(f'{device.name} - listening stopped, restarting')
# We never want to stop listening
device.start_listening()
def _on_device_error(self, device, error):
import errno
if error == -errno.EACCES:
# No point to keep getting notified
for sig in self._signals:
device.disconnect(sig)
self._signals = []
self._want_listen = False
@Gtk.Template.Callback('_on_undo_close_clicked')
def _on_undo_close_clicked(self, button):
self.overlay_undo.set_reveal_child(False)

View File

@ -18,7 +18,7 @@ import os
import logging
import re
logger = logging.getLogger('tuhi.dbusclient')
logger = logging.getLogger('tuhi.gui.dbus')
TUHI_DBUS_NAME = 'org.freedesktop.tuhi1'
ORG_FREEDESKTOP_TUHI1_MANAGER = 'org.freedesktop.tuhi1.Manager'
@ -37,7 +37,7 @@ class _DBusObject(GObject.Object):
_connection = None
def __init__(self, name, interface, objpath):
super().__init__()
GObject.GObject.__init__(self)
# this is not handled asynchronously because if we fail to
# get the session bus, we have other issues
@ -149,18 +149,20 @@ class BlueZDevice(_DBusSystemObject):
self.notify('connected')
class TuhiDBusClientDevice(_DBusObject):
class TuhiKeteDevice(_DBusObject):
__gsignals__ = {
'button-press-required':
(GObject.SignalFlags.RUN_FIRST, None, ()),
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)),
'registered':
(GObject.SignalFlags.RUN_FIRST, None, ()),
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)),
'device-error':
(GObject.SignalFlags.RUN_FIRST, None, (int,)),
}
def __init__(self, manager, objpath):
super().__init__(TUHI_DBUS_NAME, ORG_FREEDESKTOP_TUHI1_DEVICE, objpath)
_DBusObject.__init__(self, TUHI_DBUS_NAME,
ORG_FREEDESKTOP_TUHI1_DEVICE,
objpath)
self.manager = manager
self.is_registering = False
self._bluez_device = BlueZDevice(self.property('BlueZDevice'))
@ -209,10 +211,6 @@ class TuhiDBusClientDevice(_DBusObject):
def sync_state(self):
return self._sync_state
@GObject.Property
def live(self):
return self.property('Live')
def _on_connected(self, bluez_device, pspec):
self.notify('connected')
@ -243,7 +241,7 @@ class TuhiDBusClientDevice(_DBusObject):
def _on_signal_received(self, proxy, sender, signal, parameters):
if signal == 'ButtonPressRequired':
logger.info(f'{self}: Press button on device now')
self.emit('button-press-required')
self.emit('button-press-required', self)
elif signal == 'ListeningStopped':
err = parameters[0]
if err == -errno.EACCES:
@ -270,8 +268,6 @@ class TuhiDBusClientDevice(_DBusObject):
self.notify('battery-percent')
elif 'BatteryState' in changed_props:
self.notify('battery-state')
elif 'Live' in changed_props:
self.notify('live')
def __repr__(self):
return f'{self.address} - {self.name}'
@ -286,21 +282,7 @@ class TuhiDBusClientDevice(_DBusObject):
self.manager.disconnect(self.s1)
del(self.s1)
logger.info(f'{self}: Registration successful')
self.emit('registered')
def start_live(self, fd):
fd_list = Gio.UnixFDList.new()
fd_list.append(fd)
res, fds = self.proxy.call_with_unix_fd_list_sync('org.freedesktop.tuhi1.Device.StartLive',
GLib.Variant('(h)', (fd,)),
Gio.DBusCallFlags.NO_AUTO_START,
-1,
fd_list,
None)
def stop_live(self):
self.proxy.StopLive()
self.emit('registered', self)
def terminate(self):
try:
@ -308,17 +290,19 @@ class TuhiDBusClientDevice(_DBusObject):
except AttributeError:
pass
self._bluez_device.terminate()
super().terminate()
super(TuhiKeteDevice, self).terminate()
class TuhiDBusClientManager(_DBusObject):
class TuhiKeteManager(_DBusObject):
__gsignals__ = {
'unregistered-device':
(GObject.SignalFlags.RUN_FIRST, None, (GObject.TYPE_PYOBJECT,)),
}
def __init__(self):
super().__init__(TUHI_DBUS_NAME, ORG_FREEDESKTOP_TUHI1_MANAGER, ROOT_PATH)
_DBusObject.__init__(self, TUHI_DBUS_NAME,
ORG_FREEDESKTOP_TUHI1_MANAGER,
ROOT_PATH)
self._devices = {}
self._unregistered_devices = {}
@ -332,7 +316,7 @@ class TuhiDBusClientManager(_DBusObject):
def _init(self, *args, **kwargs):
logger.info('manager is online')
for objpath in self.property('Devices'):
device = TuhiDBusClientDevice(self, objpath)
device = TuhiKeteDevice(self, objpath)
self._devices[device.address] = device
@GObject.Property
@ -366,7 +350,7 @@ class TuhiDBusClientManager(_DBusObject):
dev.terminate()
self._devices = {}
self._unregistered_devices = {}
super().terminate()
super(TuhiKeteManager, self).terminate()
def _on_properties_changed(self, proxy, changed_props, invalidated_props):
if changed_props is None:
@ -395,7 +379,7 @@ class TuhiDBusClientManager(_DBusObject):
self.emit('unregistered-device', dev)
return
device = TuhiDBusClientDevice(self, objpath)
device = TuhiKeteDevice(self, objpath)
self._unregistered_devices[objpath] = device
logger.debug(f'New unregistered device: {device}')

View File

@ -11,19 +11,49 @@
# GNU General Public License for more details.
#
from .drawingperspective import DrawingPerspective
from .config import Config
from tuhi.dbusclient import TuhiDBusClientManager
from gettext import gettext as _
import logging
from gi.repository import Gtk, Gio, GLib, GObject
from .drawingperspective import DrawingPerspective
from .tuhi import TuhiKeteManager
from .config import Config
import logging
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk, Gio, GLib, GObject # NOQA
logger = logging.getLogger('tuhi.gui.window')
MENU_XML = """
<?xml version="1.0" encoding="UTF-8"?>
<interface>
<menu id="primary-menu">
<section>
<item>
<attribute name="label" translatable="yes">Portrait</attribute>
<attribute name="action">win.orientation</attribute>
<attribute name="target">portrait</attribute>
</item>
<item>
<attribute name="label" translatable="yes">Landscape</attribute>
<attribute name="action">win.orientation</attribute>
<attribute name="target">landscape</attribute>
</item>
</section>
<section>
<item>
<attribute name="label" translatable="yes">Help</attribute>
<attribute name="action">app.help</attribute>
</item>
<item>
<attribute name="label" translatable="yes">About</attribute>
<attribute name="action">app.about</attribute>
</item>
</section>
</menu>
</interface>
"""
@Gtk.Template(resource_path="/org/freedesktop/Tuhi/ui/ErrorPerspective.ui")
class ErrorPerspective(Gtk.Box):
@ -70,14 +100,14 @@ class SetupDialog(Gtk.Dialog):
self._sig = device.connect('button-press-required', self._on_button_press_required)
device.register()
def _on_button_press_required(self, device):
device.disconnect(self._sig)
def _on_button_press_required(self, tuhi, device):
tuhi.disconnect(self._sig)
self.stack.set_visible_child_name('page2')
self._sig = device.connect('registered', self._on_registered)
def _on_registered(self, device):
device.disconnect(self._sig)
def _on_registered(self, tuhi, device):
tuhi.disconnect(self._sig)
self.device = device
self.response(Gtk.ResponseType.OK)
@ -102,7 +132,7 @@ class MainWindow(Gtk.ApplicationWindow):
super().__init__(**kwargs)
self.maximize()
self._tuhi = TuhiDBusClientManager()
self._tuhi = TuhiKeteManager()
action = Gio.SimpleAction.new_stateful('orientation', GLib.VariantType('s'),
GLib.Variant('s', 'landscape'))
@ -110,7 +140,7 @@ class MainWindow(Gtk.ApplicationWindow):
action.set_state(GLib.Variant.new_string(Config().orientation))
self.add_action(action)
builder = Gtk.Builder.new_from_resource('/org/freedesktop/Tuhi/ui/AppMenu.ui')
builder = Gtk.Builder.new_from_string(MENU_XML, -1)
menu = builder.get_object("primary-menu")
self.menubutton1.set_menu_model(menu)
@ -137,7 +167,7 @@ class MainWindow(Gtk.ApplicationWindow):
dp = DrawingPerspective()
self._add_perspective(dp)
self.headerbar.set_title('Tuhi')
self.headerbar.set_title(f'Tuhi')
self.stack_perspectives.set_visible_child_name(dp.name)
if not self._tuhi.devices:
@ -214,7 +244,6 @@ class MainWindow(Gtk.ApplicationWindow):
# register.
for sig in self._signals:
device.disconnect(sig)
self._signals = []
def _add_perspective(self, perspective):
self.stack_perspectives.add_named(perspective, perspective.name)

View File

@ -144,7 +144,7 @@ def as_hex_string(data):
elif isinstance(data, list):
return ' '.join([f'{x:02x}' for x in data])
raise ValueError('Unsupported data format {data.__class__.__name__} for {data}')
raise ValueError('Unsupported data format {data.__class__} for {data}')
def _get_protocol_dictionary(protocol):
@ -337,8 +337,8 @@ class NordicData(list):
if self.length != len(data):
raise UnexpectedDataError(bs, f'Invalid data: length field {self.length}, data length is {len(data)}')
def __str__(self):
return f'{self.name if self.name else "UNKNOWN"} {self.opcode:02x} / {self.length:02x} / {as_hex_string(self)}'
def __repr__(self):
return f'{self.name if self.Name else "UNKNOWN"}{self.opcode:02x} / {self.length:02x} / {as_hex_string(self)}'
class ProtocolError(Exception):
@ -361,7 +361,7 @@ class MissingReplyError(ProtocolError):
def __init__(self, request, message=None):
self.request = request
def __str__(self):
def __repr__(self):
return f'Missing reply for request {self.request}. {self.message}'
@ -390,8 +390,8 @@ class UnexpectedReply(ProtocolError):
super().__init__(message)
self.msg = msg
def __str__(self):
return f'{self.__class__.__name__}: {self.msg}: {self.message}'
def __repr__(self):
return f'{self.__class__}: {self.msg}: {self.message}'
class UnexpectedDataError(ProtocolError):
@ -412,8 +412,8 @@ class UnexpectedDataError(ProtocolError):
super().__init__(*args, **kwargs)
self.bytes = bytes
def __str__(self):
return f'{self.__class__.__name__}: {self.bytes} - {self.message}'
def __repr__(self):
return f'{self.__class__}: {self.bytes} - {self.message}'
class DeviceError(ProtocolError):
@ -451,9 +451,12 @@ class DeviceError(ProtocolError):
if self.errorcode == DeviceError.ErrorCode.INVALID_STATE:
self.errno = errno.EBADE
def __str__(self):
def __repr__(self):
return f'DeviceError.{self.errorcode.name}'
def __str__(self):
return repr(self)
class Msg(object):
'''
@ -523,16 +526,18 @@ class Msg(object):
self._args = args
def _handle_reply(self, reply):
'''
Override this in the subclass to handle the reply. Note that the
default 0xb3 message is handled automaticaly, this is only for
non-default replies.
'''Override this in the subclass to handle the reply.
No return value, just throw the appropriate exception on failure.
This is the default reply handler that deals with the 0xb3 ACK/Error
messages and throws the respective exceptions.
:param reply: A :class:`NordicData` object
'''
raise NotImplementedError(f'{reply} needs customized handling')
if reply.opcode != 0xb3:
raise UnexpectedReply(self)
if reply[0] != 0x00:
raise DeviceError(reply[0])
def execute(self):
'''
@ -552,14 +557,7 @@ class Msg(object):
if self.reply is None:
raise MissingReplyError(self.request)
try:
# 0xb3 is always handled by us, anything else requires a
# custom reply handler
if self.reply.opcode == 0xb3:
if self.reply[0] != 0x00:
raise DeviceError(self.reply[0])
else:
self._handle_reply(self.reply)
self._handle_reply(self.reply)
# no exception? we can assume success
self.errorcode = DeviceError.ErrorCode.SUCCESS
except DeviceError as e:
@ -567,8 +565,8 @@ class Msg(object):
raise e
return self # allow chaining
def __str__(self):
return f'{self.__class__.__name__}: {self.interaction} - {self.request}{self.reply}'
def __repr__(self):
return f'{self.__class__}: {self.interaction} - {self.request}{self.reply}'
class MsgConnectIntuosPro(Msg):
@ -651,18 +649,10 @@ class MsgGetName(Msg):
opcode = 0xbb
protocol = ProtocolVersion.ANY
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = ""
def _handle_reply(self, reply):
if reply.opcode != 0xbc:
raise UnexpectedReply(f'Unknown reply: {reply.opcode}')
self.name += bytes(reply).decode('utf-8')
if bytes(reply)[-1] != 0x0a:
self.requires_request = False
self.execute()
self.requires_request = True
self.name = bytes(reply).decode('utf-8')
class MsgGetNameIntuosPro(Msg):
@ -1384,13 +1374,16 @@ class StrokeParsingError(ProtocolError):
self.message = message
self.data = data
def __str__(self):
def __repr__(self):
if self.data:
datastr = f' data: {list2hex(self.data)}'
else:
datastr = ''
return f'{self.message}{datastr}'
def __str__(self):
return self.__repr__()
class StrokeDataType(enum.Enum):
UNKNOWN = enum.auto()
@ -1616,7 +1609,7 @@ class StrokeFile(object):
points.append(last_point)
else:
# should never get here
raise StrokeParsingError('Failed to parse', data[:16])
raise StrokeParsingError(f'Failed to parse', data[:16])
logger.debug(f'Offset {consumed}: {packet}')
consumed += packet.size
@ -1643,7 +1636,7 @@ class StrokePacketUnknown(StrokePacket):
self.size = 1 + nbytes
self.data = data[:self.size]
def __str__(self):
def __repr__(self):
return f'Unknown packet: {list2hex(self.data)}'
@ -1677,9 +1670,9 @@ class StrokeFileHeader(StrokePacket):
func = file_formats[key]
func(data)
except KeyError:
raise StrokeParsingError('Unknown file format:', data[:4])
raise StrokeParsingError(f'Unknown file format:', data[:4])
def __str__(self):
def __repr__(self):
t = time.strftime("%y%m%d%H%M%S", time.gmtime(self.timestamp))
return f'FileHeader: time: {t}, stroke count: {self.nstrokes}'
@ -1726,7 +1719,7 @@ class StrokeHeader(StrokePacket):
elif payload[0:3] == [0xff, 0xee, 0xee]:
self._parse_slate(data, header, payload)
else:
raise StrokeParsingError('Invalid StrokeHeader, expected ff fa or ff ee.', data[:8])
raise StrokeParsingError(f'Invalid StrokeHeader, expected ff fa or ff ee.', data[:8])
def _parse_slate(self, data, header, payload):
self.pen_id = 0
@ -1754,22 +1747,21 @@ class StrokeHeader(StrokePacket):
# if the pen id flag is set, the pen ID comes in the next 8-byte
# packet (plus 0xff header)
if needs_pen_id:
pen_packet = data[self.size:] # header + 8 bytes payload
pen_packet = data[self.size + 1:]
if not pen_packet:
raise StrokeParsingError('Missing pen ID packet')
pen_header = pen_packet[0]
if pen_header != 0xff:
raise StrokeParsingError(f'Unexpected pen id packet header: {pen_header}.', pen_packet[:9])
header = data[0]
if header != 0xff:
raise StrokeParsingError(f'Unexpected pen id packet header: {header}.', data[:9])
pen_payload = pen_packet[1:]
nbytes = bin(pen_header).count('1')
self.pen_id = little_u64(pen_payload[:8])
nbytes = bin(header).count('1')
self.pen_id = little_u64(pen_packet[:8])
self.size += 1 + nbytes
def __str__(self):
def __repr__(self):
if self.timestamp is not None:
t = time.strftime('%y%m%d%H%M%S', time.gmtime(self.timestamp))
t = time.strftime(f'%y%m%d%H%M%S', time.gmtime(self.timestamp))
else:
t = time.strftime(f'boot+{self.time_offset/1000}s')
return f'StrokeHeader: time: {t} new layer: {self.is_new_layer}, pen type: {self.pen_type}, pen id: {self.pen_id:#x}'
@ -1823,7 +1815,7 @@ class StrokeDelta(object):
# 8 bit delta
delta = int.from_bytes(bytes([databytes[0]]), byteorder='little', signed=True)
if delta == 0:
raise StrokeParsingError('StrokeDelta: invalid delta of zero', data)
raise StrokeParsingError(f'StrokeDelta: invalid delta of zero', data)
assert delta != 0
size = 1
elif mask == 3:
@ -1833,7 +1825,7 @@ class StrokeDelta(object):
return value, delta, size
if (data[0] & 0b11) != 0:
raise NotImplementedError('LSB two bits set in mask - this is not supposed to happen')
raise NotImplementedError(f'LSB two bits set in mask - this is not supposed to happen')
xmask = (data[0] & 0b00001100) >> 2
ymask = (data[0] & 0b00110000) >> 4
@ -1857,7 +1849,7 @@ class StrokeDelta(object):
self.size = offset
def __str__(self):
def __repr__(self):
def printstring(delta, abs):
return f'{delta:+5d}' if delta is not None \
else f'{abs:5d}' if abs is not None \
@ -1878,7 +1870,7 @@ class StrokePoint(StrokeDelta):
header = data[0]
payload = data[1:]
if payload[:2] != [0xff, 0xff]:
raise StrokeParsingError('Invalid StrokePoint, expected ff ff ff', data[:9])
raise StrokeParsingError(f'Invalid StrokePoint, expected ff ff ff', data[:9])
# This is a wrapper around StrokeDelta which does the mask parsing.
# In theory the StrokePoint would be a separate packet but it
@ -1896,7 +1888,7 @@ class StrokePoint(StrokeDelta):
# self.y = little_u16(data[4:6])
# self.pressure = little_u16(data[6:8])
def __str__(self):
def __repr__(self):
return f'StrokePoint: {self.x}/{self.y} pressure: {self.p}'
@ -1906,7 +1898,7 @@ class StrokeEOF(StrokePacket):
payload = data[1:]
nbytes = bin(header).count('1')
if payload[:nbytes] != [0xff] * nbytes:
raise StrokeParsingError('Invalid EOF, expected 0xff only', data[:9])
raise StrokeParsingError(f'Invalid EOF, expected 0xff only', data[:9])
self.size = nbytes + 1
@ -1916,11 +1908,11 @@ class StrokeEndOfStroke(StrokePacket):
payload = data[1:]
nbytes = bin(header).count('1')
if payload[:nbytes] != [0xff] * nbytes:
raise StrokeParsingError('Invalid EndOfStroke, expected 0xff only', data[:9])
raise StrokeParsingError(f'Invalid EndOfStroke, expected 0xff only', data[:9])
self.size = nbytes + 1
self.data = data[:self.size]
def __str__(self):
def __repr__(self):
return f'EndOfStroke: {list2hex(self.data)}'
@ -1937,6 +1929,6 @@ class StrokeLostPoint(StrokePacket):
header = data[0]
payload = data[1:]
if payload[:2] != [0xdd, 0xdd]:
raise StrokeParsingError('Invalid StrokeLostPoint, expected ff dd dd', data[:9])
raise StrokeParsingError(f'Invalid StrokeLostPoint, expected ff dd dd', data[:9])
self.nlost = little_u16(payload[2:4])
self.size = bin(header).count('1') + 1

82
tuhi/svg.py Normal file
View File

@ -0,0 +1,82 @@
#!/usr/bin/env python3
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
from gi.repository import GObject
import svgwrite
from svgwrite import mm
class JsonSvg(GObject.Object):
def __init__(self, json, orientation, filename, *args, **kwargs):
super().__init__(*args, **kwargs)
self.json = json
self.timestamp = json['timestamp']
self.filename = filename
self.orientation = orientation.lower()
self._convert()
def _convert(self):
js = self.json
dimensions = js['dimensions']
if dimensions == [0, 0]:
width, height = 100, 100
else:
# Original dimensions are too big for SVG Standard
# so we normalize them
width, height = dimensions[0] / 1000, dimensions[1] / 1000
if self.orientation in ['portrait', 'reverse-portrait']:
size = (height * mm, width * mm)
else:
size = (width * mm, height * mm)
svg = svgwrite.Drawing(filename=self.filename, size=size)
g = svgwrite.container.Group(id='layer0')
for stroke_num, s in enumerate(js['strokes']):
points_with_sk_width = []
for p in s['points']:
x, y = p['position']
# Normalize coordinates too
x, y = x / 1000, y / 1000
if self.orientation == 'reverse-portrait':
x, y = y, width - x
elif self.orientation == 'portrait':
x, y = height - y, x
elif self.orientation == 'reverse-landscape':
x, y = width - x, height - y
# Pressure normalized range is [0, 0xffff]
delta = (p['pressure'] - 0x8000) / 0x8000
stroke_width = 0.4 + 0.20 * delta
points_with_sk_width.append((x, y, stroke_width))
lines = svgwrite.container.Group(id=f'strokes_{stroke_num}', stroke='black')
for i, (x, y, stroke_width) in enumerate(points_with_sk_width):
if i != 0:
xp, yp, stroke_width_p = points_with_sk_width[i - 1]
lines.add(
svg.line(
start=(xp * mm, yp * mm),
end=(x * mm, y * mm),
stroke_width=stroke_width,
style='fill:none'
)
)
g.add(lines)
svg.add(g)
svg.save()

View File

@ -12,13 +12,13 @@
#
def list2hex(lst, groupsize=8):
def list2hex(l, groupsize=8):
'''Converts a list of integers to a two-letter hex string in the form
"1a 2b c3"'''
slices = []
for idx in range(0, len(lst), groupsize):
s = ' '.join([f'{x:02x}' for x in lst[idx:idx + groupsize]])
for idx in range(0, len(l), groupsize):
s = ' '.join([f'{x:02x}' for x in l[idx:idx + groupsize]])
slices.append(s)
return ' '.join(slices)

View File

@ -113,10 +113,10 @@ def b2hex(bs):
return ' '.join([''.join(s) for s in zip(hx[::2], hx[1::2])])
def list2hexlist(lst):
def list2hexlist(l):
'''Converts a list of integers to a two-letter prefixed hex string in the form
"[0x1a, 0x32, 0xab]"'''
return '[' + ', '.join([f'{x:#04x}' for x in lst]) + ']'
return '[' + ', '.join([f'{x:#04x}' for x in l]) + ']'
class DataLogger(object):
@ -204,7 +204,7 @@ class DataLogger(object):
self.logfile.write(f'name: {self.device.name}\n')
self.logfile.write(f'bluetooth: {self.btaddr}\n')
self.logfile.write(f'time: {timestamp} # host time: {time.strftime("%Y-%m-%d %H:%M:%S")}\n')
self.logfile.write('data:\n')
self.logfile.write(f'data:\n')
def _close_file(self):
if self.logfile is None:
@ -268,6 +268,78 @@ class DataLogger(object):
self._in_context = False
class WacomPacket(GObject.Object):
'''
A single protocol packet of variable length. The protocol format is a
single-byte bitmask followed by up to 8 bytes (depending on the number
of 1-bits in the bitmask). Each byte represents the matching bit in the
bitmask, i.e. the data is non-sparse.
If the bitmask has 0x1 and/or 0x2 set, those two bytes make up the
opcode of the command. So the possible layouts are:
| bitmask | opcode1 | opcode2 | payload ...
| bitmask | opcode1 | payload ...
| bitmask | opcode2 | payload ...
| bitmask | payload
On most normal packets containing motion data, the opcode is not
present.
Attributes:
bitmask .. single byte with a bitmask denoting the contents
opcode ... the 16-bit opcode or None for 'special' packets. Note that
the opcode is converted into an integer from the
little-endian protocol format
bytes .... a list of the payload bytes as sent by the device. This is
a non-sparse list matching the number of set bits in the
bitmask. it does not include the bitmask.
args ..... a sparse list of the payload bytes, expanded to match the
bitmask so that args[x] is the value for each bit x in
bitmask. it does not include the bitmask.
length ... length of the packet in bytes, including bitmask
'''
def __init__(self, data):
self.bitmask = data[0]
nbytes = bin(self.bitmask).count('1')
self.bytes = data[1:1 + nbytes]
self.length = nbytes + 1 # for the bitmask
idx = 0
# 2-byte opcode, but only if the bitmask is set for either byte
opcode = 0
if self.bitmask & 0x1:
opcode |= self.bytes[idx]
idx += 1
if self.bitmask & 0x2:
opcode |= self.bytes[idx] << 8
idx += 1
self.opcode = opcode if opcode else None
self.args = []
vals = self.bytes.copy()
mask = self.bitmask
while mask != 0:
self.args.append(vals.pop(0) if mask & 0x1 else 0x00)
mask >>= 1
def __repr__(self):
debug_data = []
debug_data.append(f'{self.bitmask:02x} ({self.bitmask:08b}) |')
if self.opcode:
debug_data.append(f'{self.opcode:04x} |')
else:
debug_data.append(f' |')
for i in range(2, 8): # start at 2 to skip the opcode
if self.bitmask & (1 << i):
debug_data.append(f'{self.args[i]:02x}')
else:
debug_data.append(' ')
return " ".join(debug_data)
class WacomProtocolLowLevelComm(GObject.Object):
'''
Internal class to handle the communication with the Wacom device.
@ -351,13 +423,6 @@ class WacomRegisterHelper(WacomProtocolLowLevelComm):
except AuthorizationError:
# this is expected
pass
except Exception as e:
logger.exception('Got other Exception while registering Spark device')
if e.errorcode == DeviceError.ErrorCode.GENERAL_ERROR:
logger.debug('Got GENERAL_ERROR while registering Spark device')
pass
else:
raise
# The "press button now command" on the spark
self.p.execute(Interactions.REGISTER_PRESS_BUTTON)
@ -414,7 +479,7 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
device.connect_gatt_value(WACOM_OFFLINE_CHRC_PEN_DATA_UUID,
self._on_pen_data_received)
@GObject.Property
@GObject.property
def dimensions(self):
return (self.width, self.height)
@ -441,7 +506,7 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
data = value[2:]
while data:
if bytes(data) == b'\xff\xff\xff\xff\xff\xff':
logger.debug('Pen left proximity')
logger.debug(f'Pen left proximity')
if self._uhid_device is not None:
self._uhid_device.call_input_event([1, 0, 0, 0, 0, 0, 0, 0])
@ -485,7 +550,7 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
tdelta = time.mktime(time.gmtime()) - time.mktime(t)
if abs(tdelta) > 300:
logger.error('device time is out by more than 5 minutes')
logger.error(f'device time is out by more than 5 minutes')
def get_battery_info(self):
msg = self.p.execute(Interactions.GET_BATTERY)
@ -652,7 +717,7 @@ class WacomProtocolBase(WacomProtocolLowLevelComm):
self.emit('drawing', drawing)
file_count -= 1
if TuhiConfig().peek_at_drawing:
logger.info('Not deleting drawing from device')
logger.info(f'Not deleting drawing from device')
if file_count > 0:
logger.info(f'{file_count} more files on device but I can only download the oldest one')
break
@ -740,13 +805,12 @@ class WacomProtocolSlate(WacomProtocolSpark):
self._on_sysevent_data_received)
def live_mode(self, mode, uhid):
if mode:
# Slate tablet has two models A5 and A4
# Here, we read real tablet dimensions before
# starting live mode
self.update_dimensions()
self.x_max = int(self.width / self.point_size) - 1000
self.y_max = int(self.height / self.point_size) - 500
# Slate tablet has two models A5 and A4
# Here, we read real tablet dimensions before
# starting live mode
self.update_dimensions()
self.x_max = self.width - 1000
self.y_max = self.height - 500
return super().live_mode(mode, uhid)
@ -886,7 +950,7 @@ class WacomDevice(GObject.Object):
protocol = ProtocolVersion.from_string(self._config['Protocol'])
self._init_protocol(protocol)
except (KeyError, ValueError):
logger.error('Missing or invalid Protocol entry in config file. Treating this device as unregistered')
logger.error(f'Missing or invalid Protocol entry in config file. Treating this device as unregistered')
self._uuid = None
def _init_protocol(self, protocol):
@ -963,7 +1027,7 @@ class WacomDevice(GObject.Object):
mode = args[0]
logger.debug(f'{self._device.address}: starting for mode {mode.name}')
logger.debug(f'{self._device.address}: starting')
self._is_running = True
exception = None
try:
@ -980,7 +1044,7 @@ class WacomDevice(GObject.Object):
logger.error(f'**** Exception: {e} ****')
exception = e
except AuthorizationError as e:
logger.error('Authorization failed, device needs to be re-registered')
logger.error(f'Authorization failed, device needs to be re-registered')
exception = e
finally:
self.sync_state = 0