lib/kafka/producer.rb in ruby-kafka-0.3.1 vs lib/kafka/producer.rb in ruby-kafka-0.3.2
- old
+ new
@@ -179,16 +179,19 @@
# @param partition_key [String] the key that should be used to assign a partition.
#
# @raise [BufferOverflow] if the maximum buffer size has been reached.
# @return [nil]
def produce(value, key: nil, topic:, partition: nil, partition_key: nil)
+ create_time = Time.now
+
message = PendingMessage.new(
value: value,
key: key,
topic: topic,
partition: partition,
partition_key: partition_key,
+ create_time: create_time,
)
if buffer_size >= @max_buffer_size
raise BufferOverflow, "Max buffer size (#{@max_buffer_size} messages) exceeded"
end
@@ -202,10 +205,11 @@
Instrumentation.instrument("produce_message.producer.kafka", {
value: value,
key: key,
topic: topic,
+ create_time: create_time,
buffer_size: buffer_size,
max_buffer_size: @max_buffer_size,
})
nil
@@ -320,9 +324,10 @@
@buffer.write(
value: message.value,
key: message.key,
topic: message.topic,
partition: partition,
+ create_time: message.create_time,
)
end
rescue Kafka::Error => e
@logger.error "Failed to assign pending message to a partition: #{e}"
@cluster.mark_as_stale!