# frozen_string_literal: true

module Phobos
  # Mixin that gives a host class a `producer` object at both the instance
  # level (Phobos::Producer::PublicAPI) and the class level
  # (Phobos::Producer::ClassMethods::PublicAPI).
  module Producer
    # Hook run on `include`: extends the host class with ClassMethods so that
    # `HostClass.producer` is available.
    def self.included(base)
      base.extend(Phobos::Producer::ClassMethods)
    end

    # @return [Phobos::Producer::PublicAPI] instance-level API bound to self
    def producer
      Phobos::Producer::PublicAPI.new(self)
    end

    # Instance-level publishing API. Every call is delegated to the producer
    # of the host object's class.
    class PublicAPI
      MissingRequiredArgumentsError = Class.new(StandardError) do
        def initialize
          super('You need to provide a topic name and a payload')
        end
      end

      # @param host_obj [Object] instance of a class that includes Phobos::Producer
      def initialize(host_obj)
        @host_obj = host_obj
      end

      # Publishes a single message through the class-level producer.
      #
      # Accepts keyword arguments (topic:, payload:, key:, partition_key:,
      # headers:) or, deprecated, the same values positionally in that order.
      # A deprecation notice is emitted when no keyword argument is given.
      #
      # @raise [MissingRequiredArgumentsError] when topic or payload is missing
      def publish(*args, **kwargs)
        Phobos.deprecate(deprecate_positional_args_message('publish')) if kwargs.empty?

        args = normalize_arguments(*args, **kwargs)
        class_producer.publish(**args)
      end

      # Same contract as #publish, delegating to the class-level async producer.
      #
      # @raise [MissingRequiredArgumentsError] when topic or payload is missing
      def async_publish(*args, **kwargs)
        Phobos.deprecate(deprecate_positional_args_message('async_publish')) if kwargs.empty?

        args = normalize_arguments(*args, **kwargs)
        class_producer.async_publish(**args)
      end

      # @param messages [Array(Hash(:topic, :payload, :key, :headers))]
      #   e.g.: [
      #     { topic: 'A', payload: 'message-1', key: '1', headers: { foo: 'bar' } },
      #     { topic: 'B', payload: 'message-2', key: '2', headers: { foo: 'bar' } }
      #   ]
      #
      def publish_list(messages)
        class_producer.publish_list(messages)
      end

      # Async counterpart of #publish_list; takes the same message hashes.
      def async_publish_list(messages)
        class_producer.async_publish_list(messages)
      end

      private

      # Class-level producer API of the host object's class.
      def class_producer
        @host_obj.class.producer
      end

      # Builds the keyword hash handed to the class-level producer.
      #
      # For each known key a truthy keyword argument wins over the positional
      # value. Keyword arguments that are not one of the known keys are treated
      # as message headers.
      #
      # @return [Hash] with keys :topic, :payload, :key, :partition_key, :headers
      # @raise [MissingRequiredArgumentsError] when topic or payload is nil
      # rubocop:disable Metrics/ParameterLists
      def normalize_arguments(p_topic = nil, p_payload = nil, p_key = nil,
                              p_partition_key = nil, p_headers = {}, **kwargs)
        positional = {
          topic: p_topic,
          payload: p_payload,
          key: p_key,
          partition_key: p_partition_key,
          headers: p_headers
        }
        args = positional.each_with_object({}) do |(name, fallback), acc|
          acc[name] = kwargs[name] || fallback
        end

        raise MissingRequiredArgumentsError if [:topic, :payload].any? { |k| args[k].nil? }

        extra_headers = kwargs.reject { |name, _| args.key?(name) }
        # Merge into a new hash: args[:headers] may be the caller's own object
        # (possibly frozen) or nil, so it must not be written to in place.
        args[:headers] = (args[:headers] || {}).merge(extra_headers) unless extra_headers.empty?

        args
      end
      # rubocop:enable Metrics/ParameterLists

      def deprecate_positional_args_message(method_name)
        "The `#{method_name}` method should now receive keyword arguments " \
          'rather than positional ones. Please update your publishers. This will ' \
          'not be backwards compatible in the future.'
      end
    end

    # Class-level methods added to every class that includes Phobos::Producer.
    module ClassMethods
      # @return [Phobos::Producer::ClassMethods::PublicAPI]
      def producer
        Phobos::Producer::ClassMethods::PublicAPI.new
      end

      # Class-level publishing API. The kafka client and the producers are kept
      # in a hash stored under NAMESPACE in Thread.current, so each thread has
      # its own set.
      class PublicAPI
        NAMESPACE = :phobos_producer_store
        ASYNC_PRODUCER_PARAMS = [:max_queue_size, :delivery_threshold, :delivery_interval].freeze
        INTERNAL_PRODUCER_PARAMS = [:persistent_connections].freeze

        # This method configures the kafka client used with publish operations
        # performed by the host class
        #
        # @param kafka_client [Kafka::Client]
        #
        # NOTE(review): only the async producer is shut down here; a stored
        # sync producer is left untouched - confirm that is intended.
        def configure_kafka_client(kafka_client)
          async_producer_shutdown
          producer_store[:kafka_client] = kafka_client
        end

        # @return [Kafka::Client, nil] client stored for the current thread
        def kafka_client
          producer_store[:kafka_client]
        end

        # Creates a sync producer from the stored kafka client, creating and
        # storing a client first when none is configured. The producer is kept
        # in the store only when :persistent_connections is set.
        def create_sync_producer
          client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
          # Splat the configs: Kafka::Client#producer takes keyword arguments,
          # and Ruby 3 no longer converts a positional Hash into keywords.
          sync_producer = client.producer(**regular_configs)
          if Phobos.config.producer_hash[:persistent_connections]
            producer_store[:sync_producer] = sync_producer
          end
          sync_producer
        end

        # @return [Object, nil] sync producer stored for the current thread
        def sync_producer
          producer_store[:sync_producer]
        end

        # Shuts down the stored sync producer (if any) and clears it.
        def sync_producer_shutdown
          sync_producer&.shutdown
          producer_store[:sync_producer] = nil
        end

        # Publishes one message synchronously; see #publish_list.
        def publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
          publish_list([{ topic: topic, payload: payload, key: key,
                          partition_key: partition_key, headers: headers }])
        end

        # Produces and delivers all messages with the sync producer. Unless
        # :persistent_connections is set, the producer is shut down afterwards,
        # even when producing or delivering raises.
        #
        # @param messages [Array<Hash>] hashes with :topic, :payload and
        #   optionally :key, :partition_key, :headers
        def publish_list(messages)
          producer = sync_producer || create_sync_producer
          produce_messages(producer, messages)
          producer.deliver_messages
        ensure
          producer&.shutdown unless Phobos.config.producer_hash[:persistent_connections]
        end

        # Creates an async producer from the stored kafka client (creating and
        # storing a client first when none is configured) and stores it.
        def create_async_producer
          client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
          # Keyword splat for the same reason as in #create_sync_producer.
          async_producer = client.async_producer(**async_configs)
          producer_store[:async_producer] = async_producer
        end

        # @return [Object, nil] async producer stored for the current thread
        def async_producer
          producer_store[:async_producer]
        end

        # Publishes one message asynchronously; see #async_publish_list.
        def async_publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
          async_publish_list([{ topic: topic, payload: payload, key: key,
                                partition_key: partition_key, headers: headers }])
        end

        # Hands all messages to the async producer. Delivery is triggered
        # explicitly only when neither :delivery_threshold nor
        # :delivery_interval is configured with a positive value.
        def async_publish_list(messages)
          producer = async_producer || create_async_producer
          produce_messages(producer, messages)
          producer.deliver_messages unless async_automatic_delivery?
        end

        # Delivers pending messages, shuts down the stored async producer
        # (if any) and clears it.
        def async_producer_shutdown
          async_producer&.deliver_messages
          async_producer&.shutdown
          producer_store[:async_producer] = nil
        end

        # Producer settings without the async-only and internal parameters.
        def regular_configs
          Phobos.config.producer_hash
                .reject { |k, _| ASYNC_PRODUCER_PARAMS.include?(k) }
                .reject { |k, _| INTERNAL_PRODUCER_PARAMS.include?(k) }
        end

        # Producer settings without the internal parameters.
        def async_configs
          Phobos.config.producer_hash
                .reject { |k, _| INTERNAL_PRODUCER_PARAMS.include?(k) }
        end

        private

        # Calls producer.produce once per message; the partition key falls
        # back to the message key when none is given.
        def produce_messages(producer, messages)
          messages.each do |message|
            partition_key = message[:partition_key] || message[:key]
            producer.produce(message[:payload], topic: message[:topic],
                                                key: message[:key],
                                                headers: message[:headers],
                                                partition_key: partition_key)
          end
        end

        # True when the async settings carry a positive :delivery_threshold or
        # :delivery_interval.
        def async_automatic_delivery?
          async_configs.fetch(:delivery_threshold, 0).positive? ||
            async_configs.fetch(:delivery_interval, 0).positive?
        end

        # Per-thread storage for the kafka client and the producers.
        def producer_store
          Thread.current[NAMESPACE] ||= {}
        end
      end
    end
  end
end