lib/fluent/plugin/out_kafka_buffered.rb in roched-fluent-plugin-kafka-0.6.14 vs lib/fluent/plugin/out_kafka_buffered.rb in roched-fluent-plugin-kafka-0.6.15
- old
+ new
@@ -110,12 +110,26 @@
log.info "brokers has been refreshed via Zookeeper: #{@seed_brokers}"
end
begin
if @seed_brokers.length > 0
logger = @get_kafka_client_log ? log : nil
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
- ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
- sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab)
+ log.warn "user #{@username} pwd #{@password}"
+ if @scram_mechanism != nil && @username != nil && @password != nil
+ log.warn "using scram authentication"
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+ sasl_scram_username: @username, sasl_scram_password: @password, sasl_scram_mechanism: @scram_mechanism)
+ elsif @username != nil && @password != nil
+ log.warn "using plain authentication"
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+ sasl_plain_username: @username, sasl_plain_password: @password)
+ else
+ log.warn "no auth"
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+ sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab)
+ end
log.info "initialized kafka producer: #{@client_id}"
else
log.warn "No brokers found on Zookeeper"
end
rescue Exception => e