lib/kafka/fetcher.rb in ruby-kafka-0.6.2 vs lib/kafka/fetcher.rb in ruby-kafka-0.6.3

- old
+ new

@@ -2,18 +2,17 @@ require "kafka/fetch_operation" module Kafka class Fetcher - MAX_QUEUE_SIZE = 100 - attr_reader :queue - def initialize(cluster:, logger:, instrumenter:) + def initialize(cluster:, logger:, instrumenter:, max_queue_size:) @cluster = cluster @logger = logger @instrumenter = instrumenter + @max_queue_size = max_queue_size @queue = Queue.new @commands = Queue.new @next_offsets = Hash.new { |h, k| h[k] = {} } @@ -87,13 +86,13 @@ @logger.debug "Handling fetcher command: #{cmd}" send("handle_#{cmd}", *args) elsif !@running sleep 0.1 - elsif @queue.size < MAX_QUEUE_SIZE + elsif @queue.size < @max_queue_size step else - @logger.warn "Reached max fetcher queue size (#{MAX_QUEUE_SIZE}), sleeping 1s" + @logger.warn "Reached max fetcher queue size (#{@max_queue_size}), sleeping 1s" sleep 1 end end def handle_configure(min_bytes, max_bytes, max_wait_time)