Browse Source

Resetting

pull/1/head
Nate Schoolfield 1 year ago
commit
a712460ce2
  1. 9
      .gitignore
  2. 30
      README
  3. 322
      bluesleep.py
  4. 90
      constants.py
  5. 318
      miband.py
  6. 2
      requirements.txt

9
.gitignore

@ -0,0 +1,9 @@
mac.txt
auth_key.txt
*.pyc
/__pycache__
*.swp
*.csv
*.wav
*.code-workspace
/.vscode

30
README

@ -0,0 +1,30 @@
BLESleep: A project to implement sleep tracking and nightmare interruption on Linux using the Mi Band 4 fitness tracker
It is inspired by the "Nightware" device, which is an FDA approved medical device: https://www.fda.gov/news-events/press-announcements/fda-permits-marketing-new-device-designed-reduce-sleep-disturbance-related-nightmares-certain-adults
This project is based on the satcar77/miband4 project.
USAGE:
You will need to create two .txt files in the base directory:
auth_key.txt: the authentication key for your miband 4. See https://github.com/argrento/huami-token for details on obtaining this.
mac.txt: the Bluetooth MAC address for your miband 4.
CURRENT STATUS:
Right now the project does not yet fulfill its purpose.
If you execute the project as-is, it will connect to the miband and plot HR and gyroscope data to the screen.
I'm currently working on massaging the returned data to create signals that can be usefully incorporated into a detection algorithm.
GOALS:
* Publish statistics to Google Sheets or other easy-to-use target. I'd like to make this usable by non-technical folks, so S3 buckets and CloudWatch are out of scope.
* Incorporate an LSTM neural network as the detection algorithm. This will require a dedicated "training period", as each user will likely produce different signals.
* Create a GUI dashboard to display sleep statistics and other controls. My vision for the end product is a standalone RasPi device with a 7" touchscreen display.
* Add support for multiple (cheap) fitness trackers. I'd like this work to be usable by low-income folks who can't afford Apple Watches.
* Create a framework to incorporate additional biometrics/signals as they become available. For example, SpO2, galvanic skin response, respiration, BP, etc.
* Create a holistic sleep-tracking interface to provide users the ability to clearly observe the effects of things like changes in routine, medication, diet, etc.
DISCLAIMER:
None of the statements on this web site have been evaluated by the FDA.
Furthermore, none of the statements herein should be construed as dispensing medical advice, or making claims regarding the cure or treatment of diseases.
These statements have not been evaluated by the Food and Drug Administration.
These project is not intended to diagnose, treat, cure, or prevent any diseases.

322
bluesleep.py

