Sha256: ebe4ba4c84298fbb8adc91b70b7ee23e2da433e9cafe9a30ae113b71744b1c88
Contents?: true
Size: 924 Bytes
Versions: 4
Compression:
Stored size: 924 Bytes
Contents
# frozen_string_literal: true # WaterDrop library module WaterDrop # Async producer for messages class AsyncProducer < BaseProducer # Performs message delivery using deliver_async method # @param message [String] message that we want to send to Kafka # @param options [Hash] options (including topic) for producer # @raise [WaterDrop::Errors::InvalidMessageOptions] raised when message options are # somehow invalid and we cannot perform delivery because of that def self.call(message, options) attempts_count ||= 0 attempts_count += 1 validate!(options) return unless WaterDrop.config.deliver d_method = WaterDrop.config.raise_on_buffer_overflow ? :deliver_async! : :deliver_async DeliveryBoy.send(d_method, message, options) rescue Kafka::Error => e graceful_attempt?(attempts_count, message, options, e) ? retry : raise(e) end end end
Version data entries
4 entries across 4 versions & 1 rubygems