drop .format(), switch to f{} everywhere

and change some double quotes into single quotes when encountering them.

It's a new project, so force use of Python 3.6 right now

Fixes #29
This commit is contained in:
Benjamin Tissoires 2018-01-29 11:38:14 +01:00 committed by Peter Hutterer
parent 2aa8b0dc95
commit 04f25a0a38
7 changed files with 62 additions and 57 deletions

View File

@ -26,6 +26,11 @@ Installation
$> tuhi $> tuhi
``` ```
Note
----
TUHI requires Python v3.6 or above.
Units used by this interface Units used by this interface
---------------------------- ----------------------------

View File

@ -103,7 +103,7 @@ class _DBusObject(GObject.Object):
raise e raise e
if self.proxy.get_name_owner() is None: if self.proxy.get_name_owner() is None:
raise DBusError('No-one is handling {}, is the daemon running?'.format(name)) raise DBusError(f'No-one is handling {name}, is the daemon running?')
self.proxy.connect('g-properties-changed', self._on_properties_changed) self.proxy.connect('g-properties-changed', self._on_properties_changed)
self.proxy.connect('g-signal', self._on_signal_received) self.proxy.connect('g-signal', self._on_signal_received)
@ -164,7 +164,7 @@ class TuhiKeteDevice(_DBusObject):
return self.property('DrawingsAvailable') return self.property('DrawingsAvailable')
def pair(self): def pair(self):
logger.debug('{}: Pairing'.format(self)) logger.debug(f'{self}: Pairing')
# FIXME: Pair() doesn't return anything useful yet, so we wait until # FIXME: Pair() doesn't return anything useful yet, so we wait until
# the device is in the Manager's Devices property # the device is in the Manager's Devices property
self.manager.connect('notify::devices', self._on_mgr_devices_updated) self.manager.connect('notify::devices', self._on_mgr_devices_updated)
@ -201,7 +201,7 @@ class TuhiKeteDevice(_DBusObject):
self.notify('listening') self.notify('listening')
def __repr__(self): def __repr__(self):
return '{} - {}'.format(self.address, self.name) return f'{self.address} - {self.name}'
def _on_mgr_devices_updated(self, manager, pspec): def _on_mgr_devices_updated(self, manager, pspec):
if not self.is_pairing: if not self.is_pairing:
@ -298,7 +298,7 @@ class TuhiKeteManager(_DBusObject):
objpath = parameters[0] objpath = parameters[0]
device = TuhiKeteDevice(self, objpath) device = TuhiKeteDevice(self, objpath)
self._pairable_devices[objpath] = device self._pairable_devices[objpath] = device
logger.debug('Found pairable device: {}'.format(device)) logger.debug(f'Found pairable device: {device}')
self.emit('pairable-device', device) self.emit('pairable-device', device)
def _on_name_vanished(self, connection, name): def _on_name_vanished(self, connection, name):
@ -409,7 +409,7 @@ class Searcher(Worker):
device.pair() device.pair()
def _on_pairable_device(self, manager, device): def _on_pairable_device(self, manager, device):
logger.info('Pairable device: {}'.format(device)) logger.info(f'Pairable device: {device}')
if self.interactive: if self.interactive:
self._on_pairable_device_interactive(manager, device) self._on_pairable_device_interactive(manager, device)
@ -427,7 +427,7 @@ class Listener(Worker):
self.device = d self.device = d
break break
else: else:
logger.error("{}: device not found".format(args.address)) logger.error(f'{args.address}: device not found')
# FIXME: this should be an exception # FIXME: this should be an exception
return return
@ -439,16 +439,16 @@ class Listener(Worker):
self._log_drawings_available(self.device) self._log_drawings_available(self.device)
if self.device.listening: if self.device.listening:
logger.info("{}: device already listening".format(self.device)) logger.info(f'{self.device}: device already listening')
return return
logger.debug("{}: starting listening".format(self.device)) logger.debug(f'{self.device}: starting listening')
self.s1 = self.device.connect('notify::listening', self._on_device_listening) self.s1 = self.device.connect('notify::listening', self._on_device_listening)
self.s2 = self.device.connect('notify::drawings-available', self._on_drawings_available) self.s2 = self.device.connect('notify::drawings-available', self._on_drawings_available)
self.device.start_listening() self.device.start_listening()
def stop(self): def stop(self):
logger.debug("{}: stopping listening".format(self.device)) logger.debug(f'{self.device}: stopping listening')
try: try:
self.device.stop_listening() self.device.stop_listening()
self.device.disconnect(self.s1) self.device.disconnect(self.s1)
@ -463,14 +463,14 @@ class Listener(Worker):
if self.device.listening: if self.device.listening:
return return
logger.info('{}: Listening stopped'.format(device)) logger.info(f'{device}: Listening stopped')
def _on_drawings_available(self, device, pspec): def _on_drawings_available(self, device, pspec):
self._log_drawings_available(device) self._log_drawings_available(device)
def _log_drawings_available(self, device): def _log_drawings_available(self, device):
s = ", ".join(["{}".format(t) for t in device.drawings_available]) s = ', '.join([f'{t}' for t in device.drawings_available])
logger.info('{}: drawings available: {}'.format(device, s)) logger.info(f'{device}: drawings available: {s}')
class Fetcher(Worker): class Fetcher(Worker):
@ -486,7 +486,7 @@ class Fetcher(Worker):
self.device = d self.device = d
break break
else: else:
logger.error("{}: device not found".format(address)) logger.error(f'{address}: device not found')
return return
if index != 'all': if index != 'all':
@ -496,7 +496,7 @@ class Fetcher(Worker):
raise ValueError() raise ValueError()
self.indices = [index] self.indices = [index]
except ValueError: except ValueError:
logger.error("Invalid index {}".format(index)) logger.error(f'Invalid index {index}')
return return
else: else:
self.indices = self.device.drawings_available self.indices = self.device.drawings_available
@ -858,7 +858,7 @@ class TuhiKeteShell(cmd.Cmd):
device = d device = d
break break
else: else:
logger.error("{}: device not found".format(address)) logger.error(f'{address}: device not found')
return return
device.pair() device.pair()

View File

@ -97,7 +97,7 @@ class TuhiDevice(GObject.Object):
self._bluez_device.connect_device() self._bluez_device.connect_device()
def _on_bluez_device_connected(self, bluez_device): def _on_bluez_device_connected(self, bluez_device):
logger.debug('{}: connected'.format(bluez_device.address)) logger.debug(f'{bluez_device.address}: connected')
if self._wacom_device is None: if self._wacom_device is None:
self._wacom_device = WacomDevice(bluez_device, self._uuid) self._wacom_device = WacomDevice(bluez_device, self._uuid)
self._wacom_device.connect('drawing', self._on_drawing_received) self._wacom_device.connect('drawing', self._on_drawing_received)
@ -109,7 +109,7 @@ class TuhiDevice(GObject.Object):
self.pairing_mode = False self.pairing_mode = False
def _on_bluez_device_disconnected(self, bluez_device): def _on_bluez_device_disconnected(self, bluez_device):
logger.debug('{}: disconnected'.format(bluez_device.address)) logger.debug(f'{bluez_device.address}: disconnected')
def _on_pair_requested(self, dbus_device): def _on_pair_requested(self, dbus_device):
if self.paired: if self.paired:
@ -228,9 +228,9 @@ class Tuhi(GObject.Object):
if not pairing_device: if not pairing_device:
if uuid is None: if uuid is None:
logger.info('{}: device without config, must be paired first'.format(bluez_device.address)) logger.info(f'{bluez_device.address}: device without config, must be paired first')
return return
logger.debug('{}: UUID {}'.format(bluez_device.address, uuid)) logger.debug(f'{bluez_device.address}: UUID {uuid}')
# create the device if unknown from us # create the device if unknown from us
if bluez_device.address not in self.devices: if bluez_device.address not in self.devices:
@ -243,7 +243,7 @@ class Tuhi(GObject.Object):
if pairing_device: if pairing_device:
d.paired = False d.paired = False
logger.debug('{}: call Pair() on device'.format(bluez_device.objpath)) logger.debug(f'{bluez_device.objpath}: call Pair() on device')
elif d.listening: elif d.listening:
d.connect_device() d.connect_device()

View File

@ -72,7 +72,7 @@ class BlueZCharacteristic(GObject.Object):
pass pass
def __repr__(self): def __repr__(self):
return 'Characteristic {}:{}'.format(self.uuid, self.objpath) return f'Characteristic {self.uuid}:{self.objpath}'
class BlueZDevice(GObject.Object): class BlueZDevice(GObject.Object):
@ -110,7 +110,7 @@ class BlueZDevice(GObject.Object):
self.interface = obj.get_interface(ORG_BLUEZ_DEVICE1) self.interface = obj.get_interface(ORG_BLUEZ_DEVICE1)
assert(self.interface is not None) assert(self.interface is not None)
logger.debug('Device {} - {} - {}'.format(self.objpath, self.address, self.name)) logger.debug(f'Device {self.objpath} - {self.address} - {self.name}')
self.characteristics = {} self.characteristics = {}
self.resolve(om) self.resolve(om)
@ -172,7 +172,7 @@ class BlueZDevice(GObject.Object):
if device != self.objpath: if device != self.objpath:
continue continue
logger.debug("GattService1: {} for device {}".format(obj.get_object_path(), device)) logger.debug(f'GattService1: {obj.get_object_path()} for device {device}')
self.gatt_services.append(obj) self.gatt_services.append(obj)
self._resolve_gatt_characteristics(obj, objects) self._resolve_gatt_characteristics(obj, objects)
@ -190,7 +190,7 @@ class BlueZDevice(GObject.Object):
if chrc.uuid in self.characteristics: if chrc.uuid in self.characteristics:
continue continue
logger.debug("GattCharacteristic: {} for service {}".format(chrc.uuid, service)) logger.debug(f'GattCharacteristic: {chrc.uuid} for service {service}')
self.characteristics[chrc.uuid] = chrc self.characteristics[chrc.uuid] = chrc
@ -201,11 +201,11 @@ class BlueZDevice(GObject.Object):
""" """
i = self.obj.get_interface(ORG_BLUEZ_DEVICE1) i = self.obj.get_interface(ORG_BLUEZ_DEVICE1)
if self.connected: if self.connected:
logger.info('{}: Device is already connected'.format(self.address)) logger.info(f'{self.address}: Device is already connected')
self.emit('connected') self.emit('connected')
return return
logger.info('{}: Connecting'.format(self.address)) logger.info(f'{self.address}: Connecting')
i.Connect(result_handler=self._on_connect_result) i.Connect(result_handler=self._on_connect_result)
def _on_connect_result(self, obj, result, user_data): def _on_connect_result(self, obj, result, user_data):
@ -214,9 +214,9 @@ class BlueZDevice(GObject.Object):
result.code == Gio.IOErrorEnum.DBUS_ERROR and result.code == Gio.IOErrorEnum.DBUS_ERROR and
Gio.dbus_error_get_remote_error(result) == 'org.bluez.Error.Failed' and Gio.dbus_error_get_remote_error(result) == 'org.bluez.Error.Failed' and
'Operation already in progress' in result.message): 'Operation already in progress' in result.message):
logger.debug('{}: Already connecting'.format(self.address)) logger.debug(f'{self.address}: Already connecting')
elif isinstance(result, Exception): elif isinstance(result, Exception):
logger.error('Connection failed: {}'.format(result)) logger.error(f'Connection failed: {result}')
def disconnect_device(self): def disconnect_device(self):
""" """
@ -225,16 +225,16 @@ class BlueZDevice(GObject.Object):
""" """
i = self.obj.get_interface(ORG_BLUEZ_DEVICE1) i = self.obj.get_interface(ORG_BLUEZ_DEVICE1)
if not i.get_cached_property('Connected').get_boolean(): if not i.get_cached_property('Connected').get_boolean():
logger.info('{}: Device is already disconnected'.format(self.address)) logger.info(f'{self.address}: Device is already disconnected')
self.emit('disconnected') self.emit('disconnected')
return return
logger.info('{}: Disconnecting'.format(self.address)) logger.info(f'{self.address}: Disconnecting')
i.Disconnect(result_handler=self._on_disconnect_result) i.Disconnect(result_handler=self._on_disconnect_result)
def _on_disconnect_result(self, obj, result, user_data): def _on_disconnect_result(self, obj, result, user_data):
if isinstance(result, Exception): if isinstance(result, Exception):
logger.error('Disconnection failed: {}'.format(result)) logger.error(f'Disconnection failed: {result}')
def _on_properties_changed(self, obj, properties, invalidated_properties): def _on_properties_changed(self, obj, properties, invalidated_properties):
properties = properties.unpack() properties = properties.unpack()
@ -264,7 +264,7 @@ class BlueZDevice(GObject.Object):
pass pass
def __repr__(self): def __repr__(self):
return 'Device {}:{}'.format(self.name, self.objpath) return f'Device {self.name}:{self.objpath}'
class BlueZDeviceManager(GObject.Object): class BlueZDeviceManager(GObject.Object):
@ -340,12 +340,12 @@ class BlueZDeviceManager(GObject.Object):
objpath = obj.get_object_path() objpath = obj.get_object_path()
try: try:
i.StartDiscovery() i.StartDiscovery()
logger.debug('{}: Discovery started (timeout {})'.format(objpath, timeout)) logger.debug(f'{objpath}: Discovery started (timeout {timeout})')
except GLib.Error as e: except GLib.Error as e:
if (e.domain == 'g-io-error-quark' and if (e.domain == 'g-io-error-quark' and
e.code == Gio.IOErrorEnum.DBUS_ERROR and e.code == Gio.IOErrorEnum.DBUS_ERROR and
Gio.dbus_error_get_remote_error(e) == 'org.bluez.Error.InProgress'): Gio.dbus_error_get_remote_error(e) == 'org.bluez.Error.InProgress'):
logger.debug('{}: Already listening'.format(objpath)) logger.debug(f'{objpath}: Already listening')
if timeout > 0: if timeout > 0:
GObject.timeout_add_seconds(timeout, self._discovery_timeout_expired) GObject.timeout_add_seconds(timeout, self._discovery_timeout_expired)
@ -372,9 +372,9 @@ class BlueZDeviceManager(GObject.Object):
objpath = obj.get_object_path() objpath = obj.get_object_path()
try: try:
i.StopDiscovery() i.StopDiscovery()
logger.debug('{}: Discovery stopped'.format(objpath)) logger.debug(f'{objpath}: Discovery stopped')
except GLib.Error as e: except GLib.Error as e:
logger.debug('{}: Failed to stop discovery ({})'.format(objpath, e)) logger.debug(f'{objpath}: Failed to stop discovery ({e})')
# reset the discovery filters # reset the discovery filters
i.SetDiscoveryFilter('(a{sv})', {}) i.SetDiscoveryFilter('(a{sv})', {})
@ -383,14 +383,14 @@ class BlueZDeviceManager(GObject.Object):
def _on_device_updated(self, device): def _on_device_updated(self, device):
"""Callback for Device's properties-changed""" """Callback for Device's properties-changed"""
logger.debug('Object updated: {}'.format(device.name)) logger.debug(f'Object updated: {device.name}')
self.emit("device-updated", device) self.emit("device-updated", device)
def _on_om_object_added(self, om, obj): def _on_om_object_added(self, om, obj):
"""Callback for ObjectManager's object-added""" """Callback for ObjectManager's object-added"""
objpath = obj.get_object_path() objpath = obj.get_object_path()
logger.debug('Object added: {}'.format(objpath)) logger.debug(f'Object added: {objpath}')
needs_resolve = self._process_object(obj, event=True) needs_resolve = self._process_object(obj, event=True)
# we had at least one characteristic added, need to resolve the # we had at least one characteristic added, need to resolve the
@ -403,7 +403,7 @@ class BlueZDeviceManager(GObject.Object):
def _on_om_object_removed(self, om, obj): def _on_om_object_removed(self, om, obj):
"""Callback for ObjectManager's object-removed""" """Callback for ObjectManager's object-removed"""
objpath = obj.get_object_path() objpath = obj.get_object_path()
logger.debug('Object removed: {}'.format(objpath)) logger.debug(f'Object removed: {objpath}')
def _process_object(self, obj, event=True): def _process_object(self, obj, event=True):
"""Process a single DBusProxyObject""" """Process a single DBusProxyObject"""
@ -419,7 +419,7 @@ class BlueZDeviceManager(GObject.Object):
def _process_adapter(self, obj): def _process_adapter(self, obj):
objpath = obj.get_object_path() objpath = obj.get_object_path()
logger.debug('Adapter: {}'.format(objpath)) logger.debug(f'Adapter: {objpath}')
def _process_device(self, obj, event=True): def _process_device(self, obj, event=True):
dev = BlueZDevice(self._om, obj) dev = BlueZDevice(self._om, obj)
@ -429,4 +429,4 @@ class BlueZDeviceManager(GObject.Object):
def _process_characteristic(self, obj): def _process_characteristic(self, obj):
objpath = obj.get_object_path() objpath = obj.get_object_path()
logger.debug('Characteristic {}'.format(objpath)) logger.debug(f'Characteristic {objpath}')

View File

@ -60,7 +60,7 @@ class TuhiConfig(GObject.Object):
if not os.path.isfile(path): if not os.path.isfile(path):
continue continue
logger.debug("{}: configuration found".format(entry.name)) logger.debug(f'{entry.name}: configuration found')
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read(path) config.read(path)
@ -71,7 +71,7 @@ class TuhiConfig(GObject.Object):
assert is_btaddr(address) assert is_btaddr(address)
assert len(uuid) == 12 assert len(uuid) == 12
logger.debug("{}: adding new config, UUID {}".format(address, uuid)) logger.debug(f'{address}: adding new config, UUID {uuid}')
path = os.path.join(ROOT_PATH, address) path = os.path.join(ROOT_PATH, address)
try: try:
os.mkdir(path) os.mkdir(path)
@ -104,11 +104,11 @@ class TuhiConfig(GObject.Object):
assert drawing is not None assert drawing is not None
if address not in self.devices: if address not in self.devices:
logger.error("{}: cannot store drawings for unknown device".format(address)) logger.error(f'{address}: cannot store drawings for unknown device')
return return
logger.debug("{}: adding new drawing, timestamp {}".format(address, drawing.timestamp)) logger.debug(f'{address}: adding new drawing, timestamp {drawing.timestamp}')
path = os.path.join(ROOT_PATH, address, "{}.json".format(drawing.timestamp)) path = os.path.join(ROOT_PATH, address, f'{drawing.timestamp}.json')
with open(path, "w") as f: with open(path, "w") as f:
f.write(drawing.to_json()) f.write(drawing.to_json())

View File

@ -135,7 +135,7 @@ class TuhiDBusDevice(_TuhiDBus):
def __init__(self, device, connection): def __init__(self, device, connection):
objpath = device.address.replace(':', '_') objpath = device.address.replace(':', '_')
objpath = "{}/{}".format(BASE_PATH, objpath) objpath = f'{BASE_PATH}/{objpath}'
_TuhiDBus.__init__(self, connection, objpath, INTF_DEVICE) _TuhiDBus.__init__(self, connection, objpath, INTF_DEVICE)
self.name = device.name self.name = device.name
@ -243,7 +243,7 @@ class TuhiDBusDevice(_TuhiDBus):
def _start_listening(self, connection, sender): def _start_listening(self, connection, sender):
if self.listening: if self.listening:
logger.debug("{} - already listening".format(self)) logger.debug(f'{self} - already listening')
# silently ignore it for the current client but send EAGAIN to # silently ignore it for the current client but send EAGAIN to
# other clients # other clients
@ -261,7 +261,7 @@ class TuhiDBusDevice(_TuhiDBus):
callback=self._on_name_owner_changed_signal_cb, callback=self._on_name_owner_changed_signal_cb,
user_data=sender) user_data=sender)
self._listening_client = (sender, s) self._listening_client = (sender, s)
logger.debug('Listening started on {} for {}'.format(self.name, sender)) logger.debug(f'Listening started on {self.name} for {sender}')
self.listening = True self.listening = True
self.notify('listening') self.notify('listening')
@ -281,7 +281,7 @@ class TuhiDBusDevice(_TuhiDBus):
connection.signal_unsubscribe(self._listening_client[1]) connection.signal_unsubscribe(self._listening_client[1])
self._listening_client = None self._listening_client = None
logger.debug('Listening stopped on {} for {}'.format(self.name, sender)) logger.debug(f'Listening stopped on {self.name} for {sender}')
self.notify('listening') self.notify('listening')
@ -311,7 +311,7 @@ class TuhiDBusDevice(_TuhiDBus):
self.signal('ButtonPressRequired') self.signal('ButtonPressRequired')
def __repr__(self): def __repr__(self):
return "{} - {}".format(self.objpath, self.name) return f'{self.objpath} - {self.name}'
class TuhiDBusServer(_TuhiDBus): class TuhiDBusServer(_TuhiDBus):

View File

@ -57,7 +57,7 @@ def b2hex(bs):
def list2hex(l): def list2hex(l):
'''Converts a list of integers to a two-letter hex string in the form '''Converts a list of integers to a two-letter hex string in the form
"1a 2b c3"''' "1a 2b c3"'''
return ' '.join(['{:02x}'.format(x) for x in l]) return ' '.join([f'{x:02x}' for x in l])
class NordicData(list): class NordicData(list):
@ -351,7 +351,7 @@ class WacomDevice(GObject.Object):
expected_opcode=0xcf) expected_opcode=0xcf)
# logger.debug(f'cc returned {data} ') # logger.debug(f'cc returned {data} ')
count = int.from_bytes(data[0:4], byteorder='little') count = int.from_bytes(data[0:4], byteorder='little')
str_timestamp = ''.join(['{:02x}'.format(d) for d in data[4:]]) str_timestamp = ''.join([f'{d:02x}' for d in data[4:]])
timestamp = time.strptime(str_timestamp, "%y%m%d%H%M%S") timestamp = time.strptime(str_timestamp, "%y%m%d%H%M%S")
return count, timestamp return count, timestamp
@ -366,7 +366,7 @@ class WacomDevice(GObject.Object):
data = self.wait_nordic_data(0xcd, 5) data = self.wait_nordic_data(0xcd, 5)
# logger.debug(f'cc returned {data} ') # logger.debug(f'cc returned {data} ')
str_timestamp = ''.join(['{:02x}'.format(d) for d in data]) str_timestamp = ''.join([f'{d:02x}' for d in data])
timestamp = time.strptime(str_timestamp, "%y%m%d%H%M%S") timestamp = time.strptime(str_timestamp, "%y%m%d%H%M%S")
return count, timestamp return count, timestamp
@ -609,7 +609,7 @@ class WacomDevice(GObject.Object):
def pair_device(self): def pair_device(self):
self._uuid = uuid.uuid4().hex[:12] self._uuid = uuid.uuid4().hex[:12]
logger.debug("{}: pairing device, assigned {}".format(self.device.address, self.uuid)) logger.debug(f'{self.device.address}: pairing device, assigned {self.uuid}')
if self.is_slate(): if self.is_slate():
self.pair_device_slate() self.pair_device_slate()
else: else:
@ -619,10 +619,10 @@ class WacomDevice(GObject.Object):
def run(self): def run(self):
if self._is_running: if self._is_running:
logger.error('{}: already synching, ignoring this request'.format(self.device.address)) logger.error(f'{self.device.address}: already synching, ignoring this request')
return return
logger.debug('{}: starting'.format(self.device.address)) logger.debug(f'{self.device.address}: starting')
self._is_running = True self._is_running = True
exception = None exception = None
try: try: