Sha256: f8e92688246b1f70b9d391a7cd2ac5b668eb77fc8db2c8d05ae5f02d09d27cf7

Contents?: true

Size: 1.24 KB

Versions: 10

Compression:

Stored size: 1.24 KB

Contents

# frozen_string_literal: true

module WaterDrop
  # Patches to external components
  module Patches
    # Rdkafka related patches
    module Rdkafka
      # Rdkafka::Metadata patches
      module Metadata
        # Errors upon which we retry the metadata fetch
        RETRIED_ERRORS = %i[
          timed_out
          leader_not_available
        ].freeze

        private_constant :RETRIED_ERRORS

        # We overwrite this method because there were reports of metadata operation timing out
        # when Kafka was under stress. While the messages dispatch will be retried, metadata
        # fetch happens prior to that, effectively crashing the process. Metadata fetch was not
        # being retried at all.
        #
        # @param args [Array<Object>] all the metadata original arguments
        def initialize(*args)
          attempt ||= 0
          attempt += 1

          super(*args)
        rescue ::Rdkafka::RdkafkaError => e
          raise unless RETRIED_ERRORS.include?(e.code)
          raise if attempt > 10

          backoff_factor = 2**attempt
          timeout = backoff_factor * 0.1

          sleep(timeout)

          retry
        end
      end
    end
  end
end

::Rdkafka::Metadata.prepend ::WaterDrop::Patches::Rdkafka::Metadata

Version data entries

10 entries across 10 versions & 1 rubygems

Version Path
waterdrop-2.6.0 lib/waterdrop/patches/rdkafka/metadata.rb
waterdrop-2.5.3 lib/waterdrop/patches/rdkafka/metadata.rb
waterdrop-2.5.2 lib/waterdrop/patches/rdkafka/metadata.rb
waterdrop-2.5.1 lib/waterdrop/patches/rdkafka/metadata.rb
waterdrop-2.5.0 lib/waterdrop/patches/rdkafka/metadata.rb
waterdrop-2.4.11 lib/waterdrop/patches/rdkafka/metadata.rb
waterdrop-2.4.10 lib/waterdrop/patches/rdkafka/metadata.rb
waterdrop-2.4.9 lib/waterdrop/patches/rdkafka/metadata.rb
waterdrop-2.4.8 lib/waterdrop/patches/rdkafka/metadata.rb
waterdrop-2.4.7 lib/waterdrop/patches/rdkafka/metadata.rb