Expose scheduler as public interface & bug fixes. (#3945)

* Rename `rb_scheduler` to `rb_fiber_scheduler`.

* Use public interface if available.

* Use `rb_check_funcall` where possible.

* Don't use `unblock` unless the fiber was non-blocking.
This commit is contained in:
Samuel Williams 2021-02-09 19:39:56 +13:00 committed by GitHub
parent 3c593f28ed
commit 5f69a7f604
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
Notes: git 2021-02-09 15:40:27 +09:00
Merged-By: ioquatix <samuel@codeotaku.com>
18 changed files with 356 additions and 245 deletions

View file

@ -15,24 +15,25 @@ def fetch_topics(topics)
topics.each do |topic|
Fiber.new(blocking: Fiber.current.blocking?) do
uri = URI("https://www.google.com/search?q=#{topic}")
responses[topic] = Net::HTTP.get(uri).scan(topic).size
response = Net::HTTP.get(uri)
responses[topic] = response.scan(topic).size
end.resume
end
Thread.fiber_scheduler&.run
Fiber.scheduler&.run
return responses
end
def sweep(repeats: 3, **options)
times = (1..8).map do |i|
$stderr.puts "Measuring #{i} topic(s)..."
$stderr.puts "Measuring #{i} topic(s) #{options.inspect}..."
topics = TOPICS[0...i]
Thread.new do
Benchmark.realtime do
scheduler = Scheduler.new
Fiber.set_scheduler scheduler
Fiber.set_scheduler(scheduler)
repeats.times do
Fiber.new(**options) do
@ -49,5 +50,5 @@ def sweep(repeats: 3, **options)
puts JSON.dump(times.map{|value| value.round(3)})
end
sweep(blocking: true)
# sweep(blocking: true)
sweep(blocking: false)

View file

@ -47,6 +47,8 @@ class Scheduler
end
def run
# $stderr.puts [__method__, Fiber.current].inspect
while @readable.any? or @writable.any? or @waiting.any? or @blocking.positive?
# Can only handle file descriptors up to 1024...
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
@ -54,9 +56,11 @@ class Scheduler
# puts "readable: #{readable}" if readable&.any?
# puts "writable: #{writable}" if writable&.any?
selected = {}
readable&.each do |io|
if fiber = @readable.delete(io)
fiber.resume
selected[fiber] = IO::READABLE
elsif io == @urgent.first
@urgent.first.read_nonblock(1024)
end
@ -64,10 +68,14 @@ class Scheduler
writable&.each do |io|
if fiber = @writable.delete(io)
fiber.resume
selected[fiber] |= IO::WRITABLE
end
end
selected.each do |fiber, events|
fiber.resume(events)
end
if @waiting.any?
time = current_time
waiting, @waiting = @waiting, {}
@ -96,6 +104,8 @@ class Scheduler
end
def close
# $stderr.puts [__method__, Fiber.current].inspect
raise "Scheduler already closed!" if @closed
self.run
@ -118,6 +128,8 @@ class Scheduler
end
def process_wait(pid, flags)
# $stderr.puts [__method__, pid, flags, Fiber.current].inspect
# This is a very simple way to implement a non-blocking wait:
Thread.new do
Process::Status.wait(pid, flags)
@ -125,6 +137,8 @@ class Scheduler
end
def io_wait(io, events, duration)
# $stderr.puts [__method__, io, events, duration, Fiber.current].inspect
unless (events & IO::READABLE).zero?
@readable[io] = Fiber.current
end
@ -134,12 +148,12 @@ class Scheduler
end
Fiber.yield
return true
end
# Used for Kernel#sleep and Mutex#sleep
def kernel_sleep(duration = nil)
# $stderr.puts [__method__, duration, Fiber.current].inspect
self.block(:sleep, duration)
return true
@ -171,6 +185,8 @@ class Scheduler
# This might be called from another thread.
def unblock(blocker, fiber)
# $stderr.puts [__method__, blocker, fiber].inspect
# $stderr.puts blocker.backtrace.inspect
# $stderr.puts fiber.backtrace.inspect
@lock.synchronize do
@ready << fiber

45
test/fiber/test_thread.rb Normal file
View file

@ -0,0 +1,45 @@
# frozen_string_literal: true
require "test/unit"
require_relative 'scheduler'
class TestFiberThread < Test::Unit::TestCase
def test_thread_join
thread = Thread.new do
scheduler = Scheduler.new
Fiber.set_scheduler scheduler
result = nil
Fiber.schedule do
result = Thread.new{:done}.value
end
scheduler.run
result
end
assert_equal :done, thread.value
end
def test_thread_join_blocking
thread = Thread.new do
scheduler = Scheduler.new
Fiber.set_scheduler scheduler
result = nil
Fiber.schedule do
Fiber.new(blocking: true) do
# This can deadlock if the blocking state is not taken into account:
Thread.new do
sleep(0)
result = :done
end.join
end.resume
end
scheduler.run
result
end
assert_equal :done, thread.value
end
end