mirror of
https://github.com/ruby/ruby.git
synced 2025-08-15 13:39:04 +02:00
Fix lock ordering issue for rb_ractor_sched_wait() and rb_ractor_sched_wakeup()
In rb_ractor_sched_wait() (ex: Ractor.receive), we acquire RACTOR_LOCK(cr) and then thread_sched_lock(cur_th). However, on wakeup if we're a dnt, in thread_sched_wait_running_turn() we acquire thread_sched_lock(cur_th) after condvar wakeup and then RACTOR_LOCK(cr). This lock inversion can cause a deadlock with rb_ractor_wakeup_all() (ex: port.send(obj)), where we acquire RACTOR_LOCK(other_r) and then thread_sched_lock(other_th). So, the error happens: nt 1: Ractor.receive rb_ractor_sched_wait() after condvar wakeup in thread_sched_wait_running_turn(): - thread_sched_lock(cur_th) (condvar) # acquires lock - rb_ractor_lock_self(cr) # deadlock here: tries to acquire, HANGS nt 2: port.send ractor_wakeup_all() - RACTOR_LOCK(port_r) # acquires lock - thread_sched_lock # tries to acquire, HANGS To fix it, we now unlock the thread_sched_lock before acquiring the ractor_lock in rb_ractor_sched_wait(). Script that reproduces issue: ```ruby require "async" class RactorWrapper def initialize @ractor = Ractor.new do Ractor.recv # Ractor doesn't start until explicitly told to # Do some calculations fib = ->(x) { x < 2 ? 1 : fib.call(x - 1) + fib.call(x - 2) } fib.call(20) end end def take_async @ractor.send(nil) Thread.new { @ractor.value }.value end end Async do |task| 10_000.times do |i| task.async do RactorWrapper.new.take_async puts i end end end exit 0 ``` Fixes [Bug #21398] Co-authored-by: John Hawthorn <john.hawthorn@shopify.com>
This commit is contained in:
parent
e639e5fd1a
commit
07878ebe78
2 changed files with 44 additions and 8 deletions
|
@ -162,6 +162,45 @@ class TestRactor < Test::Unit::TestCase
|
|||
RUBY
|
||||
end
|
||||
|
||||
# Regression test for a lock-ordering deadlock between
# rb_ractor_sched_wait() (port.receive on a dedicated native thread)
# and ractor_wakeup_all() (port.send). See [Bug #21398].
def test_port_receive_dnt_with_port_send
  assert_ractor(<<~'RUBY', timeout: 30)
    THREADS = 10
    JOBS_PER_THREAD = 50
    ARRAY_SIZE = 20_000

    def ractor_job(job_count, array_size)
      port = Ractor::Port.new
      workers = (1..4).map do |i|
        Ractor.new(port) do |job_port|
          while job = Ractor.receive
            result = job.map { |x| x * 2 }.sum
            job_port.send result
          end
        end
      end
      jobs = Array.new(job_count) { Array.new(array_size) { rand(1000) } }
      jobs.each_with_index do |job, i|
        w_idx = i % 4
        workers[w_idx].send(job)
      end
      results = []
      jobs.size.times do
        result = port.receive # dnt receive
        results << result
      end
      results
    end

    threads = []
    # creates 40 ractors (THREADSx4)
    THREADS.times do
      threads << Thread.new do
        ractor_job(JOBS_PER_THREAD, ARRAY_SIZE)
      end
    end
    threads.each(&:join)
  RUBY
end
|
||||
|
||||
def assert_make_shareable(obj)
|
||||
refute Ractor.shareable?(obj), "object was already shareable"
|
||||
Ractor.make_shareable(obj)
|
||||
|
|
|
@ -1351,6 +1351,7 @@ rb_ractor_sched_wait(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_fun
|
|||
}
|
||||
|
||||
thread_sched_lock(sched, th);
|
||||
rb_ractor_unlock_self(cr);
|
||||
{
|
||||
// setup sleep
|
||||
bool can_direct_transfer = !th_has_dedicated_nt(th);
|
||||
|
@ -1358,16 +1359,12 @@ rb_ractor_sched_wait(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_fun
|
|||
th->status = THREAD_STOPPED_FOREVER;
|
||||
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
|
||||
thread_sched_wakeup_next_thread(sched, th, can_direct_transfer);
|
||||
|
||||
rb_ractor_unlock_self(cr);
|
||||
{
|
||||
// sleep
|
||||
thread_sched_wait_running_turn(sched, th, can_direct_transfer);
|
||||
th->status = THREAD_RUNNABLE;
|
||||
}
|
||||
rb_ractor_lock_self(cr);
|
||||
}
|
||||
thread_sched_unlock(sched, th);
|
||||
rb_ractor_lock_self(cr);
|
||||
|
||||
ubf_clear(th);
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue