Sha256: 50d70d7a9acbcd9d8d6a0e3afc54f376c6e0ae3dfc976ab319985f04cb48f448

Size: 1.03 KB

Versions: 1

Stored size: 1.03 KB

Contents

# frozen_string_literal: true

module WaterDrop
  # Patches to external components
  module Patches
    # Rdkafka related patches
    module Rdkafka
      # Rdkafka::Metadata patches
      module Metadata
        # We override this method because there were reports of metadata operations timing out
        # when Kafka was under stress. While message dispatch is retried, the metadata fetch
        # happens before it and was not retried at all, so a timeout there effectively crashed
        # the process.
        #
        # @param args [Array<Object>] all the original metadata arguments
        def initialize(*args)
          # `attempt` is a method-local variable, so its value survives `retry`
          attempt ||= 0
          attempt += 1

          super(*args)
        rescue ::Rdkafka::RdkafkaError => e
          # Retry only timeouts, and give up once 10 retries have failed
          raise unless e.code == :timed_out
          raise if attempt > 10

          # Exponential backoff: sleep 0.2s, 0.4s, 0.8s, ... up to 102.4s before the last retry
          backoff_factor = 2**attempt
          timeout = backoff_factor * 0.1

          sleep(timeout)

          retry
        end
      end
    end
  end
end

::Rdkafka::Metadata.prepend ::WaterDrop::Patches::Rdkafka::Metadata
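
The retry pattern above can be tried without a Kafka cluster. The following is a minimal sketch, not WaterDrop code: `FakeTimeoutError`, `FlakyMetadata`, `RetryOnTimeout`, and `RECORDED_DELAYS` are hypothetical stand-ins invented for illustration, and the backoff delay is recorded instead of slept so the example finishes immediately.

```ruby
# Hypothetical stand-in for Rdkafka::RdkafkaError (illustration only).
class FakeTimeoutError < StandardError
  def code
    :timed_out
  end
end

# Hypothetical stand-in for Rdkafka::Metadata: times out a set number of times,
# then initializes successfully.
class FlakyMetadata
  class << self
    attr_accessor :failures_left, :calls
  end

  def initialize
    self.class.calls += 1
    return if self.class.failures_left.zero?

    self.class.failures_left -= 1
    raise FakeTimeoutError
  end
end

# Delays the patch would have slept for; recorded here instead of sleeping.
RECORDED_DELAYS = []

# Same shape as the patch: a prepended module wrapping `initialize`.
module RetryOnTimeout
  def initialize(*args)
    # The local keeps its value across `retry`, so it counts the tries.
    attempt ||= 0
    attempt += 1

    super(*args)
  rescue FakeTimeoutError => e
    raise unless e.code == :timed_out
    raise if attempt > 10

    # Same formula as the patch: 2**attempt * 0.1 seconds.
    RECORDED_DELAYS << (2**attempt) * 0.1
    retry
  end
end

FlakyMetadata.prepend(RetryOnTimeout)
FlakyMetadata.failures_left = 3
FlakyMetadata.calls = 0
FlakyMetadata.new

puts "calls: #{FlakyMetadata.calls}, delays: #{RECORDED_DELAYS.inspect}"
```

With three simulated timeouts the constructor body runs four times, and the recorded delays are roughly 0.2, 0.4, and 0.8 seconds. If every try timed out, the ten backoff sleeps alone would add up to about 204.6 seconds before the error is re-raised.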

Version data entries

1 entry across 1 version and 1 rubygem

Version Path
waterdrop-2.4.4 lib/waterdrop/patches/rdkafka/metadata.rb