"""Sensor readers (simulated, GPIO switch, 1-Wire temperature) and a registry
that polls them and keeps a bounded history of readings."""

from collections import deque
from datetime import datetime, timedelta
from os.path import exists
from random import random
import re
from time import perf_counter
from time import sleep

try:
    import RPi.GPIO as GPIO
except (ImportError, RuntimeError):
    # The library is either not installed (ImportError) or refused to load
    # (RuntimeError). Keep the module importable; GPIO sensors then read None.
    GPIO = None
    print("RPi.GPIO unavailable: GPIO sensors will read None")
else:
    GPIO.setmode(GPIO.BCM)

# TODO client: add a way to disable live plotting? server: stop
# sending data periodically to everybody!

# Temperature field of a w1_slave payload, in thousandths of a degree.
# The sign is optional so that sub-zero readings ("t=-1250") are accepted.
_W1_TEMP_RE = re.compile(r"t=(-?[0-9]+)")


class Sensor():
    """Simulated sensor producing pseudo-random readings.

    Also serves as the base class of the hardware-backed sensors, which
    override ``read`` and set ``measure`` to the name of what they measure.
    """

    def __init__(self, autocorr=0.7):
        """Pick a random base value in [0, 100) and store the mixing weight.

        autocorr: weight given to the base value in every reading; the
        remaining ``1 - autocorr`` weights a fresh random number.
        """
        self.prevval = random() * 100
        self.auto = autocorr
        self.measure = None

    def read(self):
        """Return ``(perf_counter timestamp, value)`` after a 0.5 s pause.

        The value is truncated to two decimals.
        NOTE(review): ``prevval`` is never updated here, so successive
        readings all mix against the initial base value -- confirm intended.
        """
        sleep(0.5)
        mixed = random() * (1 - self.auto) + self.prevval * self.auto
        return (perf_counter(), int(mixed * 100) / 100)


class GPIOState(Sensor):
    """State of a GPIO input pin, passed through a transform function."""

    def __init__(self, pin, transform=lambda x: 1-x):
        """
        pin: channel number handed to ``GPIO.input``.
        transform: function applied to the raw pin value; the default
        inverts a 0/1 level.
        """
        self.measure = 'Switch'
        self.pin = pin
        self.transform = transform

    def read(self):
        """Return ``(perf_counter timestamp, transformed value)``.

        The value is None when the pin could not be read (GPIO library
        unavailable, or ``GPIO.input`` raised); the transform is not applied
        in that case.
        """
        if GPIO is None:
            self.value = None
        else:
            try:
                self.value = GPIO.input(self.pin)
            except Exception:
                # Best effort: a failed read is reported as "no value"
                # instead of aborting the whole polling cycle.
                self.value = None
        self.time = perf_counter()
        if self.value is None:
            return (self.time, None)
        return (self.time, self.transform(self.value))


class Temperature1W(Sensor):
    """Temperature sensor read through the 1-Wire sysfs interface."""

    def __init__(self, address):
        """address: device directory name under /sys/bus/w1/devices."""
        self.measure = 'Temperature'
        self.address = address

    def path(self):
        """Return the path of the device's ``w1_slave`` file."""
        return f'/sys/bus/w1/devices/{self.address}/w1_slave'

    @staticmethod
    def parse(content):
        """Extract the temperature from the lines of a ``w1_slave`` file.

        content: list of lines as returned by ``readlines()``.
        Returns the ``t=`` value of the second line divided by 1000.0, or
        None when there are fewer than two lines, the first line does not
        end with "YES" (checksum failure, also reported on stdout), or the
        second line carries no ``t=`` field.
        """
        if len(content) < 2:
            return None
        if not content[0].strip().endswith("YES"):
            print("INVALID CHECKSUM")
            return None
        match = _W1_TEMP_RE.search(content[1])
        if match is None:
            return None
        return int(match.group(1)) / 1000.0

    def read(self):
        """Return ``(perf_counter timestamp, temperature or None)``.

        A missing device file yields ``0.0`` (work-in-progress placeholder);
        an unreadable payload yields None. The result is always a 2-tuple so
        callers can unpack it unconditionally.
        """
        path = self.path()
        if not exists(path):
            # WIP
            return (perf_counter(), 0.0)
        with open(path, "r") as f:
            content = f.readlines()
        timestamp = perf_counter()
        return (timestamp, self.parse(content))


class Sensors():
    """Registry of the available sensors, their latest values and history."""

    def __init__(self, history=2621440):
        """history: maximum number of snapshots kept; older ones are dropped."""
        # Wall-clock time paired with the perf_counter value taken at the
        # same moment; used to turn perf_counter timestamps into datetimes.
        self.starttime = (datetime.now(), perf_counter())
        self.scan()
        self.values = {}
        self.history = deque([], maxlen=history)

    def perf_datetime(self, offset):
        """Convert a ``perf_counter`` timestamp into a ``datetime``."""
        elapsed = offset - self.starttime[1]
        return self.starttime[0] + timedelta(seconds=elapsed)

    def scan(self):
        """Scan for new sensors"""
        # FIXME: should scan, apply stored data and return this
        self.available_sensors = {
            'T_ext': Temperature1W('28-06214252b671'),
            'T_food': Temperature1W('28-062142531e5a'),
            'heater': GPIOState(22),
        }

    def list(self):
        """Return a dict mapping each sensor name to its ``measure`` label."""
        return {
            name: sensor.measure
            for name, sensor in self.available_sensors.items()
        }

    def value_tuple(self):
        """Return the latest readings as ``(name, time string, value)`` tuples."""
        return tuple((name, *reading) for name, reading in self.values.items())

    def read(self):
        """Read every available sensor, then append one snapshot to history."""
        for name, sensor in self.available_sensors.items():
            timestamp, value = sensor.read()
            self.values[name] = (str(self.perf_datetime(timestamp)), value)
        self.history.append(self.value_tuple())

    def get_sensor_value(self, sensor_name):
        """Return ``(time string, value)`` for a sensor, or None if unread."""
        return self.values.get(sensor_name, None)

    def get(self):
        """Return the latest readings (see ``value_tuple``)."""
        return self.value_tuple()

    def get_history(self):
        """Return the stored snapshots, oldest first, as a tuple."""
        return tuple(self.history)


sensors = Sensors()


def get_sensor_value(sensor_name):
    """Return the latest value of a sensor from the module-level registry.

    Returns None when the sensor is unknown or has not been read yet.
    """
    reading = sensors.get_sensor_value(sensor_name)
    if reading is None:
        return None
    return reading[1]