lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.17.4 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.17.5
- old
+ new
@@ -47,10 +47,12 @@
config_param :use_event_time, :bool, :default => false, :desc => 'Use fluentd event time for kafka create_time'
config_param :headers, :hash, default: {}, symbolize_keys: true, value_type: :string,
:desc => 'Kafka message headers'
config_param :headers_from_record, :hash, default: {}, symbolize_keys: true, value_type: :string,
:desc => 'Kafka message headers where the header value is a jsonpath to a record value'
+ config_param :resolve_seed_brokers, :bool, :default => false,
+ :desc => "support brokers' hostname with multiple addresses"
config_param :get_kafka_client_log, :bool, :default => false
config_param :ignore_exceptions, :array, :default => [], value_type: :string, :desc => "Ignorable exception list"
config_param :exception_backup, :bool, :default => true, :desc => "Chunk backup flag when ignore exception occured"
@@ -101,22 +103,22 @@
logger = @get_kafka_client_log ? log : nil
if @scram_mechanism != nil && @username != nil && @password != nil
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
- sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname,
+ sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
elsif @username != nil && @password != nil
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl,
- ssl_verify_hostname: @ssl_verify_hostname,
+ ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
else
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert_file_path: @ssl_ca_cert,
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl,
- ssl_verify_hostname: @ssl_verify_hostname,
+ ssl_verify_hostname: @ssl_verify_hostname, resolve_seed_brokers: @resolve_seed_brokers,
partitioner: Kafka::Partitioner.new(hash_function: @partitioner_hash_function))
end
log.info "initialized kafka producer: #{@client_id}"
rescue Exception => e
if raise_error # During startup, error should be reported to engine and stop its phase for safety.