358 lines
13 KiB
Python
358 lines
13 KiB
Python
# Simple Finite State Machine used to control states
|
|
import json
|
|
from scm import eval, _, GLOBAL_ENV, stringify, load
|
|
from time import perf_counter
|
|
from chat import chat
|
|
from persona import ManualIntervention
|
|
from sensors import get_sensor_value
|
|
from control import fixed_duty_wrapper
|
|
from actuators import actuators
|
|
from utils import consumption_diff, consumption_to_string, stohms
|
|
|
|
# Interpreter environment shared by every State: GLOBAL_ENV from `scm`
# wrapped with two project-specific entries.  Each `_()` call receives a
# name, a number (presumably the expected argument count -- confirm against
# `scm`), the Python callable and the environment it wraps.
MY_GLOBAL_ENV = _(
    'make-duty-controller', 4, fixed_duty_wrapper,
    _('format-time', 1, lambda args: stohms(args.car), GLOBAL_ENV))
|
|
|
|
def safe_eval(data, env, notify=True):
    """Evaluate `data` in `env`, converting any failure into a report.

    NOTE: `eval` here is the interpreter imported from `scm` at the top of
    the file, not Python's builtin.

    Args:
        data: the expression to evaluate (normally source text).
        env: the environment the expression is evaluated in.
        notify: when true, a failure is sent through `chat.send_sync`.

    Returns:
        The evaluation result on success.  On failure: ``True`` when
        `notify` is set (after the message was sent), otherwise the error
        message string itself.
    """
    try:
        return eval(data, env)
    except Exception as e:
        # f-string formatting accepts any `data`; the previous str.join
        # raised TypeError for non-string input and hid the real error.
        err = f'evaluating: {data} : {e}'
        if notify:
            chat.send_sync(err)
            return True
        return err
|
|
|
|
def load_phase(json):
    """Build a Phase from one decoded phase mapping.

    'name' and 'description' are required keys.  The optional 'on_exit',
    'on_load' and 'exit_condition' expressions each default to '#t'.
    """
    default_expr = '#t'
    exit_hook = json.get('on_exit', default_expr)
    load_hook = json.get('on_load', default_expr)
    exit_condition = json.get('exit_condition', default_expr)
    return Phase(
        json['name'],
        json['description'],
        onexit=exit_hook,
        onload=load_hook,
        nextcond=exit_condition,
    )
|
|
|
|
def load_recipe(json):
    """Build a Recipe (including its phases) from one decoded recipe mapping.

    'name', 'description' and 'phases' are required keys; 'controllers'
    is optional and defaults to an empty tuple.
    """
    name = json['name']
    description = json['description']
    controllers = json.get('controllers', ())
    phases = []
    for phase_data in json['phases']:
        phases.append(load_phase(phase_data))
    return Recipe(name, description, controllers, phases)
|
|
|
|
def load_recipes(file):
    """Read the JSON file at path `file` and return its recipes as a list.

    The file must contain a JSON array; every element is converted with
    load_recipe().
    """
    with open(file, "r") as handle:
        decoded = json.load(handle)
    loaded = []
    for entry in decoded:
        loaded.append(load_recipe(entry))
    return loaded
|
|
|
|
def store_recipes(file, recipes):
    """Write `recipes` to the path `file` as JSON indented by two spaces.

    Args:
        file: destination path; overwritten on success.
        recipes: JSON-serializable data.

    Raises:
        TypeError / ValueError: from json.dumps when `recipes` cannot be
            serialized.  The destination file is left untouched then.
    """
    print(file, recipes)
    # Serialize before opening: mode "w" truncates immediately, so a
    # serialization error after open() would destroy the existing file.
    payload = json.dumps(recipes, indent=2)
    with open(file, "w") as f:
        f.write(payload)
