lib/graphql/dataloader.rb in graphql-1.12.3 vs lib/graphql/dataloader.rb in graphql-1.12.4

- old
+ new

@@ -25,40 +25,28 @@ class Dataloader def self.use(schema) schema.dataloader_class = self end - def initialize(multiplex_context) - @context = multiplex_context + def initialize @source_cache = Hash.new { |h, source_class| h[source_class] = Hash.new { |h2, batch_parameters| source = source_class.new(*batch_parameters) source.setup(self) h2[batch_parameters] = source } } - @waiting_fibers = [] - @yielded_fibers = {} + @pending_jobs = [] end - # @return [Hash] the {Multiplex} context - attr_reader :context - - # @api private - attr_reader :yielded_fibers - - # Add some work to this dataloader to be scheduled later. - # @param block Some work to enqueue - # @return [void] - def enqueue(&block) - @waiting_fibers << Fiber.new { - begin - yield - rescue StandardError => exception - exception - end - } - nil + # Get a Source instance from this dataloader, for calling `.load(...)` or `.request(...)` on. + # + # @param source_class [Class<GraphQL::Dataloader::Source>] + # @param batch_parameters [Array<Object>] + # @return [GraphQL::Dataloader::Source] An instance of {source_class}, initialized with `self, *batch_parameters`, + # and cached for the lifetime of this {Multiplex}. + def with(source_class, *batch_parameters) + @source_cache[source_class][batch_parameters] end # Tell the dataloader that this fiber is waiting for data. # # Dataloader will resume the fiber after the requested data has been loaded (by another Fiber). @@ -67,37 +55,74 @@ def yield Fiber.yield nil end - # @param path [Array<String, Integer>] A graphql response path - # @return [Boolean] True if the current Fiber has yielded once via Dataloader at {path} - def yielded?(path) - @yielded_fibers[Fiber.current] == path + # @api private Nothing to see here + def append_job(&job) + # Given a block, queue it up to be worked through when `#run` is called. + # (If the dataloader is already running, then a Fiber will pick this up later.) 
+ @pending_jobs.push(job) + nil end - # Run all Fibers until they're all done - # - # Each cycle works like this: - # - # - Run each pending execution fiber (`@waiting_fibers`), - # - Then run each pending Source, preparing more data for those fibers. - # - Run each pending Source _again_ (if one Source requested more data from another Source) - # - Continue until there are no pending sources - # - Repeat: run execution fibers again ... - # - # @return [void] + # @api private Move along, move along def run - # Start executing Fibers. This will run until all the Fibers are done. - already_run_fibers = [] - while (current_fiber = @waiting_fibers.pop) - # Run each execution fiber, enqueuing it in `already_run_fibers` - # if it's still `.alive?`. - # Any spin-off continuations will be enqueued in `@waiting_fibers` (via {#enqueue}) - resume_fiber_and_enqueue_continuation(current_fiber, already_run_fibers) + # At a high level, the algorithm is: + # + # A) Inside Fibers, run jobs from the queue one-by-one + # - When one of the jobs yields to the dataloader (`Fiber.yield`), then that fiber will pause + # - In that case, if there are still pending jobs, a new Fiber will be created to run jobs + # - Continue until all jobs have been _started_ by a Fiber. (Any number of those Fibers may be waiting to be resumed, after their data is loaded) + # B) Once all known jobs have been run until they are complete or paused for data, run all pending data sources. + # - Similarly, create a Fiber to consume pending sources and tell them to load their data. + # - If one of those Fibers pauses, then create a new Fiber to continue working through remaining pending sources. + # - When a source causes another source to become pending, run the newly-pending source _first_, since it's a dependency of the previous one. + # C) After all pending sources have been completely loaded (there are no more pending sources), resume any Fibers that were waiting for data. 
+ # - Those Fibers assume that source caches will have been populated with the data they were waiting for. + # - Those Fibers may request data from a source again, in which case they will yield and be added to a new pending fiber list. + # D) Once all pending fibers have been resumed once, return to `A` above. + # + # For whatever reason, the best implementation I could find was to order the steps `[D, A, B, C]`, with a special case for skipping `D` + # on the first pass. I just couldn't find a better way to write the loops in a way that was DRY and easy to read. + # + pending_fibers = [] + next_fibers = [] + first_pass = true - if @waiting_fibers.empty? + while first_pass || (f = pending_fibers.shift) + if first_pass + first_pass = false + else + # These fibers were previously waiting for sources to load data, + # resume them. (They might wait again, in which case, re-enqueue them.) + f.resume + if f.alive? + next_fibers << f + end + end + + while @pending_jobs.any? + # Create a Fiber to consume jobs until one of the jobs yields + # or jobs run out + f = Fiber.new { + while (job = @pending_jobs.shift) + job.call + end + } + result = f.resume + if result.is_a?(StandardError) + raise result + end + # In this case, the job yielded. Queue it up to run again after + # we load whatever it's waiting for. + if f.alive? + next_fibers << f + end + end + + if pending_fibers.empty? # Now, run all Sources which have become pending _before_ resuming GraphQL execution. # Sources might queue up other Sources, which is fine -- those will also run before resuming execution. # # This is where an evented approach would be even better -- can we tell which # fibers are ready to continue, and continue execution there? @@ -107,72 +132,49 @@ else nil end if source_fiber_stack + # Use a stack with `.pop` here so that when a source causes another source to become pending, + # that newly-pending source will run _before_ the one that depends on it. 
+ # (See below where the old fiber is pushed to the stack, then the new fiber is pushed on the stack.) while (outer_source_fiber = source_fiber_stack.pop) - resume_fiber_and_enqueue_continuation(outer_source_fiber, source_fiber_stack) + result = outer_source_fiber.resume + if result.is_a?(StandardError) + raise result + end + if outer_source_fiber.alive? + source_fiber_stack << outer_source_fiber + end # If this source caused more sources to become pending, run those before running this one again: next_source_fiber = create_source_fiber if next_source_fiber source_fiber_stack << next_source_fiber end end end - - # We ran all the first round of execution fibers, - # and we ran all the pending sources. - # So pick up any paused execution fibers and repeat. - @waiting_fibers.concat(already_run_fibers) - already_run_fibers.clear + # Move newly-enqueued Fibers on to the list to be resumed. + # Clear out the list of next-round Fibers, so that + # any Fibers that pause can be put on it. + pending_fibers.concat(next_fibers) + next_fibers.clear end end + + if @pending_jobs.any? + raise "Invariant: #{@pending_jobs.size} pending jobs" + elsif pending_fibers.any? + raise "Invariant: #{pending_fibers.size} pending fibers" + elsif next_fibers.any? + raise "Invariant: #{next_fibers.size} next fibers" + end nil end - # Get a Source instance from this dataloader, for calling `.load(...)` or `.request(...)` on. - # - # @param source_class [Class<GraphQL::Dataloader::Source] - # @param batch_parameters [Array<Object>] - # @return [GraphQL::Dataloader::Source] An instance of {source_class}, initialized with `self, *batch_parameters`, - # and cached for the lifetime of this {Multiplex}. - def with(source_class, *batch_parameters) - @source_cache[source_class][batch_parameters] - end - - # @api private - attr_accessor :current_runtime - private - # Check if this fiber is still alive. - # If it is, and it should continue, then enqueue a continuation. 
- # If it is, re-enqueue it in `fiber_queue`. - # Otherwise, clean it up from @yielded_fibers. - # @return [void] - def resume_fiber_and_enqueue_continuation(fiber, fiber_stack) - result = fiber.resume - if result.is_a?(StandardError) - raise result - end - - # This fiber yielded; there's more to do here. - # (If `#alive?` is false, then the fiber concluded without yielding.) - if fiber.alive? - if !@yielded_fibers.include?(fiber) - # This fiber hasn't yielded yet, we should enqueue a continuation fiber - @yielded_fibers[fiber] = current_runtime.progress_path - current_runtime.enqueue_selections_fiber - end - fiber_stack << fiber - else - # Keep this set clean so that fibers can be GC'ed during execution - @yielded_fibers.delete(fiber) - end - end - # If there are pending sources, return a fiber for running them. # Otherwise, return `nil`. # # @return [Fiber, nil] def create_source_fiber @@ -185,9 +187,17 @@ end end end if pending_sources + # By passing the whole array into this Fiber, it's possible that we set ourselves up for a bunch of no-ops. + # For example, if you have sources `[a, b, c]`, and `a` is loaded, then `b` yields to wait for `d`, then + # the next fiber would be dispatched with `[c, d]`. It would fulfill `c`, then `d`, then eventually + # the previous fiber would start up again. `c` would no longer be pending, but it would still receive `.run_pending_keys`. + # That method is short-circuited since it isn't pending any more, but it's still a waste. + # + # This design could probably be improved by maintaining a `@pending_sources` queue which is shared by the fibers, + # similar to `@pending_jobs`. That way, when a fiber is resumed, it would never pick up work that was finished by a different fiber. source_fiber = Fiber.new do pending_sources.each(&:run_pending_keys) end end