Sha256: b607415fa3c8963dc3a210291296b696aa874d5241fbeeb2d3c6c202caa087ec
Contents?: true
Size: 1.48 KB
Versions: 1
Compression:
Stored size: 1.48 KB
Contents
# frozen_string_literal: true module WaterDrop # Patches to external components module Patches # `Rdkafka::Producer` patches module RdkafkaProducer # Errors upon which we want to retry message production # @note Since production happens async, those errors should only occur when using # partition_key, thus only then we handle them RETRYABLES = %w[ leader_not_available err_not_leader_for_partition invalid_replication_factor transport timed_out ].freeze # How many attempts do we want to make before re-raising the error MAX_ATTEMPTS = 5 private_constant :RETRYABLES, :MAX_ATTEMPTS # @param args [Object] anything `Rdkafka::Producer#produce` accepts # # @note This can be removed once this: https://github.com/appsignal/rdkafka-ruby/issues/163 # is resolved. def produce(**args) attempt ||= 0 attempt += 1 super rescue Rdkafka::RdkafkaError => e raise unless args.key?(:partition_key) # We care only about specific errors # https://docs.confluent.io/platform/current/clients/librdkafka/html/md_INTRODUCTION.html raise unless RETRYABLES.any? { |message| e.message.to_s.include?(message) } raise if attempt > MAX_ATTEMPTS max_sleep = 2**attempt / 10.0 sleep rand(0.01..max_sleep) retry end end end end Rdkafka::Producer.prepend(WaterDrop::Patches::RdkafkaProducer)
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
waterdrop-2.0.2 | lib/water_drop/patches/rdkafka_producer.rb |