mirror of
https://github.com/ruby/ruby.git
synced 2025-09-15 08:33:58 +02:00
* thread.c: fix Mutex to be interruptable lock.
* thread_win32.ci, thread_win32.h, thread_pthread.ci, thread_pthread.h: prepare native_cond_*() which are based on pthread_cond_*() spec. * prelude.rb: fix Mutex#synchronize method. * vm_core.h, include/ruby/intern.h: change unblock function interface (to pass some user data). * file.c, process.c: ditto. * benchmark/bm_vm2_mutex.rb: add a benchmark for mutex. * benchmark/bm_vm3_thread_mutex.rb: add a benchmark for mutex with contension. * benchmark/run.rb: fix to remove ENV['RUBYLIB'] for matzruby. * test/ruby/test_thread.rb: add a test. * common.mk: fix benchmark options. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@13290 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
51fb5511e0
commit
6244e502cc
20 changed files with 363 additions and 95 deletions
142
thread.c
142
thread.c
|
@ -80,7 +80,8 @@ st_delete_wrap(st_table * table, VALUE key)
|
|||
|
||||
#define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION
|
||||
|
||||
static rb_unblock_function_t* set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func);
|
||||
static void set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *ptr,
|
||||
rb_unblock_function_t **oldfunc, void **oldptr);
|
||||
|
||||
#define GVL_UNLOCK_BEGIN() do { \
|
||||
rb_thread_t *_th_stored = GET_THREAD(); \
|
||||
|
@ -92,10 +93,12 @@ static rb_unblock_function_t* set_unblock_function(rb_thread_t *th, rb_unblock_f
|
|||
rb_thread_set_current(_th_stored); \
|
||||
} while(0)
|
||||
|
||||
#define BLOCKING_REGION(exec, ubf) do { \
|
||||
#define BLOCKING_REGION(exec, ubf, ubfarg) do { \
|
||||
rb_thread_t *__th = GET_THREAD(); \
|
||||
int __prev_status = __th->status; \
|
||||
rb_unblock_function_t *__oldubf = set_unblock_function(__th, ubf); \
|
||||
rb_unblock_function_t *__oldubf; \
|
||||
void *__oldubfarg; \
|
||||
set_unblock_function(__th, ubf, ubfarg, &__oldubf, &__oldubfarg); \
|
||||
__th->status = THREAD_STOPPED; \
|
||||
thread_debug("enter blocking region (%p)\n", __th); \
|
||||
GVL_UNLOCK_BEGIN(); {\
|
||||
|
@ -104,7 +107,7 @@ static rb_unblock_function_t* set_unblock_function(rb_thread_t *th, rb_unblock_f
|
|||
GVL_UNLOCK_END(); \
|
||||
thread_debug("leave blocking region (%p)\n", __th); \
|
||||
remove_signal_thread_list(__th); \
|
||||
set_unblock_function(__th, __oldubf); \
|
||||
set_unblock_function(__th, __oldubf, __oldubfarg, 0, 0); \
|
||||
if (__th->status == THREAD_STOPPED) { \
|
||||
__th->status = __prev_status; \
|
||||
} \
|
||||
|
@ -191,11 +194,10 @@ rb_thread_debug(const char *fmt, ...)
|
|||
#endif
|
||||
|
||||
|
||||
static rb_unblock_function_t *
|
||||
set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func)
|
||||
static void
|
||||
set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
|
||||
rb_unblock_function_t **oldfunc, void **oldarg)
|
||||
{
|
||||
rb_unblock_function_t *oldfunc;
|
||||
|
||||
check_ints:
|
||||
RUBY_VM_CHECK_INTS(); /* check signal or so */
|
||||
native_mutex_lock(&th->interrupt_lock);
|
||||
|
@ -204,12 +206,12 @@ set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func)
|
|||
goto check_ints;
|
||||
}
|
||||
else {
|
||||
oldfunc = th->unblock_function;
|
||||
if (oldfunc) *oldfunc = th->unblock_function;
|
||||
if (oldarg) *oldarg = th->unblock_function_arg;
|
||||
th->unblock_function = func;
|
||||
th->unblock_function_arg = arg;
|
||||
}
|
||||
native_mutex_unlock(&th->interrupt_lock);
|
||||
|
||||
return oldfunc;
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -218,7 +220,7 @@ rb_thread_interrupt(rb_thread_t *th)
|
|||
native_mutex_lock(&th->interrupt_lock);
|
||||
th->interrupt_flag = 1;
|
||||
if (th->unblock_function) {
|
||||
(th->unblock_function)(th);
|
||||
(th->unblock_function)(th, th->unblock_function_arg);
|
||||
}
|
||||
else {
|
||||
/* none */
|
||||
|
@ -661,8 +663,8 @@ rb_thread_s_critical(VALUE self)
|
|||
|
||||
VALUE
|
||||
rb_thread_blocking_region(
|
||||
rb_blocking_function_t *func, void *data,
|
||||
rb_unblock_function_t *ubf)
|
||||
rb_blocking_function_t *func, void *data1,
|
||||
rb_unblock_function_t *ubf, void *data2)
|
||||
{
|
||||
VALUE val;
|
||||
rb_thread_t *th = GET_THREAD();
|
||||
|
@ -670,9 +672,10 @@ rb_thread_blocking_region(
|
|||
if (ubf == RB_UBF_DFL) {
|
||||
ubf = ubf_select;
|
||||
}
|
||||
|
||||
BLOCKING_REGION({
|
||||
val = func(th, data);
|
||||
}, ubf);
|
||||
val = func(th, data1);
|
||||
}, ubf, data2);
|
||||
|
||||
return val;
|
||||
}
|
||||
|
@ -1747,14 +1750,14 @@ do_select(int n, fd_set *read, fd_set *write, fd_set *except,
|
|||
if (except) *except = orig_except;
|
||||
wait = &wait_100ms;
|
||||
} while (__th->interrupt_flag == 0 && (timeout == 0 || subst(timeout, &wait_100ms)));
|
||||
}, 0);
|
||||
}, 0, 0);
|
||||
} while (result == 0 && (timeout == 0 || subst(timeout, &wait_100ms)));
|
||||
}
|
||||
#else
|
||||
BLOCKING_REGION({
|
||||
result = select(n, read, write, except, timeout);
|
||||
if (result < 0) lerrno = errno;
|
||||
}, ubf_select);
|
||||
}, ubf_select, 0);
|
||||
#endif
|
||||
|
||||
errno = lerrno;
|
||||
|
@ -2146,11 +2149,13 @@ thgroup_add(VALUE group, VALUE thread)
|
|||
*/
|
||||
|
||||
typedef struct mutex_struct {
|
||||
rb_thread_t *th;
|
||||
rb_thread_lock_t lock;
|
||||
rb_thread_cond_t cond;
|
||||
rb_thread_t volatile *th;
|
||||
volatile int cond_waiting;
|
||||
} mutex_t;
|
||||
|
||||
#define GetMutexVal(obj, tobj) \
|
||||
#define GetMutexPtr(obj, tobj) \
|
||||
Data_Get_Struct(obj, mutex_t, tobj)
|
||||
|
||||
static void
|
||||
|
@ -2169,10 +2174,8 @@ mutex_free(void *ptr)
|
|||
{
|
||||
if (ptr) {
|
||||
mutex_t *mutex = ptr;
|
||||
if (mutex->th) {
|
||||
native_mutex_unlock(&mutex->lock);
|
||||
}
|
||||
native_mutex_destroy(&mutex->lock);
|
||||
native_cond_destroy(&mutex->cond);
|
||||
}
|
||||
ruby_xfree(ptr);
|
||||
}
|
||||
|
@ -2184,8 +2187,8 @@ mutex_alloc(VALUE klass)
|
|||
mutex_t *mutex;
|
||||
|
||||
obj = Data_Make_Struct(klass, mutex_t, mutex_mark, mutex_free, mutex);
|
||||
mutex->th = 0;
|
||||
native_mutex_initialize(&mutex->lock);
|
||||
native_cond_initialize(&mutex->cond);
|
||||
return obj;
|
||||
}
|
||||
|
||||
|
@ -2217,7 +2220,7 @@ VALUE
|
|||
rb_mutex_locked_p(VALUE self)
|
||||
{
|
||||
mutex_t *mutex;
|
||||
GetMutexVal(self, mutex);
|
||||
GetMutexPtr(self, mutex);
|
||||
return mutex->th ? Qtrue : Qfalse;
|
||||
}
|
||||
|
||||
|
@ -2229,22 +2232,67 @@ rb_mutex_locked_p(VALUE self)
|
|||
* lock was granted.
|
||||
*/
|
||||
VALUE
|
||||
rb_mutex_try_lock(VALUE self)
|
||||
rb_mutex_trylock(VALUE self)
|
||||
{
|
||||
mutex_t *mutex;
|
||||
GetMutexVal(self, mutex);
|
||||
VALUE locked = Qfalse;
|
||||
GetMutexPtr(self, mutex);
|
||||
|
||||
if (mutex->th == GET_THREAD()) {
|
||||
rb_raise(rb_eThreadError, "deadlock; recursive locking");
|
||||
}
|
||||
|
||||
if (native_mutex_trylock(&mutex->lock) != EBUSY) {
|
||||
native_mutex_lock(&mutex->lock);
|
||||
if (mutex->th == 0) {
|
||||
mutex->th = GET_THREAD();
|
||||
return Qtrue;
|
||||
locked = Qtrue;
|
||||
}
|
||||
else {
|
||||
return Qfalse;
|
||||
native_mutex_unlock(&mutex->lock);
|
||||
|
||||
return locked;
|
||||
}
|
||||
|
||||
static VALUE
|
||||
lock_func(rb_thread_t *th, void *ptr)
|
||||
{
|
||||
int locked = 0;
|
||||
mutex_t *mutex = (mutex_t *)ptr;
|
||||
|
||||
while (locked == 0) {
|
||||
native_mutex_lock(&mutex->lock);
|
||||
|
||||
if (mutex->th == 0) {
|
||||
mutex->th = th;
|
||||
locked = 1;
|
||||
}
|
||||
else {
|
||||
mutex->cond_waiting++;
|
||||
native_cond_wait(&mutex->cond, &mutex->lock);
|
||||
|
||||
if (th->interrupt_flag) {
|
||||
locked = 1;
|
||||
}
|
||||
else if (mutex->th == 0) {
|
||||
mutex->th = th;
|
||||
locked = 1;
|
||||
}
|
||||
}
|
||||
|
||||
native_mutex_unlock(&mutex->lock);
|
||||
}
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
static void
|
||||
lock_interrupt(rb_thread_t *th, void *ptr)
|
||||
{
|
||||
mutex_t *mutex = (mutex_t *)ptr;
|
||||
native_mutex_lock(&mutex->lock);
|
||||
if (mutex->cond_waiting > 0) {
|
||||
native_cond_broadcast(&mutex->cond);
|
||||
mutex->cond_waiting = 0;
|
||||
}
|
||||
native_mutex_unlock(&mutex->lock);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -2257,21 +2305,17 @@ rb_mutex_try_lock(VALUE self)
|
|||
VALUE
|
||||
rb_mutex_lock(VALUE self)
|
||||
{
|
||||
mutex_t *mutex;
|
||||
GetMutexVal(self, mutex);
|
||||
if (rb_mutex_trylock(self) == Qfalse) {
|
||||
mutex_t *mutex;
|
||||
rb_thread_t *th = GET_THREAD();
|
||||
GetMutexPtr(self, mutex);
|
||||
|
||||
if (mutex->th == GET_THREAD()) {
|
||||
rb_raise(rb_eThreadError, "deadlock; recursive locking");
|
||||
while (mutex->th != th) {
|
||||
rb_thread_blocking_region(lock_func, mutex, lock_interrupt, mutex);
|
||||
RUBY_VM_CHECK_INTS();
|
||||
}
|
||||
}
|
||||
|
||||
if (native_mutex_trylock(&mutex->lock) != 0) {
|
||||
/* can't cancel */
|
||||
GVL_UNLOCK_BEGIN();
|
||||
native_mutex_lock(&mutex->lock);
|
||||
GVL_UNLOCK_END();
|
||||
}
|
||||
|
||||
mutex->th = GET_THREAD();
|
||||
return self;
|
||||
}
|
||||
|
||||
|
@ -2286,14 +2330,22 @@ VALUE
|
|||
rb_mutex_unlock(VALUE self)
|
||||
{
|
||||
mutex_t *mutex;
|
||||
GetMutexVal(self, mutex);
|
||||
GetMutexPtr(self, mutex);
|
||||
|
||||
if (mutex->th != GET_THREAD()) {
|
||||
rb_raise(rb_eThreadError,
|
||||
"Attempt to unlock a mutex which is locked by another thread");
|
||||
}
|
||||
|
||||
native_mutex_lock(&mutex->lock);
|
||||
mutex->th = 0;
|
||||
if (mutex->cond_waiting > 0) {
|
||||
/* waiting thread */
|
||||
native_cond_signal(&mutex->cond);
|
||||
mutex->cond_waiting--;
|
||||
}
|
||||
native_mutex_unlock(&mutex->lock);
|
||||
|
||||
return self;
|
||||
}
|
||||
|
||||
|
@ -2963,7 +3015,7 @@ Init_Thread(void)
|
|||
rb_define_alloc_func(rb_cMutex, mutex_alloc);
|
||||
rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
|
||||
rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
|
||||
rb_define_method(rb_cMutex, "try_lock", rb_mutex_try_lock, 0);
|
||||
rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
|
||||
rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
|
||||
rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
|
||||
rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue