lib/kafka/fetcher.rb in ruby-kafka-0.6.2 vs lib/kafka/fetcher.rb in ruby-kafka-0.6.3
- old
+ new
@@ -2,18 +2,17 @@
require "kafka/fetch_operation"
module Kafka
class Fetcher
- MAX_QUEUE_SIZE = 100
-
attr_reader :queue
- def initialize(cluster:, logger:, instrumenter:)
+ def initialize(cluster:, logger:, instrumenter:, max_queue_size:)
@cluster = cluster
@logger = logger
@instrumenter = instrumenter
+ @max_queue_size = max_queue_size
@queue = Queue.new
@commands = Queue.new
@next_offsets = Hash.new { |h, k| h[k] = {} }
@@ -87,13 +86,13 @@
@logger.debug "Handling fetcher command: #{cmd}"
send("handle_#{cmd}", *args)
elsif !@running
sleep 0.1
- elsif @queue.size < MAX_QUEUE_SIZE
+ elsif @queue.size < @max_queue_size
step
else
- @logger.warn "Reached max fetcher queue size (#{MAX_QUEUE_SIZE}), sleeping 1s"
+ @logger.warn "Reached max fetcher queue size (#{@max_queue_size}), sleeping 1s"
sleep 1
end
end
def handle_configure(min_bytes, max_bytes, max_wait_time)