Sha256: 50d70d7a9acbcd9d8d6a0e3afc54f376c6e0ae3dfc976ab319985f04cb48f448
Contents?: true
Size: 1.03 KB
Versions: 1
Compression:
Stored size: 1.03 KB
Contents
# frozen_string_literal: true

module WaterDrop
  # Patches to external components
  module Patches
    # Rdkafka related patches
    module Rdkafka
      # Rdkafka::Metadata patches
      module Metadata
        # We overwrite this method because there were reports of metadata operation timing out
        # when Kafka was under stress. While the messages dispatch will be retried, metadata
        # fetch happens prior to that, effectively crashing the process. Metadata fetch was not
        # being retried at all.
        #
        # @param args [Array<Object>] all the metadata original arguments
        def initialize(*args)
          attempt ||= 0
          attempt += 1

          super(*args)
        rescue Rdkafka::RdkafkaError => e
          raise unless e.code == :timed_out
          raise if attempt > 10

          backoff_factor = 2**attempt
          timeout = backoff_factor * 0.1

          sleep(timeout)

          retry
        end
      end
    end
  end
end

::Rdkafka::Metadata.prepend ::WaterDrop::Patches::Rdkafka::Metadata
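
The retry logic in the patch sleeps for `2**attempt * 0.1` seconds after each timed-out attempt and gives up once `attempt` exceeds 10. As a rough illustration of what that schedule looks like, here is a minimal sketch. The helper `metadata_backoff_schedule` and its keyword arguments are hypothetical names introduced for this example and are not part of WaterDrop or rdkafka; the sketch only reproduces the arithmetic and never sleeps or talks to Kafka.

```ruby
# Hypothetical helper (not part of WaterDrop): lists the sleep durations, in
# seconds, that the patched initializer would wait before each retry.
# Defaults mirror the constants in the patch above: at most 10 retries and a
# 0.1 second base multiplied by 2**attempt.
def metadata_backoff_schedule(max_attempts: 10, base: 0.1)
  (1..max_attempts).map { |attempt| (2**attempt * base).round(1) }
end

schedule = metadata_backoff_schedule

# Print each retry with its delay, then the worst-case cumulative wait.
schedule.each_with_index do |delay, index|
  puts "retry #{index + 1}: sleep #{delay}s"
end
puts "worst-case total wait: #{schedule.sum.round(1)}s"
```

Under these assumptions the delays double from 0.2 seconds up to 102.4 seconds, so a broker that keeps timing out could block the caller for a little over three minutes before the error is re-raised.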
Version data entries
1 entry across 1 version and 1 rubygem
Version | Path |
---|---|
waterdrop-2.4.4 | lib/waterdrop/patches/rdkafka/metadata.rb |