From 04f25a0a38347c5e50949d3d4068a2113149b663 Mon Sep 17 00:00:00 2001 From: Benjamin Tissoires Date: Mon, 29 Jan 2018 11:38:14 +0100 Subject: [PATCH] drop .format(), switch to f{} everywhere and change some double quotes into single quotes when encountering them. It's a new project, so force use of Python 3.6 right now Fixes #29 --- README.md | 5 +++++ tools/tuhi-kete.py | 30 +++++++++++++++--------------- tuhi/base.py | 10 +++++----- tuhi/ble.py | 42 +++++++++++++++++++++--------------------- tuhi/config.py | 10 +++++----- tuhi/dbusserver.py | 10 +++++----- tuhi/wacom.py | 12 ++++++------ 7 files changed, 62 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index fa98405..7d54304 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,11 @@ Installation $> tuhi ``` +Note +---- + +TUHI requires Python v3.6 or above. + Units used by this interface ---------------------------- diff --git a/tools/tuhi-kete.py b/tools/tuhi-kete.py index e3c8757..0cc09bc 100755 --- a/tools/tuhi-kete.py +++ b/tools/tuhi-kete.py @@ -103,7 +103,7 @@ class _DBusObject(GObject.Object): raise e if self.proxy.get_name_owner() is None: - raise DBusError('No-one is handling {}, is the daemon running?'.format(name)) + raise DBusError(f'No-one is handling {name}, is the daemon running?') self.proxy.connect('g-properties-changed', self._on_properties_changed) self.proxy.connect('g-signal', self._on_signal_received) @@ -164,7 +164,7 @@ class TuhiKeteDevice(_DBusObject): return self.property('DrawingsAvailable') def pair(self): - logger.debug('{}: Pairing'.format(self)) + logger.debug(f'{self}: Pairing') # FIXME: Pair() doesn't return anything useful yet, so we wait until # the device is in the Manager's Devices property self.manager.connect('notify::devices', self._on_mgr_devices_updated) @@ -201,7 +201,7 @@ class TuhiKeteDevice(_DBusObject): self.notify('listening') def __repr__(self): - return '{} - {}'.format(self.address, self.name) + return f'{self.address} - {self.name}' def _on_mgr_devices_updated(self, manager, pspec): if not self.is_pairing: @@ -298,7 +298,7 @@ class TuhiKeteManager(_DBusObject): objpath = parameters[0] device = TuhiKeteDevice(self, objpath) self._pairable_devices[objpath] = device - logger.debug('Found pairable device: {}'.format(device)) + logger.debug(f'Found pairable device: {device}') self.emit('pairable-device', device) def _on_name_vanished(self, connection, name): @@ -409,7 +409,7 @@ class Searcher(Worker): device.pair() def _on_pairable_device(self, manager, device): - logger.info('Pairable device: {}'.format(device)) + logger.info(f'Pairable device: {device}') if self.interactive: self._on_pairable_device_interactive(manager, device) @@ -427,7 +427,7 @@ class Listener(Worker): self.device = d break else: - logger.error("{}: device not found".format(args.address)) + logger.error(f'{args.address}: device not found') # FIXME: this should be an exception return @@ -439,16 +439,16 @@ class Listener(Worker): self._log_drawings_available(self.device) if self.device.listening: - logger.info("{}: device already listening".format(self.device)) + logger.info(f'{self.device}: device already listening') return - logger.debug("{}: starting listening".format(self.device)) + logger.debug(f'{self.device}: starting listening') self.s1 = self.device.connect('notify::listening', self._on_device_listening) self.s2 = self.device.connect('notify::drawings-available', self._on_drawings_available) self.device.start_listening() def stop(self): - logger.debug("{}: stopping listening".format(self.device)) + logger.debug(f'{self.device}: stopping listening') try: self.device.stop_listening() self.device.disconnect(self.s1) @@ -463,14 +463,14 @@ class Listener(Worker): if self.device.listening: return - logger.info('{}: Listening stopped'.format(device)) + logger.info(f'{device}: Listening stopped') def _on_drawings_available(self, device, pspec): self._log_drawings_available(device) def _log_drawings_available(self, device): - s = ", ".join(["{}".format(t) for t in device.drawings_available]) - logger.info('{}: drawings available: {}'.format(device, s)) + s = ', '.join([f'{t}' for t in device.drawings_available]) + logger.info(f'{device}: drawings available: {s}') class Fetcher(Worker): @@ -486,7 +486,7 @@ class Fetcher(Worker): self.device = d break else: - logger.error("{}: device not found".format(address)) + logger.error(f'{address}: device not found') return if index != 'all': @@ -496,7 +496,7 @@ class Fetcher(Worker): raise ValueError() self.indices = [index] except ValueError: - logger.error("Invalid index {}".format(index)) + logger.error(f'Invalid index {index}') return else: self.indices = self.device.drawings_available @@ -858,7 +858,7 @@ class TuhiKeteShell(cmd.Cmd): device = d break else: - logger.error("{}: device not found".format(address)) + logger.error(f'{address}: device not found') return device.pair() diff --git a/tuhi/base.py b/tuhi/base.py index 103a1a5..5c7c2ae 100644 --- a/tuhi/base.py +++ b/tuhi/base.py @@ -97,7 +97,7 @@ class TuhiDevice(GObject.Object): self._bluez_device.connect_device() def _on_bluez_device_connected(self, bluez_device): - logger.debug('{}: connected'.format(bluez_device.address)) + logger.debug(f'{bluez_device.address}: connected') if self._wacom_device is None: self._wacom_device = WacomDevice(bluez_device, self._uuid) self._wacom_device.connect('drawing', self._on_drawing_received) @@ -109,7 +109,7 @@ class TuhiDevice(GObject.Object): self.pairing_mode = False def _on_bluez_device_disconnected(self, bluez_device): - logger.debug('{}: disconnected'.format(bluez_device.address)) + logger.debug(f'{bluez_device.address}: disconnected') def _on_pair_requested(self, dbus_device): if self.paired: @@ -228,9 +228,9 @@ class Tuhi(GObject.Object): if not pairing_device: if uuid is None: - logger.info('{}: device without config, must be paired first'.format(bluez_device.address)) + logger.info(f'{bluez_device.address}: device without config, must be paired first') return - logger.debug('{}: UUID {}'.format(bluez_device.address, uuid)) + logger.debug(f'{bluez_device.address}: UUID {uuid}') # create the device if unknown from us if bluez_device.address not in self.devices: @@ -243,7 +243,7 @@ class Tuhi(GObject.Object): if pairing_device: d.paired = False - logger.debug('{}: call Pair() on device'.format(bluez_device.objpath)) + logger.debug(f'{bluez_device.objpath}: call Pair() on device') elif d.listening: d.connect_device() diff --git a/tuhi/ble.py b/tuhi/ble.py index d430ed5..3899095 100755 --- a/tuhi/ble.py +++ b/tuhi/ble.py @@ -72,7 +72,7 @@ class BlueZCharacteristic(GObject.Object): pass def __repr__(self): - return 'Characteristic {}:{}'.format(self.uuid, self.objpath) + return f'Characteristic {self.uuid}:{self.objpath}' class BlueZDevice(GObject.Object): @@ -110,7 +110,7 @@ class BlueZDevice(GObject.Object): self.interface = obj.get_interface(ORG_BLUEZ_DEVICE1) assert(self.interface is not None) - logger.debug('Device {} - {} - {}'.format(self.objpath, self.address, self.name)) + logger.debug(f'Device {self.objpath} - {self.address} - {self.name}') self.characteristics = {} self.resolve(om) @@ -172,7 +172,7 @@ class BlueZDevice(GObject.Object): if device != self.objpath: continue - logger.debug("GattService1: {} for device {}".format(obj.get_object_path(), device)) + logger.debug(f'GattService1: {obj.get_object_path()} for device {device}') self.gatt_services.append(obj) self._resolve_gatt_characteristics(obj, objects) @@ -190,7 +190,7 @@ class BlueZDevice(GObject.Object): if chrc.uuid in self.characteristics: continue - logger.debug("GattCharacteristic: {} for service {}".format(chrc.uuid, service)) + logger.debug(f'GattCharacteristic: {chrc.uuid} for service {service}') self.characteristics[chrc.uuid] = chrc @@ -201,11 +201,11 @@ class BlueZDevice(GObject.Object): """ i = self.obj.get_interface(ORG_BLUEZ_DEVICE1) if self.connected: - logger.info('{}: Device is already connected'.format(self.address)) + logger.info(f'{self.address}: Device is already connected') self.emit('connected') return - logger.info('{}: Connecting'.format(self.address)) + logger.info(f'{self.address}: Connecting') i.Connect(result_handler=self._on_connect_result) def _on_connect_result(self, obj, result, user_data): @@ -214,9 +214,9 @@ class BlueZDevice(GObject.Object): result.code == Gio.IOErrorEnum.DBUS_ERROR and Gio.dbus_error_get_remote_error(result) == 'org.bluez.Error.Failed' and 'Operation already in progress' in result.message): - logger.debug('{}: Already connecting'.format(self.address)) + logger.debug(f'{self.address}: Already connecting') elif isinstance(result, Exception): - logger.error('Connection failed: {}'.format(result)) + logger.error(f'Connection failed: {result}') def disconnect_device(self): """ @@ -225,16 +225,16 @@ class BlueZDevice(GObject.Object): """ i = self.obj.get_interface(ORG_BLUEZ_DEVICE1) if not i.get_cached_property('Connected').get_boolean(): - logger.info('{}: Device is already disconnected'.format(self.address)) + logger.info(f'{self.address}: Device is already disconnected') self.emit('disconnected') return - logger.info('{}: Disconnecting'.format(self.address)) + logger.info(f'{self.address}: Disconnecting') i.Disconnect(result_handler=self._on_disconnect_result) def _on_disconnect_result(self, obj, result, user_data): if isinstance(result, Exception): - logger.error('Disconnection failed: {}'.format(result)) + logger.error(f'Disconnection failed: {result}') def _on_properties_changed(self, obj, properties, invalidated_properties): properties = properties.unpack() @@ -264,7 +264,7 @@ class BlueZDevice(GObject.Object): pass def __repr__(self): - return 'Device {}:{}'.format(self.name, self.objpath) + return f'Device {self.name}:{self.objpath}' class BlueZDeviceManager(GObject.Object): @@ -340,12 +340,12 @@ class BlueZDeviceManager(GObject.Object): objpath = obj.get_object_path() try: i.StartDiscovery() - logger.debug('{}: Discovery started (timeout {})'.format(objpath, timeout)) + logger.debug(f'{objpath}: Discovery started (timeout {timeout})') except GLib.Error as e: if (e.domain == 'g-io-error-quark' and e.code == Gio.IOErrorEnum.DBUS_ERROR and Gio.dbus_error_get_remote_error(e) == 'org.bluez.Error.InProgress'): - logger.debug('{}: Already listening'.format(objpath)) + logger.debug(f'{objpath}: Already listening') if timeout > 0: GObject.timeout_add_seconds(timeout, self._discovery_timeout_expired) @@ -372,9 +372,9 @@ class BlueZDeviceManager(GObject.Object): objpath = obj.get_object_path() try: i.StopDiscovery() - logger.debug('{}: Discovery stopped'.format(objpath)) + logger.debug(f'{objpath}: Discovery stopped') except GLib.Error as e: - logger.debug('{}: Failed to stop discovery ({})'.format(objpath, e)) + logger.debug(f'{objpath}: Failed to stop discovery ({e})') # reset the discovery filters i.SetDiscoveryFilter('(a{sv})', {}) @@ -383,14 +383,14 @@ class BlueZDeviceManager(GObject.Object): def _on_device_updated(self, device): """Callback for Device's properties-changed""" - logger.debug('Object updated: {}'.format(device.name)) + logger.debug(f'Object updated: {device.name}') self.emit("device-updated", device) def _on_om_object_added(self, om, obj): """Callback for ObjectManager's object-added""" objpath = obj.get_object_path() - logger.debug('Object added: {}'.format(objpath)) + logger.debug(f'Object added: {objpath}') needs_resolve = self._process_object(obj, event=True) # we had at least one characteristic added, need to resolve the @@ -403,7 +403,7 @@ class BlueZDeviceManager(GObject.Object): def _on_om_object_removed(self, om, obj): """Callback for ObjectManager's object-removed""" objpath = obj.get_object_path() - logger.debug('Object removed: {}'.format(objpath)) + logger.debug(f'Object removed: {objpath}') def _process_object(self, obj, event=True): """Process a single DBusProxyObject""" @@ -419,7 +419,7 @@ class BlueZDeviceManager(GObject.Object): def _process_adapter(self, obj): objpath = obj.get_object_path() - logger.debug('Adapter: {}'.format(objpath)) + logger.debug(f'Adapter: {objpath}') def _process_device(self, obj, event=True): dev = BlueZDevice(self._om, obj) @@ -429,4 +429,4 @@ class BlueZDeviceManager(GObject.Object): def _process_characteristic(self, obj): objpath = obj.get_object_path() - logger.debug('Characteristic {}'.format(objpath)) + logger.debug(f'Characteristic {objpath}') diff --git a/tuhi/config.py b/tuhi/config.py index 4db61f5..5bd8e0b 100644 --- a/tuhi/config.py +++ b/tuhi/config.py @@ -60,7 +60,7 @@ class TuhiConfig(GObject.Object): if not os.path.isfile(path): continue - logger.debug("{}: configuration found".format(entry.name)) + logger.debug(f'{entry.name}: configuration found') config = configparser.ConfigParser() config.read(path) @@ -71,7 +71,7 @@ class TuhiConfig(GObject.Object): assert is_btaddr(address) assert len(uuid) == 12 - logger.debug("{}: adding new config, UUID {}".format(address, uuid)) + logger.debug(f'{address}: adding new config, UUID {uuid}') path = os.path.join(ROOT_PATH, address) try: os.mkdir(path) @@ -104,11 +104,11 @@ class TuhiConfig(GObject.Object): assert drawing is not None if address not in self.devices: - logger.error("{}: cannot store drawings for unknown device".format(address)) + logger.error(f'{address}: cannot store drawings for unknown device') return - logger.debug("{}: adding new drawing, timestamp {}".format(address, drawing.timestamp)) - path = os.path.join(ROOT_PATH, address, "{}.json".format(drawing.timestamp)) + logger.debug(f'{address}: adding new drawing, timestamp {drawing.timestamp}') + path = os.path.join(ROOT_PATH, address, f'{drawing.timestamp}.json') with open(path, "w") as f: f.write(drawing.to_json()) diff --git a/tuhi/dbusserver.py b/tuhi/dbusserver.py index 16b2c24..18513d1 100755 --- a/tuhi/dbusserver.py +++ b/tuhi/dbusserver.py @@ -135,7 +135,7 @@ class TuhiDBusDevice(_TuhiDBus): def __init__(self, device, connection): objpath = device.address.replace(':', '_') - objpath = "{}/{}".format(BASE_PATH, objpath) + objpath = f'{BASE_PATH}/{objpath}' _TuhiDBus.__init__(self, connection, objpath, INTF_DEVICE) self.name = device.name @@ -243,7 +243,7 @@ class TuhiDBusDevice(_TuhiDBus): def _start_listening(self, connection, sender): if self.listening: - logger.debug("{} - already listening".format(self)) + logger.debug(f'{self} - already listening') # silently ignore it for the current client but send EAGAIN to # other clients @@ -261,7 +261,7 @@ class TuhiDBusDevice(_TuhiDBus): callback=self._on_name_owner_changed_signal_cb, user_data=sender) self._listening_client = (sender, s) - logger.debug('Listening started on {} for {}'.format(self.name, sender)) + logger.debug(f'Listening started on {self.name} for {sender}') self.listening = True self.notify('listening') @@ -281,7 +281,7 @@ class TuhiDBusDevice(_TuhiDBus): connection.signal_unsubscribe(self._listening_client[1]) self._listening_client = None - logger.debug('Listening stopped on {} for {}'.format(self.name, sender)) + logger.debug(f'Listening stopped on {self.name} for {sender}') self.notify('listening') @@ -311,7 +311,7 @@ class TuhiDBusDevice(_TuhiDBus): self.signal('ButtonPressRequired') def __repr__(self): - return "{} - {}".format(self.objpath, self.name) + return f'{self.objpath} - {self.name}' class TuhiDBusServer(_TuhiDBus): diff --git a/tuhi/wacom.py b/tuhi/wacom.py index a131179..7272564 100644 --- a/tuhi/wacom.py +++ b/tuhi/wacom.py @@ -57,7 +57,7 @@ def b2hex(bs): def list2hex(l): '''Converts a list of integers to a two-letter hex string in the form "1a 2b c3"''' - return ' '.join(['{:02x}'.format(x) for x in l]) + return ' '.join([f'{x:02x}' for x in l]) class NordicData(list): @@ -351,7 +351,7 @@ class WacomDevice(GObject.Object): expected_opcode=0xcf) # logger.debug(f'cc returned {data} ') count = int.from_bytes(data[0:4], byteorder='little') - str_timestamp = ''.join(['{:02x}'.format(d) for d in data[4:]]) + str_timestamp = ''.join([f'{d:02x}' for d in data[4:]]) timestamp = time.strptime(str_timestamp, "%y%m%d%H%M%S") return count, timestamp @@ -366,7 +366,7 @@ class WacomDevice(GObject.Object): data = self.wait_nordic_data(0xcd, 5) # logger.debug(f'cc returned {data} ') - str_timestamp = ''.join(['{:02x}'.format(d) for d in data]) + str_timestamp = ''.join([f'{d:02x}' for d in data]) timestamp = time.strptime(str_timestamp, "%y%m%d%H%M%S") return count, timestamp @@ -609,7 +609,7 @@ class WacomDevice(GObject.Object): def pair_device(self): self._uuid = uuid.uuid4().hex[:12] - logger.debug("{}: pairing device, assigned {}".format(self.device.address, self.uuid)) + logger.debug(f'{self.device.address}: pairing device, assigned {self.uuid}') if self.is_slate(): self.pair_device_slate() else: @@ -619,10 +619,10 @@ class WacomDevice(GObject.Object): def run(self): if self._is_running: - logger.error('{}: already synching, ignoring this request'.format(self.device.address)) + logger.error(f'{self.device.address}: already synching, ignoring this request') return - logger.debug('{}: starting'.format(self.device.address)) + logger.debug(f'{self.device.address}: starting') self._is_running = True exception = None try: