lib/kafka/producer.rb in ruby-kafka-0.3.1 vs lib/kafka/producer.rb in ruby-kafka-0.3.2

- old
+ new

@@ -179,16 +179,19 @@ # @param partition_key [String] the key that should be used to assign a partition. # # @raise [BufferOverflow] if the maximum buffer size has been reached. # @return [nil] def produce(value, key: nil, topic:, partition: nil, partition_key: nil) + create_time = Time.now + message = PendingMessage.new( value: value, key: key, topic: topic, partition: partition, partition_key: partition_key, + create_time: create_time, ) if buffer_size >= @max_buffer_size raise BufferOverflow, "Max buffer size (#{@max_buffer_size} messages) exceeded" end @@ -202,10 +205,11 @@ Instrumentation.instrument("produce_message.producer.kafka", { value: value, key: key, topic: topic, + create_time: create_time, buffer_size: buffer_size, max_buffer_size: @max_buffer_size, }) nil @@ -320,9 +324,10 @@ @buffer.write( value: message.value, key: message.key, topic: message.topic, partition: partition, + create_time: message.create_time, ) end rescue Kafka::Error => e @logger.error "Failed to assign pending message to a partition: #{e}" @cluster.mark_as_stale!