lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.6.6 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.7.0

- old
+ new

@@ -72,13 +72,23 @@ end def refresh_client(raise_error = true) begin logger = @get_kafka_client_log ? log : nil - @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert), - ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), - sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab) + if @scram_mechanism != nil && @username != nil && @password != nil + @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert), + ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), + sasl_scram_username: @username, sasl_scram_password: @password, sasl_scram_mechanism: @scram_mechanism) + elsif @username != nil && @password != nil + @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert), + ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), + sasl_plain_username: @username, sasl_plain_password: @password) + else + @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert), + ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), + sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab) + end log.info "initialized kafka producer: #{@client_id}" rescue Exception => e if raise_error # During startup, error should be reported to engine and stop its phase for safety. raise e else @@ -104,14 +114,14 @@ raise Fluent::ConfigError, "format/@type is required." end @formatter_proc = setup_formatter(formatter_conf) if @default_topic.nil? - if @chunk_keys.include?('topic') && !@chunk_keys.include?('tag') + if @chunk_keys.include?('topic') && !@chunk_key_tag log.warn "Use 'topic' field of event record for topic but no fallback. Recommend to set default_topic or set 'tag' in buffer chunk keys like <buffer topic,tag>" end else - if @chunk_keys.include?('tag') + if @chunk_key_tag log.warn "default_topic is set. Fluentd's event tag is not used for topic" end end @producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks}