mirror of
https://github.com/ruby/ruby.git
synced 2025-09-15 08:33:58 +02:00
thread_pthread.c: eliminate timer thread by restructuring GVL
This reverts commit 194a6a2c68
(r64203).
Race conditions which caused the original reversion will be fixed
in the subsequent commit.
[ruby-core:88360] [Misc #14937]
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@64352 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
4d2e0fffb0
commit
48b6bd74e2
11 changed files with 738 additions and 576 deletions
696
thread_pthread.c
696
thread_pthread.c
|
@ -45,27 +45,21 @@ void rb_native_cond_broadcast(rb_nativethread_cond_t *cond);
|
|||
void rb_native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex);
|
||||
void rb_native_cond_initialize(rb_nativethread_cond_t *cond);
|
||||
void rb_native_cond_destroy(rb_nativethread_cond_t *cond);
|
||||
static void rb_thread_wakeup_timer_thread_low(void);
|
||||
static void clear_thread_cache_altstack(void);
|
||||
static void ubf_wakeup_all_threads(void);
|
||||
static int ubf_threads_empty(void);
|
||||
static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *,
|
||||
const struct timespec *);
|
||||
static const struct timespec *sigwait_timeout(rb_thread_t *, int sigwait_fd,
|
||||
const struct timespec *,
|
||||
int *drained_p);
|
||||
|
||||
#define TIMER_THREAD_MASK (1)
|
||||
#define TIMER_THREAD_SLEEPY (2|TIMER_THREAD_MASK)
|
||||
#define TIMER_THREAD_BUSY (4|TIMER_THREAD_MASK)
|
||||
#define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid())
|
||||
|
||||
#if defined(HAVE_POLL) && defined(HAVE_FCNTL) && defined(F_GETFL) && \
|
||||
defined(F_SETFL) && defined(O_NONBLOCK) && \
|
||||
defined(F_GETFD) && defined(F_SETFD) && defined(FD_CLOEXEC)
|
||||
/* The timer thread sleeps while only one Ruby thread is running. */
|
||||
# define TIMER_IMPL TIMER_THREAD_SLEEPY
|
||||
#else
|
||||
# define TIMER_IMPL TIMER_THREAD_BUSY
|
||||
#endif
|
||||
|
||||
static struct {
|
||||
pthread_t id;
|
||||
int created;
|
||||
} timer_thread;
|
||||
#define TIMER_THREAD_CREATED_P() (timer_thread.created != 0)
|
||||
/* for testing, and in case we come across a platform w/o pipes: */
|
||||
#define BUSY_WAIT_SIGNALS (0)
|
||||
#define THREAD_INVALID ((const rb_thread_t *)-1)
|
||||
static const rb_thread_t *sigwait_th;
|
||||
|
||||
#ifdef HAVE_SCHED_YIELD
|
||||
#define native_thread_yield() (void)sched_yield()
|
||||
|
@ -82,49 +76,96 @@ static pthread_condattr_t *condattr_monotonic = &condattr_mono;
|
|||
static const void *const condattr_monotonic = NULL;
|
||||
#endif
|
||||
|
||||
/* 100ms. 10ms is too small for user level thread scheduling
|
||||
* on recent Linux (tested on 2.6.35)
|
||||
*/
|
||||
#define TIME_QUANTUM_USEC (100 * 1000)
|
||||
|
||||
static struct timespec native_cond_timeout(rb_nativethread_cond_t *,
|
||||
struct timespec rel);
|
||||
|
||||
static void
|
||||
gvl_acquire_common(rb_vm_t *vm)
|
||||
gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th)
|
||||
{
|
||||
if (vm->gvl.acquired) {
|
||||
native_thread_data_t *nd = &th->native_thread_data;
|
||||
|
||||
if (!vm->gvl.waiting++) {
|
||||
/*
|
||||
* Wake up timer thread iff timer thread is slept.
|
||||
* When timer thread is polling mode, we don't want to
|
||||
* make confusing timer thread interval time.
|
||||
*/
|
||||
rb_thread_wakeup_timer_thread_low();
|
||||
}
|
||||
VM_ASSERT(th->unblock.func == 0 && "we reuse ubf_list for GVL waitq");
|
||||
|
||||
while (vm->gvl.acquired) {
|
||||
rb_native_cond_wait(&vm->gvl.cond, &vm->gvl.lock);
|
||||
}
|
||||
list_add_tail(&vm->gvl.waitq, &nd->ubf_list);
|
||||
do {
|
||||
if (!vm->gvl.timer) {
|
||||
static struct timespec ts;
|
||||
static int err = ETIMEDOUT;
|
||||
|
||||
--vm->gvl.waiting;
|
||||
/*
|
||||
* become designated timer thread to kick vm->gvl.acquired
|
||||
* periodically. Continue on old timeout if it expired:
|
||||
*/
|
||||
if (err == ETIMEDOUT) {
|
||||
ts.tv_sec = 0;
|
||||
ts.tv_nsec = TIME_QUANTUM_USEC * 1000;
|
||||
ts = native_cond_timeout(&nd->cond.gvlq, ts);
|
||||
}
|
||||
vm->gvl.timer = th;
|
||||
err = native_cond_timedwait(&nd->cond.gvlq, &vm->gvl.lock, &ts);
|
||||
vm->gvl.timer = 0;
|
||||
ubf_wakeup_all_threads();
|
||||
|
||||
if (vm->gvl.need_yield) {
|
||||
vm->gvl.need_yield = 0;
|
||||
/*
|
||||
* Timeslice. We can't touch thread_destruct_lock here,
|
||||
* as the process may fork while this thread is contending
|
||||
* for GVL:
|
||||
*/
|
||||
if (vm->gvl.acquired) timer_thread_function();
|
||||
}
|
||||
else {
|
||||
rb_native_cond_wait(&nd->cond.gvlq, &vm->gvl.lock);
|
||||
}
|
||||
} while (vm->gvl.acquired);
|
||||
|
||||
list_del_init(&nd->ubf_list);
|
||||
|
||||
if (vm->gvl.need_yield) {
|
||||
vm->gvl.need_yield = 0;
|
||||
rb_native_cond_signal(&vm->gvl.switch_cond);
|
||||
}
|
||||
}
|
||||
}
|
||||
vm->gvl.acquired = th;
|
||||
/*
|
||||
* Designate the next gvl.timer thread, favor the last thread in
|
||||
* the waitq since it will be in waitq longest
|
||||
*/
|
||||
if (!vm->gvl.timer) {
|
||||
native_thread_data_t *last;
|
||||
|
||||
vm->gvl.acquired = 1;
|
||||
last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
|
||||
if (last) {
|
||||
rb_native_cond_signal(&last->cond.gvlq);
|
||||
}
|
||||
else if (!ubf_threads_empty()) {
|
||||
rb_thread_wakeup_timer_thread(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
|
||||
{
|
||||
rb_native_mutex_lock(&vm->gvl.lock);
|
||||
gvl_acquire_common(vm);
|
||||
gvl_acquire_common(vm, th);
|
||||
rb_native_mutex_unlock(&vm->gvl.lock);
|
||||
}
|
||||
|
||||
static void
|
||||
static native_thread_data_t *
|
||||
gvl_release_common(rb_vm_t *vm)
|
||||
{
|
||||
native_thread_data_t *next;
|
||||
vm->gvl.acquired = 0;
|
||||
if (vm->gvl.waiting > 0)
|
||||
rb_native_cond_signal(&vm->gvl.cond);
|
||||
next = list_top(&vm->gvl.waitq, native_thread_data_t, ubf_list);
|
||||
if (next) rb_native_cond_signal(&next->cond.gvlq);
|
||||
|
||||
return next;
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -138,34 +179,38 @@ gvl_release(rb_vm_t *vm)
|
|||
static void
|
||||
gvl_yield(rb_vm_t *vm, rb_thread_t *th)
|
||||
{
|
||||
rb_native_mutex_lock(&vm->gvl.lock);
|
||||
native_thread_data_t *next;
|
||||
|
||||
gvl_release_common(vm);
|
||||
rb_native_mutex_lock(&vm->gvl.lock);
|
||||
next = gvl_release_common(vm);
|
||||
|
||||
/* An another thread is processing GVL yield. */
|
||||
if (UNLIKELY(vm->gvl.wait_yield)) {
|
||||
while (vm->gvl.wait_yield)
|
||||
while (vm->gvl.wait_yield)
|
||||
rb_native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock);
|
||||
goto acquire;
|
||||
}
|
||||
|
||||
if (vm->gvl.waiting > 0) {
|
||||
/* Wait until another thread task take GVL. */
|
||||
vm->gvl.need_yield = 1;
|
||||
vm->gvl.wait_yield = 1;
|
||||
while (vm->gvl.need_yield)
|
||||
else if (next) {
|
||||
/* Wait until another thread task takes GVL. */
|
||||
vm->gvl.need_yield = 1;
|
||||
vm->gvl.wait_yield = 1;
|
||||
while (vm->gvl.need_yield)
|
||||
rb_native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock);
|
||||
vm->gvl.wait_yield = 0;
|
||||
vm->gvl.wait_yield = 0;
|
||||
rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
|
||||
}
|
||||
else {
|
||||
rb_native_mutex_unlock(&vm->gvl.lock);
|
||||
sched_yield();
|
||||
rb_native_mutex_unlock(&vm->gvl.lock);
|
||||
/*
|
||||
* GVL was not contended when we released, so we have no potential
|
||||
* contenders for reacquisition. Perhaps they are stuck in blocking
|
||||
* region w/o GVL, too, so we kick them:
|
||||
*/
|
||||
ubf_wakeup_all_threads();
|
||||
native_thread_yield();
|
||||
rb_native_mutex_lock(&vm->gvl.lock);
|
||||
rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
|
||||
}
|
||||
|
||||
rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
|
||||
acquire:
|
||||
gvl_acquire_common(vm);
|
||||
gvl_acquire_common(vm, th);
|
||||
rb_native_mutex_unlock(&vm->gvl.lock);
|
||||
}
|
||||
|
||||
|
@ -173,11 +218,11 @@ static void
|
|||
gvl_init(rb_vm_t *vm)
|
||||
{
|
||||
rb_native_mutex_initialize(&vm->gvl.lock);
|
||||
rb_native_cond_initialize(&vm->gvl.cond);
|
||||
rb_native_cond_initialize(&vm->gvl.switch_cond);
|
||||
rb_native_cond_initialize(&vm->gvl.switch_wait_cond);
|
||||
list_head_init(&vm->gvl.waitq);
|
||||
vm->gvl.acquired = 0;
|
||||
vm->gvl.waiting = 0;
|
||||
vm->gvl.timer = 0;
|
||||
vm->gvl.need_yield = 0;
|
||||
vm->gvl.wait_yield = 0;
|
||||
}
|
||||
|
@ -185,10 +230,16 @@ gvl_init(rb_vm_t *vm)
|
|||
static void
|
||||
gvl_destroy(rb_vm_t *vm)
|
||||
{
|
||||
rb_native_cond_destroy(&vm->gvl.switch_wait_cond);
|
||||
rb_native_cond_destroy(&vm->gvl.switch_cond);
|
||||
rb_native_cond_destroy(&vm->gvl.cond);
|
||||
rb_native_mutex_destroy(&vm->gvl.lock);
|
||||
/*
|
||||
* only called once at VM shutdown (not atfork), another thread
|
||||
* may still grab vm->gvl.lock when calling gvl_release at
|
||||
* the end of thread_start_func_2
|
||||
*/
|
||||
if (0) {
|
||||
rb_native_cond_destroy(&vm->gvl.switch_wait_cond);
|
||||
rb_native_cond_destroy(&vm->gvl.switch_cond);
|
||||
rb_native_mutex_destroy(&vm->gvl.lock);
|
||||
}
|
||||
clear_thread_cache_altstack();
|
||||
}
|
||||
|
||||
|
@ -433,7 +484,9 @@ native_thread_init(rb_thread_t *th)
|
|||
#ifdef USE_UBF_LIST
|
||||
list_node_init(&nd->ubf_list);
|
||||
#endif
|
||||
rb_native_cond_initialize(&nd->sleep_cond);
|
||||
rb_native_cond_initialize(&nd->cond.gvlq);
|
||||
if (&nd->cond.gvlq != &nd->cond.intr)
|
||||
rb_native_cond_initialize(&nd->cond.intr);
|
||||
ruby_thread_set_native(th);
|
||||
}
|
||||
|
||||
|
@ -444,7 +497,11 @@ native_thread_init(rb_thread_t *th)
|
|||
static void
|
||||
native_thread_destroy(rb_thread_t *th)
|
||||
{
|
||||
rb_native_cond_destroy(&th->native_thread_data.sleep_cond);
|
||||
native_thread_data_t *nd = &th->native_thread_data;
|
||||
|
||||
rb_native_cond_destroy(&nd->cond.gvlq);
|
||||
if (&nd->cond.gvlq != &nd->cond.intr)
|
||||
rb_native_cond_destroy(&nd->cond.intr);
|
||||
|
||||
/*
|
||||
* prevent false positive from ruby_thread_has_gvl_p if that
|
||||
|
@ -1012,17 +1069,6 @@ native_thread_create(rb_thread_t *th)
|
|||
return err;
|
||||
}
|
||||
|
||||
#if (TIMER_IMPL & TIMER_THREAD_MASK)
|
||||
static void
|
||||
native_thread_join(pthread_t th)
|
||||
{
|
||||
int err = pthread_join(th, 0);
|
||||
if (err) {
|
||||
rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err);
|
||||
}
|
||||
}
|
||||
#endif /* TIMER_THREAD_MASK */
|
||||
|
||||
#if USE_NATIVE_THREAD_PRIORITY
|
||||
|
||||
static void
|
||||
|
@ -1064,15 +1110,15 @@ ubf_pthread_cond_signal(void *ptr)
|
|||
{
|
||||
rb_thread_t *th = (rb_thread_t *)ptr;
|
||||
thread_debug("ubf_pthread_cond_signal (%p)\n", (void *)th);
|
||||
rb_native_cond_signal(&th->native_thread_data.sleep_cond);
|
||||
rb_native_cond_signal(&th->native_thread_data.cond.intr);
|
||||
}
|
||||
|
||||
static void
|
||||
native_sleep(rb_thread_t *th, struct timespec *timeout_rel)
|
||||
native_cond_sleep(rb_thread_t *th, struct timespec *timeout_rel)
|
||||
{
|
||||
struct timespec timeout;
|
||||
rb_nativethread_lock_t *lock = &th->interrupt_lock;
|
||||
rb_nativethread_cond_t *cond = &th->native_thread_data.sleep_cond;
|
||||
rb_nativethread_cond_t *cond = &th->native_thread_data.cond.intr;
|
||||
|
||||
if (timeout_rel) {
|
||||
/* Solaris cond_timedwait() return EINVAL if an argument is greater than
|
||||
|
@ -1164,17 +1210,30 @@ static void
|
|||
ubf_select(void *ptr)
|
||||
{
|
||||
rb_thread_t *th = (rb_thread_t *)ptr;
|
||||
rb_vm_t *vm = th->vm;
|
||||
|
||||
register_ubf_list(th);
|
||||
|
||||
/*
|
||||
* ubf_wakeup_thread() doesn't guarantee to wake up a target thread.
|
||||
* Therefore, we repeatedly call ubf_wakeup_thread() until a target thread
|
||||
* exit from ubf function.
|
||||
* In the other hands, we shouldn't call rb_thread_wakeup_timer_thread()
|
||||
* if running on timer thread because it may make endless wakeups.
|
||||
* exit from ubf function. We must designate a timer-thread to perform
|
||||
* this operation.
|
||||
*/
|
||||
if (!pthread_equal(pthread_self(), timer_thread.id))
|
||||
rb_thread_wakeup_timer_thread();
|
||||
rb_native_mutex_lock(&vm->gvl.lock);
|
||||
if (!vm->gvl.timer) {
|
||||
native_thread_data_t *last;
|
||||
|
||||
last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
|
||||
if (last) {
|
||||
rb_native_cond_signal(&last->cond.gvlq);
|
||||
}
|
||||
else {
|
||||
rb_thread_wakeup_timer_thread(0);
|
||||
}
|
||||
}
|
||||
rb_native_mutex_unlock(&vm->gvl.lock);
|
||||
|
||||
ubf_wakeup_thread(th);
|
||||
}
|
||||
|
||||
|
@ -1211,39 +1270,16 @@ static int ubf_threads_empty(void) { return 1; }
|
|||
#define TT_DEBUG 0
|
||||
#define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0)
|
||||
|
||||
/* 100ms. 10ms is too small for user level thread scheduling
|
||||
* on recent Linux (tested on 2.6.35)
|
||||
*/
|
||||
#define TIME_QUANTUM_USEC (100 * 1000)
|
||||
|
||||
#if TIMER_IMPL == TIMER_THREAD_SLEEPY
|
||||
static struct {
|
||||
/*
|
||||
* Read end of each pipe is closed inside timer thread for shutdown
|
||||
* Write ends are closed by a normal Ruby thread during shutdown
|
||||
*/
|
||||
/* pipes are closed in forked children when owner_process does not match */
|
||||
int normal[2];
|
||||
int low[2];
|
||||
|
||||
/* volatile for signal handler use: */
|
||||
volatile rb_pid_t owner_process;
|
||||
} timer_thread_pipe = {
|
||||
{-1, -1},
|
||||
{-1, -1}, /* low priority */
|
||||
};
|
||||
|
||||
NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
|
||||
static void
|
||||
async_bug_fd(const char *mesg, int errno_arg, int fd)
|
||||
{
|
||||
char buff[64];
|
||||
size_t n = strlcpy(buff, mesg, sizeof(buff));
|
||||
if (n < sizeof(buff)-3) {
|
||||
ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd);
|
||||
}
|
||||
rb_async_bug_errno(buff, errno_arg);
|
||||
}
|
||||
|
||||
/* only use signal-safe system calls here */
|
||||
static void
|
||||
rb_thread_wakeup_timer_thread_fd(int fd)
|
||||
|
@ -1275,49 +1311,33 @@ rb_thread_wakeup_timer_thread_fd(int fd)
|
|||
}
|
||||
|
||||
void
|
||||
rb_thread_wakeup_timer_thread(void)
|
||||
rb_thread_wakeup_timer_thread(int sig)
|
||||
{
|
||||
/* must be safe inside sighandler, so no mutex */
|
||||
if (timer_thread_pipe.owner_process == getpid()) {
|
||||
rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
|
||||
}
|
||||
}
|
||||
rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
|
||||
|
||||
static void
|
||||
rb_thread_wakeup_timer_thread_low(void)
|
||||
{
|
||||
if (timer_thread_pipe.owner_process == getpid()) {
|
||||
rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.low[1]);
|
||||
}
|
||||
}
|
||||
/*
|
||||
* system_working check is required because vm and main_thread are
|
||||
* freed during shutdown
|
||||
*/
|
||||
if (sig && system_working) {
|
||||
volatile rb_execution_context_t *ec;
|
||||
rb_vm_t *vm = GET_VM();
|
||||
rb_thread_t *mth;
|
||||
|
||||
/* VM-dependent API is not available for this function */
|
||||
static void
|
||||
consume_communication_pipe(int fd)
|
||||
{
|
||||
#define CCP_READ_BUFF_SIZE 1024
|
||||
/* buffer can be shared because no one refers to them. */
|
||||
static char buff[CCP_READ_BUFF_SIZE];
|
||||
ssize_t result;
|
||||
/*
|
||||
* FIXME: root VM and main_thread should be static and not
|
||||
* on heap for maximum safety (and startup/shutdown speed)
|
||||
*/
|
||||
if (!vm) return;
|
||||
mth = vm->main_thread;
|
||||
if (!mth || !system_working) return;
|
||||
|
||||
while (1) {
|
||||
result = read(fd, buff, sizeof(buff));
|
||||
if (result == 0) {
|
||||
return;
|
||||
}
|
||||
else if (result < 0) {
|
||||
int e = errno;
|
||||
switch (e) {
|
||||
case EINTR:
|
||||
continue; /* retry */
|
||||
case EAGAIN:
|
||||
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
|
||||
case EWOULDBLOCK:
|
||||
#endif
|
||||
return;
|
||||
default:
|
||||
async_bug_fd("consume_communication_pipe: read", e, fd);
|
||||
}
|
||||
/* this relies on GC for grace period before cont_free */
|
||||
ec = ACCESS_ONCE(rb_execution_context_t *, mth->ec);
|
||||
|
||||
if (ec) RUBY_VM_SET_TRAP_INTERRUPT(ec);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1350,6 +1370,7 @@ set_nonblock(int fd)
|
|||
rb_sys_fail(0);
|
||||
}
|
||||
|
||||
/* communication pipe with timer thread and signal handler */
|
||||
static int
|
||||
setup_communication_pipe_internal(int pipes[2])
|
||||
{
|
||||
|
@ -1374,108 +1395,6 @@ setup_communication_pipe_internal(int pipes[2])
|
|||
return 0;
|
||||
}
|
||||
|
||||
/* communication pipe with timer thread and signal handler */
|
||||
static int
|
||||
setup_communication_pipe(void)
|
||||
{
|
||||
rb_pid_t owner = timer_thread_pipe.owner_process;
|
||||
|
||||
if (owner && owner != getpid()) {
|
||||
CLOSE_INVALIDATE(normal[0]);
|
||||
CLOSE_INVALIDATE(normal[1]);
|
||||
CLOSE_INVALIDATE(low[0]);
|
||||
CLOSE_INVALIDATE(low[1]);
|
||||
}
|
||||
|
||||
if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) {
|
||||
return errno;
|
||||
}
|
||||
if (setup_communication_pipe_internal(timer_thread_pipe.low) < 0) {
|
||||
return errno;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Let the timer thread sleep a while.
|
||||
*
|
||||
* The timer thread sleeps until woken up by rb_thread_wakeup_timer_thread() if only one Ruby thread is running.
|
||||
* @pre the calling context is in the timer thread.
|
||||
*/
|
||||
static inline void
|
||||
timer_thread_sleep(rb_vm_t *vm)
|
||||
{
|
||||
int result;
|
||||
int need_polling;
|
||||
struct pollfd pollfds[2];
|
||||
|
||||
pollfds[0].fd = timer_thread_pipe.normal[0];
|
||||
pollfds[0].events = POLLIN;
|
||||
pollfds[1].fd = timer_thread_pipe.low[0];
|
||||
pollfds[1].events = POLLIN;
|
||||
|
||||
need_polling = !ubf_threads_empty();
|
||||
|
||||
if (SIGCHLD_LOSSY && !need_polling) {
|
||||
rb_native_mutex_lock(&vm->waitpid_lock);
|
||||
if (!list_empty(&vm->waiting_pids) || !list_empty(&vm->waiting_grps)) {
|
||||
need_polling = 1;
|
||||
}
|
||||
rb_native_mutex_unlock(&vm->waitpid_lock);
|
||||
}
|
||||
|
||||
if (vm->gvl.waiting > 0 || need_polling) {
|
||||
/* polling (TIME_QUANTUM_USEC usec) */
|
||||
result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000);
|
||||
}
|
||||
else {
|
||||
/* wait (infinite) */
|
||||
result = poll(pollfds, numberof(pollfds), -1);
|
||||
}
|
||||
|
||||
if (result == 0) {
|
||||
/* maybe timeout */
|
||||
}
|
||||
else if (result > 0) {
|
||||
consume_communication_pipe(timer_thread_pipe.normal[0]);
|
||||
consume_communication_pipe(timer_thread_pipe.low[0]);
|
||||
}
|
||||
else { /* result < 0 */
|
||||
int e = errno;
|
||||
switch (e) {
|
||||
case EBADF:
|
||||
case EINVAL:
|
||||
case ENOMEM: /* from Linux man */
|
||||
case EFAULT: /* from FreeBSD man */
|
||||
rb_async_bug_errno("thread_timer: select", e);
|
||||
default:
|
||||
/* ignore */;
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif /* TIMER_THREAD_SLEEPY */
|
||||
|
||||
#if TIMER_IMPL == TIMER_THREAD_BUSY
|
||||
# define PER_NANO 1000000000
|
||||
void rb_thread_wakeup_timer_thread(void) {}
|
||||
static void rb_thread_wakeup_timer_thread_low(void) {}
|
||||
|
||||
static rb_nativethread_lock_t timer_thread_lock;
|
||||
static rb_nativethread_cond_t timer_thread_cond;
|
||||
|
||||
static inline void
|
||||
timer_thread_sleep(rb_vm_t *unused)
|
||||
{
|
||||
struct timespec ts;
|
||||
ts.tv_sec = 0;
|
||||
ts.tv_nsec = TIME_QUANTUM_USEC * 1000;
|
||||
ts = native_cond_timeout(&timer_thread_cond, ts);
|
||||
|
||||
native_cond_timedwait(&timer_thread_cond, &timer_thread_lock, &ts);
|
||||
}
|
||||
#endif /* TIMER_IMPL == TIMER_THREAD_BUSY */
|
||||
|
||||
#if !defined(SET_CURRENT_THREAD_NAME) && defined(__linux__) && defined(PR_SET_NAME)
|
||||
# define SET_CURRENT_THREAD_NAME(name) prctl(PR_SET_NAME, name)
|
||||
#endif
|
||||
|
@ -1526,137 +1445,26 @@ native_set_another_thread_name(rb_nativethread_id_t thread_id, VALUE name)
|
|||
return name;
|
||||
}
|
||||
|
||||
static void *
|
||||
thread_timer(void *p)
|
||||
{
|
||||
rb_vm_t *vm = p;
|
||||
#ifdef HAVE_PTHREAD_SIGMASK /* mainly to enable SIGCHLD */
|
||||
{
|
||||
sigset_t mask;
|
||||
sigemptyset(&mask);
|
||||
pthread_sigmask(SIG_SETMASK, &mask, NULL);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (TT_DEBUG) WRITE_CONST(2, "start timer thread\n");
|
||||
|
||||
#ifdef SET_CURRENT_THREAD_NAME
|
||||
SET_CURRENT_THREAD_NAME("ruby-timer-thr");
|
||||
#endif
|
||||
|
||||
#if TIMER_IMPL == TIMER_THREAD_BUSY
|
||||
rb_native_mutex_initialize(&timer_thread_lock);
|
||||
rb_native_cond_initialize(&timer_thread_cond);
|
||||
rb_native_mutex_lock(&timer_thread_lock);
|
||||
#endif
|
||||
while (system_working > 0) {
|
||||
|
||||
/* timer function */
|
||||
ubf_wakeup_all_threads();
|
||||
timer_thread_function(0);
|
||||
|
||||
if (TT_DEBUG) WRITE_CONST(2, "tick\n");
|
||||
|
||||
/* wait */
|
||||
timer_thread_sleep(vm);
|
||||
}
|
||||
#if TIMER_IMPL == TIMER_THREAD_BUSY
|
||||
rb_native_mutex_unlock(&timer_thread_lock);
|
||||
rb_native_cond_destroy(&timer_thread_cond);
|
||||
rb_native_mutex_destroy(&timer_thread_lock);
|
||||
#endif
|
||||
|
||||
if (TT_DEBUG) WRITE_CONST(2, "finish timer thread\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
#if (TIMER_IMPL & TIMER_THREAD_MASK)
|
||||
static void
|
||||
rb_thread_create_timer_thread(void)
|
||||
{
|
||||
if (!timer_thread.created) {
|
||||
size_t stack_size = 0;
|
||||
int err;
|
||||
pthread_attr_t attr;
|
||||
rb_vm_t *vm = GET_VM();
|
||||
/* we only create the pipe, and lazy-spawn */
|
||||
rb_pid_t current = getpid();
|
||||
rb_pid_t owner = timer_thread_pipe.owner_process;
|
||||
|
||||
err = pthread_attr_init(&attr);
|
||||
if (err != 0) {
|
||||
rb_warn("pthread_attr_init failed for timer: %s, scheduling broken",
|
||||
strerror(err));
|
||||
return;
|
||||
}
|
||||
# ifdef PTHREAD_STACK_MIN
|
||||
{
|
||||
size_t stack_min = PTHREAD_STACK_MIN; /* may be dynamic, get only once */
|
||||
const size_t min_size = (4096 * 4);
|
||||
/* Allocate the machine stack for the timer thread
|
||||
* at least 16KB (4 pages). FreeBSD 8.2 AMD64 causes
|
||||
* machine stack overflow only with PTHREAD_STACK_MIN.
|
||||
*/
|
||||
enum {
|
||||
needs_more_stack =
|
||||
#if defined HAVE_VALGRIND_MEMCHECK_H && defined __APPLE__
|
||||
1
|
||||
#else
|
||||
THREAD_DEBUG != 0
|
||||
#endif
|
||||
};
|
||||
stack_size = stack_min;
|
||||
if (stack_size < min_size) stack_size = min_size;
|
||||
if (needs_more_stack) {
|
||||
stack_size += +((BUFSIZ - 1) / stack_min + 1) * stack_min;
|
||||
}
|
||||
err = pthread_attr_setstacksize(&attr, stack_size);
|
||||
if (err != 0) {
|
||||
rb_bug("pthread_attr_setstacksize(.., %"PRIuSIZE") failed: %s",
|
||||
stack_size, strerror(err));
|
||||
}
|
||||
}
|
||||
# endif
|
||||
if (owner && owner != current) {
|
||||
CLOSE_INVALIDATE(normal[0]);
|
||||
CLOSE_INVALIDATE(normal[1]);
|
||||
}
|
||||
|
||||
#if TIMER_IMPL == TIMER_THREAD_SLEEPY
|
||||
err = setup_communication_pipe();
|
||||
if (err) return;
|
||||
#endif /* TIMER_THREAD_SLEEPY */
|
||||
if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) return;
|
||||
|
||||
/* create timer thread */
|
||||
if (timer_thread.created) {
|
||||
rb_bug("rb_thread_create_timer_thread: Timer thread was already created\n");
|
||||
}
|
||||
err = pthread_create(&timer_thread.id, &attr, thread_timer, vm);
|
||||
pthread_attr_destroy(&attr);
|
||||
|
||||
if (err == EINVAL) {
|
||||
/*
|
||||
* Even if we are careful with our own stack use in thread_timer(),
|
||||
* any third-party libraries (eg libkqueue) which rely on __thread
|
||||
* storage can cause small stack sizes to fail. So lets hope the
|
||||
* default stack size is enough for them:
|
||||
*/
|
||||
stack_size = 0;
|
||||
err = pthread_create(&timer_thread.id, NULL, thread_timer, vm);
|
||||
}
|
||||
if (err != 0) {
|
||||
rb_warn("pthread_create failed for timer: %s, scheduling broken",
|
||||
strerror(err));
|
||||
if (stack_size) {
|
||||
rb_warn("timer thread stack size: %"PRIuSIZE, stack_size);
|
||||
}
|
||||
else {
|
||||
rb_warn("timer thread stack size: system default");
|
||||
}
|
||||
VM_ASSERT(err == 0);
|
||||
return;
|
||||
}
|
||||
#if TIMER_IMPL == TIMER_THREAD_SLEEPY
|
||||
/* validate pipe on this process */
|
||||
timer_thread_pipe.owner_process = getpid();
|
||||
#endif /* TIMER_THREAD_SLEEPY */
|
||||
timer_thread.created = 1;
|
||||
if (owner != current) {
|
||||
/* validate pipe on this process */
|
||||
sigwait_th = THREAD_INVALID;
|
||||
timer_thread_pipe.owner_process = current;
|
||||
}
|
||||
}
|
||||
#endif /* TIMER_IMPL & TIMER_THREAD_MASK */
|
||||
|
||||
static int
|
||||
native_stop_timer_thread(void)
|
||||
|
@ -1665,24 +1473,6 @@ native_stop_timer_thread(void)
|
|||
stopped = --system_working <= 0;
|
||||
|
||||
if (TT_DEBUG) fprintf(stderr, "stop timer thread\n");
|
||||
if (stopped) {
|
||||
#if TIMER_IMPL == TIMER_THREAD_SLEEPY
|
||||
/* kick timer thread out of sleep */
|
||||
rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
|
||||
#endif
|
||||
|
||||
/* timer thread will stop looping when system_working <= 0: */
|
||||
native_thread_join(timer_thread.id);
|
||||
|
||||
/*
|
||||
* don't care if timer_thread_pipe may fill up at this point.
|
||||
* If we restart timer thread, signals will be processed, if
|
||||
* we don't, it's because we're in a different child
|
||||
*/
|
||||
|
||||
if (TT_DEBUG) fprintf(stderr, "joined timer thread\n");
|
||||
timer_thread.created = 0;
|
||||
}
|
||||
return stopped;
|
||||
}
|
||||
|
||||
|
@ -1739,20 +1529,14 @@ ruby_stack_overflowed_p(const rb_thread_t *th, const void *addr)
|
|||
int
|
||||
rb_reserved_fd_p(int fd)
|
||||
{
|
||||
#if TIMER_IMPL == TIMER_THREAD_SLEEPY
|
||||
if ((fd == timer_thread_pipe.normal[0] ||
|
||||
fd == timer_thread_pipe.normal[1] ||
|
||||
fd == timer_thread_pipe.low[0] ||
|
||||
fd == timer_thread_pipe.low[1]) &&
|
||||
fd == timer_thread_pipe.normal[1]) &&
|
||||
timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */
|
||||
return 1;
|
||||
}
|
||||
else {
|
||||
return 0;
|
||||
}
|
||||
#else
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
rb_nativethread_id_t
|
||||
|
@ -1803,7 +1587,7 @@ rb_sleep_cond_get(const rb_execution_context_t *ec)
|
|||
{
|
||||
rb_thread_t *th = rb_ec_thread_ptr(ec);
|
||||
|
||||
return &th->native_thread_data.sleep_cond;
|
||||
return &th->native_thread_data.cond.intr;
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -1813,4 +1597,126 @@ rb_sleep_cond_put(rb_nativethread_cond_t *cond)
|
|||
}
|
||||
#endif /* USE_NATIVE_SLEEP_COND */
|
||||
|
||||
int
|
||||
rb_sigwait_fd_get(const rb_thread_t *th)
|
||||
{
|
||||
if (timer_thread_pipe.owner_process == getpid() &&
|
||||
timer_thread_pipe.normal[0] >= 0) {
|
||||
if (ATOMIC_PTR_CAS(sigwait_th, THREAD_INVALID, th) == THREAD_INVALID) {
|
||||
return timer_thread_pipe.normal[0];
|
||||
}
|
||||
}
|
||||
return -1; /* avoid thundering herd */
|
||||
}
|
||||
|
||||
void
|
||||
rb_sigwait_fd_put(const rb_thread_t *th, int fd)
|
||||
{
|
||||
const rb_thread_t *old;
|
||||
|
||||
VM_ASSERT(timer_thread_pipe.normal[0] == fd);
|
||||
old = ATOMIC_PTR_EXCHANGE(sigwait_th, THREAD_INVALID);
|
||||
if (old != th) assert(old == th);
|
||||
}
|
||||
|
||||
#ifndef HAVE_PPOLL
|
||||
/* TODO: don't ignore sigmask */
|
||||
static int
|
||||
ruby_ppoll(struct pollfd *fds, nfds_t nfds,
|
||||
const struct timespec *ts, const sigset_t *sigmask)
|
||||
{
|
||||
int timeout_ms;
|
||||
|
||||
if (ts) {
|
||||
int tmp, tmp2;
|
||||
|
||||
if (ts->tv_sec > INT_MAX/1000)
|
||||
timeout_ms = INT_MAX;
|
||||
else {
|
||||
tmp = (int)(ts->tv_sec * 1000);
|
||||
/* round up 1ns to 1ms to avoid excessive wakeups for <1ms sleep */
|
||||
tmp2 = (int)((ts->tv_nsec + 999999L) / (1000L * 1000L));
|
||||
if (INT_MAX - tmp < tmp2)
|
||||
timeout_ms = INT_MAX;
|
||||
else
|
||||
timeout_ms = (int)(tmp + tmp2);
|
||||
}
|
||||
}
|
||||
else
|
||||
timeout_ms = -1;
|
||||
|
||||
return poll(fds, nfds, timeout_ms);
|
||||
}
|
||||
# define ppoll(fds,nfds,ts,sigmask) ruby_ppoll((fds),(nfds),(ts),(sigmask))
|
||||
#endif
|
||||
|
||||
void
|
||||
rb_sigwait_sleep(rb_thread_t *th, int sigwait_fd, const struct timespec *ts)
|
||||
{
|
||||
struct pollfd pfd;
|
||||
|
||||
pfd.fd = sigwait_fd;
|
||||
pfd.events = POLLIN;
|
||||
|
||||
if (!BUSY_WAIT_SIGNALS && ubf_threads_empty()) {
|
||||
(void)ppoll(&pfd, 1, ts, 0);
|
||||
check_signals_nogvl(th, sigwait_fd);
|
||||
}
|
||||
else {
|
||||
struct timespec end, diff;
|
||||
const struct timespec *to;
|
||||
int n = 0;
|
||||
|
||||
if (ts) {
|
||||
getclockofday(&end);
|
||||
timespec_add(&end, ts);
|
||||
diff = *ts;
|
||||
ts = &diff;
|
||||
}
|
||||
/*
|
||||
* tricky: this needs to return on spurious wakeup (no auto-retry).
|
||||
* But we also need to distinguish between periodic quantum
|
||||
* wakeups, so we care about the result of consume_communication_pipe
|
||||
*/
|
||||
for (;;) {
|
||||
to = sigwait_timeout(th, sigwait_fd, ts, &n);
|
||||
if (n) return;
|
||||
n = ppoll(&pfd, 1, to, 0);
|
||||
if (check_signals_nogvl(th, sigwait_fd))
|
||||
return;
|
||||
if (n || (th && RUBY_VM_INTERRUPTED(th->ec)))
|
||||
return;
|
||||
if (ts && timespec_update_expire(&diff, &end))
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
native_sleep(rb_thread_t *th, struct timespec *timeout_rel)
|
||||
{
|
||||
int sigwait_fd = rb_sigwait_fd_get(th);
|
||||
|
||||
if (sigwait_fd >= 0) {
|
||||
rb_native_mutex_lock(&th->interrupt_lock);
|
||||
th->unblock.func = ubf_sigwait;
|
||||
rb_native_mutex_unlock(&th->interrupt_lock);
|
||||
|
||||
GVL_UNLOCK_BEGIN(th);
|
||||
|
||||
if (!RUBY_VM_INTERRUPTED(th->ec)) {
|
||||
rb_sigwait_sleep(th, sigwait_fd, timeout_rel);
|
||||
}
|
||||
else {
|
||||
check_signals_nogvl(th, sigwait_fd);
|
||||
}
|
||||
unblock_function_clear(th);
|
||||
GVL_UNLOCK_END(th);
|
||||
rb_sigwait_fd_put(th, sigwait_fd);
|
||||
rb_sigwait_fd_migrate(th->vm);
|
||||
}
|
||||
else {
|
||||
native_cond_sleep(th, timeout_rel);
|
||||
}
|
||||
}
|
||||
#endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue