from . import controllers
from scm import stringify
from time import perf_counter


class FixedDutyCycle(controllers.Controller):
    """Controller whose output is gated by a fixed-period duty cycle.

    Time is divided into windows of ``period`` length, counted from
    ``start``.  The output can be on only during the first ``duty`` time
    units of each window, and only while ``operator(input, target)`` holds,
    where the target is shifted by a small hysteresis offset.

    Keyword arguments consumed here (anything else is forwarded unchanged to
    the base controller):

    * ``period`` (required): length of one cycle, in the same units as the
      ``time`` values later passed to :meth:`one_cycle`.
    * ``duty_perc`` (required): percentage of each period during which the
      output is allowed to be on.
    * ``start_time``: origin of the first period; defaults to
      ``perf_counter()`` at construction.
    * ``operator``: one of ``'<'``, ``'>'``, ``'=='`` (default ``'<'``);
      how the input is compared with the target.  Any other value raises
      ``KeyError``.

    NOTE(review): ``self.input`` and ``self.target`` are read but never
    assigned in this class; presumably the base controller provides them.
    """

    # Magnitude of the hysteresis band applied around the target.
    HYSTERESIS = 0.1
    # Consecutive failed reads tolerated before the output is forced off.
    # TODO: find a sane default failed read value
    MAX_FAILED_READS = 100

    def __init__(self, *args, **kwargs):
        self.start = kwargs.pop('start_time', perf_counter())
        self.period = kwargs.pop('period')
        self.set_duty(kwargs.pop('duty_perc'))
        operator = kwargs.pop('operator', '<')
        functions = {
            '<': lambda a, b: a < b,
            '>': lambda a, b: a > b,
            '==': lambda a, b: a == b,
        }
        # The sign of the offset depends on the comparison: positive for
        # '<', negative for every other operator.
        self.offset = self.HYSTERESIS if operator == "<" else -self.HYSTERESIS
        self.operator = functions[operator]
        super().__init__(*args, **kwargs)
        self.last_time = None        # time given to the latest one_cycle()
        self.last_output = False     # latest value computed by one_cycle()
        self.total_failed_read = 0   # failed reads since construction
        self.failed_read = 0         # failed reads since the last good one

    def set_duty(self, duty=None):
        """Update the point inside the period where the switch must occur.

        ``duty`` is a percentage of the period.  When it is ``None`` the
        stored percentage is kept and only the derived ``self.duty`` is
        recomputed (used after the period has changed).
        """
        if duty is not None:
            self.duty_perc = duty
        self.duty = self.duty_perc / 100 * self.period

    def set_period(self, period):
        """Change the cycle length while keeping the current phase.

        The start point is shifted so that we know where in the new period
        we are; then the period itself is replaced:

            "|-----X----------------|"  point % old_period
            "|-----X----|-----------|"  start = now - (point % old_period)
            "|----|X----|-----|-----|"

        If no cycle has run yet (``last_time`` is ``None``) there is no
        "now" to re-anchor on, so ``start`` is left untouched instead of
        failing with a ``TypeError``.
        """
        if self.last_time is not None:
            self.start = self.last_time - (self.timepoint() % self.period)
        print(period, self.start)
        self.period = period
        # Same percentage, new period: refresh the switch-off point.
        self.set_duty()

    def timepoint(self):
        """Return the position of ``last_time`` inside the current period.

        Requires ``one_cycle`` to have been called at least once, otherwise
        ``last_time`` is still ``None`` and the subtraction fails.
        """
        return ((self.last_time - self.start) % self.period)

    def one_cycle(self, time):
        """Compute the output for the instant ``time`` and return it.

        A missing input (``None``) counts as a failed read: the previous
        output is returned.  Once more than ``MAX_FAILED_READS`` consecutive
        reads have failed the method returns ``False``; the counter is only
        cleared further down, so this state persists on later calls.
        """
        self.last_time = time
        if self.input is None:
            self.total_failed_read += 1
            self.failed_read += 1
        if self.failed_read > self.MAX_FAILED_READS:
            print("ERROR: TOO MANY CONSEQUTIVE FAILED READ; DISABLED")
            return False
        if self.input is None:
            print("WARNING: READ FAILED, KEEPING LAST ESTIMATE")
            return self.last_output
        self.failed_read = 0
        # Hysteresis: the target moves by +offset while the output is on
        # and by -offset while it is off.
        target = self.target + self.offset * (1 if self.last_output else -1)
        if self.operator(self.input, target):
            # Comparison holds: on only inside the duty part of the period.
            self.last_output = self.timepoint() < self.duty
        else:
            self.last_output = False
        return self.last_output


# 0. refractary_period is fixed (e.g. 30s, depends on the hardware)
# 1.
#    period = refractary_period / max(min(duty, 1-duty), 0.01)
#    this guarantees that the shortest period is always the refractory period
# Store a) ON/OFF HISTORY and the number of times we inhibit control
# The duty must be adjusted automatically
#   a) Take the mean() of the history
#   b) if mean << duty: current duty is too high, set it to mean
#   c) if mean = duty: current duty is either too low (so we are always on,
#      when we can) or it is fine.
#      use the percentage of skipped periods over total number of tests
# HOW LONG MUST BE THE HISTORY?


def fixed_duty_wrapper(args):
    """Build a ``(name, FixedDutyCycle)`` pair from a car/cdr argument list.

    ``args`` is walked through its ``car``/``cdr`` attributes and is expected
    to hold, in order: the controller name, the duty, the period and the
    comparison operator.  The duty is multiplied by 100 before being passed
    on as ``duty_perc`` (so it is presumably a 0..1 fraction -- confirm with
    the caller).  The extracted values are printed for tracing.
    """
    name = stringify(args.car, quote=False)
    tail = args.cdr
    duty = tail.car
    tail = tail.cdr
    period = tail.car
    op = tail.cdr.car

    duty_perc = duty * 100
    print('name, duty, period, operation', name, duty_perc, period, op)

    controller = FixedDutyCycle(
        target=None,
        # reach_period_s=60*60,
        duty_perc=duty_perc,
        period=period,
        start_time=perf_counter(),
        operator=op,
    )
    return (name, controller)