Use ipc queue for all messages (#3585)

I was able to reproduce #3579 in Linux by running:
`sudo sysctl net.core.wmem_default=10000`

If a subscription message was too big to be sent at once, it was
possible to break a client by sending a reply to an other message sent
by the client. Eg:
- Write 8192 out of 11612 bytes of a workspace event.
- Blockingly write the reply to a workspace change message.
- Write the rest 3420 bytes of the workspace event.

This commit fixes this by utilizing the ipc queue for all types of
writes.

ipc_receive_message can only be called from a callback started in
ipc_new_client. This callback uses the same file descriptor with the
client also created in ipc_new_client. When the client is deleted, the
read callback is now also stopped. Thus, we can assume that whenever
ipc_receive_message is called, the corresponding client should still
exist.

- ipc_client now contains pointers to both write and read watchers. When
freed, a client will stop both of them.
- IPC_HANDLERs now work with ipc_clients instead of fds.

Fixes #3579.
This commit is contained in:
Orestis 2019-01-12 14:13:03 +02:00 committed by Michael Stapelberg
parent cf375927f0
commit 54e7a31568
2 changed files with 99 additions and 133 deletions

View File

@ -35,7 +35,8 @@ typedef struct ipc_client {
* event has been sent by i3. */ * event has been sent by i3. */
bool first_tick_sent; bool first_tick_sent;
struct ev_io *callback; struct ev_io *read_callback;
struct ev_io *write_callback;
struct ev_timer *timeout; struct ev_timer *timeout;
uint8_t *buffer; uint8_t *buffer;
size_t buffer_size; size_t buffer_size;
@ -54,12 +55,12 @@ typedef struct ipc_client {
* message_type is the type of the message as the sender specified it. * message_type is the type of the message as the sender specified it.
* *
*/ */
typedef void (*handler_t)(int, uint8_t *, int, uint32_t, uint32_t); typedef void (*handler_t)(ipc_client *, uint8_t *, int, uint32_t, uint32_t);
/* Macro to declare a callback */ /* Macro to declare a callback */
#define IPC_HANDLER(name) \ #define IPC_HANDLER(name) \
static void handle_##name(int fd, uint8_t *message, \ static void handle_##name(ipc_client *client, uint8_t *message, \
int size, uint32_t message_size, \ int size, uint32_t message_size, \
uint32_t message_type) uint32_t message_type)
/** /**

221
src/ipc.c
View File

@ -38,46 +38,6 @@ static void set_nonblock(int sockfd) {
err(-1, "Could not set O_NONBLOCK"); err(-1, "Could not set O_NONBLOCK");
} }
/*
* Given a message and a message type, create the corresponding header, merge it
* with the message and append it to the given client's output buffer.
*
*/
static void append_payload(ipc_client *client, uint32_t message_type, const char *payload) {
const size_t size = strlen(payload);
const i3_ipc_header_t header = {
.magic = {'i', '3', '-', 'i', 'p', 'c'},
.size = size,
.type = message_type};
const size_t header_size = sizeof(i3_ipc_header_t);
const size_t message_size = header_size + size;
client->buffer = srealloc(client->buffer, client->buffer_size + message_size);
memcpy(client->buffer + client->buffer_size, ((void *)&header), header_size);
memcpy(client->buffer + client->buffer_size + header_size, payload, size);
client->buffer_size += message_size;
}
static void free_ipc_client(ipc_client *client) {
close(client->fd);
ev_io_stop(main_loop, client->callback);
FREE(client->callback);
if (client->timeout) {
ev_timer_stop(main_loop, client->timeout);
FREE(client->timeout);
}
free(client->buffer);
for (int i = 0; i < client->num_events; i++) {
free(client->events[i]);
}
free(client->events);
TAILQ_REMOVE(&all_clients, client, clients);
free(client);
}
static void ipc_client_timeout(EV_P_ ev_timer *w, int revents); static void ipc_client_timeout(EV_P_ ev_timer *w, int revents);
static void ipc_socket_writeable_cb(EV_P_ struct ev_io *w, int revents); static void ipc_socket_writeable_cb(EV_P_ struct ev_io *w, int revents);
@ -89,8 +49,8 @@ void ipc_set_kill_timeout(ev_tstamp new) {
/* /*
* Try to write the contents of the pending buffer to the client's subscription * Try to write the contents of the pending buffer to the client's subscription
* socket. Will set, reset or clear the timeout and io callbacks depending on * socket. Will set, reset or clear the timeout and io write callbacks depending
* the result of the write operation. * on the result of the write operation.
* *
*/ */
static void ipc_push_pending(ipc_client *client) { static void ipc_push_pending(ipc_client *client) {
@ -108,13 +68,13 @@ static void ipc_push_pending(ipc_client *client) {
ev_timer_stop(main_loop, client->timeout); ev_timer_stop(main_loop, client->timeout);
FREE(client->timeout); FREE(client->timeout);
} }
ev_io_stop(main_loop, client->callback); ev_io_stop(main_loop, client->write_callback);
return; return;
} }
/* Otherwise, make sure that the io callback is enabled and create a new /* Otherwise, make sure that the io callback is enabled and create a new
* timer if needed. */ * timer if needed. */
ev_io_start(main_loop, client->callback); ev_io_start(main_loop, client->write_callback);
if (!client->timeout) { if (!client->timeout) {
struct ev_timer *timeout = scalloc(1, sizeof(struct ev_timer)); struct ev_timer *timeout = scalloc(1, sizeof(struct ev_timer));
@ -140,6 +100,54 @@ static void ipc_push_pending(ipc_client *client) {
client->buffer = srealloc(client->buffer, client->buffer_size); client->buffer = srealloc(client->buffer, client->buffer_size);
} }
/*
* Given a message and a message type, create the corresponding header, merge it
* with the message and append it to the given client's output buffer. Also,
* send the message if the client's buffer was empty.
*
*/
static void ipc_send_client_message(ipc_client *client, size_t size, const uint32_t message_type, const uint8_t *payload) {
const i3_ipc_header_t header = {
.magic = {'i', '3', '-', 'i', 'p', 'c'},
.size = size,
.type = message_type};
const size_t header_size = sizeof(i3_ipc_header_t);
const size_t message_size = header_size + size;
const bool push_now = (client->buffer_size == 0);
client->buffer = srealloc(client->buffer, client->buffer_size + message_size);
memcpy(client->buffer + client->buffer_size, ((void *)&header), header_size);
memcpy(client->buffer + client->buffer_size + header_size, payload, size);
client->buffer_size += message_size;
if (push_now) {
ipc_push_pending(client);
}
}
static void free_ipc_client(ipc_client *client) {
DLOG("Disconnecting client on fd %d\n", client->fd);
close(client->fd);
ev_io_stop(main_loop, client->read_callback);
FREE(client->read_callback);
ev_io_stop(main_loop, client->write_callback);
FREE(client->write_callback);
if (client->timeout) {
ev_timer_stop(main_loop, client->timeout);
FREE(client->timeout);
}
free(client->buffer);
for (int i = 0; i < client->num_events; i++) {
free(client->events[i]);
}
free(client->events);
TAILQ_REMOVE(&all_clients, client, clients);
free(client);
}
/* /*
* Sends the specified event to all IPC clients which are currently connected * Sends the specified event to all IPC clients which are currently connected
* and subscribed to this kind of event. * and subscribed to this kind of event.
@ -148,21 +156,11 @@ static void ipc_push_pending(ipc_client *client) {
void ipc_send_event(const char *event, uint32_t message_type, const char *payload) { void ipc_send_event(const char *event, uint32_t message_type, const char *payload) {
ipc_client *current; ipc_client *current;
TAILQ_FOREACH(current, &all_clients, clients) { TAILQ_FOREACH(current, &all_clients, clients) {
/* see if this client is interested in this event */
bool interested = false;
for (int i = 0; i < current->num_events; i++) { for (int i = 0; i < current->num_events; i++) {
if (strcasecmp(current->events[i], event) != 0) if (strcasecmp(current->events[i], event) == 0) {
continue; ipc_send_client_message(current, strlen(payload), message_type, (uint8_t *)payload);
interested = true; break;
break; }
}
if (!interested)
continue;
const bool push_now = (current->buffer_size == 0);
append_payload(current, message_type, payload);
if (push_now) {
ipc_push_pending(current);
} }
} }
} }
@ -234,8 +232,8 @@ IPC_HANDLER(run_command) {
ylength length; ylength length;
yajl_gen_get_buf(gen, &reply, &length); yajl_gen_get_buf(gen, &reply, &length);
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_COMMAND, ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_COMMAND,
(const uint8_t *)reply); (const uint8_t *)reply);
yajl_gen_free(gen); yajl_gen_free(gen);
} }
@ -838,7 +836,7 @@ IPC_HANDLER(tree) {
ylength length; ylength length;
y(get_buf, &payload, &length); y(get_buf, &payload, &length);
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_TREE, payload); ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_TREE, payload);
y(free); y(free);
} }
@ -902,7 +900,7 @@ IPC_HANDLER(get_workspaces) {
ylength length; ylength length;
y(get_buf, &payload, &length); y(get_buf, &payload, &length);
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_WORKSPACES, payload); ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_WORKSPACES, payload);
y(free); y(free);
} }
@ -956,7 +954,7 @@ IPC_HANDLER(get_outputs) {
ylength length; ylength length;
y(get_buf, &payload, &length); y(get_buf, &payload, &length);
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_OUTPUTS, payload); ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_OUTPUTS, payload);
y(free); y(free);
} }
@ -983,7 +981,7 @@ IPC_HANDLER(get_marks) {
ylength length; ylength length;
y(get_buf, &payload, &length); y(get_buf, &payload, &length);
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_MARKS, payload); ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_MARKS, payload);
y(free); y(free);
} }
@ -1016,7 +1014,7 @@ IPC_HANDLER(get_version) {
ylength length; ylength length;
y(get_buf, &payload, &length); y(get_buf, &payload, &length);
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_VERSION, payload); ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_VERSION, payload);
y(free); y(free);
} }
@ -1041,7 +1039,7 @@ IPC_HANDLER(get_bar_config) {
ylength length; ylength length;
y(get_buf, &payload, &length); y(get_buf, &payload, &length);
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload); ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload);
y(free); y(free);
return; return;
} }
@ -1078,7 +1076,7 @@ IPC_HANDLER(get_bar_config) {
ylength length; ylength length;
y(get_buf, &payload, &length); y(get_buf, &payload, &length);
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload); ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_BAR_CONFIG, payload);
y(free); y(free);
} }
@ -1100,7 +1098,7 @@ IPC_HANDLER(get_binding_modes) {
ylength length; ylength length;
y(get_buf, &payload, &length); y(get_buf, &payload, &length);
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_BINDING_MODES, payload); ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_BINDING_MODES, payload);
y(free); y(free);
} }
@ -1139,21 +1137,6 @@ static int add_subscription(void *extra, const unsigned char *s,
IPC_HANDLER(subscribe) { IPC_HANDLER(subscribe) {
yajl_handle p; yajl_handle p;
yajl_status stat; yajl_status stat;
ipc_client *current, *client = NULL;
/* Search the ipc_client structure for this connection */
TAILQ_FOREACH(current, &all_clients, clients) {
if (current->fd != fd)
continue;
client = current;
break;
}
if (client == NULL) {
ELOG("Could not find ipc_client data structure for fd %d\n", fd);
return;
}
/* Setup the JSON parser */ /* Setup the JSON parser */
static yajl_callbacks callbacks = { static yajl_callbacks callbacks = {
@ -1170,13 +1153,13 @@ IPC_HANDLER(subscribe) {
yajl_free_error(p, err); yajl_free_error(p, err);
const char *reply = "{\"success\":false}"; const char *reply = "{\"success\":false}";
ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply); ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply);
yajl_free(p); yajl_free(p);
return; return;
} }
yajl_free(p); yajl_free(p);
const char *reply = "{\"success\":true}"; const char *reply = "{\"success\":true}";
ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply); ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SUBSCRIBE, (const uint8_t *)reply);
if (client->first_tick_sent) { if (client->first_tick_sent) {
return; return;
@ -1195,7 +1178,7 @@ IPC_HANDLER(subscribe) {
client->first_tick_sent = true; client->first_tick_sent = true;
const char *payload = "{\"first\":true,\"payload\":\"\"}"; const char *payload = "{\"first\":true,\"payload\":\"\"}";
ipc_send_message(client->fd, strlen(payload), I3_IPC_EVENT_TICK, (const uint8_t *)payload); ipc_send_client_message(client, strlen(payload), I3_IPC_EVENT_TICK, (const uint8_t *)payload);
} }
/* /*
@ -1215,7 +1198,7 @@ IPC_HANDLER(get_config) {
ylength length; ylength length;
y(get_buf, &payload, &length); y(get_buf, &payload, &length);
ipc_send_message(fd, length, I3_IPC_REPLY_TYPE_CONFIG, payload); ipc_send_client_message(client, length, I3_IPC_REPLY_TYPE_CONFIG, payload);
y(free); y(free);
} }
@ -1244,7 +1227,7 @@ IPC_HANDLER(send_tick) {
y(free); y(free);
const char *reply = "{\"success\":true}"; const char *reply = "{\"success\":true}";
ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_TICK, (const uint8_t *)reply); ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_TICK, (const uint8_t *)reply);
DLOG("Sent tick event\n"); DLOG("Sent tick event\n");
} }
@ -1295,7 +1278,7 @@ IPC_HANDLER(sync) {
yajl_free_error(p, err); yajl_free_error(p, err);
const char *reply = "{\"success\":false}"; const char *reply = "{\"success\":false}";
ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply); ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply);
yajl_free(p); yajl_free(p);
return; return;
} }
@ -1304,7 +1287,7 @@ IPC_HANDLER(sync) {
DLOG("received IPC sync request (rnd = %d, window = 0x%08x)\n", state.rnd, state.window); DLOG("received IPC sync request (rnd = %d, window = 0x%08x)\n", state.rnd, state.window);
sync_respond(state.window, state.rnd); sync_respond(state.window, state.rnd);
const char *reply = "{\"success\":true}"; const char *reply = "{\"success\":true}";
ipc_send_message(fd, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply); ipc_send_client_message(client, strlen(reply), I3_IPC_REPLY_TYPE_SYNC, (const uint8_t *)reply);
} }
/* The index of each callback function corresponds to the numeric /* The index of each callback function corresponds to the numeric
@ -1338,6 +1321,8 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) {
uint32_t message_type; uint32_t message_type;
uint32_t message_length; uint32_t message_length;
uint8_t *message = NULL; uint8_t *message = NULL;
ipc_client *client = (ipc_client *)w->data;
assert(client->fd == w->fd);
int ret = ipc_recv_message(w->fd, &message_type, &message_length, &message); int ret = ipc_recv_message(w->fd, &message_type, &message_length, &message);
/* EOF or other error */ /* EOF or other error */
@ -1350,25 +1335,8 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) {
/* If not, there was some kind of error. We dont bother and close the /* If not, there was some kind of error. We dont bother and close the
* connection. Delete the client from the list of clients. */ * connection. Delete the client from the list of clients. */
bool closed = false; free_ipc_client(client);
ipc_client *current;
TAILQ_FOREACH(current, &all_clients, clients) {
if (current->fd != w->fd)
continue;
free_ipc_client(current);
closed = true;
break;
}
if (!closed) {
close(w->fd);
}
ev_io_stop(EV_A_ w);
free(w);
FREE(message); FREE(message);
DLOG("IPC: client disconnected\n");
return; return;
} }
@ -1376,7 +1344,7 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) {
DLOG("Unhandled message type: %d\n", message_type); DLOG("Unhandled message type: %d\n", message_type);
else { else {
handler_t h = handlers[message_type]; handler_t h = handlers[message_type];
h(w->fd, message, 0, message_length, message_type); h(client, message, 0, message_length, message_type);
} }
FREE(message); FREE(message);
@ -1448,36 +1416,33 @@ static void ipc_socket_writeable_cb(EV_P_ ev_io *w, int revents) {
void ipc_new_client(EV_P_ struct ev_io *w, int revents) { void ipc_new_client(EV_P_ struct ev_io *w, int revents) {
struct sockaddr_un peer; struct sockaddr_un peer;
socklen_t len = sizeof(struct sockaddr_un); socklen_t len = sizeof(struct sockaddr_un);
int client; int fd;
if ((client = accept(w->fd, (struct sockaddr *)&peer, &len)) < 0) { if ((fd = accept(w->fd, (struct sockaddr *)&peer, &len)) < 0) {
if (errno == EINTR) if (errno != EINTR) {
return;
else
perror("accept()"); perror("accept()");
}
return; return;
} }
/* Close this file descriptor on exec() */ /* Close this file descriptor on exec() */
(void)fcntl(client, F_SETFD, FD_CLOEXEC); (void)fcntl(fd, F_SETFD, FD_CLOEXEC);
set_nonblock(client); set_nonblock(fd);
struct ev_io *package = scalloc(1, sizeof(struct ev_io)); ipc_client *client = scalloc(1, sizeof(ipc_client));
ev_io_init(package, ipc_receive_message, client, EV_READ); client->fd = fd;
ev_io_start(EV_A_ package);
ipc_client *new = scalloc(1, sizeof(ipc_client)); client->read_callback = scalloc(1, sizeof(struct ev_io));
client->read_callback->data = client;
ev_io_init(client->read_callback, ipc_receive_message, fd, EV_READ);
ev_io_start(EV_A_ client->read_callback);
package = scalloc(1, sizeof(struct ev_io)); client->write_callback = scalloc(1, sizeof(struct ev_io));
package->data = new; client->write_callback->data = client;
ev_io_init(package, ipc_socket_writeable_cb, client, EV_WRITE); ev_io_init(client->write_callback, ipc_socket_writeable_cb, fd, EV_WRITE);
DLOG("IPC: new client connected on fd %d\n", w->fd); DLOG("IPC: new client connected on fd %d\n", w->fd);
TAILQ_INSERT_TAIL(&all_clients, client, clients);
new->fd = client;
new->callback = package;
TAILQ_INSERT_TAIL(&all_clients, new, clients);
} }
/* /*