lib/kafka/fetcher.rb in ruby-kafka-0.7.5 vs lib/kafka/fetcher.rb in ruby-kafka-0.7.6.beta1

- old
+ new

@@ -6,11 +6,11 @@ class Fetcher attr_reader :queue def initialize(cluster:, logger:, instrumenter:, max_queue_size:, group:) @cluster = cluster - @logger = logger + @logger = TaggedLogger.new(logger) @instrumenter = instrumenter @max_queue_size = max_queue_size @group = group @queue = Queue.new @@ -53,11 +53,11 @@ @thread = Thread.new do while @running loop end - @logger.info "Fetcher thread exited." + @logger.info "#{@group} Fetcher thread exited." end @thread.abort_on_exception = true end def stop @@ -92,10 +92,11 @@ private attr_reader :current_reset_counter def loop + @logger.push_tags(@group.to_s) @instrumenter.instrument("loop.fetcher", { queue_size: @queue.size, }) return unless @running @@ -110,9 +111,11 @@ step else @logger.warn "Reached max fetcher queue size (#{@max_queue_size}), sleeping 1s" sleep 1 end + ensure + @logger.pop_tags end def handle_configure(min_bytes, max_bytes, max_wait_time) @min_bytes = min_bytes @max_bytes = max_bytes