Sha256: 9024d5b7f329c370470d1d47b77979dac9b84b664f65161364a317339731638e
Contents?: true
Size: 936 Bytes
Versions: 1
Compression:
Stored size: 936 Bytes
Contents
# frozen_string_literal: true # WaterDrop library module WaterDrop # Async producer for messages class AsyncProducer < BaseProducer # Performs message delivery using deliver_async method # @param message [String] message that we want to send to Kafka # @param options [Hash] options (including topic) for producer # @raise [WaterDrop::Errors::InvalidMessageOptions] raised when message options are # somehow invalid and we cannot perform delivery because of that def self.call(message, options) attempts ||= 0 attempts += 1 validate!(options) return unless WaterDrop.config.deliver DeliveryBoy.deliver_async(message, options) rescue Kafka::Error => e if attempts > WaterDrop.config.kafka.max_retries WaterDrop.logger.error e raise e else WaterDrop.logger.warn "Retrying delivery after: #{e}" retry end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
waterdrop-1.0.1 | lib/water_drop/async_producer.rb |