lib/semantic_logger/appender/kafka.rb in semantic_logger-4.10.0 vs lib/semantic_logger/appender/kafka.rb in semantic_logger-4.11.0
- old
+ new
@@ -21,11 +21,11 @@
# )
module SemanticLogger
module Appender
class Kafka < SemanticLogger::Subscriber
attr_accessor :seed_brokers, :client_id, :connect_timeout, :socket_timeout,
- :ssl_ca_cert, :ssl_client_cert, :ssl_client_cert_key,
+ :ssl_ca_cert, :ssl_client_cert, :ssl_client_cert_key, :ssl_ca_certs_from_system,
:delivery_threshold, :delivery_interval,
:topic, :partition, :partition_key, :key
# Send log messages to Kafka in JSON format.
#
@@ -77,10 +77,13 @@
# ssl_client_cert_key [String]
# A PEM encoded client cert key to use with a SSL connection.
# Must be used in combination with ssl_client_cert.
# Default: nil
#
+ # ssl_ca_certs_from_system: [Boolean]
+ # Use the system's default CA certificate store to verify the SSL connection.
+ # Default: false
+ #
# delivery_threshold: [Integer]
# Number of messages between triggering a delivery of messages to Apache Kafka.
# Default: 100
#
# delivery_interval: [Integer]
@@ -114,42 +117,44 @@
#
# metrics: [Boolean]
# Send metrics only events to kafka.
# Default: true
def initialize(seed_brokers:, client_id: "semantic-logger", connect_timeout: nil, socket_timeout: nil,
- ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil,
+ ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil, ssl_ca_certs_from_system: false,
topic: "log_messages", partition: nil, partition_key: nil, key: nil,
delivery_threshold: 100, delivery_interval: 10,
metrics: true, **args, &block)
- @seed_brokers = seed_brokers
- @client_id = client_id
- @connect_timeout = connect_timeout
- @socket_timeout = socket_timeout
- @ssl_ca_cert = ssl_ca_cert
- @ssl_client_cert = ssl_client_cert
- @ssl_client_cert_key = ssl_client_cert_key
- @topic = topic
- @partition = partition
- @partition_key = partition_key
- @key = key
- @delivery_threshold = delivery_threshold
- @delivery_interval = delivery_interval
+ @seed_brokers = seed_brokers
+ @client_id = client_id
+ @connect_timeout = connect_timeout
+ @socket_timeout = socket_timeout
+ @ssl_ca_cert = ssl_ca_cert
+ @ssl_client_cert = ssl_client_cert
+ @ssl_client_cert_key = ssl_client_cert_key
+ @ssl_ca_certs_from_system = ssl_ca_certs_from_system
+ @topic = topic
+ @partition = partition
+ @partition_key = partition_key
+ @key = key
+ @delivery_threshold = delivery_threshold
+ @delivery_interval = delivery_interval
super(metrics: metrics, **args, &block)
reopen
end
def reopen
@kafka = ::Kafka.new(
- seed_brokers: seed_brokers,
- client_id: client_id,
- connect_timeout: connect_timeout,
- socket_timeout: socket_timeout,
- ssl_ca_cert: ssl_ca_cert,
- ssl_client_cert: ssl_client_cert,
- ssl_client_cert_key: ssl_client_cert_key,
- logger: logger
+ seed_brokers: seed_brokers,
+ client_id: client_id,
+ connect_timeout: connect_timeout,
+ socket_timeout: socket_timeout,
+ ssl_ca_cert: ssl_ca_cert,
+ ssl_client_cert: ssl_client_cert,
+ ssl_client_cert_key: ssl_client_cert_key,
+ ssl_ca_certs_from_system: ssl_ca_certs_from_system,
+ logger: logger
)
@producer = @kafka.async_producer(
delivery_threshold: delivery_threshold,
delivery_interval: delivery_interval