|
|
|
|
def manual_intervention_wrapper(state):
    """Create the interpreter callable behind 'manual-intervention'.

    The returned function takes the interpreter's argument list: an
    optional label, optionally followed by a list of options whose
    elements each provide a text (`.car`) and a value (`.cdr.car`).  It
    asks `state.manual.get(label, options)`; when the first element of the
    answer is truthy, `state.onupdate()` is triggered.  The second element
    of the answer is always the return value.
    """
    def wrapper(args):
        argc = len(args)
        label = stringify(args.car, quote=False) if argc > 0 else None
        if argc > 1:
            options = tuple(
                (stringify(option.car, quote=False), option.cdr.car)
                for option in args.cdr.car)
        else:
            options = None
        outcome = state.manual.get(label, options)
        if not outcome[0]:
            return outcome[1]
        print('REQUESTING CLIENT UPDATE')
        state.onupdate()
        return outcome[1]

    return wrapper
|
|
|
|
|
|
class State():
    """Finite state machine that walks one recipe through its phases.

    Attributes:
        recipes: list returned by load_recipes() for `recipe_file`.
        recipe: the recipe being run, or None when idle.
        phase: index of the current phase inside `recipe.phases`.
        manual: ManualIntervention helper used by 'manual-intervention'.
        baseenv: interpreter environment available with or without a recipe.
        env: environment used for evaluation; `baseenv` plus the
            recipe-specific entries while a recipe is loaded.
        envdata: runtime bookkeeping ('controllers', 'sensors', load times
            and consumption snapshots).
        onupdate: zero-argument callback invoked when the state changes.
    """

    def __init__(self, recipe_file, onupdate=lambda: None):
        """Load the recipes from `recipe_file` and build the base environment."""
        self.recipes = load_recipes(recipe_file)
        self.recipe_file = recipe_file
        self.recipe = None
        self.phase = None
        self.manual = ManualIntervention()
        self.baseenv = self._extend_env(MY_GLOBAL_ENV, (
            ('get-sensor', 1,
             lambda x: get_sensor_value(stringify(x.car, quote=False))),
            ('set-actuator', 2, self.set_actuator),
            ('notify', 1,
             lambda x: chat.send_sync(stringify(x.car, quote=False))),
            ('hours', 1, lambda x: x.car * 60 * 60),
            ('minutes', 1, lambda x: x.car * 60),
            ('seconds', 1, lambda x: x.car),
            ('manual-intervention', -1, manual_intervention_wrapper(self)),
            ('set-target', 2, self.set_target),
            ('get-sensor-target', 1, self.get_sensor_target),
            ('get-controller-target', 1, self.get_controller_target),
            ('set-controller', 2, self.set_controller),
            ('reload-recipes', 0, lambda x: self.reloadRecipes()),
        ))
        self.env = self.baseenv
        self.envdata = {}
        self.onupdate = onupdate

    @staticmethod
    def _extend_env(env, bindings):
        """Wrap `env` with one `_()` entry per (name, count, function) triple.

        Triples are applied last-to-first, so the first triple ends up
        outermost -- the same structure as writing the `_()` calls nested
        in listing order.
        """
        for name, count, function in reversed(bindings):
            env = _(name, count, function, env)
        return env

    def reloadRecipes(self):
        """Re-read the recipe file given at construction time."""
        print("Reloading recipes!")
        self.recipes = load_recipes(self.recipe_file)

    def getState(self):
        """Return a dict snapshot of phase index, timings and consumption."""
        return {
            'phase': self.phase,
            'phase-time': int(self.phase_time()),
            'phase-consumption': consumption_to_string(self.phase_consumption()),
            'recipe-time': int(self.recipe_time()),
            'recipe-consumption': consumption_to_string(self.recipe_consumption())
        }

    def now(self):
        """Return the current perf_counter() reading used for all timings."""
        return perf_counter()

    def postload(self):
        """Start `self.recipe` from its first phase; no-op when none is set.

        Builds the recipe-specific environment, evaluates the recipe's
        controller definitions, records the start time and consumption and
        loads phase 0.  If a controller definition does not evaluate to a
        (name, controller) pair the load is aborted and the state returns
        to idle.
        """
        if self.recipe is None:
            return
        self.phase = 0
        self.env = self._extend_env(self.baseenv, (
            ('recipe-name', 0, lambda args: self.recipe.name),
            ('recipe-description', 0, lambda args: self.recipe.description),
            ('recipe-consumption', 0, lambda args: self.recipe_consumption()),
            ('recipe-time', 0, lambda args: self.recipe_time()),
            ('time-in-this-phase', 0, lambda args: self.phase_time()),
            ('phase-consumption', 0, lambda args: self.phase_consumption()),
            ('phase-time', 0, lambda args: self.phase_time()),
        ))
        controllers = {}
        self.envdata['controllers'] = controllers
        for definition in self.recipe.controllers:
            result = safe_eval(definition, self.env)
            try:
                name, ctrl = result
            except (TypeError, ValueError):
                # safe_eval reports failures as True or an error string,
                # neither of which is a (name, controller) pair.  Abort
                # instead of continuing with a half-initialised recipe.
                print(f'Invalid controller definition {definition!r}; '
                      'recipe not loaded')
                self.recipe = None
                self.env = self.baseenv
                self.envdata = {}
                self.onupdate()
                return
            controllers[name] = ctrl
        self.envdata['sensors'] = {}
        self.envdata['recipe-start-consumption'] = self.get_total_consumption()
        self.envdata['current-recipe-loadtime'] = self.now()
        self.loadphase(self.phase)

    def recipeById(self, id):
        """Return the recipe at index `id`, or None when `id` is invalid.

        `id` may be anything int() accepts; values that cannot be
        converted, and indexes outside the recipe list, yield None.
        """
        try:
            id = int(id)
        except (TypeError, ValueError, OverflowError):
            # Only conversion failures mean "no such recipe"; a bare
            # except would also swallow KeyboardInterrupt/SystemExit.
            id = -1
        print(self.recipes)
        if -1 < id < len(self.recipes):
            return self.recipes[id]
        return None

    def loadByIdx(self, recipe):
        """Select the recipe by list index and start it."""
        self.recipe = self.recipeById(recipe)
        self.postload()

    def stop(self):
        """Stop the running recipe manually; does nothing when idle."""
        phase = self.current_phase()
        if phase is not None:
            phase.exit(self.env)
        self.done(message='(concat "Recipe " (recipe-name) " stopped manually")')

    def loadByName(self, recipe):
        """Select the first recipe whose name equals `recipe` and start it.

        When no recipe matches, the selection is left unchanged; postload()
        is still called.
        """
        for candidate in self.recipes:
            if candidate.name == recipe:
                self.recipe = candidate
                break
        self.postload()

    def current_phase(self):
        """Return the current Phase, or None when idle or out of range."""
        if self.recipe is None:
            return None
        if self.phase >= len(self.recipe.phases) or self.phase < 0:
            return None
        return self.recipe.phases[self.phase]

    def check(self):
        """Evaluate the current phase's exit condition."""
        return self.current_phase().satisfied_p(self.env)

    def run_step(self):
        """Run one control iteration.

        Refreshes every tracked sensor value, lets each controller drive
        its actuator, then advances to the next phase when the exit
        condition holds.

        Returns:
            True when the last phase has just been left, False otherwise.
        """
        # Update sensor values
        sensors = self.envdata.get('sensors', {})
        for sensor in sensors:
            _stale_value, ctrl, target = sensors[sensor]
            sensors[sensor] = (get_sensor_value(sensor), ctrl, target)
        # Apply actuators
        for controller, ctrl in self.envdata.get('controllers', {}).items():
            try:
                if ctrl.input_label not in self.envdata['sensors']:
                    print(f'Missing sensor {ctrl.input_label}')
                    continue
                response = ctrl.apply(self.envdata['sensors'][ctrl.input_label][0])
                actuators[controller].enable(response)
            except Exception as e:
                # Best effort: one failing controller must not stop the others.
                print(f"UNKNOWN BUG HERE in run_step {e} on controller {ctrl}")
        if self.check():
            if self.next() is None:
                return True
        print(f'Waiting for {self.current_phase().nextcond}')
        return False

    def next(self):
        """Leave the current phase and enter the following one.

        Returns:
            The newly loaded Phase, or None when the phase just left was
            the last one (only `onupdate` is triggered then).
        """
        self.current_phase().exit(self.env)

        if self.phase < len(self.recipe.phases) - 1:
            self.phase += 1
            phase = self.recipe.phases[self.phase]
            self.loadphase(self.phase)
            self.manual.clear()
            return phase
        self.onupdate()
        return None

    def _elapsed_since(self, key):
        """Return the time since the timestamp stored under `key`, or 0."""
        start = self.envdata.get(key, 0)
        # A missing (or zero) timestamp means nothing is running.
        return start and self.now() - start

    def phase_time(self, arg=None):
        """Return the time spent in the current phase (0 when none)."""
        return self._elapsed_since('current-phase-loadtime')

    def recipe_time(self, arg=None):
        """Return the time since the recipe was started (0 when none)."""
        return self._elapsed_since('current-recipe-loadtime')

    def loadphase(self, phaseidx):
        """Enter phase `phaseidx`: record start data and run its on-load code.

        Returns:
            Whatever safe_eval() returns for the phase's on-load expression.
        """
        phase = self.recipe.phases[phaseidx]
        self.envdata['current-phase-loadtime'] = self.now()
        self.envdata['current-phase-start-consumption'] = self.get_total_consumption()
        resp = safe_eval(phase.onload, self.env)
        self.onupdate()
        return resp

    def done(self, message=None):
        """Finish the running recipe and return to the idle state.

        Sends the recipe's completion notification (or `message`), disables
        the actuator of every controller and resets `recipe`, `env` and
        `envdata`.  Does nothing when no recipe is loaded; previously that
        case raised AttributeError on `self.recipe.done`.
        """
        if self.recipe is None:
            return
        self.envdata['recipe-done-consumption'] = self.get_total_consumption()
        self.recipe.done(self.env, message)
        # Stop all actuators
        for controller in self.envdata.get('controllers', {}):
            actuators[controller].disable()
        self.recipe = None
        self.env = self.baseenv
        self.envdata = {}
        self.onupdate()

    def set_target(self, args):
        """Interpreter builtin: set the target of the controller tied to a sensor.

        `args` carries the sensor name and the new target value.
        """
        sensor, target = (stringify(args.car, quote=False), args.cdr.car)
        # We get the controller, and set it to target
        value, controller, _previous_target = self.envdata['sensors'][sensor]
        self.envdata['sensors'][sensor] = (value, controller, target)
        self.envdata['controllers'][controller].set_target(target)
        print('setting sensor', sensor, controller, 'to target', target)

    def get_sensor_target(self, args):
        """Interpreter builtin: return the target stored for a sensor.

        Returns None when the sensor is unknown or has no controller.
        """
        sensor = stringify(args.car, quote=False)
        if sensor not in self.envdata['sensors']:
            print(f'Sensor {sensor} not found')
            return None
        _value, controller, target = self.envdata['sensors'][sensor]
        if controller is None:
            print('Sensor has no associated controller')
            return None
        return target

    def get_controller_target(self, args):
        """Interpreter builtin: return a controller's target, None if unknown."""
        controller = stringify(args.car, quote=False)
        if controller not in self.envdata['controllers']:
            print(f'Controller {controller} not found')
            return None
        return self.envdata['controllers'][controller].get_target()

    def set_controller(self, args):
        """Interpreter builtin: link a controller to the sensor it reads.

        `args` carries the controller name and the sensor name.
        """
        controller, sensor = (stringify(args.car, quote=False),
                              stringify(args.cdr.car, quote=False))
        self.envdata['controllers'][controller].set_input_label(sensor)
        # Value/Controller/Target
        self.envdata['sensors'][sensor] = (get_sensor_value(sensor), controller, None)
        print('linking sensor', sensor, 'to controller', controller,
              '(', self.envdata['controllers'][controller], ')')

    def set_actuator(self, args):
        """Interpreter builtin: enable or disable an actuator directly.

        `args` carries the actuator name and a value; a truthy value
        enables the actuator, a falsy one disables it.
        """
        actuator, value = (stringify(args.car, quote=False), args.cdr.car)
        if value:
            actuators[actuator].enable()
        else:
            actuators[actuator].disable()
        print('setting actuator', actuator, 'to value', value,
              ', remember to change it manually if stopping?')

    def get_total_consumption(self):
        """Return {actuator name: its consumption()} for every actuator."""
        return {name: actuators[name].consumption() for name in actuators.keys()}

    def phase_consumption(self):
        """Return the consumption since the current phase was loaded."""
        before = self.envdata.get('current-phase-start-consumption', {})
        after = self.get_total_consumption()
        return consumption_diff(before, after)

    def recipe_consumption(self):
        """Return the consumption since the recipe was started."""
        return consumption_diff(
            self.envdata.get('recipe-start-consumption', {}),
            self.get_total_consumption())
