259 lines
7.5 KiB
Python
259 lines
7.5 KiB
Python
from time import perf_counter
|
|
from datetime import datetime, timedelta
|
|
from collections import deque
|
|
from statistics import mean
|
|
from itertools import islice
|
|
import re
|
|
from os.path import exists
|
|
|
|
# Optional Raspberry Pi GPIO support. `rpi` records whether RPi.GPIO could be
# imported and switched to BCM pin numbering.
try:
    import RPi.GPIO as GPIO
    GPIO.setmode(GPIO.BCM)
    rpi = True
except Exception:
    # `Exception` (not a bare `except:`) so that KeyboardInterrupt and
    # SystemExit still propagate while any import/setup failure is tolerated.
    rpi = False
|
|
|
|
# Optional SHT40 support. `enable_sht` records whether the driver modules
# could be imported.
try:
    # SHT40
    import board
    import adafruit_sht4x
    enable_sht = True
except Exception:
    # `Exception` (not a bare `except:`) so that KeyboardInterrupt and
    # SystemExit still propagate while any import failure is tolerated.
    print("Adafruit SHT4X not available!")
    enable_sht = False
|
|
|
|
# Optional AHT20 support. `enable_aht` records whether the driver module
# could be imported.
try:
    import AHT20
    enable_aht = True
except Exception:
    # `Exception` (not a bare `except:`) so that KeyboardInterrupt and
    # SystemExit still propagate while any import failure is tolerated.
    # The message names the module that actually failed to load (it used to
    # repeat the SHT4X message).
    print("AHT20 not available!")
    enable_aht = False
|
|
|
|
# TODO client: add a way to disable live plotting? server: stop
|
|
# sending data periodically to everybody!
|
|
|
|
from random import random
|
|
from time import sleep
|
|
|
|
# Length, in seconds, of the per-sensor history window. Sensor.__init__ sizes
# its history deque to int(WINDOW_SIZE_S * 2) entries.
WINDOW_SIZE_S = 5
|
|
|
|
class Sensor():
    """Base class for sensors; on its own it acts as a random debug sensor.

    Every reading is kept in ``self.history``, a bounded deque of 2-tuples.
    Subclasses set ``self.measure`` to a label for the measured quantity and
    override ``read``.
    """

    def __init__(self, autocorr=0.7):
        """Create an empty history and the debug-value state.

        autocorr: weight given to ``self.prevval`` when ``read`` generates a
            debug value; the random part is weighted ``1 - autocorr``.
        """
        # FIXME: DEPENDS ON SAMPLING RATE
        self.history = deque([], int(WINDOW_SIZE_S * 2))

        # This is just for debugging. Note that nothing in this class updates
        # prevval after construction.
        self.prevval = random() * 100
        self.auto = autocorr
        self.measure = None

    def read(self):
        """Sleep 0.5 s, then store and return a ``(timestamp, value)`` pair.

        The timestamp is ``perf_counter()``; the value is a random debug
        number truncated to two decimals.
        """
        sleep(0.5)
        raw = random() * (1 - self.auto) + self.prevval * self.auto
        return self.store(perf_counter(), int(raw * 100) / 100)

    def smooth(self):
        """Return ``(first field of newest entry, mean of all second fields)``.

        Returns None when the history is empty. With the ordering the callers
        in this file use, that is (latest timestamp, mean value).
        """
        # TODO: Implement *correctly* this (take into account the time)
        # (original author's note: "I really don't like this, actually")
        if not self.history:
            return None
        # The assignment of `m` used to be commented out, which made this
        # method raise NameError on every call.
        m = mean(el[1] for el in self.history)
        return (self.history[-1][0], m)

    def store(self, value, time):
        """Append ``(value, time)`` to the history and return the same tuple.

        NOTE: the parameter names are historical. Every caller in this file
        passes the timestamp first and the reading second, so entries are
        effectively ``(timestamp, reading)``. The names are kept so keyword
        callers keep working.
        """
        entry = (value, time)
        self.history.append(entry)
        # self.smooth()
        return entry
|
|
|
|
|
|
class GPIOState(Sensor):
    """Sensor that reports the (transformed) level of one GPIO input pin."""

    def __init__(self, pin, transform=lambda x: 1-x):
        """Remember the pin and the transform applied to each raw reading.

        pin: pin number passed to ``GPIO.input``.
        transform: callable applied to the raw level before it is stored and
            returned; the default maps 0 -> 1 and 1 -> 0.
        """
        super().__init__()
        self.measure = 'Switch'

        self.pin = pin
        self.transform = transform

    def read(self):
        """Read the pin and return ``(timestamp, transformed_value)``.

        If the pin cannot be read, a message is printed and the raw value
        falls back to 0 (so the default transform yields 1).
        """
        try:
            self.value = GPIO.input(self.pin)
        except Exception:
            # `Exception` instead of a bare `except:` so Ctrl-C / SystemExit
            # are not swallowed. This still covers the NameError raised when
            # the GPIO module was never imported.
            print(f"Could not read pin {self.pin}")
            self.value = 0
        self.time = perf_counter()

        # Even if we don't need to return the smoothed value, we still store
        # it, to get the percentage of use over the period (if it is needed).
        value = self.transform(self.value)
        self.store(self.time, value)

        return (self.time, value)
