Sha256: 31fc28695a4d9ea8997ff87b0f5e6a37a0c4af0c7c877d86cc92837a9a295bff

Contents?: true

Size: 1.77 KB

Versions: 27

Compression:

Stored size: 1.77 KB

Contents

# frozen_string_literal: true

module Karafka
  # Namespace for various other libs patches
  module Patches
    # Patches for Ruby Kafka gem
    module RubyKafka
      # This patch allows us to inject business logic in between fetches and before the consumer
      # stop, so we can perform stop commit or anything else that we need since
      # ruby-kafka fetch loop does not allow that directly
      # We don't won't to use poll ruby-kafka api as it brings many more problems that we would
      # have to take care of. That way, nothing like that ever happens but we get the control
      # over the stopping process that we need (since we're the once that initiate it for each
      # thread)
      def consumer_loop
        super do
          consumers = Karafka::Persistence::Consumers
                      .current
                      .values
                      .flat_map(&:values)
                      .select { |consumer| consumer.class.respond_to?(:after_fetch) }

          if Karafka::App.stopping?
            publish_event(consumers, 'before_stop')
            Karafka::Persistence::Client.read.stop
          else
            publish_event(consumers, 'before_poll')
            yield
            publish_event(consumers, 'after_poll')
          end
        end
      end

      private

      # Notifies consumers about particular events happening
      # @param consumers [Array<Object>] all consumers that want to be notified about an event
      # @param event_name [String] name of the event that happened
      def publish_event(consumers, event_name)
        consumers.each do |consumer|
          key = "consumers.#{Helpers::Inflector.map(consumer.class.to_s)}.#{event_name}"
          Karafka::App.monitor.instrument(key, context: consumer)
        end
      end
    end
  end
end

Version data entries

27 entries across 27 versions & 1 rubygems

Version Path
karafka-1.4.15 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.14 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.13 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.12 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.11 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.10 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.9 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.8 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.7 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.6 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.5 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.4 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.3 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.2 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.1 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.0 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.0.rc2 lib/karafka/patches/ruby_kafka.rb
karafka-1.4.0.rc1 lib/karafka/patches/ruby_kafka.rb
karafka-1.3.7 lib/karafka/patches/ruby_kafka.rb
karafka-1.3.6 lib/karafka/patches/ruby_kafka.rb