Merge pull request #3263 from orestisf1993/misbehaving-ipc-queue-2999
Kill misbehaving subscribed clients instead of hanging
This commit is contained in:
commit
5a54a1ab04
8
docs/ipc
8
docs/ipc
|
@ -693,6 +693,14 @@ program does not want to cope which such kinds of race conditions (an
|
|||
event based library may not have a problem here), I suggest you create a
|
||||
separate connection to receive events.
|
||||
|
||||
If an event message needs to be sent and the socket is not writeable (write
|
||||
returns EAGAIN, happens when the socket doesn't have enough buffer space for
|
||||
writing new data) then i3 uses a queue system to store outgoing messages for
|
||||
each client. This is combined with a timer: if the message queue for a client is
|
||||
not empty and no data was successfully written in the past 10 seconds, the
|
||||
connection is killed. Practically, this means that your client should try to
|
||||
always read events from the socket to avoid having its connection closed.
|
||||
|
||||
=== Subscribing to events
|
||||
|
||||
By sending a message of type SUBSCRIBE with a JSON-encoded array as payload
|
||||
|
|
|
@ -62,6 +62,7 @@ CFGFUN(assign_output, const char *output);
|
|||
CFGFUN(assign, const char *workspace, bool is_number);
|
||||
CFGFUN(no_focus);
|
||||
CFGFUN(ipc_socket, const char *path);
|
||||
CFGFUN(ipc_kill_timeout, const long timeout_ms);
|
||||
CFGFUN(restart_state, const char *path);
|
||||
CFGFUN(popup_during_fullscreen, const char *value);
|
||||
CFGFUN(color, const char *colorclass, const char *border, const char *background, const char *text, const char *indicator, const char *child_border);
|
||||
|
|
|
@ -35,6 +35,11 @@ typedef struct ipc_client {
|
|||
* event has been sent by i3. */
|
||||
bool first_tick_sent;
|
||||
|
||||
struct ev_io *callback;
|
||||
struct ev_timer *timeout;
|
||||
uint8_t *buffer;
|
||||
size_t buffer_size;
|
||||
|
||||
TAILQ_ENTRY(ipc_client)
|
||||
clients;
|
||||
} ipc_client;
|
||||
|
@ -124,3 +129,9 @@ void ipc_send_barconfig_update_event(Barconfig *barconfig);
|
|||
* For the binding events, we send the serialized binding struct.
|
||||
*/
|
||||
void ipc_send_binding_event(const char *event_type, Binding *bind);
|
||||
|
||||
/**
|
||||
* Set the maximum duration that we allow for a connection with an unwriteable
|
||||
* socket.
|
||||
*/
|
||||
void ipc_set_kill_timeout(ev_tstamp new);
|
||||
|
|
|
@ -166,6 +166,14 @@ int sasprintf(char **strp, const char *fmt, ...);
|
|||
*/
|
||||
ssize_t writeall(int fd, const void *buf, size_t count);
|
||||
|
||||
/**
|
||||
* Like writeall, but instead of retrying upon EAGAIN (returned when a write
|
||||
* would block), the function stops and returns the total number of bytes
|
||||
* written so far.
|
||||
*
|
||||
*/
|
||||
ssize_t writeall_nonblock(int fd, const void *buf, size_t count);
|
||||
|
||||
/**
|
||||
* Safe-wrapper around writeall which exits if it returns -1 (meaning that
|
||||
* write failed)
|
||||
|
|
|
@ -68,10 +68,9 @@ int sasprintf(char **strp, const char *fmt, ...) {
|
|||
|
||||
ssize_t writeall(int fd, const void *buf, size_t count) {
|
||||
size_t written = 0;
|
||||
ssize_t n = 0;
|
||||
|
||||
while (written < count) {
|
||||
n = write(fd, buf + written, count - written);
|
||||
const ssize_t n = write(fd, buf + written, count - written);
|
||||
if (n == -1) {
|
||||
if (errno == EINTR || errno == EAGAIN)
|
||||
continue;
|
||||
|
@ -83,6 +82,25 @@ ssize_t writeall(int fd, const void *buf, size_t count) {
|
|||
return written;
|
||||
}
|
||||
|
||||
/*
 * Writes up to count bytes from buf to fd. Interrupted writes (EINTR) are
 * retried; a write that would block (EAGAIN) ends the call early and the
 * number of bytes written so far is returned. Any other write error yields -1.
 *
 */
ssize_t writeall_nonblock(int fd, const void *buf, size_t count) {
    const char *bytes = buf;
    size_t done = 0;

    while (done < count) {
        const ssize_t rc = write(fd, bytes + done, count - done);
        if (rc != -1) {
            done += (size_t)rc;
            continue;
        }

        if (errno == EAGAIN) {
            /* The descriptor cannot take more data right now: report the
             * partial progress instead of spinning. */
            return done;
        }
        if (errno != EINTR) {
            return rc;
        }
        /* EINTR: simply try again. */
    }

    return done;
}
|
||||
|
||||
ssize_t swrite(int fd, const void *buf, size_t count) {
|
||||
ssize_t n;
|
||||
|
||||
|
|
|
@ -48,6 +48,7 @@ state INITIAL:
|
|||
'show_marks' -> SHOW_MARKS
|
||||
'workspace' -> WORKSPACE
|
||||
'ipc_socket', 'ipc-socket' -> IPC_SOCKET
|
||||
'ipc_kill_timeout' -> IPC_KILL_TIMEOUT
|
||||
'restart_state' -> RESTART_STATE
|
||||
'popup_during_fullscreen' -> POPUP_DURING_FULLSCREEN
|
||||
exectype = 'exec_always', 'exec' -> EXEC
|
||||
|
@ -281,6 +282,11 @@ state IPC_SOCKET:
|
|||
path = string
|
||||
-> call cfg_ipc_socket($path)
|
||||
|
||||
# ipc_kill_timeout
|
||||
state IPC_KILL_TIMEOUT:
|
||||
timeout = number
|
||||
-> call cfg_ipc_kill_timeout(&timeout)
|
||||
|
||||
# restart_state <path> (for testcases)
|
||||
state RESTART_STATE:
|
||||
path = string
|
||||
|
|
|
@ -446,6 +446,10 @@ CFGFUN(no_focus) {
|
|||
TAILQ_INSERT_TAIL(&assignments, assignment, assignments);
|
||||
}
|
||||
|
||||
/* Config directive 'ipc_kill_timeout <ms>': the value is given in
 * milliseconds, while ipc_set_kill_timeout() is passed seconds. */
CFGFUN(ipc_kill_timeout, const long timeout_ms) {
    const double timeout_seconds = timeout_ms / 1000.0;
    ipc_set_kill_timeout(timeout_seconds);
}
|
||||
|
||||
/*******************************************************************************
|
||||
* Bar configuration (i3bar)
|
||||
******************************************************************************/
|
||||
|
|
197
src/ipc.c
197
src/ipc.c
|
@ -38,6 +38,108 @@ static void set_nonblock(int sockfd) {
|
|||
err(-1, "Could not set O_NONBLOCK");
|
||||
}
|
||||
|
||||
/*
|
||||
* Given a message and a message type, create the corresponding header, merge it
|
||||
* with the message and append it to the given client's output buffer.
|
||||
*
|
||||
*/
|
||||
static void append_payload(ipc_client *client, uint32_t message_type, const char *payload) {
|
||||
const size_t size = strlen(payload);
|
||||
const i3_ipc_header_t header = {
|
||||
.magic = {'i', '3', '-', 'i', 'p', 'c'},
|
||||
.size = size,
|
||||
.type = message_type};
|
||||
const size_t header_size = sizeof(i3_ipc_header_t);
|
||||
const size_t message_size = header_size + size;
|
||||
|
||||
client->buffer = srealloc(client->buffer, client->buffer_size + message_size);
|
||||
memcpy(client->buffer + client->buffer_size, ((void *)&header), header_size);
|
||||
memcpy(client->buffer + client->buffer_size + header_size, payload, size);
|
||||
client->buffer_size += message_size;
|
||||
}
|
||||
|
||||
/*
 * Tears down a single IPC client: closes its socket, stops and frees its io
 * watcher and (if armed) its timeout timer, releases the output buffer and
 * the event list, unlinks the client from all_clients and frees it.
 *
 */
static void free_ipc_client(ipc_client *client) {
    close(client->fd);

    ev_io_stop(main_loop, client->callback);
    FREE(client->callback);

    /* The timer only exists while output is pending for this client. */
    if (client->timeout != NULL) {
        ev_timer_stop(main_loop, client->timeout);
        FREE(client->timeout);
    }

    free(client->buffer);

    int idx = 0;
    while (idx < client->num_events) {
        free(client->events[idx]);
        idx++;
    }
    free(client->events);

    TAILQ_REMOVE(&all_clients, client, clients);
    free(client);
}
|
||||
|
||||
static void ipc_client_timeout(EV_P_ ev_timer *w, int revents);
|
||||
static void ipc_socket_writeable_cb(EV_P_ struct ev_io *w, int revents);
|
||||
|
||||
/* Seconds after which the timer armed by ipc_push_pending() fires for a
 * client whose queued data could not be written. Defaults to 10 seconds. */
static ev_tstamp kill_timeout = 10.0;

/*
 * Replaces the kill timeout (in seconds). Only timers that are armed or
 * re-armed afterwards use the new value.
 *
 */
void ipc_set_kill_timeout(ev_tstamp new) {
    kill_timeout = new;
}
|
||||
|
||||
/*
|
||||
* Try to write the contents of the pending buffer to the client's subscription
|
||||
* socket. Will set, reset or clear the timeout and io callbacks depending on
|
||||
* the result of the write operation.
|
||||
*
|
||||
*/
|
||||
static void ipc_push_pending(ipc_client *client) {
|
||||
const ssize_t result = writeall_nonblock(client->fd, client->buffer, client->buffer_size);
|
||||
if (result < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if ((size_t)result == client->buffer_size) {
|
||||
/* Everything was written successfully: clear the timer and stop the io
|
||||
* callback. */
|
||||
FREE(client->buffer);
|
||||
client->buffer_size = 0;
|
||||
if (client->timeout) {
|
||||
ev_timer_stop(main_loop, client->timeout);
|
||||
FREE(client->timeout);
|
||||
}
|
||||
ev_io_stop(main_loop, client->callback);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Otherwise, make sure that the io callback is enabled and create a new
|
||||
* timer if needed. */
|
||||
ev_io_start(main_loop, client->callback);
|
||||
|
||||
if (!client->timeout) {
|
||||
struct ev_timer *timeout = scalloc(1, sizeof(struct ev_timer));
|
||||
ev_timer_init(timeout, ipc_client_timeout, kill_timeout, 0.);
|
||||
timeout->data = client;
|
||||
client->timeout = timeout;
|
||||
ev_set_priority(timeout, EV_MINPRI);
|
||||
ev_timer_start(main_loop, client->timeout);
|
||||
} else if (result > 0) {
|
||||
/* Keep the old timeout when nothing is written. Otherwise, we would
|
||||
* keep a dead connection by continuously renewing its timeouts. */
|
||||
ev_timer_stop(main_loop, client->timeout);
|
||||
ev_timer_set(client->timeout, kill_timeout, 0.0);
|
||||
ev_timer_start(main_loop, client->timeout);
|
||||
}
|
||||
if (result == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* Shift the buffer to the left and reduce the allocated space. */
|
||||
client->buffer_size -= (size_t)result;
|
||||
memmove(client->buffer, client->buffer + result, client->buffer_size);
|
||||
client->buffer = srealloc(client->buffer, client->buffer_size);
|
||||
}
|
||||
|
||||
/*
|
||||
* Sends the specified event to all IPC clients which are currently connected
|
||||
* and subscribed to this kind of event.
|
||||
|
@ -57,7 +159,11 @@ void ipc_send_event(const char *event, uint32_t message_type, const char *payloa
|
|||
if (!interested)
|
||||
continue;
|
||||
|
||||
ipc_send_message(current->fd, strlen(payload), message_type, (const uint8_t *)payload);
|
||||
const bool push_now = (current->buffer_size == 0);
|
||||
append_payload(current, message_type, payload);
|
||||
if (push_now) {
|
||||
ipc_push_pending(current);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -99,12 +205,7 @@ void ipc_shutdown(shutdown_reason_t reason) {
|
|||
while (!TAILQ_EMPTY(&all_clients)) {
|
||||
current = TAILQ_FIRST(&all_clients);
|
||||
shutdown(current->fd, SHUT_RDWR);
|
||||
close(current->fd);
|
||||
for (int i = 0; i < current->num_events; i++)
|
||||
free(current->events[i]);
|
||||
free(current->events);
|
||||
TAILQ_REMOVE(&all_clients, current, clients);
|
||||
free(current);
|
||||
free_ipc_client(current);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1247,25 +1348,21 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) {
|
|||
return;
|
||||
}
|
||||
|
||||
/* If not, there was some kind of error. We don’t bother
|
||||
* and close the connection */
|
||||
close(w->fd);
|
||||
|
||||
/* Delete the client from the list of clients */
|
||||
/* If not, there was some kind of error. We don’t bother and close the
|
||||
* connection. Delete the client from the list of clients. */
|
||||
bool closed = false;
|
||||
ipc_client *current;
|
||||
TAILQ_FOREACH(current, &all_clients, clients) {
|
||||
if (current->fd != w->fd)
|
||||
continue;
|
||||
|
||||
for (int i = 0; i < current->num_events; i++)
|
||||
free(current->events[i]);
|
||||
free(current->events);
|
||||
/* We can call TAILQ_REMOVE because we break out of the
|
||||
* TAILQ_FOREACH afterwards */
|
||||
TAILQ_REMOVE(&all_clients, current, clients);
|
||||
free(current);
|
||||
free_ipc_client(current);
|
||||
closed = true;
|
||||
break;
|
||||
}
|
||||
if (!closed) {
|
||||
close(w->fd);
|
||||
}
|
||||
|
||||
ev_io_stop(EV_A_ w);
|
||||
free(w);
|
||||
|
@ -1285,6 +1382,60 @@ static void ipc_receive_message(EV_P_ struct ev_io *w, int revents) {
|
|||
FREE(message);
|
||||
}
|
||||
|
||||
/*
 * Timer callback for a client whose pending output could not be written
 * before its timeout timer expired. Logs which client is affected and frees
 * it, which closes the connection.
 *
 * Where SO_PEERCRED is available on Linux, the log message also contains the
 * peer's pid and the beginning of its command line. Everything that touches
 * 'peercred' lives inside the platform guard so that the function also
 * compiles on systems without it.
 *
 */
static void ipc_client_timeout(EV_P_ ev_timer *w, int revents) {
    /* No need to be polite and check for writeability, the other callback would
     * have been called by now. */
    ipc_client *client = (ipc_client *)w->data;

    bool logged_with_cmdline = false;
#if defined(__linux__) && defined(SO_PEERCRED)
    struct ucred peercred;
    socklen_t so_len = sizeof(peercred);
    if (getsockopt(client->fd, SOL_SOCKET, SO_PEERCRED, &peercred, &so_len) == 0) {
        char *exepath;
        sasprintf(&exepath, "/proc/%d/cmdline", peercred.pid);
        const int fd = open(exepath, O_RDONLY);
        free(exepath);

        if (fd != -1) {
            /* Cut off the cmdline for the error message. Reading at most
             * sizeof(buf) - 1 bytes keeps the zero-initialized last byte
             * intact, so buf is always NUL-terminated when printed. */
            char buf[512] = {'\0'};
            const ssize_t n = read(fd, buf, sizeof(buf) - 1);
            close(fd);

            if (n >= 0) {
                /* Arguments in /proc/<pid>/cmdline are separated by NUL
                 * bytes; turn all but the final one into spaces so the
                 * whole command line is printed. */
                for (ssize_t i = 0; i + 1 < n; i++) {
                    if (buf[i] == '\0') {
                        buf[i] = ' ';
                    }
                }
                ELOG("client %p with pid %d and cmdline '%s' on fd %d timed out, killing\n", client, peercred.pid, buf, client->fd);
                logged_with_cmdline = true;
            }
        }
    }
#endif

    if (!logged_with_cmdline) {
        ELOG("client %p on fd %d timed out, killing\n", client, client->fd);
    }

    free_ipc_client(client);
}
|
||||
|
||||
/*
 * io callback: the socket of a client with queued output became writeable,
 * so try to flush the queue again.
 *
 */
static void ipc_socket_writeable_cb(EV_P_ ev_io *w, int revents) {
    ipc_client *const subscriber = (ipc_client *)w->data;
    DLOG("fd %d writeable\n", w->fd);

    /* This watcher is only started together with a kill timer, so one must
     * be active whenever we get here. */
    assert(subscriber->timeout != NULL);
    ipc_push_pending(subscriber);
}
|
||||
|
||||
/*
|
||||
* Handler for activity on the listening socket, meaning that a new client
|
||||
* has just connected and we should accept() him. Sets up the event handler
|
||||
|
@ -1313,10 +1464,16 @@ void ipc_new_client(EV_P_ struct ev_io *w, int revents) {
|
|||
ev_io_init(package, ipc_receive_message, client, EV_READ);
|
||||
ev_io_start(EV_A_ package);
|
||||
|
||||
ipc_client *new = scalloc(1, sizeof(ipc_client));
|
||||
|
||||
package = scalloc(1, sizeof(struct ev_io));
|
||||
package->data = new;
|
||||
ev_io_init(package, ipc_socket_writeable_cb, client, EV_WRITE);
|
||||
|
||||
DLOG("IPC: new client connected on fd %d\n", w->fd);
|
||||
|
||||
ipc_client *new = scalloc(1, sizeof(ipc_client));
|
||||
new->fd = client;
|
||||
new->callback = package;
|
||||
|
||||
TAILQ_INSERT_TAIL(&all_clients, new, clients);
|
||||
}
|
||||
|
|
|
@ -501,6 +501,7 @@ my $expected_all_tokens = "ERROR: CONFIG: Expected one of these tokens: <end>, '
|
|||
workspace
|
||||
ipc_socket
|
||||
ipc-socket
|
||||
ipc_kill_timeout
|
||||
restart_state
|
||||
popup_during_fullscreen
|
||||
exec_always
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
#!perl
# vim:ts=4:sw=4:expandtab
#
# Please read the following documents before working on tests:
# • https://build.i3wm.org/docs/testsuite.html
#   (or docs/testsuite)
#
# • https://build.i3wm.org/docs/lib-i3test.html
#   (alternatively: perldoc ./testcases/lib/i3test.pm)
#
# • https://build.i3wm.org/docs/ipc.html
#   (or docs/ipc)
#
# • http://onyxneon.com/books/modern_perl/modern_perl_a4.pdf
#   (unless you are already familiar with Perl)
#
# Test that i3 will not hang if a connected client stops reading from its
# subscription socket and that the client is killed after a delay.
# Ticket: #2999
# Bug still in: 4.15-180-g715cea61
use i3test i3_config => <<EOT;
# i3 config file (v4)
font -misc-fixed-medium-r-normal--13-120-75-75-C-70-iso10646-1
# Set the timeout to 500ms to reduce the duration of this test.
ipc_kill_timeout 500
EOT

use IO::Socket::UNIX;
use IO::Select;

# Manually connect to i3 so that we can choose to not read events. Fail
# loudly if the connection cannot be made: every later step depends on it.
my $sock = IO::Socket::UNIX->new(Peer => get_socket_path())
    or die "Could not connect to the i3 IPC socket: $!";

# Subscribe to workspace events: magic string, payload length and message
# type 2 (SUBSCRIBE according to docs/ipc), followed by the JSON payload.
my $magic = "i3-ipc";
my $payload = '["workspace"]';
my $message = $magic . pack("LL", length($payload), 2) . $payload;
print $sock $message;

# Constantly switch between 2 workspaces to generate events.
fresh_workspace;
open_window;
fresh_workspace;
open_window;

eval {
    local $SIG{ALRM} = sub { die "Timeout\n" };
    # 500 is an arbitrarily large number to make sure that the socket becomes
    # non-writeable.
    for (1 .. 500) {
        alarm 1;
        cmd 'workspace back_and_forth';
        alarm 0;
    }
};
# Save $@ right away, then cancel any alarm that is still pending (cmd may
# have died for a reason other than our timeout) so that it cannot fire
# after the handler above has been restored.
my $error = $@;
alarm 0;
is($error, '', 'i3 didn\'t hang');

# Wait for connection timeout
sleep 1;

# Drain whatever i3 managed to send before it dropped us. sysread is used
# because buffered reads must not be mixed with select().
my $s = IO::Select->new($sock);
my $reached_eof = 0;
while ($s->can_read(0.05)) {
    my $bytes = sysread($sock, my $buffer, 4096);

    # 0 means EOF; undef means a read error. Either way there is nothing
    # more to read from this connection.
    if (!$bytes) {
        $reached_eof = 1;
        last;
    }
}
ok($reached_eof, 'socket connection closed');

close $sock;
done_testing;
|
Loading…
Reference in New Issue