lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.9.6 vs lib/fluent/plugin/out_kafka2.rb in fluent-plugin-kafka-0.10.0
- old
+ new
@@ -53,13 +53,13 @@
config_param :ack_timeout, :time, :default => nil,
:desc => "How long the producer waits for acks."
config_param :compression_codec, :string, :default => nil,
:desc => <<-DESC
The codec the producer uses to compress messages.
-Supported codecs: (gzip|snappy)
+The supported codecs depend on the ruby-kafka version: https://github.com/zendesk/ruby-kafka#compression
DESC
-
+ config_param :max_send_limit_bytes, :size, :default => nil
config_param :active_support_notification_regex, :string, :default => nil,
:desc => <<-DESC
Add a regular expression to capture ActiveSupport notifications from the Kafka client
requires activesupport gem - records will be generated under fluent_kafka_stats.**
DESC
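
The new description defers to ruby-kafka's own codec registry, which covers gzip, snappy, lz4, and (in newer versions) zstd; the non-gzip codecs need their companion gems installed. A minimal sketch of how a configured codec reaches a producer, using ruby-kafka's stock producer API (the plugin itself wraps this in its own producer extension); broker address, topic, and payload are illustrative:

    require "kafka"

    # The plugin hands the compression_codec value (as a symbol) to
    # ruby-kafka, which resolves it against its codec registry at send time.
    kafka = Kafka.new(seed_brokers: ["localhost:9092"], client_id: "fluentd")
    producer = kafka.producer(compression_codec: :snappy) # :gzip, :lz4 also resolve if their gems are present
    producer.produce('{"message":"hello"}', topic: "logs")
    producer.deliver_messages
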
@@ -85,19 +85,21 @@
logger = @get_kafka_client_log ? log : nil
if @scram_mechanism != nil && @username != nil && @password != nil
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
- sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl)
+ sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
elsif @username != nil && @password != nil
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
- ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl)
+ ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl,
+ ssl_verify_hostname: @ssl_verify_hostname)
else
@kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
- ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl)
+ ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl,
+ ssl_verify_hostname: @ssl_verify_hostname)
end
log.info "initialized kafka producer: #{@client_id}"
rescue Exception => e
if raise_error # During startup, errors should be reported to the engine so it can stop this phase safely.
raise e
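
Each of the three construction branches (SCRAM, PLAIN, GSSAPI) now forwards @ssl_verify_hostname to Kafka.new. A minimal sketch of what the flag controls, assuming a ruby-kafka version that accepts it (0.7+); the broker address and CA path are illustrative:

    require "kafka"

    kafka = Kafka.new(
      seed_brokers: ["kafka-broker:9093"],
      client_id: "fluentd",
      ssl_ca_cert: File.read("/etc/ssl/certs/kafka-ca.crt"),
      # With false, the certificate chain is still validated against the CA,
      # but the broker's hostname is no longer matched against the
      # certificate's CN/SAN entries (e.g. brokers reached by IP or a proxy).
      ssl_verify_hostname: false
    )
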
@@ -212,9 +214,14 @@
partition_key = (@exclude_partition_key ? record.delete(@partition_key_key) : record[@partition_key_key]) || @default_partition_key
partition = (@exclude_partition ? record.delete(@partition_key) : record[@partition_key]) || @default_partition
message_key = (@exclude_message_key ? record.delete(@message_key_key) : record[@message_key_key]) || @default_message_key
record_buf = @formatter_proc.call(tag, time, record)
+ record_buf_bytes = record_buf.bytesize
+ if @max_send_limit_bytes && record_buf_bytes > @max_send_limit_bytes
+ log.warn "record size exceeds max_send_limit_bytes. Skip event:", :time => time, :record => record
+ next
+ end
rescue StandardError => e
log.warn "unexpected error during format record. Skip broken event:", :error => e.to_s, :error_class => e.class.to_s, :time => time, :record => record
next
end
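
The guard compares String#bytesize, so the limit applies to the encoded byte length of the formatted record rather than its character count, and an oversized event is logged and skipped instead of failing the whole chunk. Because the parameter is declared with the :size type, Fluentd accepts suffixed values such as 1m or 500k in the configuration. A standalone sketch of the same check, with an illustrative limit and records:

    max_send_limit_bytes = 1_000_000 # e.g. "1m" in the Fluentd config

    records = ['{"msg":"ok"}', %({"msg":"#{"x" * 2_000_000}"})]
    records.each do |record_buf|
      if max_send_limit_bytes && record_buf.bytesize > max_send_limit_bytes
        warn "record size exceeds max_send_limit_bytes. Skip event." # the plugin uses log.warn
        next # drop this event, keep processing the rest of the chunk
      end
      # producer.produce(record_buf, topic: ...) would run here
    end
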