270 lines
9.0 KiB
Python
270 lines
9.0 KiB
Python
# Simple Finite State Machine used to control states
|
|
import json
|
|
from scm import eval, _, GLOBAL_ENV, stringify, load
|
|
from time import perf_counter
|
|
from chat import chat
|
|
from persona import ManualIntervention
|
|
from sensors import get_sensor_value
|
|
from control import fixed_duty_wrapper
|
|
from actuators import actuators
|
|
|
|
# Interpreter environment used as the root of every State's environment:
# scm's GLOBAL_ENV with one extra entry, `make-duty-controller`, bound to
# control.fixed_duty_wrapper.
# NOTE(review): the `3` is presumably the builtin's arity -- confirm
# against scm._.
MY_GLOBAL_ENV = _(
    'make-duty-controller',
    3,
    fixed_duty_wrapper,
    GLOBAL_ENV,
)
|
|
|
|
def safe_eval(data, env):
    """Evaluate ``data`` in ``env`` and report any failure over chat.

    Args:
        data: Expression handed to ``scm.eval`` (normally source text taken
            from a recipe definition).
        env: Interpreter environment to evaluate the expression in.

    Returns:
        The value produced by ``eval``, or ``True`` when evaluation raised.
    """
    try:
        return eval(data, env)
    except Exception as e:
        # Convert both parts with str(): joining a non-string `data` would
        # raise TypeError from inside this handler and hide the real error.
        err = ' '.join(('evaluating:', str(data), ':', str(e)))
        chat.send_sync(err)
        # NOTE(review): failures evaluate to True, so a phase exit condition
        # that raises is treated as satisfied. Kept for existing callers --
        # confirm this is intended.
        return True
|
|
|
|
def load_phase(json):
    """Build a Phase from one decoded phase entry.

    ``name`` and ``text`` are required keys. ``on_exit``, ``on_load`` and
    ``exit_condition`` are optional and each falls back to ``'#t'``.

    Args:
        json: Mapping describing the phase (note: the parameter name hides
            the ``json`` module inside this function).

    Returns:
        The constructed Phase.
    """
    default_expr = '#t'
    # Required keys are read first so a missing one fails before anything
    # else is looked at.
    name, text = json['name'], json['text']
    exit_hook = json.get('on_exit', default_expr)
    load_hook = json.get('on_load', default_expr)
    exit_condition = json.get('exit_condition', default_expr)
    return Phase(
        name,
        text,
        onexit=exit_hook,
        onload=load_hook,
        nextcond=exit_condition,
    )
|
|
|
|
def load_recipe(json):
    """Build a Recipe from one decoded recipe entry.

    ``name``, ``description`` and ``phases`` are required keys;
    ``controllers`` defaults to an empty tuple.

    Args:
        json: Mapping describing the recipe (note: the parameter name hides
            the ``json`` module inside this function).

    Returns:
        The constructed Recipe, with every phase entry converted by
        ``load_phase``.
    """
    name = json['name']
    description = json['description']
    controllers = json.get('controllers', ())
    phases = [load_phase(entry) for entry in json['phases']]
    return Recipe(name, description, controllers, phases)
|
|
|
|
def load_recipes(file):
    """Read the recipe file and convert every entry into a Recipe.

    Args:
        file: Path of a JSON file whose top-level value is a list of
            recipe entries.

    Returns:
        A list with one Recipe per entry, in file order.
    """
    with open(file, "r") as handle:
        entries = json.load(handle)
    # The decoded data no longer needs the file, so conversion happens
    # after it is closed.
    return [load_recipe(entry) for entry in entries]
|
|
|
|
def store_recipes(file, recipes):
    """Write ``recipes`` to ``file`` as JSON indented by two spaces.

    The path and the data are echoed to stdout before writing.

    Args:
        file: Destination path; it is opened in write mode, so existing
            content is replaced.
        recipes: JSON-serialisable data to store.
    """
    print(file, recipes)
    with open(file, "w") as handle:
        # Serialise after opening, then write the text in a single call.
        serialized = json.dumps(recipes, indent=2)
        handle.write(serialized)
|
|
|
|
def manual_intervention_wrapper(state):
    """Create the ``manual-intervention`` builtin bound to ``state``.

    The returned callable takes the interpreter's argument list:
      * first argument (optional): the label, stringified without quotes;
      * second argument (optional): a list of options, each turned into a
        ``(stringified first element, second element)`` pair.

    It forwards both to ``state.manual.get`` and expects an indexable
    response: element 0 tells whether clients must be refreshed (in which
    case ``state.onupdate()`` is called), element 1 is the returned value.
    """
    def wrapper(args):
        argc = len(args)
        label = stringify(args.car, quote=False) if argc > 0 else None
        if argc > 1:
            options = tuple(
                (stringify(option.car, quote=False), option.cdr.car)
                for option in args.cdr.car
            )
        else:
            options = None

        resp = state.manual.get(label, options)
        needs_refresh = resp[0]
        if needs_refresh:
            print('REQUESTING CLIENT UPDATE')
            state.onupdate()
        return resp[1]

    return wrapper
|
|
|
|
|
|
class State():
    """Finite state machine that walks a recipe through its phases.

    Attributes:
        recipes: Recipes loaded from the recipe file.
        recipe: The recipe currently running, or None.
        phase: Index of the current phase in ``recipe.phases`` (None until
            a recipe has been loaded).
        manual: ManualIntervention helper used by the
            ``manual-intervention`` builtin.
        baseenv: Interpreter environment with the builtins that do not
            depend on the running recipe.
        env: Environment expressions are evaluated in; ``baseenv`` plus
            the recipe-specific builtins while a recipe is loaded.
        envdata: Per-recipe runtime data: ``'controllers'`` (name ->
            controller), ``'sensors'`` (sensor -> (value, controller name,
            target)) and ``'current-phase-loadtime'``.
        onupdate: Zero-argument callback invoked when clients should
            refresh their view.
    """

    def __init__(self, recipe_file, onupdate=lambda: None):
        """Load the recipes and build the base interpreter environment.

        Args:
            recipe_file: Path handed to ``load_recipes``.
            onupdate: Zero-argument callback invoked on state changes.
        """
        self.recipes = load_recipes(recipe_file)
        self.recipe = None
        self.phase = None
        self.manual = ManualIntervention()
        # Builtins available to every recipe expression. Each callable
        # receives the interpreter's argument list.
        # NOTE(review): the number after each name is presumably the arity
        # (-1 looks like "variadic") -- confirm against scm._.
        self.baseenv = (
            _('get-sensor', 1, lambda x: get_sensor_value(stringify(x.car, quote=False)),
            _('notify', 1, lambda x: chat.send_sync(stringify(x.car, quote=False)),
            _('hours', 1, lambda x: x.car * 60 * 60,
            _('minutes', 1, lambda x: x.car * 60,
            _('seconds', 1, lambda x: x.car,
            _('manual-intervention', -1, manual_intervention_wrapper(self),
            _('set-target', 2, self.set_target,
            _('set-controller', 2, self.set_controller,
              MY_GLOBAL_ENV)))))))))
        self.env = self.baseenv
        self.envdata = {}
        self.onupdate = onupdate

    def getState(self):
        """Return a dict snapshot holding the current phase index."""
        return {
            'phase': self.phase
        }

    def now(self):
        """Return the current ``perf_counter`` reading (seconds)."""
        return perf_counter()

    def postload(self):
        """Start the recipe stored in ``self.recipe`` from its first phase.

        Does nothing when no recipe is set. Otherwise resets the phase
        index, layers the recipe-specific builtins on top of ``baseenv``,
        evaluates every controller expression of the recipe, clears the
        sensor table and loads phase 0.
        """
        if self.recipe is None:
            return
        self.phase = 0
        self.env = (
            _('recipe-name', 0, lambda x: self.recipe.name,
            _('recipe-description', 0, lambda x: self.recipe.description,
            _('time-in-this-phase', 0, lambda x: self.phase_time(),
              self.baseenv))))
        self.envdata['controllers'] = {}
        for controller in self.recipe.controllers:
            # Each controller expression is expected to evaluate to a
            # (name, controller) pair.
            # NOTE(review): safe_eval returns True when evaluation fails,
            # which makes this unpacking raise TypeError -- confirm how a
            # broken controller definition should be handled.
            name, ctrl = safe_eval(controller, self.env)
            self.envdata['controllers'][name] = ctrl
        self.envdata['sensors'] = {}
        self.loadphase(self.phase)

    def recipeById(self, id):
        """Return the recipe at index ``id``, or None if there is none.

        Args:
            id: Index into ``self.recipes``; anything ``int()`` accepts.
                Values that cannot be converted, and indexes outside
                ``0 .. len(recipes) - 1``, yield None. (The name shadows
                the builtin but is kept for existing keyword callers.)
        """
        try:
            id = int(id)
        except (TypeError, ValueError, OverflowError):
            # Only conversion failures mean "no such recipe"; anything
            # else (e.g. KeyboardInterrupt) must propagate.
            id = -1
        print(self.recipes)
        if -1 < id < len(self.recipes):
            return self.recipes[id]
        return None

    def loadByIdx(self, recipe):
        """Select the recipe at index ``recipe`` and start it.

        An unknown index leaves ``self.recipe`` set to None.
        """
        self.recipe = self.recipeById(recipe)
        self.postload()

    def stop(self):
        """Manually stop the running recipe; no-op when none is loaded.

        Runs the current phase's exit hook, then ``done`` with a
        "stopped manually" notification.
        """
        if self.recipe is None:
            # Nothing is running, so there is no phase to exit and no
            # recipe to finish.
            return
        phase = self.current_phase()
        if phase is not None:
            phase.exit(self.env)
        self.done(message='(concat "Recipe " (recipe-name) " stopped manually")')

    def loadByName(self, recipe):
        """Select the first recipe whose name equals ``recipe`` and start it.

        When no recipe matches, the current state is left untouched.
        """
        matches = tuple(
            idx for (idx, candidate) in enumerate(self.recipes)
            if candidate.name == recipe)
        if len(matches) > 0:
            self.recipe = self.recipes[matches[0]]
            self.postload()

    def current_phase(self):
        """Return the current Phase, or None without a recipe or when the
        phase index is out of range."""
        if self.recipe is None:
            return None
        if self.phase >= len(self.recipe.phases) or self.phase < 0:
            return None
        return self.recipe.phases[self.phase]

    def check(self):
        """Evaluate the current phase's exit condition.

        Requires a current phase (see ``current_phase``).
        """
        return self.current_phase().satisfied_p(self.env)

    def run_step(self):
        """Run one control iteration for the loaded recipe.

        Refreshes every linked sensor reading, feeds each controller the
        reading of its input sensor and forwards the response to the
        actuator of the same name, then advances to the next phase when
        the current exit condition holds.

        Returns:
            True when the exit condition held and there was no further
            phase to advance to; False otherwise.
        """
        sensors = self.envdata.get('sensors', {})

        # Update sensor values
        for sensor in sensors:
            _old_value, ctrl, target = sensors[sensor]
            sensors[sensor] = (get_sensor_value(sensor), ctrl, target)

        # Apply actuators
        for controller, ctrl in self.envdata.get('controllers', {}).items():
            try:
                if ctrl.input_label not in sensors:
                    print(f'Missing sensor {ctrl.input_label}')
                    continue
                response = ctrl.apply(sensors[ctrl.input_label][0])
                actuators[controller].enable(response)
            except Exception as e:
                # Best effort: a failing controller must not prevent the
                # remaining ones from being applied, but the cause is
                # reported instead of being silently dropped.
                print(f'run_step: controller {controller!r} failed: {e!r}')

        if self.check():
            if self.next() is None:
                return True
        print(f'Waiting for {self.current_phase().nextcond}')
        return False

    def next(self):
        """Exit the current phase and move on to the following one.

        Returns:
            The newly loaded Phase, or None when the current phase was the
            last one (in which case only ``onupdate`` is called).
        """
        self.current_phase().exit(self.env)
        if self.phase < len(self.recipe.phases) - 1:
            self.phase += 1
            phase = self.recipe.phases[self.phase]
            self.loadphase(self.phase)
            self.manual.clear()
            return phase
        self.onupdate()
        return None

    def phase_time(self, arg=None):
        """Return the seconds elapsed since the current phase was loaded.

        ``arg`` is accepted and ignored.
        """
        return (self.now() - self.envdata['current-phase-loadtime'])

    def loadphase(self, phaseidx):
        """Record the load time of phase ``phaseidx`` and run its on-load
        expression.

        Returns:
            Whatever ``safe_eval`` returned for the on-load expression.
        """
        phase = self.recipe.phases[phaseidx]
        self.envdata['current-phase-loadtime'] = self.now()
        resp = safe_eval(phase.onload, self.env)
        self.onupdate()
        return resp

    def done(self, message=None):
        """Finish the running recipe and reset to the idle state.

        Sends the completion notification through ``Recipe.done``,
        disables the actuator of every controller, clears the recipe and
        its runtime data, and notifies clients.

        Args:
            message: Optional expression passed to ``Recipe.done``.
        """
        self.recipe.done(self.env, message)
        # Stop all actuators
        for controller in self.envdata.get('controllers', {}):
            actuators[controller].disable()
        self.recipe = None
        self.env = self.baseenv
        self.envdata = {}
        self.onupdate()

    def set_target(self, args):
        """``set-target`` builtin: set the target of a linked sensor.

        Args:
            args: Interpreter argument list: sensor name, then target.
                The sensor must already be linked via ``set-controller``.
        """
        sensor, target = (stringify(args.car, quote=False), args.cdr.car)
        value, controller, _old_target = self.envdata['sensors'][sensor]
        self.envdata['sensors'][sensor] = (value, controller, target)
        self.envdata['controllers'][controller].set_target(target)
        print('setting sensor', sensor, controller, 'to target', target)

    def set_controller(self, args):
        """``set-controller`` builtin: link a sensor to a controller.

        Args:
            args: Interpreter argument list: controller name, then sensor
                name.
        """
        controller, sensor = (stringify(args.car, quote=False),
                              stringify(args.cdr.car, quote=False))
        self.envdata['controllers'][controller].set_input_label(sensor)
        # Value/Controller/Target
        self.envdata['sensors'][sensor] = (get_sensor_value(sensor), controller, None)
        print('linking sensor', sensor, 'to controller', controller,
              '(', self.envdata['controllers'][controller], ')')
