do not crash when SHT sensor library is missing

This commit is contained in:
Nicolò Balzarotti 2023-02-21 13:58:12 +01:00
parent 4b1a3e9a7e
commit b9575f8b6e
1 changed files with 60 additions and 18 deletions

View File

@ -4,12 +4,21 @@ from collections import deque
import re import re
from os.path import exists from os.path import exists
import RPi.GPIO as GPIO try:
GPIO.setmode(GPIO.BCM) import RPi.GPIO as GPIO
GPIO.setmode(GPIO.BCM)
rpi = True
except:
rpi = False
# SHT40 try:
import board # SHT40
import adafruit_sht4x import board
import adafruit_sht4x
enable_sht = True
except:
print("Adafruit SHT4X not available!")
enable_sht = False
# TODO client: add a way to disable live plotting? server: stop # TODO client: add a way to disable live plotting? server: stop
# sending data periodically to everybody! # sending data periodically to everybody!
@ -39,7 +48,8 @@ class GPIOState(Sensor):
try: try:
self.value = GPIO.input(self.pin) self.value = GPIO.input(self.pin)
except: except:
self.value = None print(f"Could not read pin {self.pin}")
self.value = 0
self.time = perf_counter() self.time = perf_counter()
return (self.time, self.transform(self.value)) return (self.time, self.transform(self.value))
@ -67,27 +77,53 @@ class Temperature1W(Sensor):
return (time, None) return (time, None)
return (time, int(re.search("t=([0-9]+)", content[1]).group(1)) / 1000.0) return (time, int(re.search("t=([0-9]+)", content[1]).group(1)) / 1000.0)
if enable_sht:
i2c = board.I2C() # uses board.SCL and board.SDA
sht = adafruit_sht4x.SHT4x(i2c)
print('Serial: ', hex(sht.serial_number))
SHT40_DEFAULT = adafruit_sht4x.Mode.NOHEAT_HIGHPRECISION
sht.mode = SHT40_DEFAULT
class SHT40(Sensor): class SHT40(Sensor):
def __init__(self, what): def __init__(self, what, every=60):
if what not in ("Temperature", "Humidity"):
print("ERROR: invalid sensor value: ", what)
return
self.measure = what self.measure = what
self.i2c = board.I2C() # uses board.SCL and board.SDA if not enable_sht:
self.sht = adafruit_sht4x.SHT4x(i2c) return
print('Serial: ', hex(self.sht.serial_number)) self.heat_every = every
# FIXME: if self.measure != "Temperature":
self.sht.mode = adafruit_sht4x.Mode.NOHEAT_HIGHPRECISION self.last_heat = perf_counter()
self.heatmode = adafruit_sht4x.Mode.HIGHHEAT_1S
self.standardmode = SHT40_DEFAULT
# mode: adafruit_sht4x.Mode.string[sht.mode]
# Can also set the mode to enable heater # Can also set the mode to enable heater
# sht.mode = adafruit_sht4x.Mode.LOWHEAT_100MS # sht.mode = adafruit_sht4x.Mode.LOWHEAT_100MS
# modes = [adafruit_sht4x.Mode.HIGHHEAT_1S, # modes = [adafruit_sht4x.Mode.HIGHHEAT_1S,
# adafruit_sht4x.Mode.LOWHEAT_100MS, # adafruit_sht4x.Mode.LOWHEAT_100MS,
# adafruit_sht4x.Mode.NOHEAT_HIGHPRECISION] # adafruit_sht4x.Mode.NOHEAT_HIGHPRECISION]
def is_temperature(self):
return self.measure == "Temperature"
def is_humidity(self):
return self.measure == "Humidity"
def reset_mode(self):
sht.mode = self.standardmode
def read(self): def read(self):
time = perf_counter() time = perf_counter()
reset = False
if self.is_humidity():
timediff = time - self.last_heat
if timediff > self.heat_every:
reset = True
sht.mode = self.heatmode
temperature, relative_humidity = sht.measurements temperature, relative_humidity = sht.measurements
# print("Temperature: %0.1f C" % temperature) if reset:
# print("Humidity: %0.1f %%" % (relative_humidity)) self.reset_mode()
# print("") return (time, temperature if self.measure == 'Temperature' else relative_humidity)
return (time, temerature if self.measure == 'Temperature' else relative_humidityg)
class Sensors(): class Sensors():
def __init__(self, history=2621440): def __init__(self, history=2621440):
@ -102,6 +138,9 @@ class Sensors():
"Scan for new sensors" "Scan for new sensors"
def scan(self): def scan(self):
# Read something like this from a stored prefs json
# sensors_names_map = {}
# Then, scan, apply existing names and use placholders for new sensors
# FIXME: should scan, apply stored data and return this # FIXME: should scan, apply stored data and return this
self.available_sensors = { self.available_sensors = {
'T_food': Temperature1W('28-06214252b671'), 'T_food': Temperature1W('28-06214252b671'),
@ -123,8 +162,11 @@ class Sensors():
def read(self): def read(self):
for sensor in self.available_sensors.keys(): for sensor in self.available_sensors.keys():
time, value = self.available_sensors[sensor].read() try:
self.values[sensor] = (str(self.perf_datetime(time)), value) time, value = self.available_sensors[sensor].read()
self.values[sensor] = (str(self.perf_datetime(time)), value)
except Exception as e:
print("Error reading sensors", sensor, ": ", e)
self.history.append(self.value_tuple()) self.history.append(self.value_tuple())
def get_sensor_value(self, sensor_name): def get_sensor_value(self, sensor_name):