lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.9.6 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.10.0

- old
+ new

@@ -53,13 +53,13 @@ config_param :ack_timeout, :time, :default => nil, :desc => "How long the producer waits for acks." config_param :compression_codec, :string, :default => nil, :desc => <<-DESC The codec the producer uses to compress messages. -Supported codecs: (gzip|snappy) +Supported codecs depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression DESC - + config_param :max_send_limit_bytes, :size, :default => nil config_param :active_support_notification_regex, :string, :default => nil, :desc => <<-DESC Add a regular expression to capture ActiveSupport notifications from the Kafka client requires activesupport gem - records will be generated under fluent_kafka_stats.** DESC @@ -85,19 +85,21 @@ logger = @get_kafka_client_log ? log : nil if @scram_mechanism != nil && @username != nil && @password != nil @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert), ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain), ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password, - sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl) + sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname) elsif @username != nil && @password != nil @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert), ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain), - ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl) + ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl, + ssl_verify_hostname: @ssl_verify_hostname) else @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert), ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain), - ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl) + ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl, + ssl_verify_hostname: @ssl_verify_hostname) end log.info "initialized kafka producer: #{@client_id}" rescue Exception => e if raise_error # During startup, error should be reported to engine and stop its phase for safety. raise e @@ -212,9 +214,14 @@ partition_key = (@exclude_partition_key ? record.delete(@partition_key_key) : record[@partition_key_key]) || @default_partition_key partition = (@exclude_partition ? record.delete(@partition_key) : record[@partition_key]) || @default_partition message_key = (@exclude_message_key ? record.delete(@message_key_key) : record[@message_key_key]) || @default_message_key record_buf = @formatter_proc.call(tag, time, record) + record_buf_bytes = record_buf.bytesize + if @max_send_limit_bytes && record_buf_bytes > @max_send_limit_bytes + log.warn "record size exceeds max_send_limit_bytes. Skip event:", :time => time, :record => record + next + end rescue StandardError => e log.warn "unexpected error during format record. Skip broken event:", :error => e.to_s, :error_class => e.class.to_s, :time => time, :record => record next end