merge revision(s) 39679,39682,39683,39685,39686,39694: [Backport #7999]

* thread_pthread.c (set_nonblock): new helper function for set
	  O_NONBLOCK.

	* thread_pthread.c (rb_thread_create_timer_thread): set O_NONBLOCK
	  to timer_thread_pipe[0] too.

	* thread_pthread.c (consume_communication_pipe): retry when
	  read returned CCP_READ_BUFF_SIZE.

	* thread_pthread.c (rb_thread_create_timer_thread): factor out
	  creating communication pipe logic into separate function.

	* thread_pthread.c (setup_communication_pipe): new helper function.

	* thread_pthread.c (set_nonblock): moves a definition before
	  setup_communication_pipe.

	* thread_pthread.c (rb_thread_wakeup_timer_thread_fd): add fd
	  argument and remove hardcoded dependency of timer_thread_pipe[1].

	* thread_pthread.c (consume_communication_pipe): add fd argument.

	* thread_pthread.c (close_communication_pipe): ditto.

	* thread_pthread.c (timer_thread_sleep): adjust the above changes.

	* thread_pthread.c (setup_communication_pipe_internal): factor
	  out pipe initialize logic.

	* thread_pthread.c (ARRAY_SIZE): new.

	* thread_pthread.c (gvl_acquire_common): use low priority
	  notification for avoiding timer thread interval confusion.
	  If we use timer_thread_pipe[1], every gvl_yield() request
	  one more gvl_yield(). It lead to thread starvation.
	  [Bug #7999] [ruby-core:53095]

	* thread_pthread.c (rb_reserved_fd_p): adds timer_thread_pipe_low
	  to reserved fds.

	* process.c (setup_communication_pipe): remove unused function.
	  it was unintentionally added r39683.


git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/branches/ruby_2_0_0@39727 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
nagachika 2013-03-11 15:54:49 +00:00
parent c98f39af77
commit 6901ebcb7c
3 changed files with 165 additions and 60 deletions

View file

@ -1,3 +1,51 @@
Tue Mar 12 00:53:34 2013 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
* process.c (setup_communication_pipe): remove unused function.
it was unintentionally added r39683.
Tue Mar 12 00:53:34 2013 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
* thread_pthread.c (ARRAY_SIZE): new.
* thread_pthread.c (gvl_acquire_common): use low priority
notification for avoiding timer thread interval confusion.
If we use timer_thread_pipe[1], every gvl_yield() request
one more gvl_yield(). It lead to thread starvation.
[Bug #7999] [ruby-core:53095]
* thread_pthread.c (rb_reserved_fd_p): adds timer_thread_pipe_low
to reserved fds.
Tue Mar 12 00:53:34 2013 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
* thread_pthread.c (rb_thread_wakeup_timer_thread_fd): add fd
argument and remove hardcoded dependency of timer_thread_pipe[1].
* thread_pthread.c (consume_communication_pipe): add fd argument.
* thread_pthread.c (close_communication_pipe): ditto.
* thread_pthread.c (timer_thread_sleep): adjust the above changes.
* thread_pthread.c (setup_communication_pipe_internal): factor
out pipe initialize logic.
Tue Mar 12 00:53:34 2013 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
* thread_pthread.c (rb_thread_create_timer_thread): factor out
creating communication pipe logic into separate function.
* thread_pthread.c (setup_communication_pipe): new helper function.
* thread_pthread.c (set_nonblock): moves a definition before
setup_communication_pipe.
Tue Mar 12 00:53:34 2013 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
* thread_pthread.c (consume_communication_pipe): retry when
read returned CCP_READ_BUFF_SIZE.
Tue Mar 12 00:53:34 2013 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
* thread_pthread.c (set_nonblock): new helper function for set
O_NONBLOCK.
* thread_pthread.c (rb_thread_create_timer_thread): set O_NONBLOCK
to timer_thread_pipe[0] too.
Tue Mar 12 00:51:23 2013 KOSAKI Motohiro <kosaki.motohiro@gmail.com> Tue Mar 12 00:51:23 2013 KOSAKI Motohiro <kosaki.motohiro@gmail.com>
* thread_pthread.c (timer_thread_sleep): use poll() instead of * thread_pthread.c (timer_thread_sleep): use poll() instead of

View file

@ -44,6 +44,7 @@ static void native_cond_broadcast(rb_thread_cond_t *cond);
static void native_cond_wait(rb_thread_cond_t *cond, pthread_mutex_t *mutex); static void native_cond_wait(rb_thread_cond_t *cond, pthread_mutex_t *mutex);
static void native_cond_initialize(rb_thread_cond_t *cond, int flags); static void native_cond_initialize(rb_thread_cond_t *cond, int flags);
static void native_cond_destroy(rb_thread_cond_t *cond); static void native_cond_destroy(rb_thread_cond_t *cond);
static void rb_thread_wakeup_timer_thread_low(void);
static pthread_t timer_thread_id; static pthread_t timer_thread_id;
#define RB_CONDATTR_CLOCK_MONOTONIC 1 #define RB_CONDATTR_CLOCK_MONOTONIC 1
@ -63,6 +64,10 @@ static pthread_t timer_thread_id;
# define USE_SLEEPY_TIMER_THREAD 0 # define USE_SLEEPY_TIMER_THREAD 0
#endif #endif
#ifndef ARRAY_SIZE
#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
#endif
static void static void
gvl_acquire_common(rb_vm_t *vm) gvl_acquire_common(rb_vm_t *vm)
{ {
@ -70,8 +75,12 @@ gvl_acquire_common(rb_vm_t *vm)
vm->gvl.waiting++; vm->gvl.waiting++;
if (vm->gvl.waiting == 1) { if (vm->gvl.waiting == 1) {
/* transit to polling mode */ /*
rb_thread_wakeup_timer_thread(); * Wake up timer thread iff timer thread is slept.
* When timer thread is polling mode, we don't want to
* make confusing timer thread interval time.
*/
rb_thread_wakeup_timer_thread_low();
} }
while (vm->gvl.acquired) { while (vm->gvl.acquired) {
@ -1145,12 +1154,12 @@ static int check_signal_thread_list(void) { return 0; }
#if USE_SLEEPY_TIMER_THREAD #if USE_SLEEPY_TIMER_THREAD
static int timer_thread_pipe[2] = {-1, -1}; static int timer_thread_pipe[2] = {-1, -1};
static int timer_thread_pipe_low[2] = {-1, -1}; /* low priority */
static int timer_thread_pipe_owner_process; static int timer_thread_pipe_owner_process;
/* only use signal-safe system calls here */ /* only use signal-safe system calls here */
void static void
rb_thread_wakeup_timer_thread(void) rb_thread_wakeup_timer_thread_fd(int fd)
{ {
ssize_t result; ssize_t result;
@ -1158,7 +1167,7 @@ rb_thread_wakeup_timer_thread(void)
if (timer_thread_pipe_owner_process == getpid()) { if (timer_thread_pipe_owner_process == getpid()) {
const char *buff = "!"; const char *buff = "!";
retry: retry:
if ((result = write(timer_thread_pipe[1], buff, 1)) <= 0) { if ((result = write(fd, buff, 1)) <= 0) {
switch (errno) { switch (errno) {
case EINTR: goto retry; case EINTR: goto retry;
case EAGAIN: case EAGAIN:
@ -1177,36 +1186,111 @@ rb_thread_wakeup_timer_thread(void)
} }
} }
void
rb_thread_wakeup_timer_thread(void)
{
rb_thread_wakeup_timer_thread_fd(timer_thread_pipe[1]);
}
static void
rb_thread_wakeup_timer_thread_low(void)
{
rb_thread_wakeup_timer_thread_fd(timer_thread_pipe_low[1]);
}
/* VM-dependent API is not available for this function */ /* VM-dependent API is not available for this function */
static void static void
consume_communication_pipe(void) consume_communication_pipe(int fd)
{ {
#define CCP_READ_BUFF_SIZE 1024 #define CCP_READ_BUFF_SIZE 1024
/* buffer can be shared because no one refers to them. */ /* buffer can be shared because no one refers to them. */
static char buff[CCP_READ_BUFF_SIZE]; static char buff[CCP_READ_BUFF_SIZE];
ssize_t result; ssize_t result;
retry: while (1) {
result = read(timer_thread_pipe[0], buff, CCP_READ_BUFF_SIZE); result = read(fd, buff, sizeof(buff));
if (result < 0) { if (result == 0) {
switch (errno) { return;
case EINTR: goto retry; }
default: else if (result < 0) {
rb_async_bug_errno("consume_communication_pipe: read\n", errno); switch (errno) {
case EINTR:
continue; /* retry */
case EAGAIN:
return;
default:
rb_async_bug_errno("consume_communication_pipe: read\n", errno);
}
} }
} }
} }
static void static void
close_communication_pipe(void) close_communication_pipe(int pipes[2])
{ {
if (close(timer_thread_pipe[0]) < 0) { if (close(pipes[0]) < 0) {
rb_bug_errno("native_stop_timer_thread - close(ttp[0])", errno); rb_bug_errno("native_stop_timer_thread - close(ttp[0])", errno);
} }
if (close(timer_thread_pipe[1]) < 0) { if (close(pipes[1]) < 0) {
rb_bug_errno("native_stop_timer_thread - close(ttp[1])", errno); rb_bug_errno("native_stop_timer_thread - close(ttp[1])", errno);
} }
timer_thread_pipe[0] = timer_thread_pipe[1] = -1; pipes[0] = pipes[1] = -1;
}
#if USE_SLEEPY_TIMER_THREAD
static void
set_nonblock(int fd)
{
int oflags;
int err;
oflags = fcntl(fd, F_GETFL);
if (oflags == -1)
rb_sys_fail(0);
oflags |= O_NONBLOCK;
err = fcntl(fd, F_SETFL, oflags);
if (err == -1)
rb_sys_fail(0);
}
#endif
#if USE_SLEEPY_TIMER_THREAD
static void
setup_communication_pipe_internal(int pipes[2])
{
int err;
if (pipes[0] != -1) {
/* close pipe of parent process */
close_communication_pipe(pipes);
}
err = rb_cloexec_pipe(pipes);
if (err != 0) {
rb_bug_errno("setup_communication_pipe: Failed to create communication pipe for timer thread", errno);
}
rb_update_max_fd(pipes[0]);
rb_update_max_fd(pipes[1]);
set_nonblock(pipes[0]);
set_nonblock(pipes[1]);
}
#endif /* USE_SLEEPY_TIMER_THREAD */
/* communication pipe with timer thread and signal handler */
static void
setup_communication_pipe(void)
{
#if USE_SLEEPY_TIMER_THREAD
if (timer_thread_pipe_owner_process == getpid()) {
/* already set up. */
return;
}
setup_communication_pipe_internal(timer_thread_pipe);
setup_communication_pipe_internal(timer_thread_pipe_low);
/* validate pipe on this process */
timer_thread_pipe_owner_process = getpid();
#endif /* USE_SLEEPY_TIMER_THREAD */
} }
/** /**
@ -1220,27 +1304,30 @@ timer_thread_sleep(rb_global_vm_lock_t* gvl)
{ {
int result; int result;
int need_polling; int need_polling;
struct pollfd pollfd; struct pollfd pollfds[2];
pollfd.fd = timer_thread_pipe[0]; pollfds[0].fd = timer_thread_pipe[0];
pollfd.events = POLLIN; pollfds[0].events = POLLIN;
pollfds[1].fd = timer_thread_pipe_low[0];
pollfds[1].events = POLLIN;
need_polling = check_signal_thread_list(); need_polling = check_signal_thread_list();
if (gvl->waiting > 0 || need_polling) { if (gvl->waiting > 0 || need_polling) {
/* polling (TIME_QUANTUM_USEC usec) */ /* polling (TIME_QUANTUM_USEC usec) */
result = poll(&pollfd, 1, TIME_QUANTUM_USEC/1000); result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000);
} }
else { else {
/* wait (infinite) */ /* wait (infinite) */
result = poll(&pollfd, 1, -1); result = poll(pollfds, ARRAY_SIZE(pollfds), -1);
} }
if (result == 0) { if (result == 0) {
/* maybe timeout */ /* maybe timeout */
} }
else if (result > 0) { else if (result > 0) {
consume_communication_pipe(); consume_communication_pipe(timer_thread_pipe[0]);
consume_communication_pipe(timer_thread_pipe_low[0]);
} }
else { /* result < 0 */ else { /* result < 0 */
switch (errno) { switch (errno) {
@ -1340,39 +1427,7 @@ rb_thread_create_timer_thread(void)
# endif # endif
#endif #endif
#if USE_SLEEPY_TIMER_THREAD setup_communication_pipe();
/* communication pipe with timer thread and signal handler */
if (timer_thread_pipe_owner_process != getpid()) {
if (timer_thread_pipe[0] != -1) {
/* close pipe of parent process */
close_communication_pipe();
}
err = rb_cloexec_pipe(timer_thread_pipe);
if (err != 0) {
rb_bug_errno("thread_timer: Failed to create communication pipe for timer thread", errno);
}
rb_update_max_fd(timer_thread_pipe[0]);
rb_update_max_fd(timer_thread_pipe[1]);
# if defined(HAVE_FCNTL) && defined(F_GETFL) && defined(F_SETFL) && defined(O_NONBLOCK)
{
int oflags;
int err;
oflags = fcntl(timer_thread_pipe[1], F_GETFL);
if (oflags == -1)
rb_sys_fail(0);
oflags |= O_NONBLOCK;
err = fcntl(timer_thread_pipe[1], F_SETFL, oflags);
if (err == -1)
rb_sys_fail(0);
}
# endif /* defined(HAVE_FCNTL) && defined(F_GETFL) && defined(F_SETFL) */
/* validate pipe on this process */
timer_thread_pipe_owner_process = getpid();
}
#endif /* USE_SLEEPY_TIMER_THREAD */
/* create timer thread */ /* create timer thread */
if (timer_thread_id) { if (timer_thread_id) {
@ -1467,8 +1522,10 @@ int
rb_reserved_fd_p(int fd) rb_reserved_fd_p(int fd)
{ {
#if USE_SLEEPY_TIMER_THREAD #if USE_SLEEPY_TIMER_THREAD
if (fd == timer_thread_pipe[0] || if (fd == timer_thread_pipe[0] ||
fd == timer_thread_pipe[1]) { fd == timer_thread_pipe[1] ||
fd == timer_thread_pipe_low[0] ||
fd == timer_thread_pipe_low[1]) {
return 1; return 1;
} }
else { else {

View file

@ -1,6 +1,6 @@
#define RUBY_VERSION "2.0.0" #define RUBY_VERSION "2.0.0"
#define RUBY_RELEASE_DATE "2013-03-12" #define RUBY_RELEASE_DATE "2013-03-12"
#define RUBY_PATCHLEVEL 55 #define RUBY_PATCHLEVEL 56
#define RUBY_RELEASE_YEAR 2013 #define RUBY_RELEASE_YEAR 2013
#define RUBY_RELEASE_MONTH 3 #define RUBY_RELEASE_MONTH 3