hakkoso/app.py

114 lines
2.9 KiB
Python
Raw Normal View History

import json
from flask import Flask, Response
from flask import render_template, render_template_string, escape
from flask import request, send_file
from flask_socketio import SocketIO, join_room
from random import randint
from urllib.parse import urlparse, urlunparse
from os.path import exists
from phasectrl import State, ManualIntervention
from sensors import sensors
import phasectrl
from time import perf_counter, sleep
def create_app():
app = Flask('hakkoso')
app.config['DATABASE'] = 'server.db'
return app
app = create_app()
socketio = SocketIO(app, async_mode='threading')
def updateState():
socketio.emit('state', make_state())
print('SENT STATE UPDATE', make_state())
statemachine = State('recipes.json', updateState)
def make_state():
return {
'manual': statemachine.manual.getState(),
'recipes': tuple(r.getState() for r in statemachine.recipes),
'recipe': (statemachine.recipe.getState(statemachine.phase)
if statemachine.recipe else None),
'state': statemachine.getState(),
'sensors': {
'units': {
'Temperature': '°C',
'Acidity': 'pH',
'Humidity': 'RH',
'Durezza Acqua': 'BOH',
},
'found': sensors.list()
}
}
@app.route('/')
def index():
return render_template('index.html')
@app.route('/recipe-editor')
def recipe_editor():
return render_template('recipe-editor.html')
@app.route('/status')
def status():
return make_state()
@app.route('/dist/<filename>')
def dist(filename):
# TODO: check file exists and path is under src
filename = 'dist/' + filename
if exists(filename):
return send_file(filename)
print('Missing', filename)
return ''
@app.route('/recipe')
def recipe():
return render_template('recipe.html', recipe=phasectrl.recipe)
@socketio.on('new client')
def handle_new_client():
updateState()
socketio.emit('sensor history', sensors.get_history())
@socketio.on('manual response')
def handle_manual_response(response):
if statemachine.recipe is not None:
statemachine.manual.set(response)
updateState()
@socketio.on('load recipe idx')
def load_recipe_idx(idx):
statemachine.loadByIdx(idx)
def run_recipes():
while True:
while statemachine.recipe is not None:
done = statemachine.run_step()
if done:
statemachine.done()
updateState()
sleep(1)
def read_esnsors():
while True:
sensors.read()
socketio.emit('sensors', (sensors.get(),))
sleep(0.5)
import threading
if __name__ == '__main__':
thread = threading.Thread(target=run_recipes, daemon=True)
thread.start()
sensors_thread = threading.Thread(target=read_esnsors, daemon=True)
sensors_thread.start()
print('RUN APP')
socketio.run(app)