lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.10.0 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.11.0
- old
+ new
@@ -34,10 +34,11 @@
config_param :offset_zookeeper, :string, :default => nil
config_param :offset_zk_root_node, :string, :default => '/fluent-plugin-kafka'
config_param :use_record_time, :bool, :default => false,
:desc => "Replace message timestamp with contents of 'time' field."
+ config_param :get_kafka_client_log, :bool, :default => false
config_param :time_format, :string, :default => nil,
:desc => "Time format to be used to parse 'time' filed."
config_param :kafka_message_key, :string, :default => nil,
:desc => "Set kafka's message key to this field"
@@ -173,20 +174,21 @@
opt = {}
opt[:max_bytes] = @max_bytes if @max_bytes
opt[:max_wait_time] = @max_wait_time if @max_wait_time
opt[:min_bytes] = @min_bytes if @min_bytes
+ logger = @get_kafka_client_log ? log : nil
if @scram_mechanism != nil && @username != nil && @password != nil
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: log, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl)
elsif @username != nil && @password != nil
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: log, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,sasl_plain_username: @username, sasl_plain_password: @password)
else
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: log, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab)
end
@zookeeper = Zookeeper.new(@offset_zookeeper) if @offset_zookeeper