lib/graphql/dataloader.rb in graphql-2.1.9 vs lib/graphql/dataloader.rb in graphql-2.1.10

- old
+ new

@@ -116,16 +116,11 @@ # # Dataloader will resume the fiber after the requested data has been loaded (by another Fiber). # # @return [void] def yield - if use_fiber_resume? - Fiber.yield - else - parent_fiber = Thread.current[:parent_fiber] - parent_fiber.transfer - end + Fiber.yield nil end # @api private Nothing to see here def append_job(&job) @@ -166,93 +161,88 @@ run res ensure @pending_jobs = prev_queue prev_pending_keys.each do |source_instance, pending| - source_instance.pending.merge!(pending) + pending.each do |key, value| + if !source_instance.results.key?(key) + source_instance.pending[key] = value + end + end end end def run job_fibers = [] next_job_fibers = [] source_fibers = [] next_source_fibers = [] first_pass = true + manager = spawn_fiber do + while first_pass || job_fibers.any? + first_pass = false - while first_pass || job_fibers.any? - first_pass = false - - while (f = job_fibers.shift || spawn_job_fiber) - if f.alive? - run_fiber(f) - next_job_fibers << f + while (f = (job_fibers.shift || spawn_job_fiber)) + if f.alive? + finished = run_fiber(f) + if !finished + next_job_fibers << f + end + end end - end - join_queues(job_fibers, next_job_fibers) + join_queues(job_fibers, next_job_fibers) - while source_fibers.any? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) } - while (f = source_fibers.shift || spawn_source_fiber) - if f.alive? - run_fiber(f) - next_source_fibers << f + while source_fibers.any? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) } + while (f = source_fibers.shift || spawn_source_fiber) + if f.alive? + finished = run_fiber(f) + if !finished + next_source_fibers << f + end + end end + join_queues(source_fibers, next_source_fibers) end - join_queues(source_fibers, next_source_fibers) end end + run_fiber(manager) + + if manager.alive? + raise "Invariant: Manager fiber didn't terminate properly." + end + if job_fibers.any? raise "Invariant: job fibers should have exited but #{job_fibers.size} remained" end if source_fibers.any? raise "Invariant: source fibers should have exited but #{source_fibers.size} remained" end - rescue UncaughtThrowError => e throw e.tag, e.value end def run_fiber(f) - if use_fiber_resume? - f.resume - else - f.transfer - end + f.resume end def spawn_fiber fiber_vars = get_fiber_variables - parent_fiber = use_fiber_resume? ? nil : Fiber.current Fiber.new(blocking: !@nonblocking) { set_fiber_variables(fiber_vars) - Thread.current[:parent_fiber] = parent_fiber yield # With `.transfer`, you have to explicitly pass back to the parent -- # if the fiber is allowed to terminate normally, control is passed to the main fiber instead. - if parent_fiber - parent_fiber.transfer(true) - else - true - end + true } end private def join_queues(prev_queue, new_queue) @nonblocking && Fiber.scheduler.run prev_queue.concat(new_queue) new_queue.clear - end - - def use_fiber_resume? - Fiber.respond_to?(:scheduler) && - ( - (defined?(::DummyScheduler) && Fiber.scheduler.is_a?(::DummyScheduler)) || - (defined?(::Evt) && ::Evt::Scheduler.singleton_class::BACKENDS.any? { |be| Fiber.scheduler.is_a?(be) }) || - (defined?(::Libev) && Fiber.scheduler.is_a?(::Libev::Scheduler)) - ) end def spawn_job_fiber if @pending_jobs.any? spawn_fiber do