Use ipc queue for all messages (#3585)
I was able to reproduce #3579 in Linux by running: `sudo sysctl net.core.wmem_default=10000` If a subscription message was too big to be sent at once, it was possible to break a client by sending a reply to an other message sent by the client. Eg: - Write 8192 out of 11612 bytes of a workspace event. - Blockingly write the reply to a workspace change message. - Write the rest 3420 bytes of the workspace event. This commit fixes this by utilizing the ipc queue for all types of writes. ipc_receive_message can only be called from a callback started in ipc_new_client. This callback uses the same file descriptor with the client also created in ipc_new_client. When the client is deleted, the read callback is now also stopped. Thus, we can assume that whenever ipc_receive_message is called, the corresponding client should still exist. - ipc_client now contains pointers to both write and read watchers. When freed, a client will stop both of them. - IPC_HANDLERs now work with ipc_clients instead of fds. Fixes #3579.
This commit is contained in:
parent
e2e964151e
commit
5aa0459be0
|
@ -35,7 +35,8 @@ typedef struct ipc_client {
|
|||
* event has been sent by i3. */
|
||||
bool first_tick_sent;
|
||||
|
||||
struct ev_io *callback;
|
||||
struct ev_io *read_callback;
|
||||
struct ev_io *write_callback;
|
||||
struct ev_timer *timeout;
|
||||
uint8_t *buffer;
|
||||
size_t buffer_size;
|
||||
|
@ -54,12 +55,12 @@ typedef struct ipc_client {
|
|||
* message_type is the type of the message as the sender specified it.
|
||||
*
|
||||
*/
|
||||
typedef void (*handler_t)(int, uint8_t *, int, uint32_t, uint32_t);
|
||||
typedef void (*handler_t)(ipc_client *, uint8_t *, int, uint32_t, uint32_t);
|
||||
|
||||
/* Macro to declare a callback */
|
||||
#define IPC_HANDLER(name) \
|
||||
static void handle_##name(int fd, uint8_t *message, \
|
||||
int size, uint32_t message_size, \
|
||||
#define IPC_HANDLER(name) \
|
||||
static void handle_##name(ipc_client *client, uint8_t *message, \
|
||||
int size, uint32_t message_size, \
|
||||
uint32_t message_type)
|
||||
|
||||
/**
|
||||
|
|
221
src/ipc.c
221
src/ipc.c
|
@ -38,46 +38,6 @@ static void set_nonblock(int sockfd) {
|
|||
err(-1, "Could not set O_NONBLOCK");
|
||||
}
|
||||
|
||||
/*
|
||||
* Given a message and a message type, create the corresponding header, merge it
|
||||
* with the message and append it to the given client's output buffer.
|
||||
*
|
||||
*/
|
||||
static void append_payload(ipc_client *client, uint32_t message_type, const char *payload) {
|
||||
const size_t size = strlen(payload);
|
||||
const i3_ipc_header_t header = {
|
||||
.magic = {'i', '3', '-', 'i', 'p', 'c'},
|
||||
.size = size,
|
||||
.type = message_type};
|
||||
const size_t header_size = sizeof(i3_ipc_header_t);
|
||||
const size_t message_size = header_size + size;
|
||||
|
||||
client->buffer = srealloc(client->buffer, client->buffer_size + message_size);
|
||||
memcpy(client->buffer + client->buffer_size, ((void *)&header), header_size);
|
||||
memcpy(client->buffer + client->buffer_size + header_size, payload, size);
|
||||
client->buffer_size += message_size;
|
||||
}
|
||||
|
||||
static void free_ipc_client(ipc_client *client) {
|
||||
close(client->fd);
|
||||
|
||||
ev_io_stop(main_loop, client->callback);
|
||||
FREE(client->callback);
|
||||
if (client->timeout) {
|
||||
ev_timer_stop(main_loop, client->timeout);
|
||||
FREE(client->timeout);
|
||||
}
|
||||
|
||||
free(client->buffer);
|
||||
|
||||
for (int i = 0; i < client->num_events; i++) {
|
||||
free(client->events[i]);
|
||||
}
|
||||
free(client->events);
|
||||
TAILQ_REMOVE(&all_clients, client, clients);
|
||||
free(client);
|
||||
}
|
||||
|
||||
static void ipc_client_timeout(EV_P_ ev_timer *w, int revents);
|
||||
static void ipc_socket_writeable_cb(EV_P_ struct ev_io *w, int revents);
|
||||
|
||||
|
@ -89,8 +49,8 @@ void ipc_set_kill_timeout(ev_tstamp new) {
|
|||
|
||||
/*
|
||||
* Try to write the contents of the pending buffer to the client's subscription
|
||||
* socket. Will set, reset or clear the timeout and io callbacks depending on
|
||||
* the result of the write operation.
|
||||
* socket. Will set, reset or clear the timeout and io write callbacks depending
|
||||
* on the result of the write operation.
|
||||
*
|
||||
*/
|
||||
static void ipc_push_pending(ipc_client *client) {
|
||||
|
@ -108,13 +68,13 @@ static void ipc_push_pending(ipc_client *client) {
|
|||
ev_timer_stop(main_loop, client->timeout);
|
||||
FREE(client->timeout);
|
||||
}
|
||||
ev_io_stop(main_loop, client->callback);
|
||||
ev_io_stop(main_loop, client->write_callback);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Otherwise, make sure that the io callback is enabled and create a new
|
||||
* timer if needed. */
|
||||
ev_io_start(main_loop, client->callback);
|
||||
ev_io_start(main_loop, client->write_callback);
|
||||
|
||||
if (!client->timeout) {
|
||||
struct ev_timer *timeout = scalloc(1, sizeof(struct ev_timer));
|
||||
|
@ -140,6 +100,54 @@ static void ipc_push_pending(ipc_client *client) {
|
|||
client->buffer = srealloc(client->buffer, client->buffer_size);
|
||||
}
|
||||
|
||||
/*
|
||||
* Given a message and a message type, create the corresponding header, merge it
|
||||
* with the message and append it to the given client's output buffer. Also,
|
||||
* send the message if the client's buffer was empty.
|
||||
*
|
||||
*/
|
||||
static void ipc_send_client_message(ipc_client *client, size_t size, const uint32_t message_type, const uint8_t *payload) {
|
||||
const i3_ipc_header_t header = {
|
||||
.magic = {'i', '3', '-', 'i', 'p', 'c'},
|
||||
.size = size,
|
||||
.type = message_type};
|
||||
const size_t header_size = sizeof(i3_ipc_header_t);
|
||||
const size_t message_size = header_size + size;
|
||||
|
||||
const bool push_now = (client->buffer_size == 0);
|
||||
client->buffer = srealloc(client->buffer, client->buffer_size + message_size);
|
||||
memcpy(client->buffer + client->buffer_size, ((void *)&header), header_size);
|
||||
memcpy(client->buffer + client->buffer_size + header_size, payload, size);
|
||||
client->buffer_size += message_size;
|
||||
|
||||
if (push_now) {
|
||||
ipc_push_pending(client);
|
||||
}
|
||||
}
|
||||
|
||||
static void free_ipc_client(ipc_client *client) {
|
||||
DLOG("Disconnecting client on fd %d\n", client->fd);
|
||||
close(client->fd);
|
||||
|
||||
ev_io_stop(main_loop, client->read_callback);
|
||||
FREE(client->read_callback);
|
||||
ev_io_stop(main_loop, client->write_callback);
|
||||
FREE(client->write_callback);
|
||||
if (client->timeout) {
|
||||
ev_timer_stop(main_loop, client->timeout);
|
||||
FREE(client->timeout);
|
||||
}
|
||||
|
||||
free(client->buffer);
|
||||
|
||||
for (int i = 0; i < client->num_events; i++) {
|
||||
free(client->events[i]);
|
||||
}
|
||||
free(client->events);
|
||||
TAILQ_REMOVE(&all_clients, client, clients);
|
||||
free(client);
|
||||
}
|
||||
|
||||
/*
|
||||
* Sends the specified event to all IPC clients which are currently connected
|
||||
* and subscribed to this kind of event.
|
||||
|
@ -148,21 +156,11 @@ static void ipc_push_pending(ipc_client *client) {
|
|||
void ipc_send_event(const char *event, uint32_t message_type, const char *payload) {
|
||||
ipc_client *current;
|
||||
TAILQ_FOREACH(current, &all_clients, clients) {
|
||||
/* see if this client is interested in this event */
|
||||
bool interested = false;
|
||||
for (int i = 0; i < current->num_events; i++) {
|
||||
if (strcasecmp(current->events[i], event) != 0)
|
||||
continue;
|
||||
interested = true;
|
||||
break;
|
||||
}
|
||||
if (!interested)
|
||||
continue;
|
||||
|
||||
const bool push_now = (current->buffer_size == 0);
|
||||
append_payload(current, message_type, payload);
|
||||
if (push_now) {
|
||||
ipc_push_pending(current);
|
||||
if (strcasecmp(current->events[i], event) == 0) {
|
||||
ipc_send_client_message(current, strlen(payload), message_type, (uint8_t *)payload);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -234,8 +232,8 @@ IPC_HANDLER(run_command) {
|
|||
ylength length;
|
||||
yajl_gen_get_buf(gen, &reply, &length);
|
||||
|
||||
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_COMMAND,
|
||||
(const uint8_t *)reply);
|
||||
ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_COMMAND,
|
||||
(const uint8_t *)reply);
|
||||
|
||||
yajl_gen_free(gen);
|
||||
}
|
||||
|
@ -843,7 +841,7 @@ IPC_HANDLER(tree) {
|
|||
ylength length;
|
||||
y(get_buf, &payload, &length);
|
||||
|
||||
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_TREE, payload);
|
||||
ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_TREE, payload);
|
||||
y(free);
|
||||
}
|
||||
|
||||
|
@ -907,7 +905,7 @@ IPC_HANDLER(get_workspaces) {
|
|||
ylength length;
|
||||
y(get_buf, &payload, &length);
|
||||
|
||||
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_WORKSPACES, payload);
|
||||
ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_WORKSPACES, payload);
|
||||
y(free);
|
||||
}
|
||||
|
||||
|
@ -961,7 +959,7 @@ IPC_HANDLER(get_outputs) {
|
|||
ylength length;
|
||||
y(get_buf, &payload, &length);
|
||||
|
||||
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_OUTPUTS, payload);
|
||||
ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_OUTPUTS, payload);
|
||||
y(free);
|
||||
}
|
||||
|
||||
|
@ -988,7 +986,7 @@ IPC_HANDLER(get_marks) {
|
|||
ylength length;
|
||||
y(get_buf, &payload, &length);
|
||||
|
||||
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_MARKS, payload);
|
||||
ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_MARKS, payload);
|
||||
y(free);
|
||||
}
|
||||
|
||||
|
@ -1021,7 +1019,7 @@ IPC_HANDLER(get_version) {
|
|||
ylength length;
|
||||
y(get_buf, &payload, &length);
|
||||
|
||||
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_VERSION, payload);
|
||||
ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_VERSION, payload);
|
||||
y(free);
|
||||
}
|
||||
|
||||
|
@ -1046,7 +1044,7 @@ IPC_HANDLER(get_bar_config) {
|
|||
ylength length;
|
||||
y(get_buf, &payload, &length);
|
||||
|
||||
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload);
|
||||
ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload);
|
||||
y(free);
|
||||
return;
|
||||
}
|
||||
|
@ -1083,7 +1081,7 @@ IPC_HANDLER(get_bar_config) {
|
|||
ylength length;
|
||||
y(get_buf, &payload, &length);
|
||||
|
||||
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload);
|
||||
ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload);
|
||||
y(free);
|
||||
}
|
||||
|
||||
|
@ -1105,7 +1103,7 @@ IPC_HANDLER(get_binding_modes) {
|
|||
ylength length;
|
||||
y(get_buf, &payload, &length);
|
||||
|
||||
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_BINDING_MODES, payload);
|
||||
ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_BINDING_MODES, payload);
|
||||
y(free);
|
||||
}
|
||||
|
||||
|
@ -1144,21 +1142,6 @@ static int add_subscription(void *extra, const unsigned char *s,
|
|||
IPC_HANDLER(subscribe) {
|
||||
yajl_handle p;
|
||||
yajl_status stat;
|
||||
ipc_client *current, *client = NULL;
|
||||
|
||||
/* Search the ipc_client structure for this connection */
|
||||
TAILQ_FOREACH(current, &all_clients, clients) {
|
||||
if (current->fd != fd)
|
||||
continue;
|
||||
|
||||
client = current;
|
||||
break;
|
||||
}
|
||||
|
||||
if (client == NULL) {
|
||||
ELOG("Could not find ipc_client data structure for fd %d\n", fd);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Setup the JSON parser */
|
||||
static yajl_callbacks callbacks = {
|
||||
|
@ -1175,13 +1158,13 @@ IPC_HANDLER(subscribe) {
|
|||
yajl_free_error(p, err);
|
||||
|
||||
const char *reply = "{\"success\":false}";
|
||||
ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply);
|
||||
ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply);
|
||||
yajl_free(p);
|
||||
return;
|
||||
}
|
||||
yajl_free(p);
|
||||
const char *reply = "{\"success\":true}";
|
||||
ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply);
|
||||
ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply);
|
||||
|
||||
if (client->first_tick_sent) {
|
||||
return;
|
||||
|
@ -1200,7 +1183,7 @@ IPC_HANDLER(subscribe) {
|
|||
|
||||
client->first_tick_sent = true;
|
||||
const char *payload = "{\"first\":true,\"payload\":\"\"}";
|
||||
ipc_send_message(client->fd, strlen(payload), I3_IPC_EVENT_TICK, (const uint8_t *)payload);
|
||||
ipc_send_client_message(client, strlen(payload), I3_IPC_EVENT_TICK, (const uint8_t *)payload);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1220,7 +1203,7 @@ IPC_HANDLER(get_config) {
|
|||
ylength length;
|
||||
y(get_buf, &payload, &length);
|
||||
|
||||
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_CONFIG, payload);
|
||||
ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_CONFIG, payload);
|
||||
y(free);
|
||||
}
|
||||
|
||||
|
@ -1249,7 +1232,7 @@ IPC_HANDLER(send_tick) {
|
|||
y(free);
|
||||
|
||||
const char *reply = "{\"success\":true}";
|
||||
ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_TICK, (const uint8_t *)reply);
|
||||
ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_TICK, (const uint8_t *)reply);
|
||||
DLOG("Sent tick event\n");
|
||||
}
|
||||
|
||||
|
@ -1300,7 +1283,7 @@ IPC_HANDLER(sync) {
|
|||
yajl_free_error(p, err);
|
||||
|
||||
const char *reply = "{\"success\":false}";
|
||||
ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply);
|
||||
ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply);
|
||||
yajl_free(p);
|
||||
return;
|
||||
}
|
||||
|
@ -1309,7 +1292,7 @@ IPC_HANDLER(sync) {
|
|||
DLOG("received IPC sync request (rnd = %d, window = 0x%08x)\n", state.rnd, state.window);
|
||||
sync_respond(state.window, state.rnd);
|
||||
const char *reply = "{\"success\":true}";
|
||||
ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply);
|
||||
ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply);
|
||||
}
|
||||
|
||||
/* The index of each callback function corresponds to the numeric
|
||||
|
@ -1343,6 +1326,8 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) {
|
|||
uint32_t message_type;
|
||||
uint32_t message_length;
|
||||
uint8_t *message = NULL;
|
||||
ipc_client *client = (ipc_client *)w->data;
|
||||
assert(client->fd == w->fd);
|
||||
|
||||
int ret = ipc_recv_message(w->fd, &message_type, &message_length, &message);
|
||||
/* EOF or other error */
|
||||
|
@ -1355,25 +1340,8 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) {
|
|||
|
||||
/* If not, there was some kind of error. We don’t bother and close the
|
||||
* connection. Delete the client from the list of clients. */
|
||||
bool closed = false;
|
||||
ipc_client *current;
|
||||
TAILQ_FOREACH(current, &all_clients, clients) {
|
||||
if (current->fd != w->fd)
|
||||
continue;
|
||||
|
||||
free_ipc_client(current);
|
||||
closed = true;
|
||||
break;
|
||||
}
|
||||
if (!closed) {
|
||||
close(w->fd);
|
||||
}
|
||||
|
||||
ev_io_stop(EV_A_ w);
|
||||
free(w);
|
||||
free_ipc_client(client);
|
||||
FREE(message);
|
||||
|
||||
DLOG("IPC: client disconnected\n");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1381,7 +1349,7 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) {
|
|||
DLOG("Unhandled message type: %d\n", message_type);
|
||||
else {
|
||||
handler_t h = handlers[message_type];
|
||||
h(w->fd, message, 0, message_length, message_type);
|
||||
h(client, message, 0, message_length, message_type);
|
||||
}
|
||||
|
||||
FREE(message);
|
||||
|
@ -1453,36 +1421,33 @@ static void ipc_socket_writeable_cb(EV_P_ ev_io *w, int revents) {
|
|||
void ipc_new_client(EV_P_ struct ev_io *w, int revents) {
|
||||
struct sockaddr_un peer;
|
||||
socklen_t len = sizeof(struct sockaddr_un);
|
||||
int client;
|
||||
if ((client = accept(w->fd, (struct sockaddr *)&peer, &len)) < 0) {
|
||||
if (errno == EINTR)
|
||||
return;
|
||||
else
|
||||
int fd;
|
||||
if ((fd = accept(w->fd, (struct sockaddr *)&peer, &len)) < 0) {
|
||||
if (errno != EINTR) {
|
||||
perror("accept()");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/* Close this file descriptor on exec() */
|
||||
(void)fcntl(client, F_SETFD, FD_CLOEXEC);
|
||||
(void)fcntl(fd, F_SETFD, FD_CLOEXEC);
|
||||
|
||||
set_nonblock(client);
|
||||
set_nonblock(fd);
|
||||
|
||||
struct ev_io *package = scalloc(1, sizeof(struct ev_io));
|
||||
ev_io_init(package, ipc_receive_message, client, EV_READ);
|
||||
ev_io_start(EV_A_ package);
|
||||
ipc_client *client = scalloc(1, sizeof(ipc_client));
|
||||
client->fd = fd;
|
||||
|
||||
ipc_client *new = scalloc(1, sizeof(ipc_client));
|
||||
client->read_callback = scalloc(1, sizeof(struct ev_io));
|
||||
client->read_callback->data = client;
|
||||
ev_io_init(client->read_callback, ipc_receive_message, fd, EV_READ);
|
||||
ev_io_start(EV_A_ client->read_callback);
|
||||
|
||||
package = scalloc(1, sizeof(struct ev_io));
|
||||
package->data = new;
|
||||
ev_io_init(package, ipc_socket_writeable_cb, client, EV_WRITE);
|
||||
client->write_callback = scalloc(1, sizeof(struct ev_io));
|
||||
client->write_callback->data = client;
|
||||
ev_io_init(client->write_callback, ipc_socket_writeable_cb, fd, EV_WRITE);
|
||||
|
||||
DLOG("IPC: new client connected on fd %d\n", w->fd);
|
||||
|
||||
new->fd = client;
|
||||
new->callback = package;
|
||||
|
||||
TAILQ_INSERT_TAIL(&all_clients, new, clients);
|
||||
TAILQ_INSERT_TAIL(&all_clients, client, clients);
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
Loading…
Reference in New Issue