|
|
|
|
class Temperature1W(Sensor):
    """Temperature sensor read from a 1-Wire ``w1_slave`` sysfs file."""

    def __init__(self, address):
        """address: device directory name under ``/sys/bus/w1/devices``."""
        super().__init__()
        self.measure = 'Temperature'
        self.address = address

    def path(self):
        """Return the path of this device's ``w1_slave`` file."""
        return f'/sys/bus/w1/devices/{self.address}/w1_slave'

    def read(self):
        """Return ``(timestamp, temperature)``.

        The value is the ``t=`` field of the second line divided by 1000.
        It is None when the file is too short, the first line does not end
        in "YES", or no ``t=`` field is found. Only valid readings are added
        to the history. A missing device file currently yields 0.0 (WIP).
        """
        path = self.path()
        if not exists(path):
            # WIP
            return (perf_counter(), 0.0)
        with open(path, "r") as f:
            content = f.readlines()
        time = perf_counter()
        if len(content) < 2:
            # Always return a 2-tuple: callers unpack `time, value`.
            return (time, None)
        if content[0].strip()[-3:] != "YES":
            print("INVALID CHECKSUM")
            return (time, None)
        # Allow a leading minus sign so sub-zero readings parse.
        match = re.search(r"t=(-?[0-9]+)", content[1])
        if match is None:
            print(f"Could not parse temperature for {self.address}")
            return (time, None)
        return self.store(time, int(match.group(1)) / 1000.0)
|
|
|
|
# Shared SHT4x driver, created only when the SHT4x imports succeeded.
# Its serial number is printed and its mode is set to SHT40_DEFAULT.
if enable_sht:
    i2c = board.I2C()  # uses board.SCL and board.SDA
    sht = adafruit_sht4x.SHT4x(i2c)
    print('Serial: ', hex(sht.serial_number))
    # Mode restored by SHT40.reset_mode (via each instance's standardmode).
    SHT40_DEFAULT = adafruit_sht4x.Mode.NOHEAT_HIGHPRECISION
    sht.mode = SHT40_DEFAULT

# Shared AHT20 driver, created only when the AHT20 import succeeded.
if enable_aht:
    aht = AHT20.AHT20()
|
|
|
|
class SHT40(Sensor):
    """Temperature or humidity channel of the shared module-level ``sht``.

    A humidity instance switches the driver to a heater mode for one
    measurement whenever more than ``heat_every`` seconds have passed since
    the last heated measurement.
    """

    def __init__(self, what, every=600):
        """Configure the channel.

        what: "Temperature" or "Humidity". Any other value prints an error
            and leaves ``measure`` as None.
        every: seconds between heated measurements (humidity only).
        """
        super().__init__()
        if what not in ("Temperature", "Humidity"):
            print("ERROR: invalid sensor value: ", what)
            return
        self.measure = what
        self.heat_every = every
        if self.is_humidity():
            self.last_heat = perf_counter()
        if not enable_sht:
            return
        # alternative: HIGHHEAT_1S
        self.heatmode = adafruit_sht4x.Mode.LOWHEAT_100MS
        self.standardmode = SHT40_DEFAULT
        self.serial = hex(sht.serial_number)
        # mode: adafruit_sht4x.Mode.string[sht.mode]
        # Can also set the mode to enable heater
        # sht.mode = adafruit_sht4x.Mode.LOWHEAT_100MS

    def is_temperature(self):
        """Return True when this instance reports temperature."""
        return self.measure == "Temperature"

    def is_humidity(self):
        """Return True when this instance reports humidity."""
        return self.measure == "Humidity"

    def reset_mode(self):
        """Put the shared driver back into this instance's standard mode."""
        sht.mode = self.standardmode

    def read(self):
        """Return ``(timestamp, value)``; value is None without SHT support.

        The timestamp is taken before the measurement. Readings are added to
        the history.
        """
        time = perf_counter()
        if not enable_sht:
            return (time, None)

        heat = (self.is_humidity()
                and time - self.last_heat > self.heat_every)
        if heat:
            sht.mode = self.heatmode
        try:
            temperature, relative_humidity = sht.measurements
        finally:
            # Always leave heater mode, even if the measurement raised.
            # The driver is shared with the other SHT40 instances.
            if heat:
                self.reset_mode()
        if heat:
            # Only a successful heated measurement restarts the interval.
            self.last_heat = perf_counter()

        return self.store(
            time, temperature if self.is_temperature() else relative_humidity)
