lib/graphql/dataloader.rb in graphql-2.1.9 vs lib/graphql/dataloader.rb in graphql-2.1.10
- old
+ new
@@ -116,16 +116,11 @@
     #
     # Dataloader will resume the fiber after the requested data has been loaded (by another Fiber).
     #
     # @return [void]
     def yield
-      if use_fiber_resume?
-        Fiber.yield
-      else
-        parent_fiber = Thread.current[:parent_fiber]
-        parent_fiber.transfer
-      end
+      Fiber.yield
       nil
     end
 
     # @api private Nothing to see here
     def append_job(&job)
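
In this first hunk, `Dataloader#yield` no longer branches on `use_fiber_resume?` (which either called `Fiber.yield` or transferred to a parent fiber stashed in `Thread.current[:parent_fiber]`); it now always suspends with `Fiber.yield`, handing control back to whichever fiber called `resume` on it. A minimal sketch of that round trip, using only Ruby's core Fiber API rather than the gem's internals:

    # The worker suspends with Fiber.yield; control returns to whoever called #resume.
    worker = Fiber.new do
      # ... run until some batched data is needed ...
      Fiber.yield        # suspend, as Dataloader#yield now does
      # ... resumed after the data was loaded ...
      true               # a truthy return value marks normal termination
    end

    worker.resume # => nil -- the fiber hit Fiber.yield and is still alive
    worker.alive? # => true
    worker.resume # => true -- the block ran to completion
    worker.alive? # => false
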
@@ -166,93 +161,88 @@
       run
       res
     ensure
       @pending_jobs = prev_queue
       prev_pending_keys.each do |source_instance, pending|
-        source_instance.pending.merge!(pending)
+        pending.each do |key, value|
+          if !source_instance.results.key?(key)
+            source_instance.pending[key] = value
+          end
+        end
       end
     end
 
     def run
       job_fibers = []
       next_job_fibers = []
       source_fibers = []
       next_source_fibers = []
       first_pass = true
-
-      while first_pass || job_fibers.any?
-        first_pass = false
-
-        while (f = job_fibers.shift || spawn_job_fiber)
-          if f.alive?
-            run_fiber(f)
-            next_job_fibers << f
-          end
-        end
-        join_queues(job_fibers, next_job_fibers)
-
-        while source_fibers.any? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) }
-          while (f = source_fibers.shift || spawn_source_fiber)
-            if f.alive?
-              run_fiber(f)
-              next_source_fibers << f
-            end
-          end
-          join_queues(source_fibers, next_source_fibers)
-        end
-      end
+      manager = spawn_fiber do
+        while first_pass || job_fibers.any?
+          first_pass = false
+
+          while (f = (job_fibers.shift || spawn_job_fiber))
+            if f.alive?
+              finished = run_fiber(f)
+              if !finished
+                next_job_fibers << f
+              end
+            end
+          end
+          join_queues(job_fibers, next_job_fibers)
+
+          while source_fibers.any? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) }
+            while (f = source_fibers.shift || spawn_source_fiber)
+              if f.alive?
+                finished = run_fiber(f)
+                if !finished
+                  next_source_fibers << f
+                end
+              end
+            end
+            join_queues(source_fibers, next_source_fibers)
+          end
+        end
+      end
 
+      run_fiber(manager)
+
+      if manager.alive?
+        raise "Invariant: Manager fiber didn't terminate properly."
+      end
+
       if job_fibers.any?
         raise "Invariant: job fibers should have exited but #{job_fibers.size} remained"
       end
       if source_fibers.any?
         raise "Invariant: source fibers should have exited but #{source_fibers.size} remained"
       end
-
     rescue UncaughtThrowError => e
       throw e.tag, e.value
     end
 
     def run_fiber(f)
-      if use_fiber_resume?
-        f.resume
-      else
-        f.transfer
-      end
+      f.resume
     end
 
     def spawn_fiber
       fiber_vars = get_fiber_variables
-      parent_fiber = use_fiber_resume? ? nil : Fiber.current
       Fiber.new(blocking: !@nonblocking) {
         set_fiber_variables(fiber_vars)
-        Thread.current[:parent_fiber] = parent_fiber
         yield
         # With `.transfer`, you have to explicitly pass back to the parent --
         # if the fiber is allowed to terminate normally, control is passed to the main fiber instead.
-        if parent_fiber
-          parent_fiber.transfer(true)
-        else
-          true
-        end
+        true
       }
     end
 
     private
 
     def join_queues(prev_queue, new_queue)
       @nonblocking && Fiber.scheduler.run
       prev_queue.concat(new_queue)
       new_queue.clear
     end
-
-    def use_fiber_resume?
-      Fiber.respond_to?(:scheduler) &&
-        (
-          (defined?(::DummyScheduler) && Fiber.scheduler.is_a?(::DummyScheduler)) ||
-          (defined?(::Evt) && ::Evt::Scheduler.singleton_class::BACKENDS.any? { |be| Fiber.scheduler.is_a?(be) }) ||
-          (defined?(::Libev) && Fiber.scheduler.is_a?(::Libev::Scheduler))
-        )
-    end
 
     def spawn_job_fiber
       if @pending_jobs.any?
         spawn_fiber do
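
In the rewritten `run`, the whole job/source loop now lives inside a single manager fiber: `run_fiber(manager)` resumes it once, and each worker fiber signals completion by returning `true` from its block (see `spawn_fiber`), so `finished = run_fiber(f)` tells the manager whether to re-enqueue that fiber. A rough, self-contained sketch of this scheduling pattern, not the gem's implementation:

    # Workers suspend with Fiber.yield when they need to wait; #resume then
    # returns a falsy value and the manager re-enqueues them. A worker whose
    # block runs to completion returns true and is dropped from the queue.
    workers = 3.times.map do |i|
      Fiber.new do
        Fiber.yield if i.odd? # pretend odd-numbered workers wait for data once
        true                  # explicit "finished" marker, as in spawn_fiber
      end
    end

    manager = Fiber.new do
      until workers.empty?
        still_pending = []
        while (f = workers.shift)
          finished = f.resume
          still_pending << f if f.alive? && !finished
        end
        workers.concat(still_pending)
      end
    end

    manager.resume
    raise "manager should have finished" if manager.alive?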