lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.6.6 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.7.0
- old
+ new
@@ -72,13 +72,23 @@
end
def refresh_client(raise_error = true)
begin
logger = @get_kafka_client_log ? log : nil
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
- ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
- sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab)
+ if @scram_mechanism != nil && @username != nil && @password != nil
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+ sasl_scram_username: @username, sasl_scram_password: @password, sasl_scram_mechanism: @scram_mechanism)
+ elsif @username != nil && @password != nil
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+ sasl_plain_username: @username, sasl_plain_password: @password)
+ else
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
+ sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab)
+ end
log.info "initialized kafka producer: #{@client_id}"
rescue Exception => e
if raise_error # During startup, error should be reported to engine and stop its phase for safety.
raise e
else
@@ -104,14 +114,14 @@
raise Fluent::ConfigError, "format/@type is required."
end
@formatter_proc = setup_formatter(formatter_conf)
if @default_topic.nil?
- if @chunk_keys.include?('topic') && !@chunk_keys.include?('tag')
+ if @chunk_keys.include?('topic') && !@chunk_key_tag
log.warn "Use 'topic' field of event record for topic but no fallback. Recommend to set default_topic or set 'tag' in buffer chunk keys like <buffer topic,tag>"
end
else
- if @chunk_keys.include?('tag')
+ if @chunk_key_tag
log.warn "default_topic is set. Fluentd's event tag is not used for topic"
end
end
@producer_opts = {max_retries: @max_send_retries, required_acks: @required_acks}