hakkoso/sensors.py

90 lines
2.6 KiB
Python

from time import perf_counter
from datetime import datetime, timedelta
from collections import deque
import re
# TODO client: add a way to disable live plotting? server: stop
# sending data periodically to everybody!
from random import random
from time import sleep
class Sensor():
def __init__(self, autocorr=0.7):
self.prevval = random() * 100
self.auto = autocorr
self.measure = None
def read(self):
sleep(0.5)
return (perf_counter(),
int((random() * (1-self.auto) + self.prevval * self.auto) * 100) / 100)
class Temperature1W(Sensor):
def __init__(self, address):
super().__init__()
self.measure = 'Temperature'
self.address = address
def path(self):
return f'/sys/bus/w1/devices/{self.address}/w1_slave'
def read(self):
with open(self.path(), "r") as f:
content = f.readlines()
time = perf_counter()
if len(content) < 2:
return None
if content[0].strip()[-3:] != "YES":
print("INVALID CHECKSUM")
return None
return (time, int(re.search("t=([0-9]+)", content[1]).group(1)) / 1000.0)
class Sensors():
def __init__(self, history=2621440):
self.starttime = (datetime.now(), perf_counter())
self.scan()
self.values = {}
self.history = deque([], maxlen=history)
def perf_datetime(self, offset):
time = offset - self.starttime[1]
return self.starttime[0] + timedelta(seconds=time)
"Scan for new sensors"
def scan(self):
# FIXME: should scan, apply stored data and return this
self.available_sensors = {
'T_ext': Temperature1W('28-06214252b671'),
'T_food': Temperature1W('28-062142531e5a'),
}
def list(self):
return {
k: self.available_sensors[k].measure
for k in self.available_sensors.keys()
}
def value_tuple(self):
return tuple((k, *self.values[k]) for k in self.values.keys())
def read(self):
for sensor in self.available_sensors.keys():
time, value = self.available_sensors[sensor].read()
self.values[sensor] = (str(self.perf_datetime(time)), value)
self.history.append(self.value_tuple())
def get_sensor_value(self, sensor_name):
return self.values.get(sensor_name, None)
def get(self):
return self.value_tuple()
def get_history(self):
return tuple(self.history)
sensors = Sensors()
def get_sensor_value(sensor_name):
return sensors.get_sensor_value(sensor_name)[1]