lib/semantic_logger/appender/kafka.rb in semantic_logger-4.2.0 vs lib/semantic_logger/appender/kafka.rb in semantic_logger-4.2.1

- old
+ new

@@ -17,165 +17,168 @@ # seed_brokers: ["kafka1:9092", "kafka2:9092"], # # # Set an optional client id in order to identify the client to Kafka: # client_id: "my-application", # ) -class SemanticLogger::Appender::Kafka < SemanticLogger::Subscriber - attr_accessor :seed_brokers, :client_id, :connect_timeout, :socket_timeout, - :ssl_ca_cert, :ssl_client_cert, :ssl_client_cert_key, - :delivery_threshold, :delivery_interval, - :topic, :partition, :partition_key, :key +module SemanticLogger + module Appender + class Kafka < SemanticLogger::Subscriber + attr_accessor :seed_brokers, :client_id, :connect_timeout, :socket_timeout, + :ssl_ca_cert, :ssl_client_cert, :ssl_client_cert_key, + :delivery_threshold, :delivery_interval, + :topic, :partition, :partition_key, :key - # Send log messages to Kafka in JSON format. - # - # Kafka Parameters: - # - # seed_brokers: [Array<String>, String] - # The list of brokers used to initialize the client. Either an Array of connections, - # or a comma separated string of connections. - # Connections can either be a string of "port:protocol" or a full URI with a scheme. - # If there's a scheme it's ignored and only host/port are used. - # - # client_id: [String] - # The identifier for this application. - # Default: semantic-logger - # - # topic: [String] - # Topic to publish log messages to. - # Default: 'log_messages' - # - # partition: [Integer] - # The partition that the message should be written to. - # Default: nil - # - # partition_key: [String] - # The key that should be used to assign a partition. - # Default: nil - # - # key: [String] - # The message key. - # Default: nil - # - # connect_timeout: [Integer] - # The timeout setting for connecting to brokers. - # Default: nil - # - # socket_timeout: [Integer] - # The timeout setting for socket connections. - # Default: nil - # - # ssl_ca_cert: [String, Array<String>] - # A PEM encoded CA cert, or an Array of PEM encoded CA certs, to use with a SSL connection. - # Default: nil - # - # ssl_client_cert: [String] - # A PEM encoded client cert to use with a SSL connection. - # Must be used in combination with ssl_client_cert_key. - # Default: nil - # - # ssl_client_cert_key [String] - # A PEM encoded client cert key to use with a SSL connection. - # Must be used in combination with ssl_client_cert. - # Default: nil - # - # delivery_threshold: [Integer] - # Number of messages between triggering a delivery of messages to Apache Kafka. - # Default: 100 - # - # delivery_interval: [Integer] - # Number of seconds between triggering a delivery of messages to Apache Kafka. - # Default: 5 - # - # Semantic Logger Parameters: - # - # level: [:trace | :debug | :info | :warn | :error | :fatal] - # Override the log level for this appender. - # Default: SemanticLogger.default_level - # - # formatter: [Object|Proc|Symbol|Hash] - # An instance of a class that implements #call, or a Proc to be used to format - # the output from this appender - # Default: :raw_json (See: #call) - # - # filter: [Regexp|Proc] - # RegExp: Only include log messages where the class name matches the supplied. - # regular expression. All other messages will be ignored. - # Proc: Only include log messages where the supplied Proc returns true - # The Proc must return true or false. - # - # host: [String] - # Name of this host to appear in log messages. - # Default: SemanticLogger.host - # - # application: [String] - # Name of this application to appear in log messages. - # Default: SemanticLogger.application - def initialize(seed_brokers:, client_id: 'semantic-logger', connect_timeout: nil, socket_timeout: nil, - ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, - topic: 'log_messages', partition: nil, partition_key: nil, key: nil, - delivery_threshold: 100, delivery_interval: 10, - level: nil, formatter: nil, filter: nil, application: nil, host: nil, &block) + # Send log messages to Kafka in JSON format. + # + # Kafka Parameters: + # + # seed_brokers: [Array<String>, String] + # The list of brokers used to initialize the client. Either an Array of connections, + # or a comma separated string of connections. + # Connections can either be a string of "port:protocol" or a full URI with a scheme. + # If there's a scheme it's ignored and only host/port are used. + # + # client_id: [String] + # The identifier for this application. + # Default: semantic-logger + # + # topic: [String] + # Topic to publish log messages to. + # Default: 'log_messages' + # + # partition: [Integer] + # The partition that the message should be written to. + # Default: nil + # + # partition_key: [String] + # The key that should be used to assign a partition. + # Default: nil + # + # key: [String] + # The message key. + # Default: nil + # + # connect_timeout: [Integer] + # The timeout setting for connecting to brokers. + # Default: nil + # + # socket_timeout: [Integer] + # The timeout setting for socket connections. + # Default: nil + # + # ssl_ca_cert: [String, Array<String>] + # A PEM encoded CA cert, or an Array of PEM encoded CA certs, to use with a SSL connection. + # Default: nil + # + # ssl_client_cert: [String] + # A PEM encoded client cert to use with a SSL connection. + # Must be used in combination with ssl_client_cert_key. + # Default: nil + # + # ssl_client_cert_key [String] + # A PEM encoded client cert key to use with a SSL connection. + # Must be used in combination with ssl_client_cert. + # Default: nil + # + # delivery_threshold: [Integer] + # Number of messages between triggering a delivery of messages to Apache Kafka. + # Default: 100 + # + # delivery_interval: [Integer] + # Number of seconds between triggering a delivery of messages to Apache Kafka. + # Default: 5 + # + # Semantic Logger Parameters: + # + # level: [:trace | :debug | :info | :warn | :error | :fatal] + # Override the log level for this appender. + # Default: SemanticLogger.default_level + # + # formatter: [Object|Proc|Symbol|Hash] + # An instance of a class that implements #call, or a Proc to be used to format + # the output from this appender + # Default: :raw_json (See: #call) + # + # filter: [Regexp|Proc] + # RegExp: Only include log messages where the class name matches the supplied. + # regular expression. All other messages will be ignored. + # Proc: Only include log messages where the supplied Proc returns true + # The Proc must return true or false. + # + # host: [String] + # Name of this host to appear in log messages. + # Default: SemanticLogger.host + # + # application: [String] + # Name of this application to appear in log messages. + # Default: SemanticLogger.application + def initialize(seed_brokers:, client_id: 'semantic-logger', connect_timeout: nil, socket_timeout: nil, + ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, + topic: 'log_messages', partition: nil, partition_key: nil, key: nil, + delivery_threshold: 100, delivery_interval: 10, + level: nil, formatter: nil, filter: nil, application: nil, host: nil, &block) - @seed_brokers = seed_brokers - @client_id = @client_id - @connect_timeout = connect_timeout - @socket_timeout = socket_timeout - @ssl_ca_cert = ssl_ca_cert - @ssl_client_cert = ssl_client_cert - @ssl_client_cert_key = ssl_client_cert_key - @topic = topic - @partition = partition - @partition_key = partition_key - @key = key - @delivery_threshold = delivery_threshold - @delivery_interval = delivery_interval + @seed_brokers = seed_brokers + @client_id = client_id + @connect_timeout = connect_timeout + @socket_timeout = socket_timeout + @ssl_ca_cert = ssl_ca_cert + @ssl_client_cert = ssl_client_cert + @ssl_client_cert_key = ssl_client_cert_key + @topic = topic + @partition = partition + @partition_key = partition_key + @key = key + @delivery_threshold = delivery_threshold + @delivery_interval = delivery_interval - super(level: level, formatter: formatter, filter: filter, application: application, host: host, &block) - reopen - end + super(level: level, formatter: formatter, filter: filter, application: application, host: host, &block) + reopen + end - def reopen - @kafka = ::Kafka.new( - seed_brokers: seed_brokers, - client_id: client_id, - connect_timeout: connect_timeout, - socket_timeout: socket_timeout, - ssl_ca_cert: ssl_ca_cert, - ssl_client_cert: ssl_client_cert, - ssl_client_cert_key: ssl_client_cert_key, - logger: logger - ) + def reopen + @kafka = ::Kafka.new( + seed_brokers: seed_brokers, + client_id: client_id, + connect_timeout: connect_timeout, + socket_timeout: socket_timeout, + ssl_ca_cert: ssl_ca_cert, + ssl_client_cert: ssl_client_cert, + ssl_client_cert_key: ssl_client_cert_key, + logger: logger + ) - @producer = @kafka.async_producer( - delivery_threshold: delivery_threshold, - delivery_interval: delivery_interval - ) - end + @producer = @kafka.async_producer( + delivery_threshold: delivery_threshold, + delivery_interval: delivery_interval + ) + end - def close - @producer.shutdown if @producer - @producer = nil - @kafka.close if @kafka - @kafka = nil - end + def close + @producer&.shutdown + @producer = nil + @kafka&.close + @kafka = nil + end - # Forward log messages to Kafka producer thread. - def log(log) - json = formatter.call(log, self) - @producer.produce(json, topic: topic, partition: partition, partition_key: partition_key, key: key) - end + # Forward log messages to Kafka producer thread. + def log(log) + json = formatter.call(log, self) + @producer.produce(json, topic: topic, partition: partition, partition_key: partition_key, key: key) + end - # Use JSON Formatter by default. - def default_formatter - SemanticLogger::Formatters::Json.new - end + # Use JSON Formatter by default. + def default_formatter + SemanticLogger::Formatters::Json.new + end - # Restart producer thread since there is no other way to flush. - def flush - @producer.shutdown - @producer = @kafka.async_producer( - delivery_threshold: delivery_threshold, - delivery_interval: delivery_interval - ) + # Restart producer thread since there is no other way to flush. + def flush + @producer.shutdown + @producer = @kafka.async_producer( + delivery_threshold: delivery_threshold, + delivery_interval: delivery_interval + ) + end + end end - end