diff --git a/bytepatterns.py b/bytepatterns.py deleted file mode 100644 index f9f2429..0000000 --- a/bytepatterns.py +++ /dev/null @@ -1,50 +0,0 @@ - -class miband4(): - - class bytepatterns(): - vibration = 'ff{:02x}00000001' - vibration_stop = 'ff0000000000' - - gyro_start = '01{:02x}19' - start = '0100' - stop = '0000' - heart_measure_keepalive = '16' - stop_heart_measure_continues = '150100' - start_heart_measure_continues = '150101' - stop_heart_measure_manual = '150200' - fetch_begin = '100101' - fetch_error = '100104' - fetch_continue = '100201' - fetch_complete = '100204' - auth_ok = '100301' - request_random_number = '0200' - auth_key_prefix = '0300' - - def vibration(duration): - if duration == 0: - byte_pattern = miband4.bytepatterns.vibration_stop - else: - byte_pattern = miband4.bytepatterns.vibration - return bytes.fromhex(byte_pattern.format(duration)) - - def gyro_start(sensitivity): - byte_pattern = miband4.bytepatterns.gyro_start - return bytes.fromhex(byte_pattern.format(sensitivity)) - - start = bytes.fromhex(bytepatterns.start) - stop = bytes.fromhex(bytepatterns.stop) - - heart_measure_keepalive = bytes.fromhex(bytepatterns.heart_measure_keepalive) - stop_heart_measure_continues = bytes.fromhex(bytepatterns.stop_heart_measure_continues) - start_heart_measure_continues = bytes.fromhex(bytepatterns.start_heart_measure_continues) - stop_heart_measure_manual = bytes.fromhex(bytepatterns.stop_heart_measure_manual) - - fetch_begin = bytes.fromhex(bytepatterns.fetch_begin) - fetch_error = bytes.fromhex(bytepatterns.fetch_error) - fetch_continue = bytes.fromhex(bytepatterns.fetch_continue) - fetch_complete = bytes.fromhex(bytepatterns.fetch_complete) - - auth_ok = bytes.fromhex(bytepatterns.auth_ok) - request_random_number = bytes.fromhex(bytepatterns.request_random_number) - auth_key_prefix = bytes.fromhex(bytepatterns.auth_key_prefix) - \ No newline at end of file diff --git a/constants.py b/constants.py index 3692015..f566dee 100644 --- a/constants.py +++ b/constants.py @@ -89,4 +89,59 @@ class QUEUE_TYPES(object): RAW_ACCEL = 'raw_accel' RAW_HEART = 'raw_heart' RAW_GYRO = 'raw_gyro' - AVG_GYRO = 'avg_gyro' \ No newline at end of file + AVG_GYRO = 'avg_gyro' + + +class BYTEPATTERNS(): + + __metaclass__ = Immutable + + vibration_hex = 'ff{:02x}00000001' + vibration_stop_hex = 'ff0000000000' + + gyro_start_hex = '01{:02x}19' + start_hex = '0100' + stop_hex = '0000' + + heart_measure_keepalive_hex = '16' + stop_heart_measure_continues_hex = '150100' + start_heart_measure_continues_hex = '150101' + stop_heart_measure_manual_hex = '150200' + + fetch_begin_hex = '100101' + fetch_error_hex = '100104' + fetch_continue_hex = '100201' + fetch_complete_hex = '100204' + + auth_ok_hex = '100301' + request_random_number_hex = '0200' + auth_key_prefix_hex = '0300' + + def vibration(duration): + if duration == 0: + byte_pattern = BYTEPATTERNS.vibration_stop_hex + else: + byte_pattern = BYTEPATTERNS.vibration_hex + return bytes.fromhex(byte_pattern.format(duration)) + + def gyro_start(sensitivity): + #sensitivity should be from 1 to 3 + byte_pattern = BYTEPATTERNS.gyro_start_hex + return bytes.fromhex(byte_pattern.format(sensitivity)) + + start = bytes.fromhex(start_hex) + stop = bytes.fromhex(stop_hex) + + heart_measure_keepalive = bytes.fromhex(heart_measure_keepalive_hex) + stop_heart_measure_continues = bytes.fromhex(stop_heart_measure_continues_hex) + start_heart_measure_continues = bytes.fromhex(start_heart_measure_continues_hex) + stop_heart_measure_manual = bytes.fromhex(stop_heart_measure_manual_hex) + + fetch_begin = bytes.fromhex(fetch_begin_hex) + fetch_error = bytes.fromhex(fetch_error_hex) + fetch_continue = bytes.fromhex(fetch_continue_hex) + fetch_complete = bytes.fromhex(fetch_complete_hex) + + auth_ok = bytes.fromhex(auth_ok_hex) + request_random_number = bytes.fromhex(request_random_number_hex) + auth_key_prefix = bytes.fromhex(auth_key_prefix_hex) \ No newline at end of file diff --git a/miband.py b/miband.py index fbc215d..b574ebf 100644 --- a/miband.py +++ b/miband.py @@ -2,8 +2,6 @@ import sys, os, time import logging import struct -from bytepatterns import miband4 as bytepattern - from bluepy.btle import ( Peripheral, DefaultDelegate, ADDR_TYPE_RANDOM, ADDR_TYPE_PUBLIC, @@ -14,7 +12,7 @@ from Crypto.Cipher import AES from datetime import datetime from constants import ( - UUIDS, AUTH_STATES, ALERT_TYPES, QUEUE_TYPES, MUSICSTATE + UUIDS, AUTH_STATES, QUEUE_TYPES, BYTEPATTERNS ) from queue import Queue, Empty @@ -29,16 +27,16 @@ class Delegate(DefaultDelegate): def handleNotification(self, hnd, data): if hnd == self.device._char_auth.getHandle(): - if data[:3] == bytepattern.fetch_begin: + if data[:3] == BYTEPATTERNS.fetch_begin: self.device._req_rdn() - elif data[:3] == bytepattern.fetch_error: + elif data[:3] == BYTEPATTERNS.fetch_error: self.device.state = AUTH_STATES.KEY_SENDING_FAILED - elif data[:3] == bytepattern.fetch_continue: + elif data[:3] == BYTEPATTERNS.fetch_continue: random_nr = data[3:] self.device._send_enc_rdn(random_nr) - elif data[:3] == bytepattern.fetch_complete: + elif data[:3] == BYTEPATTERNS.fetch_complete: self.device.state = AUTH_STATES.REQUEST_RN_ERROR - elif data[:3] == bytepattern.auth_ok: + elif data[:3] == BYTEPATTERNS.auth_ok: self.device.state = AUTH_STATES.AUTH_OK else: self.device.state = AUTH_STATES.AUTH_FAILED @@ -53,7 +51,7 @@ class Delegate(DefaultDelegate): print("Unhandled data on handle 0x38: {}".format(data)) elif hnd == self.device._char_hz.getHandle(): if len(data) == 20 and struct.unpack('b', data[0:1])[0] == 1: - self.device.queue.put((QUEUE_TYPES.RAW_ACCEL, data)) + self.device.queue.put((QUEUE_TYPES.RAW_GYRO, data)) elif len(data) == 11: #print("Unknown data: {}".format(bytes.hex(data, " "))) #print(struct.unpack('BBBBBBBBBB', data[1:])) @@ -136,10 +134,10 @@ class miband(Peripheral): def _auth_notif(self, enabled): if enabled: self._log.info("Enabling Auth Service notifications status...") - self._desc_auth.write(bytepattern.start, True) + self._desc_auth.write(BYTEPATTERNS.start, True) elif not enabled: self._log.info("Disabling Auth Service notifications status...") - self._desc_auth.write(bytepattern.stop, True) + self._desc_auth.write(BYTEPATTERNS.stop, True) else: self._log.error("Something went wrong while changing the Auth Service notifications status...") @@ -147,15 +145,15 @@ class miband(Peripheral): def _auth_previews_data_notif(self, enabled): if enabled: self._log.info("Enabling Fetch Char notifications status...") - self._desc_fetch.write(bytepattern.start, True) + self._desc_fetch.write(BYTEPATTERNS.start, True) self._log.info("Enabling Activity Char notifications status...") - self._desc_activity.write(bytepattern.start, True) + self._desc_activity.write(BYTEPATTERNS.start, True) self.activity_notif_enabled = True else: self._log.info("Disabling Fetch Char notifications status...") - self._desc_fetch.write(bytepattern.stop, True) + self._desc_fetch.write(BYTEPATTERNS.stop, True) self._log.info("Disabling Activity Char notifications status...") - self._desc_activity.write(bytepattern.stop, True) + self._desc_activity.write(BYTEPATTERNS.stop, True) self.activity_notif_enabled = False @@ -176,13 +174,13 @@ class miband(Peripheral): def _req_rdn(self): self._log.info("Requesting random number...") - self._char_auth.write(bytepattern.request_random_number) + self._char_auth.write(BYTEPATTERNS.request_random_number) self.waitForNotifications(self.timeout) def _send_enc_rdn(self, data): self._log.info("Sending encrypted random number") - cmd = bytepattern.auth_key_prefix + self._encrypt(data) + cmd = BYTEPATTERNS.auth_key_prefix + self._encrypt(data) send_cmd = struct.pack('<18s', cmd) self._char_auth.write(send_cmd) self.waitForNotifications(self.timeout) @@ -211,7 +209,7 @@ class miband(Peripheral): _type = queue_data[0] if self.heart_measure_callback and _type == QUEUE_TYPES.HEART: self.heart_measure_callback(self._parse_heart_measure(queue_data[1])) - elif self.gyro_raw_callback and _type == QUEUE_TYPES.RAW_ACCEL: + elif self.gyro_raw_callback and _type == QUEUE_TYPES.RAW_GYRO: self.gyro_raw_callback(self._parse_raw_gyro(queue_data[1])) elif self.gyro_avg_callback and _type == QUEUE_TYPES.AVG_GYRO: self.gyro_avg_callback(self._parse_avg_gyro(queue_data[1])) @@ -270,7 +268,7 @@ class miband(Peripheral): # '255' means 'continuous vibration' # I've arbitrarily assigned the otherwise pointless value of '0' to indicate 'stop_vibration' # These modes do not require pulse timing to avoid strange behavior. - self.write_cmd(self._char_alert, bytepattern.vibration(value), queued=True) + self.write_cmd(self._char_alert, BYTEPATTERNS.vibration(value), queued=True) else: # A value of '150' will vibrate for ~200ms, hence vibration_scaler. # This isn't exact however, but does leave a ~5ms gap between pulses. @@ -280,7 +278,7 @@ class miband(Peripheral): vibration_scaler = 0.75 ms = round(value / vibration_scaler) vibration_duration = ms / 1000 - self.write_cmd(self._char_alert, bytepattern.vibration(value), queued=True) + self.write_cmd(self._char_alert, BYTEPATTERNS.vibration(value), queued=True) time.sleep(vibration_duration) @@ -306,26 +304,25 @@ class miband(Peripheral): def send_gyro_start(self, sensitivity): if not self.gyro_started_flag: self._log.info("Starting gyro...") - self.write_req(self._sensor_handle, bytepattern.start) - self.write_req(self._steps_handle, bytepattern.start) - self.write_req(self._hz_handle, bytepattern.start) + self.write_req(self._sensor_handle, BYTEPATTERNS.start) + self.write_req(self._steps_handle, BYTEPATTERNS.start) + self.write_req(self._hz_handle, BYTEPATTERNS.start) self.gyro_started_flag = True - #self.write_cmd(self._char_sensor, bytepattern.gyro_start(sensitivity)) - self.write_cmd(self._char_sensor, bytes.fromhex('010119')) - self.write_req(self._sensor_handle, bytepattern.stop) + self.write_cmd(self._char_sensor, BYTEPATTERNS.gyro_start(sensitivity)) + self.write_req(self._sensor_handle, BYTEPATTERNS.stop) self.write_cmd(self._char_sensor, b'\x02') def send_heart_measure_start(self): self._log.info("Starting heart measure...") - self.write_cmd(self._char_heart_ctrl, bytepattern.stop_heart_measure_manual, response=True) - self.write_cmd(self._char_heart_ctrl, bytepattern.stop_heart_measure_continues, response=True) - self.write_req(self._heart_measure_handle, bytepattern.start) - self.write_cmd(self._char_heart_ctrl, bytepattern.start_heart_measure_continues, response=True) + self.write_cmd(self._char_heart_ctrl, BYTEPATTERNS.stop_heart_measure_manual, response=True) + self.write_cmd(self._char_heart_ctrl, BYTEPATTERNS.stop_heart_measure_continues, response=True) + self.write_req(self._heart_measure_handle, BYTEPATTERNS.start) + self.write_cmd(self._char_heart_ctrl, BYTEPATTERNS.start_heart_measure_continues, response=True) def send_heart_measure_keepalive(self): - self.write_cmd(self._char_heart_ctrl, bytepattern.heart_measure_keepalive, response=True) + self.write_cmd(self._char_heart_ctrl, BYTEPATTERNS.heart_measure_keepalive, response=True) def start_heart_and_gyro(self, sensitivity, callback): @@ -333,7 +330,7 @@ class miband(Peripheral): self.gyro_raw_callback = callback self.send_gyro_start(sensitivity) - #self.send_heart_measure_start() + self.send_heart_measure_start() heartbeat_time = time.time() while True: diff --git a/vibrate.py b/vibrate.py index 42cd65a..0f1896e 100644 --- a/vibrate.py +++ b/vibrate.py @@ -7,7 +7,7 @@ band = None # Notes: # The miband4 does not (seem to) support different vibration intensities, rather the values sent (2-255) # represent how long the vibration motor runs. A value of 30 roughly corresponds to 60ms of motor run time. -# Sending a value of 255 triggers continuous vibration. +# Sending a value of 255 triggecd rs continuous vibration. # Currently "continuous" mode doesn't work, as it doesn't turn off. # This will be fixed shortly. @@ -59,8 +59,8 @@ def timed_vibration(settings): def generate_random_vibration_pattern(pulse_count): #pulse_duration_range and pulse_interval_range_ms are arbitrary pulse_duration_range = { - 'low': 60, - 'high': 100 + 'low': 80, + 'high': 120 } pulse_interval_range_ms = { 'low': 100,