Moved bytepatterns into constants.py, tweaked random vibration intensity

This commit is contained in:
NateSchoolfield 2021-02-05 15:56:02 -08:00
parent f94f1af6d6
commit f1b9497f0d
4 changed files with 88 additions and 86 deletions

View File

@ -1,50 +0,0 @@
class miband4():
class bytepatterns():
vibration = 'ff{:02x}00000001'
vibration_stop = 'ff0000000000'
gyro_start = '01{:02x}19'
start = '0100'
stop = '0000'
heart_measure_keepalive = '16'
stop_heart_measure_continues = '150100'
start_heart_measure_continues = '150101'
stop_heart_measure_manual = '150200'
fetch_begin = '100101'
fetch_error = '100104'
fetch_continue = '100201'
fetch_complete = '100204'
auth_ok = '100301'
request_random_number = '0200'
auth_key_prefix = '0300'
def vibration(duration):
if duration == 0:
byte_pattern = miband4.bytepatterns.vibration_stop
else:
byte_pattern = miband4.bytepatterns.vibration
return bytes.fromhex(byte_pattern.format(duration))
def gyro_start(sensitivity):
byte_pattern = miband4.bytepatterns.gyro_start
return bytes.fromhex(byte_pattern.format(sensitivity))
start = bytes.fromhex(bytepatterns.start)
stop = bytes.fromhex(bytepatterns.stop)
heart_measure_keepalive = bytes.fromhex(bytepatterns.heart_measure_keepalive)
stop_heart_measure_continues = bytes.fromhex(bytepatterns.stop_heart_measure_continues)
start_heart_measure_continues = bytes.fromhex(bytepatterns.start_heart_measure_continues)
stop_heart_measure_manual = bytes.fromhex(bytepatterns.stop_heart_measure_manual)
fetch_begin = bytes.fromhex(bytepatterns.fetch_begin)
fetch_error = bytes.fromhex(bytepatterns.fetch_error)
fetch_continue = bytes.fromhex(bytepatterns.fetch_continue)
fetch_complete = bytes.fromhex(bytepatterns.fetch_complete)
auth_ok = bytes.fromhex(bytepatterns.auth_ok)
request_random_number = bytes.fromhex(bytepatterns.request_random_number)
auth_key_prefix = bytes.fromhex(bytepatterns.auth_key_prefix)

View File

@ -90,3 +90,58 @@ class QUEUE_TYPES(object):
RAW_HEART = 'raw_heart' RAW_HEART = 'raw_heart'
RAW_GYRO = 'raw_gyro' RAW_GYRO = 'raw_gyro'
AVG_GYRO = 'avg_gyro' AVG_GYRO = 'avg_gyro'
class BYTEPATTERNS():
__metaclass__ = Immutable
vibration_hex = 'ff{:02x}00000001'
vibration_stop_hex = 'ff0000000000'
gyro_start_hex = '01{:02x}19'
start_hex = '0100'
stop_hex = '0000'
heart_measure_keepalive_hex = '16'
stop_heart_measure_continues_hex = '150100'
start_heart_measure_continues_hex = '150101'
stop_heart_measure_manual_hex = '150200'
fetch_begin_hex = '100101'
fetch_error_hex = '100104'
fetch_continue_hex = '100201'
fetch_complete_hex = '100204'
auth_ok_hex = '100301'
request_random_number_hex = '0200'
auth_key_prefix_hex = '0300'
def vibration(duration):
if duration == 0:
byte_pattern = BYTEPATTERNS.vibration_stop_hex
else:
byte_pattern = BYTEPATTERNS.vibration_hex
return bytes.fromhex(byte_pattern.format(duration))
def gyro_start(sensitivity):
#sensitivity should be from 1 to 3
byte_pattern = BYTEPATTERNS.gyro_start_hex
return bytes.fromhex(byte_pattern.format(sensitivity))
start = bytes.fromhex(start_hex)
stop = bytes.fromhex(stop_hex)
heart_measure_keepalive = bytes.fromhex(heart_measure_keepalive_hex)
stop_heart_measure_continues = bytes.fromhex(stop_heart_measure_continues_hex)
start_heart_measure_continues = bytes.fromhex(start_heart_measure_continues_hex)
stop_heart_measure_manual = bytes.fromhex(stop_heart_measure_manual_hex)
fetch_begin = bytes.fromhex(fetch_begin_hex)
fetch_error = bytes.fromhex(fetch_error_hex)
fetch_continue = bytes.fromhex(fetch_continue_hex)
fetch_complete = bytes.fromhex(fetch_complete_hex)
auth_ok = bytes.fromhex(auth_ok_hex)
request_random_number = bytes.fromhex(request_random_number_hex)
auth_key_prefix = bytes.fromhex(auth_key_prefix_hex)

