Sha256: c7c85b57befa0b6c4999c7cff9177b649bd4b0b9e1e4c2008ac31866f34f3bce

Contents?: true

Size: 936 Bytes

Versions: 7

Compression:

Stored size: 936 Bytes

Contents

# frozen_string_literal: true

# WaterDrop library
module WaterDrop
  # Async producer for messages
  class AsyncProducer < BaseProducer
    # Performs message delivery through DeliveryBoy, using deliver_async! when
    # WaterDrop.config.raise_on_buffer_overflow is truthy and deliver_async otherwise
    # @param message [String] message that we want to send to Kafka
    # @param options [Hash] options (including topic) for producer; keys must be symbols
    #   because they are forwarded as keyword arguments
    # @return [nil] when WaterDrop.config.deliver is falsy (nothing is sent); otherwise
    #   whatever the DeliveryBoy call returns
    # @raise [WaterDrop::Errors::InvalidMessageOptions] raised when message options are
    #   somehow invalid and we cannot perform delivery because of that
    # @raise [Kafka::Error] re-raised when graceful_attempt? declines another attempt
    def self.call(message, options)
      # `retry` re-runs the method body with local variables intact, so this counter
      # keeps growing across attempts instead of being reset to zero
      attempts_count ||= 0
      attempts_count += 1

      validate!(options)
      return unless WaterDrop.config.deliver

      d_method = WaterDrop.config.raise_on_buffer_overflow ? :deliver_async! : :deliver_async

      # Options are splatted explicitly: since Ruby 3.0 a trailing positional hash is no
      # longer converted into keyword arguments implicitly
      DeliveryBoy.public_send(d_method, message, **options)
    rescue Kafka::Error => error
      # graceful_attempt? decides whether this failure is worth another attempt
      graceful_attempt?(attempts_count, message, options, error) ? retry : raise(error)
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygem

Version Path
waterdrop-1.2.5 lib/water_drop/async_producer.rb
waterdrop-1.2.4 lib/water_drop/async_producer.rb
waterdrop-1.2.3 lib/water_drop/async_producer.rb
waterdrop-1.2.2 lib/water_drop/async_producer.rb
waterdrop-1.2.1 lib/water_drop/async_producer.rb
waterdrop-1.2.0 lib/water_drop/async_producer.rb
waterdrop-1.2.0.beta1 lib/water_drop/async_producer.rb