lib/kafka/fetcher.rb in ruby-kafka-0.7.5 vs lib/kafka/fetcher.rb in ruby-kafka-0.7.6.beta1
- old
+ new
@@ -6,11 +6,11 @@
class Fetcher
attr_reader :queue
def initialize(cluster:, logger:, instrumenter:, max_queue_size:, group:)
@cluster = cluster
- @logger = logger
+ @logger = TaggedLogger.new(logger)
@instrumenter = instrumenter
@max_queue_size = max_queue_size
@group = group
@queue = Queue.new
@@ -53,11 +53,11 @@
@thread = Thread.new do
while @running
loop
end
- @logger.info "Fetcher thread exited."
+ @logger.info "#{@group} Fetcher thread exited."
end
@thread.abort_on_exception = true
end
def stop
@@ -92,10 +92,11 @@
private
attr_reader :current_reset_counter
def loop
+ @logger.push_tags(@group.to_s)
@instrumenter.instrument("loop.fetcher", {
queue_size: @queue.size,
})
return unless @running
@@ -110,9 +111,11 @@
step
else
@logger.warn "Reached max fetcher queue size (#{@max_queue_size}), sleeping 1s"
sleep 1
end
+ ensure
+ @logger.pop_tags
end
def handle_configure(min_bytes, max_bytes, max_wait_time)
@min_bytes = min_bytes
@max_bytes = max_bytes