@ -0,0 +1,322 @@
#!/usr/bin/env python3
from bluepy import btle
from bluepy.btle import BTLEDisconnectError
from miband import miband
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import csv
import random
from os import path
import threading
import re
import subprocess
import time
from datetime import datetime
sleep_data = {
'heartrate': {
'value_name': 'bpm',
'periods': [5, 10, 15],
'raw_data': [],
'averaged_data': [],
},
'movement':{
'value_name': 'movement',
'periods': [10, 30, 60],
'raw_data': [],
'averaged_data': [],
'workspace': {
'gyro_last_x' : 0,
'gyro_last_y' : 0,
'gyro_last_z' : 0
}
}
}
auth_key_filename = 'auth_key.txt'
mac_filename = 'mac.txt'
csv_filename = "sleep_data.csv"
plt.style.use('dark_background')
graph_figure = plt.figure()
graph_axes = graph_figure.add_subplot(1, 1, 1)
graph_data = {}
last_tick_time = None
tick_seconds = 1
fieldnames = []
for data_type, _ in sleep_data.items():
periods = sleep_data[data_type]['periods']
for period in periods:
fieldnames.append(data_type + str(period))
#-------------------------------------------------------------------------#
def write_csv(data):
global fieldnames
global csv_filename
if not path.exists(csv_filename):
with open(csv_filename, 'w', newline='') as csvfile:
csv_writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
csv_writer.writeheader()
csv_writer.writerow(data)
else:
with open(csv_filename, 'a', newline='') as csvfile:
csv_writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
csv_writer.writerow(data)
def get_mac_address(filename):
mac_regex_pattern = re.compile(r'([0-9a-fA-F]{2}(?::[0-9a-fA-F]{2}){5})')
try:
with open(filename, "r") as f:
regex_match_from_file = re.search(mac_regex_pattern, f.read().strip())
if regex_match_from_file:
MAC_ADDR = regex_match_from_file[0]
else:
print ("No valid MAC address found in " + str(filename))
exit(1)
except FileNotFoundError:
print ("MAC file not found: " + filename)
exit(1)
return MAC_ADDR
def get_auth_key(filename):
authkey_regex_pattern = re.compile(r'([0-9a-fA-F]){32}')
try:
with open(filename, "r") as f:
regex_match_from_file = re.search(authkey_regex_pattern, f.read().strip())
if regex_match_from_file:
AUTH_KEY = bytes.fromhex(regex_match_from_file[0])
else:
print ("No valid auth key found in " + str(filename))
exit(1)
except FileNotFoundError:
print ("Auth key file not found: " + filename)
exit(1)
return AUTH_KEY
def process_heartrate_data(heartrate_data, tick_time):
print("BPM: " + str(heartrate_data))
if heartrate_data > 0:
value_name = sleep_data['heartrate']['value_name']
sleep_data['heartrate']['raw_data'].append({ 'time': tick_time, value_name: heartrate_data } )
def process_gyro_data(gyro_data, tick_time):
# Each gyro reading from miband4 comes over as a group of three, each containing x,y,z values
# This function summarizes the values into a single consolidated movement value
global sleep_data
gyro_last_x = sleep_data['movement']['workspace']['gyro_last_x']
gyro_last_y = sleep_data['movement']['workspace']['gyro_last_y']
gyro_last_z = sleep_data['movement']['workspace']['gyro_last_z']
value_name = sleep_data['movement']['value_name']
gyro_movement = 0
for gyro_datum in gyro_data:
gyro_delta_x = abs(gyro_datum['x'] - gyro_last_x)
gyro_last_x = gyro_datum['x']
gyro_delta_y = abs(gyro_datum['y'] - gyro_last_y)
gyro_last_y = gyro_datum['y']
gyro_delta_z = abs(gyro_datum['z'] - gyro_last_z)
gyro_last_z = gyro_datum['z']
gyro_delta_sum = gyro_delta_x + gyro_delta_y + gyro_delta_z
gyro_movement += gyro_delta_sum
sleep_data['movement']['workspace']['gyro_last_x'] = gyro_last_x
sleep_data['movement']['workspace']['gyro_last_y'] = gyro_last_y
sleep_data['movement']['workspace']['gyro_last_z'] = gyro_last_z
sleep_data['movement']['raw_data'].append({ 'time': tick_time, value_name: gyro_movement } )
def flush_old_raw_data(tick_time):
global sleep_data
for data_type, _ in sleep_data.items():
periods = sleep_data[data_type]['periods']
cleaned_raw_data = []
for raw_datum in sleep_data[data_type]['raw_data']:
datum_age = tick_time - raw_datum['time']
if datum_age < max(periods):
cleaned_raw_data.append(raw_datum)
sleep_data[data_type]['raw_data'] = cleaned_raw_data
def average_raw_data(tick_time):
global sleep_data
csv_out = {}
timestamp = datetime.fromtimestamp(tick_time)
for data_type, _ in sleep_data.items():
period_averages_dict = {}
period_averages_dict['time'] = timestamp
periods = sleep_data[data_type]['periods']
value_name = sleep_data[data_type]['value_name']
flush_old_raw_data(tick_time)
for period_seconds in periods:
period_data = []
period_averages_dict[period_seconds] = 0
for raw_datum in sleep_data[data_type]['raw_data']:
datum_age_seconds = tick_time - raw_datum['time']
if datum_age_seconds < period_seconds:
period_data.append(raw_datum[value_name])
if len(period_data) > 0:
period_data_average = sum(period_data) / len(period_data)
else:
print ("(" + data_type + ") Period data empty: " + str(period_seconds))
period_data_average = 0
period_averages_dict[period_seconds] = zero_to_nan(period_data_average)
csv_out[data_type + str(period_seconds)] = zero_to_nan(period_data_average)
sleep_data[data_type]['averaged_data'].append(period_averages_dict)
write_csv(csv_out)
def zero_to_nan(value):
if value == 0:
return (float('nan'))
else:
return int(value)
def sleep_monitor_callback(data):
global sleep_data
global last_tick_time
tick_time = time.time()
if not last_tick_time:
last_tick_time = time.time()
if data[0] == "GYRO":
process_gyro_data(data[1], tick_time)
if data[0] == "HR":
process_heartrate_data(data[1], tick_time)
if (tick_time - last_tick_time) >= tick_seconds:
average_raw_data(tick_time)
last_tick_time = time.time()
def init_graph_data():
for data_type, _ in sleep_data.items():
data_periods = sleep_data[data_type]['periods']
graph_data[data_type] = {
'time': [],
'data': {}
}
for period in data_periods:
graph_data[data_type]['data'][period] = []
def update_graph_data():
global sleep_data
global graph_data
for data_type, _ in sleep_data.items():
if len(sleep_data[data_type]['averaged_data']) > 1:
data_periods = sleep_data[data_type]['periods']
starting_index = max([(len(graph_data[data_type]['time']) - 1), 0])
ending_index = len(sleep_data[data_type]['averaged_data']) - 1
for sleep_datum in sleep_data[data_type]['averaged_data'][starting_index:ending_index]:
graph_data[data_type]['time'].append(sleep_datum['time'])
for period in data_periods:
graph_data[data_type]['data'][period].append(sleep_datum[period])
def graph_animation(i):
global sleep_data
global graph_axes
global graph_data
plotflag = False
if len(graph_data) == 0:
init_graph_data()
update_graph_data()
for data_type, _ in graph_data.items():
if len(graph_data[data_type]['time']) > 0:
graph_axes.clear()
break
for data_type, _ in sleep_data.items():
if len(graph_data[data_type]['time']) > 0:
plotflag = True
data_periods = sleep_data[data_type]['periods']
for period in data_periods:
axis_label = sleep_data[data_type]['value_name'] + " " + str(period) + "sec"
graph_axes.plot(graph_data[data_type]['time'], graph_data[data_type]['data'][period], label=axis_label)
if plotflag:
plt.legend()
def connect():
global band
global mac_filename
global auth_key_filename
success = False
MAC_ADDR = get_mac_address(mac_filename)
AUTH_KEY = get_auth_key(auth_key_filename)
while not success:
try:
band = miband(MAC_ADDR, AUTH_KEY, debug=True)
success = band.initialize()
break
except BTLEDisconnectError:
print('Connection to the MIBand failed. Trying out again in 3 seconds')
time.sleep(3)
continue
except KeyboardInterrupt:
print("\nExit.")
exit()
def start_data_pull():
global band
while True:
try:
band.start_heart_and_gyro(callback=sleep_monitor_callback)
except BTLEDisconnectError:
band.gyro_started_flag = False
connect()
if __name__ == "__main__":
connect()
data_gather_thread = threading.Thread(target=start_data_pull)
data_gather_thread.start()
ani = animation.FuncAnimation(graph_figure, graph_animation, interval=1000)
plt.show()
#import simpleaudio as sa
# comfort_wav = 'comfort.wav'
# wave_obj = sa.WaveObject.from_wave_file(comfort_wav)
# comfort_delay = 30
# comfort_lasttime = time.time()