|
|
|
|
|
|
class Recipe():
    """A named recipe: controller definitions plus an ordered list of phases."""

    def __init__(self, name, description='',
                 controllers=(),
                 phases=None):
        """Store the recipe data.

        Args:
            name: recipe name.
            description: free-text description.
            controllers: controller definitions, evaluated when the recipe
                is loaded.
            phases: list of Phase objects.  Defaults to a fresh empty list
                per instance; the previous `phases=[]` default was one list
                shared by every recipe created without phases.
        """
        self.name = name
        self.controllers = controllers
        self.description = description
        self.phases = [] if phases is None else phases

    def getState(self, currentphase=None):
        """Return a dict describing the recipe and each of its phases.

        Args:
            currentphase: index of the active phase; that phase's entry
                gets 'current': True, all others False.
        """
        def phasetodict(i, p):
            return {
                'name': p.name,
                'text': p.text,
                'nextcond': p.nextcond,
                'current': currentphase == i,
                'onload': p.onload,
                'onexit': p.onexit,
            }

        return {
            'name': self.name,
            'description': self.description,
            'controllers': self.controllers,
            'phases': [
                phasetodict(i, p)
                for (i, p) in enumerate(self.phases)
            ],
        }

    def done(self, env, message=None):
        """Send the end-of-recipe notification by evaluating it in `env`.

        Args:
            env: interpreter environment used for the evaluation.
            message: expression producing the notification text; a default
                completion summary is used when omitted or empty.
        """
        message = message or '''
            (concat "Recipe " (recipe-name) " completed at " (now)
            " in " (format-time (recipe-time)) ", Consumption: " (recipe-consumption))
            '''
        safe_eval(f'(notify {message})', env)
|
|
|
|
class Phase():
    """One step of a recipe, described by three interpreter expressions."""

    # Expression evaluated after the on-exit code to report how the phase went.
    _EXIT_NOTICE = '''
            (notify
            (concat "Phase done in " (format-time (phase-time))
            ", with consumption: " (phase-consumption)))
            '''

    def __init__(self, name, text='', nextcond='#t', onload='', onexit=''):
        """Store the phase name, its description `text` and its expressions.

        `onload` is evaluated when the phase starts, `onexit` when it is
        left, and `nextcond` decides when the phase is over.
        """
        self.onexit = onexit
        self.onload = onload
        self.nextcond = nextcond
        self.text = text
        self.name = name

    def satisfied_p(self, env):
        """Evaluate the exit condition in `env` and return the result."""
        outcome = safe_eval(self.nextcond, env)
        return outcome

    def exit(self, env):
        """Run the on-exit code, then send the phase summary notification.

        Returns the result of evaluating the on-exit expression.
        """
        exit_result = safe_eval(self.onexit, env)
        safe_eval(self._EXIT_NOTICE, env)
        return exit_result

    def revert(self, env):
        """Placeholder for undoing state changes (actuator values); no-op."""
        return None
|