hakkoso/phasectrl.py

313 lines
11 KiB
Python

# Simple Finite State Machine used to control states
import json
from scm import eval, _, GLOBAL_ENV, stringify, load
from time import perf_counter
from chat import chat
from persona import ManualIntervention
from sensors import get_sensor_value
from control import fixed_duty_wrapper
from actuators import actuators
from utils import consumption_diff, consumption_to_string
# Module-wide environment: GLOBAL_ENV extended with 'make-duty-controller'.
# NOTE(review): `_` presumably binds (name, arity, callable) on top of the
# given parent environment -- confirm against scm.
MY_GLOBAL_ENV = _('make-duty-controller', 3, fixed_duty_wrapper, GLOBAL_ENV)
def safe_eval(data, env):
    """Evaluate ``data`` in ``env``; report failures to chat instead of raising.

    Args:
        data: Expression handed to ``eval`` (the one imported from ``scm``).
        env: Environment passed through to ``eval`` unchanged.

    Returns:
        Whatever ``eval`` returns, or ``True`` when evaluation raised.
        Callers such as ``Phase.satisfied_p`` therefore see a failing
        expression as a truthy result.
    """
    try:
        return eval(data, env)
    except Exception as e:
        # An f-string formats any ``data``; the previous str.join raised a
        # second TypeError inside this handler when ``data`` was not a str.
        chat.send_sync(f'evaluating: {data} : {e}')
        return True
def load_phase(json):
    """Build a Phase from one decoded phase entry.

    ``name`` and ``description`` are required keys; ``on_exit``, ``on_load``
    and ``exit_condition`` are optional and default to the string '#t'.
    """
    name, description = json['name'], json['description']
    # Optional hooks, keyed by the Phase keyword argument they feed.
    hooks = {
        'onexit': json.get('on_exit', '#t'),
        'onload': json.get('on_load', '#t'),
        'nextcond': json.get('exit_condition', '#t'),
    }
    return Phase(name, description, **hooks)
def load_recipe(json):
    """Build a Recipe (with its phases) from one decoded recipe entry.

    ``name``, ``description`` and ``phases`` are required keys;
    ``controllers`` is optional and defaults to an empty tuple.
    """
    name = json['name']
    description = json['description']
    controllers = json.get('controllers', ())
    phases = list(map(load_phase, json['phases']))
    return Recipe(name, description, controllers, phases)
def load_recipes(file):
with open(file, "r") as f:
recipes = json.loads(f.read())
return [load_recipe(recipe) for recipe in recipes]
def store_recipes(file, recipes):
    """Write ``recipes`` to ``file`` as JSON indented by two spaces.

    The path and the data are also echoed to stdout.
    """
    print(file, recipes)
    with open(file, "w") as handle:
        payload = json.dumps(recipes, indent=2)
        handle.write(payload)
def manual_intervention_wrapper(state):
    """Return the callable bound to 'manual-intervention' for ``state``.

    The returned function takes the scm argument list: an optional label
    followed by an optional list of (text, value) option pairs. It forwards
    both to ``state.manual.get`` and returns the second element of the
    response; when the first element is truthy it also triggers
    ``state.onupdate()``.
    """
    def wrapper(args):
        argc = len(args)
        label = stringify(args.car, quote=False) if argc > 0 else None
        options = None
        if argc > 1:
            # Second argument: list of pairs -> ((text, value), ...)
            options = tuple(
                (stringify(entry.car, quote=False), entry.cdr.car)
                for entry in args.cdr.car
            )
        resp = state.manual.get(label, options)
        needs_refresh = resp[0]
        if needs_refresh:
            print('REQUESTING CLIENT UPDATE')
            state.onupdate()
        return resp[1]

    return wrapper
