Sha256: 9024d5b7f329c370470d1d47b77979dac9b84b664f65161364a317339731638e

Contents?: true

Size: 936 Bytes

Versions: 1

Compression:

Stored size: 936 Bytes

Contents

# frozen_string_literal: true

# WaterDrop library
module WaterDrop
  # Async producer for messages
  #
  # Hands messages over to DeliveryBoy.deliver_async and retries the delivery when a
  # Kafka::Error is raised, up to WaterDrop.config.kafka.max_retries times.
  class AsyncProducer < BaseProducer
    # Performs message delivery using deliver_async method
    # @param message [String] message that we want to send to Kafka
    # @param options [Hash] options (including topic) for producer; they are forwarded
    #   to DeliveryBoy as keyword arguments, so keys are expected to be symbols
    # @return [nil] when WaterDrop.config.deliver is falsy (nothing is sent); otherwise
    #   whatever DeliveryBoy.deliver_async returns
    # @raise [WaterDrop::Errors::InvalidMessageOptions] raised when message options are
    #   somehow invalid and we cannot perform delivery because of that
    # @raise [Kafka::Error] re-raised (after being logged) once the number of attempts
    #   exceeds WaterDrop.config.kafka.max_retries
    def self.call(message, options)
      # Neither the options nor the deliver flag depend on the attempt, so they are
      # checked once, up front, instead of on every retry
      validate!(options)
      return unless WaterDrop.config.deliver

      attempts = 0

      begin
        attempts += 1
        # Double splat keeps this working on Ruby 3+, where a positional hash is no
        # longer implicitly converted into keyword arguments
        DeliveryBoy.deliver_async(message, **options)
      rescue Kafka::Error => e
        # One initial attempt plus up to max_retries retries; after that give up
        if attempts > WaterDrop.config.kafka.max_retries
          WaterDrop.logger.error e
          raise
        end

        WaterDrop.logger.warn "Retrying delivery after: #{e}"
        retry
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
waterdrop-1.0.1 lib/water_drop/async_producer.rb