lib/deimos/producer.rb in deimos-ruby-1.24.2 vs lib/deimos/producer.rb in deimos-ruby-2.0.0.pre.alpha1
- old
+ new
@@ -1,15 +1,15 @@
# frozen_string_literal: true
require 'deimos/message'
require 'deimos/shared_config'
-require 'phobos/producer'
require 'active_support/notifications'
# :nodoc:
module Deimos
class << self
+
# Run a block without allowing any messages to be produced to Kafka.
# Optionally add a list of producer classes to limit the disabling to those
# classes.
# @param producer_classes [Array<Class>, Class]
# @return [void]
@@ -48,10 +48,12 @@
# Are producers disabled? If a class is passed in, check only that class.
# Otherwise check if the global disable flag is set.
# @param producer_class [Class]
# @return [Boolean]
def producers_disabled?(producer_class=nil)
+ return true if Deimos.config.producers.disabled
+
Thread.current[:frk_disable_all_producers] ||
Thread.current[:frk_disabled_producers]&.include?(producer_class)
end
end
@@ -62,30 +64,10 @@
# @return [Integer]
MAX_BATCH_SIZE = 500
class << self
- # @return [Hash]
- def config
- @config ||= {
- encode_key: true,
- namespace: Deimos.config.producers.schema_namespace
- }
- end
-
- # Set the topic.
- # @param topic [String]
- # @return [String] the current topic if no argument given.
- def topic(topic=nil)
- if topic
- config[:topic] = topic
- return
- end
- # accessor
- "#{Deimos.config.producers.topic_prefix}#{config[:topic]}"
- end
-
# Override the default partition key (which is the payload key).
# @param _payload [Hash] the payload being passed into the produce method.
# Will include `payload_key` if it is part of the original payload.
# @return [String]
def partition_key(_payload)
@@ -96,48 +78,63 @@
# @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key.
# @param topic [String] if specifying the topic
# @param headers [Hash] if specifying headers
# @return [void]
def publish(payload, topic: self.topic, headers: nil)
- publish_list([payload], topic: topic, headers: headers)
+ produce([{payload: payload, topic: topic, headers: headers}])
end
+ # Produce a list of messages in WaterDrop message hash format.
+ # @param messages [Array<Hash>]
+ # @param backend [Class < Deimos::Backend]
+ def produce(messages, backend: determine_backend_class)
+ return if Deimos.producers_disabled?(self)
+
+ messages.each do |m|
+ m[:label] = m
+ m[:partition_key] ||= self.partition_key(m[:payload])
+ end
+ messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch|
+ self.produce_batch(backend, batch)
+ end
+ end
+
# Publish a list of messages.
# @param payloads [Array<Hash, SchemaClass::Record>] with optional payload_key hash key.
# @param sync [Boolean] if given, override the default setting of
# whether to publish synchronously.
# @param force_send [Boolean] if true, ignore the configured backend
# and send immediately to Kafka.
# @param topic [String] if specifying the topic
# @param headers [Hash] if specifying headers
# @return [void]
def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil)
- return if Deimos.config.kafka.seed_brokers.blank? ||
- Deimos.config.producers.disabled ||
- Deimos.producers_disabled?(self)
+ backend = determine_backend_class(sync, force_send)
- raise 'Topic not specified. Please specify the topic.' if topic.blank?
-
- backend_class = determine_backend_class(sync, force_send)
- Deimos.instrument(
- 'encode_messages',
- producer: self,
- topic: topic,
- payloads: payloads
- ) do
- messages = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self, headers: headers) }
- messages.each { |m| _process_message(m, topic) }
- messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch|
- self.produce_batch(backend_class, batch)
- end
+ messages = Array(payloads).map do |p|
+ {
+ payload: p&.to_h,
+ headers: headers,
+ topic: topic,
+ partition_key: self.partition_key(p)
+ }
end
+ self.produce(messages, backend: backend)
end
+ def karafka_config
+ Deimos.karafka_configs.find { |topic| topic.producer_class == self }
+ end
+
+ def topic
+ karafka_config.name
+ end
+
# @param sync [Boolean]
# @param force_send [Boolean]
# @return [Class<Deimos::Backends::Base>]
- def determine_backend_class(sync, force_send)
+ def determine_backend_class(sync=false, force_send=false)
backend = if force_send
:kafka
else
Deimos.config.producers.backend
end
@@ -149,88 +146,14 @@
"Deimos::Backends::#{backend.to_s.classify}".constantize
end
# Send a batch to the backend.
# @param backend [Class<Deimos::Backends::Base>]
- # @param batch [Array<Deimos::Message>]
+ # @param batch [Array<Hash>]
# @return [void]
def produce_batch(backend, batch)
backend.publish(producer_class: self, messages: batch)
end
- # @return [Deimos::SchemaBackends::Base]
- def encoder
- @encoder ||= Deimos.schema_backend(schema: config[:schema],
- namespace: config[:namespace])
- end
-
- # @return [Deimos::SchemaBackends::Base]
- def key_encoder
- @key_encoder ||= Deimos.schema_backend(schema: config[:key_schema],
- namespace: config[:namespace])
- end
-
- # Override this in active record producers to add
- # non-schema fields to check for updates
- # @return [Array<String>] fields to check for updates
- def watched_attributes
- self.encoder.schema_fields.map(&:name)
- end
-
- private
-
- # @param message [Message]
- # @param topic [String]
- def _process_message(message, topic)
- # this violates the Law of Demeter but it has to happen in a very
- # specific order and requires a bunch of methods on the producer
- # to work correctly.
- message.add_fields(encoder.schema_fields.map(&:name))
- message.partition_key = self.partition_key(message.payload)
- message.key = _retrieve_key(message.payload)
- # need to do this before _coerce_fields because that might result
- # in an empty payload which is an *error* whereas this is intended.
- message.payload = nil if message.payload.blank?
- message.coerce_fields(encoder)
- message.encoded_key = _encode_key(message.key)
- message.topic = topic
- message.encoded_payload = if message.payload.nil?
- nil
- else
- encoder.encode(message.payload,
- topic: "#{Deimos.config.producers.topic_prefix}#{config[:topic]}-value")
- end
- end
-
- # @param key [Object]
- # @return [String|Object]
- def _encode_key(key)
- if key.nil?
- return nil if config[:no_keys] # no key is fine, otherwise it's a problem
-
- raise 'No key given but a key is required! Use `key_config none: true` to avoid using keys.'
- end
- if config[:encode_key] && config[:key_field].nil? &&
- config[:key_schema].nil?
- raise 'No key config given - if you are not encoding keys, please use `key_config plain: true`'
- end
-
- if config[:key_field]
- encoder.encode_key(config[:key_field], key, topic: "#{Deimos.config.producers.topic_prefix}#{config[:topic]}-key")
- elsif config[:key_schema]
- key_encoder.encode(key, topic: "#{Deimos.config.producers.topic_prefix}#{config[:topic]}-key")
- else
- key
- end
- end
-
- # @param payload [Hash]
- # @return [String]
- def _retrieve_key(payload)
- key = payload.delete(:payload_key)
- return key if key
-
- config[:key_field] ? payload[config[:key_field]] : nil
- end
end
end
end