mirror of
https://github.com/ruby/ruby.git
synced 2025-08-15 13:39:04 +02:00
Make waiting_fd
behaviour per-IO. (#13127)
- `rb_thread_fd_close` is deprecated and now a no-op. - IO operations (including close) no longer take a vm-wide lock.
This commit is contained in:
parent
a6435befa7
commit
425fa0aeb5
Notes:
git
2025-05-13 10:02:17 +00:00
Merged-By: ioquatix <samuel@codeotaku.com>
14 changed files with 214 additions and 377 deletions
256
thread.c
256
thread.c
|
@ -99,6 +99,8 @@
|
|||
#include "vm_debug.h"
|
||||
#include "vm_sync.h"
|
||||
|
||||
#include "ccan/list/list.h"
|
||||
|
||||
#ifndef USE_NATIVE_THREAD_PRIORITY
|
||||
#define USE_NATIVE_THREAD_PRIORITY 0
|
||||
#define RUBY_THREAD_PRIORITY_MAX 3
|
||||
|
@ -149,13 +151,6 @@ MAYBE_UNUSED(static int consume_communication_pipe(int fd));
|
|||
static volatile int system_working = 1;
|
||||
static rb_internal_thread_specific_key_t specific_key_count;
|
||||
|
||||
struct waiting_fd {
|
||||
struct ccan_list_node wfd_node; /* <=> vm.waiting_fds */
|
||||
rb_thread_t *th;
|
||||
int fd;
|
||||
struct rb_io_close_wait_list *busy;
|
||||
};
|
||||
|
||||
/********************************************************************************/
|
||||
|
||||
#define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION
|
||||
|
@ -1694,44 +1689,45 @@ waitfd_to_waiting_flag(int wfd_event)
|
|||
return wfd_event << 1;
|
||||
}
|
||||
|
||||
static void
|
||||
thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
|
||||
{
|
||||
wfd->fd = fd;
|
||||
wfd->th = th;
|
||||
wfd->busy = NULL;
|
||||
struct io_blocking_operation_arguments {
|
||||
struct rb_io *io;
|
||||
struct rb_io_blocking_operation *blocking_operation;
|
||||
};
|
||||
|
||||
RB_VM_LOCK_ENTER();
|
||||
{
|
||||
ccan_list_add(&th->vm->waiting_fds, &wfd->wfd_node);
|
||||
static VALUE
|
||||
io_blocking_operation_release(VALUE _arguments) {
|
||||
struct io_blocking_operation_arguments *arguments = (void*)_arguments;
|
||||
struct rb_io_blocking_operation *blocking_operation = arguments->blocking_operation;
|
||||
|
||||
ccan_list_del(&blocking_operation->list);
|
||||
|
||||
rb_io_t *io = arguments->io;
|
||||
rb_thread_t *thread = io->closing_ec->thread_ptr;
|
||||
rb_fiber_t *fiber = io->closing_ec->fiber_ptr;
|
||||
|
||||
if (thread->scheduler != Qnil) {
|
||||
rb_fiber_scheduler_unblock(thread->scheduler, io->self, rb_fiberptr_self(fiber));
|
||||
} else {
|
||||
rb_thread_wakeup(thread->self);
|
||||
}
|
||||
RB_VM_LOCK_LEAVE();
|
||||
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
static void
|
||||
thread_io_wake_pending_closer(struct waiting_fd *wfd)
|
||||
rb_io_blocking_operation_release(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation)
|
||||
{
|
||||
bool has_waiter = wfd->busy && RB_TEST(wfd->busy->wakeup_mutex);
|
||||
if (has_waiter) {
|
||||
rb_mutex_lock(wfd->busy->wakeup_mutex);
|
||||
}
|
||||
VALUE wakeup_mutex = io->wakeup_mutex;
|
||||
|
||||
/* Needs to be protected with RB_VM_LOCK because we don't know if
|
||||
wfd is on the global list of pending FD ops or if it's on a
|
||||
struct rb_io_close_wait_list close-waiter. */
|
||||
RB_VM_LOCK_ENTER();
|
||||
ccan_list_del(&wfd->wfd_node);
|
||||
RB_VM_LOCK_LEAVE();
|
||||
if (RB_TEST(wakeup_mutex)) {
|
||||
struct io_blocking_operation_arguments arguments = {
|
||||
.io = io,
|
||||
.blocking_operation = blocking_operation
|
||||
};
|
||||
|
||||
if (has_waiter) {
|
||||
rb_thread_t *th = rb_thread_ptr(wfd->busy->closing_thread);
|
||||
if (th->scheduler != Qnil) {
|
||||
rb_fiber_scheduler_unblock(th->scheduler, wfd->busy->closing_thread, wfd->busy->closing_fiber);
|
||||
}
|
||||
else {
|
||||
rb_thread_wakeup(wfd->busy->closing_thread);
|
||||
}
|
||||
rb_mutex_unlock(wfd->busy->wakeup_mutex);
|
||||
rb_mutex_synchronize(wakeup_mutex, io_blocking_operation_release, (VALUE)&arguments);
|
||||
} else {
|
||||
ccan_list_del(&blocking_operation->list);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1802,12 +1798,11 @@ rb_thread_mn_schedulable(VALUE thval)
|
|||
VALUE
|
||||
rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void *data1, int events)
|
||||
{
|
||||
rb_execution_context_t *volatile ec = GET_EC();
|
||||
rb_thread_t *volatile th = rb_ec_thread_ptr(ec);
|
||||
rb_execution_context_t * ec = GET_EC();
|
||||
rb_thread_t *th = rb_ec_thread_ptr(ec);
|
||||
|
||||
RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), io->fd, events);
|
||||
|
||||
struct waiting_fd waiting_fd;
|
||||
volatile VALUE val = Qundef; /* shouldn't be used */
|
||||
volatile int saved_errno = 0;
|
||||
enum ruby_tag_type state;
|
||||
|
@ -1822,7 +1817,11 @@ rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void
|
|||
// `func` or not (as opposed to some previously set value).
|
||||
errno = 0;
|
||||
|
||||
thread_io_setup_wfd(th, fd, &waiting_fd);
|
||||
struct rb_io_blocking_operation blocking_operation = {
|
||||
.ec = ec,
|
||||
};
|
||||
ccan_list_add(&io->blocking_operations, &blocking_operation.list);
|
||||
|
||||
{
|
||||
EC_PUSH_TAG(ec);
|
||||
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
|
||||
|
@ -1847,15 +1846,13 @@ rb_thread_io_blocking_call(struct rb_io* io, rb_blocking_function_t *func, void
|
|||
th = rb_ec_thread_ptr(ec);
|
||||
th->mn_schedulable = prev_mn_schedulable;
|
||||
}
|
||||
/*
|
||||
* must be deleted before jump
|
||||
* this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list
|
||||
*/
|
||||
thread_io_wake_pending_closer(&waiting_fd);
|
||||
|
||||
rb_io_blocking_operation_release(io, &blocking_operation);
|
||||
|
||||
if (state) {
|
||||
EC_JUMP_TAG(ec, state);
|
||||
}
|
||||
|
||||
/* TODO: check func() */
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(ec);
|
||||
|
||||
|
@ -2639,76 +2636,81 @@ rb_ec_reset_raised(rb_execution_context_t *ec)
|
|||
return 1;
|
||||
}
|
||||
|
||||
int
|
||||
rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
|
||||
static size_t
|
||||
thread_io_close_notify_all(struct rb_io *io)
|
||||
{
|
||||
rb_vm_t *vm = GET_THREAD()->vm;
|
||||
struct waiting_fd *wfd = 0, *next;
|
||||
ccan_list_head_init(&busy->pending_fd_users);
|
||||
int has_any;
|
||||
VALUE wakeup_mutex;
|
||||
RUBY_ASSERT_CRITICAL_SECTION_ENTER();
|
||||
|
||||
RB_VM_LOCK_ENTER();
|
||||
{
|
||||
ccan_list_for_each_safe(&vm->waiting_fds, wfd, next, wfd_node) {
|
||||
if (wfd->fd == fd) {
|
||||
rb_thread_t *th = wfd->th;
|
||||
VALUE err;
|
||||
size_t count = 0;
|
||||
rb_vm_t *vm = io->closing_ec->thread_ptr->vm;
|
||||
VALUE error = vm->special_exceptions[ruby_error_stream_closed];
|
||||
|
||||
ccan_list_del(&wfd->wfd_node);
|
||||
ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node);
|
||||
struct rb_io_blocking_operation *blocking_operation;
|
||||
ccan_list_for_each(&io->blocking_operations, blocking_operation, list) {
|
||||
rb_execution_context_t *ec = blocking_operation->ec;
|
||||
|
||||
wfd->busy = busy;
|
||||
err = th->vm->special_exceptions[ruby_error_stream_closed];
|
||||
rb_threadptr_pending_interrupt_enque(th, err);
|
||||
rb_threadptr_interrupt(th);
|
||||
}
|
||||
}
|
||||
rb_thread_t *thread = ec->thread_ptr;
|
||||
rb_threadptr_pending_interrupt_enque(thread, error);
|
||||
|
||||
// This operation is slow:
|
||||
rb_threadptr_interrupt(thread);
|
||||
|
||||
count += 1;
|
||||
}
|
||||
|
||||
has_any = !ccan_list_empty(&busy->pending_fd_users);
|
||||
busy->closing_thread = rb_thread_current();
|
||||
busy->closing_fiber = rb_fiber_current();
|
||||
wakeup_mutex = Qnil;
|
||||
if (has_any) {
|
||||
wakeup_mutex = rb_mutex_new();
|
||||
RBASIC_CLEAR_CLASS(wakeup_mutex); /* hide from ObjectSpace */
|
||||
RUBY_ASSERT_CRITICAL_SECTION_LEAVE();
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
size_t
|
||||
rb_thread_io_close_interrupt(struct rb_io *io)
|
||||
{
|
||||
// We guard this operation based on `io->closing_ec` -> only one thread will ever enter this function.
|
||||
if (io->closing_ec) {
|
||||
return 0;
|
||||
}
|
||||
busy->wakeup_mutex = wakeup_mutex;
|
||||
|
||||
RB_VM_LOCK_LEAVE();
|
||||
// If there are no blocking operations, we are done:
|
||||
if (ccan_list_empty(&io->blocking_operations)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* If the caller didn't pass *busy as a pointer to something on the stack,
|
||||
we need to guard this mutex object on _our_ C stack for the duration
|
||||
of this function. */
|
||||
RB_GC_GUARD(wakeup_mutex);
|
||||
return has_any;
|
||||
// Otherwise, we are now closing the IO:
|
||||
rb_execution_context_t *ec = GET_EC();
|
||||
io->closing_ec = ec;
|
||||
|
||||
// This is used to ensure the correct execution context is woken up after the blocking operation is interrupted:
|
||||
io->wakeup_mutex = rb_mutex_new();
|
||||
|
||||
return thread_io_close_notify_all(io);
|
||||
}
|
||||
|
||||
void
|
||||
rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy)
|
||||
rb_thread_io_close_wait(struct rb_io* io)
|
||||
{
|
||||
if (!RB_TEST(busy->wakeup_mutex)) {
|
||||
/* There was nobody else using this file when we closed it, so we
|
||||
never bothered to allocate a mutex*/
|
||||
VALUE wakeup_mutex = io->wakeup_mutex;
|
||||
|
||||
if (!RB_TEST(wakeup_mutex)) {
|
||||
// There was nobody else using this file when we closed it, so we never bothered to allocate a mutex:
|
||||
return;
|
||||
}
|
||||
|
||||
rb_mutex_lock(busy->wakeup_mutex);
|
||||
while (!ccan_list_empty(&busy->pending_fd_users)) {
|
||||
rb_mutex_sleep(busy->wakeup_mutex, Qnil);
|
||||
rb_mutex_lock(wakeup_mutex);
|
||||
while (!ccan_list_empty(&io->blocking_operations)) {
|
||||
rb_mutex_sleep(wakeup_mutex, Qnil);
|
||||
}
|
||||
rb_mutex_unlock(busy->wakeup_mutex);
|
||||
rb_mutex_unlock(wakeup_mutex);
|
||||
|
||||
// We are done closing:
|
||||
io->wakeup_mutex = Qnil;
|
||||
io->closing_ec = NULL;
|
||||
}
|
||||
|
||||
void
|
||||
rb_thread_fd_close(int fd)
|
||||
{
|
||||
struct rb_io_close_wait_list busy;
|
||||
|
||||
if (rb_notify_fd_close(fd, &busy)) {
|
||||
rb_notify_fd_close_wait(&busy);
|
||||
}
|
||||
rb_warn("rb_thread_fd_close is deprecated (and is now a no-op).");
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -4412,14 +4414,17 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
|
|||
}};
|
||||
volatile int result = 0;
|
||||
nfds_t nfds;
|
||||
struct waiting_fd wfd;
|
||||
struct rb_io_blocking_operation blocking_operation;
|
||||
enum ruby_tag_type state;
|
||||
volatile int lerrno;
|
||||
|
||||
rb_execution_context_t *ec = GET_EC();
|
||||
rb_thread_t *th = rb_ec_thread_ptr(ec);
|
||||
|
||||
thread_io_setup_wfd(th, fd, &wfd);
|
||||
if (io) {
|
||||
blocking_operation.ec = ec;
|
||||
ccan_list_add(&io->blocking_operations, &blocking_operation.list);
|
||||
}
|
||||
|
||||
if (timeout == NULL && thread_io_wait_events(th, fd, events, NULL)) {
|
||||
// fd is readable
|
||||
|
@ -4428,25 +4433,27 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
|
|||
errno = 0;
|
||||
}
|
||||
else {
|
||||
EC_PUSH_TAG(wfd.th->ec);
|
||||
EC_PUSH_TAG(ec);
|
||||
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
|
||||
rb_hrtime_t *to, rel, end = 0;
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(ec);
|
||||
timeout_prepare(&to, &rel, &end, timeout);
|
||||
do {
|
||||
nfds = numberof(fds);
|
||||
result = wait_for_single_fd_blocking_region(wfd.th, fds, nfds, to, &lerrno);
|
||||
result = wait_for_single_fd_blocking_region(th, fds, nfds, to, &lerrno);
|
||||
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
|
||||
RUBY_VM_CHECK_INTS_BLOCKING(ec);
|
||||
} while (wait_retryable(&result, lerrno, to, end));
|
||||
}
|
||||
EC_POP_TAG();
|
||||
}
|
||||
|
||||
thread_io_wake_pending_closer(&wfd);
|
||||
if (io) {
|
||||
rb_io_blocking_operation_release(io, &blocking_operation);
|
||||
}
|
||||
|
||||
if (state) {
|
||||
EC_JUMP_TAG(wfd.th->ec, state);
|
||||
EC_JUMP_TAG(ec, state);
|
||||
}
|
||||
|
||||
if (result < 0) {
|
||||
|
@ -4479,6 +4486,9 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
|
|||
}
|
||||
#else /* ! USE_POLL - implement rb_io_poll_fd() using select() */
|
||||
struct select_args {
|
||||
struct rb_io *io;
|
||||
struct rb_io_blocking_operation *blocking_operation;
|
||||
|
||||
union {
|
||||
int fd;
|
||||
int error;
|
||||
|
@ -4486,7 +4496,6 @@ struct select_args {
|
|||
rb_fdset_t *read;
|
||||
rb_fdset_t *write;
|
||||
rb_fdset_t *except;
|
||||
struct waiting_fd wfd;
|
||||
struct timeval *tv;
|
||||
};
|
||||
|
||||
|
@ -4517,7 +4526,10 @@ select_single_cleanup(VALUE ptr)
|
|||
{
|
||||
struct select_args *args = (struct select_args *)ptr;
|
||||
|
||||
thread_io_wake_pending_closer(&args->wfd);
|
||||
if (args->blocking_operation) {
|
||||
rb_io_blocking_operation_release(args->io, args->blocking_operation);
|
||||
}
|
||||
|
||||
if (args->read) rb_fd_term(args->read);
|
||||
if (args->write) rb_fd_term(args->write);
|
||||
if (args->except) rb_fd_term(args->except);
|
||||
|
@ -4542,22 +4554,31 @@ thread_io_wait(struct rb_io *io, int fd, int events, struct timeval *timeout)
|
|||
{
|
||||
rb_fdset_t rfds, wfds, efds;
|
||||
struct select_args args;
|
||||
int r;
|
||||
VALUE ptr = (VALUE)&args;
|
||||
rb_thread_t *th = GET_THREAD();
|
||||
|
||||
struct rb_io_blocking_operation blocking_operation;
|
||||
if (io) {
|
||||
args.io = io;
|
||||
blocking_operation.ec = GET_EC();
|
||||
ccan_list_add(&io->blocking_operations, &blocking_operation.list);
|
||||
args.blocking_operation = &blocking_operation;
|
||||
} else {
|
||||
args.io = NULL;
|
||||
blocking_operation.ec = NULL;
|
||||
args.blocking_operation = NULL;
|
||||
}
|
||||
|
||||
args.as.fd = fd;
|
||||
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
|
||||
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
|
||||
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
|
||||
args.tv = timeout;
|
||||
thread_io_setup_wfd(th, fd, &args.wfd);
|
||||
|
||||
r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
|
||||
if (r == -1)
|
||||
int result = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
|
||||
if (result == -1)
|
||||
errno = args.as.error;
|
||||
|
||||
return r;
|
||||
return result;
|
||||
}
|
||||
#endif /* ! USE_POLL */
|
||||
|
||||
|
@ -5651,21 +5672,6 @@ rb_check_deadlock(rb_ractor_t *r)
|
|||
}
|
||||
}
|
||||
|
||||
// Used for VM memsize reporting. Returns the size of a list of waiting_fd
|
||||
// structs. Defined here because the struct definition lives here as well.
|
||||
size_t
|
||||
rb_vm_memsize_waiting_fds(struct ccan_list_head *waiting_fds)
|
||||
{
|
||||
struct waiting_fd *waitfd = 0;
|
||||
size_t size = 0;
|
||||
|
||||
ccan_list_for_each(waiting_fds, waitfd, wfd_node) {
|
||||
size += sizeof(struct waiting_fd);
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
static void
|
||||
update_line_coverage(VALUE data, const rb_trace_arg_t *trace_arg)
|
||||
{
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue