Broke vibration out into its own module.

This commit is contained in:
NateSchoolfield 2021-02-01 23:25:59 -08:00
parent 91d17efa96
commit 45e603270e
5 changed files with 205 additions and 99 deletions

View File

@ -1,31 +1,26 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from bluepy import btle import time, re, threading
from bluepy.btle import BTLEDisconnectError from bluepy.btle import BTLEDisconnectError
from miband import miband from miband import miband
import sleepdata import sleepdata, vibrate
import threading
import re
import random
import subprocess
import time
from datetime import datetime
auth_key_filename = 'auth_key.txt' auth_key_filename = 'auth_key.txt'
mac_filename = 'mac.txt' mac_filename = 'mac.txt'
csv_filename = "sleep_data.csv"
vibration_settings = {
'interval_minutes': 0.2,
'duration_seconds': 5,
'type': 'random'
}
band = None band = None
buzz_timer = time.time()
buzz_minutes = 45
buzz_delay = buzz_minutes * 60
#-------------------------------------------------------------------------# #-------------------------------------------------------------------------#
class regex_patterns(): class regex_patterns():
mac_regex_pattern = re.compile(r'([0-9a-fA-F]{2}(?::[0-9a-fA-F]{2}){5})') mac_regex_pattern = re.compile(r'([0-9a-fA-F]{2}(?::[0-9a-fA-F]{2}){5})')
authkey_regex_pattern = re.compile(r'([0-9a-fA-F]){32}') authkey_regex_pattern = re.compile(r'([0-9a-fA-F]){32}')
@ -60,67 +55,27 @@ def get_auth_key(filename):
exit(1) exit(1)
return AUTH_KEY return AUTH_KEY
def process_data(data, tick_time):
if data[0] == "GYRO":
sleepdata.process_gyro_data(data[1], tick_time)
elif data[0] == "HR":
sleepdata.process_heartrate_data(data[1], tick_time)
def average_data(tick_time): def average_data(tick_time):
if (tick_time - sleepdata.last_tick_time) >= sleepdata.tick_seconds: if (tick_time - sleepdata.last_tick_time) >= sleepdata.tick_seconds:
sleepdata.average_raw_data(tick_time) sleepdata.average_raw_data(tick_time)
sleepdata.last_tick_time = time.time() sleepdata.last_tick_time = time.time()
def timed_buzzing(buzz_delay, buzz_duration):
buzz_timer = time.time()
tick_time = time.time()
while True:
elapsed_time = tick_time - buzz_timer
if elapsed_time >= buzz_delay:
print("Buzz timer expired, buzzing")
vibrate_random(buzz_duration)
buzz_timer = tick_time
else:
tick_time = time.time()
time.sleep(1)
def generate_random_vibration_pattern(count):
pulse_pattern = []
pulse_range = [120, 240]
pulse_interval_range = [1, 8]
for _ in range(count):
buzz_pulse = random.randrange(pulse_range[0], pulse_range[1])
buzz_delay = random.randrange(pulse_interval_range[0], pulse_interval_range[1])/10
pulse_pattern.append([buzz_pulse, buzz_delay])
return pulse_pattern
def vibrate_random(duration):
print("Sending random vibration...")
duration_start = time.time()
pulse_pattern = generate_random_vibration_pattern(20)
while True:
if (time.time() - duration_start) >= duration:
print ("Stopping vibration")
band.vibrate(0)
break
else:
for pattern in pulse_pattern:
if (time.time() - duration_start) >= duration:
break
vibrate_ms = pattern[0]
vibro_delay = pattern[1]
band.vibrate(vibrate_ms)
time.sleep(vibro_delay)
def sleep_monitor_callback(data): def sleep_monitor_callback(data):
tick_time = time.time() tick_time = time.time()
if not sleepdata.last_tick_time: if not sleepdata.last_tick_time:
sleepdata.last_tick_time = time.time() sleepdata.last_tick_time = time.time()
process_data(data, tick_time)
if data[0] == "GYRO":
sleepdata.process_gyro_data(data[1], tick_time)
elif data[0] == "HR":
sleepdata.process_heartrate_data(data[1], tick_time)
average_data(tick_time) average_data(tick_time)
def connect(): def connect():
global band global band
success = False success = False
@ -134,6 +89,7 @@ def connect():
try: try:
band = miband(MAC_ADDR, AUTH_KEY, debug=True) band = miband(MAC_ADDR, AUTH_KEY, debug=True)
success = band.initialize() success = band.initialize()
vibrate.band = band
except BTLEDisconnectError: except BTLEDisconnectError:
print(msg.format(timeout)) print(msg.format(timeout))
time.sleep(timeout) time.sleep(timeout)
@ -141,6 +97,7 @@ def connect():
print("\nExit.") print("\nExit.")
exit() exit()
def start_data_pull(): def start_data_pull():
while True: while True:
try: try:
@ -149,37 +106,20 @@ def start_data_pull():
band.gyro_started_flag = False band.gyro_started_flag = False
connect() connect()
def vibrate_pattern(duration):
print("Sending vibration...")
duration_start = time.time()
pulse_pattern = [[30, 0.01], [60, 0.01], [90, 0.01], [120, 0.01], [150, 0.01], [180, 0.01]]
def start_vibration():
while True: while True:
if (time.time() - duration_start) >= duration: try:
print ("Stopping vibration") vibrate.timed_vibration(vibration_settings)
band.vibrate(0) except BTLEDisconnectError:
break print("Vibration thread waiting for band reconnect...")
else: time.sleep(1)
for pattern in pulse_pattern:
if (time.time() - duration_start) >= duration:
break
vibrate_ms = pattern[0]
vibro_delay = pattern[1]
band.vibrate(vibrate_ms)
time.sleep(vibro_delay)
def vibrate_rolling():
print("Sending rolling vibration...")
for x in range(10):
for x in range(20, 40, 1):
band.vibrate(x)
for x in range(40, 20, -1):
band.vibrate(x)
if __name__ == "__main__": if __name__ == "__main__":
connect() connect()
threading.Thread(target=start_data_pull).start() threading.Thread(target=start_data_pull).start()
threading.Thread(target=timed_buzzing, args=([buzz_delay, 15])).start() threading.Thread(target=start_vibration).start()
#sleepdata.init_graph() #sleepdata.init_graph()

View File

@ -3,6 +3,8 @@ class miband4():
class bytepatterns(): class bytepatterns():
vibration = 'ff{:02x}00000001' vibration = 'ff{:02x}00000001'
vibration_stop = 'ff0000000000'
gyro_start = '01{:02x}19' gyro_start = '01{:02x}19'
start = '0100' start = '0100'
stop = '0000' stop = '0000'
@ -19,7 +21,10 @@ class miband4():
auth_key_prefix = '0300' auth_key_prefix = '0300'
def vibration(duration): def vibration(duration):
byte_pattern = miband4.bytepatterns.vibration if duration == 0:
byte_pattern = miband4.bytepatterns.vibration_stop
else:
byte_pattern = miband4.bytepatterns.vibration
return bytes.fromhex(byte_pattern.format(duration)) return bytes.fromhex(byte_pattern.format(duration))
def gyro_start(sensitivity): def gyro_start(sensitivity):

View File

@ -224,13 +224,23 @@ class miband(Peripheral):
except Empty: except Empty:
break break
def vibrate(self, ms): def vibrate(self, value):
vibration_scaler = 0.75 if value == 255 or value == 0:
ms = min([round(ms / vibration_scaler), 255]) # '255' means 'continuous vibration'
sent_value = int(ms / 2) # I've arbitrarily assigned the otherwise pointless value of '0' to indicate 'stop_vibration'
vibration_duration = ms / 1000 # These modes do not require pulse timing to avoid strange behavior.
self.write_cmd(self._char_alert, bytepattern.vibration(sent_value), queued=True) self.write_cmd(self._char_alert, bytepattern.vibration(value), queued=True)
time.sleep(vibration_duration) else:
# A value of '150' will vibrate for ~200ms, hence vibration_scaler.
# This isn't exact however, but does leave a ~5ms gap between pulses.
# A scaler any lower causes the pulses to be indistinguishable from each other to a human.
# I considered making this function accept a desired amount of vibration time in ms,
# however it was fiddly and I couldn't get it right. More work could be done here.
vibration_scaler = 0.75
ms = round(value / vibration_scaler)
vibration_duration = ms / 1000
self.write_cmd(self._char_alert, bytepattern.vibration(value), queued=True)
time.sleep(vibration_duration)
def write_cmd(self, characteristic, data, response=False, queued=False): def write_cmd(self, characteristic, data, response=False, queued=False):
if queued: if queued:

View File

@ -26,9 +26,8 @@ sleep_data = {
} }
} }
last_heartrate = 0
last_tick_time = None
tick_seconds = 0.5 tick_seconds = 0.5
last_tick_time = None
datestamp = datetime.now().strftime("%Y_%m_%d") datestamp = datetime.now().strftime("%Y_%m_%d")
csv_header_name_format = '{}_{}' csv_header_name_format = '{}_{}'
@ -39,7 +38,7 @@ graph_figure = plt.figure()
graph_axes = graph_figure.add_subplot(1, 1, 1) graph_axes = graph_figure.add_subplot(1, 1, 1)
graph_data = {} graph_data = {}
last_heartrate = 0
class Average_Gyro_Data(): class Average_Gyro_Data():
gyro_last_x = 0 gyro_last_x = 0
@ -62,7 +61,6 @@ class Average_Gyro_Data():
return gyro_movement return gyro_movement
def write_csv(data, name): def write_csv(data, name):
fieldnames = ['time'] fieldnames = ['time']
for fieldname in data[0]: for fieldname in data[0]:
@ -108,6 +106,7 @@ def flush_old_raw_data(tick_time):
if old_raw_data: if old_raw_data:
write_csv(old_raw_data, 'raw') write_csv(old_raw_data, 'raw')
def average_raw_data(tick_time): def average_raw_data(tick_time):
global last_heartrate global last_heartrate
timestamp = datetime.fromtimestamp(tick_time) timestamp = datetime.fromtimestamp(tick_time)
@ -151,7 +150,7 @@ def process_gyro_data(gyro_data, tick_time):
sleep_move = sleep_data['movement'] sleep_move = sleep_data['movement']
value_name = sleep_move['value_name'] value_name = sleep_move['value_name']
gyro_movement = average_gyro_data.process(gyro_data) gyro_movement = average_gyro_data.process(gyro_data)
print("Gyro: {}".format(gyro_movement)) #print("Gyro: {}".format(gyro_movement))
sleep_move['raw_data'].append({ sleep_move['raw_data'].append({
'time': tick_time, 'time': tick_time,
value_name: gyro_movement value_name: gyro_movement

152
vibrate.py Normal file
View File

@ -0,0 +1,152 @@
import time
import random
import logging
band = None
# Notes:
# The miband4 does not (seem to) support different vibration intensities, rather the values sent (2-255)
# represent how long the vibration motor runs. A value of 30 roughly corresponds to 60ms of motor run time.
# Sending a value of 255 triggers continuous vibration.
# Currently "continuous" mode doesn't work, as it doesn't turn off.
# This will be fixed shortly.
if __name__ == 'vibrate':
FORMAT = '%(asctime)-15s %(name)s (%(levelname)s) > %(message)s'
logging.basicConfig(format=FORMAT)
vibration_log_level = logging.INFO
vibration_log = logging.getLogger(__name__)
vibration_log.setLevel(vibration_log_level)
def timed_vibration(settings):
interval_minutes = settings['interval_minutes']
duration_seconds = settings['duration_seconds']
type = settings['type']
buzz_timer = time.time()
tick_time = time.time()
buzz_delay = interval_minutes * 60
vibration_log.info("Starting vibration timer: {} minutes".format(interval_minutes))
if type not in ['random', 'pattern', 'rolling', 'continuous']:
vibration_log.warn("Invalid or no vibration type specified: {}".format(type))
vibration_log.warn("Must be one of these: random, pattern, rolling, continuous")
return
while True:
elapsed_time = tick_time - buzz_timer
if elapsed_time >= buzz_delay:
print("Buzz timer expired, buzzing")
if type == 'random':
vibrate_random(duration_seconds)
elif type == 'pattern':
vibrate_pattern(duration_seconds)
elif type == 'rolling':
vibrate_rolling(duration_seconds)
elif type == 'continuous':
vibrate_continuous(duration_seconds)
buzz_timer = tick_time
else:
tick_time = time.time()
time.sleep(0.5)
def generate_random_vibration_pattern(pulse_count):
#pulse_duration_range and pulse_interval_range_ms are arbitrary
pulse_duration_range = {
'low': 60,
'high': 100
}
pulse_interval_range_ms = {
'low': 100,
'high': 800
}
output_pulse_pattern = []
for _ in range(pulse_count):
pulse_duration = random.randrange(pulse_duration_range['low'], pulse_duration_range['high'])
pulse_interval = random.randrange(pulse_interval_range_ms['low'], pulse_interval_range_ms['high'])/1000
output_pulse_pattern.append([pulse_duration, pulse_interval])
return output_pulse_pattern
def vibrate_random(duration_seconds):
print("Sending random vibration...")
duration_start = time.time()
pattern_length = 20 #This value is arbitrary
pulse_pattern = generate_random_vibration_pattern(pattern_length)
while True:
if (time.time() - duration_start) >= duration_seconds:
print ("Stopping vibration")
band.vibrate(0)
break
else:
for pattern in pulse_pattern:
if (time.time() - duration_start) >= duration_seconds:
break
vibrate_ms = pattern[0]
vibro_delay = pattern[1]
band.vibrate(vibrate_ms)
time.sleep(vibro_delay)
def vibrate_pattern(duration_seconds):
print("Sending vibration...")
duration_start = time.time()
#This pattern is an example.
pulse_pattern = [[30, 0.01], [60, 0.01], [90, 0.01], [120, 0.01], [150, 0.01], [180, 0.01]]
while True:
if (time.time() - duration_start) >= duration_seconds:
print ("Stopping vibration")
band.vibrate(0)
break
else:
for pattern in pulse_pattern:
if (time.time() - duration_start) >= duration_seconds:
break
vibrate_ms = pattern[0]
vibro_delay = pattern[1]
band.vibrate(vibrate_ms)
time.sleep(vibro_delay)
def vibrate_rolling(duration_seconds):
print("Sending rolling vibration...")
duration_start = time.time()
while True:
if (time.time() - duration_start) >= duration_seconds:
print ("Stopping vibration")
band.vibrate(0)
break
else:
for x in range(10):
for x in range(20, 40, 1):
band.vibrate(x)
for x in range(40, 20, -1):
band.vibrate(x)
def vibrate_continuous(duration_seconds):
#Currently broken, still working on this bit.
print("Sending continuous vibration...")
duration_start = time.time()
while True:
if (time.time() - duration_start) >= duration_seconds:
print ("Stopping vibration")
band.vibrate(0)
break
else:
band.vibrate(1)