# frozen_string_literal: true module Rimless # The global rimless Apache Kafka consumer application based on # the Karafka framework. # # rubocop:disable Style/ClassVars because we just work as a singleton class ConsumerApp < ::Karafka::App # We track our own initialization with this class variable @@rimless_initialized = false class << self # Initialize the Karafka framework and our global consumer application # with all our conventions/opinions. # # @return [Rimless::ConsumerApp] our self for chaining def initialize! # When already initialized, skip it return self if @@rimless_initialized # Initialize all the parts one by one initialize_rails! initialize_monitors! initialize_karafka! initialize_logger! initialize_code_reload! # Load the custom Karafka boot file when it exists, it contains # custom configurations and the topic/consumer routing table require ::Karafka.boot_file if ::Karafka.boot_file.exist? # Set our custom initialization process as completed to # skip subsequent calls @@rimless_initialized = true self end # Check if Rails is available and not already initialized, then # initialize it. def initialize_rails! rails_env = ::Karafka.root.join('config', 'environment.rb') # Stop, when Rails is already initialized return if defined? Rails # Stop, when there is no Rails at all return unless rails_env.exist? ENV['RAILS_ENV'] ||= 'development' ENV['KARAFKA_ENV'] = ENV.fetch('RAILS_ENV', nil) require rails_env Rails.application.eager_load! end # We like to listen to instrumentation and logging events to allow our # users to handle them like they need. def initialize_monitors! [ WaterDrop::Instrumentation::StdoutListener.new, ::Karafka::Instrumentation::StdoutListener.new, ::Karafka::Instrumentation::ProctitleListener.new ].each do |listener| ::Karafka.monitor.subscribe(listener) end end # Configure the pure basics on the Karafka application. # # rubocop:disable Metrics/MethodLength because of the various settings # rubocop:disable Metrics/AbcSize dito def initialize_karafka! setup do |config| mapper = Rimless::Karafka::PassthroughMapper.new config.consumer_mapper = config.topic_mapper = mapper config.deserializer = Rimless::Karafka::AvroDeserializer.new config.kafka.seed_brokers = Rimless.configuration.kafka_brokers config.client_id = Rimless.configuration.client_id config.logger = Rimless.logger config.backend = :sidekiq config.batch_fetching = true config.batch_consuming = false config.shutdown_timeout = 10 end end # rubocop:enable Metrics/MethodLength # rubocop:enable Metrics/AbcSize # When we run in development mode, we want to write the logs # to file and to stdout. def initialize_logger! # Skip when configured not to extend the logger return unless Rimless.configuration.extend_dev_logger # Skip when not in development environment or in the server process return unless Rimless.env.development? && server? $stdout.sync = true Rimless.logger.extend(ActiveSupport::Logger.broadcast( ActiveSupport::Logger.new($stdout) )) end # Perform code hot-reloading when we are in Rails and in development # mode. def initialize_code_reload! return unless defined?(Rails) && Rails.env.development? ::Karafka.monitor.subscribe(::Karafka::CodeReloader.new( *Rails.application.reloaders )) end # Allows the user to re-configure the Karafka application if this is # needed. (eg. to set some ruby-kafka driver default settings, etc) # # @return [Rimless::ConsumerApp] our self for chaining def configure(&block) setup(&block) self end # Configure the topics-consumer routing table in a lean way. # # Examples: # # topics({ app: :test_app, name: :admins } => YourConsumer) # topics({ app: :test_app, names: %i[users admins] } => YourConsumer) # # @param topics [Hash{Hash => Class}] the topic to consumer mapping # # rubocop:disable Metrics/MethodLength because of the Karafka DSL def topics(topics) consumer_groups.draw do consumer_group Rimless.configuration.client_id do topics.each do |topic_parts, dest_consumer| Rimless.consumer.topic_names(topic_parts).each do |topic_name| topic(topic_name) do consumer dest_consumer worker Rimless::ConsumerJob interchanger Rimless::Karafka::Base64Interchanger.new end end end end end self end # rubocop:enable Metrics/MethodLength # Build the conventional Apache Kafka topic names from the given parts. # This allows various forms like single strings/symbols and a hash in the # form of +{ app: [String, Symbol], name: [String, Symbol], names: # [Array] }+. This allows the maximum of flexibility. # # @param parts [String, Symbol, Hash{Symbol => Mixed}] the topic # name parts # @return [Array] the full topic names def topic_names(parts) # We have a single app, but multiple names so we handle them if parts.is_a?(Hash) && parts.key?(:names) return parts[:names].map do |name| Rimless.topic(parts.merge(name: name)) end end # We cannot handle the given input [Rimless.topic(parts)] end # Check if we run as the Karafka server (consumer) process or not. # # @return [Boolean] whenever we run as the Karafka server or not def server? $PROGRAM_NAME.end_with?('karafka') && ARGV.include?('server') end end end # rubocop:enable Style/ClassVars # A rimless top-level concern which adds lean access to # the consumer application. module Consumer extend ActiveSupport::Concern class_methods do # A simple shortcut to fetch the Karafka consumer application. # # @return [Rimless::ConsumerApp] the Karafka consumer application class def consumer ConsumerApp.initialize! end end end end