2022-11-19 18:52:01 +01:00
|
|
|
from . import controllers
|
|
|
|
from scm import stringify
|
|
|
|
from time import perf_counter
|
|
|
|
|
|
|
|
class FixedDutyCycle(controllers.Controller):
    """Controller gated by a fixed duty cycle and an input/target comparison.

    Time is split into consecutive windows of length ``period`` counted from
    ``start`` (both expressed in the same unit as the values handed to
    ``one_cycle``).  ``one_cycle`` returns ``True`` only during the first
    ``duty_perc`` percent of each window, and only while the configured
    comparison between ``self.input`` and ``self.target`` holds.

    NOTE(review): ``self.input`` and ``self.target`` are not assigned in this
    class; presumably ``controllers.Controller`` provides them -- confirm.
    """

    def __init__(self, *args, **kwargs):
        """Consume the duty-cycle options and forward the rest to the base.

        Keyword arguments handled here (all removed from ``kwargs`` before
        the base class initializer is called):

        start_time -- origin of the first period; defaults to
                      ``perf_counter()`` evaluated at construction.
        period     -- length of one window (required, ``KeyError`` if absent).
        duty_perc  -- percentage of each window counted as the active part
                      (required, ``KeyError`` if absent).
        operator   -- one of ``'<'``, ``'>'``, ``'=='`` (default ``'<'``);
                      any other value raises ``KeyError``.
        """
        self.start = kwargs.pop('start_time', perf_counter())
        self.period = kwargs.pop('period')
        # set_duty() reads self.period, so the period must be stored first.
        self.set_duty(kwargs.pop('duty_perc'))

        operator = kwargs.pop('operator', '<')
        # Two-argument lambdas: the comparison is invoked as
        # self.operator(self.input, self.target).  The former
        # "lambda (a, b): ..." tuple-parameter form is not valid Python 3.
        functions = {
            '<': lambda a, b: a < b,
            '>': lambda a, b: a > b,
            '==': lambda a, b: a == b,
        }
        self.operator = functions[operator]

        super().__init__(*args, **kwargs)
        # Time value received by the most recent one_cycle() call.
        self.last_time = None
        # Lifetime count of cycles in which self.input was None.
        self.total_failed_read = 0
        # Count of consecutive cycles in which self.input was None.
        self.failed_read = 0

    def set_duty(self, duty=None):
        """Recompute ``self.duty``, the offset in a period where the switch occurs.

        duty -- new duty percentage; when ``None`` the stored
                ``self.duty_perc`` is reused (e.g. after a period change).
        """
        if duty is not None:
            self.duty_perc = duty
        self.duty = self.duty_perc/100 * self.period

    def set_period(self, period):
        """Change the period while keeping the current offset inside it.

        The start point is shifted so that we know where in the new period
        we are, then the period is replaced:

            "|-----X----------------|"  point % old_period
            "|-----X----|-----------|"  start = now - (point % old_period)
            "|----|X----|-----|-----|"
        """
        if self.last_time is not None:
            self.start = self.last_time - (self.timepoint() % self.period)
        # else: no cycle has run yet, so there is no "now" to re-anchor
        # from; keep the configured start instead of failing on None.
        print(period, self.start)
        self.period = period
        self.set_duty()

    def timepoint(self):
        """Return the offset of ``self.last_time`` inside the current period."""
        return ((self.last_time - self.start) % self.period)

    def one_cycle(self, time):
        """Record ``time`` and return the control decision for this cycle.

        Returns ``False`` when ``self.input`` is ``None`` (counted as a failed
        read) or when the configured comparison of ``self.input`` against
        ``self.target`` does not hold; otherwise returns whether the current
        offset in the period is still before the duty switch point.
        """
        self.last_time = time

        if self.input is None:
            self.total_failed_read += 1
            self.failed_read += 1
            # TODO: WARN/EXIT IF WE HAVE TOO MANY FAILED READS?
            return False
        # A successful read resets the consecutive-failure counter.
        self.failed_read = 0

        if not self.operator(self.input, self.target):
            return False
        return self.timepoint() < self.duty
|
2022-11-19 18:52:01 +01:00
|
|
|
|
|
|
|
|
2023-02-21 16:34:40 +01:00
|
|
|
# 0. refractory_period is fixed (e.g. 30s, depends on the hardware)
|
|
|
|
# 1. period = refractory_period / max(min(duty, 1-duty), 0.01)
|
|
|
|
# this guarantees that the shortest period is always the refractory period
|
|
|
|
# Store a) ON/OFF HISTORY and the number of times we inhibit control
|
|
|
|
# The duty must be adjusted automatically
|
|
|
|
# a) Take the mean() of the history
|
|
|
|
# b) if mean << duty: current duty is too high, set it to mean
|
|
|
|
# c) if mean = duty: current duty is either too low (so we are always on, when we can) or it is fine.
|
|
|
|
# use the percentage of skipped periods over total number of tests
|
|
|
|
# OPEN QUESTION: how long must the history be?
|
|
|
|
|
2022-11-19 18:52:01 +01:00
|
|
|
def fixed_duty_wrapper(args):
    """Build a ``(name, FixedDutyCycle)`` pair from a linked argument list.

    ``args`` is walked through its ``car``/``cdr`` attributes; the first four
    elements are, in order: the controller name (passed through
    ``stringify(..., quote=False)``), the duty, the period and the comparison
    operator.  The duty is multiplied by 100 before being handed over as
    ``duty_perc``.  The controller is created with ``target=None`` and its
    start time set to ``perf_counter()`` at the moment of the call.
    """
    name = stringify(args.car, quote=False)
    duty = args.cdr.car
    period = args.cdr.cdr.car
    op = args.cdr.cdr.cdr.car

    duty_perc = duty*100
    print('name, duty, period, operation', name, duty_perc, period, op)

    controller = FixedDutyCycle(
        target=None,  # reach_period_s=60*60,
        duty_perc=duty_perc,
        period=period,
        start_time=perf_counter(),
        operator=op,
    )
    return (name, controller)
|