# frozen_string_literal: true module Rimless # The top-level Apache Kafka helpers. module KafkaHelpers extend ActiveSupport::Concern # rubocop:disable Metrics/BlockLength because its an Active Support concern class_methods do # Generate a common topic name for Apache Kafka while taking care of # configured prefixes. # # @param args [Array] the relative topic name # @return [String] the complete topic name # # @example Name only # Rimless.topic(:users) # @example Name with app # Rimless.topic(:users, app: 'test-api') # @example Mix and match # Rimless.topic(name: 'test', app: :fancy_app) # # rubocop:disable Metrics/AbcSize because of the usage flexibility def topic(*args) opts = args.last name = args.first if [String, Symbol].member?(args.first.class) if opts.is_a?(Hash) name = opts[:name] if opts.key?(:name) app = opts[:app] if opts.key?(:app) end name ||= nil app ||= Rimless.configuration.app_name raise ArgumentError, 'No name given' if name.nil? "#{Rimless.topic_prefix(app)}#{name}" end # rubocop:enable Metrics/AbcSize # Send a single message to Apache Kafka. The data is encoded according to # the given Apache Avro schema. The destination Kafka topic may be a # relative name, or a hash which is passed to the +.topic+ method to # manipulate the application details. The message is send is a # synchronous, blocking way. # # @param data [Hash{Symbol => Mixed}] the raw data, unencoded # @param schema [String, Symbol] the Apache Avro schema to use # @param topic [String, Symbol, Hash{Symbol => Mixed}] the destination # Apache Kafka topic def sync_message(data:, schema:, topic:, **args) encoded = Rimless.encode(data, schema: schema) sync_raw_message(data: encoded, topic: topic, **args) end alias_method :message, :sync_message # Send a single message to Apache Kafka. The data is encoded according to # the given Apache Avro schema. The destination Kafka topic may be a # relative name, or a hash which is passed to the +.topic+ method to # manipulate the application details. The message is send is an # asynchronous, non-blocking way. # # @param data [Hash{Symbol => Mixed}] the raw data, unencoded # @param schema [String, Symbol] the Apache Avro schema to use # @param topic [String, Symbol, Hash{Symbol => Mixed}] the destination # Apache Kafka topic def async_message(data:, schema:, topic:, **args) encoded = Rimless.encode(data, schema: schema) async_raw_message(data: encoded, topic: topic, **args) end # Send a single message to Apache Kafka. The data is not touched, so you # need to encode it yourself before you pass it in. The destination Kafka # topic may be a relative name, or a hash which is passed to the +.topic+ # method to manipulate the application details. The message is send is a # synchronous, blocking way. # # @param data [Hash{Symbol => Mixed}] the raw data, unencoded # @param topic [String, Symbol, Hash{Symbol => Mixed}] the destination # Apache Kafka topic def sync_raw_message(data:, topic:, **args) args = args.merge(topic: topic(topic)) WaterDrop::SyncProducer.call(data, **args) end alias_method :raw_message, :sync_raw_message # Send a single message to Apache Kafka. The data is not touched, so you # need to encode it yourself before you pass it in. The destination Kafka # topic may be a relative name, or a hash which is passed to the +.topic+ # method to manipulate the application details. The message is send is an # asynchronous, non-blocking way. # # @param data [Hash{Symbol => Mixed}] the raw data, unencoded # @param topic [String, Symbol, Hash{Symbol => Mixed}] the destination # Apache Kafka topic def async_raw_message(data:, topic:, **args) args = args.merge(topic: topic(topic)) WaterDrop::AsyncProducer.call(data, **args) end end # rubocop:enable Metrics/BlockLength end end