Sha256: d22f9e2a571f93084b2bfcd8ebc2e883a6a3a0b1a3de24444674910ee71641ad

Contents?: true

Size: 1.27 KB

Versions: 18

Compression:

Stored size: 1.27 KB

Contents

# frozen_string_literal: true

module Karafka
  module Patches
    # Patches for Ruby Kafka gem
    module RubyKafka
      # This patch allows us to inject business logic in between fetches and before the consumer
      # stop, so we can perform stop commit or anything else that we need since
      # ruby-kafka fetch loop does not allow that directly
      # We don't wan't to use poll ruby-kafka api as it brings many more problems that we would
      # have to take care of. That way, nothing like that ever happens but we get the control
      # over the stopping process that we need (since we're the once that initiate it for each
      # thread)
      def consumer_loop
        super do
          consumers = Karafka::Persistence::Consumer
                      .all
                      .values
                      .flat_map(&:values)
                      .select { |ctrl| ctrl.respond_to?(:run_callbacks) }

          if Karafka::App.stopped?
            consumers.each { |ctrl| ctrl.run_callbacks :before_stop }
            Karafka::Persistence::Client.read.stop
          else
            consumers.each { |ctrl| ctrl.run_callbacks :before_poll }
            yield
            consumers.each { |ctrl| ctrl.run_callbacks :after_poll }
          end
        end
      end
    end
  end
end

Version data entries

18 entries across 18 versions & 1 rubygems

Version Path
karafka-1.2.13 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.12 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.11 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.10 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.9 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.8 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.7 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.6 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.5 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.4 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.3 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.2 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.1 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.0 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.0.beta4 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.0.beta3 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.0.beta2 lib/karafka/patches/ruby_kafka.rb
karafka-1.2.0.beta1 lib/karafka/patches/ruby_kafka.rb