Sha256: d22f9e2a571f93084b2bfcd8ebc2e883a6a3a0b1a3de24444674910ee71641ad
Contents?: true
Size: 1.27 KB
Versions: 18
Compression:
Stored size: 1.27 KB
Contents
# frozen_string_literal: true module Karafka module Patches # Patches for Ruby Kafka gem module RubyKafka # This patch allows us to inject business logic in between fetches and before the consumer # stop, so we can perform stop commit or anything else that we need since # ruby-kafka fetch loop does not allow that directly # We don't wan't to use poll ruby-kafka api as it brings many more problems that we would # have to take care of. That way, nothing like that ever happens but we get the control # over the stopping process that we need (since we're the once that initiate it for each # thread) def consumer_loop super do consumers = Karafka::Persistence::Consumer .all .values .flat_map(&:values) .select { |ctrl| ctrl.respond_to?(:run_callbacks) } if Karafka::App.stopped? consumers.each { |ctrl| ctrl.run_callbacks :before_stop } Karafka::Persistence::Client.read.stop else consumers.each { |ctrl| ctrl.run_callbacks :before_poll } yield consumers.each { |ctrl| ctrl.run_callbacks :after_poll } end end end end end end
Version data entries
18 entries across 18 versions & 1 rubygems