lib/graphql/dataloader.rb in graphql-1.12.24 vs lib/graphql/dataloader.rb in graphql-1.13.0

- old
+ new

@@ -21,14 +21,24 @@
   #   def team
   #     dataloader.with(Sources::Record, Team).load(object.team_id)
   #   end
   #
   class Dataloader
-    def self.use(schema)
-      schema.dataloader_class = self
+    class << self
+      attr_accessor :default_nonblocking
     end
 
+    AsyncDataloader = Class.new(self) { self.default_nonblocking = true }
+
+    def self.use(schema, nonblocking: nil)
+      schema.dataloader_class = if nonblocking
+        AsyncDataloader
+      else
+        self
+      end
+    end
+
     # Call the block with a Dataloader instance,
     # then run all enqueued jobs and return the result of the block.
     def self.with_dataloading(&block)
       dataloader = self.new
       result = nil
@@ -37,22 +47,29 @@
       }
       dataloader.run
       result
     end
 
-    def initialize
+    def initialize(nonblocking: self.class.default_nonblocking)
       @source_cache = Hash.new { |h, k| h[k] = {} }
       @pending_jobs = []
+      if !nonblocking.nil?
+        @nonblocking = nonblocking
+      end
     end
 
+    def nonblocking?
+      @nonblocking
+    end
+
     # Get a Source instance from this dataloader, for calling `.load(...)` or `.request(...)` on.
     #
     # @param source_class [Class<GraphQL::Dataloader::Source]
     # @param batch_parameters [Array<Object>]
     # @return [GraphQL::Dataloader::Source] An instance of {source_class}, initialized with `self, *batch_parameters`,
     #   and cached for the lifetime of this {Multiplex}.
-    if RUBY_VERSION < "3"
+    if RUBY_VERSION < "3" || RUBY_ENGINE != "ruby" # truffle-ruby wasn't doing well with the implementation below
       def with(source_class, *batch_args)
         batch_key = source_class.batch_key_for(*batch_args)
         @source_cache[source_class][batch_key] ||= begin
           source = source_class.new(*batch_args)
           source.setup(self)
@@ -115,10 +132,13 @@
       end
     end
 
     # @api private Move along, move along
     def run
+      if @nonblocking && !Fiber.scheduler
+        raise "`nonblocking: true` requires `Fiber.scheduler`, assign one with `Fiber.set_scheduler(...)` before executing GraphQL."
+      end
       # At a high level, the algorithm is:
       #
       #  A) Inside Fibers, run jobs from the queue one-by-one
       #    - When one of the jobs yields to the dataloader (`Fiber.yield`), then that fiber will pause
       #    - In that case, if there are still pending jobs, a new Fiber will be created to run jobs
@@ -135,10 +155,12 @@
       # For whatever reason, the best implementation I could find was to order the steps `[D, A, B, C]`, with a special case for skipping `D`
       # on the first pass. I just couldn't find a better way to write the loops in a way that was DRY and easy to read.
       #
       pending_fibers = []
       next_fibers = []
+      pending_source_fibers = []
+      next_source_fibers = []
       first_pass = true
 
       while first_pass || (f = pending_fibers.shift)
         if first_pass
           first_pass = false
@@ -172,35 +194,31 @@
           # Sources might queue up other Sources, which is fine -- those will also run before resuming execution.
           #
           # This is where an evented approach would be even better -- can we tell which
           # fibers are ready to continue, and continue execution there?
           #
-          source_fiber_queue = if (first_source_fiber = create_source_fiber)
-            [first_source_fiber]
-          else
-            nil
+          if (first_source_fiber = create_source_fiber)
+            pending_source_fibers << first_source_fiber
           end
 
-          if source_fiber_queue
-            while (outer_source_fiber = source_fiber_queue.shift)
+          while pending_source_fibers.any?
+            while (outer_source_fiber = pending_source_fibers.pop)
               resume(outer_source_fiber)
-
-              # If this source caused more sources to become pending, run those before running this one again:
-              next_source_fiber = create_source_fiber
-              if next_source_fiber
-                source_fiber_queue << next_source_fiber
-              end
-
               if outer_source_fiber.alive?
-                source_fiber_queue << outer_source_fiber
+                next_source_fibers << outer_source_fiber
               end
+              if (next_source_fiber = create_source_fiber)
+                pending_source_fibers << next_source_fiber
+              end
             end
+            join_queues(pending_source_fibers, next_source_fibers)
+            next_source_fibers.clear
           end
           # Move newly-enqueued Fibers on to the list to be resumed.
           # Clear out the list of next-round Fibers, so that
           # any Fibers that pause can be put on it.
-          pending_fibers.concat(next_fibers)
+          join_queues(pending_fibers, next_fibers)
           next_fibers.clear
         end
       end
 
       if @pending_jobs.any?
@@ -211,10 +229,18 @@
         raise "Invariant: #{next_fibers.size} next fibers"
       end
       nil
     end
 
+    def join_queues(previous_queue, next_queue)
+      if @nonblocking
+        Fiber.scheduler.run
+        next_queue.select!(&:alive?)
+      end
+      previous_queue.concat(next_queue)
+    end
+
     private
 
     # If there are pending sources, return a fiber for running them.
     # Otherwise, return `nil`.
     #
@@ -264,12 +290,19 @@
       Thread.current.keys.each do |fiber_var_key|
         fiber_locals[fiber_var_key] = Thread.current[fiber_var_key]
       end
 
-      Fiber.new do
-        fiber_locals.each { |k, v| Thread.current[k] = v }
-        yield
+      if @nonblocking
+        Fiber.new(blocking: false) do
+          fiber_locals.each { |k, v| Thread.current[k] = v }
+          yield
+        end
+      else
+        Fiber.new do
+          fiber_locals.each { |k, v| Thread.current[k] = v }
+          yield
+        end
       end
     end
   end
 end
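The headline change in 1.13.0 is the optional nonblocking mode: `GraphQL::Dataloader.use` now accepts `nonblocking:`, and the new `GraphQL::Dataloader::AsyncDataloader` subclass defaults it to true. When enabled, `#run` refuses to start unless a `Fiber.scheduler` is assigned, `spawn_fiber` creates fibers with `blocking: false`, and `join_queues` drives the scheduler with `Fiber.scheduler.run`. A minimal opt-in sketch, assuming Ruby 3.x and some third-party fiber scheduler; the `libev_scheduler` gem and its `Libev::Scheduler` class are only one example, and `QueryType` plus the query string are placeholders:

    # Sketch only: schema, query, and scheduler choice are illustrative.
    require "graphql"
    require "libev_scheduler" # any Fiber scheduler that also implements #run

    class MySchema < GraphQL::Schema
      query QueryType
      # Equivalent to `use GraphQL::Dataloader::AsyncDataloader`, per the diff above:
      use GraphQL::Dataloader, nonblocking: true
    end

    # `Dataloader#run` raises unless a scheduler is assigned first:
    Fiber.set_scheduler(Libev::Scheduler.new)
    result = MySchema.execute("{ team { name } }")

Because `join_queues` calls `Fiber.scheduler.run` directly, the scheduler object has to respond to `run` in addition to the standard hooks.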
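The constructor change pairs with that: `initialize` now takes `nonblocking:`, defaulting to the class-level `default_nonblocking`, and exposes the value through `nonblocking?`. A quick sketch of how the three cases fall out of the code above (return values inferred from the diff rather than from a console session):

    GraphQL::Dataloader.new.nonblocking?                     # falsy: default_nonblocking is never set on the base class
    GraphQL::Dataloader.new(nonblocking: true).nonblocking?  # => true
    GraphQL::Dataloader::AsyncDataloader.new.nonblocking?    # => true, via default_nonblocking = true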