lib/graphql/dataloader.rb in graphql-2.3.18 vs lib/graphql/dataloader.rb in graphql-2.3.19
- old
+ new
@@ -22,22 +22,27 @@
# dataloader.with(Sources::Record, Team).load(object.team_id)
# end
#
class Dataloader
class << self
  # Class-level default for the `nonblocking:` constructor option
  # (legacy flag; prefer AsyncDataloader).
  attr_accessor :default_nonblocking
  # Class-level default for the `fiber_limit:` constructor option.
  attr_accessor :default_fiber_limit
end
# Install a dataloader class on `schema`, honoring the given options.
#
# @param schema [Class] a schema class responding to `dataloader_class=`
# @param nonblocking [Boolean, nil] deprecated; use
#   `GraphQL::Dataloader::AsyncDataloader` instead
# @param fiber_limit [Integer, nil] cap on concurrently-spawned fibers,
#   stored as the subclass's `default_fiber_limit`
# @return [Class] the dataloader class assigned to the schema
def self.use(schema, nonblocking: nil, fiber_limit: nil)
  dataloader_class = if nonblocking
    warn("`nonblocking: true` is deprecated from `GraphQL::Dataloader`, please use `GraphQL::Dataloader::AsyncDataloader` instead. Docs: https://graphql-ruby.org/dataloader/async_dataloader.")
    Class.new(self) { self.default_nonblocking = true }
  else
    self
  end

  if fiber_limit
    # Subclass so the limit doesn't leak onto the shared base class.
    dataloader_class = Class.new(dataloader_class)
    dataloader_class.default_fiber_limit = fiber_limit
  end

  schema.dataloader_class = dataloader_class
end
# Call the block with a Dataloader instance,
# then run all enqueued jobs and return the result of the block.
def self.with_dataloading(&block)
@@ -48,18 +53,22 @@
}
dataloader.run
result
end
- def initialize(nonblocking: self.class.default_nonblocking)
+ def initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.class.default_fiber_limit)
@source_cache = Hash.new { |h, k| h[k] = {} }
@pending_jobs = []
if !nonblocking.nil?
@nonblocking = nonblocking
end
+ @fiber_limit = fiber_limit
end
+ # @return [Integer, nil]
+ attr_reader :fiber_limit
+
# @return [Boolean, nil] the `nonblocking:` setting captured at
#   initialization (`nil` when it was never configured).
def nonblocking?
  # Read the ivar directly; it may be unset, in which case this is nil.
  instance_variable_get(:@nonblocking)
end
# This is called before the fiber is spawned, from the parent context (i.e. from
@@ -176,31 +185,32 @@
end
end
end
def run
+ jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit
job_fibers = []
next_job_fibers = []
source_fibers = []
next_source_fibers = []
first_pass = true
manager = spawn_fiber do
while first_pass || job_fibers.any?
first_pass = false
- while (f = (job_fibers.shift || spawn_job_fiber))
+ while (f = (job_fibers.shift || (((next_job_fibers.size + job_fibers.size) < jobs_fiber_limit) && spawn_job_fiber)))
if f.alive?
finished = run_fiber(f)
if !finished
next_job_fibers << f
end
end
end
join_queues(job_fibers, next_job_fibers)
- while source_fibers.any? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) }
- while (f = source_fibers.shift || spawn_source_fiber)
+ while (source_fibers.any? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) })
+ while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber))
if f.alive?
finished = run_fiber(f)
if !finished
next_source_fibers << f
end
@@ -239,9 +249,20 @@
cleanup_fiber
}
end
private

# Split the configured fiber budget between job fibers and the whole pool.
#
# @return [Array(Integer, Integer)] `[jobs_fiber_limit, total_fiber_limit]`;
#   both are `Float::INFINITY` when no limit was configured.
# @raise [ArgumentError] when the configured limit is below 4 (one fiber is
#   needed for the manager, and headroom must remain for source fibers).
def calculate_fiber_limit
  total_fiber_limit = @fiber_limit || Float::INFINITY
  if total_fiber_limit < 4
    raise ArgumentError, "Dataloader fiber limit is too low (#{total_fiber_limit}), it must be at least 4"
  end
  # Reserve one fiber for the `manager` fiber.
  total_fiber_limit -= 1
  # Keep the jobs budget two below the total so source fibers can be spawned.
  jobs_fiber_limit = total_fiber_limit - 2
  [jobs_fiber_limit, total_fiber_limit]
end
def join_queues(prev_queue, new_queue)
@nonblocking && Fiber.scheduler.run
prev_queue.concat(new_queue)
new_queue.clear