lib/graphql/dataloader.rb in graphql-1.12.3 vs lib/graphql/dataloader.rb in graphql-1.12.4
- old
+ new
@@ -25,40 +25,28 @@
class Dataloader
def self.use(schema)
schema.dataloader_class = self
end
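For context, `use(schema)` is the plugin hook that wires this class into a schema: calling `use GraphQL::Dataloader` inside a schema class invokes it. A minimal sketch (the `MySchema` and `Types::Query` names are hypothetical):

    class MySchema < GraphQL::Schema
      query Types::Query
      # Schema.use(plugin) calls plugin.use(self), so this sets schema.dataloader_class
      use GraphQL::Dataloader
    end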
- def initialize(multiplex_context)
- @context = multiplex_context
+ def initialize
@source_cache = Hash.new { |h, source_class| h[source_class] = Hash.new { |h2, batch_parameters|
source = source_class.new(*batch_parameters)
source.setup(self)
h2[batch_parameters] = source
}
}
- @waiting_fibers = []
- @yielded_fibers = {}
+ @pending_jobs = []
end
- # @return [Hash] the {Multiplex} context
- attr_reader :context
-
- # @api private
- attr_reader :yielded_fibers
-
- # Add some work to this dataloader to be scheduled later.
- # @param block Some work to enqueue
- # @return [void]
- def enqueue(&block)
- @waiting_fibers << Fiber.new {
- begin
- yield
- rescue StandardError => exception
- exception
- end
- }
- nil
+ # Get a Source instance from this dataloader, for calling `.load(...)` or `.request(...)` on.
+ #
+ # @param source_class [Class<GraphQL::Dataloader::Source>]
+ # @param batch_parameters [Array<Object>]
+ # @return [GraphQL::Dataloader::Source] An instance of {source_class}, initialized with `self, *batch_parameters`,
+ # and cached for the lifetime of this {Multiplex}.
+ def with(source_class, *batch_parameters)
+ @source_cache[source_class][batch_parameters]
end
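To illustrate how `with` is meant to be called: the sketch below assumes a hypothetical `UserSource` subclass and an ActiveRecord-style `User` model, but `load` and `request` are the documented Source methods, and `fetch(keys)` is the method a Source subclass implements.

    class UserSource < GraphQL::Dataloader::Source
      def initialize(model)
        @model = model
      end

      # Must return one value per key, in the same order as `ids`
      def fetch(ids)
        records = @model.where(id: ids).to_a
        ids.map { |id| records.find { |r| r.id == id } }
      end
    end

    # Inside a resolver, `dataloader` is available via `context.dataloader`:
    dataloader.with(UserSource, User).load(1)          # pauses this fiber until the batch is fetched
    req = dataloader.with(UserSource, User).request(2) # registers the key without pausing
    req.load                                           # pauses here only if the value isn't loaded yet

Note that the cache key is `[source_class, batch_parameters]`, so both calls above share the same `UserSource.new(User)` instance and their keys end up in one batch.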
# Tell the dataloader that this fiber is waiting for data.
#
# Dataloader will resume the fiber after the requested data has been loaded (by another Fiber).
@@ -67,37 +55,74 @@
def yield
Fiber.yield
nil
end
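A minimal sketch of how a Source might use this method, assuming hypothetical `results` and `pending_keys` internals (the real Source class differs in detail):

    def load(key)
      pending_keys << key unless results.key?(key)
      # Park this fiber; Dataloader#run resumes it after the source's
      # run_pending_keys has populated `results` for this key.
      dataloader.yield until results.key?(key)
      results[key]
    end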
- # @param path [Array<String, Integer>] A graphql response path
- # @return [Boolean] True if the current Fiber has yielded once via Dataloader at {path}
- def yielded?(path)
- @yielded_fibers[Fiber.current] == path
+ # @api private Nothing to see here
+ def append_job(&job)
+ # Given a block, queue it up to be worked through when `#run` is called.
+ # (If the dataloader is already running, then a Fiber will pick this up later.)
+ @pending_jobs.push(job)
+ nil
end
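Putting `append_job`, `with`, and `run` together: a standalone sketch, reusing the hypothetical `UserSource` from above. During query execution the runtime does this enqueueing for you, but the lifecycle is the same.

    dataloader = GraphQL::Dataloader.new
    dataloader.append_job do
      puts dataloader.with(UserSource, User).load(1).name   # pauses while waiting on the batch
    end
    dataloader.append_job do
      puts dataloader.with(UserSource, User).load(2).name   # batched with the job above
    end
    dataloader.run   # runs both jobs; ids [1, 2] are loaded in a single `fetch` call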
- # Run all Fibers until they're all done
- #
- # Each cycle works like this:
- #
- # - Run each pending execution fiber (`@waiting_fibers`),
- # - Then run each pending Source, preparing more data for those fibers.
- # - Run each pending Source _again_ (if one Source requested more data from another Source)
- # - Continue until there are no pending sources
- # - Repeat: run execution fibers again ...
- #
- # @return [void]
+ # @api private Move along, move along
def run
- # Start executing Fibers. This will run until all the Fibers are done.
- already_run_fibers = []
- while (current_fiber = @waiting_fibers.pop)
- # Run each execution fiber, enqueuing it in `already_run_fibers`
- # if it's still `.alive?`.
- # Any spin-off continuations will be enqueued in `@waiting_fibers` (via {#enqueue})
- resume_fiber_and_enqueue_continuation(current_fiber, already_run_fibers)
+ # At a high level, the algorithm is:
+ #
+ # A) Inside Fibers, run jobs from the queue one-by-one
+ # - When one of the jobs yields to the dataloader (`Fiber.yield`), then that fiber will pause
+ # - In that case, if there are still pending jobs, a new Fiber will be created to run jobs
+ # - Continue until all jobs have been _started_ by a Fiber. (Any number of those Fibers may be waiting to be resumed, after their data is loaded)
+ # B) Once all known jobs have been run until they are complete or paused for data, run all pending data sources.
+ # - Similarly, create a Fiber to consume pending sources and tell them to load their data.
+ # - If one of those Fibers pauses, then create a new Fiber to continue working through remaining pending sources.
+ # - When a source causes another source to become pending, run the newly-pending source _first_, since it's a dependency of the previous one.
+ # C) After all pending sources have been completely loaded (there are no more pending sources), resume any Fibers that were waiting for data.
+ # - Those Fibers assume that source caches will have been populated with the data they were waiting for.
+ # - Those Fibers may request data from a source again, in which case they will yield and be added to a new pending fiber list.
+ # D) Once all pending fibers have been resumed once, return to `A` above.
+ #
+ # For whatever reason, the best implementation I could find was to order the steps `[D, A, B, C]`, with a special case for skipping `D`
+ # on the first pass. I just couldn't find a better way to write the loops in a way that was DRY and easy to read.
+ #
+ pending_fibers = []
+ next_fibers = []
+ first_pass = true
- if @waiting_fibers.empty?
+ while first_pass || (f = pending_fibers.shift)
+ if first_pass
+ first_pass = false
+ else
+ # These fibers were previously waiting for sources to load data,
+ # resume them. (They might wait again, in which case, re-enqueue them.)
+ f.resume
+ if f.alive?
+ next_fibers << f
+ end
+ end
+
+ while @pending_jobs.any?
+ # Create a Fiber to consume jobs until one of the jobs yields
+ # or jobs run out
+ f = Fiber.new {
+ while (job = @pending_jobs.shift)
+ job.call
+ end
+ }
+ result = f.resume
+ if result.is_a?(StandardError)
+ raise result
+ end
+ # In this case, the job yielded. Queue it up to run again after
+ # we load whatever it's waiting for.
+ if f.alive?
+ next_fibers << f
+ end
+ end
+
+ if pending_fibers.empty?
# Now, run all Sources which have become pending _before_ resuming GraphQL execution.
# Sources might queue up other Sources, which is fine -- those will also run before resuming execution.
#
# This is where an evented approach would be even better -- can we tell which
# fibers are ready to continue, and continue execution there?
@@ -107,72 +132,49 @@
else
nil
end
if source_fiber_stack
+ # Use a stack with `.pop` here so that when a source causes another source to become pending,
+ # that newly-pending source will run _before_ the one that depends on it.
+ # (See below: the old fiber is pushed back onto the stack first, then the newly-created fiber is pushed on top of it, so the new one is popped next.)
while (outer_source_fiber = source_fiber_stack.pop)
- resume_fiber_and_enqueue_continuation(outer_source_fiber, source_fiber_stack)
+ result = outer_source_fiber.resume
+ if result.is_a?(StandardError)
+ raise result
+ end
+ if outer_source_fiber.alive?
+ source_fiber_stack << outer_source_fiber
+ end
# If this source caused more sources to become pending, run those before running this one again:
next_source_fiber = create_source_fiber
if next_source_fiber
source_fiber_stack << next_source_fiber
end
end
end
-
- # We ran all the first round of execution fibers,
- # and we ran all the pending sources.
- # So pick up any paused execution fibers and repeat.
- @waiting_fibers.concat(already_run_fibers)
- already_run_fibers.clear
+ # Move newly-enqueued Fibers on to the list to be resumed.
+ # Clear out the list of next-round Fibers, so that
+ # any Fibers that pause can be put on it.
+ pending_fibers.concat(next_fibers)
+ next_fibers.clear
end
end
+
+ if @pending_jobs.any?
+ raise "Invariant: #{@pending_jobs.size} pending jobs"
+ elsif pending_fibers.any?
+ raise "Invariant: #{pending_fibers.size} pending fibers"
+ elsif next_fibers.any?
+ raise "Invariant: #{next_fibers.size} next fibers"
+ end
nil
end
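To see step `A` in isolation, here is a toy sketch (plain Ruby, no GraphQL) of the "start a new Fiber whenever the current one pauses" pattern the loop above relies on:

    pending_jobs = [
      proc { puts "a" },
      proc { puts "b"; Fiber.yield; puts "b, resumed" },  # pauses mid-job, like a fiber waiting on a source
      proc { puts "c" },
    ]
    waiting = []
    until pending_jobs.empty?
      f = Fiber.new do
        while (job = pending_jobs.shift)
          job.call
        end
      end
      f.resume
      waiting << f if f.alive?   # paused mid-job; resume it later, once "data" is ready (steps B/C)
    end
    waiting.each(&:resume)       # prints: a, b, c, b, resumed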
- # Get a Source instance from this dataloader, for calling `.load(...)` or `.request(...)` on.
- #
- # @param source_class [Class<GraphQL::Dataloader::Source]
- # @param batch_parameters [Array<Object>]
- # @return [GraphQL::Dataloader::Source] An instance of {source_class}, initialized with `self, *batch_parameters`,
- # and cached for the lifetime of this {Multiplex}.
- def with(source_class, *batch_parameters)
- @source_cache[source_class][batch_parameters]
- end
-
- # @api private
- attr_accessor :current_runtime
-
private
- # Check if this fiber is still alive.
- # If it is, and it should continue, then enqueue a continuation.
- # If it is, re-enqueue it in `fiber_queue`.
- # Otherwise, clean it up from @yielded_fibers.
- # @return [void]
- def resume_fiber_and_enqueue_continuation(fiber, fiber_stack)
- result = fiber.resume
- if result.is_a?(StandardError)
- raise result
- end
-
- # This fiber yielded; there's more to do here.
- # (If `#alive?` is false, then the fiber concluded without yielding.)
- if fiber.alive?
- if !@yielded_fibers.include?(fiber)
- # This fiber hasn't yielded yet, we should enqueue a continuation fiber
- @yielded_fibers[fiber] = current_runtime.progress_path
- current_runtime.enqueue_selections_fiber
- end
- fiber_stack << fiber
- else
- # Keep this set clean so that fibers can be GC'ed during execution
- @yielded_fibers.delete(fiber)
- end
- end
-
# If there are pending sources, return a fiber for running them.
# Otherwise, return `nil`.
#
# @return [Fiber, nil]
def create_source_fiber
@@ -185,9 +187,17 @@
end
end
end
if pending_sources
+ # By passing the whole array into this Fiber, it's possible that we set ourselves up for a bunch of no-ops.
+ # For example, if you have sources `[a, b, c]`, and `a` is loaded, then `b` yields to wait for `d`, then
+ # the next fiber would be dispatched with `[c, d]`. It would fulfill `c`, then `d`, then eventually
+ # the previous fiber would start up again. `c` would no longer be pending, but it would still receive `.run_pending_keys`.
+ # That method is short-circuited since it isn't pending any more, but it's still a waste.
+ #
+ # This design could probably be improved by maintaining a `@pending_sources` queue which is shared by the fibers,
+ # similar to `@pending_jobs`. That way, when a fiber is resumed, it would never pick up work that was finished by a different fiber.
source_fiber = Fiber.new do
pending_sources.each(&:run_pending_keys)
end
end
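The alternative floated in the comment above, a shared `@pending_sources` queue, might look roughly like this. This is purely a sketch of the idea, not what 1.12.4 ships:

    def create_source_fiber
      return nil if @pending_sources.empty?
      Fiber.new do
        # Shared queue, like @pending_jobs: a source finished by one fiber is never
        # revisited by another, and newly-pending sources are picked up as they arrive.
        while (source = @pending_sources.shift)
          source.run_pending_keys
        end
      end
    end

One wrinkle: the current stack gives dependencies-first ordering, so a shared queue would still need to put newly-pending sources at the front (for example with `unshift`) to keep that behavior.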