|
|
|
|
class Recipe():
    """A named sequence of phases plus the controller expressions it uses.

    Attributes:
        name: Recipe name.
        description: Free-text description.
        controllers: Controller expressions belonging to the recipe.
        phases: List of phase objects, in execution order.
    """

    def __init__(self, name, description='',
                 controllers=(),
                 phases=None):
        """Store the recipe definition.

        Args:
            name: Recipe name.
            description: Free-text description.
            controllers: Iterable of controller expressions.
            phases: List of phases; omitted or None means a new empty list.
        """
        self.name = name
        self.controllers = controllers
        self.description = description
        # A fresh list per instance: a literal `[]` default would be one
        # shared object mutated by every recipe created without phases.
        self.phases = [] if phases is None else phases

    def getState(self, currentphase=None):
        """Return a plain-dict description of the recipe and its phases.

        Args:
            currentphase: Index of the phase to flag with
                ``'current': True``; None flags no phase.
        """
        def phasetodict(i, p):
            return {
                'name': p.name,
                'text': p.text,
                'nextcond': p.nextcond,
                'current': currentphase == i,
                'onload': p.onload,
                'onexit': p.onexit,
            }

        return {
            'name': self.name,
            'description': self.description,
            'controllers': self.controllers,
            'phases': [
                phasetodict(i, p)
                for (i, p) in enumerate(self.phases)
            ],
        }

    def done(self, env, message=None):
        """Send the end-of-recipe notification.

        Args:
            env: Interpreter environment the notification is evaluated in.
            message: Expression producing the notification text; a falsy
                value selects the default "completed at" message.
        """
        message = message or '(concat "Recipe " (recipe-name) " completed at " (now))'
        safe_eval(f'(notify {message})', env)
|
|
|
|
class Phase():
    """One step of a recipe.

    Attributes:
        name: Phase name.
        text: Text describing the phase.
        nextcond: Expression evaluated to decide whether the phase is over.
        onload: Expression stored as the phase's on-load hook.
        onexit: Expression evaluated when the phase is exited.
    """

    def __init__(self, name, text='', nextcond='#t', onload='', onexit=''):
        """Store the phase definition unchanged."""
        self.name, self.text = name, text
        self.nextcond = nextcond
        self.onload, self.onexit = onload, onexit

    def satisfied_p(self, env):
        """Return the result of evaluating the exit condition in ``env``."""
        verdict = safe_eval(self.nextcond, env)
        return verdict

    def exit(self, env):
        """Evaluate the on-exit expression in ``env`` and return its result."""
        outcome = safe_eval(self.onexit, env)
        return outcome

    def revert(self, env):
        """Placeholder for undoing state changes (actuator values).

        Currently does nothing.
        """
        return None
|