"""Sensor access layer: dummy sensors and 1-Wire temperature probes."""

import re
from collections import deque
from datetime import datetime, timedelta
from os.path import exists
from random import random
from time import perf_counter
from time import sleep

# TODO client: add a way to disable live plotting? server: stop
# sending data periodically to everybody!

# Matches the raw reading in a w1_slave file; the sign is optional because
# readings below zero are written as e.g. "t=-1500".
_TEMPERATURE_RE = re.compile(r"t=(-?[0-9]+)")


class Sensor():
    """Dummy sensor that produces pseudo-random readings.

    Args:
        autocorr: weight given to the initial value when blending it with
            a fresh random number on each read.
    """

    def __init__(self, autocorr=0.7):
        self.prevval = random() * 100
        self.auto = autocorr
        self.measure = None

    def read(self):
        """Block for half a second, then return ``(perf_counter(), value)``.

        The value is truncated to two decimals.
        """
        # NOTE(review): prevval is never updated here, so every reading is
        # blended with the same initial value -- confirm whether that is
        # intended.
        sleep(0.5)
        return (perf_counter(),
                int((random() * (1-self.auto) + self.prevval * self.auto) * 100) / 100)


class Temperature1W(Sensor):
    """Temperature sensor read from a ``w1_slave`` file.

    Args:
        address: device directory name, e.g. ``'28-06214252b671'``.
        base_dir: directory containing one sub-directory per device.
            Defaults to the Linux 1-Wire sysfs location.
    """

    def __init__(self, address, base_dir='/sys/bus/w1/devices'):
        super().__init__()
        self.measure = 'Temperature'
        self.address = address
        self.base_dir = base_dir

    def path(self):
        """Return the path of the file holding this device's reading."""
        return f'{self.base_dir}/{self.address}/w1_slave'

    def read(self):
        """Read the sensor once.

        Returns:
            ``(perf_counter(), value)`` where value is the raw ``t=`` number
            divided by 1000 (presumably degrees Celsius -- confirm against
            the driver), or ``None`` when the file is too short, its
            checksum line does not end in ``YES``, or no ``t=`` field is
            found. If the device file does not exist, a placeholder
            ``(perf_counter(), 0.0)`` is returned.
        """
        path = self.path()
        if not exists(path):
            # WIP
            return (perf_counter(), 0.0)
        with open(path, "r") as f:
            content = f.readlines()
        timestamp = perf_counter()
        if len(content) < 2:
            return None
        if content[0].strip()[-3:] != "YES":
            print("INVALID CHECKSUM")
            return None
        match = _TEMPERATURE_RE.search(content[1])
        if match is None:
            # Checksum passed but the data line is not in the expected
            # format; report it as a failed reading instead of crashing.
            print("MISSING TEMPERATURE")
            return None
        return (timestamp, int(match.group(1)) / 1000.0)


class Sensors():
    """Registry of available sensors with latest values and history.

    Args:
        history: maximum number of snapshots kept by ``get_history``;
            older snapshots are discarded first.
    """

    def __init__(self, history=2621440):
        # Pair a wall-clock time with a perf_counter() reading so that later
        # perf_counter() values can be converted to datetimes.
        self.starttime = (datetime.now(), perf_counter())
        self.scan()
        self.values = {}
        self.history = deque([], maxlen=history)

    def perf_datetime(self, offset):
        """Convert a ``perf_counter()`` reading into a ``datetime``."""
        elapsed = offset - self.starttime[1]
        return self.starttime[0] + timedelta(seconds=elapsed)

    def scan(self):
        """Scan for new sensors."""
        # FIXME: should scan, apply stored data and return this
        self.available_sensors = {
            'T_ext': Temperature1W('28-06214252b671'),
            'T_food': Temperature1W('28-062142531e5a'),
        }

    def list(self):
        """Return a dict mapping each sensor name to what it measures."""
        return {
            name: sensor.measure
            for name, sensor in self.available_sensors.items()
        }

    def value_tuple(self):
        """Return the latest values as ``(name, timestamp, value)`` tuples."""
        return tuple((name, *reading) for name, reading in self.values.items())

    def read(self):
        """Poll every sensor once and append a snapshot to the history.

        A sensor whose ``read()`` returns ``None`` (failed reading) is
        skipped, so its previous value, if any, is kept and one bad sensor
        does not abort the poll.
        """
        for name, sensor in self.available_sensors.items():
            reading = sensor.read()
            if reading is None:
                continue
            timestamp, value = reading
            self.values[name] = (str(self.perf_datetime(timestamp)), value)
        self.history.append(self.value_tuple())

    def get_sensor_value(self, sensor_name):
        """Return ``(timestamp, value)`` for a sensor, or ``None`` if it has
        no stored reading."""
        return self.values.get(sensor_name, None)

    def get(self):
        """Return the latest values; same as ``value_tuple()``."""
        return self.value_tuple()

    def get_history(self):
        """Return all stored snapshots, oldest first, as a tuple."""
        return tuple(self.history)


sensors = Sensors()


def get_sensor_value(sensor_name):
    """Return the latest value of a sensor from the module-level registry.

    Returns ``None`` when the sensor is unknown or has not produced a
    reading yet.
    """
    reading = sensors.get_sensor_value(sensor_name)
    if reading is None:
        return None
    return reading[1]