gatt_notification: Use thread pool of one thread to ensure serial processing of notification

pull/266/head
Olivier Martin 2024-02-14 14:08:48 +01:00 committed by Olivier Martin
parent db8aee543b
commit 7f6979c82b
3 changed files with 60 additions and 27 deletions

View File

@ -48,12 +48,13 @@ struct gattlib_notification_device_thread_args {
size_t data_length;
};
static gpointer _gattlib_notification_device_thread(gpointer data) {
void gattlib_notification_device_thread(gpointer data, gpointer user_data) {
struct gattlib_notification_device_thread_args* args = data;
struct gattlib_handler* handler = user_data;
args->connection->notification.callback.notification_handler(
handler->callback.notification_handler(
args->uuid, args->data, args->data_length,
args->connection->notification.user_data
handler->user_data
);
if (args->uuid != NULL) {
@ -62,15 +63,9 @@ static gpointer _gattlib_notification_device_thread(gpointer data) {
if (args->data != NULL) {
free(args->data);
}
return NULL;
}
static void* _notification_device_thread_args_allocator(va_list args) {
gatt_connection_t* connection = va_arg(args, gatt_connection_t*);
const uuid_t* uuid = va_arg(args, const uuid_t*);
const uint8_t* data = va_arg(args, const uint8_t*);
size_t data_length = va_arg(args, size_t);
static void* _notification_device_thread_args_allocator(gatt_connection_t* connection, const uuid_t* uuid, const uint8_t* data, size_t data_length) {
struct gattlib_notification_device_thread_args* thread_args = malloc(sizeof(struct gattlib_notification_device_thread_args));
thread_args->connection = connection;
thread_args->uuid = malloc(sizeof(uuid_t));
@ -87,11 +82,17 @@ static void* _notification_device_thread_args_allocator(va_list args) {
}
void gattlib_on_gatt_notification(gatt_connection_t* connection, const uuid_t* uuid, const uint8_t* data, size_t data_length) {
gattlib_handler_dispatch_to_thread(
&connection->on_connection,
gattlib_notification_device_python_callback /* python_callback */,
_gattlib_notification_device_thread /* thread_func */,
"gattlib_notification_device" /* thread_name */,
_notification_device_thread_args_allocator /* thread_args_allocator */,
connection, uuid, data, data_length);
GError *error = NULL;
assert(connection->notification.thread_pool != NULL);
void* arg = _notification_device_thread_args_allocator(connection, uuid, data, data_length);
if (arg == NULL) {
GATTLIB_LOG(GATTLIB_ERROR, "gattlib_on_gatt_notification: Failed to allocate arguments for thread");
return;
}
g_thread_pool_push(connection->notification.thread_pool, arg, &error);
if (error != NULL) {
GATTLIB_LOG(GATTLIB_ERROR, "gattlib_on_gatt_notification: Failed to push thread in pool: %s", error->message);
}
}

View File

@ -9,21 +9,47 @@
#include "gattlib_internal.h"
int gattlib_register_notification(gatt_connection_t* connection, gattlib_event_handler_t notification_handler, void* user_data) {
GError *error = NULL;
if (connection == NULL) {
return GATTLIB_INVALID_PARAMETER;
}
connection->notification.callback.notification_handler = notification_handler;
connection->notification.user_data = user_data;
return GATTLIB_SUCCESS;
connection->notification.thread_pool = g_thread_pool_new(
gattlib_notification_device_thread,
&connection->notification,
1 /* max_threads */, FALSE /* exclusive */, &error);
if (error != NULL) {
GATTLIB_LOG(GATTLIB_ERROR, "gattlib_register_notification: Failed to create thread pool: %s", error->message);
return GATTLIB_ERROR_INTERNAL;
} else {
assert(connection->notification.thread_pool != NULL);
return GATTLIB_SUCCESS;
}
}
int gattlib_register_indication(gatt_connection_t* connection, gattlib_event_handler_t indication_handler, void* user_data) {
GError *error = NULL;
if (connection == NULL) {
return GATTLIB_INVALID_PARAMETER;
}
connection->indication.callback.notification_handler = indication_handler;
connection->indication.user_data = user_data;
return GATTLIB_SUCCESS;
connection->indication.thread_pool = g_thread_pool_new(
gattlib_notification_device_thread,
&connection->indication,
1 /* max_threads */, FALSE /* exclusive */, &error);
if (error != NULL) {
GATTLIB_LOG(GATTLIB_ERROR, "gattlib_register_indication: Failed to create thread pool: %s", error->message);
return GATTLIB_ERROR_INTERNAL;
} else {
return GATTLIB_SUCCESS;
}
}
int gattlib_register_on_disconnect(gatt_connection_t *connection, gattlib_disconnection_handler_t handler, void* user_data) {
@ -119,16 +145,18 @@ void gattlib_handler_free(struct gattlib_handler* handler) {
// Reset callback to stop calling it after we stopped
handler->callback.callback = NULL;
if (handler->python_args == NULL) {
return;
if (handler->python_args != NULL) {
struct gattlib_python_args* args = handler->python_args;
Py_DECREF(args->callback);
Py_DECREF(args->args);
handler->python_args = NULL;
free(handler->python_args);
}
struct gattlib_python_args* args = handler->python_args;
Py_DECREF(args->callback);
Py_DECREF(args->args);
free(args);
handler->python_args = NULL;
if (handler->thread_pool != NULL) {
g_thread_pool_free(handler->thread_pool, FALSE /* immediate */, TRUE /* wait */);
handler->thread_pool = NULL;
}
}
bool gattlib_has_valid_handler(struct gattlib_handler* handler) {

View File

@ -33,6 +33,8 @@ struct gattlib_handler {
void* user_data;
// We create a thread to ensure the callback is not blocking the mainloop
GThread *thread;
// Thread pool
GThreadPool *thread_pool;
// In case of Python callback and argument, we keep track to free it when we stopped to discover BLE devices
void* python_args;
};
@ -52,6 +54,8 @@ void gattlib_handler_dispatch_to_thread(struct gattlib_handler* handler, void (*
void gattlib_handler_free(struct gattlib_handler* handler);
bool gattlib_has_valid_handler(struct gattlib_handler* handler);
void gattlib_notification_device_thread(gpointer data, gpointer user_data);
#if defined(WITH_PYTHON)
// Callback used by Python to create arguments used by native callback
void* gattlib_python_callback_args(PyObject* python_callback, PyObject* python_args);