lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.7.6 vs lib/fluent/plugin/in_kafka.rb in fluent-plugin-kafka-0.7.7
- old
+ new
@@ -171,14 +171,24 @@
opt = {}
opt[:max_bytes] = @max_bytes if @max_bytes
opt[:max_wait_time] = @max_wait_time if @max_wait_time
opt[:min_bytes] = @min_bytes if @min_bytes
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id,
- ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
- ssl_client_cert: read_ssl_file(@ssl_client_cert),
- ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
- sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab)
+ if @scram_mechanism != nil && @username != nil && @password != nil
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+ ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
+ sasl_scram_mechanism: @scram_mechanism)
+ elsif @username != nil && @password != nil
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+ ssl_ca_certs_from_system: @ssl_ca_certs_from_system,sasl_plain_username: @username, sasl_plain_password: @password)
+ else
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+ ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab)
+ end
+
@zookeeper = Zookeeper.new(@offset_zookeeper) if @offset_zookeeper
@topic_watchers = @topic_list.map {|topic_entry|
offset_manager = OffsetManager.new(topic_entry, @zookeeper, @offset_zk_root_node) if @offset_zookeeper
TopicWatcher.new(