90
constants.py

@ -0,0 +1,90 @@
___all__ = ['UUIDS']
class Immutable(type):
def __call__(*args):
raise Exception("You can't create instance of immutable object")
def __setattr__(*args):
raise Exception("You can't modify immutable object")
class UUIDS(object):
__metaclass__ = Immutable
BASE = "0000%s-0000-1000-8000-00805f9b34fb"
SERVICE_MIBAND1 = BASE % 'fee0'
SERVICE_MIBAND2 = BASE % 'fee1'
SERVICE_ALERT = BASE % '1802'
SERVICE_ALERT_NOTIFICATION = BASE % '1811'
SERVICE_HEART_RATE = BASE % '180d'
SERVICE_DEVICE_INFO = BASE % '180a'
CHARACTERISTIC_HZ = "00000002-0000-3512-2118-0009af100700"
CHARACTERISTIC_SENSOR = "00000001-0000-3512-2118-0009af100700"
CHARACTERISTIC_AUTH = "00000009-0000-3512-2118-0009af100700"
CHARACTERISTIC_HEART_RATE_MEASURE = "00002a37-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_HEART_RATE_CONTROL = "00002a39-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_ALERT = "00002a06-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_CUSTOM_ALERT = "00002a46-0000-1000-8000-00805f9b34fb"
CHARACTERISTIC_BATTERY = "00000006-0000-3512-2118-0009af100700"
CHARACTERISTIC_STEPS = "00000007-0000-3512-2118-0009af100700"
CHARACTERISTIC_LE_PARAMS = BASE % "FF09"
CHARACTERISTIC_REVISION = 0x2a28
CHARACTERISTIC_SERIAL = 0x2a25
CHARACTERISTIC_HRDW_REVISION = 0x2a27
CHARACTERISTIC_CONFIGURATION = "00000003-0000-3512-2118-0009af100700"
CHARACTERISTIC_DEVICEEVENT = "00000010-0000-3512-2118-0009af100700"
CHARACTERISTIC_CHUNKED_TRANSFER = "00000020-0000-3512-2118-0009af100700"
CHARACTERISTIC_MUSIC_NOTIFICATION = "00000010-0000-3512-2118-0009af100700"
CHARACTERISTIC_CURRENT_TIME = BASE % '2A2B'
CHARACTERISTIC_AGE = BASE % '2A80'
CHARACTERISTIC_USER_SETTINGS = "00000008-0000-3512-2118-0009af100700"
CHARACTERISTIC_ACTIVITY_DATA = "00000005-0000-3512-2118-0009af100700"
CHARACTERISTIC_FETCH = "00000004-0000-3512-2118-0009af100700"
NOTIFICATION_DESCRIPTOR = 0x2902
# Device Firmware Update
SERVICE_DFU_FIRMWARE = "00001530-0000-3512-2118-0009af100700"
CHARACTERISTIC_DFU_FIRMWARE = "00001531-0000-3512-2118-0009af100700"
CHARACTERISTIC_DFU_FIRMWARE_WRITE = "00001532-0000-3512-2118-0009af100700"
class AUTH_STATES(object):
__metaclass__ = Immutable
AUTH_OK = "Auth ok"
AUTH_FAILED = "Auth failed"
ENCRIPTION_KEY_FAILED = "Encryption key auth fail, sending new key"
KEY_SENDING_FAILED = "Key sending failed"
REQUEST_RN_ERROR = "Something went wrong when requesting the random number"
class ALERT_TYPES(object):
__metaclass__ = Immutable
NONE = '\x00'
MESSAGE = '\x01'
PHONE = '\x02'
class MUSICSTATE(object):
__metaclass__ = Immutable
PLAYED = 0
PAUSED = 1
class QUEUE_TYPES(object):
__metaclass__ = Immutable
HEART = 'heart'
RAW_ACCEL = 'raw_accel'
RAW_HEART = 'raw_heart'

