lib/semantic_logger/appender/kafka.rb in semantic_logger-4.2.0 vs lib/semantic_logger/appender/kafka.rb in semantic_logger-4.2.1
- old
+ new
@@ -17,165 +17,168 @@
# seed_brokers: ["kafka1:9092", "kafka2:9092"],
#
# # Set an optional client id in order to identify the client to Kafka:
# client_id: "my-application",
# )
-class SemanticLogger::Appender::Kafka < SemanticLogger::Subscriber
- attr_accessor :seed_brokers, :client_id, :connect_timeout, :socket_timeout,
- :ssl_ca_cert, :ssl_client_cert, :ssl_client_cert_key,
- :delivery_threshold, :delivery_interval,
- :topic, :partition, :partition_key, :key
+module SemanticLogger
+ module Appender
+ class Kafka < SemanticLogger::Subscriber
+ attr_accessor :seed_brokers, :client_id, :connect_timeout, :socket_timeout,
+ :ssl_ca_cert, :ssl_client_cert, :ssl_client_cert_key,
+ :delivery_threshold, :delivery_interval,
+ :topic, :partition, :partition_key, :key
- # Send log messages to Kafka in JSON format.
- #
- # Kafka Parameters:
- #
- # seed_brokers: [Array<String>, String]
- # The list of brokers used to initialize the client. Either an Array of connections,
- # or a comma separated string of connections.
- # Connections can either be a string of "port:protocol" or a full URI with a scheme.
- # If there's a scheme it's ignored and only host/port are used.
- #
- # client_id: [String]
- # The identifier for this application.
- # Default: semantic-logger
- #
- # topic: [String]
- # Topic to publish log messages to.
- # Default: 'log_messages'
- #
- # partition: [Integer]
- # The partition that the message should be written to.
- # Default: nil
- #
- # partition_key: [String]
- # The key that should be used to assign a partition.
- # Default: nil
- #
- # key: [String]
- # The message key.
- # Default: nil
- #
- # connect_timeout: [Integer]
- # The timeout setting for connecting to brokers.
- # Default: nil
- #
- # socket_timeout: [Integer]
- # The timeout setting for socket connections.
- # Default: nil
- #
- # ssl_ca_cert: [String, Array<String>]
- # A PEM encoded CA cert, or an Array of PEM encoded CA certs, to use with a SSL connection.
- # Default: nil
- #
- # ssl_client_cert: [String]
- # A PEM encoded client cert to use with a SSL connection.
- # Must be used in combination with ssl_client_cert_key.
- # Default: nil
- #
- # ssl_client_cert_key [String]
- # A PEM encoded client cert key to use with a SSL connection.
- # Must be used in combination with ssl_client_cert.
- # Default: nil
- #
- # delivery_threshold: [Integer]
- # Number of messages between triggering a delivery of messages to Apache Kafka.
- # Default: 100
- #
- # delivery_interval: [Integer]
- # Number of seconds between triggering a delivery of messages to Apache Kafka.
- # Default: 5
- #
- # Semantic Logger Parameters:
- #
- # level: [:trace | :debug | :info | :warn | :error | :fatal]
- # Override the log level for this appender.
- # Default: SemanticLogger.default_level
- #
- # formatter: [Object|Proc|Symbol|Hash]
- # An instance of a class that implements #call, or a Proc to be used to format
- # the output from this appender
- # Default: :raw_json (See: #call)
- #
- # filter: [Regexp|Proc]
- # RegExp: Only include log messages where the class name matches the supplied.
- # regular expression. All other messages will be ignored.
- # Proc: Only include log messages where the supplied Proc returns true
- # The Proc must return true or false.
- #
- # host: [String]
- # Name of this host to appear in log messages.
- # Default: SemanticLogger.host
- #
- # application: [String]
- # Name of this application to appear in log messages.
- # Default: SemanticLogger.application
- def initialize(seed_brokers:, client_id: 'semantic-logger', connect_timeout: nil, socket_timeout: nil,
- ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil,
- topic: 'log_messages', partition: nil, partition_key: nil, key: nil,
- delivery_threshold: 100, delivery_interval: 10,
- level: nil, formatter: nil, filter: nil, application: nil, host: nil, &block)
+ # Send log messages to Kafka in JSON format.
+ #
+ # Kafka Parameters:
+ #
+ # seed_brokers: [Array<String>, String]
+ # The list of brokers used to initialize the client. Either an Array of connections,
+ # or a comma separated string of connections.
+      #     Connections can either be a string of "host:port" or a full URI with a scheme.
+ # If there's a scheme it's ignored and only host/port are used.
+ #
+ # client_id: [String]
+ # The identifier for this application.
+ # Default: semantic-logger
+ #
+ # topic: [String]
+ # Topic to publish log messages to.
+ # Default: 'log_messages'
+ #
+ # partition: [Integer]
+ # The partition that the message should be written to.
+ # Default: nil
+ #
+ # partition_key: [String]
+ # The key that should be used to assign a partition.
+ # Default: nil
+ #
+ # key: [String]
+ # The message key.
+ # Default: nil
+ #
+ # connect_timeout: [Integer]
+ # The timeout setting for connecting to brokers.
+ # Default: nil
+ #
+ # socket_timeout: [Integer]
+ # The timeout setting for socket connections.
+ # Default: nil
+ #
+ # ssl_ca_cert: [String, Array<String>]
+ # A PEM encoded CA cert, or an Array of PEM encoded CA certs, to use with a SSL connection.
+ # Default: nil
+ #
+ # ssl_client_cert: [String]
+ # A PEM encoded client cert to use with a SSL connection.
+ # Must be used in combination with ssl_client_cert_key.
+ # Default: nil
+ #
+      #   ssl_client_cert_key: [String]
+ # A PEM encoded client cert key to use with a SSL connection.
+ # Must be used in combination with ssl_client_cert.
+ # Default: nil
+ #
+ # delivery_threshold: [Integer]
+ # Number of messages between triggering a delivery of messages to Apache Kafka.
+ # Default: 100
+ #
+ # delivery_interval: [Integer]
+ # Number of seconds between triggering a delivery of messages to Apache Kafka.
+      #     Default: 10
+ #
+ # Semantic Logger Parameters:
+ #
+ # level: [:trace | :debug | :info | :warn | :error | :fatal]
+ # Override the log level for this appender.
+ # Default: SemanticLogger.default_level
+ #
+ # formatter: [Object|Proc|Symbol|Hash]
+ # An instance of a class that implements #call, or a Proc to be used to format
+ # the output from this appender
+ # Default: :raw_json (See: #call)
+ #
+ # filter: [Regexp|Proc]
+      #     RegExp: Only include log messages where the class name matches the supplied
+ # regular expression. All other messages will be ignored.
+ # Proc: Only include log messages where the supplied Proc returns true
+ # The Proc must return true or false.
+ #
+ # host: [String]
+ # Name of this host to appear in log messages.
+ # Default: SemanticLogger.host
+ #
+ # application: [String]
+ # Name of this application to appear in log messages.
+ # Default: SemanticLogger.application
+ def initialize(seed_brokers:, client_id: 'semantic-logger', connect_timeout: nil, socket_timeout: nil,
+ ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil,
+ topic: 'log_messages', partition: nil, partition_key: nil, key: nil,
+ delivery_threshold: 100, delivery_interval: 10,
+ level: nil, formatter: nil, filter: nil, application: nil, host: nil, &block)
- @seed_brokers = seed_brokers
- @client_id = @client_id
- @connect_timeout = connect_timeout
- @socket_timeout = socket_timeout
- @ssl_ca_cert = ssl_ca_cert
- @ssl_client_cert = ssl_client_cert
- @ssl_client_cert_key = ssl_client_cert_key
- @topic = topic
- @partition = partition
- @partition_key = partition_key
- @key = key
- @delivery_threshold = delivery_threshold
- @delivery_interval = delivery_interval
+ @seed_brokers = seed_brokers
+ @client_id = client_id
+ @connect_timeout = connect_timeout
+ @socket_timeout = socket_timeout
+ @ssl_ca_cert = ssl_ca_cert
+ @ssl_client_cert = ssl_client_cert
+ @ssl_client_cert_key = ssl_client_cert_key
+ @topic = topic
+ @partition = partition
+ @partition_key = partition_key
+ @key = key
+ @delivery_threshold = delivery_threshold
+ @delivery_interval = delivery_interval
- super(level: level, formatter: formatter, filter: filter, application: application, host: host, &block)
- reopen
- end
+ super(level: level, formatter: formatter, filter: filter, application: application, host: host, &block)
+ reopen
+ end
- def reopen
- @kafka = ::Kafka.new(
- seed_brokers: seed_brokers,
- client_id: client_id,
- connect_timeout: connect_timeout,
- socket_timeout: socket_timeout,
- ssl_ca_cert: ssl_ca_cert,
- ssl_client_cert: ssl_client_cert,
- ssl_client_cert_key: ssl_client_cert_key,
- logger: logger
- )
+ def reopen
+ @kafka = ::Kafka.new(
+ seed_brokers: seed_brokers,
+ client_id: client_id,
+ connect_timeout: connect_timeout,
+ socket_timeout: socket_timeout,
+ ssl_ca_cert: ssl_ca_cert,
+ ssl_client_cert: ssl_client_cert,
+ ssl_client_cert_key: ssl_client_cert_key,
+ logger: logger
+ )
- @producer = @kafka.async_producer(
- delivery_threshold: delivery_threshold,
- delivery_interval: delivery_interval
- )
- end
+ @producer = @kafka.async_producer(
+ delivery_threshold: delivery_threshold,
+ delivery_interval: delivery_interval
+ )
+ end
- def close
- @producer.shutdown if @producer
- @producer = nil
- @kafka.close if @kafka
- @kafka = nil
- end
+ def close
+ @producer&.shutdown
+ @producer = nil
+ @kafka&.close
+ @kafka = nil
+ end
- # Forward log messages to Kafka producer thread.
- def log(log)
- json = formatter.call(log, self)
- @producer.produce(json, topic: topic, partition: partition, partition_key: partition_key, key: key)
- end
+ # Forward log messages to Kafka producer thread.
+ def log(log)
+ json = formatter.call(log, self)
+ @producer.produce(json, topic: topic, partition: partition, partition_key: partition_key, key: key)
+ end
- # Use JSON Formatter by default.
- def default_formatter
- SemanticLogger::Formatters::Json.new
- end
+ # Use JSON Formatter by default.
+ def default_formatter
+ SemanticLogger::Formatters::Json.new
+ end
- # Restart producer thread since there is no other way to flush.
- def flush
- @producer.shutdown
- @producer = @kafka.async_producer(
- delivery_threshold: delivery_threshold,
- delivery_interval: delivery_interval
- )
+ # Restart producer thread since there is no other way to flush.
+ def flush
+ @producer.shutdown
+ @producer = @kafka.async_producer(
+ delivery_threshold: delivery_threshold,
+ delivery_interval: delivery_interval
+ )
+ end
+ end
end
-
end