# Simple Finite State Machine used to control states import json from scm import eval, _, GLOBAL_ENV, stringify, load from time import perf_counter from chat import chat from persona import ManualIntervention from sensors import get_sensor_value from control import fixed_duty_wrapper from actuators import actuators from utils import consumption_diff, consumption_to_string, stohms MY_GLOBAL_ENV = ( _('make-duty-controller', 3, fixed_duty_wrapper, _('format-time', 1, lambda x: stohms(x.car), GLOBAL_ENV))) def safe_eval(data, env): try: return eval(data, env) except Exception as e: err = ' '.join(('evaluating:', data, ':', str(e))) chat.send_sync(err) return True def load_phase(json): def fallback(v): return json.get(v, '#t') return Phase( json['name'], json['description'], onexit=fallback('on_exit'), onload=fallback('on_load'), nextcond=fallback('exit_condition')) def load_recipe(json): return Recipe( json['name'], json['description'], json.get('controllers', ()), [load_phase(p) for p in json['phases']]) def load_recipes(file): with open(file, "r") as f: recipes = json.loads(f.read()) return [load_recipe(recipe) for recipe in recipes] def store_recipes(file, recipes): print(file, recipes) with open(file, "w") as f: f.write(json.dumps(recipes, indent=2)) def manual_intervention_wrapper(state): def wrapper(args): label = None options = None if len(args) > 0: label = stringify(args.car, quote=False) if len(args) > 1: options = tuple( (stringify(o.car, quote=False), o.cdr.car) for o in args.cdr.car) resp = state.manual.get(label, options) if resp[0]: print('REQUESTING CLIENT UPDATE') state.onupdate() return resp[1] return wrapper class State(): def __init__(self, recipe_file, onupdate=lambda: None): self.recipes = load_recipes(recipe_file) self.recipe_file = recipe_file self.recipe = None self.phase = None self.manual = ManualIntervention() self.baseenv = ( _('get-sensor', 1, lambda x: get_sensor_value(stringify(x.car, quote=False)), _('set-actuator', 2, self.set_actuator, _('notify', 1, lambda x: chat.send_sync(stringify(x.car, quote=False)), _('hours', 1, lambda x: x.car * 60 * 60, _('minutes', 1, lambda x: x.car * 60, _('seconds', 1, lambda x: x.car, _('manual-intervention', -1, manual_intervention_wrapper(self), _('set-target', 2, self.set_target, _('set-controller', 2, self.set_controller, _('reload-recipes', 0, lambda x: self.reloadRecipes(), MY_GLOBAL_ENV))))))))))) self.env = self.baseenv self.envdata = {} self.onupdate = onupdate def reloadRecipes(self): # chat.send_sync("Reloading recipes!") print("Reloading recipes!") self.recipes = load_recipes(self.recipe_file) def getState(self): return { 'phase': self.phase, 'phase-time': int(self.phase_time()), 'phase-consumption': consumption_to_string(self.phase_consumption()), 'recipe-time': int(self.recipe_time()), 'recipe-consumption': consumption_to_string(self.recipe_consumption()) } def now(self): return perf_counter() def postload(self): if self.recipe is None: return self.phase = 0 self.env = ( _('recipe-name', 0, lambda _: self.recipe.name, _('recipe-description', 0, lambda _: self.recipe.description, _('recipe-consumption', 0, lambda _: self.recipe_consumption(), _('recipe-time', 0, lambda _: self.recipe_time(), _('time-in-this-phase', 0, lambda _: self.phase_time(), _('phase-consumption', 0, lambda _: self.phase_consumption(), _('phase-time', 0, lambda _: self.phase_time(), self.baseenv)))))))) self.envdata['controllers'] = {} for controller in self.recipe.controllers: name, ctrl = safe_eval(controller, self.env) self.envdata['controllers'][name] = ctrl self.envdata['sensors'] = {} self.envdata['recipe-start-consumption'] = self.get_total_consumption() self.envdata['current-recipe-loadtime'] = self.now() self.loadphase(self.phase) def recipeById(self, id): try: id = int(id) except: id = -1 print(self.recipes) if id > -1 and id < len(self.recipes): return self.recipes[id] return None def loadByIdx(self, recipe): self.recipe = self.recipeById(recipe) self.postload() def stop(self): self.current_phase().exit(self.env) self.done(message='(concat "Recipe " (recipe-name) " stopped manually")') def loadByName(self, recipe): r = tuple(i for (i, r) in enumerate(self.recipes) if r.name == recipe) if len(r) > 0: self.recipe = self.recipes[r[0]] self.postload() def current_phase(self): if self.recipe is None: return None if self.phase >= len(self.recipe.phases) or self.phase < 0: return None return self.recipe.phases[self.phase] def check(self): return self.current_phase().satisfied_p(self.env) def run_step(self): # Update sensor values for sensor in self.envdata.get('sensors', ()): _, ctrl, target = self.envdata['sensors'][sensor] self.envdata['sensors'][sensor] = (get_sensor_value(sensor), ctrl, target) # Apply actuators for controller in self.envdata.get('controllers', {}).keys(): ctrl = self.envdata['controllers'][controller] try: if ctrl.input_label not in self.envdata['sensors'].keys(): print(f'Missing sensor {ctrl.input_label}') continue response = ctrl.apply(self.envdata['sensors'][ctrl.input_label][0]) actuators[controller].enable(response) except Exception as e: print(f"UNKNOWN BUG HERE in run_step {e}") if self.check(): if self.next() is None: return True print(f'Waiting for {self.current_phase().nextcond}') return False def next(self): self.current_phase().exit(self.env) if self.phase < len(self.recipe.phases) - 1: self.phase += 1 phase = self.recipe.phases[self.phase] self.loadphase(self.phase) self.manual.clear() return phase self.onupdate() return None def phase_time(self, arg=None): start = self.envdata.get('current-phase-loadtime', 0) return start and self.now() - start def recipe_time(self, arg=None): start = self.envdata.get('current-recipe-loadtime', 0) return start and self.now() - start def loadphase(self, phaseidx): phase = self.recipe.phases[phaseidx] self.envdata['current-phase-loadtime'] = self.now() self.envdata['current-phase-start-consumption'] = self.get_total_consumption() resp = safe_eval(phase.onload, self.env) self.onupdate() return resp def done(self, message=None): self.envdata['recipe-done-consumption'] = self.get_total_consumption() self.recipe.done(self.env, message) # Stop all actuators for controller in self.envdata.get('controllers', {}).keys(): actuators[controller].disable() self.recipe = None self.env = self.baseenv self.envdata = {} self.onupdate() def set_target(self, args): sensor, target = (stringify(args.car, quote=False), args.cdr.car) value, controller, _ = self.envdata['sensors'][sensor] self.envdata['sensors'][sensor] = (value, controller, target) self.envdata['controllers'][controller].set_target(target) print('setting sensor', sensor, controller, 'to target', target) def set_controller(self, args): controller, sensor = (stringify(args.car, quote=False), stringify(args.cdr.car, quote=False)) self.envdata['controllers'][controller].set_input_label(sensor) # Value/Controller/Target self.envdata['sensors'][sensor] = (get_sensor_value(sensor), controller, None) print('linking sensor', sensor, 'to controller', controller, '(', self.envdata['controllers'][controller], ')') def set_actuator(self, args): actuator, value = (stringify(args.car, quote=False), args.cdr.car) if value: actuators[actuator].enable() else: actuators[actuator].disable() print('setting actuator ', actuator, 'to value', value, ', remember to change it manually if stopping?') def get_total_consumption(self): return {actuator: actuators[actuator].consumption() for actuator in actuators.keys()} def phase_consumption(self): before = self.envdata.get('current-phase-start-consumption', {}) after = self.get_total_consumption() return consumption_diff(before, after) def recipe_consumption(self): return consumption_diff( self.envdata.get('recipe-start-consumption', {}), self.get_total_consumption()) class Recipe(): def __init__(self, name, description='', controllers=(), phases=[]): self.name = name self.controllers = controllers self.description = description self.phases = phases def getState(self, currentphase=None): def phasetodict(i, p): return { 'name': p.name, 'text': p.text, 'nextcond': p.nextcond, 'current': currentphase == i, # 'loaded_time': p.loaded_time, 'onload': p.onload, 'onexit': p.onexit, # 'envdata': p.envdata } return { 'name': self.name, 'description': self.description, 'controllers': self.controllers, 'phases': [ phasetodict(i, p) for (i, p) in enumerate(self.phases) ], # 'phase': self.phase } def done(self, env, message=None): message = message or ''' (concat "Recipe " (recipe-name) " completed at " (now) " in " (format-time (recipe-time)) ", Consumption: " (recipe-consumption)) ''' # safe_eval(f'(notify {message})', env) class Phase(): def __init__(self, name, text='', nextcond='#t', onload='', onexit=''): self.name = name self.text = text self.nextcond = nextcond self.onload = onload self.onexit = onexit # self.loaded_time = None # self.env = env # self.envdata = {} def satisfied_p(self, env): return safe_eval(self.nextcond, env) def exit(self, env): value = safe_eval(self.onexit, env) message = ''' (notify (concat "Phase done in " (format-time (phase-time)) ", with consumption: " (phase-consumption))) ''' safe_eval(message, env) return value def revert(self, env): # Undo state changes (attuators values) pass