hakkoso/phasectrl.py

251 lines
8.6 KiB
Python

# Simple Finite State Machine used to control states
import json
from scm import eval, _, GLOBAL_ENV, stringify, load
from time import perf_counter
from chat import chat
from persona import ManualIntervention
from sensors import get_sensor_value
from control import fixed_duty_wrapper
from actuators import actuators
# Interpreter environment used as the root for recipe expressions: GLOBAL_ENV
# extended with the `make-duty-controller` builtin backed by
# fixed_duty_wrapper.
# NOTE(review): the literal 3 is presumably the builtin's argument count;
# confirm against scm._.
MY_GLOBAL_ENV = _('make-duty-controller', 3, fixed_duty_wrapper, GLOBAL_ENV)
def safe_eval(data, env):
    """Evaluate ``data`` in ``env`` without letting evaluation errors escape.

    Args:
        data: Expression handed unchanged to ``eval`` (the interpreter
            imported from ``scm`` at the top of this file).
        env: Environment the expression is evaluated in.

    Returns:
        Whatever ``eval`` returns, or ``True`` when evaluation raised. The
        failure is reported through ``chat.send_sync`` before returning.
    """
    try:
        return eval(data, env)
    except Exception as e:
        # Best-effort boundary: a broken recipe expression must not take the
        # controller down. The f-string (rather than str.join) keeps the
        # report itself from raising when ``data`` is not a string.
        chat.send_sync(f'evaluating: {data} : {e}')
        # NOTE(review): the fallback is truthy, so a caller that uses the
        # result as a condition sees a failed expression as "satisfied".
        return True
def load_recipe(json):
    """Turn one decoded recipe description into a ``Recipe``.

    ``json`` is a mapping that must contain ``name``, ``variant``,
    ``description`` and ``phases``; ``controllers`` is optional and defaults
    to an empty tuple. Each entry of ``phases`` must contain ``name`` and
    ``description``; its ``on_exit``, ``on_load`` and ``exit_condition``
    entries default to the expression ``'#t'``.

    Note that the parameter name hides the ``json`` module inside this
    function.
    """
    default_expr = '#t'

    def load_phase(spec):
        # Build a single Phase; missing hooks fall back to ``default_expr``.
        phase_name = spec['name']
        phase_text = spec['description']
        return Phase(
            phase_name,
            phase_text,
            onexit=spec.get('on_exit', default_expr),
            onload=spec.get('on_load', default_expr),
            nextcond=spec.get('exit_condition', default_expr),
        )

    # Required keys are read in the same order as the positional arguments
    # they feed.
    recipe_name = json['name']
    recipe_variant = json['variant']
    recipe_description = json['description']
    controller_specs = json.get('controllers', ())
    phase_list = tuple(map(load_phase, json['phases']))
    return Recipe(recipe_name, recipe_variant, recipe_description,
                  controller_specs, phase_list)
def load_recipes(file):
    """Read a JSON recipe file and return the recipes it contains.

    Args:
        file: Path of a JSON document whose top-level value is iterated to
            obtain one recipe description per item.

    Returns:
        A tuple with the result of ``load_recipe`` for every item, in file
        order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # JSON text is UTF-8 by specification, so do not depend on the locale's
    # default encoding when reading it.
    with open(file, "r", encoding="utf-8") as f:
        recipes = json.load(f)
    return tuple(load_recipe(recipe) for recipe in recipes)
def manual_intervention_wrapper(state):
    """Build the callable registered as the ``manual-intervention`` builtin.

    The returned function takes the interpreter's argument list: an optional
    first element used as the label and an optional second element holding
    the option entries. It forwards both to ``state.manual.get`` and returns
    the second item of the response. When the first item of the response is
    truthy it also calls ``state.onupdate()``.
    """
    def wrapper(args):
        argc = len(args)
        label = stringify(args.car, quote=False) if argc > 0 else None
        if argc > 1:
            # Each option entry contributes (stringified head, next element).
            options = tuple(
                (stringify(entry.car, quote=False), entry.cdr.car)
                for entry in args.cdr.car)
        else:
            options = None
        resp = state.manual.get(label, options)
        needs_client_refresh = resp[0]
        if needs_client_refresh:
            print('REQUESTING CLIENT UPDATE')
            state.onupdate()
        return resp[1]
    return wrapper
class State():
    """Finite state machine that steps one recipe through its phases.

    Attributes:
        recipes: Tuple of recipes loaded from the recipe file.
        recipe: The active recipe, or ``None`` when nothing is running.
        phase: Index of the active phase within ``recipe.phases``.
        manual: ``ManualIntervention`` instance used by the
            ``manual-intervention`` builtin.
        baseenv: Interpreter environment with the builtins that do not
            depend on a loaded recipe.
        env: Environment expressions are evaluated in; ``baseenv`` while
            idle, extended with recipe-specific builtins while running.
        envdata: Per-run data. ``'controllers'`` maps a controller name to
            its controller object, ``'sensors'`` maps a sensor label to a
            ``(value, controller name, target)`` tuple, and
            ``'current-phase-loadtime'`` stores when the phase was loaded.
        onupdate: Zero-argument callback invoked after state changes.
    """

    def __init__(self, recipe_file, onupdate=lambda: None):
        """Load the recipes from ``recipe_file`` and build the base environment."""
        self.recipes = load_recipes(recipe_file)
        self.recipe = None
        self.phase = None
        self.manual = ManualIntervention()
        # Builtins available to every recipe expression. The time helpers
        # convert their argument to seconds.
        self.baseenv = (
            _('get-sensor', 1, lambda x: get_sensor_value(stringify(x.car, quote=False)),
            _('notify', 1, lambda x: chat.send_sync(stringify(x.car, quote=False)),
            _('hours', 1, lambda x: x.car * 60 * 60,
            _('minutes', 1, lambda x: x.car * 60,
            _('seconds', 1, lambda x: x.car,
            _('manual-intervention', -1, manual_intervention_wrapper(self),
            _('set-target', 2, self.set_target,
            _('set-controller', 2, self.set_controller,
            MY_GLOBAL_ENV)))))))))
        self.env = self.baseenv
        self.envdata = {}
        self.onupdate = onupdate

    def getState(self):
        """Return a dict holding the current phase index under ``'phase'``."""
        return {
            'phase': self.phase
        }

    def now(self):
        """Return the current ``perf_counter`` reading, in seconds."""
        return perf_counter()

    def postload(self):
        """Start the recipe stored in ``self.recipe`` from its first phase.

        Does nothing when no recipe is set. Otherwise extends the
        environment with the recipe builtins, evaluates every controller
        expression of the recipe and loads phase 0.
        """
        if self.recipe is None:
            return
        self.phase = 0
        self.env = (
            _('recipe-name', 0, lambda x: self.recipe.name,
            _('recipe-description', 0, lambda x: self.recipe.description,
            _('time-in-this-phase', 0, lambda x: self.phase_time(),
            self.baseenv))))
        self.envdata['controllers'] = {}
        for controller in self.recipe.controllers:
            result = safe_eval(controller, self.env)
            try:
                name, ctrl = result
            except (TypeError, ValueError):
                # safe_eval hands back True when evaluation failed (and has
                # already reported it); anything that is not a
                # (name, controller) pair cannot be registered, so skip it
                # instead of aborting the load half-way.
                print(f'Skipping controller {controller!r}: '
                      f'unexpected result {result!r}')
                continue
            self.envdata['controllers'][name] = ctrl
        self.envdata['sensors'] = {}
        self.loadphase(self.phase)

    def loadByIdx(self, recipe):
        """Activate the recipe at index ``recipe`` and start it."""
        self.recipe = self.recipes[recipe]
        self.postload()

    def stop(self):
        """Stop the running recipe manually.

        Runs the current phase's exit expression and then finishes the
        recipe with a "stopped manually" notification. Does nothing when no
        phase is active, so stopping while idle is harmless.
        """
        phase = self.current_phase()
        if phase is None:
            return
        phase.exit(self.env)
        self.done(message='(concat "Recipe " (recipe-name) " stopped manually")')

    def loadByName(self, recipe):
        """Activate and start the first recipe whose name equals ``recipe``.

        Leaves the state untouched when no recipe has that name.
        """
        for candidate in self.recipes:
            if candidate.name == recipe:
                self.recipe = candidate
                self.postload()
                return

    def current_phase(self):
        """Return the active phase, or ``None`` if there is none.

        ``None`` is returned when no recipe is loaded or the phase index is
        outside the recipe's phases.
        """
        if self.recipe is None:
            return None
        if self.phase >= len(self.recipe.phases) or self.phase < 0:
            return None
        return self.recipe.phases[self.phase]

    def check(self):
        """Evaluate the active phase's exit condition.

        Requires an active phase.
        """
        return self.current_phase().satisfied_p(self.env)

    def run_step(self):
        """Run one control iteration.

        Refreshes every linked sensor value, feeds each controller the
        value of its input sensor and applies the response to the actuator
        of the same name, then advances to the next phase if the exit
        condition holds.

        Returns:
            ``True`` when the exit condition held and there was no further
            phase to move to, ``False`` otherwise.

        Requires a loaded recipe (``check`` dereferences the active phase).
        """
        sensors = self.envdata.get('sensors', {})
        # Update sensor values; only the value slot of each tuple changes.
        for sensor in sensors:
            _old_value, ctrl, target = sensors[sensor]
            sensors[sensor] = (get_sensor_value(sensor), ctrl, target)
        # Apply actuators
        for controller, ctrl in self.envdata.get('controllers', {}).items():
            if ctrl.input_label not in sensors:
                print(f'Missing sensor {ctrl.input_label}')
                continue
            response = ctrl.apply(sensors[ctrl.input_label][0])
            actuators[controller].enable(response)
        if self.check() and self.next() is None:
            return True
        print(f'Waiting for {self.current_phase().nextcond}')
        return False

    def next(self):
        """Leave the active phase and enter the following one.

        Returns:
            The newly loaded phase, or ``None`` when the active phase was
            the last one (in which case only ``onupdate`` is called).
        """
        self.current_phase().exit(self.env)
        if self.phase < len(self.recipe.phases) - 1:
            self.phase += 1
            phase = self.recipe.phases[self.phase]
            self.loadphase(self.phase)
            self.manual.clear()
            return phase
        self.onupdate()
        return None

    def phase_time(self, arg=None):
        """Return the seconds elapsed since the active phase was loaded.

        ``arg`` is accepted and ignored.
        """
        return (self.now() - self.envdata['current-phase-loadtime'])

    def loadphase(self, phaseidx):
        """Record the load time of phase ``phaseidx`` and run its on-load expression.

        Returns:
            The result of evaluating the phase's ``onload`` expression.
        """
        phase = self.recipe.phases[phaseidx]
        self.envdata['current-phase-loadtime'] = self.now()
        resp = safe_eval(phase.onload, self.env)
        self.onupdate()
        return resp

    def done(self, message=None):
        """Finish the run: notify, disable actuators and reset to idle.

        Args:
            message: Optional expression passed to ``Recipe.done`` for the
                notification text.

        Safe to call while idle: the recipe notification is skipped when no
        recipe is loaded, and the reset still happens.
        """
        if self.recipe is not None:
            self.recipe.done(self.env, message)
        # Stop all actuators
        for controller in self.envdata.get('controllers', {}):
            actuators[controller].disable()
        self.recipe = None
        self.env = self.baseenv
        self.envdata = {}
        self.onupdate()

    def set_target(self, args):
        """Implement the ``set-target`` builtin: ``(sensor, target)``.

        Stores the target next to the sensor's value and forwards it to the
        controller linked to that sensor. The sensor must have been linked
        with ``set-controller`` first.
        """
        sensor, target = (stringify(args.car, quote=False), args.cdr.car)
        value, controller, _old_target = self.envdata['sensors'][sensor]
        self.envdata['sensors'][sensor] = (value, controller, target)
        self.envdata['controllers'][controller].set_target(target)
        print('setting sensor', sensor, controller, 'to target', target)

    def set_controller(self, args):
        """Implement the ``set-controller`` builtin: ``(controller, sensor)``.

        Makes ``sensor`` the input of the named controller and registers the
        sensor with its current reading and no target.
        """
        controller, sensor = (stringify(args.car, quote=False),
                              stringify(args.cdr.car, quote=False))
        self.envdata['controllers'][controller].set_input_label(sensor)
        # Value/Controller/Target
        self.envdata['sensors'][sensor] = (get_sensor_value(sensor), controller, None)
        print('linking sensor', sensor, 'to controller', controller,
              '(', self.envdata['controllers'][controller], ')')
class Recipe():
    """A named sequence of phases plus the controller expressions it needs.

    Attributes:
        name: Recipe name.
        variant: Variant label of the recipe (``'default'`` if not given).
        description: Free-text description.
        controllers: Controller expressions belonging to the recipe.
        phases: Phases of the recipe, in execution order.
    """

    def __init__(self, name, variant='default', description='',
                 controllers=(),
                 phases=()):
        self.name = name
        # Previously accepted but dropped; keep it so variants of the same
        # recipe can be told apart.
        self.variant = variant
        self.controllers = controllers
        self.description = description
        self.phases = phases

    def getState(self, currentphase=None):
        """Return a plain-dict description of the recipe and its phases.

        Args:
            currentphase: Index of the active phase; the phase at that
                index is reported with ``'current': True``.
        """
        def phasetodict(i, p):
            return {
                'name': p.name,
                'text': p.text,
                'nextcond': p.nextcond,
                'current': currentphase == i,
                'onload': p.onload,
                'onexit': p.onexit,
            }
        return {
            'name': self.name,
            'variant': self.variant,
            'description': self.description,
            'controllers': self.controllers,
            'phases': [
                phasetodict(i, p)
                for (i, p) in enumerate(self.phases)
            ],
        }

    def done(self, env, message=None):
        """Send the end-of-recipe notification.

        Args:
            env: Environment in which the notification is evaluated.
            message: Expression producing the notification text; a default
                "completed at" message is used when it is empty or ``None``.
        """
        message = message or '(concat "Recipe " (recipe-name) " completed at " (now))'
        safe_eval(f'(notify {message})', env)
class Phase():
    """One step of a recipe.

    A phase carries a name, a descriptive text and three expressions:
    ``onload`` (evaluated by the state machine when the phase is entered),
    ``nextcond`` (the exit condition) and ``onexit``.
    """

    def __init__(self, name, text='', nextcond='#t', onload='', onexit=''):
        self.name, self.text = name, text
        self.nextcond, self.onload, self.onexit = nextcond, onload, onexit

    def satisfied_p(self, env):
        """Evaluate the exit condition in ``env`` and return the result."""
        outcome = safe_eval(self.nextcond, env)
        return outcome

    def exit(self, env):
        """Evaluate the on-exit expression in ``env`` and return the result."""
        outcome = safe_eval(self.onexit, env)
        return outcome

    def revert(self, env):
        """Placeholder for undoing state changes (actuator values).

        Not implemented; always returns ``None``.
        """
        return None