Merge pull request #3 from NateSchoolfield/FigureTest

Figure test
This commit is contained in:
NateSchoolfield 2021-02-05 13:21:58 -08:00 committed by GitHub
commit c7a09e3474
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 112 additions and 40 deletions

View File

@ -10,8 +10,10 @@ import sleepdata, vibrate
auth_key_filename = 'auth_key.txt'
mac_filename = 'mac.txt'
maximize_graph = False
vibration_settings = {
'interval_minutes': 0.2,
'interval_minutes': 45,
'duration_seconds': 5,
'type': 'random'
}
@ -68,7 +70,7 @@ def sleep_monitor_callback(data):
if not sleepdata.last_tick_time:
sleepdata.last_tick_time = time.time()
if data[0] == "GYRO":
if data[0] == "GYRO_RAW":
sleepdata.process_gyro_data(data[1], tick_time)
elif data[0] == "HR":
sleepdata.process_heartrate_data(data[1], tick_time)
@ -120,7 +122,7 @@ if __name__ == "__main__":
connect()
threading.Thread(target=start_data_pull).start()
threading.Thread(target=start_vibration).start()
#sleepdata.init_graph()
sleepdata.init_graph(maximize=maximize_graph, graph_displaytime_mins=5)

View File

@ -88,3 +88,5 @@ class QUEUE_TYPES(object):
HEART = 'heart'
RAW_ACCEL = 'raw_accel'
RAW_HEART = 'raw_heart'
RAW_GYRO = 'raw_gyro'
AVG_GYRO = 'avg_gyro'

View File

