Sha256: 7d2be320c48fc2613632b5094b3b0da5d2aaa6b141329238769725cdd0d747e2
Contents?: true
Size: 832 Bytes
Versions: 7
Compression:
Stored size: 832 Bytes
Contents
# frozen_string_literal: true

# WaterDrop library
module WaterDrop
  # Producer that sends a message to Kafka through DeliveryBoy.deliver
  class SyncProducer < BaseProducer
    # Validates the options and, when delivery is enabled in the WaterDrop
    # config, hands the message over to DeliveryBoy.deliver.
    #
    # @param message [String] message that we want to send to Kafka
    # @param options [Hash] options (including topic) for producer
    # @return [nil] when WaterDrop.config.deliver is falsy; otherwise whatever
    #   DeliveryBoy.deliver returns
    # @raise [WaterDrop::Errors::InvalidMessageOptions] raised when message options are
    #   somehow invalid and we cannot perform delivery because of that
    #   (presumably raised from validate!, which is defined outside this file)
    # @raise [Kafka::Error] re-raised when graceful_attempt? does not allow another try
    def self.call(message, options)
      # A local variable keeps its value across `retry`, so this counts how many
      # times the method body has been entered during this single call.
      attempts_count = (attempts_count || 0) + 1

      validate!(options)
      return unless WaterDrop.config.deliver

      DeliveryBoy.deliver(message, options)
    rescue Kafka::Error => kafka_error
      # graceful_attempt? decides whether one more attempt should be made
      raise(kafka_error) unless graceful_attempt?(attempts_count, message, options, kafka_error)

      retry
    end
  end
end
Version data entries
7 entries across 7 versions & 1 rubygems