View File

@ -2,8 +2,6 @@ import sys, os, time
import logging import logging
import struct import struct
from bytepatterns import miband4 as bytepattern
from bluepy.btle import ( from bluepy.btle import (
Peripheral, DefaultDelegate, Peripheral, DefaultDelegate,
ADDR_TYPE_RANDOM, ADDR_TYPE_PUBLIC, ADDR_TYPE_RANDOM, ADDR_TYPE_PUBLIC,
@ -14,7 +12,7 @@ from Crypto.Cipher import AES
from datetime import datetime from datetime import datetime
from constants import ( from constants import (
UUIDS, AUTH_STATES, ALERT_TYPES, QUEUE_TYPES, MUSICSTATE UUIDS, AUTH_STATES, QUEUE_TYPES, BYTEPATTERNS
) )
from queue import Queue, Empty from queue import Queue, Empty
@ -29,16 +27,16 @@ class Delegate(DefaultDelegate):
def handleNotification(self, hnd, data): def handleNotification(self, hnd, data):
if hnd == self.device._char_auth.getHandle(): if hnd == self.device._char_auth.getHandle():
if data[:3] == bytepattern.fetch_begin: if data[:3] == BYTEPATTERNS.fetch_begin:
self.device._req_rdn() self.device._req_rdn()
elif data[:3] == bytepattern.fetch_error: elif data[:3] == BYTEPATTERNS.fetch_error:
self.device.state = AUTH_STATES.KEY_SENDING_FAILED self.device.state = AUTH_STATES.KEY_SENDING_FAILED
elif data[:3] == bytepattern.fetch_continue: elif data[:3] == BYTEPATTERNS.fetch_continue:
random_nr = data[3:] random_nr = data[3:]
self.device._send_enc_rdn(random_nr) self.device._send_enc_rdn(random_nr)
elif data[:3] == bytepattern.fetch_complete: elif data[:3] == BYTEPATTERNS.fetch_complete:
self.device.state = AUTH_STATES.REQUEST_RN_ERROR self.device.state = AUTH_STATES.REQUEST_RN_ERROR
elif data[:3] == bytepattern.auth_ok: elif data[:3] == BYTEPATTERNS.auth_ok:
self.device.state = AUTH_STATES.AUTH_OK self.device.state = AUTH_STATES.AUTH_OK
else: else:
self.device.state = AUTH_STATES.AUTH_FAILED self.device.state = AUTH_STATES.AUTH_FAILED
@ -53,7 +51,7 @@ class Delegate(DefaultDelegate):
print("Unhandled data on handle 0x38: {}".format(data)) print("Unhandled data on handle 0x38: {}".format(data))
elif hnd == self.device._char_hz.getHandle(): elif hnd == self.device._char_hz.getHandle():
if len(data) == 20 and struct.unpack('b', data[0:1])[0] == 1: if len(data) == 20 and struct.unpack('b', data[0:1])[0] == 1:
self.device.queue.put((QUEUE_TYPES.RAW_ACCEL, data)) self.device.queue.put((QUEUE_TYPES.RAW_GYRO, data))
elif len(data) == 11: elif len(data) == 11:
#print("Unknown data: {}".format(bytes.hex(data, " "))) #print("Unknown data: {}".format(bytes.hex(data, " ")))
#print(struct.unpack('BBBBBBBBBB', data[1:])) #print(struct.unpack('BBBBBBBBBB', data[1:]))
@ -136,10 +134,10 @@ class miband(Peripheral):
def _auth_notif(self, enabled): def _auth_notif(self, enabled):
if enabled: if enabled:
self._log.info("Enabling Auth Service notifications status...") self._log.info("Enabling Auth Service notifications status...")
self._desc_auth.write(bytepattern.start, True) self._desc_auth.write(BYTEPATTERNS.start, True)
elif not enabled: elif not enabled:
self._log.info("Disabling Auth Service notifications status...") self._log.info("Disabling Auth Service notifications status...")
self._desc_auth.write(bytepattern.stop, True) self._desc_auth.write(BYTEPATTERNS.stop, True)
else: else:
self._log.error("Something went wrong while changing the Auth Service notifications status...") self._log.error("Something went wrong while changing the Auth Service notifications status...")
@ -147,15 +145,15 @@ class miband(Peripheral):
def _auth_previews_data_notif(self, enabled): def _auth_previews_data_notif(self, enabled):
if enabled: if enabled:
self._log.info("Enabling Fetch Char notifications status...") self._log.info("Enabling Fetch Char notifications status...")
self._desc_fetch.write(bytepattern.start, True) self._desc_fetch.write(BYTEPATTERNS.start, True)
self._log.info("Enabling Activity Char notifications status...") self._log.info("Enabling Activity Char notifications status...")
self._desc_activity.write(bytepattern.start, True) self._desc_activity.write(BYTEPATTERNS.start, True)
self.activity_notif_enabled = True self.activity_notif_enabled = True
else: else:
self._log.info("Disabling Fetch Char notifications status...") self._log.info("Disabling Fetch Char notifications status...")
self._desc_fetch.write(bytepattern.stop, True) self._desc_fetch.write(BYTEPATTERNS.stop, True)
self._log.info("Disabling Activity Char notifications status...") self._log.info("Disabling Activity Char notifications status...")
self._desc_activity.write(bytepattern.stop, True) self._desc_activity.write(BYTEPATTERNS.stop, True)
self.activity_notif_enabled = False self.activity_notif_enabled = False
@ -176,13 +174,13 @@ class miband(Peripheral):
def _req_rdn(self): def _req_rdn(self):
self._log.info("Requesting random number...") self._log.info("Requesting random number...")
self._char_auth.write(bytepattern.request_random_number) self._char_auth.write(BYTEPATTERNS.request_random_number)
self.waitForNotifications(self.timeout) self.waitForNotifications(self.timeout)
def _send_enc_rdn(self, data): def _send_enc_rdn(self, data):
self._log.info("Sending encrypted random number") self._log.info("Sending encrypted random number")
cmd = bytepattern.auth_key_prefix + self._encrypt(data) cmd = BYTEPATTERNS.auth_key_prefix + self._encrypt(data)
send_cmd = struct.pack('<18s', cmd) send_cmd = struct.pack('<18s', cmd)
self._char_auth.write(send_cmd) self._char_auth.write(send_cmd)
self.waitForNotifications(self.timeout) self.waitForNotifications(self.timeout)
@ -211,7 +209,7 @@ class miband(Peripheral):
_type = queue_data[0] _type = queue_data[0]
if self.heart_measure_callback and _type == QUEUE_TYPES.HEART: if self.heart_measure_callback and _type == QUEUE_TYPES.HEART:
self.heart_measure_callback(self._parse_heart_measure(queue_data[1])) self.heart_measure_callback(self._parse_heart_measure(queue_data[1]))
elif self.gyro_raw_callback and _type == QUEUE_TYPES.RAW_ACCEL: elif self.gyro_raw_callback and _type == QUEUE_TYPES.RAW_GYRO:
self.gyro_raw_callback(self._parse_raw_gyro(queue_data[1])) self.gyro_raw_callback(self._parse_raw_gyro(queue_data[1]))
elif self.gyro_avg_callback and _type == QUEUE_TYPES.AVG_GYRO: elif self.gyro_avg_callback and _type == QUEUE_TYPES.AVG_GYRO:
self.gyro_avg_callback(self._parse_avg_gyro(queue_data[1])) self.gyro_avg_callback(self._parse_avg_gyro(queue_data[1]))
@ -270,7 +268,7 @@ class miband(Peripheral):
# '255' means 'continuous vibration' # '255' means 'continuous vibration'
# I've arbitrarily assigned the otherwise pointless value of '0' to indicate 'stop_vibration' # I've arbitrarily assigned the otherwise pointless value of '0' to indicate 'stop_vibration'
# These modes do not require pulse timing to avoid strange behavior. # These modes do not require pulse timing to avoid strange behavior.
self.write_cmd(self._char_alert, bytepattern.vibration(value), queued=True) self.write_cmd(self._char_alert, BYTEPATTERNS.vibration(value), queued=True)
else: else:
# A value of '150' will vibrate for ~200ms, hence vibration_scaler. # A value of '150' will vibrate for ~200ms, hence vibration_scaler.
# This isn't exact however, but does leave a ~5ms gap between pulses. # This isn't exact however, but does leave a ~5ms gap between pulses.
@ -280,7 +278,7 @@ class miband(Peripheral):
vibration_scaler = 0.75 vibration_scaler = 0.75
ms = round(value / vibration_scaler) ms = round(value / vibration_scaler)
vibration_duration = ms / 1000 vibration_duration = ms / 1000
self.write_cmd(self._char_alert, bytepattern.vibration(value), queued=True) self.write_cmd(self._char_alert, BYTEPATTERNS.vibration(value), queued=True)
time.sleep(vibration_duration) time.sleep(vibration_duration)
@ -306,26 +304,25 @@ class miband(Peripheral):
def send_gyro_start(self, sensitivity): def send_gyro_start(self, sensitivity):
if not self.gyro_started_flag: if not self.gyro_started_flag:
self._log.info("Starting gyro...") self._log.info("Starting gyro...")
self.write_req(self._sensor_handle, bytepattern.start) self.write_req(self._sensor_handle, BYTEPATTERNS.start)
self.write_req(self._steps_handle, bytepattern.start) self.write_req(self._steps_handle, BYTEPATTERNS.start)
self.write_req(self._hz_handle, bytepattern.start) self.write_req(self._hz_handle, BYTEPATTERNS.start)
self.gyro_started_flag = True self.gyro_started_flag = True
#self.write_cmd(self._char_sensor, bytepattern.gyro_start(sensitivity)) self.write_cmd(self._char_sensor, BYTEPATTERNS.gyro_start(sensitivity))
self.write_cmd(self._char_sensor, bytes.fromhex('010119')) self.write_req(self._sensor_handle, BYTEPATTERNS.stop)
self.write_req(self._sensor_handle, bytepattern.stop)
self.write_cmd(self._char_sensor, b'\x02') self.write_cmd(self._char_sensor, b'\x02')
def send_heart_measure_start(self): def send_heart_measure_start(self):
self._log.info("Starting heart measure...") self._log.info("Starting heart measure...")
self.write_cmd(self._char_heart_ctrl, bytepattern.stop_heart_measure_manual, response=True) self.write_cmd(self._char_heart_ctrl, BYTEPATTERNS.stop_heart_measure_manual, response=True)
self.write_cmd(self._char_heart_ctrl, bytepattern.stop_heart_measure_continues, response=True) self.write_cmd(self._char_heart_ctrl, BYTEPATTERNS.stop_heart_measure_continues, response=True)
self.write_req(self._heart_measure_handle, bytepattern.start) self.write_req(self._heart_measure_handle, BYTEPATTERNS.start)
self.write_cmd(self._char_heart_ctrl, bytepattern.start_heart_measure_continues, response=True) self.write_cmd(self._char_heart_ctrl, BYTEPATTERNS.start_heart_measure_continues, response=True)
def send_heart_measure_keepalive(self): def send_heart_measure_keepalive(self):
self.write_cmd(self._char_heart_ctrl, bytepattern.heart_measure_keepalive, response=True) self.write_cmd(self._char_heart_ctrl, BYTEPATTERNS.heart_measure_keepalive, response=True)
def start_heart_and_gyro(self, sensitivity, callback): def start_heart_and_gyro(self, sensitivity, callback):
@ -333,7 +330,7 @@ class miband(Peripheral):
self.gyro_raw_callback = callback self.gyro_raw_callback = callback
self.send_gyro_start(sensitivity) self.send_gyro_start(sensitivity)
#self.send_heart_measure_start() self.send_heart_measure_start()
heartbeat_time = time.time() heartbeat_time = time.time()
while True: while True:

View File

@ -7,7 +7,7 @@ band = None
# Notes: # Notes:
# The miband4 does not (seem to) support different vibration intensities, rather the values sent (2-255) # The miband4 does not (seem to) support different vibration intensities, rather the values sent (2-255)
# represent how long the vibration motor runs. A value of 30 roughly corresponds to 60ms of motor run time. # represent how long the vibration motor runs. A value of 30 roughly corresponds to 60ms of motor run time.
# Sending a value of 255 triggers continuous vibration. # Sending a value of 255 triggecd rs continuous vibration.
# Currently "continuous" mode doesn't work, as it doesn't turn off. # Currently "continuous" mode doesn't work, as it doesn't turn off.
# This will be fixed shortly. # This will be fixed shortly.
@ -59,8 +59,8 @@ def timed_vibration(settings):
def generate_random_vibration_pattern(pulse_count): def generate_random_vibration_pattern(pulse_count):
#pulse_duration_range and pulse_interval_range_ms are arbitrary #pulse_duration_range and pulse_interval_range_ms are arbitrary
pulse_duration_range = { pulse_duration_range = {
'low': 60, 'low': 80,
'high': 100 'high': 120
} }
pulse_interval_range_ms = { pulse_interval_range_ms = {
'low': 100, 'low': 100,