gattlib-py/examples/nordic_thingy: Add example for sound support

pull/118/head
Olivier Martin 2019-07-14 14:10:47 +02:00 committed by Olivier Martin
parent e5daed3484
commit 4e9b1094d4
4 changed files with 321 additions and 178 deletions

View File

@ -0,0 +1,183 @@
import struct
import sys
import threading
from dbus.mainloop.glib import DBusGMainLoop
try:
from gi.repository import GLib, GObject
except ImportError:
import gobject as GObject
import sys
import numpy
from matplotlib.pylab import *
from mpl_toolkits.axes_grid1 import host_subplot
import matplotlib.animation as animation
from gattlib import uuid
last_measures = {
'temperature': { 'value': None, 'min': None, 'max': None },
'pressure': { 'value': None, 'min': None, 'max': None },
'humidity': { 'value': None, 'min': None, 'max': None },
}
def temperature_notification(value, user_data):
last_measures['temperature']['value'] = float("%d.%d" % (value[0], value[1]))
print("Temperature: %f" % last_measures['temperature']['value'])
def pressure_notification(value, user_data):
(pressure_integer, pressure_decimal) = struct.unpack("<IB", value)
last_measures['pressure']['value'] = float("%d.%d" % (pressure_integer, pressure_decimal))
print("Pressure: %f" % last_measures['pressure']['value'])
def humidity_notification(value, user_data):
last_measures['humidity']['value'] = value[0]
print("Humidity: %d%%" % last_measures['humidity']['value'])
# Data Placeholders
temperature = zeros(0)
humidity = zeros(0)
t = zeros(0)
x = 0.0
xmax = 1000.0
temp_line = None
hum_line = None
ax_temp = None
ax_hum = None
simulation = None
def graph_init():
global x
global temperature, humidity, t
global temp_line, hum_line
global ax_temp, ax_hum
global simulation
font = {'size' : 9}
matplotlib.rc('font', **font)
# Setup figure and subplots
f0 = figure(num=0, figsize=(12, 8)) # , dpi = 100)
f0.suptitle("Nordic Thingy", fontsize=12)
ax_temp = host_subplot(111)
ax_hum = ax_temp.twinx()
# Set titles of subplots
ax_temp.set_title('Temperature/Humidity vs Time')
# set y-limits
ax_temp.set_ylim(0, 45)
ax_hum.set_ylim(0, 100)
# sex x-limits
ax_temp.set_xlim(0, xmax)
ax_hum.set_xlim(0, xmax)
# Turn on grids
ax_temp.grid(True)
# set label names
ax_temp.set_xlabel("t")
ax_temp.set_ylabel("temperature")
ax_hum.set_ylabel("humidity")
temp_line, = ax_temp.plot(t, temperature, 'b-', label="temperature")
hum_line, = ax_hum.plot(t, humidity, 'g-', label="humidity")
# set lagends
ax_temp.legend([temp_line, hum_line], [temp_line.get_label(), hum_line.get_label()])
# interval: draw new frame every 'interval' ms
# Note: We expose simulation to prevent Python garbage collector to rmeove it!
simulation = animation.FuncAnimation(f0, graph_update, blit=False, interval=20)
plt.show()
def graph_update(self):
global x, xmax
global temperature, humidity, t
global temp_line, hum_line
if last_measures['temperature']['value']:
temperature = append(temperature, last_measures['temperature']['value'])
if last_measures['temperature']['min']:
last_measures['temperature']['min'] = min(last_measures['temperature']['min'], last_measures['temperature']['value'] - 5)
else:
last_measures['temperature']['min'] = last_measures['temperature']['value'] - 5
if last_measures['temperature']['max']:
last_measures['temperature']['max'] = max(last_measures['temperature']['max'], last_measures['temperature']['value'] + 5)
else:
last_measures['temperature']['max'] = last_measures['temperature']['value'] + 5
ax_temp.set_ylim(last_measures['temperature']['min'], last_measures['temperature']['max'])
if last_measures['humidity']['value']:
humidity = append(humidity, last_measures['humidity']['value'])
if last_measures['humidity']['min']:
last_measures['humidity']['min'] = min(last_measures['humidity']['min'], last_measures['humidity']['value'] - 20)
else:
last_measures['humidity']['min'] = last_measures['humidity']['value'] - 20
if last_measures['humidity']['max']:
last_measures['humidity']['max'] = max(last_measures['humidity']['max'], last_measures['humidity']['value'] + 20)
else:
last_measures['humidity']['max'] = last_measures['humidity']['value'] + 20
ax_hum.set_ylim(last_measures['humidity']['min'], last_measures['humidity']['max'])
t = append(t, x)
x += 0.05
temp_line.set_data(t, temperature)
hum_line.set_data(t, humidity)
if x >= xmax - 1.00:
temp_line.axes.set_xlim(x - xmax + 1.0, x + 1.0)
hum_line.axes.set_xlim(x - xmax + 1.0, x + 1.0)
return temp_line, hum_line
def environment_service(args, gatt_device):
NORDIC_THINGY_WEATHER_STATION_SERVICE = uuid.gattlib_uuid_str_to_int("EF680200-9B35-4933-9B10-52FFA9740042")
NORDIC_THINGY_TEMPERATURE_CHAR = uuid.gattlib_uuid_str_to_int("EF680201-9B35-4933-9B10-52FFA9740042")
NORDIC_THINGY_PRESSURE_CHAR = uuid.gattlib_uuid_str_to_int("EF680202-9B35-4933-9B10-52FFA9740042")
NORDIC_THINGY_HUMIDITY_CHAR = uuid.gattlib_uuid_str_to_int("EF680203-9B35-4933-9B10-52FFA9740042")
NORDIC_THINGY_AIR_QUALITY_CHAR = uuid.gattlib_uuid_str_to_int("EF680204-9B35-4933-9B10-52FFA9740042")
temperature_characteristic = gatt_device.characteristics[NORDIC_THINGY_TEMPERATURE_CHAR]
pressure_characteristic = gatt_device.characteristics[NORDIC_THINGY_PRESSURE_CHAR]
humidity_characteristic = gatt_device.characteristics[NORDIC_THINGY_HUMIDITY_CHAR]
air_quality_characteristic = gatt_device.characteristics[NORDIC_THINGY_AIR_QUALITY_CHAR]
# Initialize graph
threading.Thread(target=graph_init).start()
try:
DBusGMainLoop(set_as_default=True)
mainloop = GLib.MainLoop()
temperature_characteristic.register_notification(temperature_notification)
temperature_characteristic.notification_start()
pressure_characteristic.register_notification(pressure_notification)
pressure_characteristic.notification_start()
humidity_characteristic.register_notification(humidity_notification)
humidity_characteristic.notification_start()
mainloop.run()
except KeyboardInterrupt:
mainloop.quit()
finally:
humidity_characteristic.notification_stop()
pressure_characteristic.notification_stop()
temperature_characteristic.notification_stop()
gatt_device.disconnect()

View File

@ -1,194 +1,32 @@
#!/usr/bin/env python3
import argparse
import struct
import sys
import threading
from dbus.mainloop.glib import DBusGMainLoop
try:
from gi.repository import GLib, GObject
except ImportError:
import gobject as GObject
import sys
import numpy
from matplotlib.pylab import *
from mpl_toolkits.axes_grid1 import host_subplot
import matplotlib.animation as animation
from gattlib import device, uuid
last_measures = {
'temperature': { 'value': None, 'min': None, 'max': None },
'pressure': { 'value': None, 'min': None, 'max': None },
'humidity': { 'value': None, 'min': None, 'max': None },
}
def temperature_notification(value, user_data):
last_measures['temperature']['value'] = float("%d.%d" % (value[0], value[1]))
print("Temperature: %f" % last_measures['temperature']['value'])
def pressure_notification(value, user_data):
(pressure_integer, pressure_decimal) = struct.unpack("<IB", value)
last_measures['pressure']['value'] = float("%d.%d" % (pressure_integer, pressure_decimal))
print("Pressure: %f" % last_measures['pressure']['value'])
def humidity_notification(value, user_data):
last_measures['humidity']['value'] = value[0]
print("Humidity: %d%%" % last_measures['humidity']['value'])
# Data Placeholders
temperature = zeros(0)
humidity = zeros(0)
t = zeros(0)
x = 0.0
xmax = 1000.0
temp_line = None
hum_line = None
ax_temp = None
ax_hum = None
simulation = None
def graph_init():
global x
global temperature, humidity, t
global temp_line, hum_line
global ax_temp, ax_hum
global simulation
font = {'size' : 9}
matplotlib.rc('font', **font)
# Setup figure and subplots
f0 = figure(num=0, figsize=(12, 8)) # , dpi = 100)
f0.suptitle("Nordic Thingy", fontsize=12)
ax_temp = host_subplot(111)
ax_hum = ax_temp.twinx()
# Set titles of subplots
ax_temp.set_title('Temperature/Humidity vs Time')
# set y-limits
ax_temp.set_ylim(0, 45)
ax_hum.set_ylim(0, 100)
# sex x-limits
ax_temp.set_xlim(0, xmax)
ax_hum.set_xlim(0, xmax)
# Turn on grids
ax_temp.grid(True)
# set label names
ax_temp.set_xlabel("t")
ax_temp.set_ylabel("temperature")
ax_hum.set_ylabel("humidity")
temp_line, = ax_temp.plot(t, temperature, 'b-', label="temperature")
hum_line, = ax_hum.plot(t, humidity, 'g-', label="humidity")
# set lagends
ax_temp.legend([temp_line, hum_line], [temp_line.get_label(), hum_line.get_label()])
# interval: draw new frame every 'interval' ms
# Note: We expose simulation to prevent Python garbage collector to rmeove it!
simulation = animation.FuncAnimation(f0, graph_update, blit=False, interval=20)
plt.show()
def graph_update(self):
global x, xmax
global temperature, humidity, t
global temp_line, hum_line
if last_measures['temperature']['value']:
temperature = append(temperature, last_measures['temperature']['value'])
if last_measures['temperature']['min']:
last_measures['temperature']['min'] = min(last_measures['temperature']['min'], last_measures['temperature']['value'] - 5)
else:
last_measures['temperature']['min'] = last_measures['temperature']['value'] - 5
if last_measures['temperature']['max']:
last_measures['temperature']['max'] = max(last_measures['temperature']['max'], last_measures['temperature']['value'] + 5)
else:
last_measures['temperature']['max'] = last_measures['temperature']['value'] + 5
ax_temp.set_ylim(last_measures['temperature']['min'], last_measures['temperature']['max'])
if last_measures['humidity']['value']:
humidity = append(humidity, last_measures['humidity']['value'])
if last_measures['humidity']['min']:
last_measures['humidity']['min'] = min(last_measures['humidity']['min'], last_measures['humidity']['value'] - 20)
else:
last_measures['humidity']['min'] = last_measures['humidity']['value'] - 20
if last_measures['humidity']['max']:
last_measures['humidity']['max'] = max(last_measures['humidity']['max'], last_measures['humidity']['value'] + 20)
else:
last_measures['humidity']['max'] = last_measures['humidity']['value'] + 20
ax_hum.set_ylim(last_measures['humidity']['min'], last_measures['humidity']['max'])
t = append(t, x)
x += 0.05
temp_line.set_data(t, temperature)
hum_line.set_data(t, humidity)
if x >= xmax - 1.00:
temp_line.axes.set_xlim(x - xmax + 1.0, x + 1.0)
hum_line.axes.set_xlim(x - xmax + 1.0, x + 1.0)
return temp_line, hum_line
from gattlib import device
from environment_service import environment_service
from sound_service import sound_service
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Gattlib example for Nordic Thingy')
parser.add_argument('mac', type=str, help='Mac Address of the GATT device to connect')
subparsers = parser.add_subparsers(help='sub-command help')
environment_parser = subparsers.add_parser('environment', help='Environment Command')
environment_parser.set_defaults(func=environment_service)
sound_parser = subparsers.add_parser('sound', help='Sound Command')
sound_parser.add_argument('--wav', type=str, help='WAV file to play')
sound_parser.set_defaults(func=sound_service)
args = parser.parse_args()
NORDIC_THINGY_WEATHER_STATION_SERVICE = uuid.gattlib_uuid_str_to_int("EF680200-9B35-4933-9B10-52FFA9740042")
NORDIC_THINGY_TEMPERATURE_CHAR = uuid.gattlib_uuid_str_to_int("EF680201-9B35-4933-9B10-52FFA9740042")
NORDIC_THINGY_PRESSURE_CHAR = uuid.gattlib_uuid_str_to_int("EF680202-9B35-4933-9B10-52FFA9740042")
NORDIC_THINGY_HUMIDITY_CHAR = uuid.gattlib_uuid_str_to_int("EF680203-9B35-4933-9B10-52FFA9740042")
NORDIC_THINGY_AIR_QUALITY_CHAR = uuid.gattlib_uuid_str_to_int("EF680204-9B35-4933-9B10-52FFA9740042")
if not hasattr(args, 'func'):
raise RuntimeError("Please specify the command to launch: 'environment', 'sound'")
gatt_device = device.Device(adapter=None, addr=args.mac)
gatt_device.connect()
gatt_device.discover()
temperature_characteristic = gatt_device.characteristics[NORDIC_THINGY_TEMPERATURE_CHAR]
pressure_characteristic = gatt_device.characteristics[NORDIC_THINGY_PRESSURE_CHAR]
humidity_characteristic = gatt_device.characteristics[NORDIC_THINGY_HUMIDITY_CHAR]
air_quality_characteristic = gatt_device.characteristics[NORDIC_THINGY_AIR_QUALITY_CHAR]
# Initialize graph
threading.Thread(target=graph_init).start()
try:
DBusGMainLoop(set_as_default=True)
mainloop = GLib.MainLoop()
temperature_characteristic.register_notification(temperature_notification)
temperature_characteristic.notification_start()
pressure_characteristic.register_notification(pressure_notification)
pressure_characteristic.notification_start()
humidity_characteristic.register_notification(humidity_notification)
humidity_characteristic.notification_start()
mainloop.run()
except KeyboardInterrupt:
mainloop.quit()
finally:
humidity_characteristic.notification_stop()
pressure_characteristic.notification_stop()
temperature_characteristic.notification_stop()
gatt_device.disconnect()
# Launch the sub-command specific function
args.func(args, gatt_device)

