Sha256: 3bbf7c8de02034324176f3bda9bed7b1582d0bb26dd40402e038a2d3c23b154c

Contents?: true

Size: 1.29 KB

Versions: 5

Compression:

Stored size: 1.29 KB

Contents

# frozen_string_literal: true

module Karafka
  module Patches
    # Batches for Ruby Kafka gem
    module RubyKafka
      # This patch allows us to inject business logic in between fetches and before the consumer
      # stop, so we can perform stop commit or anything else that we need since
      # ruby-kafka fetch loop does not allow that directly
      # We don't wan't to use poll ruby-kafka api as it brings many more problems that we would
      # have to take care of. That way, nothing like that ever happens but we get the control
      # over the stopping process that we need (since we're the once that initiate it for each
      # thread)
      def consumer_loop
        super do
          controllers = Karafka::Persistence::Controller
                        .all
                        .values
                        .flat_map(&:values)
                        .select { |ctrl| ctrl.respond_to?(:run_callbacks) }

          if Karafka::App.stopped?
            controllers.each { |ctrl| ctrl.run_callbacks :before_stop }
            Karafka::Persistence::Consumer.read.stop
          else
            controllers.each { |ctrl| ctrl.run_callbacks :before_poll }
            yield
            controllers.each { |ctrl| ctrl.run_callbacks :after_poll }
          end
        end
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
karafka-1.1.2 lib/karafka/patches/ruby_kafka.rb
karafka-1.1.1 lib/karafka/patches/ruby_kafka.rb
karafka-1.1.0 lib/karafka/patches/ruby_kafka.rb
karafka-1.1.0.alpha2 lib/karafka/patches/ruby_kafka.rb
karafka-1.1.0.alpha1 lib/karafka/patches/ruby_kafka.rb