class State():
    """Finite state machine that runs a recipe phase by phase.

    Instance fields:
      recipes, recipe_file -- recipes loaded from ``recipe_file``.
      recipe, phase -- the active recipe (or None) and the index of its
          current phase.
      manual -- ManualIntervention helper used by 'manual-intervention'.
      baseenv, env -- scm environments; ``env`` is ``baseenv`` plus the
          recipe-specific bindings while a recipe is loaded.
      envdata -- per-recipe scratch dict ('controllers', 'sensors',
          phase load time and consumption snapshots).
      onupdate -- zero-argument callback invoked when clients should refresh.
    """

    def __init__(self, recipe_file, onupdate=lambda: None):
        """Load the recipes in ``recipe_file`` and build the base environment."""
        self.recipes = load_recipes(recipe_file)
        self.recipe_file = recipe_file
        self.recipe = None
        self.phase = None
        self.manual = ManualIntervention()
        # NOTE(review): each `_` call presumably binds (name, arity, callable)
        # on top of the environment given as last argument -- confirm in scm.
        self.baseenv = (
            _('get-sensor', 1, lambda x: get_sensor_value(stringify(x.car, quote=False)),
            _('set-actuator', 2, self.set_actuator,
            _('notify', 1, lambda x: chat.send_sync(stringify(x.car, quote=False)),
            _('hours', 1, lambda x: x.car * 60 * 60,
            _('minutes', 1, lambda x: x.car * 60,
            _('seconds', 1, lambda x: x.car,
            _('manual-intervention', -1, manual_intervention_wrapper(self),
            _('set-target', 2, self.set_target,
            _('set-controller', 2, self.set_controller,
            _('reload-recipes', 0, lambda x: self.reloadRecipes(),
              MY_GLOBAL_ENV)))))))))))
        self.env = self.baseenv
        self.envdata = {}
        self.onupdate = onupdate

    def reloadRecipes(self):
        """Re-read the recipe file, replacing ``self.recipes``."""
        # chat.send_sync("Reloading recipes!")
        print("Reloading recipes!")
        self.recipes = load_recipes(self.recipe_file)

    def getState(self):
        """Return the current phase index, time in phase and phase consumption."""
        return {
            'phase': self.phase,
            'phase-time': self.phase_time(),
            'phase-consumption': self.consumption()
        }

    def now(self):
        """Return the clock value used for phase timing (``perf_counter``)."""
        return perf_counter()

    def postload(self):
        """Initialise env/envdata for ``self.recipe`` and load its first phase.

        Does nothing when no recipe is selected.
        """
        if self.recipe is None:
            return
        self.phase = 0
        self.env = (
            _('recipe-name', 0, lambda x: self.recipe.name,
            _('recipe-description', 0, lambda x: self.recipe.description,
            # Computed from this object's envdata: Recipe.consumption() reads
            # an ``envdata`` attribute that Recipe never defines.
            _('recipe-consumption', 0, lambda x: self._recipe_consumption(),
            _('time-in-this-phase', 0, lambda x: self.phase_time(),
            _('phase-consumption', 0, lambda x: self.consumption(),
              self.baseenv))))))
        self.envdata['controllers'] = {}
        for controller in self.recipe.controllers:
            # NOTE: safe_eval returns True when evaluation fails, in which
            # case this unpacking raises TypeError.
            name, ctrl = safe_eval(controller, self.env)
            self.envdata['controllers'][name] = ctrl
        self.envdata['sensors'] = {}
        # Absolute snapshot, mirroring 'recipe-done-consumption' in done().
        # The previous self.consumption() call needed the phase snapshot that
        # loadphase() only stores afterwards, so a fresh load raised KeyError.
        self.envdata['recipe-start-consumption'] = self.get_total_consumption()
        self.loadphase(self.phase)

    def recipeById(self, id):
        """Return the recipe at index ``id``, or None if it is not a valid index.

        ``id`` may be anything ``int()`` accepts; unconvertible values and
        out-of-range indices yield None.
        """
        try:
            id = int(id)
        except (TypeError, ValueError, OverflowError):
            # Only conversion failures; a bare except also swallowed
            # KeyboardInterrupt/SystemExit.
            id = -1
        print(self.recipes)
        if 0 <= id < len(self.recipes):
            return self.recipes[id]
        return None

    def loadByIdx(self, recipe):
        """Select the recipe at index ``recipe`` (None if invalid) and start it."""
        self.recipe = self.recipeById(recipe)
        self.postload()

    def stop(self):
        """Run the current phase's exit hook and finish the recipe.

        A no-op when no recipe is loaded.
        """
        if self.recipe is None:
            return
        phase = self.current_phase()
        if phase is not None:
            phase.exit(self.env)
        self.done(message='(concat "Recipe " (recipe-name) " stopped manually")')

    def loadByName(self, recipe):
        """Select the first recipe whose name equals ``recipe`` and start it.

        When no recipe matches, nothing is changed and nothing is restarted.
        """
        matches = [candidate for candidate in self.recipes
                   if candidate.name == recipe]
        if matches:
            self.recipe = matches[0]
            self.postload()

    def current_phase(self):
        """Return the active Phase, or None without a recipe or valid index."""
        if self.recipe is None:
            return None
        if self.phase >= len(self.recipe.phases) or self.phase < 0:
            return None
        return self.recipe.phases[self.phase]

    def check(self):
        """Evaluate the current phase's exit condition in ``self.env``."""
        return self.current_phase().satisfied_p(self.env)

    def run_step(self):
        """Run one control iteration.

        Refreshes linked sensor values, applies each controller to its
        actuator, then advances to the next phase when the exit condition
        holds.

        Returns:
            True when the last phase has just been exited, False otherwise.
        """
        # Update sensor values
        sensors = self.envdata.get('sensors', ())
        for sensor in sensors:
            _stale, ctrl, target = sensors[sensor]
            sensors[sensor] = (get_sensor_value(sensor), ctrl, target)
        # Apply actuators
        for controller, ctrl in self.envdata.get('controllers', {}).items():
            try:
                if ctrl.input_label not in self.envdata['sensors']:
                    print(f'Missing sensor {ctrl.input_label}')
                    continue
                response = ctrl.apply(self.envdata['sensors'][ctrl.input_label][0])
                actuators[controller].enable(response)
            except Exception as e:
                print(f"UNKNOWN BUG HERE in run_step {e}")
        if self.check():
            if self.next() is None:
                return True
        print(f'Waiting for {self.current_phase().nextcond}')
        return False

    def next(self):
        """Exit the current phase and load the following one.

        Returns:
            The newly loaded Phase, or None when the current phase was the
            last one (``onupdate`` is still invoked in that case).
        """
        self.current_phase().exit(self.env)
        if self.phase < len(self.recipe.phases) - 1:
            self.phase += 1
            phase = self.recipe.phases[self.phase]
            self.loadphase(self.phase)
            self.manual.clear()
            return phase
        self.onupdate()
        return None

    def phase_time(self, arg=None):
        """Return the time elapsed since the current phase was loaded.

        ``arg`` is accepted and ignored. Without a recorded load time the
        raw clock value is returned (load time defaults to 0).
        """
        return (self.now() - self.envdata.get('current-phase-loadtime', 0))

    def loadphase(self, phaseidx):
        """Record start time/consumption for phase ``phaseidx`` and run its onload.

        Returns:
            The result of evaluating the phase's ``onload`` expression.
        """
        phase = self.recipe.phases[phaseidx]
        self.envdata['current-phase-loadtime'] = self.now()
        self.envdata['current-phase-start-consumption'] = self.get_total_consumption()
        resp = safe_eval(phase.onload, self.env)
        self.onupdate()
        return resp

    def done(self, message=None):
        """Finish the recipe: notify, disable controlled actuators, reset state.

        Args:
            message: Optional scm expression for the notification; passed
                through to ``Recipe.done``.
        """
        self.envdata['recipe-done-consumption'] = self.get_total_consumption()
        if self.recipe is not None:
            self.recipe.done(self.env, message)
        # Stop all actuators
        for controller in self.envdata.get('controllers', {}):
            actuators[controller].disable()
        self.recipe = None
        self.env = self.baseenv
        self.envdata = {}
        self.onupdate()

    def set_target(self, args):
        """scm primitive: (set-target sensor target).

        Stores the target next to the sensor entry and forwards it to the
        controller linked to that sensor.
        """
        sensor, target = (stringify(args.car, quote=False), args.cdr.car)
        value, controller, _old_target = self.envdata['sensors'][sensor]
        self.envdata['sensors'][sensor] = (value, controller, target)
        self.envdata['controllers'][controller].set_target(target)
        print('setting sensor', sensor, controller, 'to target', target)

    def set_controller(self, args):
        """scm primitive: (set-controller controller sensor).

        Links ``sensor`` as the controller's input and registers the sensor
        with its current reading and no target.
        """
        controller, sensor = (stringify(args.car, quote=False),
                              stringify(args.cdr.car, quote=False))
        self.envdata['controllers'][controller].set_input_label(sensor)
        # Value/Controller/Target
        self.envdata['sensors'][sensor] = (get_sensor_value(sensor), controller, None)
        print('linking sensor', sensor, 'to controller', controller,
              '(', self.envdata['controllers'][controller], ')')

    def set_actuator(self, args):
        """scm primitive: (set-actuator actuator value).

        Enables the actuator when ``value`` is truthy, disables it otherwise.
        """
        actuator, value = (stringify(args.car, quote=False), args.cdr.car)
        if value:
            actuators[actuator].enable()
        else:
            actuators[actuator].disable()
        print('setting actuator ', actuator, 'to value', value,
              ', remember to change it manually if stopping?')

    def get_total_consumption(self):
        """Return ``{actuator name: actuator.consumption()}`` for every actuator."""
        return {actuator: actuators[actuator].consumption() for
                actuator in actuators.keys()}

    def consumption(self):
        """Return the consumption diff since the current phase was loaded.

        When no phase snapshot exists (no recipe loaded) the current totals
        are used as baseline instead of raising KeyError, so ``getState``
        also works while idle.
        """
        after = self.get_total_consumption()
        before = self.envdata.get('current-phase-start-consumption', after)
        return consumption_diff(before, after)

    def _recipe_consumption(self):
        """Return the recipe-wide consumption as a string.

        Diffs 'recipe-start-consumption' against 'recipe-done-consumption';
        a missing end snapshot falls back to the current totals and a
        missing start snapshot to the end value.
        """
        end = self.envdata.get('recipe-done-consumption')
        if end is None:
            end = self.get_total_consumption()
        start = self.envdata.get('recipe-start-consumption')
        if start is None:
            start = end
        return consumption_to_string(consumption_diff(start, end))
