lib/deimos/producer.rb in deimos-ruby-1.16.3 vs lib/deimos/producer.rb in deimos-ruby-1.16.4

- old
+ new

@@ -9,11 +9,12 @@ module Deimos class << self # Run a block without allowing any messages to be produced to Kafka. # Optionally add a list of producer classes to limit the disabling to those # classes. - # @param producer_classes [Array<Class>|Class] + # @param producer_classes [Array<Class>, Class] + # @return [void] def disable_producers(*producer_classes, &block) if producer_classes.any? _disable_producer_classes(producer_classes, &block) return end @@ -29,11 +30,11 @@ ensure Thread.current[:frk_disable_all_producers] = false end end - # :nodoc: + # @!visibility private def _disable_producer_classes(producer_classes) Thread.current[:frk_disabled_producers] ||= Set.new producers_to_disable = producer_classes - Thread.current[:frk_disabled_producers].to_a Thread.current[:frk_disabled_producers] += producers_to_disable @@ -41,10 +42,11 @@ Thread.current[:frk_disabled_producers] -= producers_to_disable end # Are producers disabled? If a class is passed in, check only that class. # Otherwise check if the global disable flag is set. + # @param producer_class [Class] # @return [Boolean] def producers_disabled?(producer_class=nil) Thread.current[:frk_disable_all_producers] || Thread.current[:frk_disabled_producers]&.include?(producer_class) end @@ -52,10 +54,11 @@ # Producer to publish messages to a given kafka topic. class Producer include SharedConfig + # @return [Integer] MAX_BATCH_SIZE = 500 class << self # @return [Hash] @@ -85,23 +88,25 @@ def partition_key(_payload) nil end # Publish the payload to the topic. - # @param payload [Hash|SchemaClass::Record] with an optional payload_key hash key. + # @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key. # @param topic [String] if specifying the topic + # @return [void] def publish(payload, topic: self.topic) publish_list([payload], topic: topic) end # Publish a list of messages. - # @param payloads [Array<Hash|SchemaClass::Record>] with optional payload_key hash key. + # @param payloads [Array<Hash, SchemaClass::Record>] with optional payload_key hash key. # @param sync [Boolean] if given, override the default setting of # whether to publish synchronously. # @param force_send [Boolean] if true, ignore the configured backend # and send immediately to Kafka. # @param topic [String] if specifying the topic + # @return [void] def publish_list(payloads, sync: nil, force_send: false, topic: self.topic) return if Deimos.config.kafka.seed_brokers.blank? || Deimos.config.producers.disabled || Deimos.producers_disabled?(self) @@ -122,11 +127,11 @@ end end # @param sync [Boolean] # @param force_send [Boolean] - # @return [Class < Deimos::Backend] + # @return [Class<Deimos::Backends::Base>] def determine_backend_class(sync, force_send) backend = if force_send :kafka else Deimos.config.producers.backend @@ -138,11 +143,12 @@ end "Deimos::Backends::#{backend.to_s.classify}".constantize end # Send a batch to the backend. - # @param backend [Class < Deimos::Backend] + # @param backend [Class<Deimos::Backends::Base>] # @param batch [Array<Deimos::Message>] + # @return [void] def produce_batch(backend, batch) backend.publish(producer_class: self, messages: batch) end # @return [Deimos::SchemaBackends::Base]