|
|
|
|
class AHT40(Sensor):
    """Temperature or humidity channel of the shared module-level ``aht``."""

    def __init__(self, what):
        """Configure the channel.

        what: "Temperature" or "Humidity". Any other value prints an error
            and leaves ``measure`` as None.
        """
        super().__init__()
        if what not in ("Temperature", "Humidity"):
            print("ERROR: invalid sensor value: ", what)
            return
        self.measure = what

    def is_temperature(self):
        """Return True when this instance reports temperature."""
        return self.measure == "Temperature"

    def is_humidity(self):
        """Return True when this instance reports humidity."""
        return self.measure == "Humidity"

    def read(self):
        """Return ``(timestamp, value)``; value is None without AHT support.

        Readings are added to the history.
        """
        time = perf_counter()
        if not enable_aht:
            return (time, None)
        # The module-level driver instance is named `aht`. The previous code
        # referenced an undefined `aht20`, which raised NameError.
        if self.is_temperature():
            value = aht.get_temperature_crc8()
        else:
            value = aht.get_humidity_crc8()
        return self.store(time, value)
|
|
|
|
class Sensors():
    """Registry of named sensors, their latest values and a bounded history."""

    def __init__(self, history=2621440):
        """Record the start time, build the sensor table, create empty state.

        history: maximum number of snapshots kept in ``self.history``.
        """
        # Wall-clock time paired with the perf_counter() reading taken at the
        # same moment; perf_datetime() uses the pair to convert timestamps.
        self.starttime = (datetime.now(), perf_counter())
        self.scan()
        self.values = {}
        self.history = deque([], maxlen=history)

    def perf_datetime(self, offset):
        """Convert a ``perf_counter()`` reading into a ``datetime``."""
        elapsed = offset - self.starttime[1]
        return self.starttime[0] + timedelta(seconds=elapsed)

    def scan(self):
        """Scan for new sensors (currently a hard-coded table)."""
        # Read something like this from a stored prefs json
        # sensors_names_map = {}
        # Then, scan, apply existing names and use placeholders for new sensors
        # FIXME: should scan, apply stored data and return this
        self.available_sensors = {
            'T_food': Temperature1W('28-06214252b671'),
            'T_ext': SHT40('Temperature'),
            'H_ext': SHT40('Humidity'),
            'T_ext2': AHT40('Temperature'),
            'H_ext2': AHT40('Humidity'),
            'heater': GPIOState(22),
            'humidifier/heater2': GPIOState(17),
            'freezer': GPIOState(14),
            'fan': GPIOState(27),
        }

    def list(self):
        """Return a dict mapping each sensor name to its ``measure`` label."""
        return {
            name: sensor.measure
            for name, sensor in self.available_sensors.items()
        }

    def value_tuple(self):
        """Return the latest values as ``(name, time_string, value)`` tuples."""
        return tuple((name, *entry) for name, entry in self.values.items())

    def read(self):
        """Read every sensor, update ``values`` and append a history snapshot.

        A sensor whose read fails is reported on stdout and keeps its
        previous entry, if any.
        """
        for name, sensor in self.available_sensors.items():
            try:
                time, value = sensor.read()
                self.values[name] = (str(self.perf_datetime(time)), value)
            except Exception as e:
                print("Error reading sensors", name, ": ", e)
        self.history.append(self.value_tuple())

    def get_sensor_value(self, sensor_name):
        """Return ``(time_string, value)`` for a sensor, or None if unknown."""
        return self.values.get(sensor_name, None)

    def get(self):
        """Return the latest values; same as ``value_tuple()``."""
        return self.value_tuple()

    def get_history(self, n=None):
        """Return the last ``n`` snapshots as a tuple.

        A falsy ``n`` (None or 0) returns the whole history.
        """
        l = len(self.history)
        return tuple(islice(self.history, max(0, l - (n or l)), l))
|
|
|
|
# Module-level Sensors instance, created at import time; the module-level
# get_sensor_value() helper reads from it.
sensors = Sensors()
|
|
|
|
def get_sensor_value(sensor_name):
    """Return the latest value of ``sensor_name`` from the shared ``sensors``.

    Returns None when the sensor is unknown or has not been read yet; this
    used to raise TypeError by indexing None.
    """
    entry = sensors.get_sensor_value(sensor_name)
    if entry is None:
        return None
    # entry is (time_string, value); callers of this helper want the value.
    return entry[1]
|