Sha256: ddb9e969ae5f69af8a9094e5db8cfec94bd3331f170e739909fc4df5616633ec

Contents?: true

Size: 1.4 KB

Versions: 17

Compression:

Stored size: 1.4 KB

Contents

# frozen_string_literal: true

module Karafka
  module Web
    module Tracking
      module Consumers
        module Listeners
          # Listener that keeps the sampler's registry of paused topics partitions current for
          # both user requested and automatic events: an entry is stored when a pause is
          # reported and removed again when a resume is reported.
          class Pausing < Base
            # Registers a pause under its id, storing the timeout and the moment until which
            # the pause lasts (current monotonic reading plus that timeout).
            #
            # @param event [Karafka::Core::Monitoring::Event] event providing `:timeout`,
            #   `:topic`, `:partition` and `:subscription_group`
            def on_consumer_consuming_pause(event)
              track do |sampler|
                registry = sampler.pauses
                id = pause_id(event)
                timeout = event[:timeout]

                registry[id] = { timeout: timeout, paused_till: monotonic_now + timeout }
              end
            end

            # Drops the pause entry matching the resumed topic partition.
            #
            # @param event [Karafka::Core::Monitoring::Event] event providing `:topic`,
            #   `:partition` and `:subscription_group`
            def on_client_resume(event)
              track { |sampler| sampler.pauses.delete(pause_id(event)) }
            end

            private

            # Builds the key under which a pause is stored in the sampler.
            #
            # @param event [Karafka::Core::Monitoring::Event]
            # @return [String] subscription group id, topic and partition joined with dashes
            def pause_id(event)
              topic_name = event[:topic]
              partition_id = event[:partition]

              [event[:subscription_group].id, topic_name, partition_id].join('-')
            end
          end
        end
      end
    end
  end
end

Version data entries

17 entries across 17 versions & 1 rubygems

Version Path
karafka-web-0.10.4 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.10.3 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.10.2 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.10.1 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.10.0 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.10.0.rc2 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.10.0.rc1 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.10.0.beta1 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.9.1 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.9.0 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.9.0.rc3 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.9.0.rc2 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.9.0.rc1 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.8.2 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.8.1 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.8.0 lib/karafka/web/tracking/consumers/listeners/pausing.rb
karafka-web-0.8.0.rc1 lib/karafka/web/tracking/consumers/listeners/pausing.rb