View File

@ -0,0 +1,122 @@
import time
import threading
import wave
from gattlib import uuid
from dbus.mainloop.glib import DBusGMainLoop
try:
from gi.repository import GLib, GObject
except ImportError:
import gobject as GObject
m_thingy_buffer_free = threading.Event()
m_mainloop = None
def speaker_status_notification(value, user_data):
global m_thingy_buffer_free
if value == b'\x01':
print("Thingy's Buffer warning")
m_thingy_buffer_free.clear()
elif value == b'\x02':
print("Thingy's Buffer ready")
m_thingy_buffer_free.set()
elif value == b'\x10':
print("Thingy's Packet disregarded")
elif value == b'\x11':
print("Thingy's Invalid command")
elif value == b'\x00':
print("Thingy's Finished")
else:
raise RuntimeError("Invalid Speaker notification value: %s" % value)
def play_sample(config_characteristic, speaker_characteristic):
# Read the current configuration and only change the speaker configuration (not the microphone configuration)
sound_config = config_characteristic.read()
sound_config[0] = 0x03
config_characteristic.write(sound_config)
# Test speaker
speaker_characteristic.write(b'\x03')
m_mainloop.quit()
def play_wav_file(config_characteristic, speaker_characteristic, wav_filepath):
global m_thingy_buffer_free
wav_file = wave.open(wav_filepath)
# Python library only support non-compressed WAV file
if wav_file.getcomptype() != 'NONE':
raise RuntimeError("Please give a non-compressed WAV file")
if wav_file.getsampwidth() != 1:
raise RuntimeError("Nordic Thingy52 only supports 8-bit WAV file")
if wav_file.getframerate() != 8000:
raise RuntimeError("Nordic Thingy52 only supports 8kHz WAV file")
if wav_file.getnchannels() == 2:
print("Warning: Your WAV file is a stereo file")
frames = wav_file.readframes(wav_file.getnframes())
# Read the current configuration and only change the speaker configuration (not the microphone configuration)
sound_config = config_characteristic.read()
sound_config[0] = 0x02
config_characteristic.write(sound_config)
stream = speaker_characteristic.stream_open()
# We assume the buffer is free when we start
m_thingy_buffer_free.set()
# We send one frame at a time
max_frame_size = stream.mtu * 1
while len(frames) > 0:
if not m_thingy_buffer_free.is_set():
m_thingy_buffer_free.wait()
stream.write(frames[0:max_frame_size])
frames = frames[max_frame_size:]
# Arbitraty value
time.sleep(0.03)
stream.close()
print("All WAV file has been sent")
m_mainloop.quit()
def sound_service(args, gatt_device):
global m_mainloop
NORDIC_THINGY_SOUND_SERVICE = uuid.gattlib_uuid_str_to_int("EF680500-9B35-4933-9B10-52FFA9740042")
NORDIC_THINGY_CONFIG_CHAR = uuid.gattlib_uuid_str_to_int("EF680501-9B35-4933-9B10-52FFA9740042")
NORDIC_THINGY_SPEAKER_CHAR = uuid.gattlib_uuid_str_to_int("EF680502-9B35-4933-9B10-52FFA9740042")
NORDIC_THINGY_SPEAKER_STATUS_CHAR = uuid.gattlib_uuid_str_to_int("EF680503-9B35-4933-9B10-52FFA9740042")
NORDIC_THINGY_MICROPHONE_CHAR = uuid.gattlib_uuid_str_to_int("EF680504-9B35-4933-9B10-52FFA9740042")
config_characteristic = gatt_device.characteristics[NORDIC_THINGY_CONFIG_CHAR]
speaker_characteristic = gatt_device.characteristics[NORDIC_THINGY_SPEAKER_CHAR]
speaker_status_characteristic = gatt_device.characteristics[NORDIC_THINGY_SPEAKER_STATUS_CHAR]
microphone_characteristic = gatt_device.characteristics[NORDIC_THINGY_MICROPHONE_CHAR]
try:
DBusGMainLoop(set_as_default=True)
m_mainloop = GLib.MainLoop()
speaker_status_characteristic.register_notification(speaker_status_notification)
speaker_status_characteristic.notification_start()
if args.wav:
threading.Thread(target=play_wav_file, args=(config_characteristic, speaker_characteristic, args.wav)).start()
else:
threading.Thread(target=play_sample, args=(config_characteristic, speaker_characteristic)).start()
m_mainloop.run()
except KeyboardInterrupt:
m_mainloop.quit()
finally:
speaker_status_characteristic.notification_stop()
gatt_device.disconnect()