lib/graphql/dataloader.rb in graphql-1.12.24 vs lib/graphql/dataloader.rb in graphql-1.13.0
- old
+ new
@@ -21,14 +21,24 @@
# def team
# dataloader.with(Sources::Record, Team).load(object.team_id)
# end
#
class Dataloader
- def self.use(schema)
- schema.dataloader_class = self
+ class << self
+ attr_accessor :default_nonblocking
end
+ AsyncDataloader = Class.new(self) { self.default_nonblocking = true }
+
+ def self.use(schema, nonblocking: nil)
+ schema.dataloader_class = if nonblocking
+ AsyncDataloader
+ else
+ self
+ end
+ end
+
# Call the block with a Dataloader instance,
# then run all enqueued jobs and return the result of the block.
def self.with_dataloading(&block)
dataloader = self.new
result = nil
@@ -37,22 +47,29 @@
}
dataloader.run
result
end
- def initialize
+ def initialize(nonblocking: self.class.default_nonblocking)
@source_cache = Hash.new { |h, k| h[k] = {} }
@pending_jobs = []
+ if !nonblocking.nil?
+ @nonblocking = nonblocking
+ end
end
+ def nonblocking?
+ @nonblocking
+ end
+
# Get a Source instance from this dataloader, for calling `.load(...)` or `.request(...)` on.
#
# @param source_class [Class<GraphQL::Dataloader::Source]
# @param batch_parameters [Array<Object>]
# @return [GraphQL::Dataloader::Source] An instance of {source_class}, initialized with `self, *batch_parameters`,
# and cached for the lifetime of this {Multiplex}.
- if RUBY_VERSION < "3"
+ if RUBY_VERSION < "3" || RUBY_ENGINE != "ruby" # truffle-ruby wasn't doing well with the implementation below
def with(source_class, *batch_args)
batch_key = source_class.batch_key_for(*batch_args)
@source_cache[source_class][batch_key] ||= begin
source = source_class.new(*batch_args)
source.setup(self)
@@ -115,10 +132,13 @@
end
end
# @api private Move along, move along
def run
+ if @nonblocking && !Fiber.scheduler
+ raise "`nonblocking: true` requires `Fiber.scheduler`, assign one with `Fiber.set_scheduler(...)` before executing GraphQL."
+ end
# At a high level, the algorithm is:
#
# A) Inside Fibers, run jobs from the queue one-by-one
# - When one of the jobs yields to the dataloader (`Fiber.yield`), then that fiber will pause
# - In that case, if there are still pending jobs, a new Fiber will be created to run jobs
@@ -135,10 +155,12 @@
# For whatever reason, the best implementation I could find was to order the steps `[D, A, B, C]`, with a special case for skipping `D`
# on the first pass. I just couldn't find a better way to write the loops in a way that was DRY and easy to read.
#
pending_fibers = []
next_fibers = []
+ pending_source_fibers = []
+ next_source_fibers = []
first_pass = true
while first_pass || (f = pending_fibers.shift)
if first_pass
first_pass = false
@@ -172,35 +194,31 @@
# Sources might queue up other Sources, which is fine -- those will also run before resuming execution.
#
# This is where an evented approach would be even better -- can we tell which
# fibers are ready to continue, and continue execution there?
#
- source_fiber_queue = if (first_source_fiber = create_source_fiber)
- [first_source_fiber]
- else
- nil
+ if (first_source_fiber = create_source_fiber)
+ pending_source_fibers << first_source_fiber
end
- if source_fiber_queue
- while (outer_source_fiber = source_fiber_queue.shift)
+ while pending_source_fibers.any?
+ while (outer_source_fiber = pending_source_fibers.pop)
resume(outer_source_fiber)
-
- # If this source caused more sources to become pending, run those before running this one again:
- next_source_fiber = create_source_fiber
- if next_source_fiber
- source_fiber_queue << next_source_fiber
- end
-
if outer_source_fiber.alive?
- source_fiber_queue << outer_source_fiber
+ next_source_fibers << outer_source_fiber
end
+ if (next_source_fiber = create_source_fiber)
+ pending_source_fibers << next_source_fiber
+ end
end
+ join_queues(pending_source_fibers, next_source_fibers)
+ next_source_fibers.clear
end
# Move newly-enqueued Fibers on to the list to be resumed.
# Clear out the list of next-round Fibers, so that
# any Fibers that pause can be put on it.
- pending_fibers.concat(next_fibers)
+ join_queues(pending_fibers, next_fibers)
next_fibers.clear
end
end
if @pending_jobs.any?
@@ -211,10 +229,18 @@
raise "Invariant: #{next_fibers.size} next fibers"
end
nil
end
+ def join_queues(previous_queue, next_queue)
+ if @nonblocking
+ Fiber.scheduler.run
+ next_queue.select!(&:alive?)
+ end
+ previous_queue.concat(next_queue)
+ end
+
private
# If there are pending sources, return a fiber for running them.
# Otherwise, return `nil`.
#
@@ -264,12 +290,19 @@
Thread.current.keys.each do |fiber_var_key|
fiber_locals[fiber_var_key] = Thread.current[fiber_var_key]
end
- Fiber.new do
- fiber_locals.each { |k, v| Thread.current[k] = v }
- yield
+ if @nonblocking
+ Fiber.new(blocking: false) do
+ fiber_locals.each { |k, v| Thread.current[k] = v }
+ yield
+ end
+ else
+ Fiber.new do
+ fiber_locals.each { |k, v| Thread.current[k] = v }
+ yield
+ end
end
end
end
end