318
miband.py

@ -0,0 +1,318 @@
import sys,os,time
import logging
from bluepy.btle import Peripheral, DefaultDelegate, ADDR_TYPE_RANDOM,ADDR_TYPE_PUBLIC, BTLEException, BTLEDisconnectError
from constants import UUIDS, AUTH_STATES, ALERT_TYPES, QUEUE_TYPES, MUSICSTATE
import struct
from datetime import datetime, timedelta
from Crypto.Cipher import AES
from datetime import datetime
try:
from Queue import Queue, Empty
except ImportError:
from queue import Queue, Empty
try:
xrange
except NameError:
xrange = range
class Delegate(DefaultDelegate):
def __init__(self, device):
DefaultDelegate.__init__(self)
self.device = device
self.pkg = 0
def handleNotification(self, hnd, data):
if hnd == self.device._char_auth.getHandle():
if data[:3] == b'\x10\x01\x01':
self.device._req_rdn()
elif data[:3] == b'\x10\x01\x04':
self.device.state = AUTH_STATES.KEY_SENDING_FAILED
elif data[:3] == b'\x10\x02\x01':
# 16 bytes
random_nr = data[3:]
self.device._send_enc_rdn(random_nr)
elif data[:3] == b'\x10\x02\x04':
self.device.state = AUTH_STATES.REQUEST_RN_ERROR
elif data[:3] == b'\x10\x03\x01':
self.device.state = AUTH_STATES.AUTH_OK
else:
self.device.state = AUTH_STATES.AUTH_FAILED
elif hnd == self.device._char_heart_measure.getHandle():
self.device.queue.put((QUEUE_TYPES.HEART, data))
elif hnd == 0x38:
if len(data) == 20 and struct.unpack('b', data[0:1])[0] == 1:
self.device.queue.put((QUEUE_TYPES.RAW_ACCEL, data))
elif len(data) == 16:
self.device.queue.put((QUEUE_TYPES.RAW_HEART, data))
# The fetch characteristic controls the communication with the activity characteristic.
elif hnd == self.device._char_fetch.getHandle():
if data[:3] == b'\x10\x01\x01':
# get timestamp from what date the data actually is received
year = struct.unpack("<H", data[7:9])[0]
month = struct.unpack("b", data[9:10])[0]
day = struct.unpack("b", data[10:11])[0]
hour = struct.unpack("b", data[11:12])[0]
minute = struct.unpack("b", data[12:13])[0]
self.device.first_timestamp = datetime(year, month, day, hour, minute)
print("Fetch data from {}-{}-{} {}:{}".format(year, month, day, hour, minute))
self.pkg = 0 #reset the packing index
self.device._char_fetch.write(b'\x02', False)
elif data[:3] == b'\x10\x02\x01':
if self.device.last_timestamp > self.device.end_timestamp - timedelta(minutes=1):
print("Finished fetching")
return
print("Trigger more communication")
time.sleep(1)
t = self.device.last_timestamp + timedelta(minutes=1)
self.device.start_get_previews_data(t)
elif data[:3] == b'\x10\x02\x04':
print("No more activity fetch possible")
return
else:
print("Unexpected data on handle " + str(hnd) + ": " + str(data))
return
elif hnd == self.device._char_activity.getHandle():
if len(data) % 4 == 1:
self.pkg += 1
i = 1
while i < len(data):
index = int(self.pkg) * 4 + (i - 1) / 4
timestamp = self.device.first_timestamp + timedelta(minutes=index)
self.device.last_timestamp = timestamp
category = struct.unpack("<B", data[i:i + 1])[0]
intensity = struct.unpack("B", data[i + 1:i + 2])[0]
steps = struct.unpack("B", data[i + 2:i + 3])[0]
heart_rate = struct.unpack("B", data[i + 3:i + 4])[0]
if timestamp < self.device.end_timestamp:
self.device.activity_callback(timestamp,category,intensity,steps,heart_rate)
i += 4
elif hnd == self.device._char_hz.getHandle():
if len(data) == 20 and struct.unpack('b', data[0:1])[0] == 1:
self.device.queue.put((QUEUE_TYPES.RAW_ACCEL, data))
else:
print ("Unhandled handle: " + str(hnd) + " | Data: " + str(data))
class miband(Peripheral):
_send_rnd_cmd = struct.pack('<2s', b'\x02\x00')
_send_enc_key = struct.pack('<2s', b'\x03\x00')
def __init__(self, mac_address,key=None, timeout=0.5, debug=False):
FORMAT = '%(asctime)-15s %(name)s (%(levelname)s) > %(message)s'
logging.basicConfig(format=FORMAT)
log_level = logging.WARNING if not debug else logging.DEBUG
self._log = logging.getLogger(self.__class__.__name__)
self._log.setLevel(log_level)
self._log.info('Connecting to ' + mac_address)
Peripheral.__init__(self, mac_address, addrType=ADDR_TYPE_PUBLIC)
self._log.info('Connected')
if not key:
self.setSecurityLevel(level = "medium")
self.timeout = timeout
self.mac_address = mac_address
self.state = None
self.heart_measure_callback = None
self.heart_raw_callback = None
self.gyro_raw_callback = None
self.auth_key = key
self.queue = Queue()
self.gyro_started_flag = False
self.start_bytes = b'\x01\x00'
self.stop_bytes = b"\x00\x00"
self.gyro_sensitivity = 1
self.svc_1 = self.getServiceByUUID(UUIDS.SERVICE_MIBAND1)
self.svc_2 = self.getServiceByUUID(UUIDS.SERVICE_MIBAND2)
self.svc_heart = self.getServiceByUUID(UUIDS.SERVICE_HEART_RATE)
self.svc_alert = self.getServiceByUUID(UUIDS.SERVICE_ALERT)
self._char_alert = self.svc_alert.getCharacteristics(UUIDS.CHARACTERISTIC_ALERT)[0]
self._char_auth = self.svc_2.getCharacteristics(UUIDS.CHARACTERISTIC_AUTH)[0]
self._desc_auth = self._char_auth.getDescriptors(forUUID=UUIDS.NOTIFICATION_DESCRIPTOR)[0]
self._char_heart_ctrl = self.svc_heart.getCharacteristics(UUIDS.CHARACTERISTIC_HEART_RATE_CONTROL)[0]
self._char_heart_measure = self.svc_heart.getCharacteristics(UUIDS.CHARACTERISTIC_HEART_RATE_MEASURE)[0]
self._heart_measure_handle = self._char_heart_measure.getHandle() + 1
# Recorded information
self._char_fetch = self.getCharacteristics(uuid=UUIDS.CHARACTERISTIC_FETCH)[0]
self._desc_fetch = self._char_fetch.getDescriptors(forUUID=UUIDS.NOTIFICATION_DESCRIPTOR)[0]
self._char_activity = self.getCharacteristics(uuid=UUIDS.CHARACTERISTIC_ACTIVITY_DATA)[0]
self._desc_activity = self._char_activity.getDescriptors(forUUID=UUIDS.NOTIFICATION_DESCRIPTOR)[0]
# Sensor characteristics and handles/descriptors
self._char_hz = self.svc_1.getCharacteristics(UUIDS.CHARACTERISTIC_HZ)[0]
self._hz_handle = self._char_hz.getHandle() + 1
self._char_sensor = self.svc_1.getCharacteristics(UUIDS.CHARACTERISTIC_SENSOR)[0]
self._sensor_handle = self._char_sensor.getHandle() + 1
self._char_steps = self.svc_1.getCharacteristics(UUIDS.CHARACTERISTIC_STEPS)[0]
self._steps_handle = self._char_steps.getHandle() + 1
self._auth_notif(True)
self.activity_notif_enabled = False
self.waitForNotifications(0.1)
self.setDelegate( Delegate(self) )
def generateAuthKey(self):
if(self.auth_key):
return struct.pack('<18s',b'\x01\x00'+ self.auth_key)
def _auth_notif(self, enabled):
if enabled:
self._log.info("Enabling Auth Service notifications status...")
self._desc_auth.write(self.start_bytes, True)
elif not enabled:
self._log.info("Disabling Auth Service notifications status...")
self._desc_auth.write(self.stop_bytes, True)
else:
self._log.error("Something went wrong while changing the Auth Service notifications status...")
def _auth_previews_data_notif(self, enabled):
if enabled:
self._log.info("Enabling Fetch Char notifications status...")
self._desc_fetch.write(self.start_bytes, True)
self._log.info("Enabling Activity Char notifications status...")
self._desc_activity.write(self.start_bytes, True)
self.activity_notif_enabled = True
else:
self._log.info("Disabling Fetch Char notifications status...")
self._desc_fetch.write(self.stop_bytes, True)
self._log.info("Disabling Activity Char notifications status...")
self._desc_activity.write(self.stop_bytes, True)
self.activity_notif_enabled = False
def initialize(self):
self._req_rdn()
while True:
self.waitForNotifications(0.1)
if self.state == AUTH_STATES.AUTH_OK:
self._log.info('Initialized')
self._auth_notif(False)
return True
elif self.state is None:
continue
self._log.error(self.state)
return False
def _req_rdn(self):
self._log.info("Requesting random number...")
self._char_auth.write(self._send_rnd_cmd)
self.waitForNotifications(self.timeout)
def _send_enc_rdn(self, data):
self._log.info("Sending encrypted random number")
cmd = self._send_enc_key + self._encrypt(data)
send_cmd = struct.pack('<18s', cmd)
self._char_auth.write(send_cmd)
self.waitForNotifications(self.timeout)
def _encrypt(self, message):
aes = AES.new(self.auth_key, AES.MODE_ECB)
return aes.encrypt(message)
def _get_from_queue(self, _type):
try:
res = self.queue.get(False)
except Empty:
return None
if res[0] != _type:
self.queue.put(res)
return None
return res[1]
def _parse_queue(self):
while True:
try:
res = self.queue.get(False)
_type = res[0]
if self.heart_measure_callback and _type == QUEUE_TYPES.HEART:
self.heart_measure_callback(self._parse_heart_measure(res[1]))
elif self.gyro_raw_callback and _type == QUEUE_TYPES.RAW_ACCEL:
self.gyro_raw_callback(self._parse_raw_gyro(res[1]))
except Empty:
break
def _parse_heart_measure(self, bytes):
res = struct.unpack('bb', bytes)[1]
return_tuple = ["HR", res]
return return_tuple
def _parse_raw_gyro(self, bytes):
res = []
for i in xrange(3):
g = struct.unpack('hhh', bytes[2 + i * 6:8 + i * 6])
res.append({'x': g[0], 'y': g[1], 'z': g[2]})
return_tuple = ["GYRO", res]
return return_tuple
def send_vibration(self, duration):
duration_time = time.time()
pulse_time = time.time()
vibro_start_value = 30
#pulse_value = 100
duration = 20
vibro_current_value = vibro_start_value
while True:
if (time.time() - duration_time) >= duration:
print ("Stopping vibration")
self._char_alert.write(b'\x00\x00\x00\x00\x00\x00', withResponse=False)
break
else:
if ((time.time() - pulse_time)*1000) >= vibro_current_value:
pulse_time = time.time()
self._char_alert.write(b'\xff' + (vibro_current_value).to_bytes(1, 'big') + b'\x00\x00\x00\x01', withResponse=False)
vibro_current_value += 1
print (vibro_current_value)
if vibro_current_value > 255:
vibro_current_value = vibro_start_value
def send_gyro_start(self):
if not self.gyro_started_flag:
self._log.info("Starting gyro...")
self.writeCharacteristic(self._sensor_handle, self.start_bytes, withResponse=True)
self.writeCharacteristic(self._steps_handle, self.start_bytes, withResponse=True)
self.writeCharacteristic(self._hz_handle, self.start_bytes, withResponse=True)
self.gyro_started_flag = True
self._char_sensor.write(b'\x01' + bytes([self.gyro_sensitivity]) + b'\x19', withResponse=False)
self.writeCharacteristic(self._sensor_handle, self.stop_bytes, withResponse=True)
self._char_sensor.write(b'\x02', withResponse=False)
def send_heart_measure_start(self):
self._log.info("Starting heart measure...")
# stop heart monitor continues & manual
self._char_heart_ctrl.write(b'\x15\x02\x00', True)
self._char_heart_ctrl.write(b'\x15\x01\x00', True)
# enable heart monitor notifications
self.writeCharacteristic(self._heart_measure_handle, self.start_bytes, withResponse=True)
# start heart monitor continues
self._char_heart_ctrl.write(b'\x15\x01\x01', True)
def send_heart_measure_keepalive(self):
self._char_heart_ctrl.write(b'\x16', True)
def start_heart_and_gyro(self, callback):
self.heart_measure_callback = callback
self.gyro_raw_callback = callback
self.send_gyro_start()
self.send_heart_measure_start()
heartbeat_time = time.time()
while True:
self.waitForNotifications(0.5)
self._parse_queue()
if (time.time() - heartbeat_time) >= 12:
heartbeat_time = time.time()
self.send_heart_measure_keepalive()
self.send_gyro_start()

2
requirements.txt

@ -0,0 +1,2 @@
bluepy
pycrypto
Loading…
Cancel
Save