lib/kafka/producer.rb in ruby-kafka-0.3.8 vs lib/kafka/producer.rb in ruby-kafka-0.3.9

- old
+ new

@@ -132,11 +132,11 @@ def initialize(cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) @cluster = cluster @logger = logger @instrumenter = instrumenter - @required_acks = required_acks + @required_acks = required_acks == :all ? -1 : required_acks @ack_timeout = ack_timeout @max_retries = max_retries @retry_backoff = retry_backoff @max_buffer_size = max_buffer_size @max_buffer_bytesize = max_buffer_bytesize @@ -183,15 +183,16 @@ # @return [nil] def produce(value, key: nil, topic:, partition: nil, partition_key: nil) create_time = Time.now message = PendingMessage.new( - value: value, - key: key, - topic: topic, - partition: partition, - partition_key: partition_key, - create_time: create_time, + value, + key, + topic, + partition, + partition_key, + create_time, + key.to_s.bytesize + value.to_s.bytesize ) if buffer_size >= @max_buffer_size buffer_overflow topic, "Max buffer size (#{@max_buffer_size} messages) exceeded" end