from time import perf_counter from datetime import datetime, timedelta from collections import deque from statistics import mean from itertools import islice import re from os.path import exists try: import RPi.GPIO as GPIO GPIO.setmode(GPIO.BCM) rpi = True except: rpi = False try: # SHT40 import board import adafruit_sht4x enable_sht = True except: print("Adafruit SHT4X not available!") enable_sht = False try: import AHT20 enable_aht = True except: print("Adafruit SHT4X not available!") enable_aht = False # TODO client: add a way to disable live plotting? server: stop # sending data periodically to everybody! from random import random from time import sleep # How many seconds to use WINDOW_SIZE_S = 5 class Sensor(): def __init__(self, autocorr=0.7): # FIXME: DEPENDS ON SAMPLING RATE self.history = deque([], int(WINDOW_SIZE_S * 2)) ## This is just for debugging self.prevval = random() * 100 self.auto = autocorr self.measure = None def read(self): sleep(0.5) return self.store( perf_counter(), int((random() * (1-self.auto) + self.prevval * self.auto) * 100) / 100) def smooth(self): # TODO: Implement *correctly* this (take into account the time) # m = mean([el[1] for el in self.history]) # Non mi piace proprio in realtà return (self.history[-1][0], m) def store(self, value, time): self.history.append((value, time)) # self.smooth() return (value, time) class GPIOState(Sensor): def __init__(self, pin, transform=lambda x: 1-x): super().__init__() self.measure = 'Switch' self.pin = pin self.transform = transform def read(self): try: self.value = GPIO.input(self.pin) except: print(f"Could not read pin {self.pin}") self.value = 0 self.time = perf_counter() # Even if we don't need to return the smoothed value, we still smooth it # to get the percentage of use over the period (if it will be needed) value = self.transform(self.value) self.store(self.time, value) return (self.time, value) class Temperature1W(Sensor): def __init__(self, address): super().__init__() self.measure = 'Temperature' self.address = address def path(self): return f'/sys/bus/w1/devices/{self.address}/w1_slave' def read(self): path = self.path() if not exists(path): # WIP return (perf_counter(), 0.0) with open(path, "r") as f: content = f.readlines() time = perf_counter() if len(content) < 2: return None if content[0].strip()[-3:] != "YES": print("INVALID CHECKSUM") return (time, None) return self.store( time, int(re.search("t=([0-9]+)", content[1]).group(1)) / 1000.0) if enable_sht: i2c = board.I2C() # uses board.SCL and board.SDA sht = adafruit_sht4x.SHT4x(i2c) print('Serial: ', hex(sht.serial_number)) SHT40_DEFAULT = adafruit_sht4x.Mode.NOHEAT_HIGHPRECISION sht.mode = SHT40_DEFAULT if enable_aht: aht = AHT20.AHT20() class SHT40(Sensor): def __init__(self, what, every=600): super().__init__() if what not in ("Temperature", "Humidity"): print("ERROR: invalid sensor value: ", what) return self.measure = what self.heat_every = every if self.is_humidity(): self.last_heat = perf_counter() if not enable_sht: return # alternative: HIGHHEAT_1S self.heatmode = adafruit_sht4x.Mode.LOWHEAT_100MS self.standardmode = SHT40_DEFAULT self.serial = hex(sht.serial_number) # mode: adafruit_sht4x.Mode.string[sht.mode] # Can also set the mode to enable heater # sht.mode = adafruit_sht4x.Mode.LOWHEAT_100MS def is_temperature(self): return self.measure == "Temperature" def is_humidity(self): return self.measure == "Humidity" def reset_mode(self): sht.mode = self.standardmode def read(self): time = perf_counter() if not enable_sht: return (time, None) reset = False if self.is_humidity(): timediff = time - self.last_heat if timediff > self.heat_every: reset = True sht.mode = self.heatmode temperature, relative_humidity = sht.measurements if reset: self.reset_mode() self.last_heat = perf_counter() return self.store( time, temperature if self.measure == 'Temperature' else relative_humidity) class AHT40(Sensor): def __init__(self, what): super().__init__() if what not in ("Temperature", "Humidity"): print("ERROR: invalid sensor value: ", what) return self.measure = what if not enable_aht: return def is_temperature(self): return self.measure == "Temperature" def is_humidity(self): return self.measure == "Humidity" def read(self): time = perf_counter() if not enable_aht: return (time, None) return self.store( time, aht.get_temperature_crc8() if ( self.measure == 'Temperature' ) else aht.get_humidity_crc8()) class Sensors(): def __init__(self, history=2621440): self.starttime = (datetime.now(), perf_counter()) self.scan() self.values = {} self.history = deque([], maxlen=history) def perf_datetime(self, offset): time = offset - self.starttime[1] return self.starttime[0] + timedelta(seconds=time) "Scan for new sensors" def scan(self): # Read something like this from a stored prefs json # sensors_names_map = {} # Then, scan, apply existing names and use placholders for new sensors # FIXME: should scan, apply stored data and return this self.available_sensors = { 'T_food': Temperature1W('28-06214252b671'), 'T_ext': SHT40('Temperature'), 'H_ext': SHT40('Humidity', every=60*60), 'T_ext2': AHT40('Temperature'), 'H_ext2': AHT40('Humidity'), 'heater': GPIOState(22), 'humidifier/heater2': GPIOState(17), 'freezer': GPIOState(14, transform=lambda x: x), 'fan': GPIOState(27), } def list(self): return { k: self.available_sensors[k].measure for k in self.available_sensors.keys() } def value_tuple(self): return tuple((k, *self.values[k]) for k in self.values.keys()) def read(self): for sensor in self.available_sensors.keys(): try: time, value = self.available_sensors[sensor].read() self.values[sensor] = (str(self.perf_datetime(time)), value) except Exception as e: print("Error reading sensors", sensor, ": ", e) self.history.append(self.value_tuple()) def get_sensor_value(self, sensor_name): return self.values.get(sensor_name, None) def get(self): return self.value_tuple() def get_history(self, n=None): l = len(self.history) return tuple(islice(self.history, max(0, l - (n or l)), l)) sensors = Sensors() def get_sensor_value(sensor_name): return sensors.get_sensor_value(sensor_name)[1]