lib/deimos/producer.rb in deimos-ruby-1.16.3 vs lib/deimos/producer.rb in deimos-ruby-1.16.4
- old
+ new
@@ -9,11 +9,12 @@
module Deimos
class << self
# Run a block without allowing any messages to be produced to Kafka.
# Optionally add a list of producer classes to limit the disabling to those
# classes.
- # @param producer_classes [Array<Class>|Class]
+ # @param producer_classes [Array<Class>, Class]
+ # @return [void]
def disable_producers(*producer_classes, &block)
if producer_classes.any?
_disable_producer_classes(producer_classes, &block)
return
end
@@ -29,11 +30,11 @@
ensure
Thread.current[:frk_disable_all_producers] = false
end
end
- # :nodoc:
+ # @!visibility private
def _disable_producer_classes(producer_classes)
Thread.current[:frk_disabled_producers] ||= Set.new
producers_to_disable = producer_classes -
Thread.current[:frk_disabled_producers].to_a
Thread.current[:frk_disabled_producers] += producers_to_disable
@@ -41,10 +42,11 @@
Thread.current[:frk_disabled_producers] -= producers_to_disable
end
# Are producers disabled? If a class is passed in, check only that class.
# Otherwise check if the global disable flag is set.
+ # @param producer_class [Class]
# @return [Boolean]
def producers_disabled?(producer_class=nil)
Thread.current[:frk_disable_all_producers] ||
Thread.current[:frk_disabled_producers]&.include?(producer_class)
end
@@ -52,10 +54,11 @@
# Producer to publish messages to a given kafka topic.
class Producer
include SharedConfig
+ # @return [Integer]
MAX_BATCH_SIZE = 500
class << self
# @return [Hash]
@@ -85,23 +88,25 @@
def partition_key(_payload)
nil
end
# Publish the payload to the topic.
- # @param payload [Hash|SchemaClass::Record] with an optional payload_key hash key.
+ # @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key.
# @param topic [String] if specifying the topic
+ # @return [void]
def publish(payload, topic: self.topic)
publish_list([payload], topic: topic)
end
# Publish a list of messages.
- # @param payloads [Array<Hash|SchemaClass::Record>] with optional payload_key hash key.
+ # @param payloads [Array<Hash, SchemaClass::Record>] with optional payload_key hash key.
# @param sync [Boolean] if given, override the default setting of
# whether to publish synchronously.
# @param force_send [Boolean] if true, ignore the configured backend
# and send immediately to Kafka.
# @param topic [String] if specifying the topic
+ # @return [void]
def publish_list(payloads, sync: nil, force_send: false, topic: self.topic)
return if Deimos.config.kafka.seed_brokers.blank? ||
Deimos.config.producers.disabled ||
Deimos.producers_disabled?(self)
@@ -122,11 +127,11 @@
end
end
# @param sync [Boolean]
# @param force_send [Boolean]
- # @return [Class < Deimos::Backend]
+ # @return [Class<Deimos::Backends::Base>]
def determine_backend_class(sync, force_send)
backend = if force_send
:kafka
else
Deimos.config.producers.backend
@@ -138,11 +143,12 @@
end
"Deimos::Backends::#{backend.to_s.classify}".constantize
end
# Send a batch to the backend.
- # @param backend [Class < Deimos::Backend]
+ # @param backend [Class<Deimos::Backends::Base>]
# @param batch [Array<Deimos::Message>]
+ # @return [void]
def produce_batch(backend, batch)
backend.publish(producer_class: self, messages: batch)
end
# @return [Deimos::SchemaBackends::Base]