lib/graphql/dataloader.rb in graphql-2.3.18 vs lib/graphql/dataloader.rb in graphql-2.3.19

- old
+ new

@@ -22,22 +22,27 @@ # dataloader.with(Sources::Record, Team).load(object.team_id) # end # class Dataloader class << self - attr_accessor :default_nonblocking + attr_accessor :default_nonblocking, :default_fiber_limit end - NonblockingDataloader = Class.new(self) { self.default_nonblocking = true } - - def self.use(schema, nonblocking: nil) - schema.dataloader_class = if nonblocking + def self.use(schema, nonblocking: nil, fiber_limit: nil) + dataloader_class = if nonblocking warn("`nonblocking: true` is deprecated from `GraphQL::Dataloader`, please use `GraphQL::Dataloader::AsyncDataloader` instead. Docs: https://graphql-ruby.org/dataloader/async_dataloader.") - NonblockingDataloader + Class.new(self) { self.default_nonblocking = true } else self end + + if fiber_limit + dataloader_class = Class.new(dataloader_class) + dataloader_class.default_fiber_limit = fiber_limit + end + + schema.dataloader_class = dataloader_class end # Call the block with a Dataloader instance, # then run all enqueued jobs and return the result of the block. def self.with_dataloading(&block) @@ -48,18 +53,22 @@ } dataloader.run result end - def initialize(nonblocking: self.class.default_nonblocking) + def initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.class.default_fiber_limit) @source_cache = Hash.new { |h, k| h[k] = {} } @pending_jobs = [] if !nonblocking.nil? @nonblocking = nonblocking end + @fiber_limit = fiber_limit end + # @return [Integer, nil] + attr_reader :fiber_limit + def nonblocking? @nonblocking end # This is called before the fiber is spawned, from the parent context (i.e. from @@ -176,31 +185,32 @@ end end end def run + jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit job_fibers = [] next_job_fibers = [] source_fibers = [] next_source_fibers = [] first_pass = true manager = spawn_fiber do while first_pass || job_fibers.any? first_pass = false - while (f = (job_fibers.shift || spawn_job_fiber)) + while (f = (job_fibers.shift || (((next_job_fibers.size + job_fibers.size) < jobs_fiber_limit) && spawn_job_fiber))) if f.alive? finished = run_fiber(f) if !finished next_job_fibers << f end end end join_queues(job_fibers, next_job_fibers) - while source_fibers.any? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) } - while (f = source_fibers.shift || spawn_source_fiber) + while (source_fibers.any? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) }) + while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber)) if f.alive? finished = run_fiber(f) if !finished next_source_fibers << f end @@ -239,9 +249,20 @@ cleanup_fiber } end private + + def calculate_fiber_limit + total_fiber_limit = @fiber_limit || Float::INFINITY + if total_fiber_limit < 4 + raise ArgumentError, "Dataloader fiber limit is too low (#{total_fiber_limit}), it must be at least 4" + end + total_fiber_limit -= 1 # deduct one fiber for `manager` + # Deduct at least one fiber for sources + jobs_fiber_limit = total_fiber_limit - 2 + return jobs_fiber_limit, total_fiber_limit + end def join_queues(prev_queue, new_queue) @nonblocking && Fiber.scheduler.run prev_queue.concat(new_queue) new_queue.clear