class Recipe():
    """A named sequence of phases plus the controller definitions it needs.

    Instance fields:
      name, description -- identification shown to clients.
      controllers -- controller expressions (evaluated by State.postload).
      phases -- list of phase objects, in execution order.
    """

    def __init__(self, name, description='',
                 controllers=(),
                 phases=None):
        """Store the recipe data.

        ``phases`` defaults to a fresh empty list per instance; a literal
        ``[]`` default would be shared by every Recipe created without it.
        """
        self.name = name
        self.controllers = controllers
        self.description = description
        self.phases = [] if phases is None else phases

    def getState(self, currentphase=None):
        """Return a plain-dict description of the recipe.

        Args:
            currentphase: Index of the active phase; that phase's entry
                gets ``'current': True``.
        """
        def phasetodict(i, p):
            return {
                'name': p.name,
                'text': p.text,
                'nextcond': p.nextcond,
                'current': currentphase == i,
                # 'loaded_time': p.loaded_time,
                'onload': p.onload,
                'onexit': p.onexit,
                # 'envdata': p.envdata
            }
        return {
            'name': self.name,
            'description': self.description,
            'controllers': self.controllers,
            'phases': [
                phasetodict(i, p)
                for (i, p) in enumerate(self.phases)
            ],
            # 'phase': self.phase
        }

    def consumption(self, envdata=None):
        """Return the recipe's total consumption as a string.

        Args:
            envdata: Mapping holding 'recipe-start-consumption' and the end
                snapshot. When omitted, ``self.envdata`` is used; Recipe
                itself never sets that attribute, so the call then raises
                AttributeError unless a caller attached one.

        Raises:
            AttributeError: no ``envdata`` given and none on the instance.
            KeyError: a required snapshot is missing.
        """
        data = self.envdata if envdata is None else envdata
        # State.done() stores the final snapshot as 'recipe-done-consumption';
        # the key originally read here is still honoured when present.
        if 'recipe-end-consumption' in data:
            end = data['recipe-end-consumption']
        else:
            end = data['recipe-done-consumption']
        return consumption_to_string(consumption_diff(
            data['recipe-start-consumption'], end))

    def done(self, env, message=None):
        """Send the completion notification by evaluating ``(notify <message>)``.

        Args:
            env: Environment the expression is evaluated in.
            message: scm expression producing the text; a default
                "completed" message is used when falsy.
        """
        message = message or '''
(concat "Recipe " (recipe-name) " completed at " (now) "\n"
"Consumption: " (recipe-consumption))
'''
        safe_eval(f'(notify {message})', env)
class Phase:
    """One step of a recipe, described by scm expressions.

    Instance fields:
      name, text -- identification and description of the phase.
      nextcond -- expression deciding when the phase is over.
      onload, onexit -- expressions run when entering / leaving the phase.
    """

    def __init__(self, name, text='', nextcond='#t', onload='', onexit=''):
        """Store the phase description and its three expressions."""
        self.name, self.text = name, text
        self.nextcond, self.onload, self.onexit = nextcond, onload, onexit

    def satisfied_p(self, env):
        """Evaluate the exit condition in ``env`` via safe_eval."""
        outcome = safe_eval(self.nextcond, env)
        return outcome

    def exit(self, env):
        """Evaluate the exit hook in ``env`` via safe_eval."""
        outcome = safe_eval(self.onexit, env)
        return outcome

    def revert(self, env):
        """Placeholder: undoing state changes (actuator values) is not implemented."""
        return None