lib/phobos/producer.rb in phobos-2.1.4 vs lib/phobos/producer.rb in phobos-2.1.5

- old
+ new

@@ -1,48 +1,63 @@ # frozen_string_literal: true module Phobos module Producer + # @!visibility private def self.included(base) base.extend(Phobos::Producer::ClassMethods) end + # @return [Phobos::Producer::PublicAPI] def producer Phobos::Producer::PublicAPI.new(self) end class PublicAPI def initialize(host_obj) @host_obj = host_obj end + # @param topic [String] + # @param payload [String] + # @param key [String] + # @param partition_key [Integer] + # @param headers [Hash] + # @return [void] def publish(topic:, payload:, key: nil, partition_key: nil, headers: nil) class_producer.publish(topic: topic, payload: payload, key: key, partition_key: partition_key, headers: headers) end + # @param topic [String] + # @param payload [String] + # @param key [String] + # @param partition_key [Integer] + # @param headers [Hash] + # @return [void] def async_publish(topic:, payload:, key: nil, partition_key: nil, headers: nil) class_producer.async_publish(topic: topic, payload: payload, key: key, partition_key: partition_key, headers: headers) end - # @param messages [Array(Hash(:topic, :payload, :key, :headers))] + # @param messages [Array<Hash>] # e.g.: [ # { topic: 'A', payload: 'message-1', key: '1', headers: { foo: 'bar' } }, # { topic: 'B', payload: 'message-2', key: '2', headers: { foo: 'bar' } } # ] # def publish_list(messages) class_producer.publish_list(messages) end + # @param messages [Array<Hash>] def async_publish_list(messages) class_producer.async_publish_list(messages) end private @@ -51,96 +66,123 @@ @host_obj.class.producer end end module ClassMethods + # @return [Phobos::Producer::ClassMethods::PublicAPI] def producer Phobos::Producer::ClassMethods::PublicAPI.new end class PublicAPI + # @return [Symbol] NAMESPACE = :phobos_producer_store + # @return [Array<Symbol>] ASYNC_PRODUCER_PARAMS = [:max_queue_size, :delivery_threshold, :delivery_interval].freeze + # @return [Array<Symbol>] INTERNAL_PRODUCER_PARAMS = [:persistent_connections].freeze # This method configures the kafka client used with publish operations # performed by the host class # # @param kafka_client [Kafka::Client] - # + # @return [void] def configure_kafka_client(kafka_client) async_producer_shutdown producer_store[:kafka_client] = kafka_client end + # @return [Kafka::Client] def kafka_client producer_store[:kafka_client] end + # @return [Kafka::Producer] def create_sync_producer client = kafka_client || configure_kafka_client(Phobos.create_kafka_client(:producer)) sync_producer = client.producer(**regular_configs) if Phobos.config.producer_hash[:persistent_connections] producer_store[:sync_producer] = sync_producer end sync_producer end + # @return [Kafka::Producer] def sync_producer producer_store[:sync_producer] end + # @return [void] def sync_producer_shutdown sync_producer&.shutdown producer_store[:sync_producer] = nil end + # @param topic [String] + # @param payload [String] + # @param partition_key [Integer] + # @param headers [Hash] + # @return [void] def publish(topic:, payload:, key: nil, partition_key: nil, headers: nil) publish_list([{ topic: topic, payload: payload, key: key, partition_key: partition_key, headers: headers }]) end + # @param messages [Array<Hash>] + # @return [void] def publish_list(messages) producer = sync_producer || create_sync_producer produce_messages(producer, messages) producer.deliver_messages ensure producer&.shutdown unless Phobos.config.producer_hash[:persistent_connections] end + # @return [Kafka::AsyncProducer] def create_async_producer client = kafka_client || configure_kafka_client(Phobos.create_kafka_client(:producer)) async_producer = client.async_producer(**async_configs) producer_store[:async_producer] = async_producer end + # @return [Kafka::AsyncProducer] def async_producer producer_store[:async_producer] end + # @param topic [String] + # @param payload [String] + # @param partition_key [Integer] + # @param headers [Hash] + # @return [void] def async_publish(topic:, payload:, key: nil, partition_key: nil, headers: nil) async_publish_list([{ topic: topic, payload: payload, key: key, partition_key: partition_key, headers: headers }]) end + # @param messages [Array<Hash>] + # @return [void] def async_publish_list(messages) producer = async_producer || create_async_producer produce_messages(producer, messages) producer.deliver_messages unless async_automatic_delivery? end + # @return [void] def async_producer_shutdown async_producer&.deliver_messages async_producer&.shutdown producer_store[:async_producer] = nil end + # @return [Hash] def regular_configs Phobos.config.producer_hash .reject { |k, _| ASYNC_PRODUCER_PARAMS.include?(k) } .reject { |k, _| INTERNAL_PRODUCER_PARAMS.include?(k) } end + # @return [Hash] def async_configs Phobos.config.producer_hash .reject { |k, _| INTERNAL_PRODUCER_PARAMS.include?(k) } end