# frozen_string_literal: true require "tmpdir" require "king_konf" require "racecar/liveness_probe" require "racecar/instrumenter" require "racecar/rebalance_listener" module Racecar class Config < KingKonf::Config env_prefix :racecar STATISTICS_DISABLED_VALUE = 0 desc "A list of Kafka brokers in the cluster that you're consuming from" list :brokers, default: ["localhost:9092"] desc "A string used to identify the client in logs and metrics" string :client_id, default: "racecar" desc "How frequently to commit offset positions" float :offset_commit_interval, default: 10 desc "How often to send a heartbeat message to Kafka" float :heartbeat_interval, default: 10 desc "The minimum number of messages in the local consumer queue" integer :min_message_queue_size, default: 2000 desc "Which partition assignment strategy to use, range, roundrobin or cooperative-sticky. -- https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md" string :partition_assignment_strategy, default: "range,roundrobin" desc "Kafka consumer configuration options, separated with '=' -- https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md" list :consumer, default: [] desc "Kafka producer configuration options, separated with '=' -- https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md" list :producer, default: [] desc "The maxium number of messages that get consumed within one batch" integer :fetch_messages, default: 1000 desc "Minimum number of bytes the broker responds with" integer :fetch_min_bytes, default: 1 desc "Automatically store offset of last message provided to application" boolean :synchronous_commits, default: false desc "How long to pause a partition for if the consumer raises an exception while processing a message -- set to -1 to pause indefinitely" float :pause_timeout, default: 10 desc "When `pause_timeout` and `pause_with_exponential_backoff` are configured, this sets an upper limit on the pause duration" float :max_pause_timeout, default: nil desc "Whether to exponentially increase the pause timeout on successive errors -- the timeout is doubled each time" boolean :pause_with_exponential_backoff, default: false desc "The idle timeout after which a consumer is kicked out of the group" float :session_timeout, default: 30 desc "The maximum time between two message fetches before the consumer is kicked out of the group (in seconds)" integer :max_poll_interval, default: 5*60 desc "How long to wait when trying to communicate with a Kafka broker" float :socket_timeout, default: 30 desc "How long to allow the Kafka brokers to wait before returning messages (in seconds)" float :max_wait_time, default: 1 desc "How long to try to deliver a produced message before finally giving up (in seconds)" float :message_timeout, default: 5*60 desc "Maximum amount of data the broker shall return for a Fetch request" integer :max_bytes, default: 10485760 desc "A prefix used when generating consumer group names" string :group_id_prefix desc "The group id to use for a given group of consumers" string :group_id desc "A filename that log messages should be written to" string :logfile desc "The log level for the Racecar logs" string :log_level, default: "info" desc "The strategy used to determine which topic partition a message is written to when Racecar produces a value to Kafka; defaults to `consistent_random`" symbol :partitioner, allowed_values: %i{consistent consistent_random murmur2 murmur2_random fnv1a fnv1a_random}, default: :consistent_random desc "Protocol used to communicate with brokers" symbol :security_protocol, allowed_values: %i{plaintext ssl sasl_plaintext sasl_ssl} desc "File or directory path to CA certificate(s) for verifying the broker's key" string :ssl_ca_location desc "Path to CRL for verifying broker's certificate validity" string :ssl_crl_location desc "Path to client's keystore (PKCS#12) used for authentication" string :ssl_keystore_location desc "Client's keystore (PKCS#12) password" string :ssl_keystore_password desc "Path to the certificate used for authentication" string :ssl_certificate_location desc "Path to client's certificate used for authentication" string :ssl_key_location desc "Client's certificate password" string :ssl_key_password desc "SASL mechanism to use for authentication" string :sasl_mechanism, allowed_values: %w{GSSAPI PLAIN SCRAM-SHA-256 SCRAM-SHA-512} desc "Kerberos principal name that Kafka runs as, not including /hostname@REALM" string :sasl_kerberos_service_name desc "This client's Kerberos principal name" string :sasl_kerberos_principal desc "Full kerberos kinit command string, %{config.prop.name} is replaced by corresponding config object value, %{broker.name} returns the broker's hostname" string :sasl_kerberos_kinit_cmd desc "Path to Kerberos keytab file. Uses system default if not set" string :sasl_kerberos_keytab desc "Minimum time in milliseconds between key refresh attempts" integer :sasl_kerberos_min_time_before_relogin desc "SASL username for use with the PLAIN and SASL-SCRAM-.. mechanism" string :sasl_username desc "SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism" string :sasl_password desc "Whether to use SASL over SSL." boolean :sasl_over_ssl, default: true desc "The file in which to store the Racecar process' PID when daemonized" string :pidfile desc "Run the Racecar process in the background as a daemon" boolean :daemonize, default: false desc "The codec used to compress messages with" symbol :producer_compression_codec, allowed_values: %i{none lz4 snappy gzip} desc "Enable Datadog metrics" boolean :datadog_enabled, default: false desc "The host running the Datadog agent" string :datadog_host desc "The port of the Datadog agent" integer :datadog_port desc "The namespace to use for Datadog metrics" string :datadog_namespace desc "Tags that should always be set on Datadog metrics" list :datadog_tags desc "Whether to check the server certificate is valid for the hostname" boolean :ssl_verify_hostname, default: true desc "Whether to boot Rails when starting the consumer" boolean :without_rails, default: false desc "How frequently librdkafka should report statistics to your application (in seconds). A statistics callback must also be provided. This should be defined with a `statistics_callback` method on your processor. Stats are disabled if this value is set to 0, or there is no callback defined. This is set by default to 1 second for backward compatibility, however this can be quite memory intensive" integer :statistics_interval, default: 1 desc "Whether to enable liveness probe behavior (touch the file)" boolean :liveness_probe_enabled, default: false desc "Path to a file Racecar will touch to show liveness" string :liveness_probe_file_path, default: "#{Dir.tmpdir}/racecar-liveness" desc "Used only by the liveness probe: Max time (in seconds) between liveness events before the process is considered not healthy" integer :liveness_probe_max_interval, default: 5 desc "Strategy for switching topics when there are multiple subscriptions. `exhaust-topic` will only switch when the consumer poll returns no messages. `round-robin` will switch after each poll regardless.\nWarning: `round-robin` will be the default in Racecar 3.x" string :multi_subscription_strategy, allowed_values: %w(round-robin exhaust-topic), default: "exhaust-topic" # The error handler must be set directly on the object. attr_reader :error_handler attr_accessor :subscriptions, :logger, :parallel_workers def statistics_interval_ms if Rdkafka::Config.statistics_callback statistics_interval * 1000 else STATISTICS_DISABLED_VALUE end end def max_wait_time_ms max_wait_time * 1000 end def initialize(env: ENV) super(env: env) @error_handler = proc {} @subscriptions = [] @logger = Logger.new(STDOUT) end def inspect self.class.variables .map(&:name) .map {|key| [key, get(key).inspect].join(" = ") } .join("\n") end def validate! if brokers.empty? raise ConfigError, "`brokers` must not be empty" end if socket_timeout <= max_wait_time raise ConfigError, "`socket_timeout` must be longer than `max_wait_time`" end if max_pause_timeout && !pause_with_exponential_backoff? raise ConfigError, "`max_pause_timeout` only makes sense when `pause_with_exponential_backoff` is enabled" end end def load_consumer_class(consumer_class) self.consumer_class = consumer_class self.group_id = consumer_class.group_id || self.group_id self.group_id ||= [ # Configurable and optional prefix: group_id_prefix, # MyFunnyConsumer => my-funny-consumer consumer_class.name.gsub(/[a-z][A-Z]/) { |str| "#{str[0]}-#{str[1]}" }.downcase, ].compact.join self.parallel_workers = consumer_class.parallel_workers self.subscriptions = consumer_class.subscriptions self.max_wait_time = consumer_class.max_wait_time || self.max_wait_time self.fetch_messages = consumer_class.fetch_messages || self.fetch_messages self.pidfile ||= "#{group_id}.pid" end attr_accessor :consumer_class def on_error(&handler) @error_handler = handler end def rdkafka_consumer consumer_config = consumer.map do |param| param.split("=", 2).map(&:strip) end.to_h consumer_config.merge!(rdkafka_security_config) consumer_config end def rdkafka_producer producer_config = producer.map do |param| param.split("=", 2).map(&:strip) end.to_h producer_config.merge!(rdkafka_security_config) producer_config end def instrumenter @instrumenter ||= begin default_payload = { client_id: client_id, group_id: group_id } if defined?(ActiveSupport::Notifications) # ActiveSupport needs `concurrent-ruby` but doesn't `require` it. require 'concurrent/utility/monotonic_time' Instrumenter.new(backend: ActiveSupport::Notifications, default_payload: default_payload) else logger.warn "ActiveSupport::Notifications not available, instrumentation is disabled" NullInstrumenter end end end attr_writer :instrumenter def install_liveness_probe liveness_probe.tap(&:install) end def liveness_probe require "active_support/notifications" @liveness_probe ||= LivenessProbe.new( ActiveSupport::Notifications, liveness_probe_file_path, liveness_probe_max_interval ) end private def rdkafka_security_config { "security.protocol" => security_protocol, "enable.ssl.certificate.verification" => ssl_verify_hostname, "ssl.ca.location" => ssl_ca_location, "ssl.crl.location" => ssl_crl_location, "ssl.keystore.location" => ssl_keystore_location, "ssl.keystore.password" => ssl_keystore_password, "ssl.certificate.location" => ssl_certificate_location, "ssl.key.location" => ssl_key_location, "ssl.key.password" => ssl_key_password, "sasl.mechanism" => sasl_mechanism, "sasl.kerberos.service.name" => sasl_kerberos_service_name, "sasl.kerberos.principal" => sasl_kerberos_principal, "sasl.kerberos.kinit.cmd" => sasl_kerberos_kinit_cmd, "sasl.kerberos.keytab" => sasl_kerberos_keytab, "sasl.kerberos.min.time.before.relogin" => sasl_kerberos_min_time_before_relogin, "sasl.username" => sasl_username, "sasl.password" => sasl_password, }.compact end end end