diff --git a/test/ruby/test_ractor.rb b/test/ruby/test_ractor.rb index 0a456a1d0f..74de2bf9cd 100644 --- a/test/ruby/test_ractor.rb +++ b/test/ruby/test_ractor.rb @@ -162,6 +162,45 @@ class TestRactor < Test::Unit::TestCase RUBY end + # [Bug #21398] + def test_port_receive_dnt_with_port_send + assert_ractor(<<~'RUBY', timeout: 30) + THREADS = 10 + JOBS_PER_THREAD = 50 + ARRAY_SIZE = 20_000 + def ractor_job(job_count, array_size) + port = Ractor::Port.new + workers = (1..4).map do |i| + Ractor.new(port) do |job_port| + while job = Ractor.receive + result = job.map { |x| x * 2 }.sum + job_port.send result + end + end + end + jobs = Array.new(job_count) { Array.new(array_size) { rand(1000) } } + jobs.each_with_index do |job, i| + w_idx = i % 4 + workers[w_idx].send(job) + end + results = [] + jobs.size.times do + result = port.receive # dnt receive + results << result + end + results + end + threads = [] + # creates 40 ractors (THREADSx4) + THREADS.times do + threads << Thread.new do + ractor_job(JOBS_PER_THREAD, ARRAY_SIZE) + end + end + threads.each(&:join) + RUBY + end + def assert_make_shareable(obj) refute Ractor.shareable?(obj), "object was already shareable" Ractor.make_shareable(obj) diff --git a/thread_pthread.c b/thread_pthread.c index 377e1d9f64..730ecb5416 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -1351,6 +1351,7 @@ rb_ractor_sched_wait(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_fun } thread_sched_lock(sched, th); + rb_ractor_unlock_self(cr); { // setup sleep bool can_direct_transfer = !th_has_dedicated_nt(th); @@ -1358,16 +1359,12 @@ rb_ractor_sched_wait(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_fun th->status = THREAD_STOPPED_FOREVER; RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); thread_sched_wakeup_next_thread(sched, th, can_direct_transfer); - - rb_ractor_unlock_self(cr); - { - // sleep - thread_sched_wait_running_turn(sched, th, can_direct_transfer); - th->status = THREAD_RUNNABLE; - } - rb_ractor_lock_self(cr); + // sleep + thread_sched_wait_running_turn(sched, th, can_direct_transfer); + th->status = THREAD_RUNNABLE; } thread_sched_unlock(sched, th); + rb_ractor_lock_self(cr); ubf_clear(th);