@ -1,7 +1,6 @@
import sys, os, time
import logging
import struct
import binascii
from bytepatterns import miband4 as bytepattern
@ -50,14 +49,26 @@ class Delegate(DefaultDelegate):
self.device.queue.put((QUEUE_TYPES.RAW_ACCEL, data))
elif len(data) == 16:
self.device.queue.put((QUEUE_TYPES.RAW_HEART, data))
else:
print("Unhandled data on handle 0x38: {}".format(data))
elif hnd == self.device._char_hz.getHandle():
if len(data) == 20 and struct.unpack('b', data[0:1])[0] == 1:
self.device.queue.put((QUEUE_TYPES.RAW_ACCEL, data))
elif len(data) == 11:
#print("Unknown data: {}".format(bytes.hex(data, " ")))
#print(struct.unpack('BBBBBBBBBB', data[1:]))
# Seems to be a counter of the time the gyro is enabled.
#print(struct.unpack(">x2L", data))
#print(struct.unpack("<x5H", data))
...
elif len(data) == 8:
self.device.queue.put((QUEUE_TYPES.AVG_GYRO, data))
else:
print ("Unhandled handle: " + str(hnd) + " | Data: " + str(data))
#print("Unknown sensor data ({}): {}".format(len(data), bytes.hex(data, " ")))
...
else:
#print ("Unhandled handle: " + str(hnd) + " | Data: " + bytes.hex(data, " "))
...
class miband(Peripheral):
@ -80,6 +91,7 @@ class miband(Peripheral):
self.heart_measure_callback = None
self.heart_raw_callback = None
self.gyro_raw_callback = None
self.gyro_avg_callback = None
self.auth_key = key
self.queue = Queue()
self.write_queue = Queue()
@ -120,6 +132,7 @@ class miband(Peripheral):
self.waitForNotifications(0.1)
self.setDelegate( Delegate(self) )
def _auth_notif(self, enabled):
if enabled:
self._log.info("Enabling Auth Service notifications status...")
@ -130,6 +143,7 @@ class miband(Peripheral):
else:
self._log.error("Something went wrong while changing the Auth Service notifications status...")
def _auth_previews_data_notif(self, enabled):
if enabled:
self._log.info("Enabling Fetch Char notifications status...")
@ -144,6 +158,7 @@ class miband(Peripheral):
self._desc_activity.write(bytepattern.stop, True)
self.activity_notif_enabled = False
def initialize(self):
self._req_rdn()
while True:
@ -158,11 +173,13 @@ class miband(Peripheral):
self._log.error(self.state)
return False
def _req_rdn(self):
self._log.info("Requesting random number...")
self._char_auth.write(bytepattern.request_random_number)
self.waitForNotifications(self.timeout)
def _send_enc_rdn(self, data):
self._log.info("Sending encrypted random number")
cmd = bytepattern.auth_key_prefix + self._encrypt(data)
@ -170,10 +187,12 @@ class miband(Peripheral):
self._char_auth.write(send_cmd)
self.waitForNotifications(self.timeout)
def _encrypt(self, message):
aes = AES.new(self.auth_key, AES.MODE_ECB)
return aes.encrypt(message)
def _get_from_queue(self, _type):
try:
res = self.queue.get(False)
@ -184,30 +203,51 @@ class miband(Peripheral):
return None
return res[1]
def _parse_queue(self):
while True:
try:
res = self.queue.get(False)
_type = res[0]
queue_data = self.queue.get(False)
_type = queue_data[0]
if self.heart_measure_callback and _type == QUEUE_TYPES.HEART:
self.heart_measure_callback(self._parse_heart_measure(res[1]))
self.heart_measure_callback(self._parse_heart_measure(queue_data[1]))
elif self.gyro_raw_callback and _type == QUEUE_TYPES.RAW_ACCEL:
self.gyro_raw_callback(self._parse_raw_gyro(res[1]))
self.gyro_raw_callback(self._parse_raw_gyro(queue_data[1]))
elif self.gyro_avg_callback and _type == QUEUE_TYPES.AVG_GYRO:
self.gyro_avg_callback(self._parse_avg_gyro(queue_data[1]))
except Empty:
break
def _parse_avg_gyro(self, bytes):
gyro_avg_data = struct.unpack('<b3h', bytes[1:])
gyro_dict = {
'gyro_time': gyro_avg_data[0],
'gyro_avg_x': gyro_avg_data[1],
'gyro_avg_y': gyro_avg_data[2],
'gyro_avg_z': gyro_avg_data[3]
}
return_tuple = ['GYRO_AVG', gyro_dict]
return return_tuple
def _parse_heart_measure(self, bytes):
res = struct.unpack('bb', bytes)[1]
return_tuple = ["HR", res]
print("BPM: {}".format(res))
return return_tuple
def _parse_raw_gyro(self, bytes):
res = []
for i in range(3):
g = struct.unpack('hhh', bytes[2 + i * 6:8 + i * 6])
res.append({'x': g[0], 'y': g[1], 'z': g[2]})
return_tuple = ["GYRO", res]
gyro_raw_data_list = []
for i in range(2, 20, 6):
gyro_raw_data = struct.unpack("3h", bytes[i:(i+6)])
gyro_dict = {
'gyro_raw_x': gyro_raw_data[0],
'gyro_raw_y': gyro_raw_data[1],
'gyro_raw_z': gyro_raw_data[2]
}
gyro_raw_data_list.append(gyro_dict)
return_tuple = ["GYRO_RAW", gyro_raw_data_list]
return return_tuple
@ -224,6 +264,7 @@ class miband(Peripheral):
except Empty:
break
def vibrate(self, value):
if value == 255 or value == 0:
# '255' means 'continuous vibration'
@ -242,12 +283,14 @@ class miband(Peripheral):
self.write_cmd(self._char_alert, bytepattern.vibration(value), queued=True)
time.sleep(vibration_duration)
def write_cmd(self, characteristic, data, response=False, queued=False):
if queued:
self.write_queue.put(['write_cmd', [characteristic, data, response]])
else:
characteristic.write(data, withResponse=response)
def write_req(self, handle, data, response=True, queued=False):
if queued:
self.write_queue.put(['write_req', [handle, data, response]])
@ -259,6 +302,7 @@ class miband(Peripheral):
self.process_write_queue()
self.waitForNotifications(wait)
def send_gyro_start(self, sensitivity):
if not self.gyro_started_flag:
self._log.info("Starting gyro...")
@ -266,10 +310,12 @@ class miband(Peripheral):
self.write_req(self._steps_handle, bytepattern.start)
self.write_req(self._hz_handle, bytepattern.start)
self.gyro_started_flag = True
self.write_cmd(self._char_sensor, bytepattern.gyro_start(sensitivity))
#self.write_cmd(self._char_sensor, bytepattern.gyro_start(sensitivity))
self.write_cmd(self._char_sensor, bytes.fromhex('010119'))
self.write_req(self._sensor_handle, bytepattern.stop)
self.write_cmd(self._char_sensor, b'\x02')
def send_heart_measure_start(self):
self._log.info("Starting heart measure...")
self.write_cmd(self._char_heart_ctrl, bytepattern.stop_heart_measure_manual, response=True)
@ -281,12 +327,13 @@ class miband(Peripheral):
def send_heart_measure_keepalive(self):
self.write_cmd(self._char_heart_ctrl, bytepattern.heart_measure_keepalive, response=True)
def start_heart_and_gyro(self, sensitivity, callback):
self.heart_measure_callback = callback
self.gyro_raw_callback = callback
self.send_gyro_start(sensitivity)
self.send_heart_measure_start()
#self.send_heart_measure_start()
heartbeat_time = time.time()
while True:

View File

@ -1,6 +1,6 @@
from datetime import datetime
from os import path
import csv
import csv, time
import matplotlib.pyplot as plt
import matplotlib.animation as animation
@ -17,15 +17,11 @@ sleep_data = {
'value_name': 'movement',
'periods': [10, 30, 60],
'raw_data': [],
'averaged_data': [],
'workspace': {
'gyro_last_x' : 0,
'gyro_last_y' : 0,
'gyro_last_z' : 0
}
'averaged_data': []
}
}
tick_seconds = 0.5
last_tick_time = None
@ -35,9 +31,13 @@ csv_filename_format = '{}_{}.csv'
plt.style.use('dark_background')
graph_figure = plt.figure()
graph_figure.canvas.set_window_title('blesleep')
graph_axes = graph_figure.add_subplot(1, 1, 1)
graph_data = {}
graph_displaytime_minutes = None
last_heartrate = 0
class Average_Gyro_Data():
@ -50,12 +50,12 @@ class Average_Gyro_Data():
def process(self, gyro_data):
gyro_movement = 0
for gyro_datum in gyro_data:
gyro_delta_x = abs(gyro_datum['x'] - self.gyro_last_x)
self.gyro_last_x = gyro_datum['x']
gyro_delta_y = abs(gyro_datum['y'] - self.gyro_last_y)
self.gyro_last_y = gyro_datum['y']
gyro_delta_z = abs(gyro_datum['z'] - self.gyro_last_z)
self.gyro_last_z = gyro_datum['z']
gyro_delta_x = abs(gyro_datum['gyro_raw_x'] - self.gyro_last_x)
self.gyro_last_x = gyro_datum['gyro_raw_x']
gyro_delta_y = abs(gyro_datum['gyro_raw_y'] - self.gyro_last_y)
self.gyro_last_y = gyro_datum['gyro_raw_y']
gyro_delta_z = abs(gyro_datum['gyro_raw_z'] - self.gyro_last_z)
self.gyro_last_z = gyro_datum['gyro_raw_z']
gyro_delta_sum = gyro_delta_x + gyro_delta_y + gyro_delta_z
gyro_movement += gyro_delta_sum
return gyro_movement
@ -107,6 +107,22 @@ def flush_old_raw_data(tick_time):
write_csv(old_raw_data, 'raw')
def flush_old_graph_data(graph_displaytime_minutes):
graph_displaytime_seconds = graph_displaytime_minutes * 60
tick_time = time.time()
for data_type in sleep_data:
s_data = sleep_data[data_type]
cleaned_graph_data = []
old_graph_data = []
for avg_datum in s_data['averaged_data']:
datum_age = tick_time - datetime.timestamp(avg_datum['time'])
if datum_age < graph_displaytime_seconds:
cleaned_graph_data.append(avg_datum)
else:
old_graph_data.append(avg_datum)
s_data['averaged_data'] = cleaned_graph_data
def average_raw_data(tick_time):
global last_heartrate
timestamp = datetime.fromtimestamp(tick_time)
@ -142,7 +158,6 @@ def average_raw_data(tick_time):
csv_out[csv_header_field_name] = zero_to_nan(period_data_average)
s_data['averaged_data'].append(period_averages_dict)
write_csv([csv_out], 'avg')
@ -175,12 +190,13 @@ def zero_to_nan(value):
def update_graph_data():
for data_type in sleep_data:
s_data = sleep_data[data_type] # Re-referenced to shorten name
s_data = sleep_data[data_type]
avg_data = s_data['averaged_data']
if len(avg_data) > 1:
g_data = graph_data[data_type] # Re-referenced to short name
g_data = graph_data[data_type]
data_periods = s_data['periods']
starting_index = max([(len(g_data['time']) - 1), 0])
@ -207,13 +223,11 @@ def init_graph_data():
def graph_animation(i):
global graph_axes
global graph_data
plotflag = False
if len(graph_data) == 0:
init_graph_data()
flush_old_graph_data(graph_displaytime_minutes)
update_graph_data()
for data_type in graph_data:
@ -221,6 +235,7 @@ def graph_animation(i):
graph_axes.clear()
break
plotflag = False
for data_type in sleep_data:
s_data = sleep_data[data_type]
g_data = graph_data[data_type]
@ -237,7 +252,13 @@ def graph_animation(i):
plt.legend()
def init_graph():
def init_graph(graph_displaytime_mins=60, maximize=False):
global graph_displaytime_minutes
graph_displaytime_minutes = graph_displaytime_mins
if maximize:
figure_manager = plt.get_current_fig_manager()
figure_manager.full_screen_toggle()
ani = animation.FuncAnimation(graph_figure, graph_animation, interval=1000)
plt.show()